mynameborat commented on a change in pull request #1050: SAMZA-2221: Use the
KafkaStreamSpec instead of the StreamSpec
URL: https://github.com/apache/samza/pull/1050#discussion_r301285981
##########
File path:
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
##########
@@ -461,24 +461,13 @@ public Integer offsetComparator(String offset1, String offset2) {
@Override
public boolean createStream(StreamSpec streamSpec) {
LOG.info("Creating Kafka topic: {} on system: {}",
streamSpec.getPhysicalName(), streamSpec.getSystemName());
- final String REPL_FACTOR = "replication.factor";
KafkaStreamSpec kSpec = toKafkaSpec(streamSpec);
String topicName = kSpec.getPhysicalName();
// create topic.
NewTopic newTopic = new NewTopic(topicName, kSpec.getPartitionCount(),
(short) kSpec.getReplicationFactor());
-
- // specify the configs
- Map<String, String> streamConfig = new HashMap<>(streamSpec.getConfig());
- // HACK - replication.factor is invalid config for AdminClient.createTopics
- if (streamConfig.containsKey(REPL_FACTOR)) {
- String repl = streamConfig.get(REPL_FACTOR);
- LOG.warn("Configuration {}={} for topic={} is invalid. Using kSpec repl factor {}",
- REPL_FACTOR, repl, kSpec.getPhysicalName(), kSpec.getReplicationFactor());
- streamConfig.remove(REPL_FACTOR);
- }
- newTopic.configs(new MapConfig(streamConfig));
+ newTopic.configs(KafkaStreamSpec.filterUnsupportedProperties(kSpec.getConfig()));
Review comment:
It should be sufficient to pass `kSpec.getConfig()`; you don't need to apply
the filter again.
`toKafkaSpec(...)` already takes care of filtering out unsupported properties.
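
To illustrate why the second filter call is redundant, here is a minimal, self-contained sketch. It does not use the Samza classes: `FilterOnceSketch`, `filterUnsupported` and `toKafkaSpecConfig` are hypothetical stand-ins, and the sketch assumes the filter only drops the `replication.factor` key (the key the removed HACK block stripped). Under that assumption, filtering an already-filtered config changes nothing.

```java
import java.util.HashMap;
import java.util.Map;

public class FilterOnceSketch {

  // Hypothetical stand-in for the property filter.
  // Assumption: it only removes "replication.factor".
  static Map<String, String> filterUnsupported(Map<String, String> config) {
    Map<String, String> filtered = new HashMap<>(config);
    filtered.remove("replication.factor");
    return filtered;
  }

  // Hypothetical stand-in for toKafkaSpec(...): the conversion
  // itself filters the config, so callers receive a clean map.
  static Map<String, String> toKafkaSpecConfig(Map<String, String> rawConfig) {
    return filterUnsupported(rawConfig);
  }

  public static void main(String[] args) {
    Map<String, String> raw = new HashMap<>();
    raw.put("replication.factor", "3");
    raw.put("cleanup.policy", "compact");

    Map<String, String> specConfig = toKafkaSpecConfig(raw);
    Map<String, String> filteredAgain = filterUnsupported(specConfig);

    // The second pass is a no-op on an already-filtered config.
    System.out.println(specConfig.equals(filteredAgain));
    System.out.println(specConfig.containsKey("replication.factor"));
  }
}
```

If the real conversion behaves like this stand-in, the call in `createStream` can pass the spec's config straight through to `newTopic.configs(...)`.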