Re: Question about Kafka
Hello,

Any suggestion regarding this message:

" org.apache.kafka.common.errors.TimeoutException: Expiring 61 record(s) for due to 30001 ms has passed since batch creation plus linger time "

Thanks in advance
Maha

From: "MAHA ALSAYASNEH"
To: "users"
Sent: Tuesday, September 19, 2017 6:18:25 PM
Subject: Re: Question about Kafka

> Well I kept the default: log.retention.hours=168
> Here are my broker configurations: [...]
Re: Kafka Client Consumes Last Committed Message On Restart
Hi,

If you are using the commitSync(Map offsets) api, then the committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync(java.util.Map)

On Wed, Sep 20, 2017 at 12:23 AM, Manan G wrote:
> [...]
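For illustration, a minimal sketch of that convention with the 0.11.0.0 Java consumer; the bootstrap servers, group id and topic name below are placeholders, and the important detail is the record.offset() + 1 passed to commitSync:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder
props.put("group.id", "my-group");                  // placeholder
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));   // placeholder topic
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        // ... process the record ...
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        // Commit the offset of the next message to consume, i.e. lastProcessedMessageOffset + 1
        consumer.commitSync(Collections.singletonMap(tp,
                new OffsetAndMetadata(record.offset() + 1)));
    }
}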
Re: KSQL with Apache Kafka
Those prerequisites are just for the Confluent CLI used in the quickstart. The Apache Kafka and Zookeeper versions included in the Confluent distribution are the latest and the same as in the Apache Kafka download, so it will work. You will just need to start Zookeeper and Kafka with the shell scripts in the ./bin directory rather than just typing "confluent start" as the quickstart documentation says.

-hans

> On Sep 19, 2017, at 8:41 PM, Koert Kuipers wrote:
> [...]
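With a plain Apache Kafka download, that roughly amounts to starting the two services with the stock scripts and default config files shipped in the Kafka tarball (the Confluent distribution ships the same scripts, typically without the .sh suffix):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties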
Re: KSQL with Apache Kafka
We are using the other components of the Confluent platform without installing the Confluent platform, and it's no problem at all. I don't see why it would be any different with this one.

On Tue, Sep 19, 2017 at 1:38 PM, Buntu Dev wrote:
> [...]
Re: Change replication factor for a topic in the runtime
You can do this using the kafka-reassign-partitions tool (or a third-party tool like kafka-assigner in github.com/linkedin/kafka-tools) to explicitly assign the partitions to an extra replica, or to remove a replica.

-Todd

On Tue, Sep 19, 2017 at 3:45 PM, Devendar Rao wrote:
> [...]

--
Todd Palino
Senior Staff Engineer, Site Reliability
Data Infrastructure Streaming
linkedin.com/in/toddpalino
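As a rough sketch, increasing a topic to three replicas with kafka-reassign-partitions looks like the following; the topic name, partition list, broker ids and ZooKeeper address are illustrative, not taken from this thread:

increase-replication.json:
{"version":1,
 "partitions":[
   {"topic":"my-topic","partition":0,"replicas":[1,2,3]},
   {"topic":"my-topic","partition":1,"replicas":[2,3,1]}
 ]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file increase-replication.json --execute

# later, to check that the new replicas have caught up:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file increase-replication.json --verify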
Change replication factor for a topic in the runtime
Is it possible to change the replication factor at runtime? We're using the 10.x version.

Thanks,
Devendar
Kafka SSL error
Hello All -

I was able to set up SSL for the Kafka brokers using OpenSSL. However, I'm having issues with setting up SSL using the pem file (i.e. the SSL certificate, certified by a CA, provided by the company).

Here is what I've done - created the server/client keystore & truststore files and imported the provided cert.pem file:

keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file cert.pem
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file cert.pem
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file cert.pem
keytool -keystore kafka.client.keystore.jks -alias localhost -validity 365 -genkey
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file cert.pem

I have a console producer pushing data into the topic, and it gives the error shown below:

Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence violation, state = 1, type = 1
    at sun.security.ssl.ServerHandshaker.processMessage(ServerHandshaker.java:213)
    at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416)
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:336)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:417)
    ... 7 more

Any ideas on what the issue might be? Thanks for help in advance!
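For reference, these are the client-side settings that normally go with keystores built that way; this is only a sketch, and the host, port, paths and passwords are placeholders rather than values from this setup:

client-ssl.properties:
security.protocol=SSL
ssl.truststore.location=/path/to/kafka.client.truststore.jks
ssl.truststore.password=<truststore-password>
ssl.keystore.location=/path/to/kafka.client.keystore.jks
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>

bin/kafka-console-producer.sh --broker-list broker-host:9093 --topic my-topic \
  --producer.config client-ssl.properties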
Re: Classloading Error with Kotlin and Streams
> On Sep 15, 2017, at 23:08, Amir Nagri wrote: > > Were you able to resolve above? No, not yet. And I haven’t had a chance to open that JIRA ticket… sorry about that. Will try to get to it soon. Software Architect @ Park Assist » http://tech.parkassist.com/
Kafka Client Consumes Last Committed Message On Restart
Hello,

I am using Kafka broker and Java client library v 0.11.0.0.

When I restart my Kafka consumer application, which uses the Java Kafka client library to retrieve messages, I notice that for each partition, the message associated with the last offset that was committed successfully gets re-consumed. I am "not" using the auto-commit feature of the Java Kafka client library.

So for example, for some topic partition 1 -
1. Consumer application commits offset 100 manually.
2. Consumer application gets restarted.
3. Consumer polls for messages from Kafka. This time, the first message polled from Kafka is the one with offset 100. Shouldn't the new batch of messages polled from Kafka start from offset 101?

This causes duplication of messages - one for each partition on application restart. I understand that message duplication is possible in other scenarios too, but this particular behavior coming from the client library seemed strange. In my case, I am writing a framework for our use case, and we would like to avoid this specific message duplication scenario.

Questions:
* Wondering if there is any reason for this, or am I missing something in my code.
* If this was done by design, what would be the easiest way for a user application to circumvent this behavior?

Thanks in advance.
KSQL with Apache Kafka
Based on the prerequisites mentioned on Github, Confluent platform seems to be required for using KSQL: https://github.com/confluentinc/ksql/blob/0.1.x/docs/quickstart/quickstart-non-docker.md#non-docker-setup-for-ksql Did anyone try KSQL against vanilla Apache Kafka? Thanks!
Replication throttling
Hi, Kafka Users,

The documentation for replication throttling says it should be removed after partitions have moved or a broker has completed bootstrapping (https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas#KIP-73ReplicationQuotas-2.HowdoIthrottleabootstrappingbroker? and https://kafka.apache.org/documentation/).

Assuming the throttle is configured based on network or drive throughput, it seems like a good idea to have it always enabled. Why is it recommended to turn it off?

In addition, we noticed that during broker bootstrap, if throttling is not configured and the broker gets overloaded, we see NotLeaderForPartitionException for a prolonged time span. This doesn't happen when replication throttling is set 10-20% below max throughput.

Best,
Ivan Simonenko
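For context, the KIP-73 throttle is normally applied and removed with kafka-configs.sh; the broker id, ZooKeeper address and the 10 MB/s rate below are illustrative only, and the --alter call has to be repeated for each broker:

# apply a throttle of roughly 10 MB/s (value is in bytes/sec)
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name 1 --alter \
  --add-config 'leader.replication.throttled.rate=10000000,follower.replication.throttled.rate=10000000'

# remove it once the reassignment / bootstrap has caught up
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name 1 --alter \
  --delete-config 'leader.replication.throttled.rate,follower.replication.throttled.rate'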
Re: Question about Kafka
Well I kept the default: log.retention.hours=168

Here are my broker configurations:

# Server Basics #
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=3
host.name=
port=9092
zookeeper.connect=xxx:2181,:2181,:2181

#The maximum size of message that the server can receive
message.max.bytes=224
eplica.fetch.max.bytes=224
request.timeout.ms=30
log.flush.interval.ms=1
log.flush.interval.messages=2
request.timeout.ms=30
#replica.socket.timeout.ms=6
#linger.ms=3

# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true

# Socket Server Settings #
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://x.x.x.X:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=4

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

# Log Basics #
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=8

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

# Log Flush Policy #
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=1

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

# Log Retention Policy #
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
#log.retention.ms=60

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
# log.segment.bytes=2147483648

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
#log.retention.check.interval.ms=6

# Zookeeper #
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated list of host:port pairs, each corresponding to a zk server.
# e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# Timeout in ms for connecting to zoo
Re: Question about Kafka
What is the retention time on the topic you are publishing to?

From: MAHA ALSAYASNEH
Sent: Tuesday, September 19, 2017 10:25:15 AM
To: users@kafka.apache.org
Subject: Question about Kafka

> [...]
Re: implementing kafka transactions : performance issue
Hi Apurva,

My transactions are pretty small: only one producer.send to Kafka in this particular case (even if I have tested with up to 100). The producer code is embedded in an app linked with a JDBC connection to some database.

I tested kafka-producer-perf-test.sh: not sure I clearly understand the results. Latency is higher than expected but consistent with what I see from my app (each time my app enters producer.commitTransaction() it takes 100-200 ms to get out).

To say it differently: in the time it takes to push 1 message to Kafka with a transaction, I am able to push 40 messages without transactions. What is wrong - the way I use transactions (too small?), or my app? Any thoughts?

Thanks

Results from kafka-producer-perf-test.sh (on my laptop..., not a production cluster!):

usr/local/kafka/bin/kafka-producer-perf-test.sh --topic my_topic --num-records 6000 --throughput 300 \
  --producer-props bootstrap.servers=tpg59:9092,tpg59:9093,tpg59:9094 \
  key.serializer=org.apache.kafka.common.serialization.StringSerializer \
  value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer \
  --record-size 80 --print-metrics --transactional-id test --transaction-duration-ms 5

1481 records sent, 294,1 records/sec (0,02 MB/sec), 54,8 ms avg latency, 116,0 max latency.
1511 records sent, 300,5 records/sec (0,02 MB/sec), 53,3 ms avg latency, 153,0 max latency.
1514 records sent, 300,0 records/sec (0,02 MB/sec), 45,5 ms avg latency, 101,0 max latency.
6000 records sent, 298,834545 records/sec (0,02 MB/sec), 50,41 ms avg latency, 153,00 ms max latency, 45 ms 50th, 91 ms 95th, 114 ms 99th, 153 ms 99.9th.

Metric Name                                                                Value
kafka-metrics-count:count:{client-id=producer-1}: 76,000
producer-metrics:batch-size-avg:{client-id=producer-1}: 646,526
producer-metrics:batch-size-max:{client-id=producer-1}: 1574,000
producer-metrics:batch-split-rate:{client-id=producer-1}: 0,000
producer-metrics:buffer-available-bytes:{client-id=producer-1}: 33554432,000
producer-metrics:buffer-exhausted-rate:{client-id=producer-1}: 0,000
producer-metrics:buffer-total-bytes:{client-id=producer-1}: 33554432,000
producer-metrics:bufferpool-wait-ratio:{client-id=producer-1}: 0,000
producer-metrics:compression-rate-avg:{client-id=producer-1}: 1,000
producer-metrics:connection-close-rate:{client-id=producer-1}: 0,000
producer-metrics:connection-count:{client-id=producer-1}: 4,000
producer-metrics:connection-creation-rate:{client-id=producer-1}: 0,079
producer-metrics:incoming-byte-rate:{client-id=producer-1}: 1555,399
producer-metrics:io-ratio:{client-id=producer-1}: 0,004
producer-metrics:io-time-ns-avg:{client-id=producer-1}: 38931,988
producer-metrics:io-wait-ratio:{client-id=producer-1}: 0,258
producer-metrics:io-wait-time-ns-avg:{client-id=producer-1}: 2866393,133
producer-metrics:metadata-age:{client-id=producer-1}: 20,071
producer-metrics:network-io-rate:{client-id=producer-1}: 76,761
producer-metrics:outgoing-byte-rate:{client-id=producer-1}: 14062,380
producer-metrics:produce-throttle-time-avg:{client-id=producer-1}: 0,000
producer-metrics:produce-throttle-time-max:{client-id=producer-1}: 0,000
producer-metrics:record-error-rate:{client-id=producer-1}: 0,000
producer-metrics:record-queue-time-avg:{client-id=producer-1}: 36,874
producer-metrics:record-queue-time-max:{client-id=producer-1}: 131,000
producer-metrics:record-retry-rate:{client-id=producer-1}: 0,000
producer-metrics:record-send-rate:{client-id=producer-1}: 119,950
producer-metrics:record-size-avg:{client-id=producer-1}: 166,000
producer-metrics:record-size-max:{client-id=producer-1}: 166,000
producer-metrics:records-per-request-avg:{client-id=producer-1}: 6,579
producer-metrics:request-latency-avg:{client-id=producer-1}: 12,739
producer-metrics:request-latency-max:{client-id=producer-1}: 86,000
producer-metrics:request-rate:{client-id=producer-1}: 38,381
producer-metrics:request-size-avg:{client-id=producer-1}: 366,386
producer-metrics:request-size-max:{client-id=producer-1}: 1636,000
producer-metrics:requests-in-flight:{client-id=producer-1}: 0,000
producer-metrics:response-rate:{client-id=producer-1}: 38,383
producer-metrics:select-rate:{client-id=producer-1}: 90,160
producer-metrics:waiting-threads:{client-id=producer-1}: 0,000
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node--1}: 10,390
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node-1}: 959,564
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node-2}: 296,117
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node-3}: 295,731
producer-node-metrics:outgoing-byte-rat
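For comparison, here is a minimal transactional-producer sketch against the 0.11 Java client; the bootstrap servers, transactional id and topic are placeholders. commitTransaction() blocks until the commit completes on the broker side, so its cost is roughly fixed per transaction, and grouping many send() calls into one transaction amortizes it:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");       // placeholder
props.put("transactional.id", "my-transactional-id");   // placeholder
props.put("enable.idempotence", "true");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++) {   // many records per transaction, one commit for all of them
        producer.send(new ProducerRecord<>("my_topic", Integer.toString(i), "value-" + i));
    }
    producer.commitTransaction();     // one synchronous round trip per transaction
} catch (KafkaException e) {
    // for recoverable errors; fatal ones (e.g. ProducerFencedException) require closing the producer instead
    producer.abortTransaction();
}
producer.close();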
Question about Kafka
Hello,

I'm using Kafka 0.10.1.1. I set up my cluster of Kafka + Zookeeper on three nodes (three brokers, one topic, 6 partitions, 3 replicas).

When I send messages using the Kafka producer (on an independent node), sometimes I get this error and I couldn't figure out how to solve it:

" org.apache.kafka.common.errors.TimeoutException: Expiring 61 record(s) for due to 30001 ms has passed since batch creation plus linger time "

Could you please help.

Thanks in advance
Maha
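For what it's worth, in 0.10.x that batch expiry is governed by the producer's request.timeout.ms (30000 ms by default, which matches the "30001 ms" in the message). Below is a sketch of the producer settings usually involved; the broker list and values are placeholders for experimentation, not a recommendation:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Batches that cannot be sent within this window are expired with the TimeoutException above
props.put("request.timeout.ms", "60000");
// How long the producer waits to fill a batch before sending it
props.put("linger.ms", "5");
props.put("batch.size", "16384");
// Retries give transient broker/leader problems a chance to clear
props.put("retries", "3");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);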
Why doesn't MirrorMaker apply the Exactly once and transactional semantics at 0.11.x?
Hi Everyone,

Since exactly-once and transactional semantics are the most important and proudest feature of the 0.11.x release, why doesn't MirrorMaker apply the exactly-once and transactional semantics in 0.11.x?

I think it will shake our confidence in upgrading to 0.11.x to use the idempotent semantics for production work.

Thanks & Have a good day!

--
Yang Cui
FreeWheel | Beijing
+86 1381-1441-685
Re: Upgraded brokers from 0.9.0.1 -> 0.10.0.1: how to upgrade message format to 0.10.0.1?
Ah, cool, thanks Ismael!

--John

On Tue, Sep 19, 2017 at 10:20 AM, Ismael Juma wrote:
> [...]
Re: Upgraded brokers from 0.9.0.1 -> 0.10.0.1: how to upgrade message format to 0.10.0.1?
0.10.0.1 consumers understand the older formats. So, the conversion only happens when the message format is newer than what the consumer understands. For the producer side, the conversion is not particularly costly since the data is in the heap and, if you use compression, 0.9.0.x would do recompression either way.

Ismael

On Tue, Sep 19, 2017 at 2:41 PM, John Yost wrote:
> [...]
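Put as a sketch of the broker settings (the versions mirror the ones discussed in this thread; treat it as an outline rather than a recipe): keep the old message format while any 0.9.0.1 clients remain, then raise it in a second rolling restart once all clients are on 0.10.0.1.

# step 1: brokers on 0.10.0.1, clients still on 0.9.0.1
inter.broker.protocol.version=0.10.0.1
log.message.format.version=0.9.0.1

# step 2: after all producers/consumers are upgraded to 0.10.0.1, rolling-restart with
log.message.format.version=0.10.0.1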
Upgraded brokers from 0.9.0.1 -> 0.10.0.1: how to upgrade message format to 0.10.0.1?
Hi Everyone,

We recently upgraded our cluster from 0.9.0.1 to 0.10.0.1 but had to keep our Kafka clients at 0.9.0.1. We now want to upgrade our clients and, concurrently, the message version to 0.10.0.1.

When we did the 0.9.0.1 -> 0.10.0.1 broker upgrade we were not able to upgrade the Kafka clients to 0.10.0.1, and we did not set the message format. As a result, the 0.9.0.1 -> 0.10.0.1 format conversion for both incoming and outgoing messages caused our memory heap requirements to go from 6 GB up to 12 GB, resulting in long GC pauses that caused our brokers to crash.

Once I explicitly set the message format to 0.9.0.1 everything was fine. However, now that I want to convert to the 0.10.0.1 message format, I am looking for guidance as to the best way to do this. I can switch our Kafka clients to the 0.10.0.1 release, no problem.

But... our challenge is that we occasionally have to replay topics, and I am concerned that we will be back to the issue of converting outgoing messages from 0.9.0.1 to 0.10.0.1 in these replay scenarios as well as in inter-broker replication.

Please confirm the best way to upgrade our clients and message format to 0.10.0.1 while preserving the stability of our cluster as well as the ability to replay topics that will have both 0.9.0.1 and 0.10.0.1-formatted messages.

Thanks

--John
Re: Data loss while upgrading confluent 3.0.0 kafka cluster to confluent 3.2.2
Hi Yogesh,

A few questions:

1. Please share the code for the test script.
2. At which point in the sequence below was the code for the brokers updated to 0.10.2?
3. When doing a rolling restart, it's generally a good idea to ensure that there are no under-replicated partitions.
4. Is controlled shutdown completing successfully?

Ismael

On Tue, Sep 19, 2017 at 12:33 PM, Yogesh Sangvikar <yogesh.sangvi...@gmail.com> wrote:
> [...]
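On point 3, a quick way to check is the --under-replicated-partitions flag of the topics tool (the ZooKeeper address below is a placeholder); an empty result before restarting the next broker means the previous one has fully caught up:

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --under-replicated-partitions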
Re: Data loss while upgrading confluent 3.0.0 kafka cluster to confluent 3.2.2
Looks like the screen shots didn't come through. Consider pasting the text.

Thanks

Original message
From: Yogesh Sangvikar
Date: 9/19/17 4:33 AM (GMT-08:00)
To: users@kafka.apache.org
Cc: Sumit Arora, Bharathreddy Sodinapalle, asgar@happiestminds.com
Subject: Re: Data loss while upgrading confluent 3.0.0 kafka cluster to confluent 3.2.2

Hi Team,

Thanks for providing comments. Adding more details on the steps followed for the upgrade.

Cluster details: We are using a 4 node kafka cluster and topics with replication factor 3. For the upgrade test, we are using a topic with 5 partitions & 3 replication factor.

Topic:student-activity  PartitionCount:5  ReplicationFactor:3  Configs:
  Topic: student-activity  Partition: 0  Leader: 4  Replicas: 4,2,3  Isr: 4,2,3
  Topic: student-activity  Partition: 1  Leader: 1  Replicas: 1,3,4  Isr: 1,4,3
  Topic: student-activity  Partition: 2  Leader: 2  Replicas: 2,4,1  Isr: 2,4,1
  Topic: student-activity  Partition: 3  Leader: 3  Replicas: 3,1,2  Isr: 1,2,3
  Topic: student-activity  Partition: 4  Leader: 4  Replicas: 4,3,1  Isr: 4,1,3

We are using a test script to publish events continuously to one of the topic partitions (here partition 3) and monitoring the script's total published events count against the partition 3 offset value.

[ Note: The topic partitions offset count may differ between the CLI utility and the screenshot due to capture delay. ]

First, we rolling restarted all kafka brokers with the protocol and message versions set explicitly to 0.10.0:

inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0

During this restart, events are published as expected, the counters keep increasing, and in-sync replicas come back immediately after each restart.

[***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
student-activity:2:1
student-activity:4:1
student-activity:1:68
student-activity:3:785
student-activity:0:1

Next, we rolling restarted the kafka brokers for "inter.broker.protocol.version=0.10.2" in the broker sequence below. (Note that the test script is publishing events to the topic partition continuously.)

Restarted server with broker.id = 4:

[***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
student-activity:2:1
student-activity:4:1
student-activity:1:68
student-activity:3:1189
student-activity:0:1

Restarted server with broker.id = 3:

[***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
student-activity:2:1
student-activity:4:1
student-activity:1:68
student-activity:3:1430
student-activity:0:1

Restarted server with broker.id = 2 (here, observe that the partition 3 offset count has decreased since the last restart):

[***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
student-activity:2:1
student-activity:4:1
student-activity:1:68
student-activity:3:1357
student-activity:0:1

Restarted the last server with broker.id = 1:

[***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
student-activity:2:1
student-activity:4:1
student-activity:1:68
student-activity:3:1613
student-activity:0:1

Finally, we rolling restarted all brokers (in the same sequence as above) for "log.message.format.version=0.10.2".

The topic offset counter after the final restart:

[***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
student-activity:2:1
student-activity:4:1
student-activity:1:68
student-activity:3:2694
student-activity:0:1

And the topic offset counter after stopping the event-publish script:

[***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
student-activity:2:1
student-activity:4:1
student-activity:1:68
student-activity:3:2769
student-activity:0:1

Calculating missing event counts:
Total events published by script to partition 3: 3090
Offset count on Partition 3: 2769
Missi
Re: Data loss while upgrading confluent 3.0.0 kafka cluster to confluent 3.2.2
Hi Team,

Thanks for providing comments. Adding more details on the steps followed for the upgrade.

Cluster details: We are using a 4 node kafka cluster and topics with replication factor 3. For the upgrade test, we are using a topic with 5 partitions & 3 replication factor.

Topic:student-activity  PartitionCount:5  ReplicationFactor:3  Configs:
  Topic: student-activity  Partition: 0  Leader: 4  Replicas: 4,2,3  Isr: 4,2,3
  Topic: student-activity  Partition: 1  Leader: 1  Replicas: 1,3,4  Isr: 1,4,3
  Topic: student-activity  Partition: 2  Leader: 2  Replicas: 2,4,1  Isr: 2,4,1
  Topic: student-activity  Partition: 3  Leader: 3  Replicas: 3,1,2  Isr: 1,2,3
  Topic: student-activity  Partition: 4  Leader: 4  Replicas: 4,3,1  Isr: 4,1,3

We are using a test script to publish events continuously to one of the topic partitions (here partition 3) and monitoring the script's total published events count against the partition 3 offset value.

[ Note: The topic partitions offset count may differ between the CLI utility and the screenshot due to capture delay. ]

- First, we rolling restarted all kafka brokers with the protocol and message versions set explicitly to 0.10.0:

  inter.broker.protocol.version=0.10.0
  log.message.format.version=0.10.0

- During this restart, events are published as expected, the counters keep increasing, and in-sync replicas come back immediately after each restart.

  [***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
  student-activity:2:1
  student-activity:4:1
  student-activity:1:68
  student-activity:3:785
  student-activity:0:1

  [image: Inline image 1]

- Next, we rolling restarted the kafka brokers for "inter.broker.protocol.version=0.10.2" in the broker sequence below. (Note that the test script is publishing events to the topic partition continuously.)

- Restarted server with broker.id = 4:

  [***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
  student-activity:2:1
  student-activity:4:1
  student-activity:1:68
  student-activity:3:1189
  student-activity:0:1

  [image: Inline image 2]

- Restarted server with broker.id = 3:

  [***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
  student-activity:2:1
  student-activity:4:1
  student-activity:1:68
  *student-activity:3:1430*
  student-activity:0:1

  [image: Inline image 3]

- Restarted server with broker.id = 2 (here, observe that the partition 3 offset count has decreased since the last restart):

  [***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
  student-activity:2:1
  student-activity:4:1
  student-activity:1:68
  *student-activity:3:1357*
  student-activity:0:1

  [image: Inline image 4]

- Restarted the last server with broker.id = 1:

  [***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
  student-activity:2:1
  student-activity:4:1
  student-activity:1:68
  student-activity:3:1613
  student-activity:0:1

  [image: Inline image 5]

- Finally, we rolling restarted all brokers (in the same sequence as above) for "log.message.format.version=0.10.2"

  [image: Inline image 6]
  [image: Inline image 7]
  [image: Inline image 8]
  [image: Inline image 9]

- The topic offset counter after the final restart:

  [***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
  student-activity:2:1
  student-activity:4:1
  student-activity:1:68
  student-activity:3:2694
  student-activity:0:1

- And the topic offset counter after stopping the event-publish script:

  [***@***.***.***.*** confluent-3.2.2]$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list ***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092,***.***.***.***:9092 --topic student-activity --time -1
  student-activity:2:1
  student-activity:4:1
  student-activity:1:68
  student-activity:3:2769
  student-activity:0:1

- Calculating missing events counts,