Repository: samza Updated Branches: refs/heads/master 810d8bd80 -> 78ee98261
SAMZA-1600: remove the combination of cleanup policy "compact,delete"⦠⦠in changelog topic properties Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Reviewers: Jagadish <jagad...@apache.org> Closes #435 from nickpan47/SAMZA-1600 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/78ee9826 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/78ee9826 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/78ee9826 Branch: refs/heads/master Commit: 78ee98261debd13a5f2a7fb2ea6408c63f9c4c66 Parents: 810d8bd Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Authored: Mon Mar 5 19:40:45 2018 -0800 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Mon Mar 5 19:40:45 2018 -0800 ---------------------------------------------------------------------- .../scala/org/apache/samza/config/KafkaConfig.scala | 15 +++++++++------ .../org/apache/samza/config/TestKafkaConfig.scala | 10 +++++----- 2 files changed, 14 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/78ee9826/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 1c1cdbd..124c85a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -250,12 +250,15 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { val kafkaChangeLogProperties = new Properties val appConfig = new ApplicationConfig(config) - if (appConfig.getAppMode == ApplicationMode.STREAM) { - kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") - } else{ - kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete") - kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) - } + // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.3, + // 1.0.2, or 1.1.0 (see KAFKA-6568) + // if (appConfig.getAppMode == ApplicationMode.STREAM) { + // kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") + // } else{ + // kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete") + // kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) + // } + kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE) kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name))) filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) } http://git-wip-us.apache.org/repos/asf/samza/blob/78ee9826/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 19b2cc6..a4fe686 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -143,13 +143,13 @@ class TestKafkaConfig { val batchMapConfig = new MapConfig(props.asScala.asJava) val batchKafkaConfig = new KafkaConfig(batchMapConfig) assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete") - assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"), + assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("delete.retention.ms"), String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) - assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact,delete") - assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"), + assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact") + assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("delete.retention.ms"), String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) - assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact,delete") - assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("retention.ms"), + assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact") + assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("delete.retention.ms"), String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) }