Hello, I want to check whether snappy compression works well with the Java Kafka client.
To check this, I set up a small program. It generates 1024 messages of readable data, 1024 bytes each, sends them to three new topics (one per compression type), and then I check the size of these topics directly on the broker filesystem. Here is the Java code:

```java
package unit_test.testCompress;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Can be used to run some unit tests on compression.
 */
public class TestCompress {

    public static void compress(String type, String version) {
        // Producer configuration; compression.type is the setting under test.
        Map<String, Object> configs = new HashMap<String, Object>();
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("producer.type", "async"); // legacy old-producer setting; the new Java producer ignores it
        configs.put("compression.type", type);
        configs.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        configs.put("partitioner.class", "com.astellia.astkafkaproducer.RecordPartitioner");
        configs.put("bootstrap.servers", "kafka:9092");

        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(configs);

        // Fill a 1 KiB buffer with readable (and compressible) data.
        Random r = new Random(15415485);
        int size = 1024; // 1 KiB
        byte[] buffer = new byte[size];
        for (int i = 0; i < size; i++) {
            buffer[i] = (byte) ('A' + r.nextInt(26)); // nextInt(26) keeps every byte in 'A'..'Z'
        }
        buffer[size - 1] = 0;
        //System.out.println(new String(buffer));

        // Send 1024 copies of the buffer to a topic dedicated to this compression type.
        for (int i = 0; i < size; i++) {
            Future<RecordMetadata> result = producer.send(
                    new ProducerRecord<String, byte[]>("unit_test_compress_" + version + "_" + type, buffer));
        }
        producer.close();
    }

    public static void main(String[] args) {
        String version = "v10";
        compress("snappy", version);
        compress("gzip", version);
        compress("none", version);
    }
}
```

I compile this code with the following Maven POM:

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>unit_test</groupId>
  <artifactId>testCompress</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>testCompress</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.2</version>
    </dependency>
  </dependencies>
</project>
```

The program runs fine on my computer. But when I check the space taken by each topic directly on my Kafka broker with the command-line tool `du`, I find:

- the gzip topic is compressed: OK
- the none topic is not compressed: OK
- but the snappy topic is not compressed: not OK

(Screenshot: http://i.stack.imgur.com/7W1f5.png)

I also opened the stored log files with vi, and the data is still in clear text.

I am aware of this issue in Kafka 0.8.2.1: https://issues.apache.org/jira/browse/KAFKA-2189. But I am using Kafka 0.8.2.2 on the producer side and Kafka 0.8.2.1 on the broker. I also checked the snappy dependency: I am using snappy-java 1.1.1.7.

Do you have an idea of how to enable snappy compression on Kafka? Did I forget a parameter needed to enable snappy compression? Are my Kafka versions incompatible?
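For completeness, here is a minimal standalone round-trip check of the snappy-java library itself (a sketch; the class name is mine, and it assumes snappy-java 1.1.1.7 is on the classpath, which the kafka_2.10 0.8.2.2 artifact pulls in transitively):

```java
import java.io.IOException;
import java.util.Arrays;

import org.xerial.snappy.Snappy;

/** Standalone sanity check: does snappy-java load and actually shrink compressible data? */
public class SnappySanityCheck {
    public static void main(String[] args) throws IOException {
        // Highly compressible input, similar in spirit to the test messages above.
        byte[] input = new byte[1024];
        for (int i = 0; i < input.length; i++) {
            input[i] = (byte) ('A' + (i % 26));
        }
        byte[] compressed = Snappy.compress(input);
        byte[] restored = Snappy.uncompress(compressed);
        System.out.println("original: " + input.length + " bytes, compressed: " + compressed.length + " bytes");
        System.out.println("round-trip ok: " + Arrays.equals(input, restored));
    }
}
```

If this prints a compressed size well below 1024 bytes and a successful round trip, the native snappy library loads fine and the problem is more likely in the producer or broker handling.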
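Separately, since the new (0.8.2) Java producer applies compression to whole record batches, the batching settings can influence how much snappy actually saves. A hedged sketch of extra settings that could be added to the config map above (the values are assumptions, not something my program currently sets):

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: batching-related producer settings (assumed values, not from TestCompress). */
public class BatchingConfigSketch {
    public static Map<String, Object> withBatching(Map<String, Object> configs) {
        // Compression is applied per record batch, so bigger batches
        // generally give the compressor more data to work with.
        configs.put("batch.size", 65536); // bytes per partition batch; the default is 16384
        configs.put("linger.ms", 50);     // wait up to 50 ms to let batches fill before sending
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = withBatching(new HashMap<String, Object>());
        System.out.println(configs);
    }
}
```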