This is an automated email from the ASF dual-hosted git repository. lhaiesp pushed a commit to branch 1.3.1 in repository https://gitbox.apache.org/repos/asf/samza.git
commit cccca7cf137ee168eed07238f5deeb0f60d9710d Author: shanthoosh <[email protected]> AuthorDate: Fri Jan 17 10:19:34 2020 -0800 Fix the RocksDB TTL type conversion in change log properties generation. (#1254) --- .../scala/org/apache/samza/config/KafkaConfig.scala | 2 +- .../org/apache/samza/config/TestKafkaConfig.scala | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 3b5f5f3..69a9966 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -324,7 +324,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { // - Set topic TTL to be the same as RocksDB TTL Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match { case Some(rocksDbTtl) => - if (!rocksDbTtl.isEmpty && rocksDbTtl.toInt < 0) { + if (!rocksDbTtl.isEmpty && rocksDbTtl.toLong < 0) { kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name)) } else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) { diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 00b103d..64b476b 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -105,6 +105,25 @@ class TestKafkaConfig { } @Test + def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForLargeTTLStores(): Unit = { + val props = new Properties + props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092") + props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/") + props.setProperty(JobConfig.JOB_NAME, "jobName") + + props.setProperty("stores.test1.changelog", "kafka.mychangelog1") + // Set the RocksDB TTL to be 28 days. + props.setProperty("stores.test1.rocksdb.ttl.ms", "2419200000") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1") + assertEquals("delete", kafkaProperties.getProperty("cleanup.policy")) + assertEquals("536870912", kafkaProperties.getProperty("segment.bytes")) + assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms")) + } + + @Test def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForStoresWithEmptyRocksDBTTL(): Unit = { val props = new Properties props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
