Hi all,
There is a huge latency gap between arm and x86 when running 
kafka-producer-perf-test.sh with compression enabled, especially when the codec 
is zstd. So I looked into the source code. It seems the response from the 
broker is too slow on arm, and 
LogValidator#validateMessagesAndAssignOffsetsCompressed takes too much time.

I have read the source code and know that the broker validates messages from 
the producer by invoking LogValidator#validateMessagesAndAssignOffsetsCompressed 
when they are compressed. It seems the decompression is done by 
compressedIterator. I think decompression happens when iterator.next() is 
invoked. Do I misunderstand this code?

public Iterator<Record> iterator() {
    if (count() == 0)
        return Collections.emptyIterator();

    if (!isCompressed())
        return uncompressedIterator();
    // for a normal iterator, we cannot ensure that the underlying compression stream is closed,
    // so we decompress the full record set here. Use cases which call for a lower memory footprint
    // can use `streamingIterator` at the cost of additional complexity
    try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING, false)) {
        List<Record> records = new ArrayList<>(count());
        while (iterator.hasNext())
            records.add(iterator.next());
        return records.iterator();
    }
}

compressedIterator returns a StreamRecordIterator, whose doReadRecord method 
calls DefaultRecord.readFrom. Does the decompression happen in 
DefaultRecord.readFrom? I am really confused.

private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
    final ByteBuffer buffer = this.buffer.duplicate();
    buffer.position(RECORDS_OFFSET);

    final DataInputStream inputStream = new DataInputStream(compressionType().wrapForInput(buffer, magic(),
        bufferSupplier));

    if (skipKeyValue) {
        // this buffer is used to skip length delimited fields like key, value, headers
        byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];

        return new StreamRecordIterator(inputStream) {
            @Override
            protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
                return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, firstTimestamp, baseSequence, logAppendTime);
            }
        };
    } else {
        return new StreamRecordIterator(inputStream) {
            @Override
            protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
                return DefaultRecord.readFrom(inputStream, baseOffset, firstTimestamp, baseSequence, logAppendTime);
            }
        };
    }
}

In compressedIterator, taking zstd as an example, wrapForInput is:


public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
    try {
        return (InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
    } catch (Throwable e) {
        throw new KafkaException(e);
    }
},

wrapForInput returns an InputStream. ZstdInputStream invokes 
decompressStream when read() or read(byte[], int, int) is called.
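
To illustrate the point above about decompression being triggered by read(), 
here is a minimal sketch. It is not Kafka or zstd code: it assumes the JDK's 
DEFLATE streams (java.util.zip) as a stand-in for ZstdInputStream, and the 
class and method names (LazyDecompressionDemo, CountingInputStream, 
pulledBeforeAndAfterFirstRead) are made up for this example. It counts how many 
compressed bytes the decompressor has pulled from the source before and after 
the first readByte() on the wrapping DataInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical demo class; DEFLATE stands in for zstd here.
final class LazyDecompressionDemo {

    /** Counts how many compressed bytes the decompressor has pulled so far. */
    static final class CountingInputStream extends FilterInputStream {
        long bytesPulled = 0;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b >= 0) bytesPulled++;
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n > 0) bytesPulled += n;
            return n;
        }
    }

    static byte[] compress(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream deflater = new DeflaterOutputStream(out)) {
            deflater.write(raw);
        }
        return out.toByteArray();
    }

    /** Returns {compressed bytes pulled before the first read, after the first read}. */
    static long[] pulledBeforeAndAfterFirstRead(byte[] raw) {
        try {
            CountingInputStream source =
                new CountingInputStream(new ByteArrayInputStream(compress(raw)));
            // Same shape as compressedIterator: DataInputStream over a decompressing stream.
            try (DataInputStream in = new DataInputStream(new InflaterInputStream(source))) {
                long before = source.bytesPulled; // wrapping alone reads nothing
                in.readByte();                    // first read forces the inflater to fetch input
                long after = source.bytesPulled;
                return new long[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] r = pulledBeforeAndAfterFirstRead(new byte[4096]);
        System.out.println("pulled before first read: " + r[0] + ", after: " + r[1]);
    }
}
```

If zstd behaves the same way, any timer placed around a read on the 
DataInputStream would also include the decompression work needed to produce 
those bytes.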

In readFrom, I added some logging to estimate the time consumed. Below is an 
example that measures the time consumed by ByteUtils.readVarint(input). The 
logged value is 3966 on arm and 1484 on x86.
// i and sum are static fields.
public static DefaultRecord readFrom(DataInput input,
                                     long baseOffset,
                                     long baseTimestamp,
                                     int baseSequence,
                                     Long logAppendTime) throws IOException {
    // only calls 5000..5999 are timed
    long startTime = 0;
    if (i >= 5000 && i < 6000) {
        startTime = System.nanoTime();
    }
    // compute the time consumed by ByteUtils.readVarint
    int sizeOfBodyInBytes = ByteUtils.readVarint(input);
    if (i >= 5000 && i < 6000) {
        long endTime = System.nanoTime();
        sum += endTime - startTime;
    }
    // sum is in nanoseconds over 1000 calls, so sum / 1000 is the mean per call in ns
    if (i == 6000)
        log.warn("#### readVarint {} ", sum / 1000);

    ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);
    input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes);
    int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
    i++;
    return readFrom(recordBuffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
            baseSequence, logAppendTime);
}

The method readVarint is:

public static int readVarint(DataInput in) throws IOException {
    int value = readUnsignedVarint(in);
    return (value >>> 1) ^ -(value & 1);
}
If this were just a few >>> and ^ operations, there should not be such a large 
gap between ARM and x86.
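
For reference, here is a self-contained sketch of zigzag varint decoding over a 
DataInput. It is written from the general varint format and is not copied from 
Kafka's ByteUtils; the class name VarintSketch and the helper decode are 
made up for this example. What it may help show is that the unsigned part has 
to call readByte() on the input one or more times, so a timer around 
readVarint covers those stream reads as well as the shift and xor.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch class; not Kafka's ByteUtils.
final class VarintSketch {

    /** Reads 7 bits per byte, least significant group first, until the high bit is clear. */
    static int readUnsignedVarint(DataInput in) throws IOException {
        int value = 0;
        int shift = 0;
        int b;
        // Each loop iteration pulls one byte from the underlying stream.
        while (((b = in.readByte()) & 0x80) != 0) {
            value |= (b & 0x7f) << shift;
            shift += 7;
            if (shift > 28)
                throw new IllegalArgumentException("varint is longer than 5 bytes");
        }
        return value | (b << shift);
    }

    /** Zigzag decoding: maps 0, 1, 2, 3, ... to 0, -1, 1, -2, ... */
    static int readVarint(DataInput in) throws IOException {
        int value = readUnsignedVarint(in);
        return (value >>> 1) ^ -(value & 1);
    }

    /** Convenience helper for trying out encoded byte sequences. */
    static int decode(int... encodedBytes) {
        byte[] raw = new byte[encodedBytes.length];
        for (int k = 0; k < raw.length; k++)
            raw[k] = (byte) encodedBytes[k];
        try {
            return readVarint(new DataInputStream(new ByteArrayInputStream(raw)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(0x02));       // single-byte encoding
        System.out.println(decode(0xAC, 0x02)); // two-byte encoding
    }
}
```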

I am stuck here. Can anyone give some tips on how to debug this issue 
effectively? And where exactly is the decompression called?



The default compression level for zstd is 3, and I measured these latencies 
with lzbench:
time (us)

compress      x86      arm      arm/x86
zstd          63875    88423    1.384313
snappy        36838    43936    1.192681
lz4           26125    34254    1.311158

decompress    x86      arm      arm/x86
zstd          14255    25599    1.795791
snappy        12569    18588    1.478877
lz4           7005     11475    1.638116

But the latency reported by kafka-producer-perf-test.sh shows a much larger gap:
Average latency (ms)

compression   x86     arm
gzip          1.2     19.43
snappy        7.44    346.79
lz4           4.65    352.67
zstd          7.1     3692.8
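
To put the perf-test numbers on the same footing as the lzbench arm/x86 column, 
the ratios can be computed from the average latencies above. This is only 
arithmetic on the figures in this mail; the class name LatencyRatios is made 
up for the example.

```java
// Hypothetical helper; the numbers are the average latencies (ms) listed above.
final class LatencyRatios {

    static double ratio(double armMs, double x86Ms) {
        return armMs / x86Ms;
    }

    public static void main(String[] args) {
        String[] codecs = {"gzip", "snappy", "lz4", "zstd"};
        double[] x86 = {1.2, 7.44, 4.65, 7.1};
        double[] arm = {19.43, 346.79, 352.67, 3692.8};
        for (int k = 0; k < codecs.length; k++)
            System.out.printf("%-7s arm/x86 = %.2f%n", codecs[k], ratio(arm[k], x86[k]));
    }
}
```

The ratios come out at roughly 16, 47, 76 and 520, compared with at most about 
1.8 in the lzbench results.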

Best Wishes,
Jiamei
