This is an automated email from the ASF dual-hosted git repository. pmaheshwari pushed a commit to branch 1.3.0 in repository https://gitbox.apache.org/repos/asf/samza.git
commit 0de2385540747af500efcf134cf80f0748ffff40 Author: xinyuiscool <[email protected]> AuthorDate: Thu Oct 17 14:12:45 2019 -0700 SAMZA-2352: Use min.compaction.lag.ms to avoid compacting the Kafka changelog topic (#1190) --- .../main/java/org/apache/samza/config/StorageConfig.java | 16 ++++++++++++++++ .../java/org/apache/samza/config/TestStorageConfig.java | 14 ++++++++++++++ .../main/scala/org/apache/samza/config/KafkaConfig.scala | 12 +++++++++++- .../scala/org/apache/samza/config/TestKafkaConfig.scala | 15 ++++++++++++++- 4 files changed, 55 insertions(+), 2 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index 86c7e7d..2b7ce02 100644 --- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -31,6 +31,8 @@ import org.apache.samza.SamzaException; import org.apache.samza.execution.StreamManager; import org.apache.samza.util.StreamUtil; +import static com.google.common.base.Preconditions.*; + /** * Config helper methods related to storage. @@ -45,6 +47,7 @@ public class StorageConfig extends MapConfig { public static final String MSG_SERDE = STORE_PREFIX + "%s.msg.serde"; public static final String CHANGELOG_STREAM = STORE_PREFIX + "%s" + CHANGELOG_SUFFIX; public static final String ACCESSLOG_STREAM_SUFFIX = "access-log"; + // TODO: setting replication.factor seems not working as in KafkaConfig. 
public static final String CHANGELOG_REPLICATION_FACTOR = STORE_PREFIX + "%s.changelog.replication.factor"; public static final String CHANGELOG_MAX_MSG_SIZE_BYTES = STORE_PREFIX + "%s.changelog.max.message.size.bytes"; public static final int DEFAULT_CHANGELOG_MAX_MSG_SIZE_BYTES = 1048576; @@ -52,6 +55,10 @@ public class StorageConfig extends MapConfig { public static final boolean DEFAULT_DISALLOW_LARGE_MESSAGES = false; public static final String DROP_LARGE_MESSAGES = STORE_PREFIX + "%s.drop.large.messages"; public static final boolean DEFAULT_DROP_LARGE_MESSAGES = false; + // The log compaction lag time for transactional state change log + public static final String MIN_COMPACTION_LAG_MS = "min.compaction.lag.ms"; + public static final String CHANGELOG_MIN_COMPACTION_LAG_MS = STORE_PREFIX + "%s.changelog." + MIN_COMPACTION_LAG_MS; + public static final long DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS = TimeUnit.HOURS.toMillis(4); static final String CHANGELOG_SYSTEM = "job.changelog.system"; static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms"; @@ -207,6 +214,15 @@ public class StorageConfig extends MapConfig { return getBoolean(String.format(DROP_LARGE_MESSAGES, storeName), DEFAULT_DROP_LARGE_MESSAGES); } + public long getChangelogMinCompactionLagMs(String storeName) { + String minCompactLagConfigName = String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, storeName); + // Avoid the inconsistency of overriding using stores.x.changelog.kafka... + checkArgument(get("stores." + storeName + ".changelog.kafka." + MIN_COMPACTION_LAG_MS) == null, + "Use " + minCompactLagConfigName + " to set kafka min.compaction.lag.ms property."); + + return getLong(minCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS); + } + /** * Helper method to check if a system has a changelog attached to it. 
*/ diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java index e094de2..713aa49 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java @@ -25,6 +25,7 @@ import java.util.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import java.util.concurrent.TimeUnit; import org.apache.samza.SamzaException; import org.junit.Test; @@ -296,4 +297,17 @@ public class TestStorageConfig { new MapConfig(ImmutableMap.of(String.format(StorageConfig.DROP_LARGE_MESSAGES, STORE_NAME0), "true"))); assertEquals(true, storageConfig.getDropLargeMessages(STORE_NAME0)); } + + @Test + public void testGetChangelogMinCompactionLagMs() { + // empty config, return default lag ms + assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS, + new StorageConfig(new MapConfig()).getChangelogMinCompactionLagMs(STORE_NAME0)); + + long lagOverride = TimeUnit.HOURS.toMillis(6); + StorageConfig storageConfig = new StorageConfig( + new MapConfig(ImmutableMap.of(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0), + String.valueOf(lagOverride)))); + assertEquals(lagOverride, storageConfig.getChangelogMinCompactionLagMs(STORE_NAME0)); + } } diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index f8051f2..75fbb6b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -336,8 +336,18 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name)) } + val storageConfig = new 
StorageConfig(config) kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE) - kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name))) + kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(storageConfig.getChangeLogDeleteRetentionInMs(name))) + + // To enable transactional state, we will need to avoid the head of the changelog + // (the messages after last checkpoint) being log-compacted so we can trim the rest of the updates. + // We use min.compaction.lag.ms to control the compaction time. + if (new TaskConfig(this).getTransactionalStateRestoreEnabled) { + kafkaChangeLogProperties.setProperty(StorageConfig.MIN_COMPACTION_LAG_MS, + String.valueOf(storageConfig.getChangelogMinCompactionLagMs(name))) + } + filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) } kafkaChangeLogProperties } diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index ea6c3f8..8558a85 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -20,6 +20,7 @@ package org.apache.samza.config import java.util.Properties +import java.util.concurrent.TimeUnit import org.apache.samza.config.factories.PropertiesConfigFactory import org.junit.Assert._ @@ -82,11 +83,11 @@ class TestKafkaConfig { @Test def testChangeLogProperties() { + props.setProperty("job.changelog.system", SYSTEM_NAME) props.setProperty("systems." 
+ SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory") props.setProperty("stores.test1.changelog", "kafka.mychangelog1") props.setProperty("stores.test2.changelog", "kafka.mychangelog2") props.setProperty("stores.test2.changelog.max.message.bytes", "1024000") - props.setProperty("job.changelog.system", "kafka") props.setProperty("stores.test3.changelog", "otherstream") props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete") props.setProperty("stores.test4.rocksdb.ttl.ms", "3600") @@ -107,6 +108,7 @@ class TestKafkaConfig { assertEquals("otherstream", storeToChangelog.getOrDefault("test3", "")) assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms")) assertNull(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms")) + assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms")) props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory") val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores() @@ -138,6 +140,17 @@ class TestKafkaConfig { String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("max.message.bytes"), KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES) + + // test compaction config for transactional state + val lagOverride = String.valueOf(TimeUnit.HOURS.toMillis(6)) + props.setProperty(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true") + props.setProperty("stores.test2.changelog.min.compaction.lag.ms", lagOverride) + val tsMapConfig = new MapConfig(props.asScala.asJava) + val tsKafkaConfig = new KafkaConfig(tsMapConfig) + assertEquals(String.valueOf(StorageConfig.DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS), + tsKafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms")) + assertEquals(lagOverride, + 
tsKafkaConfig.getChangelogKafkaProperties("test2").getProperty("min.compaction.lag.ms")) } @Test
