2019-09-20 10:12:53 UTC - Diego Salvi: Hello! I'm looking for a way to set a
TTL for unacknowledged messages
(<https://pulsar.apache.org/docs/en/cookbooks-retention-expiry/#time-to-live-ttl>)
 cluster-wide, to avoid setting it for every namespace created. Is there a way?
Additionally, is defaultRetentionTime in minutes, as shown in the documentation
(<https://pulsar.apache.org/docs/en/reference-configuration/#broker-defaultRetentionTimeInMinutes>),
 or in seconds, as seen in ServiceConfiguration (private int
ttlDurationDefaultInSeconds = 0)?
----
2019-09-20 10:15:33 UTC - Penghui Li: @Diego Salvi I think you can set it in
broker.conf:
```
# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0
```
----
2019-09-20 10:19:26 UTC - Diego Salvi: @Penghui Li I presume that one is for
retaining data even after it is acknowledged, configured as a default at broker
level rather than at namespace level, right? I'm looking for a similar
configuration, but for the TTL of unacknowledged data, like
<https://pulsar.apache.org/docs/en/cookbooks-retention-expiry/#time-to-live-ttl>,
 and still configurable at broker level. Is it possible?
----
2019-09-20 10:23:57 UTC - Penghui Li: I have checked the logic in PersistentTopic:
```
public void checkMessageExpiry() {
    TopicName name = TopicName.get(topic);
    Policies policies;
    try {
        policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                .get(AdminResource.path(POLICIES, name.getNamespace()))
                .orElseThrow(() -> new KeeperException.NoNodeException());
        int defaultTTL = brokerService.pulsar().getConfiguration().getTtlDurationDefaultInSeconds();
        int message_ttl_in_seconds = (policies.message_ttl_in_seconds <= 0 && defaultTTL > 0) ? defaultTTL
                : policies.message_ttl_in_seconds;
        if (message_ttl_in_seconds != 0) {
            subscriptions.forEach((subName, sub) -> sub.expireMessages(message_ttl_in_seconds));
            replicators.forEach((region, replicator) -> ((PersistentReplicator)replicator).expireMessages(message_ttl_in_seconds));
        }
    } catch (Exception e) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Error getting policies", topic);
        }
    }
}
```
----
2019-09-20 10:24:51 UTC - Penghui Li: If message ttl of namespace is not set, 
ttlDurationDefaultInSeconds will be used
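
A minimal, self-contained sketch of that fallback rule, mirroring the ternary in the posted checkMessageExpiry() code. The class and method names are hypothetical and are not part of Pulsar; only the comparison logic comes from the snippet above.

```java
/** Sketch of the TTL fallback rule; names are hypothetical, not Pulsar API. */
final class TtlFallbackSketch {

    /**
     * Returns the TTL (in seconds) that would be applied to a topic.
     * A namespace TTL <= 0 is treated as "not set"; a result of 0 means
     * that no expiry is performed at all.
     */
    static int effectiveTtlSeconds(int namespaceTtlSeconds, int brokerDefaultTtlSeconds) {
        return (namespaceTtlSeconds <= 0 && brokerDefaultTtlSeconds > 0)
                ? brokerDefaultTtlSeconds
                : namespaceTtlSeconds;
    }

    public static void main(String[] args) {
        // Namespace TTL unset, broker default configured: the default applies.
        System.out.println(effectiveTtlSeconds(0, 60));
        // Namespace TTL set: it takes precedence over the broker default.
        System.out.println(effectiveTtlSeconds(30, 60));
        // Neither configured: the result is 0, so expiry stays disabled.
        System.out.println(effectiveTtlSeconds(0, 0));
    }
}
```

Under this rule a namespace-level TTL always takes precedence, and the broker default only fills in when the namespace value is missing or non-positive.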
----
2019-09-20 10:27:04 UTC - Penghui Li: In broker.conf, if you want to retain
data even after it is acknowledged, the config would be:
```
# Default message retention time
defaultRetentionTimeInMinutes=0

# Default retention size
defaultRetentionSizeInMB=0
```
----
2019-09-20 11:29:34 UTC - Diego Salvi: @Penghui Li Thank you for the answer.
I'm still a little confused. Is defaultRetentionTimeInMinutes the default
value for time-based retention at namespace level, or for message TTL (also at
namespace level)? Judging by the posted code, I presume the second one. Just to
be clearer: if I set configuration.setTtlDurationDefaultInSeconds to 60 seconds,
will unacknowledged messages be deleted after 60 seconds?
----
2019-09-20 11:45:44 UTC - Ming Fang: When trying to start an entire cluster
consisting of zookeepers, bookkeepers, and pulsar, there seems to be a race
condition causing one of the pulsar nodes to enter a crash loop and be unable
to start. This is the error:
```
10:19:35.098 [main] INFO  org.apache.pulsar.broker.PulsarService - Starting load management service ...
10:19:35.130 [main] ERROR org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Broker znode - [/loadbalance/brokers/pulsar-0.pulsar.pulsar-example:8080] is own by different zookeeper-ssession 144156858040267453
10:19:35.131 [main] ERROR org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Unable to create znode - [/loadbalance/brokers/pulsar-0.pulsar.pulsar-example:8080] for load balance on zookeeper
org.apache.pulsar.broker.PulsarServerException: Broker-znode owned by different zk-session 144156858040267453
```
----
2019-09-20 12:00:23 UTC - Penghui Li: @Diego Salvi yes, unacked messages will be
auto-acked.
----
2019-09-20 12:01:39 UTC - Penghui Li: I think "auto-acked" is the more suitable
term here than "deleted"
----
2019-09-20 12:04:21 UTC - Nicolas Ha: What is the difference between the Consumer
close and unsubscribe methods? It's not clear from the javadoc
<https://pulsar.apache.org/api/client/>
----
2019-09-20 12:07:53 UTC - Penghui Li: Unsubscribe deletes the subscription.
Closing a consumer only closes that consumer and removes it from the subscription.
+1 : Nicolas Ha
----
2019-09-20 13:06:53 UTC - Junli Antolovich: @Vladimir Shchur thanks for
the info. I have not worked with F#, and neither have our teams (as far as I
know). With this client written in F#, we would have to either learn F# or
depend on you guys for any new features we need.
----
2019-09-20 13:15:56 UTC - Vladimir Shchur: I see, but I will be glad to guide
anyone on your team if you want to implement a feature. It's not as scary as
most people think. Here is the presentation with some implementation details
that I gave 2 weeks ago.
<https://www.slideshare.net/Odin_cool/fsharp-goodness-for-everyday-work>
+1 : Matteo Merli
----
2019-09-20 13:30:32 UTC - Junli Antolovich: Thanks very much for the offer. It's
not that we are unwilling to learn a new language, but we are under rather tight
time constraints. If time permits for the tasks ahead of us, I would be glad to
pick up F#, or Go.
----
2019-09-20 13:35:13 UTC - Tarek Shaar: @David Kjerrumgaard I attached one 
listener to the consumer
----
2019-09-20 14:07:24 UTC - Tarek Shaar: @David Kjerrumgaard I am also assuming 
the thread is returned to the pool when the message is acked?
----
2019-09-20 15:40:24 UTC - Diego Salvi: @Penghui Li I attempted to remove
unacknowledged messages but with no success:
1) I set the TTL to 5 seconds and wrote 1000+ messages (actually 1200) with a Producer
2) waited 15 seconds just to be safe
3) invoked pulsarService.getBrokerService().checkMessageExpiry()
4) attempted to read events with a Reader, expecting none, but I still found 1200
messages
I attempted to debug the code and noticed that
`subscriptions.forEach((subName, sub) -> sub.expireMessages(message_ttl_in_seconds));`
in your posted code is never invoked because "subscriptions" is empty. Am I
missing something?
----
2019-09-20 15:41:25 UTC - Matteo Merli: @Diego Salvi TTL is enforced in batches 
for efficiency reasons. There’s a thread that checks TTL every 5 mins
+1 : Penghui Li
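
A rough sketch of what periodic enforcement means for timing: a message becomes eligible for expiry at publish time plus TTL, but it is only acted on at the next check. The 5-minute interval is taken from the message above; the assumption that checks fire at exact multiples of the interval, and all names in the code, are hypothetical simplifications and not Pulsar API.

```java
/** Toy timing model of batched TTL enforcement; names are hypothetical. */
final class TtlCheckTimingSketch {

    /**
     * Returns the time of the first periodic check at or after the given
     * instant, assuming checks run at 0, interval, 2 * interval, ...
     * Times are non-negative seconds since an arbitrary start.
     */
    static long firstCheckAtOrAfter(long eligibleAtSeconds, long checkIntervalSeconds) {
        if (eligibleAtSeconds < 0 || checkIntervalSeconds <= 0) {
            throw new IllegalArgumentException("time must be >= 0 and interval must be > 0");
        }
        // Round up to the next multiple of the interval.
        long intervals = (eligibleAtSeconds + checkIntervalSeconds - 1) / checkIntervalSeconds;
        return intervals * checkIntervalSeconds;
    }

    /** Time at which a message would actually be expired under this model. */
    static long expiredAtSeconds(long publishedAtSeconds, long ttlSeconds, long checkIntervalSeconds) {
        return firstCheckAtOrAfter(publishedAtSeconds + ttlSeconds, checkIntervalSeconds);
    }

    public static void main(String[] args) {
        // TTL of 5 seconds, published at t = 0, checks every 300 seconds.
        System.out.println(expiredAtSeconds(0, 5, 300));
    }
}
```

In this model a 5-second TTL with a 300-second check interval is only enforced at the 300-second mark, so waiting 15 seconds would not be enough to see messages disappear.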
----
2019-09-20 20:45:10 UTC - Tarek Shaar: I just want to confirm: if I want to send
and receive short String messages in Java, will I need to send and receive
byte[] and convert back and forth each time?
----
2019-09-20 20:57:36 UTC - Matteo Merli: You can declare the schema of the topic 
as `String`.
----
2019-09-20 20:58:22 UTC - Matteo Merli: eg.

```
Producer<String> producer = client.newProducer(Schema.STRING)
     .topic("my-topic")
     .create();
```
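
For comparison, this is roughly the manual round trip the question describes, written with the JDK only. The class and method names are hypothetical, and UTF-8 is an assumption chosen for this sketch; with a String schema declared on the producer and consumer, application code would not need this step.

```java
import java.nio.charset.StandardCharsets;

/** Manual String <-> byte[] conversion; names are hypothetical. */
final class StringPayloadSketch {

    /** Encodes a message for sending as a raw byte[] payload (UTF-8 assumed). */
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received byte[] payload back into a String (UTF-8 assumed). */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("hello pulsar");
        // Both sides must agree on the charset for the round trip to be lossless.
        System.out.println(decode(payload));
    }
}
```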
----
2019-09-20 22:10:33 UTC - Addison Higham: :thinking_face: hrm... another
question, what happens if a bookie starts back up without its journal? I'm toying
with the idea of using ephemeral storage (specifically a k8s local persistent
volume) for the journal FS and wondering what that would mean in terms of
recovery of a lost node. So the ledger storage will still be there, just the
journal will be missing.
----
2019-09-20 22:11:59 UTC - Matteo Merli: The current behavior is that the bookie
will fail to start up. There’s a “cookie” validation for the bookie identity that
has to match against what’s in ZK, on both the journal and the ledger storage
directories.
----
2019-09-20 22:13:11 UTC - Matteo Merli: There was a proposal by @Rajan Dhabalia
to allow for Journal bypass directly. That would be a better option when
durability is not required
----
2019-09-20 22:15:13 UTC - Addison Higham: okay, so there isn't real handling of 
that case by BK, so it probably isn't advised to say... take that cookie value 
and just copy it to the journal storage and restart? it seems like that would 
obviously invalidate any segments it had that were open, but for any closed 
segments...
----
2019-09-20 22:17:52 UTC - Matteo Merli: sure, you could automate that
----
2019-09-20 22:18:14 UTC - Matteo Merli: the downside is just that what was in
the journal might get lost
----
2019-09-20 22:21:51 UTC - Addison Higham: but assuming a Qs of 3, the other 
members would still have those messages, I am just wondering if that 
inconsistency would be noticed and fixed
----
2019-09-20 22:22:47 UTC - Addison Higham: like do bookies checksum closed 
segments and then repair any segments that don't match the quorum?
----
2019-09-20 22:25:06 UTC - Matteo Merli: yes, the client might get EntryNotFound 
from one bookie, and will retry to read from the other 2 bookies
----
2019-09-20 22:25:26 UTC - Matteo Merli: that also happens if one entry is 
corrupted on one bookie
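
A toy model of the read fallback described above: the reader asks one replica, and if that replica does not have the entry, it moves on to the next. This does not use the BookKeeper client API; the replicas are plain in-memory maps, and every name in the sketch is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** In-memory model of reading an entry with fallback across replicas. */
final class QuorumReadSketch {

    /**
     * Tries each replica in order and returns the first copy of the entry
     * that is found. A replica without the entry plays the role of a bookie
     * answering "entry not found", so the reader tries the next one.
     */
    static Optional<String> readEntry(long entryId, List<Map<Long, String>> replicas) {
        for (Map<Long, String> replica : replicas) {
            String value = replica.get(entryId);
            if (value != null) {
                return Optional.of(value);
            }
        }
        // No replica had the entry.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // First replica lost entry 2 (for example, it was only in the journal).
        Map<Long, String> lostJournal = Map.of(1L, "a");
        Map<Long, String> healthy = Map.of(1L, "a", 2L, "b");
        List<Map<Long, String>> replicas = List.of(lostJournal, healthy, healthy);
        System.out.println(readEntry(2L, replicas).orElse("not found"));
    }
}
```

The model only covers the read path; it says nothing about whether or when the missing copy gets repaired on the replica that lost it.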
----
2019-09-20 22:31:03 UTC - Rajiv Abraham: Hi, for the file io source connector,
for the `inputDirectory` variable, I’m guessing the file path is local to the
worker node? If so, is there a way of using the REST API to upload a file? If
there are examples online on how to do it, that would be great.
----
2019-09-20 23:30:54 UTC - Rajiv Abraham: Do I have to publish the file outside of
the Pulsar API (e.g. through scp), and how does it work for multiple nodes in the
cluster?
----
2019-09-21 00:14:14 UTC - Rajiv Abraham: Hi, another question, is there a 
python client to create a source/sink? or do I have to call the REST API? I 
looked at the Python API and didn’t see it but just wanted to double check.
----
