rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r797236185
##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -172,7 +172,10 @@ class SystemConsumers (
def start {
for ((systemStreamPartition, offset) <- sspToRegisteredOffsets.asScala) {
val consumer = consumers(systemStreamPartition.getSystem)
- consumer.register(systemStreamPartition, offset)
+ // If elasticity is enabled then the RunLoop gives SSP with keybucket
+ // but the actual systemConsumer which consumes from the input does not know about KeyBucket.
+ // hence, use an SSP without KeyBucket
+ consumer.register(new SystemStreamPartition(systemStreamPartition.getSystemStream, systemStreamPartition.getPartition), offset)
Review comment:
Is there another way we can do this? Registering a different SSP than the
one the RunLoop passes in introduces a discrepancy that adds confusion.
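
For illustration only, one possible shape of an alternative is to give the SSP type a named helper that drops the key bucket, so the call site in `start` states its intent instead of rebuilding the object inline. The sketch below uses hypothetical stand-in types (`Ssp`, `SystemStream`, `withoutKeyBucket`, and a `-1` "no bucket" sentinel are all assumptions), not the real Samza classes or anything proposed in this PR:

```scala
// Hypothetical stand-ins for illustration; these are NOT the real Samza classes.
final case class SystemStream(system: String, stream: String)

// keyBucket = -1 is an assumed sentinel meaning "no key bucket".
final case class Ssp(systemStream: SystemStream, partition: Int, keyBucket: Int = -1) {
  // Returns a copy of this SSP with the key bucket cleared,
  // i.e. the form a consumer unaware of key buckets would expect.
  def withoutKeyBucket: Ssp = copy(keyBucket = -1)
}

object KeyBucketSketch {
  def main(args: Array[String]): Unit = {
    // SSP as the RunLoop might hand it over when elasticity is enabled (assumed values).
    val fromRunLoop = Ssp(SystemStream("kafka", "page-views"), partition = 3, keyBucket = 1)

    // SSP that would be passed to consumer.register(...).
    val forConsumer = fromRunLoop.withoutKeyBucket

    assert(forConsumer == Ssp(SystemStream("kafka", "page-views"), 3))
    println(forConsumer)
  }
}
```

Whether such a helper belongs on `SystemStreamPartition` itself or in `SystemConsumers` is a design question for the PR author; the sketch only shows that the conversion can be named and kept in one place.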