[ https://issues.apache.org/jira/browse/KAFKA-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alex Fechner updated KAFKA-4597: -------------------------------- Description: Kafka topics might be configured to record timestamps of the messages produced. There are two different timestamps which might be stored: # Record *create time*: The time the record is created by the client. # Log *append time*: The time the record has been added to the log by the broker. The [ProducerRecord|https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html] docs state: {quote} In either of the cases above, the timestamp that has actually been used will be returned to user in RecordMetadata {quote} However I found the *create time* used in both cases. The following class creates two topics, one configured to store *create time*. The other used *log append time*. It produces 10 messages in each topic and outputs the timestamps from the record meta data as well as those fetched by a consumer client. {code:java} import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaTimestampTest { public static void main(String[] args) throws ExecutionException, InterruptedException { String ip = "10.0.10.8"; Properties producerProperties = new Properties(); producerProperties.put("bootstrap.servers", ip + ":9092"); producerProperties.put("acks", "all"); producerProperties.put("retries", 0); producerProperties.put("batch.size", 16384); producerProperties.put("linger.ms", 1); producerProperties.put("buffer.memory", 33554432); producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Properties consumerProperties = new Properties(); consumerProperties.put("bootstrap.servers", ip + ":9092"); consumerProperties.put("enable.auto.commit", "false"); consumerProperties.put("session.timeout.ms", "30000"); consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Producer<String, String> producer = new KafkaProducer<>(producerProperties); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties); ZkClient zkClient = new ZkClient(ip + ":2181", 10000, 10000, ZKStringSerializer$.MODULE$); ZkConnection zkConnection = new ZkConnection(ip + ":2181"); ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); TopicPartition topicPartitionWithCreateTime = new TopicPartition("test-topic-with-create-time", 0); TopicPartition topicPartitionWithLogAppendTime = new TopicPartition("test-topic-with-log-append-time", 0); // create topic with create time if (!AdminUtils.topicExists(zkUtils, topicPartitionWithCreateTime.topic())) { Properties topicProperties = new Properties(); topicProperties.put("message.timestamp.type", "CreateTime"); AdminUtils.createTopic(zkUtils, topicPartitionWithCreateTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$); } // create topic with log append time if (!AdminUtils.topicExists(zkUtils, topicPartitionWithLogAppendTime.topic())) { Properties topicProperties = new Properties(); topicProperties.put("message.timestamp.type", "LogAppendTime"); AdminUtils.createTopic(zkUtils, topicPartitionWithLogAppendTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$); } consumer.assign(Arrays.asList(topicPartitionWithLogAppendTime, topicPartitionWithCreateTime)); String format = "#%s, MetaDataOffset: %s, MetaDataTime: %s, ConsumerRecordOffset: %s, ConsumerRecordTime: %s"; System.out.println(String.format("Create messages into topic %s ...", topicPartitionWithCreateTime)); for (int i = 0; i < 10; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithCreateTime.topic(), topicPartitionWithCreateTime.partition(), "", "message")).get(); consumer.seek(topicPartitionWithCreateTime, recordMetadata.offset()); ConsumerRecord<String, String> consumerRecord = consumer.poll(1000).records(topicPartitionWithCreateTime).get(0); System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp())); } System.out.println(String.format("Create messages into topic %s...", topicPartitionWithLogAppendTime)); for (int i = 0; i < 10; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithLogAppendTime.topic(), topicPartitionWithLogAppendTime.partition(), "", "message")).get(); consumer.seek(topicPartitionWithLogAppendTime, recordMetadata.offset()); ConsumerRecord<String, String> consumerRecord = consumer.poll(1000).records(topicPartitionWithLogAppendTime).get(0); System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp())); } AdminUtils.deleteTopic(zkUtils, topicPartitionWithCreateTime.topic()); AdminUtils.deleteTopic(zkUtils, topicPartitionWithLogAppendTime.topic()); } } {code} The output shows that in case of *log append time* the timestamps differ. {code} Create messages into topic test-topic-with-create-time-0 ... #1, MetaDataOffset: 0, MetaDataTime: 1483623773788, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623773788 #2, MetaDataOffset: 1, MetaDataTime: 1483623774178, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774178 #3, MetaDataOffset: 2, MetaDataTime: 1483623774183, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623774183 #4, MetaDataOffset: 3, MetaDataTime: 1483623774188, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623774188 #5, MetaDataOffset: 4, MetaDataTime: 1483623774193, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623774193 #6, MetaDataOffset: 5, MetaDataTime: 1483623774197, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623774197 #7, MetaDataOffset: 6, MetaDataTime: 1483623774202, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623774202 #8, MetaDataOffset: 7, MetaDataTime: 1483623774207, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623774207 #9, MetaDataOffset: 8, MetaDataTime: 1483623774212, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623774212 #10, MetaDataOffset: 9, MetaDataTime: 1483623774217, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623774217 Create messages into topic test-topic-with-log-append-time-0... #1, MetaDataOffset: 0, MetaDataTime: 1483623774224, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623774992 #2, MetaDataOffset: 1, MetaDataTime: 1483623774230, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774997 #3, MetaDataOffset: 2, MetaDataTime: 1483623774235, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623775002 #4, MetaDataOffset: 3, MetaDataTime: 1483623774239, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623775007 #5, MetaDataOffset: 4, MetaDataTime: 1483623774244, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623775011 #6, MetaDataOffset: 5, MetaDataTime: 1483623774248, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623775015 #7, MetaDataOffset: 6, MetaDataTime: 1483623774253, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623775020 #8, MetaDataOffset: 7, MetaDataTime: 1483623774257, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623775024 #9, MetaDataOffset: 8, MetaDataTime: 1483623774262, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623775029 #10, MetaDataOffset: 9, MetaDataTime: 1483623774267, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623775034 {code} I assume the timestamps in the record meta data represent the create time but could not ensure that. was: Kafka topics might be configured recording timestamps of the messages produced. There are two different timestamps which might be stored: # Record *create time*: The time the record is created by the client. # Log *append time*: The time the record has been added to the log by the broker. The [ProducerRecord|https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html] docs state: {quote} In either of the cases above, the timestamp that has actually been used will be returned to user in RecordMetadata {quote} However I found the *create time* used in both cases. The following class creates two topics, one configured to store *create time*. The other used *log append time*. It produces 10 messages in each topic and outputs the timestamps from the record meta data as well as those fetched by a consumer client. {code:java} import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaTimestampTest { public static void main(String[] args) throws ExecutionException, InterruptedException { String ip = "10.0.10.8"; Properties producerProperties = new Properties(); producerProperties.put("bootstrap.servers", ip + ":9092"); producerProperties.put("acks", "all"); producerProperties.put("retries", 0); producerProperties.put("batch.size", 16384); producerProperties.put("linger.ms", 1); producerProperties.put("buffer.memory", 33554432); producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Properties consumerProperties = new Properties(); consumerProperties.put("bootstrap.servers", ip + ":9092"); consumerProperties.put("enable.auto.commit", "false"); consumerProperties.put("session.timeout.ms", "30000"); consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Producer<String, String> producer = new KafkaProducer<>(producerProperties); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties); ZkClient zkClient = new ZkClient(ip + ":2181", 10000, 10000, ZKStringSerializer$.MODULE$); ZkConnection zkConnection = new ZkConnection(ip + ":2181"); ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); TopicPartition topicPartitionWithCreateTime = new TopicPartition("test-topic-with-create-time", 0); TopicPartition topicPartitionWithLogAppendTime = new TopicPartition("test-topic-with-log-append-time", 0); // create topic with create time if (!AdminUtils.topicExists(zkUtils, topicPartitionWithCreateTime.topic())) { Properties topicProperties = new Properties(); topicProperties.put("message.timestamp.type", "CreateTime"); AdminUtils.createTopic(zkUtils, topicPartitionWithCreateTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$); } // create topic with log append time if (!AdminUtils.topicExists(zkUtils, topicPartitionWithLogAppendTime.topic())) { Properties topicProperties = new Properties(); topicProperties.put("message.timestamp.type", "LogAppendTime"); AdminUtils.createTopic(zkUtils, topicPartitionWithLogAppendTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$); } consumer.assign(Arrays.asList(topicPartitionWithLogAppendTime, topicPartitionWithCreateTime)); String format = "#%s, MetaDataOffset: %s, MetaDataTime: %s, ConsumerRecordOffset: %s, ConsumerRecordTime: %s"; System.out.println(String.format("Create messages into topic %s ...", topicPartitionWithCreateTime)); for (int i = 0; i < 10; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithCreateTime.topic(), topicPartitionWithCreateTime.partition(), "", "message")).get(); consumer.seek(topicPartitionWithCreateTime, recordMetadata.offset()); ConsumerRecord<String, String> consumerRecord = consumer.poll(1000).records(topicPartitionWithCreateTime).get(0); System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp())); } System.out.println(String.format("Create messages into topic %s...", topicPartitionWithLogAppendTime)); for (int i = 0; i < 10; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithLogAppendTime.topic(), topicPartitionWithLogAppendTime.partition(), "", "message")).get(); consumer.seek(topicPartitionWithLogAppendTime, recordMetadata.offset()); ConsumerRecord<String, String> consumerRecord = consumer.poll(1000).records(topicPartitionWithLogAppendTime).get(0); System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp())); } AdminUtils.deleteTopic(zkUtils, topicPartitionWithCreateTime.topic()); AdminUtils.deleteTopic(zkUtils, topicPartitionWithLogAppendTime.topic()); } } {code} The output shows that in case of *log append time* the timestamps differ. {code} Create messages into topic test-topic-with-create-time-0 ... #1, MetaDataOffset: 0, MetaDataTime: 1483623773788, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623773788 #2, MetaDataOffset: 1, MetaDataTime: 1483623774178, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774178 #3, MetaDataOffset: 2, MetaDataTime: 1483623774183, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623774183 #4, MetaDataOffset: 3, MetaDataTime: 1483623774188, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623774188 #5, MetaDataOffset: 4, MetaDataTime: 1483623774193, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623774193 #6, MetaDataOffset: 5, MetaDataTime: 1483623774197, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623774197 #7, MetaDataOffset: 6, MetaDataTime: 1483623774202, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623774202 #8, MetaDataOffset: 7, MetaDataTime: 1483623774207, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623774207 #9, MetaDataOffset: 8, MetaDataTime: 1483623774212, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623774212 #10, MetaDataOffset: 9, MetaDataTime: 1483623774217, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623774217 Create messages into topic test-topic-with-log-append-time-0... #1, MetaDataOffset: 0, MetaDataTime: 1483623774224, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623774992 #2, MetaDataOffset: 1, MetaDataTime: 1483623774230, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774997 #3, MetaDataOffset: 2, MetaDataTime: 1483623774235, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623775002 #4, MetaDataOffset: 3, MetaDataTime: 1483623774239, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623775007 #5, MetaDataOffset: 4, MetaDataTime: 1483623774244, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623775011 #6, MetaDataOffset: 5, MetaDataTime: 1483623774248, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623775015 #7, MetaDataOffset: 6, MetaDataTime: 1483623774253, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623775020 #8, MetaDataOffset: 7, MetaDataTime: 1483623774257, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623775024 #9, MetaDataOffset: 8, MetaDataTime: 1483623774262, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623775029 #10, MetaDataOffset: 9, MetaDataTime: 1483623774267, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623775034 {code} I assume the timestamps in the record meta data represent the create time but could not ensure that. > Record metadata returned by producer doesn*t consider log append time > --------------------------------------------------------------------- > > Key: KAFKA-4597 > URL: https://issues.apache.org/jira/browse/KAFKA-4597 > Project: Kafka > Issue Type: Bug > Components: clients, producer > Affects Versions: 0.10.1.1 > Reporter: Alex Fechner > Priority: Minor > > Kafka topics might be configured to record timestamps of the messages > produced. There are two different timestamps which might be stored: > # Record *create time*: The time the record is created by the client. > # Log *append time*: The time the record has been added to the log by the > broker. > The > [ProducerRecord|https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html] > docs state: > {quote} > In either of the cases above, the timestamp that has actually been used will > be returned to user in RecordMetadata > {quote} > However I found the *create time* used in both cases. > The following class creates two topics, one configured to store *create > time*. The other used *log append time*. It produces 10 messages in each > topic and outputs the timestamps from the record meta data as well as those > fetched by a consumer client. > {code:java} > import kafka.admin.AdminUtils; > import kafka.admin.RackAwareMode; > import kafka.utils.ZKStringSerializer$; > import kafka.utils.ZkUtils; > import org.I0Itec.zkclient.ZkClient; > import org.I0Itec.zkclient.ZkConnection; > import org.apache.kafka.clients.consumer.ConsumerRecord; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.Producer; > import org.apache.kafka.clients.producer.ProducerRecord; > import org.apache.kafka.clients.producer.RecordMetadata; > import org.apache.kafka.common.TopicPartition; > import java.util.Arrays; > import java.util.Properties; > import java.util.concurrent.ExecutionException; > public class KafkaTimestampTest { > public static void main(String[] args) throws ExecutionException, > InterruptedException { > String ip = "10.0.10.8"; > Properties producerProperties = new Properties(); > producerProperties.put("bootstrap.servers", ip + ":9092"); > producerProperties.put("acks", "all"); > producerProperties.put("retries", 0); > producerProperties.put("batch.size", 16384); > producerProperties.put("linger.ms", 1); > producerProperties.put("buffer.memory", 33554432); > producerProperties.put("key.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > producerProperties.put("value.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > Properties consumerProperties = new Properties(); > consumerProperties.put("bootstrap.servers", ip + ":9092"); > consumerProperties.put("enable.auto.commit", "false"); > consumerProperties.put("session.timeout.ms", "30000"); > consumerProperties.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > consumerProperties.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > Producer<String, String> producer = new > KafkaProducer<>(producerProperties); > KafkaConsumer<String, String> consumer = new > KafkaConsumer<>(consumerProperties); > ZkClient zkClient = new ZkClient(ip + ":2181", 10000, 10000, > ZKStringSerializer$.MODULE$); > ZkConnection zkConnection = new ZkConnection(ip + ":2181"); > ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); > TopicPartition topicPartitionWithCreateTime = new > TopicPartition("test-topic-with-create-time", 0); > TopicPartition topicPartitionWithLogAppendTime = new > TopicPartition("test-topic-with-log-append-time", 0); > // create topic with create time > if (!AdminUtils.topicExists(zkUtils, > topicPartitionWithCreateTime.topic())) { > Properties topicProperties = new Properties(); > topicProperties.put("message.timestamp.type", "CreateTime"); > AdminUtils.createTopic(zkUtils, > topicPartitionWithCreateTime.topic(), 1, 1, topicProperties, > RackAwareMode.Disabled$.MODULE$); > } > // create topic with log append time > if (!AdminUtils.topicExists(zkUtils, > topicPartitionWithLogAppendTime.topic())) { > Properties topicProperties = new Properties(); > topicProperties.put("message.timestamp.type", "LogAppendTime"); > AdminUtils.createTopic(zkUtils, > topicPartitionWithLogAppendTime.topic(), 1, 1, topicProperties, > RackAwareMode.Disabled$.MODULE$); > } > consumer.assign(Arrays.asList(topicPartitionWithLogAppendTime, > topicPartitionWithCreateTime)); > String format = "#%s, MetaDataOffset: %s, MetaDataTime: %s, > ConsumerRecordOffset: %s, ConsumerRecordTime: %s"; > System.out.println(String.format("Create messages into topic %s ...", > topicPartitionWithCreateTime)); > for (int i = 0; i < 10; i++) { > RecordMetadata recordMetadata = producer.send(new > ProducerRecord<>(topicPartitionWithCreateTime.topic(), > topicPartitionWithCreateTime.partition(), "", "message")).get(); > consumer.seek(topicPartitionWithCreateTime, > recordMetadata.offset()); > ConsumerRecord<String, String> consumerRecord = > consumer.poll(1000).records(topicPartitionWithCreateTime).get(0); > System.out.println(String.format(format, i + 1, > recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), > consumerRecord.timestamp())); > } > System.out.println(String.format("Create messages into topic %s...", > topicPartitionWithLogAppendTime)); > for (int i = 0; i < 10; i++) { > RecordMetadata recordMetadata = producer.send(new > ProducerRecord<>(topicPartitionWithLogAppendTime.topic(), > topicPartitionWithLogAppendTime.partition(), "", "message")).get(); > consumer.seek(topicPartitionWithLogAppendTime, > recordMetadata.offset()); > ConsumerRecord<String, String> consumerRecord = > consumer.poll(1000).records(topicPartitionWithLogAppendTime).get(0); > System.out.println(String.format(format, i + 1, > recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), > consumerRecord.timestamp())); > } > AdminUtils.deleteTopic(zkUtils, topicPartitionWithCreateTime.topic()); > AdminUtils.deleteTopic(zkUtils, > topicPartitionWithLogAppendTime.topic()); > } > } > {code} > The output shows that in case of *log append time* the timestamps differ. > {code} > Create messages into topic test-topic-with-create-time-0 ... > #1, MetaDataOffset: 0, MetaDataTime: 1483623773788, ConsumerRecordOffset: 0, > ConsumerRecordTime: 1483623773788 > #2, MetaDataOffset: 1, MetaDataTime: 1483623774178, ConsumerRecordOffset: 1, > ConsumerRecordTime: 1483623774178 > #3, MetaDataOffset: 2, MetaDataTime: 1483623774183, ConsumerRecordOffset: 2, > ConsumerRecordTime: 1483623774183 > #4, MetaDataOffset: 3, MetaDataTime: 1483623774188, ConsumerRecordOffset: 3, > ConsumerRecordTime: 1483623774188 > #5, MetaDataOffset: 4, MetaDataTime: 1483623774193, ConsumerRecordOffset: 4, > ConsumerRecordTime: 1483623774193 > #6, MetaDataOffset: 5, MetaDataTime: 1483623774197, ConsumerRecordOffset: 5, > ConsumerRecordTime: 1483623774197 > #7, MetaDataOffset: 6, MetaDataTime: 1483623774202, ConsumerRecordOffset: 6, > ConsumerRecordTime: 1483623774202 > #8, MetaDataOffset: 7, MetaDataTime: 1483623774207, ConsumerRecordOffset: 7, > ConsumerRecordTime: 1483623774207 > #9, MetaDataOffset: 8, MetaDataTime: 1483623774212, ConsumerRecordOffset: 8, > ConsumerRecordTime: 1483623774212 > #10, MetaDataOffset: 9, MetaDataTime: 1483623774217, ConsumerRecordOffset: 9, > ConsumerRecordTime: 1483623774217 > Create messages into topic test-topic-with-log-append-time-0... > #1, MetaDataOffset: 0, MetaDataTime: 1483623774224, ConsumerRecordOffset: 0, > ConsumerRecordTime: 1483623774992 > #2, MetaDataOffset: 1, MetaDataTime: 1483623774230, ConsumerRecordOffset: 1, > ConsumerRecordTime: 1483623774997 > #3, MetaDataOffset: 2, MetaDataTime: 1483623774235, ConsumerRecordOffset: 2, > ConsumerRecordTime: 1483623775002 > #4, MetaDataOffset: 3, MetaDataTime: 1483623774239, ConsumerRecordOffset: 3, > ConsumerRecordTime: 1483623775007 > #5, MetaDataOffset: 4, MetaDataTime: 1483623774244, ConsumerRecordOffset: 4, > ConsumerRecordTime: 1483623775011 > #6, MetaDataOffset: 5, MetaDataTime: 1483623774248, ConsumerRecordOffset: 5, > ConsumerRecordTime: 1483623775015 > #7, MetaDataOffset: 6, MetaDataTime: 1483623774253, ConsumerRecordOffset: 6, > ConsumerRecordTime: 1483623775020 > #8, MetaDataOffset: 7, MetaDataTime: 1483623774257, ConsumerRecordOffset: 7, > ConsumerRecordTime: 1483623775024 > #9, MetaDataOffset: 8, MetaDataTime: 1483623774262, ConsumerRecordOffset: 8, > ConsumerRecordTime: 1483623775029 > #10, MetaDataOffset: 9, MetaDataTime: 1483623774267, ConsumerRecordOffset: 9, > ConsumerRecordTime: 1483623775034 > {code} > I assume the timestamps in the record meta data represent the create time > but could not ensure that. -- This message was sent by Atlassian JIRA (v6.3.4#6332)