2019-04-08 13:24:50 UTC - Mark Marijnissen: How do I enable websockets on the
Pulsar Proxy?
----
2019-04-08 16:00:01 UTC - Gene Fojtik: @Gene Fojtik has joined the channel
----
2019-04-08 16:11:46 UTC - Devin G. Bost: I'm looking for ideas on how to create
graceful continuous deployments at scale (with downtime < 300 ms). I'm
considering deploying to a new tenant, renaming the old tenant, and then
renaming the new tenant to the old tenant name. However, I'm not sure if that
will allow me to gracefully roll over existing consumers and producers.
Are there any thoughts on this approach or suggestions of how to create a
better continuous deployment process?
----
2019-04-08 16:16:05 UTC - Devin G. Bost: We also can't have data loss, so
that's another issue that must be considered.
----
2019-04-08 16:22:49 UTC - Kenan Dalley: Hi. I've run into an issue with Pulsar
Functions and the Jackson JSON library. My function is defined as
Function<MyModel,MyModel>, where MyModel has a String id, FruitEnum
fruitType (Apple, Orange, etc) and a long stateCounter. My function fills in
the id & counter and my Producer sends in the fruitType.
When I run with my own Producer, which sends out a MyModel, it runs fine. But
when I attempted to use `bin/pulsar-client functions produce -m
"{fruitType=\"Orange\"}"`, Jackson blew up the function with the message
"JsonParseException: Unexpected character ('f' (code 102)): was expecting
double-quote to start field name."
I understand what Jackson wants here, but the underlying issue is that, because
of this error and that the offset wasn't committed, my function was pushed into
an infinite loop of restarts on the server because it kept trying to read that
message over and over when it was restarted.
3 questions:
1. Is there a way to configure Jackson to be looser with its parsing for
Functions?
2. Is there a way to change, manually or automatically, the committed position
the function is reading from, on the fly?
3. Is there a way to intercept the message prior to the Jackson call to do
message type validation so that this can be prevented?
This situation is definitely not good for an Enterprise-level application.
----
2019-04-08 16:42:08 UTC - David Kjerrumgaard: @Kenan Dalley For #1, you can
configure the Jackson parser to allow unquoted field names.
`mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);`
----
2019-04-08 16:46:51 UTC - David Kjerrumgaard: @Kenan Dalley for #2, you can add
the following property to the function deployment which will cause the messages
to be acknowledged automatically upon processing, rather than requiring you to
perform the ack in the code. `"autoAck": true,`
----
2019-04-08 17:05:04 UTC - Kenan Dalley: Adding "--auto-ack true" to the
function's create/update statement didn't fix anything. It's still going into
an infinite loop.
----
2019-04-08 17:12:15 UTC - Matteo Merli: @Mark Marijnissen Currently the
websocket service is not integrated in the Pulsar proxy. You have to start it
on its own with `pulsar websocket`
----
2019-04-08 17:13:15 UTC - Matteo Merli: @Devin G. Bost You mean Pulsar
deployments or your application deployments?
----
2019-04-08 17:21:44 UTC - Kenan Dalley: @David Kjerrumgaard Where would I go to
configure the mapper for either that function or all functions? Since the
function class itself only runs after the conversion, configuring it there would
be too late, and I don't know of anywhere else that makes sense.
----
2019-04-08 17:22:38 UTC - Mandi Goddard: @Jon Bock I am trying to find some way
to communicate with students who don't read their emails. Any suggestions?
----
2019-04-08 17:34:17 UTC - Devin G. Bost: @Matteo Merli
Great question. Thanks for asking. I mean application deployments (e.g. a set
of Pulsar functions, sinks, sources, topics, and namespaces).
----
2019-04-08 17:37:29 UTC - Guy Feldman: @Mandi Goddard sounds like more of a job
for an email sending service like Mailjet or easysendy
----
2019-04-08 17:37:31 UTC - Matteo Merli: Ok, in any case (even if it were for a
Pulsar deployment) there would be no downtime or data loss.
For functions, you will issue an “update” of a running function and that will
trigger a rolling restart of the instances. With multiple instances, some of
them will still be running at any given time.
Regarding data loss, functions use a consumer which has a subscription
associated. If the function doesn’t process the messages, the messages won’t be
acknowledged and therefore the broker will keep them.
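A toy in-memory model of the "no data loss" argument above, assuming a single subscription whose messages stay in the backlog until they are acknowledged. If an instance is restarted mid-processing (for example during a rolling update) before acking, the message is delivered again instead of being lost. `ToySubscription` and its methods are hypothetical and only illustrate the idea; this is not how the broker actually stores or tracks messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model: a message leaves the subscription only when it is acked.
class ToySubscription {
    private final Deque<String> backlog = new ArrayDeque<>();
    private String inFlight; // delivered but not yet acknowledged

    void publish(String msg) { backlog.addLast(msg); }

    // Deliver the next message; until it is acked, the same message is handed out again.
    String receive() {
        if (inFlight != null) return inFlight;
        inFlight = backlog.pollFirst();
        return inFlight;
    }

    // Only an ack removes the message for good.
    void ack() { inFlight = null; }

    // Simulates an instance restart before acking: the message goes back to the front.
    void consumerRestarted() {
        if (inFlight != null) {
            backlog.addFirst(inFlight);
            inFlight = null;
        }
    }

    int backlogSize() { return backlog.size() + (inFlight == null ? 0 : 1); }

    public static void main(String[] args) {
        ToySubscription sub = new ToySubscription();
        sub.publish("m1");
        sub.publish("m2");
        List<String> processed = new ArrayList<>();

        String first = sub.receive();   // m1 delivered
        sub.consumerRestarted();        // restarted before acking m1
        processed.add(sub.receive());   // m1 delivered again, not lost
        sub.ack();
        processed.add(sub.receive());   // m2
        sub.ack();

        System.out.println(first + " " + processed + " backlog=" + sub.backlogSize());
        // prints: m1 [m1, m2] backlog=0
    }
}
```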
----
2019-04-08 17:40:14 UTC - Devin G. Bost: Thanks for the guidance.
----
2019-04-08 17:43:16 UTC - Sanjeev Kulkarni: @Kenan Dalley There is a
maxMessageRetries parameter in the FunctionConfig. By default it's -1 (which
means forever), but you can set a finite value, and the framework should then
retry the message at most that many times before giving up.
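To make the retry semantics concrete: with a finite limit a failing message is retried a bounded number of times and then given up on, while `-1` means retrying forever, which, for a message that can never be decoded, turns into an endless restart loop. The sketch below is a hypothetical plain-Java illustration of that policy (`RetryPolicySketch` and `processWithRetries` are invented for this example), not the Pulsar Functions runtime. It assumes the limit counts retries after the first attempt; check the FunctionConfig docs for the exact counting, and for what "giving up" does (ack, drop, or dead-letter topic).

```java
import java.util.function.Predicate;

class RetryPolicySketch {
    // Result of handling one message: success flag and number of attempts made.
    static final class Result {
        final boolean succeeded;
        final int attempts;
        Result(boolean succeeded, int attempts) { this.succeeded = succeeded; this.attempts = attempts; }
    }

    // Hypothetical policy: maxRetries < 0 retries forever; otherwise at most
    // maxRetries retries after the first attempt.
    static Result processWithRetries(String msg, Predicate<String> handler, int maxRetries) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (handler.test(msg)) {
                return new Result(true, attempts);
            }
            if (maxRetries >= 0 && attempts > maxRetries) {
                // Give up; the caller decides whether to ack, drop, or dead-letter it.
                return new Result(false, attempts);
            }
        }
    }

    public static void main(String[] args) {
        // A handler that can never succeed, like a payload that always fails to decode.
        Predicate<String> alwaysFails = m -> false;
        Result r = processWithRetries("{fruitType=\"Pear\"}", alwaysFails, 1);
        System.out.println(r.succeeded + " after " + r.attempts + " attempts");
        // prints: false after 2 attempts
    }
}
```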
----
2019-04-08 17:43:40 UTC - Devin G. Bost: What would be involved if I wanted to
create something like an "upsert" operation that would create a component (if
it doesn't exist) or update a component (if it does)?
----
2019-04-08 17:48:35 UTC - Kenan Dalley: @Sanjeev Kulkarni I tried that too (set
to 1) and I tried the dead-letter queue setup as well and nothing has worked.
The function keeps failing without an ack and is restarted an infinite number
of times. It's as though all of these other config settings only take effect
after the failure occurs, which makes them irrelevant.
----
2019-04-08 17:50:01 UTC - Sanjeev Kulkarni: when/where is the error occurring?
----
2019-04-08 17:50:04 UTC - Sanjeev Kulkarni: is there a stack trace
----
2019-04-08 17:56:36 UTC - Matteo Merli: Not sure :slightly_smiling_face:
----
2019-04-08 17:57:05 UTC - Matteo Merli: You can try “functions create” and if
it fails fallback to update.. I guess it’s not ideal..
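A minimal sketch of the "try create, fall back to update" approach. It is written against a hypothetical `ComponentAdmin` interface rather than the real pulsar-admin Java API; the interface, `AlreadyExistsException`, and the in-memory `FakeAdmin` are assumptions made to keep the example self-contained. With the real client you would need to check which exception the create call actually raises when the component already exists.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical exception for "component already exists".
class AlreadyExistsException extends Exception {
    AlreadyExistsException(String name) { super(name + " already exists"); }
}

// Hypothetical stand-in for the admin create/update calls.
interface ComponentAdmin {
    void create(String name, String config) throws AlreadyExistsException;
    void update(String name, String config);
}

class UpsertSketch {
    // Try to create first; if the component exists, update it instead.
    // Returns "created" or "updated" so callers can log what happened.
    static String upsert(ComponentAdmin admin, String name, String config) {
        try {
            admin.create(name, config);
            return "created";
        } catch (AlreadyExistsException e) {
            admin.update(name, config);
            return "updated";
        }
    }

    // In-memory fake used only to demonstrate the control flow.
    static final class FakeAdmin implements ComponentAdmin {
        final Map<String, String> components = new HashMap<>();
        public void create(String name, String config) throws AlreadyExistsException {
            if (components.containsKey(name)) throw new AlreadyExistsException(name);
            components.put(name, config);
        }
        public void update(String name, String config) { components.put(name, config); }
    }

    public static void main(String[] args) {
        FakeAdmin admin = new FakeAdmin();
        System.out.println(upsert(admin, "FnFruit", "v1"));  // created
        System.out.println(upsert(admin, "FnFruit", "v2"));  // updated
        System.out.println(admin.components.get("FnFruit")); // v2
    }
}
```

One caveat of any create-then-update upsert is that it is not atomic: two deployers racing on the same name can interleave, so it works best when a single pipeline owns the deployment.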
----
2019-04-08 17:58:33 UTC - Jerry Peng: i see
----
2019-04-08 17:58:50 UTC - Kenan Dalley: Not easy on my phone.
:slightly_smiling_face:
Here's what the pulsar function log has:
```
17:01:23.343 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
17:01:23.413 [public/default/FnFruit-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/FnFruit:0] Uncaught exception in Java Instance
org.apache.pulsar.client.api.SchemaSerializationException: org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name
 at [Source: (byte[])"{fruitType="Pear"}"; line: 1, column: 3]
    at org.apache.pulsar.client.impl.schema.JSONSchema.decode(JSONSchema.java:84) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:233) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:458) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:243) [java-instance.jar:2.3.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name
 at [Source: (byte[])"{fruitType="Pear"}"; line: 1, column: 3]
    at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:669) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:567) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:1988) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1639) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:725) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[java-instance.jar:2.3.0]
    at org.apache.pulsar.client.impl.schema.JSONSchema.decode(JSONSchema.java:82) ~[java-instance.jar:2.3.0]
    ... 5 more
17:01:23.420 [public/default/FnFruit-0] INFO org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
```
----
2019-04-08 17:58:56 UTC - Jerry Peng: Any interesting logs printed in the
broker log related to that topic?
----
2019-04-08 17:59:22 UTC - Jerry Peng: partitioned topic or not partitioned
topic?
----
2019-04-08 18:19:28 UTC - Sanjeev Kulkarni: so this is Pulsar’s schema
trying to interpret the JSON and failing. For now, can you interpret the
messages as byte[] and do the conversion yourself?
----
2019-04-08 18:19:53 UTC - Sanjeev Kulkarni: essentially your function signature
will change to <byte[], MyModel>
----
2019-04-08 18:20:15 UTC - Sanjeev Kulkarni: and inside your function, you can
then try to interpret loosely the bytes to json object and then do the
manipulation
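A minimal sketch of this workaround: take the raw `byte[]` payload and do a lenient conversion inside the function. To stay self-contained it uses `java.util.function.Function` as a stand-in; a real Pulsar function would implement `org.apache.pulsar.functions.api.Function<byte[], MyModel>` and its `process(input, context)` method. `LooseModelParser` and its regex are hypothetical and deliberately simplistic (flat `{key=value}` or `{key:"value"}` objects only, no nesting or escapes), and `FruitEnum`/`MyModel` follow the shape described earlier in the thread. Note that the payload in the log, `{fruitType="Pear"}`, uses `=` rather than `:`, so enabling Jackson's `ALLOW_UNQUOTED_FIELD_NAMES` alone would still reject it.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Assumed model shape from the thread (only Apple/Orange/Pear are known).
enum FruitEnum { Apple, Orange, Pear }

class MyModel {
    String id;
    FruitEnum fruitType;
    long stateCounter;
}

class LooseModelParser {
    // Hypothetical: key=value or key:value, optional quotes, flat objects only.
    private static final Pattern PAIR =
        Pattern.compile("\"?(\\w+)\"?\\s*[:=]\\s*\"?([^\",}]*)\"?");

    static Map<String, String> parseFlat(String text) {
        Map<String, String> fields = new HashMap<>();
        Matcher m = PAIR.matcher(text);
        while (m.find()) {
            fields.put(m.group(1), m.group(2).trim());
        }
        return fields;
    }
}

// Stand-in for a Pulsar Function<byte[], MyModel>: decoding happens in user code,
// so a malformed payload becomes a handled case instead of a schema exception.
class FruitFunction implements Function<byte[], MyModel> {
    private long counter = 0; // hypothetical in-memory counter, not Pulsar state

    @Override
    public MyModel apply(byte[] input) {
        String text = new String(input, StandardCharsets.UTF_8);
        String fruit = LooseModelParser.parseFlat(text).get("fruitType");
        if (fruit == null) {
            return null; // hypothetical choice: skip messages without a fruitType
        }
        FruitEnum fruitType;
        try {
            fruitType = FruitEnum.valueOf(fruit);
        } catch (IllegalArgumentException unknownFruit) {
            return null; // hypothetical choice: skip instead of failing and retrying forever
        }
        MyModel out = new MyModel();
        out.fruitType = fruitType;
        out.id = UUID.randomUUID().toString();
        out.stateCounter = ++counter;
        return out;
    }

    public static void main(String[] args) {
        FruitFunction fn = new FruitFunction();
        MyModel m = fn.apply("{fruitType=\"Pear\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(m.fruitType + " " + m.stateCounter); // Pear 1
    }
}
```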
----
2019-04-08 18:24:57 UTC - Kenan Dalley: That seems like a valid approach, but
definitely not ideal. I'll try it out.
----
2019-04-08 18:43:31 UTC - Sanjeev Kulkarni: @Kenan Dalley agreed.
<https://github.com/apache/pulsar/pull/4004> should fix the behavior for the
next release
----
2019-04-08 18:49:24 UTC - Devin G. Bost: I'm adding an `upsert` method to
Pulsar-Admin to enable conditional creation/update (where it creates a
component if it doesn't exist and updates the component if it does exist). This
method will make rolling deployments easier because we can just call this
single method on every component in our project tree during our deployments.
In my fork, I added `upsertFunction` to
`pulsar-functions/worker/src/main/java/org.apache.pulsar.functions.worker/rest/api/ComponentImpl.java`
and added upsert methods to:
`pulsar-broker/src/main/java/org.apache.pulsar/broker/admin/impl/FunctionsBase.java`,
`SinkBase.java`, and `SourceBase.java`.
Adding upsert to NamespacesBase and TenantsBase looks more involved, so I
haven't done that yet.
Are there any other points that I need to include before I submit a PR with
these changes?
----
2019-04-08 18:50:02 UTC - Devin G. Bost: At some point, I’d also like to
implement a bulk-upsert method where I can pass in a manifest file and trigger
upserts in parallel for everything in the manifest file.
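A rough sketch of the parallel bulk-upsert idea, assuming the manifest has already been parsed into a list of component names and that some upsert operation exists (passed in here as a hypothetical `Function<String, String>`, not a real pulsar-admin call). `BulkUpsertSketch` fans the upserts out over a fixed thread pool and collects one result per component so that failures can be reported together. In practice, tenants and namespaces must exist before the functions, sinks, and sources inside them, so a real implementation would likely upsert level by level rather than everything at once.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class BulkUpsertSketch {
    // Runs the (hypothetical) upsert for every manifest entry in parallel and returns
    // name -> result, or "error: ..." if that entry failed. Manifest order is preserved.
    static Map<String, String> bulkUpsert(List<String> manifest,
                                          Function<String, String> upsert,
                                          int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            Map<String, Future<String>> pending = new LinkedHashMap<>();
            for (String name : manifest) {
                pending.put(name, pool.submit(() -> upsert.apply(name)));
            }
            Map<String, String> results = new LinkedHashMap<>();
            for (Map.Entry<String, Future<String>> e : pending.entrySet()) {
                try {
                    results.put(e.getKey(), e.getValue().get());
                } catch (ExecutionException failure) {
                    results.put(e.getKey(), "error: " + failure.getCause().getMessage());
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    results.put(e.getKey(), "error: interrupted");
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> manifest = Arrays.asList("public/default/FnFruit", "public/default/FnBroken");
        Map<String, String> results = bulkUpsert(manifest, name -> {
            if (name.endsWith("FnBroken")) throw new IllegalStateException("bad config");
            return "updated";
        }, 4);
        System.out.println(results);
        // prints: {public/default/FnFruit=updated, public/default/FnBroken=error: bad config}
    }
}
```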
----
2019-04-08 18:50:16 UTC - Devin G. Bost: The purpose of these changes is to
simplify (and speed up) continuous deployment.
+1 : David Kjerrumgaard, Karthik Ramasamy
----
2019-04-08 19:08:06 UTC - John Crawford: non-partitioned
----
2019-04-08 19:08:10 UTC - John Crawford: seeing this log again:
----
2019-04-08 19:08:36 UTC - John Crawford: ```
19:02:46.150 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] WARN org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/persistent/raw-pallets]-4372 read entry timeout for 0-0 after 120 sec
19:02:46.151 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] WARN org.apache.bookkeeper.mledger.impl.OpReadEntry - [02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/persistent/raw-pallets][etl-production] read failed from ledger at position:4372:0 : Bookie operation timeout
19:02:46.151 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/raw-pallets / etl-production] Error reading entries at 4372:0 : Bookie operation timeout, Read Type Normal - Retrying to read in 58.832 seconds
```
----
2019-04-08 19:08:37 UTC - Devin G. Bost: Alternatively, if there are objections
to the `upsert` approach, we could still build a tree of components from a
manifest file with a `bulkDeploy` method, compute differences between the
manifest tree and the tree of components currently existing in Pulsar, and then
conditionally call create, update, and/or delete methods to bring Pulsar
in-sync with the manifest file.
Feedback on preferences would be helpful.
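The diff-based `bulkDeploy` alternative boils down to comparing two maps keyed by component name. A minimal sketch, assuming both the manifest and the live cluster state have already been flattened into `name -> config` maps (fetching and serializing them is left out); `ManifestDiff`, `DeployPlan`, and `plan` are hypothetical names.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

class ManifestDiff {
    // The three sets of actions needed to bring the cluster in sync with the manifest.
    static final class DeployPlan {
        final SortedSet<String> toCreate = new TreeSet<>();
        final SortedSet<String> toUpdate = new TreeSet<>();
        final SortedSet<String> toDelete = new TreeSet<>();
    }

    static DeployPlan plan(Map<String, String> manifest, Map<String, String> live) {
        DeployPlan p = new DeployPlan();
        for (Map.Entry<String, String> e : manifest.entrySet()) {
            String current = live.get(e.getKey());
            if (current == null) {
                p.toCreate.add(e.getKey());         // in manifest, not in cluster
            } else if (!current.equals(e.getValue())) {
                p.toUpdate.add(e.getKey());         // in both, config changed
            }                                       // identical: nothing to do
        }
        for (String name : live.keySet()) {
            if (!manifest.containsKey(name)) {
                p.toDelete.add(name);               // in cluster, not in manifest
            }
        }
        return p;
    }

    public static void main(String[] args) {
        Map<String, String> manifest = new TreeMap<>();
        manifest.put("fn/a", "v2");
        manifest.put("fn/b", "v1");
        manifest.put("fn/new", "v1");
        Map<String, String> live = new TreeMap<>();
        live.put("fn/a", "v1");
        live.put("fn/b", "v1");
        live.put("fn/old", "v1");
        DeployPlan p = plan(manifest, live);
        System.out.println("create=" + p.toCreate + " update=" + p.toUpdate + " delete=" + p.toDelete);
        // prints: create=[fn/new] update=[fn/a] delete=[fn/old]
    }
}
```

Deletion is the risky part of this approach (a mistake in the manifest could remove a production function), which is one argument for the more conservative `upsert`, which never deletes anything.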
----
2019-04-08 19:09:59 UTC - John Crawford: then further down the log (58 seconds
later):
----
2019-04-08 19:10:07 UTC - John Crawford: ```
19:03:44.983 [pulsar-io-23-3] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://02efa741-a96f-4124-a463-ae13a704b8fc/pXeyflmLEUEnpjUa5radVPTTj44PSG1P/raw-pallets / etl-production] Retrying read operation
19:03:44.983 [pulsar-io-23-3] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught
java.lang.ArrayIndexOutOfBoundsException: null
```
----
2019-04-08 19:10:34 UTC - John Crawford: don’t see anything but GC logs in
bookkeeper logs
----
2019-04-08 19:14:04 UTC - Grant Wu: @Kenan Dalley Why can’t you quote your
field names?
----
2019-04-08 19:14:24 UTC - Grant Wu: Where is the input coming from that is so
blatantly non-compliant with the spec?
----
2019-04-08 19:14:46 UTC - Grant Wu: I was considering using the JSON schema in
the future, so I would prefer for this to either be tunable or left unchanged
----
2019-04-08 19:16:04 UTC - Grant Wu: if we allow non-quoted field names, we
immediately break the Python JSON parser as well:
```
>>> import json
>>> json.loads('{foo: "bar"}')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/decoder.py", line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
```
----
2019-04-08 19:22:13 UTC - Grant Wu: To clarify my objection: if we allow this,
then consumers assuming that the input they get is valid JSON will all now need
to be able to handle this non-compliant JSON. So, for example, the standard
JSON parsing library in Python would no longer suffice; we would need to pull
in a third party dependency.
<https://stackoverflow.com/questions/39491420/python-jsonexpecting-property-name-enclosed-in-double-quotes>
----
2019-04-08 19:23:42 UTC - Kenan Dalley: @Grant Wu I'd prefer it to be tunable
because I cannot guarantee that the data that comes in will be 100% compliant
with the spec.
Also, I personally don't agree that field names not being surrounded by double
quotes is something that should completely fail the object mapper since it's a
very widely used pattern. Otherwise, it wouldn't be an option to override.
----
2019-04-08 19:28:58 UTC - Grant Wu: We should probably move this discussion to
the PR
+1 : Devin G. Bost
----
2019-04-08 19:30:06 UTC - Ryan Samo: Hey guys,
When using Pulsar SQL (Presto) I do not see an obvious way to pass in the admin
certificates. Since we are using TLS, do you have any idea in which config the
tlsCert, tlsKey, and rootCA would reside for Presto?
----
2019-04-08 19:30:36 UTC - Ryan Samo: We get a 401 Unauthorized if we don’t
provide the certs somehow
----
2019-04-08 19:32:40 UTC - Grant Wu: I put a comment there
----
2019-04-08 19:44:05 UTC - Grant Wu: I’m not sure what the best approach is
here, but here are my thoughts:
1. This would be welcome, my deployment script for Pulsar Functions currently
is quite ugly, it greps the output of pulsar-admin functions get.
2. A manifest approach feels somewhat heavyweight, and it might overlap
with native k8s tooling (for those of us running Pulsar in k8s). Pulsar
Functions are the exception here because they’re not materialized as k8s
resources anywhere (a possible future avenue of work)
3. Possible alternative name to “upsert” - “put” - better matches the HTTP
verbs that pulsar-admin already uses
----
2019-04-08 19:44:16 UTC - Kenan Dalley: Dittos. :blush:
----
2019-04-08 20:46:50 UTC - Devin G. Bost: Thanks for the feedback.
----
2019-04-08 20:51:34 UTC - Devin G. Bost: If I were to create a PR, how long do
you think it would take before it would end up in a release?
----
2019-04-08 20:52:47 UTC - Grant Wu: I don’t know exactly - not a maintainer -
but <https://github.com/apache/pulsar/releases> can give you some sense of how
frequently releases happen
----
2019-04-08 21:50:39 UTC - Devin G. Bost: Is there going to be a Pulsar
conference?
----
2019-04-08 21:52:26 UTC - Sanjeev Kulkarni: Hopefully soon
+1 : Devin G. Bost, Ali Ahmed, Ezequiel Lovelle, Karthik Ramasamy, Yuvaraj
Loganathan, Shivji Kumar Jha
bananadance : Yuvaraj Loganathan
----
2019-04-08 23:34:53 UTC - Devin G. Bost: Regarding the upsert function, I made
the changes in my fork and got the build to succeed. I think I updated all of
the touch-points correctly (though I may be missing something). When I try to
call it from the admin API, I get null output with:
`Reason: javax.ws.rs.ProcessingException: Java heap space`
I tried updating my pulsar_env.sh line to:
`PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx8g -XX:MaxDirectMemorySize=12g"}`
but I got the same issue. Any ideas?
----
2019-04-08 23:45:59 UTC - Ali Ahmed: @Devin G. Bost you have to change
pulsar_tools_env.sh
----
2019-04-08 23:46:18 UTC - Devin G. Bost: Oh, thanks! I'll try that!
----
2019-04-08 23:52:38 UTC - Sijie Guo: @young sorry that I forgot to reply to you
yesterday. from the exception, it seems that you are using both pulsar-client
and pulsar-client-admin? what dependencies do you include for your java program?
----
2019-04-08 23:54:24 UTC - Devin G. Bost: It worked! Thanks!
----
2019-04-09 02:00:31 UTC - young: @Sijie Guo thanks for the reply. I only import
org.apache.pulsar.client.api.Producer and
org.apache.pulsar.client.api.PulsarClientException, no pulsar-client-admin.
----
2019-04-09 02:07:19 UTC - Sijie Guo: are you using pulsar-client or
pulsar-client-original? what version are you using?
----
2019-04-09 02:08:04 UTC - young: Furthermore, when I run a standalone Pulsar
cluster on my own machine, the Java code passes the test, but the same code
running on the server machine reports an exception.
----
2019-04-09 02:10:45 UTC - young: @Sijie Guo pulsar-client; the version is
pulsar-client-api-2.3.0.jar.
----
2019-04-09 02:11:40 UTC - Sijie Guo: are you using maven or gradle to build the
java program?
----
2019-04-09 02:14:31 UTC - young: Neither yet, it's just a simple test.
----
2019-04-09 02:17:03 UTC - Sijie Guo: it seems that you are invoking the test
from eclipse and the exception seems to show it failed to load some default
configuration from async http client.
----
2019-04-09 02:17:19 UTC - Sijie Guo: I am not sure how your test is running in
eclipse.
----
2019-04-09 02:17:29 UTC - Sijie Guo: can you try to use pulsar-client-original
dependency?
----
2019-04-09 02:22:15 UTC - young: Running the test in Eclipse, connecting to the
standalone Pulsar deployed on my own machine, is OK, but when I package it into
a jar file and run the command "java -jar MessageProducer.jar" from the command
line on the server machine, it fails.
----
2019-04-09 02:23:27 UTC - Sijie Guo: > but I package to a jar file and exec
the command” java -jar MessageProducer.jar ” by command line on the server
machine is failed.
how do you package the jar file?
----
2019-04-09 02:25:59 UTC - young: eclipse file menu ---export ---runnable jar
file---Package required libraries into generated JAR...
----
2019-04-09 02:27:44 UTC - Sijie Guo: okay, I am not sure how Eclipse does that,
but asynchttpclient has some resource files which should be packaged into the
JAR; otherwise asynchttpclient will fail to load those resource files.
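A quick way to check whether a repackaged jar still contains the resource files a library expects is to look them up on the classpath at startup. The default resource path below is an assumption (AsyncHttpClient 2.x ships its defaults in a properties file assumed here to be `org/asynchttpclient/config/ahc-default.properties`); verify the exact name by listing the contents of the original `async-http-client` jar. `ClasspathResourceCheck` is a hypothetical helper.

```java
import java.io.IOException;
import java.io.InputStream;

class ClasspathResourceCheck {
    // Returns true if the named resource can be loaded from the current classpath,
    // e.g. from inside a "runnable jar" produced by an IDE export.
    static boolean hasResource(String path) {
        ClassLoader loader = ClasspathResourceCheck.class.getClassLoader();
        try (InputStream in = loader.getResourceAsStream(path)) {
            return in != null;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed location of AsyncHttpClient's default config; pass a different path as args[0].
        String path = args.length > 0 ? args[0] : "org/asynchttpclient/config/ahc-default.properties";
        System.out.println(path + (hasResource(path) ? " found" : " MISSING from classpath"));
    }
}
```

Running it with the same `java -jar` setup on the server shows whether the export dropped the file; if it did, a build that merges resources correctly (for example a Maven or Gradle fat-jar build) or the `pulsar-client-original` dependency suggested here are the options to try.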
----
2019-04-09 02:33:44 UTC - young: Oh, then do you have any recommendations for
the usual way to deploy Java code to a server machine?
----
2019-04-09 02:36:30 UTC - Sijie Guo: can you try using pulsar-client-original?
----
2019-04-09 02:41:01 UTC - young: thanks, I will try it.
----
2019-04-09 07:02:32 UTC - Mark Marijnissen: Thanks for the info.
:slightly_smiling_face:
----
2019-04-09 07:04:56 UTC - Mark Marijnissen: Do Pulsar clients need to be able
to connect to a specific broker, or will any broker do? The docs say they are
stateless, so I expect any broker to work.
(Follow-up question: If any broker will do, why is the Pulsar proxy enabled
in a Kubernetes deployment?)
(Context: I want to use nodejs to send messages. The node client with the c++
lib doesn't compile, so I want to use the websocket API as it's a low-volume
use case. But the websocket API is not supported on the proxy)
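For a low-volume producer, the WebSocket API only needs an endpoint URL and a small JSON frame per message. The helper below builds both, assuming the v2 producer path `/ws/v2/producer/persistent/{tenant}/{namespace}/{topic}` and a `payload` field holding the Base64-encoded message body, per the Pulsar WebSocket API docs. Host, port, topic, and the `WebSocketFrames` class are placeholders, and the hand-rolled JSON relies on Base64 output needing no escaping. The same URL and frame format apply from Node.js; Java is used here only for consistency with the rest of this thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class WebSocketFrames {
    // Producer endpoint, e.g. ws://localhost:8080/ws/v2/producer/persistent/public/default/fruit
    static String producerUrl(String host, int port, String tenant, String namespace, String topic) {
        return "ws://" + host + ":" + port + "/ws/v2/producer/persistent/"
                + tenant + "/" + namespace + "/" + topic;
    }

    // One outgoing frame: the message body goes Base64-encoded in "payload";
    // "context" is an optional caller-chosen id that the server echoes in its response.
    static String producerFrame(String body, String context) {
        String payload = Base64.getEncoder().encodeToString(body.getBytes(StandardCharsets.UTF_8));
        // Base64 output uses only [A-Za-z0-9+/=], so it needs no JSON escaping;
        // context is assumed to be a simple alphanumeric id.
        return "{\"payload\":\"" + payload + "\",\"context\":\"" + context + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(producerUrl("localhost", 8080, "public", "default", "fruit"));
        System.out.println(producerFrame("{\"fruitType\":\"Pear\"}", "1"));
    }
}
```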
----
2019-04-09 07:10:59 UTC - Ali Ahmed: @Mark Marijnissen connecting to any broker
is fine; the proxy is a transparent TCP proxy that is there for convenience in
certain kinds of deployments.
nodejs should start to stabilize; it's in the early stages of development, and
you can open issues if you are unable to compile.
The websocket is supported by the proxy; if it's having problems, please open
an issue on GitHub.
----
2019-04-09 07:23:55 UTC - Matteo Merli: > Do Pulsar clients need to be able
to connect to a specific broker, or will any broker do? The docs say they are
stateless, so I expect any broker to work.
A client can connect initially to any broker, though it needs to be able to
connect directly to all the brokers.
If direct connection is not possible (or desirable), then the proxy is a
possible solution.
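A toy model of the answer above, assuming a fixed topic-to-owner map (in real Pulsar, ownership is assigned dynamically by the brokers and the lookup is done by the client library, not by user code). Any broker can answer the lookup, but the client then has to reach the owning broker itself; if that address is not routable from the client (for example a pod IP seen from outside Kubernetes), it has to go through a proxy instead. `LookupSketch`, `resolve`, and all addresses are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class LookupSketch {
    // Hypothetical, fixed topic -> owning broker map for the example.
    static final Map<String, String> OWNER = new HashMap<>();
    static {
        OWNER.put("persistent://public/default/fruit", "pulsar://10.0.0.12:6650");
        OWNER.put("persistent://public/default/veg", "pulsar://10.0.0.13:6650");
    }

    // Any broker can answer a lookup; anyBroker is unused because every broker gives the same answer.
    static String lookup(String anyBroker, String topic) {
        return OWNER.get(topic);
    }

    // The client must then connect to the owner directly, or fall back to a proxy.
    static String resolve(String topic, Set<String> reachable, String proxyUrl) {
        String owner = lookup("pulsar://any-broker:6650", topic);
        if (owner != null && reachable.contains(owner)) {
            return owner;     // direct connection to the owning broker
        }
        return proxyUrl;      // the proxy forwards traffic to the owner
    }

    public static void main(String[] args) {
        Set<String> reachable = new HashSet<>();
        reachable.add("pulsar://10.0.0.12:6650"); // e.g. a client inside the cluster network
        String proxy = "pulsar://pulsar-proxy:6650";
        System.out.println(resolve("persistent://public/default/fruit", reachable, proxy));
        System.out.println(resolve("persistent://public/default/veg", reachable, proxy));
        // prints: pulsar://10.0.0.12:6650, then pulsar://pulsar-proxy:6650
    }
}
```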
----
2019-04-09 08:50:33 UTC - Mark Marijnissen: Ah, that is useful to know.
In a Kubernetes deployment, will the client receive the IP address of a pod? Or
will it receive the service IP address, thus obscuring the actual pod (i.e.
broker) that is there?
----