2020-09-18 17:08:41 UTC - Piyush: Hey Everyone, I was wondering if we could
reset topic cursor from a function.. if yes, how do we do that?
----
2020-09-18 17:15:45 UTC - Alexander Brown: @Alexander Brown has joined the
channel
----
2020-09-18 17:25:37 UTC - Addison Higham: the right place to ask
:slightly_smiling_face: Yes, there is a default that if there is no producers
or consumer and no data, the topic will be deleted after 60 seconds, but this
is highly configurable
----
2020-09-18 17:27:20 UTC - Addison Higham: There are rate limits that can limit
that like `pulsar-admin namespaces set-dispatch-rate`, you can see all the
limits under `pulsar-admin namespaces policies`
----
2020-09-18 17:28:00 UTC - Addison Higham: hrm... that does seem strange...
@Sijie Guo have you ever seen anything like that?
----
2020-09-18 18:14:17 UTC - Sijie Guo: Interesting… I don’t think it will happen.
@Sankararao Routhu how did you setup the clusters?
----
2020-09-18 18:15:27 UTC - Sijie Guo: You can call the Pulsar restful admin API
to reset the cursor
----
2020-09-18 18:26:43 UTC - Yarden Arane: Hi @Sijie Guo,
I tried a setup using pulsar standalone with a function that looks like
follows:
```public class CursorManagementFunction implements Function<String,
Void> {
PulsarAdmin admin = PulsarAdmin.builder()
.connectionTimeout(120, TimeUnit.SECONDS)
.readTimeout(120, TimeUnit.SECONDS)
.serviceHttpUrl("<http://127.0.0.1:8080>")
.build()
MessageId prevMessageId;
@Override
public void process(String input, Context context) {
String topic = context.getCurrentRecord().getTopicName()
String subName = context.getFunctionName
admin.topics().resetCursor(topic, subName, prevMessageId);
}
}```
However, the call to resetCursor throws this exception:
```java.lang.IllegalStateException: InjectionManagerFactory not found.
at
org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.lambda$lookupInjectionManagerFactory$0(Injections.java:98)
~[?:?]
at java.util.Optional.orElseThrow(Optional.java:290) ~[?:1.8.0_242]
at
org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.lookupInjectionManagerFactory(Injections.java:98)
~[?:?]
at
org.apache.pulsar.shade.org.glassfish.jersey.internal.inject.Injections.createInjectionManager(Injections.java:68)
~[?:?]
at
org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig$State.initRuntime(ClientConfig.java:432)
~[?:?]
at
org.apache.pulsar.shade.org.glassfish.jersey.internal.util.collection.Values$LazyValueImpl.get(Values.java:341)
~[?:?]
at
org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig.getRuntime(ClientConfig.java:826)
~[?:?]
at
org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRequest.getConfiguration(ClientRequest.java:285)
~[?:?]
at
org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.validateHttpMethodAndEntity(JerseyInvocation.java:143)
~[?:?]
at
org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:112)
~[?:?]
at
org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:108)
~[?:?]
at
org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.<init>(JerseyInvocation.java:99)
~[?:?]
at
org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$AsyncInvoker.method(JerseyInvocation.java:706)
~[?:?]
at
org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$AsyncInvoker.get(JerseyInvocation.java:566)
~[?:?]
at
org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:168)
~[org.apache.pulsar-pulsar-client-admin-original-2.6.1.jar:2.6.1]```
The same code not running in a Pulsar Function does work:
```public class CursorManagementApplication {
PulsarAdmin admin = PulsarAdmin.builder()
.connectionTimeout(120, TimeUnit.SECONDS)
.readTimeout(120, TimeUnit.SECONDS)
.serviceHttpUrl("<http://127.0.0.1:8080>")
.build()
MessageId prevMessageId;
public static void main(String[] args){
String topic = "my_topic"
String subName = "my_subscription"
admin.topics().resetCursor(topic, subName, prevMessageId);
}
}```
----
2020-09-18 18:31:20 UTC - Sijie Guo: It seems that there is a library conflict
----
2020-09-18 18:31:27 UTC - Sijie Guo: Can you create a github issue?
----
2020-09-18 18:31:57 UTC - Yarden Arane: Thanks, will do.
----
2020-09-18 18:42:37 UTC - Yarden Arane:
<https://github.com/apache/pulsar/issues/8089>
----
2020-09-18 19:22:15 UTC - Sankararao Routhu: Hi @Sijie Guo I setup two clusters
and enabled geo-replication using global zookeeper
----
2020-09-18 19:26:23 UTC - Linton: awesome, thanks @Addison Higham
----
2020-09-18 19:38:36 UTC - Addison Higham: correct, you should be able to just
provide mysql configurations
----
2020-09-18 19:54:36 UTC - aaron wang: @aaron wang has joined the channel
----
2020-09-19 00:37:56 UTC - Sankararao Routhu: looks like enabling dedup in
broker stops this issue. Is it the reason for above issue @Addison Higham
----
2020-09-19 01:00:18 UTC - Addison Higham: It should not be necessary... It
likely would mean some sort of misconfiguration, are the different clusters
both properly named? They way this works is when a broker is replicating a
topic, it attaches additional metadata to the messages, with global config, the
brokers know when a message is coming from a replicating cluster and then do
not re-replicate. If the clusters are not properly configured somehow I wonder
if that could cause this
----
2020-09-19 06:32:51 UTC - Joe Francis: Every replicated message has the source
cluster and a replicated marker set before its sent out. Messages with the
replication marker set are never replicated out . Unless some protobuf
dependency issues are messing with the marker bit offsets. Anyway, highly
unlikely this is a broker issue - I have been doing full mesh n-way Pulsar
replication for many years and never seen this issue
----
2020-09-19 06:41:49 UTC - Joe Francis: I can theorize about a case where
replication repeatedly times out , and, for eg: cluster A repeatedly tries to
push a message to cluster B, So just one message in A could get duplicated many
times to B under such edge conditions of network issues. But bouncing back and
forth? Can't think of a case..
----