Repository: kafka
Updated Branches:
  refs/heads/0.11.0 d46513d2c -> ae170a6a1


KAFKA-5020; Update message format in implementation docs

Author: Apurva Mehta <apu...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3360 from apurvam/KAFKA-5020-message-format-docs-update

(cherry picked from commit 6471822079e0bf378ff5bc83f4bd3b71ac0582cf)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ae170a6a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ae170a6a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ae170a6a

Branch: refs/heads/0.11.0
Commit: ae170a6a16bb2bdd9546f41929babaa8250429ca
Parents: d46513d
Author: Apurva Mehta <apu...@confluent.io>
Authored: Fri Jun 30 09:22:25 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jun 30 09:22:59 2017 -0700

----------------------------------------------------------------------
 docs/implementation.html | 125 ++++++++++++++++++++++++++----------------
 1 file changed, 79 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ae170a6a/docs/implementation.html
----------------------------------------------------------------------
diff --git a/docs/implementation.html b/docs/implementation.html
index 2cf401a..af234ea 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -22,61 +22,94 @@
     </p>
     <h3><a id="messages" href="#messages">5.2 Messages</a></h3>
     <p>
-    Messages consist of a fixed-size header, a variable length opaque key byte 
array and a variable length opaque value byte array. The header contains the 
following fields:
-    <ul>
-        <li> A CRC32 checksum to detect corruption or truncation. </li>
-        <li> A format version. </li>
-        <li> An attributes identifier </li>
-        <li> A timestamp </li>
-    </ul>
-    Leaving the key and value opaque is the right decision: there is a great 
deal of progress being made on serialization libraries right now, and any 
particular choice is unlikely to be right for all uses. Needless to say a 
particular application using Kafka would likely mandate a particular 
serialization type as part of its usage. The <code>MessageSet</code> interface 
is simply an iterator over messages with specialized methods for bulk reading 
and writing to an NIO <code>Channel</code>.
+    Messages consist of a variable-length header, a variable-length opaque key byte array and a variable-length opaque value byte array. The format of the header is described in the following section.
+    Leaving the key and value opaque is the right decision: there is a great 
deal of progress being made on serialization libraries right now, and any 
particular choice is unlikely to be right for all uses. Needless to say a 
particular application using Kafka would likely mandate a particular 
serialization type as part of its usage. The <code>RecordBatch</code> interface 
is simply an iterator over messages with specialized methods for bulk reading 
and writing to an NIO <code>Channel</code>.</p>
 
     <h3><a id="messageformat" href="#messageformat">5.3 Message Format</a></h3>
+    <p>
+    Messages (aka Records) are always written in batches. The technical term 
for a batch of messages is a record batch, and a record batch contains one or 
more records. In the degenerate case, we could have a record batch containing a 
single record.
+    Record batches and records have their own headers. The format of each is 
described below for Kafka version 0.11.0 and later (message format version v2, 
or magic=2). <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets">Click here</a> for details about message formats 0 and 1.</p>
+
+    <h4><a id="recordbatch" href="#recordbatch">5.3.1 Record Batch</a></h4>
+       <p> The following is the on-disk format of a RecordBatch. </p>
+       <p><pre class="brush: java;">
+               baseOffset: int64
+               batchLength: int32
+               partitionLeaderEpoch: int32
+               magic: int8 (current magic value is 2)
+               crc: int32
+               attributes: int16
+                       bit 0~2:
+                               0: no compression
+                               1: gzip
+                               2: snappy
+                               3: lz4
+                       bit 3: timestampType
+                       bit 4: isTransactional (0 means not transactional)
+                       bit 5: isControlBatch (0 means not a control batch)
+                       bit 6~15: unused
+               lastOffsetDelta: int32
+               firstTimestamp: int64
+               maxTimestamp: int64
+               producerId: int64
+               producerEpoch: int16
+               baseSequence: int32
+               records: [Record]
+       </pre></p>
+    <p> Note that when compression is enabled, the compressed record data is serialized directly following the record count. </p>
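+
+    <p>As a rough illustration only, the attributes bitfield above could be decoded along the following lines. The class and method names are hypothetical and not taken from the Kafka codebase; only the bit layout follows the schema above, and bit 3 is assumed to follow the same convention as the older message formats (0 = create time, 1 = log append time).</p>
+    <p><pre class="brush: java;">
+       // Illustrative decoding of the RecordBatch attributes bitfield (int16).
+       public final class BatchAttributes {
+           private final short attributes;
+
+           public BatchAttributes(short attributes) {
+               this.attributes = attributes;
+           }
+
+           // bits 0~2: 0 = none, 1 = gzip, 2 = snappy, 3 = lz4
+           public int compressionCodec() {
+               return attributes & 0x07;
+           }
+
+           // bit 3: timestampType (0 = create time, 1 = log append time)
+           public boolean usesLogAppendTime() {
+               return (attributes & 0x08) != 0;
+           }
+
+           // bit 4: isTransactional
+           public boolean isTransactional() {
+               return (attributes & 0x10) != 0;
+           }
+
+           // bit 5: isControlBatch
+           public boolean isControlBatch() {
+               return (attributes & 0x20) != 0;
+           }
+       }
+    </pre></p>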
+
+    <p>The CRC covers the data from the attributes to the end of the batch 
(i.e. all the bytes that follow the CRC). It is located after the magic byte, 
which
+    means that clients must parse the magic byte before deciding how to 
interpret the bytes between the batch length and the magic byte. The partition 
leader
+    epoch field is not included in the CRC computation to avoid the need to 
recompute the CRC when this field is assigned for every batch that is received 
by
+    the broker. The CRC-32C (Castagnoli) polynomial is used for the 
computation.</p>
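+
+    <p>For illustration, the CRC described above could be computed with <code>java.util.zip.CRC32C</code> (available since Java 9). This is a sketch under the assumption that the buffer holds exactly one batch; the offset constant is derived from the schema above rather than taken from the Kafka code.</p>
+    <p><pre class="brush: java;">
+       import java.util.zip.CRC32C;
+
+       public class BatchCrcExample {
+           // Bytes preceding the attributes field, per the schema above:
+           // baseOffset(8) + batchLength(4) + partitionLeaderEpoch(4) + magic(1) + crc(4) = 21
+           static final int ATTRIBUTES_OFFSET = 21;
+
+           // Compute CRC-32C over everything from the attributes field to the end of the batch.
+           static long computeCrc(byte[] batch) {
+               CRC32C crc = new CRC32C();
+               crc.update(batch, ATTRIBUTES_OFFSET, batch.length - ATTRIBUTES_OFFSET);
+               return crc.getValue();
+           }
+       }
+    </pre></p>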
+
+    <p>On compaction: unlike the older message formats, magic v2 and above preserve the first and last offset/sequence numbers from the original batch when the log is cleaned. This is required in order to restore the
+    producer's state when the log is reloaded. If we did not retain the last sequence number, for example, then after a partition leader failure, the producer might see an OutOfSequence error. The base sequence number must
+    be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates by verifying that the first and last sequence numbers of the incoming batch match those of the last batch from that producer). As a result,
+    it is possible to have empty batches in the log when all the records in the batch are cleaned but the batch is still retained in order to preserve a producer's last sequence number. One oddity here is that the firstTimestamp
+    field is not preserved during compaction, so it will change if the first record in the batch is compacted away.</p>
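+
+    <p>A hypothetical sketch of the duplicate check described above is shown below; the parameter names are made up for illustration, and the broker's actual bookkeeping is more involved.</p>
+    <p><pre class="brush: java;">
+       // A batch is treated as a duplicate retry if its base and last sequence
+       // numbers match those of the last batch appended by the same producer.
+       static boolean isDuplicate(int incomingBaseSeq, int incomingLastSeq,
+                                  int lastAppendedBaseSeq, int lastAppendedLastSeq) {
+           return incomingBaseSeq == lastAppendedBaseSeq
+               && incomingLastSeq == lastAppendedLastSeq;
+       }
+    </pre></p>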
+
+    <h5><a id="controlbatch" href="#controlbatch">5.3.1.1 Control 
Batches</a></h5>
+    <p>A control batch contains a single record called the control record. 
Control records should not be passed on to applications. Instead, they are used 
by consumers to filter out aborted transactional messages.</p>
+    <p> The key of a control record conforms to the following schema: </p>
+    <p><pre class="brush: java">
+       version: int16 (current version is 0)
+       type: int16 (0 indicates an abort marker, 1 indicates a commit)
+    </pre></p>
+    <p>The schema for the value of a control record is dependent on the type. 
The value is opaque to clients.</p>
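+
+    <p>A minimal sketch of parsing a control record key according to the schema above (the class is hypothetical, not the Kafka implementation):</p>
+    <p><pre class="brush: java;">
+       import java.nio.ByteBuffer;
+
+       public class ControlRecordKey {
+           public final short version; // current version is 0
+           public final short type;    // 0 = abort marker, 1 = commit marker
+
+           public ControlRecordKey(ByteBuffer key) {
+               this.version = key.getShort();
+               this.type = key.getShort();
+           }
+
+           public boolean isAbort()  { return type == 0; }
+           public boolean isCommit() { return type == 1; }
+       }
+    </pre></p>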
+
+       <h4><a id="record" href="#record">5.3.2 Record</a></h4>
+       <p>Record-level headers were introduced in Kafka 0.11.0. The on-disk format of a record with headers is shown below.</p>
+       <p><pre class="brush: java;">
+               length: varint
+               attributes: int8
+                       bit 0~7: unused
+               timestampDelta: varint
+               offsetDelta: varint
+               keyLength: varint
+               key: byte[]
+               valueLength: varint
+               value: byte[]
+               headers: [Header]
+       </pre></p>
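+       <p>The record's offset and timestamp are stored as deltas relative to the enclosing batch. A hypothetical reconstruction (the helper names are illustrative, not Kafka code):</p>
+       <p><pre class="brush: java;">
+               // A record's absolute offset and timestamp are recovered from the
+               // batch-level base values plus the per-record deltas.
+               static long absoluteOffset(long baseOffset, long offsetDelta) {
+                   return baseOffset + offsetDelta;
+               }
+
+               static long absoluteTimestamp(long firstTimestamp, long timestampDelta) {
+                   return firstTimestamp + timestampDelta;
+               }
+       </pre></p>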
+       <h5><a id="recordheader" href="#recordheader">5.3.2.1 Record Header</a></h5>
+       <p><pre class="brush: java;">
+               headerKeyLength: varint
+               headerKey: String
+               headerValueLength: varint
+               Value: byte[]
+       </pre></p>
+    <p>We use the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
+    is also encoded as a varint.</p>
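+
+    <p>As a rough sketch (not Kafka's own utility class), a base-128 varint with zigzag encoding of signed values, as Protobuf does for its <code>sint32</code> type, could be implemented as follows.</p>
+    <p><pre class="brush: java;">
+       import java.nio.ByteBuffer;
+
+       public class VarintExample {
+           static void writeVarint(int value, ByteBuffer out) {
+               int v = (value << 1) ^ (value >> 31);       // zigzag encode
+               while ((v & 0xffffff80) != 0) {
+                   out.put((byte) ((v & 0x7f) | 0x80));    // 7 data bits plus continuation bit
+                   v >>>= 7;
+               }
+               out.put((byte) v);
+           }
+
+           static int readVarint(ByteBuffer in) {
+               int value = 0;
+               int shift = 0;
+               byte b;
+               do {
+                   b = in.get();
+                   value |= (b & 0x7f) << shift;
+                   shift += 7;
+               } while ((b & 0x80) != 0);
+               return (value >>> 1) ^ -(value & 1);        // zigzag decode
+           }
+       }
+    </pre></p>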
 
-    <pre class="brush: java;">
-       /**
-        * 1. 4 byte CRC32 of the message
-        * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
-        * 3. 1 byte "attributes" identifier to allow annotations on the 
message independent of the version
-        *    bit 0 ~ 2 : Compression codec.
-        *      0 : no compression
-        *      1 : gzip
-        *      2 : snappy
-        *      3 : lz4
-        *    bit 3 : Timestamp type
-        *      0 : create time
-        *      1 : log append time
-        *    bit 4 ~ 7 : reserved
-        * 4. (Optional) 8 byte timestamp only if "magic" identifier is greater 
than 0
-        * 5. 4 byte key length, containing length K
-        * 6. K byte key
-        * 7. 4 byte payload length, containing length V
-        * 8. V byte payload
-        */
-    </pre>
-    </p>
     <h3><a id="log" href="#log">5.4 Log</a></h3>
     <p>
     A log for a topic named "my_topic" with two partitions consists of two 
directories (namely <code>my_topic_0</code> and <code>my_topic_1</code>) 
populated with data files containing the messages for that topic. The format of 
the log files is a sequence of "log entries"; each log entry is a 4 byte 
integer <i>N</i> storing the message length which is followed by the <i>N</i> 
message bytes. Each message is uniquely identified by a 64-bit integer 
<i>offset</i> giving the byte position of the start of this message in the 
stream of all messages ever sent to that topic on that partition. The on-disk 
format of each message is given below. Each log file is named with the offset 
of the first message it contains. So the first file created will be 
00000000000.kafka, and each additional file will have an integer name roughly 
<i>S</i> bytes from the previous file where <i>S</i> is the max log file size 
given in the configuration.
     </p>
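+    <p>Because each log file is named with the offset of the first message it contains, finding the file that holds a given offset is a floor lookup over the segment base offsets. A toy sketch of that lookup (not the Kafka implementation):</p>
+    <p><pre class="brush: java;">
+       import java.util.Arrays;
+
+       public class SegmentLookup {
+           // Sorted base offsets, one per segment file; each file is named after its base offset.
+           private final long[] baseOffsets;
+
+           public SegmentLookup(long[] sortedBaseOffsets) {
+               this.baseOffsets = sortedBaseOffsets;
+           }
+
+           // Returns the base offset (and hence the file name) of the segment containing the offset.
+           public long segmentBaseOffsetFor(long offset) {
+               int idx = Arrays.binarySearch(baseOffsets, offset);
+               if (idx >= 0)
+                   return baseOffsets[idx];   // offset is exactly a segment's first message
+               int floorIndex = -idx - 2;     // greatest base offset below the target
+               if (floorIndex < 0)
+                   throw new IllegalArgumentException("offset precedes the first segment");
+               return baseOffsets[floorIndex];
+           }
+       }
+    </pre></p>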
     <p>
-    The exact binary format for messages is versioned and maintained as a 
standard interface so message sets can be transferred between producer, broker, 
and client without recopying or conversion when desirable. This format is as 
follows:
+    The exact binary format for records is versioned and maintained as a standard interface so record batches can be transferred between producer, broker, and client without recopying or conversion when desirable. The previous section included details about the on-disk format of records.
     </p>
-    <pre class="brush: java;">
-    On-disk format of a message
-
-    offset         : 8 bytes 
-    message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K 
+ 4 + V)
-    crc            : 4 bytes
-    magic value    : 1 byte
-    attributes     : 1 byte
-    timestamp      : 8 bytes (Only exists when magic value is greater than 
zero)
-    key length     : 4 bytes
-    key            : K bytes
-    value length   : 4 bytes
-    value          : V bytes
-    </pre>
-    <p>
+   <p>
     The use of the message offset as the message id is unusual. Our original 
idea was to use a GUID generated by the producer, and maintain a mapping from 
GUID to offset on each broker. But since a consumer must maintain an ID for 
each server, the global uniqueness of the GUID provides no value. Furthermore, 
the complexity of maintaining the mapping from a random id to an offset 
requires a heavy weight index structure which must be synchronized with disk, 
essentially requiring a full persistent random-access data structure. Thus to 
simplify the lookup structure we decided to use a simple per-partition atomic 
counter which could be coupled with the partition id and node id to uniquely 
identify a message; this makes the lookup structure simpler, though multiple 
seeks per consumer request are still likely. However once we settled on a 
counter, the jump to directly using the offset seemed natural&mdash;both after 
all are monotonically increasing integers unique to a partition. Since the
  offset is hidden from the consumer API this decision is ultimately an 
implementation detail and we went with the more efficient approach.
     </p>
     <img class="centered" src="/{{version}}/images/kafka_log.png">
