[ https://issues.apache.org/jira/browse/KAFKA-6189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16249419#comment-16249419 ]
Xinyang Gao edited comment on KAFKA-6189 at 11/13/17 2:25 PM:
--------------------------------------------------------------
I have seen similar behavior, which also led to message loss. The setup is as follows:
* topic gao31 with 3 partitions and replication factor 3
* min.insync.replicas=2
* the consumer uses the default "auto.offset.reset=latest"
* the consumer commits offsets manually with consumer.commitSync() after handling messages
* unclean leader election is disabled
* the Kafka cluster has 3 brokers: kafka-foo-0, kafka-foo-1 and kafka-foo-2
* the consumer group ID is GROUPID
* session.timeout.ms and max.poll.interval.ms use their default values
* initially kafka-foo-1 is selected as the group coordinator

To reproduce:
1. Kill kafka-foo-1.
2. The following logs show that the consumer disconnects from kafka-foo-1 and tries to discover a new group coordinator:
{code:java}
2017-11-10 11:27:45,097 DEBUG org.apache.kafka.clients.NetworkClient - Node 2147483646 disconnected. [kafka-consumer]
2017-11-10 11:27:45,097 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator kafka-foo-1.***.com:9092 (id: 2147483646 rack: null) dead for group GROUPID [kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient - Removing node kafka-foo-1.***.com:9092 (id: 1 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0 [kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient - Removing node kafka-foo-2.***.com:9092 (id: 2 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 1 [kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient - Found least loaded node kafka-foo-0.***.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,098 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending GroupCoordinator request for group GROUPID to broker kafka-foo-0.***.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient - Removing node kafka-foo-1.***.com:9092 (id: 1 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient - Removing node kafka-foo-2.***.com:9092 (id: 2 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 1 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient - Found least loaded node kafka-foo-0.***.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,098 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending GroupCoordinator request for group GROUPID to broker kafka-foo-0.***.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient - Removing node kafka-foo-1.***.com:9092 (id: 1 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient - Removing node kafka-foo-2.***.com:9092 (id: 2 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 1 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient - Found least loaded node kafka-foo-0.***.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,099 DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request (type=MetadataRequest, topics=gao31) to node 0 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient - Sending METADATA {topics=[gao31],allow_auto_topic_creation=true} to node 0. [kafka-consumer]
2017-11-10 11:27:45,100 TRACE org.apache.kafka.clients.NetworkClient - Sending FIND_COORDINATOR {coordinator_key=GROUPID,coordinator_type=0} to node 0. [kafka-consumer]
2017-11-10 11:27:45,276 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 2, for key 1, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=1631,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1630, timestamp=1510313264997, key=1 bytes, value=102 bytes))]},{partition_header={partition=0,error_code=0,high_watermark=1223,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[]}]}]} [kafka-consumer]
2017-11-10 11:27:45,276 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 1630 for partition gao31-2 returned fetch data (error=NONE, highWaterMark=1631, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,276 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 1223 for partition gao31-0 returned fetch data (error=NONE, highWaterMark=1223, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=0) [kafka-consumer]
2017-11-10 11:27:45,341 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 0, for key 1, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=1224,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1223, timestamp=1510313265277, key=1 bytes, value=102 bytes))]}]}]} [kafka-consumer]
2017-11-10 11:27:45,341 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 1223 for partition gao31-1 returned fetch data (error=NONE, highWaterMark=1224, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,342 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 0, for key 3, received {throttle_time_ms=0,brokers=[{node_id=2,host=kafka-foo-2.***.com,port=9092,rack=null},{node_id=1,host=kafka-foo-1.***.com,port=9092,rack=null},{node_id=0,host=kafka-foo-0.***.com,port=9092,rack=null}],cluster_id=Y8zodxM7TNi-19dFcgErpw,controller_id=0,topic_metadata=[{topic_error_code=0,topic=gao31,is_internal=false,partition_metadata=[{partition_error_code=0,partition_id=2,leader=2,replicas=[1,2,0],isr=[0,2]},{partition_error_code=0,partition_id=1,leader=0,replicas=[0,1,2],isr=[0,2]},{partition_error_code=0,partition_id=0,leader=2,replicas=[2,0,1],isr=[2,0]}]}]} [kafka-consumer]
2017-11-10 11:27:45,342 DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 4 to Cluster(id = Y8zodxM7TNi-19dFcgErpw, nodes = [kafka-foo-0.***.com:9092 (id: 0 rack: null), kafka-foo-1.***.com:9092 (id: 1 rack: null), kafka-foo-2.***.com:9092 (id: 2 rack: null)], partitions = [Partition(topic = gao31, partition = 0, leader = 2, replicas = [2,0,1], isr = [2,0]), Partition(topic = gao31, partition = 1, leader = 0, replicas = [0,1,2], isr = [0,2]), Partition(topic = gao31, partition = 2, leader = 2, replicas = [1,2,0], isr = [0,2])]) [kafka-consumer]
2017-11-10 11:27:45,343 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 0, for key 10, received {throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=2,host=kafka-foo-2.***.com,port=9092}} [kafka-consumer]
2017-11-10 11:27:45,343 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received GroupCoordinator response ClientResponse(receivedTimeMs=1510313265343, latencyMs=245, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=7610,client_id=consumer-2}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-foo-2.***.com:9092 (id: 2 rack: null))) for group GROUPID [kafka-consumer]
2017-11-10 11:27:45,346 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator kafka-foo-2.***.com:9092 (id: 2147483645 rack: null) for group GROUPID. [kafka-consumer]
2017-11-10 11:27:45,346 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147483645 at kafka-foo-2.***.com:9092. [kafka-consumer]
2017-11-10 11:27:45,350 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Preparing to read 188 bytes of data for partition gao31-2 with offset 1630 [kafka-consumer]
2017-11-10 11:27:45,350 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Updating high watermark for partition gao31-2 to 1631 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 1630 for assigned partition gao31-2 and update position to 1631 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Preparing to read 0 bytes of data for partition gao31-0 with offset 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Updating high watermark for partition gao31-0 to 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 1223 for assigned partition gao31-0 and update position to 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Preparing to read 188 bytes of data for partition gao31-1 with offset 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Updating high watermark for partition gao31-1 to 1224 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 1223 for assigned partition gao31-1 and update position to 1224 [kafka-consumer]
{code}
3. Then it tries to connect to kafka-foo-0 or kafka-foo-2 in a round-robin way:
{code:java}
2017-11-10 11:27:45,470 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 2147483645, for key 8, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=16},{partition=1,error_code=16},{partition=2,error_code=16}]}]} [kafka-consumer]
2017-11-10 11:27:45,470 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Offset commit for group GROUPID failed: This is not the correct coordinator. [kafka-consumer]
2017-11-10 11:27:45,470 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator kafka-foo-2.***.com:9092 (id: 2147483645 rack: null) dead for group GROUPID [kafka-consumer]
2017-11-10 11:27:45,573 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 0, for key 1, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=1225,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1224, timestamp=1510313265499, key=1 bytes, value=102 bytes))]}]}]} [kafka-consumer]
2017-11-10 11:27:45,573 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 1224 for partition gao31-1 returned fetch data (error=NONE, highWaterMark=1225, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,575 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 2, for key 10, received {throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=2,host=kafka-foo-2.***.com,port=9092}} [kafka-consumer]
2017-11-10 11:27:45,575 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received GroupCoordinator response ClientResponse(receivedTimeMs=1510313265575, latencyMs=4, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=7616,client_id=consumer-2}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-foo-2.***.com:9092 (id: 2 rack: null))) for group GROUPID [kafka-consumer]
2017-11-10 11:27:45,575 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator kafka-foo-2.***.com:9092 (id: 2147483645 rack: null) for group GROUPID. [kafka-consumer]
2017-11-10 11:27:45,576 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending OffsetCommit request with {gao31-1=OffsetAndMetadata{offset=1224, metadata='no metadata'}, gao31-2=OffsetAndMetadata{offset=1631, metadata='no metadata'}, gao31-0=OffsetAndMetadata{offset=1223, metadata='no metadata'}} to coordinator kafka-foo-2.***.com:9092 (id: 2147483645 rack: null) for group GROUPID [kafka-consumer]
2017-11-10 11:27:45,576 TRACE org.apache.kafka.clients.NetworkClient - Sending OFFSET_COMMIT {group_id=GROUPID,group_generation_id=7,member_id=consumer-2-82264aac-077f-469a-9783-2d8949bd61c3,retention_time=-1,topics=[{topic=gao31,partitions=[{partition=0,offset=1223,metadata=no metadata},{partition=1,offset=1224,metadata=no metadata},{partition=2,offset=1631,metadata=no metadata}]}]} to node 2147483645. [kafka-consumer]
2017-11-10 11:27:45,581 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 2147483645, for key 8, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=25},{partition=1,error_code=25},{partition=2,error_code=25}]}]} [kafka-consumer]
2017-11-10 11:27:45,581 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Offset commit for group GROUPID failed: The coordinator is not aware of this member. [kafka-consumer]
2017-11-10 11:27:45,581 DEBUG *********************************************** - unable to commit offsets [kafka-consumer]
2017-11-10 11:27:45,582 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [gao31-1, gao31-2, gao31-0] for group GROUPID [kafka-consumer]
......
{code}
This lasts for about 3 minutes.
4. Finally it discovers kafka-foo-0 and uses it as the group coordinator. However, the logs in red show that the new group coordinator does not know the last committed offsets for this consumer group, so the consumer resets its offsets to "latest":
{code:java}
2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient - Found least loaded node kafka-foo-0.***.com:9092 (id: 0 rack: null) connected with no in-flight requests [kafka-consumer]
2017-11-10 11:30:05,817 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending GroupCoordinator request for group GROUPID to broker kafka-foo-0.***.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient - Sending FIND_COORDINATOR {coordinator_key=GROUPID,coordinator_type=0} to node 0. [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 0, for key 10, received {throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=0,host=kafka-foo-0.***.com,port=9092}} [kafka-consumer]
2017-11-10 11:30:05,820 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received GroupCoordinator response ClientResponse(receivedTimeMs=1510313405820, latencyMs=3, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=10942,client_id=consumer-2}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-foo-0.***.com:9092 (id: 0 rack: null))) for group GROUPID [kafka-consumer]
2017-11-10 11:30:05,820 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator kafka-foo-0.***.com:9092 (id: 2147483647 rack: null) for group GROUPID. [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Disabling heartbeat thread for group GROUPID [kafka-consumer]
2017-11-10 11:30:05,820 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group GROUPID [kafka-consumer]
2017-11-10 11:30:05,820 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ((type: JoinGroupRequest, groupId=GROUPID, sessionTimeout=10000, rebalanceTimeout=300000, memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@161cb9a1)) to coordinator kafka-foo-0.***.com:9092 (id: 2147483647 rack: null) [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient - Sending JOIN_GROUP {group_id=GROUPID,session_timeout=10000,rebalance_timeout=300000,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=17 cap=17]}]} to node 2147483647. [kafka-consumer]
2017-11-10 11:30:05,823 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 2147483647, for key 11, received {throttle_time_ms=0,error_code=0,generation_id=1113,group_protocol=range,leader_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,members=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=17 cap=17]}]} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received successful JoinGroup response for group GROUPID: org.apache.kafka.common.requests.JoinGroupResponse@6f13c4b9 [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing assignment for group GROUPID using strategy range with subscriptions {consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Subscription(topics=[gao31])} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Finished assignment for group GROUPID: {consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Assignment(partitions=[gao31-0, gao31-1, gao31-2])} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending leader SyncGroup for group GROUPID to coordinator kafka-foo-0.***.com:9092 (id: 2147483647 rack: null): (type=SyncGroupRequest, groupId=GROUPID, generationId=1113, memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, groupAssignment=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5) [kafka-consumer]
2017-11-10 11:30:05,824 TRACE org.apache.kafka.clients.NetworkClient - Sending SYNC_GROUP {group_id=GROUPID,generation_id=1113,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,group_assignment=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]}]} to node 2147483647. [kafka-consumer]
2017-11-10 11:30:06,051 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 2147483647, for key 14, received {throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]} [kafka-consumer]
2017-11-10 11:30:06,051 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group GROUPID with generation 1113 [kafka-consumer]
2017-11-10 11:30:06,051 TRACE org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Enabling heartbeat thread for group GROUPID [kafka-consumer]
2017-11-10 11:30:06,051 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [gao31-1, gao31-2, gao31-0] for group GROUPID [kafka-consumer]
2017-11-10 11:30:06,052 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group GROUPID fetching committed offsets for partitions: [gao31-1, gao31-2, gao31-0] [kafka-consumer]
2017-11-10 11:30:06,052 TRACE org.apache.kafka.clients.NetworkClient - Sending OFFSET_FETCH {group_id=GROUPID,topics=[{topic=gao31,partitions=[{partition=1},{partition=2},{partition=0}]}]} to node 2147483647. [kafka-consumer]
*{color:#d04437}2017-11-10 11:30:06,059 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 2147483647, for key 9, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,offset=-1,metadata=,error_code=0},{partition=1,offset=-1,metadata=,error_code=0},{partition=2,offset=-1,metadata=,error_code=0}]}],error_code=0} [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group GROUPID has no committed offset for partition gao31-1 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group GROUPID has no committed offset for partition gao31-2 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group GROUPID has no committed offset for partition gao31-0 [kafka-consumer]{color}*
2017-11-10 11:30:06,059 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={gao31-1=-1, gao31-2=-1, gao31-0=-1}, minVersion=0) to broker kafka-foo-0.***.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:30:06,060 TRACE org.apache.kafka.clients.NetworkClient - Sending LIST_OFFSETS {replica_id=-1,isolation_level=0,topics=[{topic=gao31,partitions=[{partition=0,timestamp=-1},{partition=1,timestamp=-1},{partition=2,timestamp=-1}]}]} to node 0. [kafka-consumer]
2017-11-10 11:30:06,062 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 0, for key 2, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=0,timestamp=-1,offset=1288},{partition=1,error_code=0,timestamp=-1,offset=1290},{partition=2,error_code=0,timestamp=-1,offset=1718}]}]} [kafka-consumer]
2017-11-10 11:30:06,062 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Received ListOffsetResponse org.apache.kafka.common.requests.ListOffsetResponse@492e9c44 from broker kafka-foo-0.***.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for gao31-1. Fetched offset 1290, timestamp -1 [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for gao31-2. Fetched offset 1718, timestamp -1 [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for gao31-0. Fetched offset 1288, timestamp -1 [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition gao31-1 to offset 1290. [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition gao31-2 to offset 1718. [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition gao31-0 to offset 1288. [kafka-consumer]
{code}
I am wondering why the new group coordinator does not know the committed offsets for the consumer group. The group ID never changed (the consumer only left and re-joined the group), so I expected the re-joined consumer to pick up the last committed offsets and continue from there.
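The size of the gap can be estimated from the logs. The sketch below is a hypothetical model, not Kafka client code: it assumes the offsets in the last (failed) OffsetCommit request at 11:27:45,576 approximate how far the consumer had processed, treats the offset=-1 returned by OFFSET_FETCH as "no committed offset", and applies a simplified version of auto.offset.reset=latest using the log-end offsets from LIST_OFFSETS at 11:30:06,062. The class and method names are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model (not Kafka client code) of how a missing committed offset
// plus auto.offset.reset=latest turns into skipped records.
public class OffsetResetGap {
    // OFFSET_FETCH reported offset=-1 for every partition
    static final long NO_COMMITTED_OFFSET = -1L;

    // Simplified reset decision; the real client throws NoOffsetForPartitionException
    // for "none", here it is an IllegalStateException
    static long startOffset(long committed, String resetPolicy, long earliest, long latest) {
        if (committed != NO_COMMITTED_OFFSET) {
            return committed; // normal case: resume where the group left off
        }
        switch (resetPolicy) {
            case "earliest": return earliest;
            case "latest":   return latest;
            default: throw new IllegalStateException("no committed offset and auto.offset.reset=" + resetPolicy);
        }
    }

    // Offsets in the OffsetCommit request at 11:27:45,576 (it failed with error 25);
    // assumed to approximate the consumer's processed position
    static Map<String, Long> lastCommitAttempt() {
        Map<String, Long> m = new LinkedHashMap<>();
        m.put("gao31-0", 1223L);
        m.put("gao31-1", 1224L);
        m.put("gao31-2", 1631L);
        return m;
    }

    // Log-end offsets returned by LIST_OFFSETS at 11:30:06,062
    static Map<String, Long> logEndOffsets() {
        Map<String, Long> m = new LinkedHashMap<>();
        m.put("gao31-0", 1288L);
        m.put("gao31-1", 1290L);
        m.put("gao31-2", 1718L);
        return m;
    }

    static long totalSkipped() {
        long total = 0;
        Map<String, Long> end = logEndOffsets();
        for (Map.Entry<String, Long> e : lastCommitAttempt().entrySet()) {
            // no committed offset was found, so the reset policy decides where to resume
            long resumeAt = startOffset(NO_COMMITTED_OFFSET, "latest", 0L, end.get(e.getKey()));
            long skipped = resumeAt - e.getValue();
            System.out.println(e.getKey() + ": resumes at " + resumeAt + ", skipping " + skipped + " records");
            total += skipped;
        }
        return total;
    }

    public static void main(String[] args) {
        // last line printed: "total skipped: 218"
        System.out.println("total skipped: " + totalSkipped());
    }
}
```

Under these assumptions the per-partition gaps are 65, 66 and 87 records, i.e. roughly everything produced while the consumer had no working coordinator.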
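The node ids in these logs (2147483646 for kafka-foo-1, 2147483645 for kafka-foo-2, 2147483647 for kafka-foo-0) follow from how the consumer addresses its coordinator. A minimal sketch, assuming the mappings I know of from the Kafka code base: the group's offsets live in partition abs(groupId.hashCode()) % offsets.topic.num.partitions of __consumer_offsets, the leader of that partition acts as the group coordinator, and the consumer opens a separate connection to it under the id Integer.MAX_VALUE - brokerId. The class name is hypothetical, the partition count of 50 is only the broker default (the cluster's real value is not given), and "GROUPID" is the redacted placeholder rather than the real group ID.

```java
// Hypothetical helper (not a Kafka API) reproducing two id mappings seen in the logs.
public class CoordinatorIds {
    // Intended to match Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of staying negative
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Partition of __consumer_offsets holding this group's offsets;
    // the broker leading that partition acts as the group coordinator
    static int offsetsPartitionFor(String groupId, int offsetsTopicPartitions) {
        return abs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    // Id of the dedicated coordinator connection, e.g. 2147483646 for broker 1
    static int coordinatorNodeId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    public static void main(String[] args) {
        // "GROUPID" is a placeholder; 50 is assumed (broker default for offsets.topic.num.partitions)
        System.out.println("GROUPID -> __consumer_offsets-" + offsetsPartitionFor("GROUPID", 50));
        for (int broker = 0; broker <= 2; broker++) {
            System.out.println("kafka-foo-" + broker + " -> coordinator node id " + coordinatorNodeId(broker));
        }
    }
}
```

Running it with the real group ID and the cluster's actual partition count would show which __consumer_offsets partition (and therefore which replicas) had to survive the loss of kafka-foo-1 for the committed offsets to remain available.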
If the committed offsets are not picked up, we lose messages on the consumer side (the messages produced while the group coordinator was down). Any theories about this? Thanks
disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=7616,client_id=consumer-2}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-foo-2.***.com:9092 (id: 2 rack: null))) for group uklonvd826214 [kafka-consumer] 2017-11-10 11:27:45,575 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator kafka-foo-2.***.com:9092 (id: 2147483645 rack: null) for group uklonvd826214. [kafka-consumer] 2017-11-10 11:27:45,576 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending OffsetCommit request with {gao31-1=OffsetAndMetadata{offset=1224, metadata='no metadata'}, gao31-2=OffsetAndMetadata{offset=1631, metadata='no metadata'}, gao31-0=OffsetAndMetadata{offset=1223, metadata='no metadata'}} to coordinator kafka-foo-2.***.com:9092 (id: 2147483645 rack: null) for group uklonvd826214 [kafka-consumer] 2017-11-10 11:27:45,576 TRACE org.apache.kafka.clients.NetworkClient - Sending OFFSET_COMMIT {group_id=uklonvd826214,group_generation_id=7,member_id=consumer-2-82264aac-077f-469a-9783-2d8949bd61c3,retention_time=-1,topics=[{topic=gao31,partitions=[{partition=0,offset=1223,metadata=no metadata},{partition=1,offset=1224,metadata=no metadata},{partition=2,offset=1631,metadata=no metadata}]}]} to node 2147483645. [kafka-consumer] 2017-11-10 11:27:45,581 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 2147483645, for key 8, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=25},{partition=1,error_code=25},{partition=2,error_code=25}]}]} [kafka-consumer] 2017-11-10 11:27:45,581 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Offset commit for group uklonvd826214 failed: The coordinator is not aware of this member. 
[kafka-consumer] 2017-11-10 11:27:45,581 DEBUG *********************************************** - unable to commit offsets [kafka-consumer] 2017-11-10 11:27:45,582 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [gao31-1, gao31-2, gao31-0] for group uklonvd826214 [kafka-consumer] ...... {code} this lasts for a about 3 minutes 4. Finally it discovered kafka-foo-0 and use it as group coordinator, however, the logs in red shows that the new group coordinator is not able to know what is the last committed offset for this consumer group, thus resetting the offset to "latest" {code:java} 2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient - Found least loaded node kafka-foo-0.***.com:9092 (id: 0 rack: null) connected with no in-flight requests [kafka-consumer] 2017-11-10 11:30:05,817 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending GroupCoordinator request for group uklonvd826214 to broker kafka-foo-0.***.com:9092 (id: 0 rack: null) [kafka-consumer] 2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient - Sending FIND_COORDINATOR {coordinator_key=uklonvd826214,coordinator_type=0} to node 0. 
[kafka-consumer] 2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 0, for key 10, received {throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=0,host=kafka-foo-0.***.com,port=9092}} [kafka-consumer] 2017-11-10 11:30:05,820 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received GroupCoordinator response ClientResponse(receivedTimeMs=1510313405820, latencyMs=3, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=10942,client_id=consumer-2}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-foo-0.***.com:9092 (id: 0 rack: null))) for group uklonvd826214 [kafka-consumer] 2017-11-10 11:30:05,820 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator kafka-foo-0.***.com:9092 (id: 2147483647 rack: null) for group uklonvd826214. [kafka-consumer] 2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Disabling heartbeat thread for group uklonvd826214 [kafka-consumer] 2017-11-10 11:30:05,820 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group uklonvd826214 [kafka-consumer] 2017-11-10 11:30:05,820 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ((type: JoinGroupRequest, groupId=uklonvd826214, sessionTimeout=10000, rebalanceTimeout=300000, memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@161cb9a1)) to coordinator kafka-foo-0.***.com:9092 (id: 2147483647 rack: null) [kafka-consumer] 2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient - Sending JOIN_GROUP 
{group_id=uklonvd826214,session_timeout=10000,rebalance_timeout=300000,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=17 cap=17]}]} to node 2147483647. [kafka-consumer] 2017-11-10 11:30:05,823 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 2147483647, for key 11, received {throttle_time_ms=0,error_code=0,generation_id=1113,group_protocol=range,leader_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,members=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=17 cap=17]}]} [kafka-consumer] 2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received successful JoinGroup response for group uklonvd826214: org.apache.kafka.common.requests.JoinGroupResponse@6f13c4b9 [kafka-consumer] 2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing assignment for group uklonvd826214 using strategy range with subscriptions {consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Subscription(topics=[gao31])} [kafka-consumer] 2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Finished assignment for group uklonvd826214: {consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Assignment(partitions=[gao31-0, gao31-1, gao31-2])} [kafka-consumer] 2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending leader SyncGroup for group uklonvd826214 to coordinator kafka-foo-0.***.com:9092 (id: 2147483647 rack: null): (type=SyncGroupRequest, groupId=uklonvd826214, generationId=1113, memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, groupAssignment=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5) [kafka-consumer] 2017-11-10 11:30:05,824 TRACE 
org.apache.kafka.clients.NetworkClient - Sending SYNC_GROUP {group_id=uklonvd826214,generation_id=1113,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,group_assignment=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]}]} to node 2147483647. [kafka-consumer] 2017-11-10 11:30:06,051 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 2147483647, for key 14, received {throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]} [kafka-consumer] 2017-11-10 11:30:06,051 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group uklonvd826214 with generation 1113 [kafka-consumer] 2017-11-10 11:30:06,051 TRACE org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Enabling heartbeat thread for group uklonvd826214 [kafka-consumer] 2017-11-10 11:30:06,051 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [gao31-1, gao31-2, gao31-0] for group uklonvd826214 [kafka-consumer] 2017-11-10 11:30:06,052 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group uklonvd826214 fetching committed offsets for partitions: [gao31-1, gao31-2, gao31-0] [kafka-consumer] 2017-11-10 11:30:06,052 TRACE org.apache.kafka.clients.NetworkClient - Sending OFFSET_FETCH {group_id=uklonvd826214,topics=[{topic=gao31,partitions=[{partition=1},{partition=2},{partition=0}]}]} to node 2147483647. 
[kafka-consumer] *{color:#d04437}2017-11-10 11:30:06,059 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 2147483647, for key 9, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,offset=-1,metadata=,error_code=0},{partition=1,offset=-1,metadata=,error_code=0},{partition=2,offset=-1,metadata=,error_code=0}]}],error_code=0} [kafka-consumer] 2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group uklonvd826214 has no committed offset for partition gao31-1 [kafka-consumer] 2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group uklonvd826214 has no committed offset for partition gao31-2 [kafka-consumer] 2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group uklonvd826214 has no committed offset for partition gao31-0 [kafka-consumer]{color}* 2017-11-10 11:30:06,059 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={gao31-1=-1, gao31-2=-1, gao31-0=-1}, minVersion=0) to broker kafka-foo-0.***.com:9092 (id: 0 rack: null) [kafka-consumer] 2017-11-10 11:30:06,060 TRACE org.apache.kafka.clients.NetworkClient - Sending LIST_OFFSETS {replica_id=-1,isolation_level=0,topics=[{topic=gao31,partitions=[{partition=0,timestamp=-1},{partition=1,timestamp=-1},{partition=2,timestamp=-1}]}]} to node 0. 
[kafka-consumer] 2017-11-10 11:30:06,062 TRACE org.apache.kafka.clients.NetworkClient - Completed receive from node 0, for key 2, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=0,timestamp=-1,offset=1288},{partition=1,error_code=0,timestamp=-1,offset=1290},{partition=2,error_code=0,timestamp=-1,offset=1718}]}]} [kafka-consumer] 2017-11-10 11:30:06,062 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Received ListOffsetResponse org.apache.kafka.common.requests.ListOffsetResponse@492e9c44 from broker kafka-foo-0.***.com:9092 (id: 0 rack: null) [kafka-consumer] 2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for gao31-1. Fetched offset 1290, timestamp -1 [kafka-consumer] 2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for gao31-2. Fetched offset 1718, timestamp -1 [kafka-consumer] 2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for gao31-0. Fetched offset 1288, timestamp -1 [kafka-consumer] 2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition gao31-1 to offset 1290. [kafka-consumer] 2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition gao31-2 to offset 1718. [kafka-consumer] 2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition gao31-0 to offset 1288. [kafka-consumer] {code} I am wondering why the new group coordinator is not able to know the committed offset for the consumer group ? Since the group ID was never being changed, just consumer leaves and re-joins, I am expecting that the new consumer is able to pick up the last committed offset and continue from there ? 
Otherwise we will lose messages on consumer side (the messages generated in the group coordinator downtime). Any theories about this ? Thanks > Loosing messages on OFFSET_OUT_OF_RANGE error in consumer > --------------------------------------------------------- > > Key: KAFKA-6189 > URL: https://issues.apache.org/jira/browse/KAFKA-6189 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 0.11.0.0 > Reporter: Andrey > Attachments: kafkaLossingMessages.png > > > Steps to reproduce: > * Setup test: > ** producer sends messages constantly. If cluster not available, then it will > retry > ** consumer polling > ** topic has 3 partitions and replication factor 3. > ** min.insync.replicas=2 > ** producer has "acks=all" > ** consumer has default "auto.offset.reset=latest" > ** consumer manually commitSync offsets after handling messages. > ** unclean leader election = false > ** kafka cluster has 3 brokers > * Kill broker 0 > * In consumer's logs: > {code} > 2017-11-08 11:36:33,967 INFO > org.apache.kafka.clients.consumer.internals.Fetcher - Fetch offset > 10706682 is out of range for partition mytopic-2, resetting offset > [kafka-consumer] > 2017-11-08 11:36:33,968 INFO > org.apache.kafka.clients.consumer.internals.Fetcher - Fetch offset > 8024431 is out of range for partition mytopic-1, resetting offset > [kafka-consumer] > 2017-11-08 11:36:34,045 INFO > org.apache.kafka.clients.consumer.internals.Fetcher - Fetch offset > 8029505 is out of range for partition mytopic-0, resetting offset > [kafka-consumer] > {code} > After that, consumer lost several messages on each partition. > Expected: > * return upper bound of range > * consumer should resume from that offset instead of "auto.offset.reset". > Workaround: > * put "auto.offset.reset=earliest" > * get a lot of duplicate messages, instead of lost > Looks like this is what happening during the recovery from broker failure > (see attachment) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
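The two OffsetCommit failures in step 3 of the comment carry Kafka protocol error codes 16 (NOT_COORDINATOR, logged as "This is not the correct coordinator.") and 25 (UNKNOWN_MEMBER_ID, logged as "The coordinator is not aware of this member."). The sketch below is a hypothetical model, not the Kafka client's internal code: `CommitErrorSketch`, its `Reaction` enum and `classify()` are illustrative names, and the mapping only mirrors what the logs show (the coordinator is marked dead after code 16; the partitions are revoked after code 25).

```java
// Hypothetical sketch: maps the OffsetCommit error codes seen in the logs to the
// client reaction the logs suggest. The numeric codes follow the Kafka protocol;
// the class, enum and method names are illustrative, not Kafka client API.
final class CommitErrorSketch {

    enum Reaction { SUCCESS, REDISCOVER_COORDINATOR_AND_RETRY, REJOIN_GROUP, OTHER }

    static final short NONE = 0;
    static final short NOT_COORDINATOR = 16;   // "This is not the correct coordinator."
    static final short UNKNOWN_MEMBER_ID = 25; // "The coordinator is not aware of this member."

    static Reaction classify(short errorCode) {
        switch (errorCode) {
            case NONE:
                return Reaction.SUCCESS;
            case NOT_COORDINATOR:
                // Logs: "Marking the coordinator ... dead", then FIND_COORDINATOR is sent again.
                return Reaction.REDISCOVER_COORDINATOR_AND_RETRY;
            case UNKNOWN_MEMBER_ID:
                // Logs: commit fails, then "Revoking previously assigned partitions" and a rejoin.
                return Reaction.REJOIN_GROUP;
            default:
                return Reaction.OTHER;
        }
    }

    public static void main(String[] args) {
        short[] observed = {16, 25}; // error codes from the two OFFSET_COMMIT responses
        for (short code : observed) {
            System.out.println(code + " -> " + classify(code));
        }
    }
}
```

In this model a NOT_COORDINATOR answer is treated as retriable, while UNKNOWN_MEMBER_ID means the commit cannot succeed for the current member id, which is consistent with the "unable to commit offsets" line that follows it in the logs.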
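The size of the gap in step 4 of the comment can be estimated from the logs, assuming the offsets in the last OFFSET_COMMIT attempt (gao31-0=1223, gao31-1=1224, gao31-2=1631) are where the consumer should have resumed. OFFSET_FETCH on the new coordinator returned -1 (no committed offset), so with auto.offset.reset=latest the consumer jumped to the log end offsets returned by LIST_OFFSETS (1288, 1290, 1718). The sketch below models that decision; `OffsetResetSketch`, `startPosition()` and `skipped()` are hypothetical names, not Kafka client API, and a log start offset of 0 is assumed for every partition (only gao31-1 actually reports logStartOffset = 0).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of how a consumer picks its start position after a rebalance.
// Not Kafka client code; names are illustrative.
final class OffsetResetSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    // OFFSET_FETCH reports offset=-1 when the group has no committed offset for a partition.
    static final long NO_COMMITTED_OFFSET = -1L;

    /** Committed offset if present, otherwise log start (EARLIEST) or log end (LATEST). */
    static long startPosition(long committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed != NO_COMMITTED_OFFSET) {
            return committed;
        }
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    /** Number of offsets between the expected resume point and the actual start that are jumped over. */
    static long skipped(long expectedResume, long actualStart) {
        return Math.max(0L, actualStart - expectedResume);
    }

    public static void main(String[] args) {
        // Offsets from the last OFFSET_COMMIT attempt (assumed intended resume points).
        Map<String, Long> lastCommitAttempt = new LinkedHashMap<>();
        lastCommitAttempt.put("gao31-0", 1223L);
        lastCommitAttempt.put("gao31-1", 1224L);
        lastCommitAttempt.put("gao31-2", 1631L);

        // Log end offsets from the LIST_OFFSETS response (timestamp=-1 means "latest").
        Map<String, Long> logEnd = new LinkedHashMap<>();
        logEnd.put("gao31-0", 1288L);
        logEnd.put("gao31-1", 1290L);
        logEnd.put("gao31-2", 1718L);

        for (String tp : lastCommitAttempt.keySet()) {
            // The new coordinator returned no committed offset, so the reset policy decides.
            long start = startPosition(NO_COMMITTED_OFFSET, 0L, logEnd.get(tp), ResetPolicy.LATEST);
            System.out.printf("%s: resume at %d, skipped %d%n",
                    tp, start, skipped(lastCommitAttempt.get(tp), start));
        }
    }
}
```

Under these assumptions, up to 65, 66 and 87 offsets are jumped over on gao31-0, gao31-1 and gao31-2 respectively. With `ResetPolicy.EARLIEST`, the same missing commit would restart each partition from the log start instead, which matches the duplicate-heavy workaround described in the issue.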