This is an automated email from the ASF dual-hosted git repository.

nickpan47 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
     new 9d9ebc7  SAMZA-398: Remove force NONE compression for changelog topic producer
     new d5f5494  Merge pull request #1492 from perkss/SAMZA-398-allow-compressed-changelog
9d9ebc7 is described below

commit 9d9ebc70d127c9deca35e09d1056835ad4f73983
Author: perkss <stuart.m.pe...@gmail.com>
AuthorDate: Tue Apr 13 21:30:15 2021 +0100

    SAMZA-398: Remove force NONE compression for changelog topic producer
---
 .../src/main/java/org/apache/samza/config/StorageConfig.java  | 12 ------------
 .../test/java/org/apache/samza/config/TestStorageConfig.java  | 12 ------------
 .../src/main/scala/org/apache/samza/config/KafkaConfig.scala  |  4 +---
 .../org/apache/samza/system/kafka/KafkaSystemFactory.scala    | 11 +----------
 .../apache/samza/system/kafka/TestKafkaSystemFactory.scala    | 12 ------------
 .../framework/StreamApplicationIntegrationTestHarness.java    |  1 +
 6 files changed, 3 insertions(+), 49 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index a8b8702..548cc27 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -29,7 +29,6 @@ import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.execution.StreamManager;
-import org.apache.samza.util.StreamUtil;
 
 import static com.google.common.base.Preconditions.*;
 
@@ -241,17 +240,6 @@ public class StorageConfig extends MapConfig {
   }
 
   /**
-   * Helper method to check if a system has a changelog attached to it.
-   */
-  public boolean isChangelogSystem(String systemName) {
-    return getStoreNames().stream()
-        .map(this::getChangelogStream)
-        .filter(Optional::isPresent)
-        .map(systemStreamName -> StreamUtil.getSystemStreamFromNames(systemStreamName.get()).getSystem())
-        .anyMatch(system -> system.equals(systemName));
-  }
-
-  /**
    * Helper method to check if there is any stores configured w/ a changelog
    */
   public boolean hasDurableStores() {
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 88fbbe0..ec9b263 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -258,18 +258,6 @@ public class TestStorageConfig {
   }
 
   @Test
-  public void testIsChangelogSystem() {
-    StorageConfig storageConfig = new StorageConfig(new MapConfig(ImmutableMap.of(
-        // store0 has a changelog stream
-        String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class",
-        String.format(CHANGELOG_STREAM, STORE_NAME0), "system0.changelog-stream",
-        // store1 does not have a changelog stream
-        String.format(StorageConfig.FACTORY, STORE_NAME1), "factory.class")));
-    assertTrue(storageConfig.isChangelogSystem("system0"));
-    assertFalse(storageConfig.isChangelogSystem("other-system"));
-  }
-
-  @Test
   public void testHasDurableStores() {
     // no changelog, which means no durable stores
     StorageConfig storageConfig = new StorageConfig(
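Editor's note: the isChangelogSystem helper removed above has no direct replacement in this patch. A caller that still needs the check can rebuild it from the remaining StorageConfig API; the following is a minimal Java sketch in that spirit (the wrapper class and method names are illustrative, not part of the commit):

    import java.util.Optional;
    import org.apache.samza.config.StorageConfig;
    import org.apache.samza.util.StreamUtil;

    public class ChangelogSystemCheck {
      // Mirrors the logic of the removed StorageConfig#isChangelogSystem.
      public static boolean isChangelogSystem(StorageConfig storageConfig, String systemName) {
        return storageConfig.getStoreNames().stream()
            .map(storageConfig::getChangelogStream)       // Optional<String> "system.stream" per store
            .filter(Optional::isPresent)
            .map(stream -> StreamUtil.getSystemStreamFromNames(stream.get()).getSystem())
            .anyMatch(systemName::equals);
      }
    }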
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 69a9966..391536a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -381,14 +381,12 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   }
 
   def getKafkaSystemProducerConfig( systemName: String,
-                                    clientId: String,
-                                    injectedProps: Map[String, String] = Map()) = {
+                                    clientId: String) = {
     val subConf = config.subset("systems.%s.producer." format systemName, true)
     val producerProps = new util.HashMap[String, String]()
     producerProps.putAll(subConf)
     producerProps.put("client.id", clientId)
-    producerProps.putAll(injectedProps.asJava)
 
     new KafkaProducerConfig(systemName, clientId, producerProps)
   }
 }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 8d1fd6b..a2773f6 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -32,14 +32,6 @@ import scala.collection.JavaConverters._
 import org.apache.samza.util._
 
 object KafkaSystemFactory extends Logging {
-  @VisibleForTesting
-  def getInjectedProducerProperties(systemName: String, config: Config) = if (new StorageConfig(config).isChangelogSystem(systemName)) {
-    warn("System name '%s' is being used as a changelog. Disabling compression since Kafka does not support compression for log compacted topics." format systemName)
-    Map[String, String]("compression.type" -> "none")
-  } else {
-    Map[String, String]()
-  }
-
   val CLIENTID_PRODUCER_PREFIX = "kafka-producer"
   val CLIENTID_CONSUMER_PREFIX = "kafka-consumer"
   val CLIENTID_ADMIN_PREFIX = "kafka-admin-consumer"
@@ -67,9 +59,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
   }
 
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
-    val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
     val clientId = KafkaConsumerConfig.createClientId(KafkaSystemFactory.CLIENTID_PRODUCER_PREFIX, config);
-    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
+    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
     val getProducer = () => {
       new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
     }
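Editor's note: with the injected override gone, the properties handed to the raw Kafka producer come only from the systems.<system>.producer.* subset plus the generated client.id, so a user-chosen compression codec is passed through unchanged. A minimal Java sketch of the resulting client-level properties, assuming a local broker and a snappy codec (the literal values are illustrative, not taken from the patch):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class ChangelogProducerSketch {
      public static KafkaProducer<byte[], byte[]> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");     // assumed broker address
        props.put("client.id", "kafka-producer-example");     // Samza derives this from CLIENTID_PRODUCER_PREFIX
        props.put("compression.type", "snappy");              // now flows in from systems.kafka.producer.compression.type
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        // Before this change, compression.type was overwritten with "none"
        // whenever the system backed a changelog; that injection is removed.
        return new KafkaProducer<>(props);
      }
    }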
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index 596d67b..ecbc00d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -82,16 +82,4 @@ class TestKafkaSystemFactory {
     assertNotNull(producer)
     assertTrue(producer.isInstanceOf[KafkaSystemProducer])
   }
-
-  @Test
-  def testInjectedProducerProps {
-    val configMap = Map[String, String](
-      StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
-      StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
-      StorageConfig.FACTORY.format("system2") -> "some.factory.Class")
-    val config = new MapConfig(configMap.asJava)
-    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config))
-    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config))
-    assertEquals(Map[String, String]("compression.type" -> "none"),
-      KafkaSystemFactory.getInjectedProducerProperties("system1", config))
-  }
 }
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 0d1e49a..012eece 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -188,6 +188,7 @@ public class StreamApplicationIntegrationTestHarness extends IntegrationTestHarn
     configMap.put("systems.kafka.samza.key.serde", "string");
     configMap.put("systems.kafka.samza.msg.serde", "string");
     configMap.put("systems.kafka.samza.offset.default", "oldest");
+    configMap.put("systems.kafka.producer.compression.type", "snappy");
    configMap.put("job.coordinator.system", "kafka");
     configMap.put("job.default.system", "kafka");
     configMap.put("job.coordinator.replication.factor", "1");
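Editor's note: at the Samza configuration level, the compression codec is set through the same systems.* namespace that the harness change above uses. A minimal Java sketch of a config map carrying the setting (only the compression key mirrors this patch; the remaining keys and values are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.config.MapConfig;

    public class CompressionConfigSketch {
      public static MapConfig buildConfig() {
        Map<String, String> configMap = new HashMap<>();
        configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
        // Any key under systems.<system>.producer.* is passed through to the Kafka producer client.
        configMap.put("systems.kafka.producer.bootstrap.servers", "localhost:9092"); // assumed broker address
        configMap.put("systems.kafka.producer.compression.type", "snappy");
        return new MapConfig(configMap);
      }
    }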