[ 
https://issues.apache.org/jira/browse/AMQ-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ken Liao updated AMQ-9592:
--------------------------
    Description: 
To replicate:
 - Run a subscriber with QOS2 to a topic with client ID say "1"and stop it
 - Publish 5 messages to the topic, do not consume these yet.
 - Run another subscriber with QOS2 to the topic with a different client ID say 
"2'.
 - This subscriber with different cliend ID receives 5 messages that were 
intended for the clientID "1". However, this does not mean that client ID "1" 
will not receive the 5 messages when it comes back online. I noticed that 
client ID "1" also received the 5 message.

Now that both clientID 1 and 2 received messages, subscribing a new clientID 
doesn't receive these message. However, if we send 5 message again to the topic 
and do not consume it for all clientIDs, it would delivered to any new 
subscription.

For new client Id broker must create new session, so messages sent after client 
1 disconnected should route to client 1 only since client 2 because it was 
connected after those 5 messages were sent (hence the new session of client 2 
shouldn't contain old messages).

Here are sample codes to reproduce:

The producer Python script
{code:python}
import paho.mqtt.client as mqttClient
import time
import ssl

# context = ssl.create_default_context()

Connected = False 
broker_address= "localhost" 
port = 1883
user = "admin"
password = "admin"

def on_connect(client, userdata, flags, rc):

    if rc == 0:
        print("Connected to broker")
        global Connected                #Use global variable
        Connected = True                #Signal connection  
    else:
        print("Connection failed")

client = mqttClient.Client(client_id="producer",transport="tcp") #
client.username_pw_set(user, password=password) #set username and password
client.on_connect=on_connect
# client.tls_set_context(context=context)
client.connect(broker_address, port=port)
client.loop_start()
i=0
while True:
    time.sleep(1)
    i=i+1
    client.publish("test/data", "This is my test msg 123.",2,False)
    print("Sent"+str(i))
{code}

The consumer Python script
{code:python}
import paho.mqtt.client as mqttClient
import time
import ssl

Connected = False 
broker_address= "localhost" 
port = 1883
user = "admin"
password = "admin"

def on_connect(client, userdata, flags, rc):

    if rc == 0:
        print("Connected to broker")
        global Connected                #Use global variable
        Connected = True                #Signal connection  
        print(flags)
        client.subscribe("test/data",2)
    else:
        print("Connection failed")

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqttClient.Client(client_id="id2",transport="tcp",clean_session=False) 
#
client.username_pw_set(user, password=password) #set username and password
client.on_connect=on_connect
client.on_message=on_message
# client.tls_set_context(context=context)
client.connect(broker_address, port=port)
client.loop_start()

while Connected != True:
    time.sleep(0.1)

time.sleep(1000)
{code}

  was:
To replicate:
 - Run a subscriber with QOS2 to a topic with client ID say "1"and stop it
 - Publish 5 messages to the topic, do not consume these yet.
 - Run another subscriber with QOS2 to the topic with a different client ID say 
"2'.
 - This subscriber with different cliend ID receives 5 messages that were 
intended for the clientID "1". However, this does not mean that client ID "1" 
will not receive the 5 messages when it comes back online. I noticed that 
client ID "1" also received the 5 message.

Now that both clientID 1 and 2 received messages, subscribing a new clientID 
doesn't receive these message. However, if we send 5 message again to the topic 
and do not consume it for all clientIDs, it would delivered to any new 
subscription.

For new client Id broker must create new session, so messages sent after client 
1 disconnected should route to client 1 only since client 2 because it was 
connected after those 5 messages were sent (hence the new session of client 2 
shouldn't contain old messages).

Here are sample codes to reproduce:

The producer Python script
{code:python}
import paho.mqtt.client as mqttClient
import time
import ssl

Connected = False 
broker_address= "localhost" 
port = 1883
user = "admin"
password = "admin"

def on_connect(client, userdata, flags, rc):

if rc == 0:
print("Connected to broker")
global Connected #Use global variable
Connected = True #Signal connection 
else:
print("Connection failed")

client = mqttClient.Client(client_id="producer",transport="tcp") #
client.username_pw_set(user, password=password) #set username and password
client.on_connect=on_connect
 # client.tls_set_context(context=context)
client.connect(broker_address, port=port)
client.loop_start()
i=0
while True:
time.sleep(1)
i=i+1
client.publish("test/data", "This is my test msg 123.",2,False)
{code:python}

The consumer Python script
{code:python}
import paho.mqtt.client as mqttClient
import time
import ssl

Connected = False 
broker_address= "localhost" 
port = 1883
user = "admin"
password = "admin"

def on_connect(client, userdata, flags, rc):

if rc == 0:
print("Connected to broker")
global Connected #Use global variable
Connected = True #Signal connection 
print(flags)
client.subscribe("test/data",2)
else:
print("Connection failed")

def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))

client = mqttClient.Client(client_id="id2",transport="tcp",clean_session=False) 
#
client.username_pw_set(user, password=password) #set username and password
client.on_connect=on_connect
client.on_message=on_message
 # client.tls_set_context(context=context)
client.connect(broker_address, port=port)
client.loop_start()

while Connected != True:
time.sleep(0.1)

time.sleep(1000)
{code:python}


> Incorrect MQTTv3.1.1 behaviour: QoS2 messages were delivered to unintended 
> consumers 
> -------------------------------------------------------------------------------------
>
>                 Key: AMQ-9592
>                 URL: https://issues.apache.org/jira/browse/AMQ-9592
>             Project: ActiveMQ Classic
>          Issue Type: Bug
>          Components: MQTT
>    Affects Versions: 5.18.5, 6.1.3
>            Reporter: Ken Liao
>            Priority: Major
>
> To replicate:
>  - Run a subscriber with QOS2 to a topic with client ID say "1"and stop it
>  - Publish 5 messages to the topic, do not consume these yet.
>  - Run another subscriber with QOS2 to the topic with a different client ID 
> say "2'.
>  - This subscriber with different cliend ID receives 5 messages that were 
> intended for the clientID "1". However, this does not mean that client ID "1" 
> will not receive the 5 messages when it comes back online. I noticed that 
> client ID "1" also received the 5 message.
> Now that both clientID 1 and 2 received messages, subscribing a new clientID 
> doesn't receive these message. However, if we send 5 message again to the 
> topic and do not consume it for all clientIDs, it would delivered to any new 
> subscription.
> For new client Id broker must create new session, so messages sent after 
> client 1 disconnected should route to client 1 only since client 2 because it 
> was connected after those 5 messages were sent (hence the new session of 
> client 2 shouldn't contain old messages).
> Here are sample codes to reproduce:
> The producer Python script
> {code:python}
> import paho.mqtt.client as mqttClient
> import time
> import ssl
> # context = ssl.create_default_context()
> Connected = False 
> broker_address= "localhost" 
> port = 1883
> user = "admin"
> password = "admin"
> def on_connect(client, userdata, flags, rc):
>     if rc == 0:
>         print("Connected to broker")
>         global Connected                #Use global variable
>         Connected = True                #Signal connection  
>     else:
>         print("Connection failed")
> client = mqttClient.Client(client_id="producer",transport="tcp") #
> client.username_pw_set(user, password=password) #set username and password
> client.on_connect=on_connect
> # client.tls_set_context(context=context)
> client.connect(broker_address, port=port)
> client.loop_start()
> i=0
> while True:
>     time.sleep(1)
>     i=i+1
>     client.publish("test/data", "This is my test msg 123.",2,False)
>     print("Sent"+str(i))
> {code}
> The consumer Python script
> {code:python}
> import paho.mqtt.client as mqttClient
> import time
> import ssl
> Connected = False 
> broker_address= "localhost" 
> port = 1883
> user = "admin"
> password = "admin"
> def on_connect(client, userdata, flags, rc):
>     if rc == 0:
>         print("Connected to broker")
>         global Connected                #Use global variable
>         Connected = True                #Signal connection  
>         print(flags)
>         client.subscribe("test/data",2)
>     else:
>         print("Connection failed")
> def on_message(client, userdata, msg):
>     print(msg.topic+" "+str(msg.payload))
> client = 
> mqttClient.Client(client_id="id2",transport="tcp",clean_session=False) #
> client.username_pw_set(user, password=password) #set username and password
> client.on_connect=on_connect
> client.on_message=on_message
> # client.tls_set_context(context=context)
> client.connect(broker_address, port=port)
> client.loop_start()
> while Connected != True:
>     time.sleep(0.1)
> time.sleep(1000)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@activemq.apache.org
For additional commands, e-mail: issues-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to