Hi, Xinyu
In the logs I see:
MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f
spp-driver-0 -c spp-driver | grep 'obtained consensus successfully'
INFO : JobModel version 53 obtained consensus successfully!
INFO : JobModel version 54 obtained consensus successfully!
MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f
spp-driver-1 -c spp-driver | grep 'obtained consensus successfully'
INFO : JobModel version 54 obtained consensus successfully!
MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs
spp-driver-2 -c spp-driver | grep 'obtained consensus successfully'
INFO : JobModel version 53 obtained consensus successfully!
INFO : JobModel version 54 obtained consensus successfully!
And yes, I can connect to ZK:
[zk: localhost:2181(CONNECTED) 1] ls
/app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/processors
[0000000041, 0000000043, 0000000042]
[zk: localhost:2181(CONNECTED) 5] ls
/app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/jobModelGeneration/jobModels
[45, 46, 47, 48, 49, 50, 51, 52, 53, 54]
[zk: localhost:2181(CONNECTED) 11] get
/app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/jobModelGeneration/jobModels/54
??t?{
"config" : { },
"containers" : {
"f2aa8e7f-2912-4030-b1ff-8b8962b6a48a" : {
"tasks" : {
"Partition 3" : {
"task-name" : "Partition 3",
"system-stream-partitions" : [ {
"system" : "kafka",
"partition" : 3,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
}, {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 3,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
} ],
"changelog-partition" : 5,
"task-mode" : "Active"
},
"Partition 6" : {
"task-name" : "Partition 6",
"system-stream-partitions" : [ {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 6,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
}, {
"system" : "kafka",
"partition" : 6,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
} ],
"changelog-partition" : 8,
"task-mode" : "Active"
},
"Partition 9" : {
"task-name" : "Partition 9",
"system-stream-partitions" : [ {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 9,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
}, {
"system" : "kafka",
"partition" : 9,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
} ],
"changelog-partition" : 11,
"task-mode" : "Active"
},
"Partition 10" : {
"task-name" : "Partition 10",
"system-stream-partitions" : [ {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 10,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
}, {
"system" : "kafka",
"partition" : 10,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
} ],
"changelog-partition" : 2,
"task-mode" : "Active"
}
},
"processor-id" : "f2aa8e7f-2912-4030-b1ff-8b8962b6a48a"
},
"49939588-6d2f-4b32-b462-b60cbbf193a2" : {
"tasks" : {
"Partition 0" : {
"task-name" : "Partition 0",
"system-stream-partitions" : [ {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 0,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
}, {
"system" : "kafka",
"partition" : 0,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
} ],
"changelog-partition" : 0,
"task-mode" : "Active"
},
"Partition 4" : {
"task-name" : "Partition 4",
"system-stream-partitions" : [ {
"system" : "kafka",
"partition" : 4,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
}, {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 4,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
} ],
"changelog-partition" : 6,
"task-mode" : "Active"
},
"Partition 7" : {
"task-name" : "Partition 7",
"system-stream-partitions" : [ {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 7,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
}, {
"system" : "kafka",
"partition" : 7,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
} ],
"changelog-partition" : 9,
"task-mode" : "Active"
},
"Partition 11" : {
"task-name" : "Partition 11",
"system-stream-partitions" : [ {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 11,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
}, {
"system" : "kafka",
"partition" : 11,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
} ],
"changelog-partition" : 3,
"task-mode" : "Active"
}
},
"processor-id" : "49939588-6d2f-4b32-b462-b60cbbf193a2"
},
"99ff20c4-fb5f-4ada-b3f8-446054036ce7" : {
"tasks" : {
"Partition 1" : {
"task-name" : "Partition 1",
"system-stream-partitions" : [ {
"system" : "kafka",
"partition" : 1,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
}, {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 1,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
} ],
"changelog-partition" : 1,
"task-mode" : "Active"
},
"Partition 2" : {
"task-name" : "Partition 2",
"system-stream-partitions" : [ {
"system" : "kafka",
"partition" : 2,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
}, {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 2,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
} ],
"changelog-partition" : 4,
"task-mode" : "Active"
},
"Partition 5" : {
"task-name" : "Partition 5",
"system-stream-partitions" : [ {
"system" : "kafka",
"partition" : 5,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
}, {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 5,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
} ],
"changelog-partition" : 7,
"task-mode" : "Active"
},
"Partition 8" : {
"task-name" : "Partition 8",
"system-stream-partitions" : [ {
"system" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_",
"partition" : 8,
"stream" :
"0-read_from_kafka_KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_"
}, {
"system" : "kafka",
"partition" : 8,
"stream" : "spp-test-driver-1-1-001-partition_by-gbk-7"
} ],
"changelog-partition" : 10,
"task-mode" : "Active"
}
},
"processor-id" : "99ff20c4-fb5f-4ada-b3f8-446054036ce7"
}
},
"max-change-log-stream-partitions" : 12,
"all-container-locality" : {
"49939588-6d2f-4b32-b462-b60cbbf193a2" : null,
"99ff20c4-fb5f-4ada-b3f8-446054036ce7" : null,
"f2aa8e7f-2912-4030-b1ff-8b8962b6a48a" : null
}
}
cZxid = 0xe000003be
ctime = Thu Aug 15 19:25:42 UTC 2019
mZxid = 0xe000003be
mtime = Thu Aug 15 19:25:42 UTC 2019
pZxid = 0xe000003be
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7677
numChildren = 0
[zk: localhost:2181(CONNECTED) 12]
Please let me know if you need any other info.
------------------
Michael Benenson
From: Xinyu Liu <[email protected]>
Date: Thursday, August 15, 2019 at 2:38 PM
To: "Benenson, Mikhail" <[email protected]>, Prateek Maheshwari
<[email protected]>, Hai Lu <[email protected]>
Cc: "Deshpande, Omkar" <[email protected]>, "Ho, Tom"
<[email protected]>, "LeVeck, Matt" <[email protected]>, Xinyu Liu
<[email protected]>, Samarth Shetty <[email protected]>, "Audo,
Nicholas" <[email protected]>, "Cesar, Scott" <[email protected]>,
Miguel Sanchez <[email protected]>, "Bansal, Ritesh"
<[email protected]>, Jagadish Venkatraman <[email protected]>,
"[email protected]" <[email protected]>, Shanthoosh Venkataraman
<[email protected]>, Bharath Kumarasubramanian
<[email protected]>
Subject: Re: Beam w Samza Runner : problem with partition assignment.
This email is from an external sender.
+Shanthoosh and Bharath for more investigation.
@Michael: do you happen to have a tool that can connect to your ZK? I am
wondering how many Samza job instances you see there. It will also be helpful if
you can look into the JobModels node and see what the last one looks like
(maybe compared with the previous ones too). If the leader was disconnected
from the followers, we should see the job model change from assigning the
partitions across all 3 hosts to only one host (the leader) having all the partitions.
Thanks,
Xinyu
________________________________
From: Benenson, Mikhail <[email protected]>
Sent: Thursday, August 15, 2019 1:27 PM
To: Xinyu Liu <[email protected]>; Prateek Maheshwari
<[email protected]>; Hai Lu <[email protected]>
Cc: Deshpande, Omkar <[email protected]>; Ho, Tom <[email protected]>;
LeVeck, Matt <[email protected]>; Xinyu Liu <[email protected]>;
Samarth Shetty <[email protected]>; Audo, Nicholas
<[email protected]>; Cesar, Scott <[email protected]>; Miguel
Sanchez <[email protected]>; Bansal, Ritesh <[email protected]>;
Jagadish Venkatraman <[email protected]>; [email protected]
<[email protected]>
Subject: Beam w Samza Runner : problem with partition assignment.
Hi, folks
We are running 3 instances of a Beam 2.13.0 application with Samza Runner 1.1.0
that reads from a Kafka topic with 12 partitions.
Two of these instances understand that there are 3 instances, and each picks 4
partitions for processing: Instance 0: (3, 6, 9, 10) and Instance 1: (1, 2, 5, 8).
But the third instance (the ZK leader) somehow thinks that it is the only running
instance, so it reads from all 12 partitions.
As a result, some records are processed twice, on different
instances, which seems like incorrect behavior.
This problem is very hard to reproduce, but I have it running now in my
environment.
After one instance was restarted, here is the new partition assignment:
MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f
spp-driver-1 -c spp-driver | grep 'Starting table manager in task instance
Partition'
INFO : Starting table manager in task instance Partition 3
INFO : Starting table manager in task instance Partition 6
INFO : Starting table manager in task instance Partition 9
INFO : Starting table manager in task instance Partition 10
MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f
spp-driver-2 -c spp-driver | grep 'Starting table manager in task instance
Partition'
INFO : Starting table manager in task instance Partition 3
INFO : Starting table manager in task instance Partition 9
INFO : Starting table manager in task instance Partition 1
INFO : Starting table manager in task instance Partition 7
INFO : Starting table manager in task instance Partition 5
INFO : Starting table manager in task instance Partition 11
MTVL16092bfbe:data-strmprocess-samza-driver mbenenson$ kubectl logs -f
spp-driver-0 -c spp-driver | grep 'Starting table manager in task instance
Partition'
INFO : Starting table manager in task instance Partition 6
INFO : Starting table manager in task instance Partition 0
INFO : Starting table manager in task instance Partition 4
INFO : Starting table manager in task instance Partition 8
INFO : Starting table manager in task instance Partition 10
INFO : Starting table manager in task instance Partition 2
It looks like two instances think that there are only two instances, so each
processes 6 partitions, but the third instance thinks there are three instances,
so it processes 4 partitions. As a result, partitions 3, 6, 9 and 10 are each read
by two instances (e.g. partition 3 by both spp-driver-1 and spp-driver-2), so data
from those partitions are processed twice.
Any idea what could cause this issue?
Please find attached the log from the misbehaving node.
Extract from Instance 0 log:
logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:44.316; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 3
logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:44.317; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 6
logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:44.317; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 9
logs/problem/logs-0/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:44.317; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 10
Extract from Instance 1 log:
logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:27.957; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 1
logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:27.958; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 2
logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:27.958; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 5
logs/problem/logs-1/app.log: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:27.958; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 8
Extract from Instance 2 log:
INFO
ZkClient-EventThread-66-zk-cs.data-strmprocess-spp-api-usw2-ppd-e2e.svc.cluster.local:2181
- 2019-08-15 16:28:15.092; - org.apache.samza.zk.ZkJobCoordinator -
ZkJobCoordinator::onBecomeLeader - I became the leader
…
INFO
ZkClient-EventThread-66-zk-cs.data-strmprocess-spp-api-usw2-ppd-e2e.svc.cluster.local:2181
- 2019-08-15 16:29:41.401; - org.apache.samza.zk.ZkJobCoordinator -
ProcessorChangeHandler::handleChildChange - Path:
/app-spp-test-driver-1-1-001/spp-test-driver-1-1-001-coordinationData/processors
Current Children: [0000000036, 0000000038, 0000000037]
…
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.206; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 6
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 3
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 9
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 0
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 4
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 8
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 1
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 10
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 7
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 5
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 11
logs/problem/logs-2/app.log.1: INFO Samza StreamProcessor Container Thread-0 -
2019-08-15 16:31:29.207; - org.apache.samza.container.SamzaContainer - Starting
table manager in task instance Partition 2
------------------
Michael Benenson