Hello,

I want to check whether snappy compression works correctly with the Java Kafka client.

To check this, I set up a small program. It generates 1024 messages of readable data,
each 1024 bytes in size, sends them to three new topics, and then I check the size of
those topics directly on the broker filesystem.

Here is the Java code for this program:

    package unit_test.testCompress;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    /**
     * Can be used to run some unit tests on compression.
     */
    public class TestCompress {

        public static void compress(String type, String version) {
            Map<String, Object> configs = new HashMap<String, Object>();
            configs.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            configs.put("producer.type", "async"); // old Scala producer setting, not used by the new KafkaProducer
            configs.put("compression.type", type);
            configs.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            configs.put("partitioner.class",
                    "com.astellia.astkafkaproducer.RecordPartitioner"); // project-specific partitioner
            configs.put("bootstrap.servers", "kafka:9092");

            KafkaProducer<String, byte[]> producer =
                    new KafkaProducer<String, byte[]>(configs);

            // Build one 1 KB buffer of readable data (random uppercase letters).
            Random r = new Random(15415485);
            int size = 1024; // 1 KB
            byte[] buffer = new byte[size];
            for (int i = 0; i < size; i++) {
                buffer[i] = (byte) ('A' + r.nextInt(26));
            }
            buffer[size - 1] = 0;
            //System.out.println(new String(buffer));

            // Send 1024 copies of the buffer to a dedicated topic per compression type.
            for (int i = 0; i < size; i++) {
                Future<RecordMetadata> result = producer.send(new ProducerRecord<String, byte[]>(
                        "unit_test_compress_" + version + "_" + type, buffer));
            }

            producer.close();
        }

        public static void main(String[] args) {
            String version = "v10";
            compress("snappy", version);
            compress("gzip", version);
            compress("none", version);
        }
    }
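
For reference, here is a stripped-down variant of the same producer setup, written
against the ProducerConfig constants of the new client so the property names cannot be
mistyped. The topic name and the explicit batch.size / linger.ms values are only
illustrative choices of mine, and the payload is all zeros so that any working
compression should be obvious in the on-disk size:

    package unit_test.testCompress;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /**
     * Minimal snappy producer sketch: only configs the new Java producer recognizes,
     * spelled via ProducerConfig constants.
     */
    public class MinimalSnappyProducer {

        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<String, Object>();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
            configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            // batching gives the compressor more data to work with per request
            configs.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
            configs.put(ProducerConfig.LINGER_MS_CONFIG, "50");

            KafkaProducer<String, byte[]> producer =
                    new KafkaProducer<String, byte[]>(configs);

            byte[] payload = new byte[1024]; // all zeros: highly compressible
            for (int i = 0; i < 1024; i++) {
                producer.send(new ProducerRecord<String, byte[]>(
                        "unit_test_compress_minimal", payload));
            }
            producer.close();
        }
    }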


I compile this code with the following Maven pom file:

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>unit_test</groupId>
      <artifactId>testCompress</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>

      <name>testCompress</name>
      <url>http://maven.apache.org</url>

      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>

      <dependencies>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.10</artifactId>
          <version>0.8.2.2</version>
        </dependency>
      </dependencies>
    </project>
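
If it matters, the snappy version could also be pinned explicitly in the pom instead of
relying on the transitive dependency; a sketch of the dependencies section with that
override, assuming the standard org.xerial.snappy coordinates and the 1.1.1.7 version I
checked:

      <dependencies>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.10</artifactId>
          <version>0.8.2.2</version>
        </dependency>
        <!-- pin the snappy version explicitly instead of using the transitive one -->
        <dependency>
          <groupId>org.xerial.snappy</groupId>
          <artifactId>snappy-java</artifactId>
          <version>1.1.1.7</version>
        </dependency>
      </dependencies>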

This program runs fine on my computer.

But when I check the results directly on my Kafka broker, using the command line tool
"du" to see the space taken by each topic, I find:
- the gzip topic is compressed, which is OK
- the none topic is not compressed, which is OK
- but the snappy topic is not compressed, which is not OK
(a screenshot can be found here: http://i.stack.imgur.com/7W1f5.png)


I also checked the stored log files with vi, and the data is still in clear text.

I'm aware of this issue in Kafka 0.8.2.1:
https://issues.apache.org/jira/browse/KAFKA-2189

But I'm using Kafka 0.8.2.2 on the producer and Kafka 0.8.2.1 on the broker.

I checked the snappy dependency as well; I'm using version 1.1.1.7.

Do you have any idea how to enable snappy compression on Kafka?
Did I forget a parameter needed to enable snappy compression?
Are my Kafka versions incompatible?
