This is an automated email from the ASF dual-hosted git repository.

nickpan47 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d9ebc7  SAMZA-398: Remove force NONE compression for changelog topic producer
     new d5f5494  Merge pull request #1492 from perkss/SAMZA-398-allow-compressed-changelog
9d9ebc7 is described below

commit 9d9ebc70d127c9deca35e09d1056835ad4f73983
Author: perkss <stuart.m.pe...@gmail.com>
AuthorDate: Tue Apr 13 21:30:15 2021 +0100

    SAMZA-398: Remove force NONE compression for changelog topic producer
---
 .../src/main/java/org/apache/samza/config/StorageConfig.java | 12 ------------
 .../test/java/org/apache/samza/config/TestStorageConfig.java | 12 ------------
 .../src/main/scala/org/apache/samza/config/KafkaConfig.scala |  4 +---
 .../org/apache/samza/system/kafka/KafkaSystemFactory.scala   | 11 +----------
 .../apache/samza/system/kafka/TestKafkaSystemFactory.scala   | 12 ------------
 .../framework/StreamApplicationIntegrationTestHarness.java   |  1 +
 6 files changed, 3 insertions(+), 49 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index a8b8702..548cc27 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -29,7 +29,6 @@ import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.execution.StreamManager;
-import org.apache.samza.util.StreamUtil;
 
 import static com.google.common.base.Preconditions.*;
 
@@ -241,17 +240,6 @@ public class StorageConfig extends MapConfig {
   }
 
   /**
-   * Helper method to check if a system has a changelog attached to it.
-   */
-  public boolean isChangelogSystem(String systemName) {
-    return getStoreNames().stream()
-        .map(this::getChangelogStream)
-        .filter(Optional::isPresent)
-        .map(systemStreamName -> StreamUtil.getSystemStreamFromNames(systemStreamName.get()).getSystem())
-        .anyMatch(system -> system.equals(systemName));
-  }
-
-  /**
    * Helper method to check if there is any stores configured w/ a changelog
    */
   public boolean hasDurableStores() {
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 88fbbe0..ec9b263 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -258,18 +258,6 @@ public class TestStorageConfig {
   }
 
   @Test
-  public void testIsChangelogSystem() {
-    StorageConfig storageConfig = new StorageConfig(new MapConfig(ImmutableMap.of(
-        // store0 has a changelog stream
-        String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class",
-        String.format(CHANGELOG_STREAM, STORE_NAME0), "system0.changelog-stream",
-        // store1 does not have a changelog stream
-        String.format(StorageConfig.FACTORY, STORE_NAME1), "factory.class")));
-    assertTrue(storageConfig.isChangelogSystem("system0"));
-    assertFalse(storageConfig.isChangelogSystem("other-system"));
-  }
-
-  @Test
   public void testHasDurableStores() {
     // no changelog, which means no durable stores
     StorageConfig storageConfig = new StorageConfig(
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 69a9966..391536a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -381,14 +381,12 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   }
 
   def getKafkaSystemProducerConfig( systemName: String,
-                                    clientId: String,
-                                    injectedProps: Map[String, String] = Map()) = {
+                                    clientId: String) = {
 
     val subConf = config.subset("systems.%s.producer." format systemName, true)
     val producerProps = new util.HashMap[String, String]()
     producerProps.putAll(subConf)
     producerProps.put("client.id", clientId)
-    producerProps.putAll(injectedProps.asJava)
     new KafkaProducerConfig(systemName, clientId, producerProps)
   }
 }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 8d1fd6b..a2773f6 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -32,14 +32,6 @@ import scala.collection.JavaConverters._
 import org.apache.samza.util._
 
 object KafkaSystemFactory extends Logging {
-  @VisibleForTesting
-  def getInjectedProducerProperties(systemName: String, config: Config) = if (new StorageConfig(config).isChangelogSystem(systemName)) {
-    warn("System name '%s' is being used as a changelog. Disabling compression since Kafka does not support compression for log compacted topics." format systemName)
-    Map[String, String]("compression.type" -> "none")
-  } else {
-    Map[String, String]()
-  }
-
   val CLIENTID_PRODUCER_PREFIX = "kafka-producer"
   val CLIENTID_CONSUMER_PREFIX = "kafka-consumer"
   val CLIENTID_ADMIN_PREFIX = "kafka-admin-consumer"
@@ -67,9 +59,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
   }
 
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
-    val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
     val clientId = KafkaConsumerConfig.createClientId(KafkaSystemFactory.CLIENTID_PRODUCER_PREFIX, config);
-    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
+    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
     val getProducer = () => {
       new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
     }
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index 596d67b..ecbc00d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -82,16 +82,4 @@ class TestKafkaSystemFactory {
     assertNotNull(producer)
     assertTrue(producer.isInstanceOf[KafkaSystemProducer])
   }
-
-  @Test
-  def testInjectedProducerProps {
-    val configMap = Map[String, String](
-      StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
-      StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
-      StorageConfig.FACTORY.format("system2") -> "some.factory.Class")
-    val config = new MapConfig(configMap.asJava)
-    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config))
-    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config))
-    assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config))
-  }
 }
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 0d1e49a..012eece 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -188,6 +188,7 @@ public class StreamApplicationIntegrationTestHarness extends IntegrationTestHarn
     configMap.put("systems.kafka.samza.key.serde", "string");
     configMap.put("systems.kafka.samza.msg.serde", "string");
     configMap.put("systems.kafka.samza.offset.default", "oldest");
+    configMap.put("systems.kafka.producer.compression.type", "snappy");
     configMap.put("job.coordinator.system", "kafka");
     configMap.put("job.default.system", "kafka");
     configMap.put("job.coordinator.replication.factor", "1");

Reply via email to