xinyuiscool commented on a change in pull request #950: SAMZA-2126: Bug fixes
for batch-mode generated stream specs
URL: https://github.com/apache/samza/pull/950#discussion_r264771950
##########
File path:
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
##########
@@ -106,8 +106,8 @@ class KafkaSystemFactory extends SystemFactory with
Logging {
if (appConfig.getAppMode == ApplicationMode.BATCH) {
val streamConfig = new StreamConfig(config)
streamConfig.getStreamIds().filter(streamConfig.getIsIntermediateStream(_)).map(streamId
=> {
+ // only the override here
val properties = new Properties()
- properties.putAll(streamConfig.getStreamProperties(streamId))
Review comment:
Right, previous the code was blindly use the intermediate stream properties
as the kafka properties. Now I change it to an override so I only pass in extra
properties here.
The problem in previous impl is that Kafka topic creation needs to filter
out some properties, and it happens in the creation of KafkaStreamSpec. If the
intermeidate stream properties were fully added, it will contain the properties
that need to be filtered out again.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services