This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 3c2863c1e2074f3132feaa6c315ec109726198d0
Author: Shanthoosh Venkataraman <[email protected]>
AuthorDate: Tue Jan 14 16:20:29 2020 -0800

    Fix the coordinator stream creation workflow.
---
 .../samza/system/kafka/KafkaSystemAdmin.java       | 10 +++---
 .../system/kafka/TestKafkaSystemAdminJava.java     | 39 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index 97229db..e5d6af1 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -464,19 +464,19 @@ public class KafkaSystemAdmin implements SystemAdmin {
     LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
     final String REPL_FACTOR = "replication.factor";
 
-    KafkaStreamSpec kSpec = toKafkaSpec(streamSpec);
-    String topicName = kSpec.getPhysicalName();
+    KafkaStreamSpec kafkaStreamSpec = toKafkaSpec(streamSpec);
+    String topicName = kafkaStreamSpec.getPhysicalName();
 
     // create topic.
-    NewTopic newTopic = new NewTopic(topicName, kSpec.getPartitionCount(), (short) kSpec.getReplicationFactor());
+    NewTopic newTopic = new NewTopic(topicName, kafkaStreamSpec.getPartitionCount(), (short) kafkaStreamSpec.getReplicationFactor());
 
     // specify the configs
-    Map<String, String> streamConfig = new HashMap<>(streamSpec.getConfig());
+    Map<String, String> streamConfig = new HashMap<>(kafkaStreamSpec.getConfig());
     // HACK - replication.factor is invalid config for AdminClient.createTopics
     if (streamConfig.containsKey(REPL_FACTOR)) {
       String repl = streamConfig.get(REPL_FACTOR);
       LOG.warn("Configuration {}={} for topic={} is invalid. Using kSpec repl factor {}",
-          REPL_FACTOR, repl, kSpec.getPhysicalName(), kSpec.getReplicationFactor());
+          REPL_FACTOR, repl, kafkaStreamSpec.getPhysicalName(), kafkaStreamSpec.getReplicationFactor());
       streamConfig.remove(REPL_FACTOR);
     }
     newTopic.configs(new MapConfig(streamConfig));
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 1431600..7ca03f3 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,15 +19,22 @@
 
 package org.apache.samza.system.kafka;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.samza.Partition;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
@@ -62,6 +69,38 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   private static final String TEST_OFFSET = "10";
 
   @Test
+  public void testCreateStreamShouldCoordinatorStreamWithCorrectTopicProperties() throws Exception {
+    String coordinatorTopicName = String.format("topic-name-%s", RandomStringUtils.randomAlphabetic(5));
+    StreamSpec coordinatorStreamSpec = KafkaStreamSpec.createCoordinatorStreamSpec(coordinatorTopicName, SYSTEM());
+
+    boolean hasCreatedStream = systemAdmin().createStream(coordinatorStreamSpec);
+
+    assertTrue(hasCreatedStream);
+
+    Map<String, String> coordinatorTopicProperties = getTopicConfigFromKafkaBroker(coordinatorTopicName);
+
+    assertEquals("compact", coordinatorTopicProperties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+    assertEquals("26214400", coordinatorTopicProperties.get(TopicConfig.SEGMENT_BYTES_CONFIG));
+    assertEquals("86400000", coordinatorTopicProperties.get(TopicConfig.DELETE_RETENTION_MS_CONFIG));
+  }
+
+  private static Map<String, String> getTopicConfigFromKafkaBroker(String topicName) throws Exception {
+    List<ConfigResource> configResourceList = ImmutableList.of(
+        new ConfigResource(ConfigResource.Type.TOPIC, topicName));
+    Map<ConfigResource, org.apache.kafka.clients.admin.Config> configResourceConfigMap =
+        adminClient().describeConfigs(configResourceList).all().get();
+    Map<String, String> kafkaTopicConfig = new HashMap<>();
+
+    configResourceConfigMap.values().forEach(configEntry -> {
+      configEntry.entries().forEach(config -> {
+          kafkaTopicConfig.put(config.name(), config.value());
+      });
+    });
+
+    return kafkaTopicConfig;
+  }
+
+  @Test
   public void testGetOffsetsAfter() {
     SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM, TOPIC, new Partition(0));
     SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM, TOPIC, new Partition(1));

Reply via email to