2020-04-29 10:48:40 UTC - Vincent: Hi guys (running Pulsar standalone), I was
trying to putstate using pulsar-admin
`./bin/pulsar-admin functions putstate --tenant public --namespace default
--name word_count --state '{"key":"pulsar-1", "stringValue":"hello"}'`
but was greeted with this message
"State storage client is not done initializing. Please try again in a little
while."
Wondering what's wrong with it; it was working fine previously on 2.4.1.
Using 2.4.2 now
----
2020-04-29 11:23:55 UTC - Subash Kunjupillai: Since we were running our
application on the same machine as the broker, we pointed the classpath at the
pulsar/lib directory for the Pulsar client jar. Now that we provide only the
pulsar-client shaded jar as you mentioned, we are able to bring up our
application without any class conflict. Thanks @Sijie Guo! But I'm wondering
why the pulsar-client shaded jar was not packaged with Pulsar? We usually
depend on the libraries available in the lib directory at runtime rather than
deploying them with our application. Any reason behind this?
----
2020-04-29 12:36:25 UTC - Kirill Kosenko: Try to add
`--state-storage-service-url bk://localhost:4181`:
`bin/pulsar-admin functions localrun --jar
examples/pulsar-function-test-1.0-SNAPSHOT.jar --classname
org.apache.pulsar.functions.api.examples.WordCountFunction --inputs
persistent://public/default/wordscount --name wordcount
--state-storage-service-url bk://localhost:4181`
----
2020-04-29 12:43:07 UTC - Frank Kelly: Perfect - thanks so much
----
2020-04-29 13:16:09 UTC - Guilherme Perinazzo: The "KeyShared" subscription
type is not in the 1.1.0 release
----
2020-04-29 13:35:43 UTC - Gilles Barbier: oh, I did check on master, but not on
the 1.1 branch. You are right, sorry
----
2020-04-29 13:54:21 UTC - Gilles Barbier: Following the discussion
<https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1584035280260400?thread_ts=1584012254.251600&cid=C5Z4T36F7>
with @Sijie Guo, I’m trying to create a function with a keyShared subscription:
• I’ve tried to use --processing-guarantee but we can obtain only `failover` or
`shared`
• I’ve tried to create a subscription with keyShared before creating the
function with the same subscription, but then pulsar told me “Failed to create
consumer: Subscription is of different type”
So I guess this feature is not supported. Could someone point me to the
relevant code? I would like to propose a PR. Thx
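The error in the second bullet can be modeled simply: a subscription's type is fixed by the consumer that creates it, and later consumers must attach with the same type. A pure-Python illustration of that rule (not the broker's actual code; names are made up):

```python
# Simplified model of the broker check behind "Failed to create
# consumer: Subscription is of different type": the first consumer
# to attach fixes the subscription type, and later consumers must
# match it.
class SubscriptionRegistry:
    def __init__(self):
        self._types = {}  # subscription name -> subscription type

    def attach(self, name, sub_type):
        existing = self._types.setdefault(name, sub_type)
        if existing != sub_type:
            raise ValueError("Failed to create consumer: "
                             "Subscription is of different type")
        return existing

# Pre-creating the subscription as KeyShared and then having the
# function attach as Shared reproduces the error:
reg = SubscriptionRegistry()
reg.attach("word-sub", "KeyShared")
# reg.attach("word-sub", "Shared")  # would raise ValueError
```

So pre-creating the subscription doesn't help; the function itself has to request Key_Shared, which is the part that isn't exposed yet.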
----
2020-04-29 14:03:33 UTC - Jannis: @Jannis has joined the channel
----
2020-04-29 14:03:41 UTC - Konstantinos Papalias: I'm pretty sure there is an
open issue around this
----
2020-04-29 14:03:57 UTC - Konstantinos Papalias: so this functionality is not
supported currently
----
2020-04-29 14:04:23 UTC - Gilles Barbier: yes, I did open it
<https://github.com/apache/pulsar/issues/6527>
----
2020-04-29 16:20:29 UTC - Gary Fredericks: @Gary Fredericks has joined the
channel
----
2020-04-29 16:26:31 UTC - Gary Fredericks: I've had the following experience
repeatedly, and was hoping somebody could confirm that this should _not_ happen:
• create a reader on a persistent non-partitioned topic with infinite retention
• call `seek(t1)`
• read some initial messages that don't reflect the seek call, because seek
doesn't seem to be immediately effective for reasons I don't understand
• once we get a message with publish timestamp `>= t1`, read (and count!)
all messages until we see one with publish timestamp `>= t2`
• print the count from the previous step
• do all of the above steps repeatedly, and look at the different counts that
result
• observe that they're not all the same, even though I would think they should
be
E.g., I did this 30 times and got these counts:
```6909,6909,6909,6909,6909,3008,6909,
6909,6909,6909,6909,6909,6909,6909,
6909,5027,6909,6909,6909,6909,6909,
6909,2983,6909,6909,6909,6909,6909,
6909,2845```
Is there anything that would account for this behavior? I'm having a hard time
getting a really minimal reproducible case, so it's hard for me to file a good
bug report.
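The counting step above can be sketched as a small function (the Reader calls are shown only in comments since they need a running cluster; topic name, service URL, and timestamps are placeholders):

```python
# Sketch of the seek-and-count procedure described above.
# count_between() implements the counting rule: skip messages that
# predate t1 (since seek is not immediately effective), then count
# until the first message with publish time >= t2.
def count_between(publish_times, t1, t2):
    count = 0
    started = False
    for pt in publish_times:
        if not started:
            if pt >= t1:
                started = True
            else:
                continue  # pre-seek residue: ignore
        if pt >= t2:
            break
        count += 1
    return count

# In the real reproduction the publish times come from a Reader
# (placeholder names; requires a running Pulsar cluster):
#
#   import pulsar
#   client = pulsar.Client("pulsar://localhost:6650")
#   reader = client.create_reader("persistent://public/default/my-topic",
#                                 pulsar.MessageId.earliest)
#   reader.seek(t1)  # seek by publish timestamp
#   times = []
#   while True:
#       msg = reader.read_next()
#       times.append(msg.publish_timestamp())
#       if msg.publish_timestamp() >= t2:
#           break
#   print(count_between(times, t1, t2))
```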
----
2020-04-29 16:27:28 UTC - Gary Fredericks: when I look at the first `>= t2`
message, I see that the reader has skipped forward a significant amount, though
not always to or from the same spot
----
2020-04-29 16:27:49 UTC - Gary Fredericks: I use tiered storage, but I have
observed this happening when reading both from offloaded sections and from
not-offloaded sections
----
2020-04-29 16:29:40 UTC - Gary Fredericks: I've seen this behavior both from
the java client and the python client, and across multiple server versions
----
2020-04-29 16:33:30 UTC - Damien Roualen: @Damien Roualen has joined the channel
----
2020-04-29 17:08:12 UTC - Sijie Guo: Interesting observation. How are the
messages produced? Are they produced from different producers?
----
2020-04-29 17:08:38 UTC - Gary Fredericks: just one
----
2020-04-29 17:11:05 UTC - Gary Fredericks: at least once (maybe all the time)
the different runs have the following characteristics:
• they all start at the same message (presumably the correct one?)
• all of the messages read within the time window are the same and in the same
order, but _some_ of the processes randomly jump forward out of the time window
In other words, each faulty run reads a prefix of what the correct runs read.
----
2020-04-29 17:23:07 UTC - Gary Fredericks: it may be true that these topics
have all had more than one producer _over time_
----
2020-04-29 17:23:26 UTC - Gary Fredericks: well it's certainly true in the
sense that producer processes get restarted
----
2020-04-29 18:03:56 UTC - Sijie Guo: Okay.
I just want to point out one fact: currently the publish time is generated on
the producer side, and the seek operation is based on publish time, so seek is
a best-effort operation.
On the other hand, if you always seek to one specific time and count the
messages between t1 and t2, it should *most likely* give you the same number.
So one thing you can try is to dump the messages (at least print the publish
time) between t1 and t2.
----
2020-04-29 18:04:29 UTC - Gary Fredericks: yeah, I've looked at the message IDs
and they had the prefix-characteristic I described just above
----
2020-04-29 18:05:46 UTC - Gary Fredericks: the semantics of the timestamp seem
like they shouldn't matter, since the result should at least be deterministic,
as I think you just said, so I haven't focused on what the timestamps are
exactly
----
2020-04-29 18:19:29 UTC - Gary Fredericks: the publish timestamps should be
close to the current time, and I've reproduced this using days-old segments of
the topic, so small variations and race conditions in the timestamp semantics
shouldn't explain it
----
2020-04-29 20:57:01 UTC - Raffaele: @Raffaele has joined the channel
----
2020-04-29 21:29:42 UTC - Sijie Guo: i see. that sounds like a bug. Can you
create a github issue for it?
----
2020-04-29 21:30:14 UTC - Gary Fredericks: sure
I'll likely be working on this full time for the short-term future
----
2020-04-29 21:30:42 UTC - Sijie Guo: Awesome. Thanks!
----
2020-04-29 21:44:03 UTC - Gary Fredericks:
<https://github.com/apache/pulsar/issues/6847>
----
2020-04-30 02:17:14 UTC - Rodrigo Batista: is there any way to merge two
messages, from different topics, into a single message, if they have a common
field, e.g. a task_id field with the same id in both messages?
----
2020-04-30 07:17:25 UTC - Tymm: Hello, I have the same problem with 2.4.2.
I am running Pulsar standalone and am writing a function to listen for input on
a topic set with a schema:
```
"schemaInfo": {
  "name": "ffc-liveoccupancydata",
  "schema": {
    "type": "record",
    "name": "LIVEOCCUPANCYDATA",
    "namespace": "public.default",
    "fields": [
      { "name": "DataID", "type": "long" },
      { "name": "CameraSerial", "type": [ "null", "string" ] },
      { "name": "Timestamp", "type": "long" },
      { "name": "OutValue", "type": "long" },
      { "name": "Create_at", "type": [ "null", "string" ] },
      { "name": "BranchCode", "type": [ "null", "string" ] },
      { "name": "UploadedLocalDateTime", "type": [ "null", "string" ] },
      { "name": "UploadedUTCDateTime", "type": [ "null", "string" ] },
      { "name": "UtcDateTime", "type": [ "null", "string" ] },
      { "name": "InValue", "type": "long" },
      { "name": "Serial", "type": [ "null", "string" ] }
    ]
  },
  "type": "JSON",
  "properties": {}
}
```
and when I run the function with localrunner, I get the following exception:
`[public/default/occupancy-func-0] ERROR
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Source open
produced uncaught exception: java.util.concurrent.CompletionException:
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException:
Trying to subscribe with incompatible schema at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:714)
at
java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:598) at
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:157)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at
org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at
org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at
org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:433)
at
org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:330)
at
org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at
org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748) Caused by:
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException:
Trying to subscribe with incompatible schema at
org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:913)
... 20 more [public/default/occupancy-func-0] ERROR
org.apache.pulsar.functions.instance.JavaInstanceRunnable -
[public/default/occupancy-func:0] Uncaught exception in Java Instance
java.util.concurrent.CompletionException:
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException:
Trying to subscribe with incompatible schema at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:714)
at
java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:598) at
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:157)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at
org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at
org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at
org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:433)
at
org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:330)
at
org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at
org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748) Caused by:
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException:
Trying to subscribe with incompatible schema at
org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:913)
... 20 more [public/default/occupancy-func-0] INFO
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
[public/default/occupancy-func-0] INFO
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Unloading JAR files
for function InstanceConfig(instanceId=0,
functionId=9344e5d2-14f0-4970-b837-730808abd846,
functionVersion=65a3a001-8bbb-4ed2-b3a3-a779ba636372, functionDetails=tenant:
"public" namespace: "default" name: "occupancy-func" className:
"pulsar.FFCLiveOccupancySerialFunc" autoAck: true parallelism: 1 source {
typeClassName: "java.lang.String" inputSpecs { key: "ffc-liveoccupancydata"
value { } } cleanupSubscription: true } sink { typeClassName: "java.lang.Void"
} resources { cpu: 1.0 ram: 1073741824 disk: 10737418240 }`
Please advise, thanks.
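Looking at the InstanceConfig dump, the source declares `typeClassName: "java.lang.String"` while the topic carries a JSON record schema, which appears to be the mismatch. A simplified illustration of the check that fails (not the broker's actual code):

```python
# Simplified model of the broker-side check that produces
# IncompatibleSchemaException: the consumer's declared schema must
# be compatible with the schema already attached to the topic.
def check_subscribe(topic_schema_type, consumer_schema_type):
    if topic_schema_type != consumer_schema_type:
        raise ValueError("Trying to subscribe with incompatible schema")

check_subscribe("JSON", "JSON")      # a matching consumer is fine
# check_subscribe("JSON", "STRING")  # raises, as in the log above
```

Declaring the function's input as a POJO matching the record (or consuming it as a generic record) instead of String should make the two schemas agree.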
----
2020-04-30 07:46:17 UTC - Raffaele: Any stateful streaming product (e.g. Flink)
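A sketch of the buffering logic such a join needs: hold each message by its task_id until the counterpart from the other topic arrives, then emit the merged record (pure Python; the task_id field comes from the question, everything else is illustrative; in a Pulsar Function subscribed to both topics, the buffer would need to live in state to survive restarts):

```python
# Sketch of a key-based join: buffer each half until its partner
# with the same task_id arrives, then emit the merged record.
def make_joiner():
    pending = {}  # task_id -> first message seen

    def on_message(msg):
        """Returns the merged dict when both halves have arrived,
        otherwise None (the message is buffered)."""
        key = msg["task_id"]
        other = pending.pop(key, None)
        if other is None:
            pending[key] = msg
            return None
        return {**other, **msg}

    return on_message

# In a Pulsar Function with both topics as inputs, this callback
# would be the process() body.
join = make_joiner()
join({"task_id": 1, "a": 10})            # buffered, returns None
print(join({"task_id": 1, "b": 20}))     # merged record
```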
----
2020-04-30 08:11:57 UTC - Raffaele: Morning
----
2020-04-30 08:14:34 UTC - Raffaele: I have a few technical questions on Pulsar
behavior:
Since a ledger contains multiple topics, if a single one expires will the data
related to the expired topic be deleted or kept?
How do topics with unlimited retention affect this behavior?
----