2019-09-20 10:12:53 UTC - Diego Salvi: Hello! I'm looking for a way to set a
TTL for unacknowledged messages
(<https://pulsar.apache.org/docs/en/cookbooks-retention-expiry/#time-to-live-ttl>)
cluster-wide, to avoid setting it for every namespace created. Is there a way?
Additionally, is defaultRetentionTime in minutes, as shown in the documentation
(<https://pulsar.apache.org/docs/en/reference-configuration/#broker-defaultRetentionTimeInMinutes>),
or in seconds, as seen in ServiceConfiguration (private int
ttlDurationDefaultInSeconds = 0)?
----
2019-09-20 10:15:33 UTC - Penghui Li: @Diego Salvi I think you can set it in
broker.conf
```
# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0
```
----
2019-09-20 10:19:26 UTC - Diego Salvi: @Penghui Li I presume that one is for
retaining data even after it is acknowledged, configured as a default at the
broker level and not at the namespace level, right? I'm looking for a similar
configuration but for the TTL of unacknowledged data, like
<https://pulsar.apache.org/docs/en/cookbooks-retention-expiry/#time-to-live-ttl>
but still configurable at the broker level. Is it possible?
----
2019-09-20 10:23:57 UTC - Penghui Li: I have checked the logic in PersistentTopic:
```
public void checkMessageExpiry() {
    TopicName name = TopicName.get(topic);
    Policies policies;
    try {
        policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                .get(AdminResource.path(POLICIES, name.getNamespace()))
                .orElseThrow(() -> new KeeperException.NoNodeException());
        int defaultTTL = brokerService.pulsar().getConfiguration().getTtlDurationDefaultInSeconds();
        int message_ttl_in_seconds = (policies.message_ttl_in_seconds <= 0 && defaultTTL > 0)
                ? defaultTTL
                : policies.message_ttl_in_seconds;
        if (message_ttl_in_seconds != 0) {
            subscriptions.forEach((subName, sub) -> sub.expireMessages(message_ttl_in_seconds));
            replicators.forEach((region, replicator) ->
                    ((PersistentReplicator) replicator).expireMessages(message_ttl_in_seconds));
        }
    } catch (Exception e) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Error getting policies", topic);
        }
    }
}
```
----
2019-09-20 10:24:51 UTC - Penghui Li: If the message TTL of the namespace is not
set, ttlDurationDefaultInSeconds will be used
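
The fallback in the posted checkMessageExpiry() code can be isolated as a small, standalone sketch. The class and method names below (TtlResolution, effectiveTtlSeconds, expiryEnabled) are hypothetical and exist only for illustration; the only thing taken from the posted code is the conditional itself, with a namespace value of 0 or less treated as "not configured".

```java
/** Toy model of the TTL fallback in the posted checkMessageExpiry(); not Pulsar source code. */
final class TtlResolution {

    /**
     * Returns the TTL (in seconds) the expiry check would apply.
     * namespaceTtlSeconds <= 0 means "not configured at the namespace level";
     * defaultTtlSeconds stands in for ttlDurationDefaultInSeconds from broker.conf.
     */
    static int effectiveTtlSeconds(int namespaceTtlSeconds, int defaultTtlSeconds) {
        return (namespaceTtlSeconds <= 0 && defaultTtlSeconds > 0)
                ? defaultTtlSeconds
                : namespaceTtlSeconds;
    }

    /** The posted code only expires messages when the resolved TTL is non-zero. */
    static boolean expiryEnabled(int namespaceTtlSeconds, int defaultTtlSeconds) {
        return effectiveTtlSeconds(namespaceTtlSeconds, defaultTtlSeconds) != 0;
    }

    public static void main(String[] args) {
        // Namespace TTL unset, broker default of 60 s applies.
        System.out.println(effectiveTtlSeconds(0, 60));
        // Namespace TTL set, so it takes precedence over the broker default.
        System.out.println(effectiveTtlSeconds(300, 60));
        // Nothing configured anywhere, so expiry is skipped.
        System.out.println(expiryEnabled(0, 0));
    }
}
```

In this model a namespace-level TTL always takes precedence, and the broker-wide default only applies to namespaces that have none.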
----
2019-09-20 10:27:04 UTC - Penghui Li: In broker.conf, if you want to retain
data even after it is acknowledged, the config would be:
```
# Default message retention time
defaultRetentionTimeInMinutes=0
# Default retention size
defaultRetentionSizeInMB=0
```
----
2019-09-20 11:29:34 UTC - Diego Salvi: @Penghui Li Thank you for the answer.
I'm still a little confused. So is defaultRetentionTimeInMinutes the default
value for retention by time at the ns level, or for message TTL (still at the
ns level)? Judging by the posted code I presume the second one. Just to be
clear: if I set configuration.setTtlDurationDefaultInSeconds to 60 seconds,
will unacknowledged messages be deleted after 60 seconds?
----
2019-09-20 11:45:44 UTC - Ming Fang: When trying to start an entire cluster
consisting of zookeepers, bookkeepers, and pulsar, there seems to be a race
condition causing one of the pulsar nodes to enter a crash loop and be unable
to start. This is the error:
```
10:19:35.098 [main] INFO org.apache.pulsar.broker.PulsarService - Starting load management service ...
10:19:35.130 [main] ERROR org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Broker znode - [/loadbalance/brokers/pulsar-0.pulsar.pulsar-example:8080] is own by different zookeeper-ssession 144156858040267453
10:19:35.131 [main] ERROR org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Unable to create znode - [/loadbalance/brokers/pulsar-0.pulsar.pulsar-example:8080] for load balance on zookeeper
org.apache.pulsar.broker.PulsarServerException: Broker-znode owned by different zk-session 144156858040267453
```
----
2019-09-20 12:00:23 UTC - Penghui Li: @Diego Salvi yes, unacked messages will be
auto-acked.
----
2019-09-20 12:01:39 UTC - Penghui Li: I think "auto-acked" is the more suitable
term here
----
2019-09-20 12:04:21 UTC - Nicolas Ha: What is the difference between the Consumer
close and unsubscribe methods? It's not clear from the javadoc
<https://pulsar.apache.org/api/client/>
----
2019-09-20 12:07:53 UTC - Penghui Li: Unsubscribe deletes the subscription;
closing a consumer only closes (removes) that consumer of the subscription
+1 : Nicolas Ha
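
The distinction can be pictured with a toy bookkeeping model. This is a sketch only: SubscriptionModel and its methods are hypothetical names, not the Pulsar client API, and it assumes nothing beyond what is stated above (close detaches one consumer, unsubscribe removes the subscription itself).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of a topic's subscriptions; hypothetical names, not the Pulsar API. */
final class SubscriptionModel {
    // subscription name -> names of the consumers currently attached to it
    private final Map<String, Set<String>> subscriptions = new HashMap<>();

    void subscribe(String subscription, String consumer) {
        subscriptions.computeIfAbsent(subscription, s -> new HashSet<>()).add(consumer);
    }

    /** close: detach one consumer; the subscription itself stays in place. */
    void close(String subscription, String consumer) {
        Set<String> consumers = subscriptions.get(subscription);
        if (consumers != null) {
            consumers.remove(consumer);
        }
    }

    /** unsubscribe: delete the subscription as a whole. */
    void unsubscribe(String subscription) {
        subscriptions.remove(subscription);
    }

    boolean subscriptionExists(String subscription) {
        return subscriptions.containsKey(subscription);
    }

    public static void main(String[] args) {
        SubscriptionModel topic = new SubscriptionModel();
        topic.subscribe("my-sub", "consumer-1");
        topic.close("my-sub", "consumer-1");
        System.out.println(topic.subscriptionExists("my-sub")); // still there after close
        topic.unsubscribe("my-sub");
        System.out.println(topic.subscriptionExists("my-sub")); // gone after unsubscribe
    }
}
```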
----
2019-09-20 13:06:53 UTC - Junli Antolovich: @Vladimir Shchur thanks for
the info. I have not worked with F#, and neither have our teams (as far as I
know). With this client written in F#, we would have to either learn F# or
depend on you guys for any new features we need.
----
2019-09-20 13:15:56 UTC - Vladimir Shchur: I see, but I will be glad to guide
anyone on your team if you want to implement a feature. It's not as scary as
most people think. Here is the presentation with some implementation details
that I gave 2 weeks ago.
<https://www.slideshare.net/Odin_cool/fsharp-goodness-for-everyday-work>
+1 : Matteo Merli
----
2019-09-20 13:30:32 UTC - Junli Antolovich: Thanks very much for the offer. It's
not that we are unwilling to learn a new language, but we are under rather tight
time constraints. If time permits after the tasks ahead of us, I would be glad
to pick up F#, or Go.
----
2019-09-20 13:35:13 UTC - Tarek Shaar: @David Kjerrumgaard I attached one
listener to the consumer
----
2019-09-20 14:07:24 UTC - Tarek Shaar: @David Kjerrumgaard I am also assuming
the thread is returned to the pool when the message is acked?
----
2019-09-20 15:40:24 UTC - Diego Salvi: @Penghui Li I attempted to remove
unacknowledged messages but with no success:
1) I set the TTL to 5 seconds and wrote 1000+ messages (actually 1200) with a Producer
2) I waited 15 seconds just to be safe
3) I invoked pulsarService.getBrokerService().checkMessageExpiry()
4) I attempted to read events with a Reader, expecting none, but I still found
1200 messages
I attempted to debug the code and I noticed that
`subscriptions.forEach((subName, sub) ->
sub.expireMessages(message_ttl_in_seconds));` in your posted code is never
invoked because "subscriptions" is empty. Am I missing something?
----
2019-09-20 15:41:25 UTC - Matteo Merli: @Diego Salvi TTL is enforced in batches
for efficiency reasons. There’s a thread that checks TTL every 5 mins
+1 : Penghui Li
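
A toy model of batch enforcement shows why messages older than the TTL can still be visible: nothing is removed until a check pass actually runs. BatchExpiryModel and its methods are hypothetical names for illustration, not broker code, and the timestamps are passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of batch TTL enforcement; an illustration, not Pulsar broker code. */
final class BatchExpiryModel {
    private final Deque<Long> publishTimesMs = new ArrayDeque<>(); // oldest first
    private final long ttlMs;

    BatchExpiryModel(long ttlSeconds) {
        this.ttlMs = ttlSeconds * 1000L;
    }

    void publish(long nowMs) {
        publishTimesMs.addLast(nowMs);
    }

    /** Messages stay in the backlog until a check runs, even when they are past the TTL. */
    int backlog() {
        return publishTimesMs.size();
    }

    /** One pass of the periodic check: drops every message older than the TTL, returns the count. */
    int runExpiryCheck(long nowMs) {
        int expired = 0;
        while (!publishTimesMs.isEmpty() && nowMs - publishTimesMs.peekFirst() > ttlMs) {
            publishTimesMs.removeFirst();
            expired++;
        }
        return expired;
    }

    public static void main(String[] args) {
        BatchExpiryModel topic = new BatchExpiryModel(5); // TTL of 5 seconds
        for (int i = 0; i < 1200; i++) {
            topic.publish(0L);
        }
        // 15 s later the messages are past the TTL but no check has run yet.
        System.out.println(topic.backlog());
        // The next check pass removes them all at once.
        System.out.println(topic.runExpiryCheck(15_000L));
        System.out.println(topic.backlog());
    }
}
```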
----
2019-09-20 20:45:10 UTC - Tarek Shaar: I just want to confirm: if I want to send
and receive short String messages in Java, will I need to send and receive
byte[] and convert back and forth each time?
----
2019-09-20 20:57:36 UTC - Matteo Merli: You can declare the schema of the topic
as `String`.
----
2019-09-20 20:58:22 UTC - Matteo Merli: eg.
```
Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("my-topic")
        .create();
```
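
For comparison, this is roughly the manual round trip that a String schema makes unnecessary. It is a plain-JDK sketch with hypothetical helper names (StringBytesRoundTrip, encode, decode), and it assumes UTF-8 as the encoding on both sides.

```java
import java.nio.charset.StandardCharsets;

/** Manual String <-> byte[] conversion, as needed with a raw byte[] producer and consumer. */
final class StringBytesRoundTrip {

    /** What the sending side would do before handing the payload to a byte[] producer. */
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    /** What the receiving side would do with the raw payload of each message. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("hello");
        System.out.println(decode(payload));
    }
}
```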
----
2019-09-20 22:10:33 UTC - Addison Higham: :thinking_face: hrm... another
question, what happens if a bookie starts back up without its journal? Toying
with the idea of using ephemeral storage (specifically a k8s local persistent
volume) for the journal FS and wondering what that would mean in terms of
recovery of a lost node. So the ledger storage will still be there, just the
journal missing.
----
2019-09-20 22:11:59 UTC - Matteo Merli: Current behavior is that the bookie will
fail to start up. There’s a “cookie” validation for the bookie identity that has
to match against what’s in ZK, on both the journal and the ledger storage
directories.
----
2019-09-20 22:13:11 UTC - Matteo Merli: There was a proposal by @Rajan Dhabalia
to allow for Journal bypass directly. That would be a better option when
durability is not required
----
2019-09-20 22:15:13 UTC - Addison Higham: okay, so there isn't real handling of
that case by BK, so it probably isn't advised to say... take that cookie value
and just copy it to the journal storage and restart? it seems like that would
obviously invalidate any segments it had that were open, but for any closed
segments...
----
2019-09-20 22:17:52 UTC - Matteo Merli: sure, you could automate that
----
2019-09-20 22:18:14 UTC - Matteo Merli: the downside is just that what was in
the journal might get lost
----
2019-09-20 22:21:51 UTC - Addison Higham: but assuming a Qs of 3, the other
members would still have those messages, I am just wondering if that
inconsistency would be noticed and fixed
----
2019-09-20 22:22:47 UTC - Addison Higham: like do bookies checksum closed
segments and then repair any segments that don't match the quorum?
----
2019-09-20 22:25:06 UTC - Matteo Merli: yes, the client might get EntryNotFound
from one bookie, and will retry to read from the other 2 bookies
----
2019-09-20 22:25:26 UTC - Matteo Merli: that also happens if one entry is
corrupted on one bookie
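
The read path described above can be sketched as a simple fallback loop over replicas. This is a toy model, not the BookKeeper client: QuorumReadModel and readEntry are hypothetical names, each map stands in for one bookie's stored entries, and a missing key plays the role of EntryNotFound.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy model of reading an entry from a replicated ledger; not the BookKeeper API. */
final class QuorumReadModel {

    /** Tries each replica in turn and returns the first copy of the entry that is found. */
    static Optional<String> readEntry(long entryId, List<Map<Long, String>> replicas) {
        for (Map<Long, String> bookie : replicas) {
            String payload = bookie.get(entryId);
            if (payload != null) {
                return Optional.of(payload);
            }
            // Entry missing on this replica: fall through and try the next one.
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<Long, String> bookieWithLostJournal = new HashMap<>(); // lost the entry
        Map<Long, String> healthyBookie = new HashMap<>();
        healthyBookie.put(7L, "payload-7");

        List<Map<Long, String>> replicas = new ArrayList<>();
        replicas.add(bookieWithLostJournal);
        replicas.add(healthyBookie);
        replicas.add(healthyBookie);

        // The first replica misses, the second one serves the entry.
        System.out.println(readEntry(7L, replicas).orElse("not found"));
    }
}
```

The model only covers the read fallback; whether and when the missing copy is repaired on the first bookie is a separate question that it does not address.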
----
2019-09-20 22:31:03 UTC - Rajiv Abraham: Hi, for the file io source connector,
for the `inputDirectory` variable, I’m guessing the file path is local to the
worker node? If so, is there a way of using the REST API to upload a file? If
there are examples online on how to do it, that would be great.
----
2019-09-20 23:30:54 UTC - Rajiv Abraham: Do I have to publish the file outside
of the Pulsar API (e.g. through scp), and how does it work for multiple nodes in
the cluster?
----
2019-09-21 00:14:14 UTC - Rajiv Abraham: Hi, another question, is there a
Python client to create a source/sink, or do I have to call the REST API? I
looked at the Python API and didn’t see it but just wanted to double check.
----