2020-06-26 09:37:28 UTC - Ali Ahmed: Some discussion of Pulsar in the real
world; it’s the last part of the talk.
<https://www.youtube.com/watch?v=CbirFaTDpWE>
----
2020-06-26 10:57:23 UTC - Pierre-Yves Lebecq: Hello :wave: Sorry, this is going
to be a long post, but I have an issue with Pulsar Functions, more precisely
with the state storage of the functions, and I’ll try to give as much
information as possible about what is happening.

I’m basically running 3 functions on a pulsar standalone instance, using docker 
image apachepulsar/pulsar-all:2.6.0. There is only one instance of each 
function for now:
• tasks-JobEnginePulsarFunction: This function keeps track of the state of some
jobs (they have a name and an ID basically). It saves an entry in the state for
every job to run (one key per job) and sends a message to a topic for a
consumer that will try to process the job and send various events back, like
job start, and then failed / completed. The function records in the state the
events that happened for that job, and when the job fails it sends a new
message to have the job retried. When it receives an event saying the job
completed, it deletes the associated state because it won’t be used anymore.
When the status of a job changes, the function publishes a message containing
the old status and the new status of the job to a topic named
“tasks-monitoring-per-name”.
• MonitoringPerName: This function uses counters and state to store how many
jobs are in a given status. Based on the status update published by the
previous function, it decrements the counter corresponding to the old status of
the job and increments the counter corresponding to the new status. Because
counters are not easy to retrieve from the outside world, this function also
stores the values of the counters in function state. When a new job name is
received, the function publishes to a “tasks-monitoring-global” topic the fact
that it has received a job which was not previously known, and initialises
counters and state for it.
• MonitoringGlobal: This function uses state to store a list of all job names 
that are known, based on the message published by the previous function.
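For readers following along, the counter bookkeeping described for
MonitoringPerName can be sketched roughly as below. This is only an
illustration under assumptions: a plain dict and set stand in for the Pulsar
function counters and state, and the function name, job name and status names
are hypothetical, not taken from the actual function.

```python
from collections import defaultdict

# In-memory stand-ins for the function's counters and state (assumption:
# the real function would go through the Pulsar function context instead).
counters = defaultdict(int)
known_jobs = set()


def on_status_change(job_name, old_status, new_status):
    """Apply one status update; return True if the job name was not known yet."""
    is_new = job_name not in known_jobs
    known_jobs.add(job_name)
    if old_status is not None:
        # The job leaves its old status.
        counters[(job_name, old_status)] -= 1
    # The job enters its new status.
    counters[(job_name, new_status)] += 1
    return is_new


# Hypothetical job name and statuses, for illustration only.
first = on_status_change("resize-image", None, "pending")
second = on_status_change("resize-image", "pending", "running")
```

In this sketch the return value is what would trigger the message to the
“tasks-monitoring-global” topic.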
When I feed the first function with 10k messages, at some point it just gets
stuck and stops doing anything, usually after 2k to 5k messages. It has also
failed after only a few hundred messages, and one time it went through all 10k
messages without issue. I tried to run some commands while it was stuck to get
more information, and here are some things I observed:

The function instance seems to be considered running by pulsar:

```# bin/pulsar-admin functions stats --name tasks-JobEnginePulsarFunction
{
  "receivedTotal" : 315,
  "processedSuccessfullyTotal" : 314,
  "systemExceptionsTotal" : 0,
  "userExceptionsTotal" : 0,
  "avgProcessLatency" : 118.07828642993623,
  "1min" : {
    "receivedTotal" : 0,
    "processedSuccessfullyTotal" : 0,
    "systemExceptionsTotal" : 0,
    "userExceptionsTotal" : 0,
    "avgProcessLatency" : null
  },
  "lastInvocation" : 1593099037904,
  "instances" : [ {
    "instanceId" : 0,
    "metrics" : {
      "receivedTotal" : 315,
      "processedSuccessfullyTotal" : 314,
      "systemExceptionsTotal" : 0,
      "userExceptionsTotal" : 0,
      "avgProcessLatency" : 118.07828642993623,
      "1min" : {
        "receivedTotal" : 0,
        "processedSuccessfullyTotal" : 0,
        "systemExceptionsTotal" : 0,
        "userExceptionsTotal" : 0,
        "avgProcessLatency" : null
      },
      "lastInvocation" : 1593099037904,
      "userMetrics" : { }
    }
  } ]
}```
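One way to read the stats above, sketched with a trimmed copy of the output:
the difference between received and successfully processed messages, minus any
exceptions, gives the number of messages that were picked up but never
finished. Treating that as "messages the function is blocked on" is an
interpretation, not something the stats state directly.

```python
import json
from datetime import datetime, timezone

# Trimmed copy of the function stats shown above.
stats = json.loads("""
{
  "receivedTotal": 315,
  "processedSuccessfullyTotal": 314,
  "systemExceptionsTotal": 0,
  "userExceptionsTotal": 0,
  "lastInvocation": 1593099037904
}
""")

failed = stats["systemExceptionsTotal"] + stats["userExceptionsTotal"]
# Messages received that neither succeeded nor raised an exception.
unfinished = stats["receivedTotal"] - stats["processedSuccessfullyTotal"] - failed

# lastInvocation is in milliseconds since the Unix epoch.
last_invocation = datetime.fromtimestamp(
    stats["lastInvocation"] // 1000, tz=timezone.utc
)

print(f"unfinished: {unfinished}, last invocation: {last_invocation.isoformat()}")
```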
Getting stats about the topic shows the function still has messages to process
on the topic:


+1 : Kirill Merkushev
----
2020-06-26 10:57:23 UTC - Pierre-Yves Lebecq: ```# bin/pulsar-admin topics stats tasks-engine
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 301946,
  "msgInCounter" : 1504,
  "bytesOutCounter" : 301946,
  "msgOutCounter" : 1504,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 301946,
  "backlogSize" : 239146,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "metadata" : { },
    "producerName" : "standalone-0-11",
    "connectedSince" : "2020-06-25T15:32:32.654Z",
    "clientVersion" : "2.5.0",
    "address" : "/10.0.2.2:54818"
  }, {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 2,
    "metadata" : {
      "instance_id" : "0",
      "application" : "pulsar-function",
      "id" : "public/default/tasks-JobEnginePulsarFunction"
    },
    "producerName" : "standalone-0-6",
    "connectedSince" : "2020-06-25T15:29:26.603Z",
    "clientVersion" : "2.6.0",
    "address" : "/127.0.0.1:56128"
  } ],
  "subscriptions" : {
    "public/default/tasks-JobEnginePulsarFunction" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 301946,
      "msgOutCounter" : 1504,
      "msgRateRedeliver" : 0.0,
      "chuckedMessageRate" : 0,
      "msgBacklog" : 1190,
      "msgBacklogNoDelayed" : 1190,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 1190,
      "type" : "Shared",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1593099153207,
      "lastConsumedTimestamp" : 1593099153689,
      "lastAckedTimestamp" : 1593099037935,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 301946,
        "msgOutCounter" : 1504,
        "msgRateRedeliver" : 0.0,
        "chuckedMessageRate" : 0.0,
        "consumerName" : "75d3c",
        "availablePermits" : 496,
        "unackedMessages" : 1190,
        "avgMessagesPerEntry" : 6,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 1593099037935,
        "lastConsumedTimestamp" : 1593099153689,
        "metadata" : {
          "instance_id" : "0",
          "application" : "pulsar-function",
          "id" : "public/default/tasks-JobEnginePulsarFunction"
        },
        "connectedSince" : "2020-06-25T15:21:10.978Z",
        "clientVersion" : "2.6.0",
        "address" : "/127.0.0.1:56128"
      } ],
      "isDurable" : true,
      "isReplicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled"
}```
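A rough way to spot this situation from the topic stats, using a trimmed copy
of the subscription above: flag subscriptions that hold a backlog while nothing
is being delivered. The rule used here is only a heuristic chosen for
illustration (an idle or paused consumer would match it too), and the function
name is hypothetical.

```python
import json

# Trimmed copy of the subscription stats shown above.
topic_stats = json.loads("""
{
  "subscriptions": {
    "public/default/tasks-JobEnginePulsarFunction": {
      "msgRateOut": 0.0,
      "msgBacklog": 1190,
      "unackedMessages": 1190,
      "lastConsumedTimestamp": 1593099153689,
      "lastAckedTimestamp": 1593099037935
    }
  }
}
""")


def stalled_subscriptions(stats):
    """Return names of subscriptions that have a backlog but no outgoing rate."""
    names = []
    for name, sub in stats["subscriptions"].items():
        if sub["msgBacklog"] > 0 and sub["msgRateOut"] == 0:
            names.append(name)
    return names


stalled = stalled_subscriptions(topic_stats)

sub = topic_stats["subscriptions"]["public/default/tasks-JobEnginePulsarFunction"]
# Gap between the last delivery and the last acknowledgement, in seconds.
ack_lag_seconds = (sub["lastConsumedTimestamp"] - sub["lastAckedTimestamp"]) / 1000
```

Here the last acknowledgement is almost two minutes older than the last
delivery, and every backlog message is unacknowledged.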
----
2020-06-26 10:57:23 UTC - Pierre-Yves Lebecq: Restarting the function or
stopping/starting it does nothing. After being restarted, the function still
does not process messages. I did not mention it, but I don’t see anything
special in the container output. There is no error in the function log file,
and the pulsar.log and pulsar-standalone.log files are empty.

While this happens, I cannot even retrieve state using pulsar CLI:

```# bin/pulsar-admin functions querystate --name tasks-MonitoringGlobalPulsarFunction --key monitoringGlobal.state
null

Reason: java.util.concurrent.TimeoutException```
The only way to make it start processing messages again is to stop the
container and start it again. Doing this leads to other issues, although maybe
they are not worth mentioning here; otherwise we might have too many things to
look at at the same time.

I found some github issues really close to this: 
<https://github.com/apache/pulsar/issues/6813> , 
<https://github.com/apache/pulsar/issues/6427> and 
<https://github.com/apache/pulsar/issues/7036> .
Unfortunately, they’re not getting much attention. Is there anything I can do,
or any additional details I can provide, to help push them forward?

Any help would be much appreciated. Thanks!
----
2020-06-26 11:14:15 UTC - Jonas Kint: @Jonas Kint has joined the channel
----
2020-06-26 12:03:57 UTC - Kirill Merkushev: Tried on 2.5.0: no such param, and
no mention of it in the API docs
<http://pulsar.apache.org/staging/admin-rest-api/#operation/deleteSubscription>.
Also, it states that there should be no active consumers
----
2020-06-26 13:56:43 UTC - rwaweber: I’m getting it directly from Prometheus,
but I can retrieve it directly from the brokers if that would help. (Not sure
if it’s a red herring, but only one of the brokers seems to report _that_
metric at a given time; we also only have three brokers ATM)

Metric with labels:
```pulsar_storage_size{cluster="pulsar-cluster-1",instance="brk-01q.local:8080",job="scrapes",namespace="public/dev",topic="persistent://public/dev/beats.replica"} 2786956094```
----
2020-06-26 14:07:00 UTC - jinggang: Thanks all. I have forked the code and
created a generic authentication for my own use
----
2020-06-26 15:00:17 UTC - Matteo Merli: The "force delete" for subscriptions 
was added in 2.6
----
2020-06-26 15:01:37 UTC - Matteo Merli: Keep in mind that, if the consumer is 
connected, it will recreate the subscription immediately on reconnection.

If you just want to get rid of the backlog, you can skip it all
----
2020-06-26 16:11:35 UTC - Kirill Merkushev: Thanks, I just got a stale 
connection and want to get rid of it
----
2020-06-26 17:06:05 UTC - Sijie Guo: Do you have the sequence to reproduce this 
issue?

Btw, #aop is a good place to ask questions related to AoP
----
2020-06-26 19:18:55 UTC - Jeff Schneller: What is the preferred 
authentication/authorization mechanism - TLS or JWT?
----
2020-06-26 19:31:20 UTC - Matteo Merli: JWT is generally easier to set up,
given that the tooling around OpenSSL can be a bit cryptic.

Also, it can be easier to share tokens (as strings) compared to certificates.

In the end, it really boils down to what auth scheme you're already using for
other services.
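To illustrate the "tokens are just strings" point, here is a minimal sketch of
an HS256-signed JWT built with the Python standard library only. The secret and
the subject are made up, and the helper names are hypothetical; in practice
tokens would normally be created with Pulsar's own token tooling or an
established JWT library rather than hand-rolled code like this.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as used by JWT."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(claims: dict, secret: bytes) -> str:
    """Build a compact JWT string: header.payload.signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return signing_input + "." + b64url(signature)


def verify_hs256(token: str, secret: bytes) -> bool:
    """Recompute the signature and compare it in constant time."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return hmac.compare_digest(b64url(expected), signature)


# Made-up secret and subject, for illustration only.
token = sign_hs256({"sub": "test-user"}, b"demo-secret")
print(token)
```

The whole credential is one printable string that can be pasted into a client
configuration, which is the convenience being contrasted with certificate
files.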
----
2020-06-26 20:07:12 UTC - Frank Kelly: Another Auth*n question - this time for 
a Custom Auth*n Plugin. I have `authorizationEnabled=true` defined on my proxy 
(so I'm expecting Authorization to occur on the proxy and not on the broker)
```root@pulsar-proxy-6f798754db-r9gbw:/pulsar/conf# grep -i authorization 
proxy.conf 
### ---Authorization --- ###
# Whether authorization is enforced by the Pulsar proxy
authorizationEnabled=true
# Authorization provider as a fully qualified class name
authorizationProvider=com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider
# Whether client authorization credentials are forwared to the broker for 
re-authorization.
forwardAuthorizationCredentials=false```
and I can see that the Plugin has been loaded successfully and initialized
```[16:02:04] fkelly@Franks-Cogito-Work-Computer:[~/platform2-test]: 
(feature/sdlc-31257-minikube-integration) klf pulsar-proxy-6f798754db-r9gbw | 
grep -i authorization
[conf/proxy.conf] Applying config authorizationEnabled = true
[conf/proxy.conf] Applying config authorizationProvider = 
com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider
19:55:31.069 [main] INFO  
com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider
 - ==> Initialize()
19:55:31.074 [main] INFO  
org.apache.pulsar.broker.authorization.AuthorizationService - 
com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider
 has been loaded.```
From the Proxy logs I see my token was authenticated successfully but I see no 
traces of the AuthorizationProvider being accessed/executed. Any thoughts?
----
2020-06-26 21:58:37 UTC - Jared Marolf: @Jared Marolf has joined the channel
----
2020-06-26 22:06:41 UTC - Jared Marolf: Has anyone ever used Pulsar with
OpenStack? I am trying to deploy a multi-broker setup, but the issue is that
the OpenStack instances can't communicate with each other via floating IP, only
via private IP. I am trying to figure out whether I need to use
advertisedListeners and internalListenerName for this to work, and what the
appropriate setup would be. Would I need to advertise both the floating IP and
the private IP on the service and HTTP ports, or some other combination of
that? External clients need to be able to connect to the brokers via the
floating IP. Any input would be appreciated.
----
