[jira] [Assigned] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition

2019-12-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-9073:
--

Assignee: Guozhang Wang

> Kafka Streams State stuck in rebalancing after one of the StreamThread 
> encounters java.lang.IllegalStateException: No current assignment for 
> partition
> --
>
> Key: KAFKA-9073
> URL: https://issues.apache.org/jira/browse/KAFKA-9073
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: amuthan Ganeshan
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.4.0, 2.3.2
>
> Attachments: KAFKA-9073.log
>
>
> I have a Kafka Streams application that stores the incoming messages in a 
> state store, and later, during punctuation, we store them in a 
> big data persistent store after processing the messages.
> The application consumes from 120 partitions distributed across 40 instances. 
> The application has been running fine without any problem for months, but all 
> of a sudden some of the instances failed because of a stream thread exception 
> saying  
> ```java.lang.IllegalStateException: No current assignment for partition 
> --changelog-98```
>  
> Other instances are stuck in the REBALANCING state and never come out 
> of it. Here is the full stack trace; I just masked the application-specific 
> app name and store name in the stack trace due to an NDA.
>  
> ```
> 2019-10-21 13:27:13,481 ERROR 
> [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] 
> [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread 
> [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] 
> Encountered the following error during processing:
> java.lang.IllegalStateException: No current assignment for partition 
> application.id-store_name-changelog-98
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> ```
>  
> Now I checked the state store disk usage; it is less than 40% of the total 
> disk space available. Restarting the application solves the problem for a 
> short time, but the error quickly pops up again on other instances. 
> I tried changing the retries and retry.backoff.ms 
> configuration, but it did not help at all:
> ```
> retries = 2147483647
> retry.backoff.ms
> ```
> After googling for some time I found a similar bug reported to the 
> Kafka team in the past, and also noticed my stack trace exactly matches 
> the stack trace of the reported bug.
> Here is the link to that bug, reported about a year ago:
> https://issues.apache.
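A minimal sketch (illustrative values, not taken from the report) of how the retries and retry.backoff.ms settings mentioned above could be raised in a Streams application's configuration:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RetryConfigSketch {
    public static void main(String[] args) {
        // Illustrative only: raise the client retry settings mentioned in the report.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder broker
        props.put("retries", Integer.MAX_VALUE);   // 2147483647, as in the report
        props.put("retry.backoff.ms", 1000L);      // example backoff value
        // ... the props would then be passed to new KafkaStreams(topology, props)
    }
}
```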

[jira] [Assigned] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-12-09 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-7362:
---

Assignee: Dhruvil Shah  (was: xiongqi wu)

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: Dhruvil Shah
>Priority: Major
>
> When partition reassignment removes topic partitions from an offline broker, 
> those removed partitions become orphan partitions on that broker. When the 
> offline broker comes back online, it is not able to clean up either the data or 
> the folders that belong to orphan partitions.  The log manager scans all log dirs 
> during startup, but the time-based retention policy on a topic partition does 
> not kick in until the broker is either a follower or a leader of the 
> partition.  In addition, we have no logic today to delete folders that belong 
> to orphan partitions. 
> Opening this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-12-09 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992234#comment-16992234
 ] 

Dhruvil Shah commented on KAFKA-7362:
-

[~Ahuri3] that sounds reasonable, as long as you know which partitions have 
been orphaned.

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: Dhruvil Shah
>Priority: Major
>
> When partition reassignment removes topic partitions from an offline broker, 
> those removed partitions become orphan partitions on that broker. When the 
> offline broker comes back online, it is not able to clean up either the data or 
> the folders that belong to orphan partitions.  The log manager scans all log dirs 
> during startup, but the time-based retention policy on a topic partition does 
> not kick in until the broker is either a follower or a leader of the 
> partition.  In addition, we have no logic today to delete folders that belong 
> to orphan partitions. 
> Opening this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9048) Improve scalability in number of partitions in replica fetcher

2019-12-09 Thread Pradeep Bansal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992214#comment-16992214
 ] 

Pradeep Bansal commented on KAFKA-9048:
---

Which Kafka release is this change planned for?

> Improve scalability in number of partitions in replica fetcher
> --
>
> Key: KAFKA-9048
> URL: https://issues.apache.org/jira/browse/KAFKA-9048
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Lucas Bradstreet
>Assignee: Guozhang Wang
>Priority: Major
>
> https://issues.apache.org/jira/browse/KAFKA-9039 
> ([https://github.com/apache/kafka/pull/7443]) improves the performance of the 
> replica fetcher (at both small and large numbers of partitions), but it does 
> not improve its complexity or scalability in the number of partitions.
> I took a profile using async-profiler for the 1000 partition JMH replica 
> fetcher benchmark. The big remaining culprits are:
>  * ~18% looking up logStartOffset
>  * ~45% FetchSessionHandler$Builder.add
>  * ~19% FetchSessionHandler$Builder.build
> *Suggestions*
>  # The logStartOffset is looked up for every partition on each doWork pass. 
> This requires a hashmap lookup even though the logStartOffset changes rarely. 
> If the replica fetcher could be notified of updates to the logStartOffset, 
> then we could reduce the overhead to a function of the number of updates to 
> the logStartOffset instead of O(n) on each pass. 
>  # The use of FetchSessionHandler means that we maintain a partitionStates 
> hashmap in the replica fetcher, and a sessionPartitions hashmap in the 
> FetchSessionHandler. On each incremental fetch session pass, we need to 
> reconcile these two hashmaps to determine which partitions were added/updated 
> and which partitions were removed. This reconciliation process is especially 
> expensive, requiring multiple passes over the fetching partitions, and 
> hashmap remove and puts for most partitions. The replica fetcher could be 
> smarter by maintaining the fetch session *updated* hashmap containing 
> FetchRequest.PartitionData(s) directly, as well as *removed* partitions list 
> so that these do not need to be regenerated via reconciliation on each fetch pass.
>  # maybeTruncate requires an O(n) pass over the elements in partitionStates 
> even if there are no partitions in truncating state. If we can maintain some 
> additional state about whether truncating partitions exist in 
> partitionStates, or if we could separate these states into a separate data 
> structure, we would not need to iterate across all partitions on every doWork 
> pass. I’ve seen clusters where this work takes about 0.5%-1% of CPU, which is 
> minor but will become more substantial as the number of partitions increases.
> If we can achieve 1 and 2, the complexity will be improved from a function of 
> the total number of partitions to a function of the number of partitions with updated fetch 
> offsets/log start offsets between each fetch. In general, a minority of 
> partitions will have changes in these between fetches, so this should improve 
> the average case complexity greatly.
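A rough sketch of the idea in suggestion 2, using hypothetical names (this is not the actual replica fetcher or FetchSessionHandler code): track updated and removed partitions incrementally, so each fetch pass costs a function of the changes rather than a full reconciliation of two hashmaps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of incremental fetch-state tracking.
class IncrementalFetchState<P, D> {
    private final Map<P, D> updated = new HashMap<>(); // partitions whose fetch data changed
    private final List<P> removed = new ArrayList<>(); // partitions dropped since the last fetch

    void onPartitionUpdated(P partition, D data) {
        removed.remove(partition);
        updated.put(partition, data);
    }

    void onPartitionRemoved(P partition) {
        updated.remove(partition);
        removed.add(partition);
    }

    // Consumed once per fetch pass: cost is O(changes), not O(all partitions).
    Map<P, D> drainUpdated() {
        Map<P, D> out = new HashMap<>(updated);
        updated.clear();
        return out;
    }

    List<P> drainRemoved() {
        List<P> out = new ArrayList<>(removed);
        removed.clear();
        return out;
    }
}
```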



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9064) Observing transient issue with kinit command

2019-12-09 Thread Pradeep Bansal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992191#comment-16992191
 ] 

Pradeep Bansal commented on KAFKA-9064:
---

Can somebody please help with resolving this?

> Observing transient issue with kinit command
> 
>
> Key: KAFKA-9064
> URL: https://issues.apache.org/jira/browse/KAFKA-9064
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.1
>Reporter: Pradeep Bansal
>Priority: Major
>
> I have specified the kinit command to be skinit. While this works fine most of 
> the time, sometimes I see the exception below, where it does not respect the 
> provided kinit command and uses the default value. Can this be handled?
>  
> [2019-02-19 10:20:07,862] WARN [Principal=null]: Could not renew TGT due to 
> problem running shell command: '/usr/bin/kinit -R'. Exiting refresh thread. 
> (org.apache.kafka.common.security.kerberos.KerberosLogin)
> org.apache.kafka.common.utils.Shell$ExitCodeException: kinit: Matching 
> credential not found (filename: /tmp/krb5cc_25012_76850_sshd_w6VpLC8R0Y) while 
> renewing credentials
>  at org.apache.kafka.common.utils.Shell.runCommand(Shell.java:130)
>  at org.apache.kafka.common.utils.Shell.run(Shell.java:76)
>  at org.apache.kafka.common.utils.Shell$ShellCommandExecutor.execute(Shell.java:204)
>  at org.apache.kafka.common.utils.Shell.execCommand(Shell.java:268)
>  at org.apache.kafka.common.utils.Shell.execCommand(Shell.java:255)
>  at org.apache.kafka.common.security.kerberos.KerberosLogin.lambda$login$10(KerberosLogin.java:212)
>  at java.base/java.lang.Thread.run(Thread.java:834)
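For reference, a minimal sketch (illustrative values) of overriding the kinit command on a client, assuming the standard sasl.kerberos.kinit.cmd setting is the one in play here:

```java
import java.util.Properties;

public class KinitOverrideSketch {
    public static void main(String[] args) {
        // Illustrative only: point the Kerberos login refresh thread at a custom kinit.
        Properties props = new Properties();
        props.put("sasl.kerberos.kinit.cmd", "/usr/bin/skinit"); // path is a placeholder
        props.put("security.protocol", "SASL_PLAINTEXT");        // example protocol
        props.put("sasl.mechanism", "GSSAPI");
        // ... the props would then be passed to the consumer/producer constructor
    }
}
```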



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9212) Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest

2019-12-09 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-9212.

Resolution: Fixed

> Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest
> --
>
> Key: KAFKA-9212
> URL: https://issues.apache.org/jira/browse/KAFKA-9212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager
>Affects Versions: 2.3.0, 2.3.1
> Environment: Linux
>Reporter: Yannick
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.4.0, 2.3.2
>
>
> When running the Kafka Connect S3 sink connector (Confluent 5.3.0), after one 
> broker got restarted (the leaderEpoch was updated at this point), the Connect 
> worker crashed with the following error:
> [2019-11-19 16:20:30,097] ERROR [Worker clientId=connect-1, 
> groupId=connect-ls] Uncaught exception in herder work thread, exiting: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253)
>  org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by 
> times in 30003ms
>  
> After investigation, it seems it is because it got fenced when sending 
> ListOffsetRequest in a loop and then timed out, as follows:
> [2019-11-19 16:20:30,020] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Sending ListOffsetRequest (type=ListOffsetRequest, 
> replicaId=-1, partitionTimestamps={connect_ls_config-0={timestamp: -1, 
> maxNumOffsets: 1, currentLeaderEpoch: Optional[1]}}, 
> isolationLevel=READ_UNCOMMITTED) to broker kafka6.fra2.internal:9092 (id: 4 
> rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:905)
> [2019-11-19 16:20:30,044] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher:985)
>  
> The above happens multiple times until the timeout.
>  
> According to the debug logs, the consumer always gets a leaderEpoch of 1 for this 
> topic when starting up:
>  
>  [2019-11-19 13:27:30,802] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Updating last seen epoch from null to 1 for partition 
> connect_ls_config-0 (org.apache.kafka.clients.Metadata:178)
>   
>   
>  But according to our broker's log, the leaderEpoch should be 2, as follows:
>   
>  [2019-11-18 14:19:28,988] INFO [Partition connect_ls_config-0 broker=4] 
> connect_ls_config-0 starts at Leader Epoch 2 from offset 22. Previous Leader 
> Epoch was: 1 (kafka.cluster.Partition)
>   
>   
>  This makes it impossible to restart the worker, as it will always get fenced and 
> then finally time out.
>   
>  It is also impossible to consume with a 2.3 kafka-console-consumer, as 
> follows:
>   
>  kafka-console-consumer --bootstrap-server BOOTSTRAPSERVER:9092 --topic 
> connect_ls_config --from-beginning 
>   
>  the above will just hang forever (which is not expected because there is 
> data) and we can see these debug messages:
> [2019-11-19 22:17:59,124] DEBUG [Consumer clientId=consumer-1, 
> groupId=console-consumer-3844] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>   
>   
>  Interestingly, if we subscribe the same way with kafkacat (1.5.0) we 
> can consume without a problem (it must be that kafkacat ignores 
> FENCED_LEADER_EPOCH when consuming):
>   
>  kafkacat -b BOOTSTRAPSERVER:9092 -t connect_ls_config -o beginning
>   
>   
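A small sketch (placeholder broker and topic names, loosely following the logs above) of an offsets-for-times lookup like the one failing here; on an affected cluster it retries on FENCED_LEADER_EPOCH internally until the timeout expires and then throws TimeoutException:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetsForTimesSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "BOOTSTRAPSERVER:9092");   // placeholder
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("connect_ls_config", 0);
            // On an affected cluster this eventually throws
            // org.apache.kafka.common.errors.TimeoutException after repeated fenced retries.
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, 0L), Duration.ofSeconds(30));
            System.out.println(offsets);
        }
    }
}
```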



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9212) Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest

2019-12-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992119#comment-16992119
 ] 

ASF GitHub Bot commented on KAFKA-9212:
---

hachikuji commented on pull request #7805: KAFKA-9212; Ensure LeaderAndIsr 
state updated in controller context during reassignment
URL: https://github.com/apache/kafka/pull/7805
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest
> --
>
> Key: KAFKA-9212
> URL: https://issues.apache.org/jira/browse/KAFKA-9212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager
>Affects Versions: 2.3.0, 2.3.1
> Environment: Linux
>Reporter: Yannick
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.4.0, 2.3.2
>
>
> When running the Kafka Connect S3 sink connector (Confluent 5.3.0), after one 
> broker got restarted (the leaderEpoch was updated at this point), the Connect 
> worker crashed with the following error:
> [2019-11-19 16:20:30,097] ERROR [Worker clientId=connect-1, 
> groupId=connect-ls] Uncaught exception in herder work thread, exiting: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253)
>  org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by 
> times in 30003ms
>  
> After investigation, it seems it is because it got fenced when sending 
> ListOffsetRequest in a loop and then timed out, as follows:
> [2019-11-19 16:20:30,020] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Sending ListOffsetRequest (type=ListOffsetRequest, 
> replicaId=-1, partitionTimestamps={connect_ls_config-0={timestamp: -1, 
> maxNumOffsets: 1, currentLeaderEpoch: Optional[1]}}, 
> isolationLevel=READ_UNCOMMITTED) to broker kafka6.fra2.internal:9092 (id: 4 
> rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:905)
> [2019-11-19 16:20:30,044] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher:985)
>  
> The above happens multiple times until the timeout.
>  
> According to the debug logs, the consumer always gets a leaderEpoch of 1 for this 
> topic when starting up:
>  
>  [2019-11-19 13:27:30,802] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Updating last seen epoch from null to 1 for partition 
> connect_ls_config-0 (org.apache.kafka.clients.Metadata:178)
>   
>   
>  But according to our broker's log, the leaderEpoch should be 2, as follows:
>   
>  [2019-11-18 14:19:28,988] INFO [Partition connect_ls_config-0 broker=4] 
> connect_ls_config-0 starts at Leader Epoch 2 from offset 22. Previous Leader 
> Epoch was: 1 (kafka.cluster.Partition)
>   
>   
>  This makes it impossible to restart the worker, as it will always get fenced and 
> then finally time out.
>   
>  It is also impossible to consume with a 2.3 kafka-console-consumer, as 
> follows:
>   
>  kafka-console-consumer --bootstrap-server BOOTSTRAPSERVER:9092 --topic 
> connect_ls_config --from-beginning 
>   
>  the above will just hang forever (which is not expected because there is 
> data) and we can see these debug messages:
> [2019-11-19 22:17:59,124] DEBUG [Consumer clientId=consumer-1, 
> groupId=console-consumer-3844] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>   
>   
>  Interestingly, if we subscribe the same way with kafkacat (1.5.0) we 
> can consume without a problem (it must be that kafkacat ignores 
> FENCED_LEADER_EPOCH when consuming):
>   
>  kafkacat -b BOOTSTRAPSERVER:9092 -t connect_ls_config -o beginning
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9288) Do not allow the same object to be inserted multiple times into ImplicitLinkedHashCollection

2019-12-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992066#comment-16992066
 ] 

ASF GitHub Bot commented on KAFKA-9288:
---

cmccabe commented on pull request #7809: KAFKA-9288: Do not allow the same 
object to be inserted multiple times into ImplicitLinkedHashCollection
URL: https://github.com/apache/kafka/pull/7809
 
 
   We should not allow the same object to be inserted multiple times into 
ImplicitLinkedHashCollection. It causes corruption because there is only one 
set of previous and next pointers in the node.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Do not allow the same object to be inserted multiple times into 
> ImplicitLinkedHashCollection
> 
>
> Key: KAFKA-9288
> URL: https://issues.apache.org/jira/browse/KAFKA-9288
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>
> We should not allow the same object to be inserted multiple times into 
> ImplicitLinkedHashCollection.  It causes corruption because there is only one 
> set of previous and next pointers in the node.
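A toy sketch (hypothetical names, not the actual ImplicitLinkedHashCollection code) of why double insertion corrupts an intrusive collection: each element carries exactly one prev/next pair, so a second insertion overwrites the pointers written by the first.

```java
// Hypothetical intrusive node: the element itself stores its list pointers.
class IntrusiveNode {
    IntrusiveNode prev, next;   // only ONE set of pointers per object
}

class IntrusiveList {
    private final IntrusiveNode head = new IntrusiveNode();
    { head.prev = head; head.next = head; }

    void addLast(IntrusiveNode n) {
        // If n is already linked, this silently overwrites n.prev/n.next, so the
        // neighbours of the first occurrence still point at n while n no longer
        // points back at them -> traversal and removal are corrupted.
        n.prev = head.prev;
        n.next = head;
        head.prev.next = n;
        head.prev = n;
    }
}
```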



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9288) Do not allow the same object to be inserted multiple times into ImplicitLinkedHashCollection

2019-12-09 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-9288:
---

 Summary: Do not allow the same object to be inserted multiple 
times into ImplicitLinkedHashCollection
 Key: KAFKA-9288
 URL: https://issues.apache.org/jira/browse/KAFKA-9288
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe


We should not allow the same object to be inserted multiple times into 
ImplicitLinkedHashCollection.  It causes corruption because there is only one 
set of previous and next pointers in the node.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9159) Consumer.endOffsets Throw TimeoutException: Failed to get offsets by times in 30000ms after a leader change

2019-12-09 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992030#comment-16992030
 ] 

Guozhang Wang commented on KAFKA-9159:
--

[~zzccctv] I've left a comment on your submitted PR 
https://github.com/apache/kafka/pull/7665, thanks.

> Consumer.endOffsets Throw TimeoutException: Failed to get offsets by times in 
> 30000ms after a leader change
> ---
>
> Key: KAFKA-9159
> URL: https://issues.apache.org/jira/browse/KAFKA-9159
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0, 2.0.0
>Reporter: zhangzhanchang
>Priority: Major
> Attachments: image-2019-11-08-10-28-19-881.png, 
> image-2019-11-08-10-29-06-282.png
>
>
> Case 1: on version 0.10.2, calling Consumer.endOffsets in a loop throws 
> TimeoutException: Failed to get offsets by times in 30000ms after a kill -9 of a 
> broker, but after a leader change, calling Consumer.endOffsets in a loop is no problem.
> !image-2019-11-08-10-28-19-881.png|width=416,height=299!
> Case 2: on version 2.0, calling Consumer.endOffsets in a loop throws 
> TimeoutException: Failed to get offsets by times in 30000ms after a leader change, 
> but after a kill -9 of a broker, calling Consumer.endOffsets in a loop is no problem.
> !image-2019-11-08-10-29-06-282.png|width=412,height=314!
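A minimal sketch (placeholder broker and topic names) of the polling loop described above; on an affected version the endOffsets call starts throwing TimeoutException after the broker kill or leader change:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetsLoopSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("test-topic", 0);   // placeholder topic
            while (true) {
                // Throws org.apache.kafka.common.errors.TimeoutException on affected
                // versions once the cached leader metadata goes stale.
                System.out.println(consumer.endOffsets(Collections.singletonList(tp),
                                                       Duration.ofSeconds(30)));
                Thread.sleep(1000);
            }
        }
    }
}
```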



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9173) StreamsPartitionAssignor assigns partitions to only one worker

2019-12-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991991#comment-16991991
 ] 

Matthias J. Sax commented on KAFKA-9173:


Ack

> StreamsPartitionAssignor assigns partitions to only one worker
> --
>
> Key: KAFKA-9173
> URL: https://issues.apache.org/jira/browse/KAFKA-9173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Oleg Muravskiy
>Priority: Major
>  Labels: user-experience
> Attachments: StreamsPartitionAssignor.log
>
>
> I'm running a distributed KafkaStreams application on 10 worker nodes, 
> subscribed to 21 topics with 10 partitions each. I'm only using the 
> Processor interface and a persistent state store.
> However, only one worker gets assigned partitions; all other workers get 
> nothing. Restarting the application or cleaning local state stores does not 
> help. StreamsPartitionAssignor migrates to other nodes, and eventually picks 
> another node to assign partitions to, but still only one node.
> It's difficult to figure out where to look for signs of the problem, so I'm 
> attaching the log messages from the StreamsPartitionAssignor. Let me know 
> what else I could provide to help resolve this.
> [^StreamsPartitionAssignor.log]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9259) suppress() for windowed-Serdes does not work with default serdes

2019-12-09 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-9259:
-
Labels: newbie  (was: )

> suppress() for windowed-Serdes does not work with default serdes
> 
>
> Key: KAFKA-9259
> URL: https://issues.apache.org/jira/browse/KAFKA-9259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: newbie
>
> The suppress() operator either inherits serdes from its upstream operator or 
> falls back to default serdes from the config.
> If the upstream operator is a windowed aggregation, the window-aggregation 
> operator wraps the user-supplied serde with a window serde and pushes it 
> into suppress() – however, if default serdes are used, the window-aggregation 
> operator cannot push anything into suppress(). At runtime, it just creates a 
> default serde and wraps it accordingly. For this case, suppress() also falls 
> back to default serdes; however, it does not wrap the serde, and thus a 
> ClassCastException is thrown when the serde is used later.
> suppress() already knows whether the upstream aggregation is time or session 
> windowed and thus should use this information to wrap default serdes 
> accordingly.
> The current workaround for windowed suppress() is to overwrite the default 
> serde upstream of suppress(), such that suppress() inherits serdes and does 
> not fall back to default serdes.
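A minimal sketch of that workaround (topic names and serdes are placeholders): pass explicit serdes to the windowed aggregation so that suppress() inherits the wrapped serdes instead of falling back to un-wrapped defaults.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class SuppressWorkaroundSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<Windowed<String>, Long> counts = builder
                .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))   // explicit serdes, not defaults
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                .count(Materialized.with(Serdes.String(), Serdes.Long()))     // suppress() inherits these, wrapped
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
        builder.build();  // the resulting topology would be passed to KafkaStreams
    }
}
```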



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes

2019-12-09 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-9273:


Assignee: (was: Bill Bejeck)

> Refactor AbstractJoinIntegrationTest and Sub-classes
> 
>
> Key: KAFKA-9273
> URL: https://issues.apache.org/jira/browse/KAFKA-9273
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: newbie
>
> The AbstractJoinIntegrationTest uses an embedded broker, but not all of the 
> sub-classes require an embedded broker anymore.  Additionally, there are two 
> tests remaining that require an embedded broker, but they don't perform 
> joins; they are tests validating other conditions, so ideally those tests 
> should move into a separate test class.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes

2019-12-09 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-9273:
-
Labels: newbie  (was: )

> Refactor AbstractJoinIntegrationTest and Sub-classes
> 
>
> Key: KAFKA-9273
> URL: https://issues.apache.org/jira/browse/KAFKA-9273
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>  Labels: newbie
>
> The AbstractJoinIntegrationTest uses an embedded broker, but not all of the 
> sub-classes require an embedded broker anymore.  Additionally, there are two 
> tests remaining that require an embedded broker, but they don't perform 
> joins; they are tests validating other conditions, so ideally those tests 
> should move into a separate test class.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9173) StreamsPartitionAssignor assigns partitions to only one worker

2019-12-09 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991960#comment-16991960
 ] 

Sophie Blee-Goldman commented on KAFKA-9173:


[~mjsax] See [https://github.com/apache/kafka/pull/7808] & 
[https://github.com/apache/kafka-site/pull/244]

> StreamsPartitionAssignor assigns partitions to only one worker
> --
>
> Key: KAFKA-9173
> URL: https://issues.apache.org/jira/browse/KAFKA-9173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Oleg Muravskiy
>Priority: Major
>  Labels: user-experience
> Attachments: StreamsPartitionAssignor.log
>
>
> I'm running a distributed KafkaStreams application on 10 worker nodes, 
> subscribed to 21 topics with 10 partitions each. I'm only using the 
> Processor interface and a persistent state store.
> However, only one worker gets assigned partitions; all other workers get 
> nothing. Restarting the application or cleaning local state stores does not 
> help. StreamsPartitionAssignor migrates to other nodes, and eventually picks 
> another node to assign partitions to, but still only one node.
> It's difficult to figure out where to look for signs of the problem, so I'm 
> attaching the log messages from the StreamsPartitionAssignor. Let me know 
> what else I could provide to help resolve this.
> [^StreamsPartitionAssignor.log]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9230) Change User Customizable Metrics API in StreamsMetrics interface

2019-12-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991932#comment-16991932
 ] 

ASF GitHub Bot commented on KAFKA-9230:
---

guozhangwang commented on pull request #7762: KAFKA-9230: Refactor 
user-customizable Streams metrics
URL: https://github.com/apache/kafka/pull/7762
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Change User Customizable Metrics API in StreamsMetrics interface
> 
>
> Key: KAFKA-9230
> URL: https://issues.apache.org/jira/browse/KAFKA-9230
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>
> As proposed in KIP-444, the user-customizable metrics API in the 
> StreamsMetrics interface shall be improved. For more details, see 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9241) SASL Clients are not forced to re-authenticate if they don't leverage SaslAuthenticateRequest

2019-12-09 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-9241.
---
Fix Version/s: 2.5.0
 Reviewer: Rajini Sivaram
   Resolution: Fixed

> SASL Clients are not forced to re-authenticate if they don't leverage 
> SaslAuthenticateRequest
> -
>
> Key: KAFKA-9241
> URL: https://issues.apache.org/jira/browse/KAFKA-9241
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.2.0, 2.3.0, 2.2.1
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
>  Labels: security, security-issue
> Fix For: 2.5.0
>
>
> Brokers are supposed to force SASL clients to re-authenticate (and kill such 
> connections in the absence of a timely and successful re-authentication) when 
> SASL Re-Authentication 
> [(KIP-368)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate]
>   is enabled via a positive `connections.max.reauth.ms` configuration value.  
> There is a flaw in the logic that causes connections to not be killed in the 
> absence of a timely and successful re-authentication _if the client does not 
> leverage the SaslAuthenticateRequest API_ (which was defined in 
> [KIP-152|https://cwiki.apache.org/confluence/display/KAFKA/KIP-152+-+Improve+diagnostics+for+SASL+authentication+failures]).
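For context, a minimal sketch of the broker-side setting that KIP-368 keys off (the value is illustrative; a zero or absent value leaves re-authentication disabled):

```
connections.max.reauth.ms=3600000
```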



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9241) SASL Clients are not forced to re-authenticate if they don't leverage SaslAuthenticateRequest

2019-12-09 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-9241:
--
Affects Version/s: 2.4.0

> SASL Clients are not forced to re-authenticate if they don't leverage 
> SaslAuthenticateRequest
> -
>
> Key: KAFKA-9241
> URL: https://issues.apache.org/jira/browse/KAFKA-9241
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.2.0, 2.3.0, 2.2.1, 2.4.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
>  Labels: security, security-issue
> Fix For: 2.5.0
>
>
> Brokers are supposed to force SASL clients to re-authenticate (and kill such 
> connections in the absence of a timely and successful re-authentication) when 
> SASL Re-Authentication 
> [(KIP-368)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate]
>   is enabled via a positive `connections.max.reauth.ms` configuration value.  
> There is a flaw in the logic that causes connections to not be killed in the 
> absence of a timely and successful re-authentication _if the client does not 
> leverage the SaslAuthenticateRequest API_ (which was defined in 
> [KIP-152|https://cwiki.apache.org/confluence/display/KAFKA/KIP-152+-+Improve+diagnostics+for+SASL+authentication+failures]).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9241) SASL Clients are not forced to re-authenticate if they don't leverage SaslAuthenticateRequest

2019-12-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991849#comment-16991849
 ] 

ASF GitHub Bot commented on KAFKA-9241:
---

rajinisivaram commented on pull request #7784: KAFKA-9241: Some SASL Clients 
not forced to re-authenticate
URL: https://github.com/apache/kafka/pull/7784
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SASL Clients are not forced to re-authenticate if they don't leverage 
> SaslAuthenticateRequest
> -
>
> Key: KAFKA-9241
> URL: https://issues.apache.org/jira/browse/KAFKA-9241
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.2.0, 2.3.0, 2.2.1
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
>  Labels: security, security-issue
>
> Brokers are supposed to force SASL clients to re-authenticate (and kill such 
> connections in the absence of a timely and successful re-authentication) when 
> SASL Re-Authentication 
> [(KIP-368)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate]
>   is enabled via a positive `connections.max.reauth.ms` configuration value.  
> There is a flaw in the logic that causes connections to not be killed in the 
> absence of a timely and successful re-authentication _if the client does not 
> leverage the SaslAuthenticateRequest API_ (which was defined in 
> [KIP-152|https://cwiki.apache.org/confluence/display/KAFKA/KIP-152+-+Improve+diagnostics+for+SASL+authentication+failures]).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9212) Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest

2019-12-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991826#comment-16991826
 ] 

ASF GitHub Bot commented on KAFKA-9212:
---

hachikuji commented on pull request #7805: KAFKA-9212; Ensure LeaderAndIsr 
state updated in controller context during reassignment
URL: https://github.com/apache/kafka/pull/7805
 
 
   This is a cherry-pick of 
https://github.com/apache/kafka/commit/5d0cb1419cd1f1cdfb7bc04ed4760d5a0eae0aa1.
 The main differences are 1) leader epoch validation is unconditionally 
disabled, and 2) the test case has been refactored due to the absence of the 
reassignment admin APIs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest
> --
>
> Key: KAFKA-9212
> URL: https://issues.apache.org/jira/browse/KAFKA-9212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager
>Affects Versions: 2.3.0, 2.3.1
> Environment: Linux
>Reporter: Yannick
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.4.0, 2.3.2
>
>
> When running the Kafka Connect S3 sink connector (Confluent 5.3.0), after one 
> broker got restarted (the leaderEpoch was updated at this point), the Connect 
> worker crashed with the following error:
> [2019-11-19 16:20:30,097] ERROR [Worker clientId=connect-1, 
> groupId=connect-ls] Uncaught exception in herder work thread, exiting: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253)
>  org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by 
> times in 30003ms
>  
> After investigation, it seems it is because it got fenced when sending 
> ListOffsetRequest in a loop and then timed out, as follows:
> [2019-11-19 16:20:30,020] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Sending ListOffsetRequest (type=ListOffsetRequest, 
> replicaId=-1, partitionTimestamps={connect_ls_config-0={timestamp: -1, 
> maxNumOffsets: 1, currentLeaderEpoch: Optional[1]}}, 
> isolationLevel=READ_UNCOMMITTED) to broker kafka6.fra2.internal:9092 (id: 4 
> rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:905)
> [2019-11-19 16:20:30,044] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher:985)
>  
> The above happens multiple times until the timeout.
>  
> According to the debug logs, the consumer always gets a leaderEpoch of 1 for this 
> topic when starting up:
>  
>  [2019-11-19 13:27:30,802] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Updating last seen epoch from null to 1 for partition 
> connect_ls_config-0 (org.apache.kafka.clients.Metadata:178)
>   
>   
>  But according to our broker's log, the leaderEpoch should be 2, as follows:
>   
>  [2019-11-18 14:19:28,988] INFO [Partition connect_ls_config-0 broker=4] 
> connect_ls_config-0 starts at Leader Epoch 2 from offset 22. Previous Leader 
> Epoch was: 1 (kafka.cluster.Partition)
>   
>   
>  This makes it impossible to restart the worker, as it will always get fenced and 
> then finally time out.
>   
>  It is also impossible to consume with a 2.3 kafka-console-consumer, as 
> follows:
>   
>  kafka-console-consumer --bootstrap-server BOOTSTRAPSERVER:9092 --topic 
> connect_ls_config --from-beginning 
>   
>  the above will just hang forever (which is not expected because there is 
> data) and we can see these debug messages:
> [2019-11-19 22:17:59,124] DEBUG [Consumer clientId=consumer-1, 
> groupId=console-consumer-3844] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>   
>   
>  Interestingly, if we subscribe the same way with kafkacat (1.5.0) we 
> can consume without a problem (it must be that kafkacat ignores 
> FENCED_LEADER_EPOCH when consuming):
>   
>  kafkacat -b BOOTSTRAPSERVER:9092 -t connect_ls_config -o beginning
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-9261) NPE when updating client metadata

2019-12-09 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar reopened KAFKA-9261:
--

> NPE when updating client metadata
> -
>
> Key: KAFKA-9261
> URL: https://issues.apache.org/jira/browse/KAFKA-9261
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.4.0, 2.3.2
>
>
> We have seen the following exception recently:
> {code}
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at org.apache.kafka.common.Cluster.(Cluster.java:134)
>   at org.apache.kafka.common.Cluster.(Cluster.java:89)
>   at 
> org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:120)
>   at org.apache.kafka.clients.MetadataCache.(MetadataCache.java:82)
>   at org.apache.kafka.clients.MetadataCache.(MetadataCache.java:58)
>   at 
> org.apache.kafka.clients.Metadata.handleMetadataResponse(Metadata.java:325)
>   at org.apache.kafka.clients.Metadata.update(Metadata.java:252)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1059)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:845)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:548)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
> {code}
> The client assumes that if a leader is included in the response, then node 
> information must also be available. There are at least a couple possible 
> reasons this assumption can fail:
> 1. The client is able to detect stale partition metadata using leader epoch 
> information available. If stale partition metadata is detected, the client 
> ignores it and uses the last known metadata. However, it cannot detect stale 
> broker information and will always accept the latest update. This means that 
> the latest metadata may be a mix of multiple metadata responses and therefore 
> the invariant will not generally hold.
> 2. There is no lock which protects both the fetching of partition metadata 
> and the live broker when handling a Metadata request. This means an 
> UpdateMetadata request can arrive concurrently and break the intended 
> invariant.
> It seems case 2 has been possible for a long time, but it should be extremely 
> rare. Case 1 was only made possible with KIP-320, which added the leader 
> epoch tracking. It should also be rare, but the window for inconsistent 
> metadata is probably a bit bigger than the window for a concurrent update.
> To fix this, we should make the client more defensive about metadata updates 
> and not assume that the leader is among the live endpoints.
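A rough sketch (hypothetical types and names, not the actual client code) of the defensive handling suggested in the last paragraph: when a partition's leader id is not among the live nodes from the same response, treat the leader as unknown instead of dereferencing missing node info.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of defensive metadata handling.
class MetadataSketch {
    static Optional<String> leaderEndpoint(int leaderId, Map<Integer, String> liveNodesById) {
        // Do NOT assume the leader is among the live endpoints; fall back to an
        // "unknown leader" (empty Optional) instead of throwing NullPointerException.
        return Optional.ofNullable(liveNodesById.get(leaderId));
    }
}
```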



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-9212) Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest

2019-12-09 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar reopened KAFKA-9212:
--

> Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest
> --
>
> Key: KAFKA-9212
> URL: https://issues.apache.org/jira/browse/KAFKA-9212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager
>Affects Versions: 2.3.0, 2.3.1
> Environment: Linux
>Reporter: Yannick
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.4.0, 2.3.2
>
>
> When running the Kafka Connect S3 sink connector (Confluent 5.3.0), after one 
> broker got restarted (the leaderEpoch was updated at this point), the Connect 
> worker crashed with the following error:
> [2019-11-19 16:20:30,097] ERROR [Worker clientId=connect-1, 
> groupId=connect-ls] Uncaught exception in herder work thread, exiting: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253)
>  org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by 
> times in 30003ms
>  
> After investigation, it seems it is because it got fenced when sending 
> ListOffsetRequest in a loop and then timed out, as follows:
> [2019-11-19 16:20:30,020] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Sending ListOffsetRequest (type=ListOffsetRequest, 
> replicaId=-1, partitionTimestamps={connect_ls_config-0={timestamp: -1, 
> maxNumOffsets: 1, currentLeaderEpoch: Optional[1]}}, 
> isolationLevel=READ_UNCOMMITTED) to broker kafka6.fra2.internal:9092 (id: 4 
> rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:905)
> [2019-11-19 16:20:30,044] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher:985)
>  
> The above happens multiple times until the timeout.
>  
> According to the debug logs, the consumer always gets a leaderEpoch of 1 for this 
> topic when starting up:
>  
>  [2019-11-19 13:27:30,802] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Updating last seen epoch from null to 1 for partition 
> connect_ls_config-0 (org.apache.kafka.clients.Metadata:178)
>   
>   
>  But according to our broker's log, the leaderEpoch should be 2, as follows:
>   
>  [2019-11-18 14:19:28,988] INFO [Partition connect_ls_config-0 broker=4] 
> connect_ls_config-0 starts at Leader Epoch 2 from offset 22. Previous Leader 
> Epoch was: 1 (kafka.cluster.Partition)
>   
>   
>  This makes it impossible to restart the worker, as it will always get fenced and 
> then finally time out.
>   
>  It is also impossible to consume with a 2.3 kafka-console-consumer, as 
> follows:
>   
>  kafka-console-consumer --bootstrap-server BOOTSTRAPSERVER:9092 --topic 
> connect_ls_config --from-beginning 
>   
>  the above will just hang forever (which is not expected because there is 
> data) and we can see these debug messages:
> [2019-11-19 22:17:59,124] DEBUG [Consumer clientId=consumer-1, 
> groupId=console-consumer-3844] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>   
>   
>  Interestingly, if we subscribe the same way with kafkacat (1.5.0) we 
> can consume without a problem (it must be that kafkacat ignores 
> FENCED_LEADER_EPOCH when consuming):
>   
>  kafkacat -b BOOTSTRAPSERVER:9092 -t connect_ls_config -o beginning
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7251) Add support for TLS 1.3

2019-12-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991770#comment-16991770
 ] 

ASF GitHub Bot commented on KAFKA-7251:
---

rajinisivaram commented on pull request #7804: KAFKA-7251; Add support for TLS 
1.3
URL: https://github.com/apache/kafka/pull/7804
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for TLS 1.3
> ---
>
> Key: KAFKA-7251
> URL: https://issues.apache.org/jira/browse/KAFKA-7251
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> Java 11 adds support for TLS 1.3. We should support this after we add support 
> for Java 11.
> Related issues:
> [https://bugs.openjdk.java.net/browse/JDK-8206170]
> [https://bugs.openjdk.java.net/browse/JDK-8206178]
> [https://bugs.openjdk.java.net/browse/JDK-8208538]
> [https://bugs.openjdk.java.net/browse/JDK-8207009]
> [https://bugs.openjdk.java.net/browse/JDK-8209893]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9261) NPE when updating client metadata

2019-12-09 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-9261.
--
Resolution: Fixed

> NPE when updating client metadata
> -
>
> Key: KAFKA-9261
> URL: https://issues.apache.org/jira/browse/KAFKA-9261
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.4.0, 2.3.2
>
>
> We have seen the following exception recently:
> {code}
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at org.apache.kafka.common.Cluster.(Cluster.java:134)
>   at org.apache.kafka.common.Cluster.(Cluster.java:89)
>   at 
> org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:120)
>   at org.apache.kafka.clients.MetadataCache.(MetadataCache.java:82)
>   at org.apache.kafka.clients.MetadataCache.(MetadataCache.java:58)
>   at 
> org.apache.kafka.clients.Metadata.handleMetadataResponse(Metadata.java:325)
>   at org.apache.kafka.clients.Metadata.update(Metadata.java:252)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1059)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:845)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:548)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
> {code}
> The client assumes that if a leader is included in the response, then node 
> information must also be available. There are at least a couple possible 
> reasons this assumption can fail:
> 1. The client is able to detect stale partition metadata using leader epoch 
> information available. If stale partition metadata is detected, the client 
> ignores it and uses the last known metadata. However, it cannot detect stale 
> broker information and will always accept the latest update. This means that 
> the latest metadata may be a mix of multiple metadata responses and therefore 
> the invariant will not generally hold.
> 2. There is no lock which protects both the fetching of partition metadata 
> and the live broker when handling a Metadata request. This means an 
> UpdateMetadata request can arrive concurrently and break the intended 
> invariant.
> It seems case 2 has been possible for a long time, but it should be extremely 
> rare. Case 1 was only made possible with KIP-320, which added the leader 
> epoch tracking. It should also be rare, but the window for inconsistent 
> metadata is probably a bit bigger than the window for a concurrent update.
> To fix this, we should make the client more defensive about metadata updates 
> and not assume that the leader is among the live endpoints.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9283) Flaky Test - kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker

2019-12-09 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-9283:
-
Fix Version/s: (was: 2.4.0)

> Flaky Test - 
> kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker
> 
>
> Key: KAFKA-9283
> URL: https://issues.apache.org/jira/browse/KAFKA-9283
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0
> Environment: OS: Ubuntu 18.04.3 LTS
> Java Version: OpenJDK 11.0.4
> Scala Versions: 2.13.0, 2.13.1
> Gradle Version: 5.6.2
>Reporter: Israel Ekpo
>Priority: Major
>  Labels: flaky-test
>
> This same test fails occasionally when run with Scala 2.12.10 but has been 
> failing consistently with Scala versions 2.13.0 and 2.13.1.
> Needs review.
> Also, I had to adjust the scalaVersion variable in the gradle.properties 
> config file to the target version in my environment before it was picked up 
> by the integration test:
> cat ~/scratchpad/111744.out/kafka-2.4.0-src/gradle.properties
> OS: Ubuntu 18.04.3 LTS
>  Java Version: OpenJDK 11.0.4
>  Scala Versions: 2.13.0, 2.13.1
>  Gradle Version: 5.6.2
> ./gradlew :core:integrationTest --tests 
> kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker
> > Configure project :
>  Building project 'core' with Scala version 2.13.1
>  Building project 'streams-scala' with Scala version 2.13.1
> > Task :core:integrationTest FAILED
>  
> kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker
>  failed, log available in 
> /home/isekpo/scratchpad/111744.out/kafka-2.4.0-src/core/build/reports/testOutput/kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker.test.stdout
> kafka.admin.ReassignPartitionsClusterTest > 
> shouldMoveSinglePartitionWithinBroker FAILED
>  org.scalatest.exceptions.TestFailedException: Partition should have been 
> moved to the expected log directory
>  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>  at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>  at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>  at org.scalatest.Assertions.fail(Assertions.scala:1091)
>  at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>  at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>  at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842)
>  at 
> kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker(ReassignPartitionsClusterTest.scala:177)
> 1 test completed, 1 failed
> FAILURE: Build failed with an exception.
>  * What went wrong:
>  Execution failed for task ':core:integrationTest'.
>  > There were failing tests. See the report at: 
> [file:///home/isekpo/scratchpad/111744.out/kafka-2.4.0-src/core/build/reports/tests/integrationTest/index.html]
>  * Try:
>  Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
>  * Get more help at [https://help.gradle.org|https://help.gradle.org/]
> Deprecated Gradle features were used in this build, making it incompatible 
> with Gradle 6.0.
>  Use '--warning-mode all' to show the individual deprecation warnings.
>  See 
> [https://docs.gradle.org/5.6.2/userguide/command_line_interface.html#sec:command_line_warnings]
> BUILD FAILED in 1m 56s
>  13 actionable tasks: 4 executed, 9 up-to-date



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9212) Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest

2019-12-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991737#comment-16991737
 ] 

ASF GitHub Bot commented on KAFKA-9212:
---

hachikuji commented on pull request #7800: KAFKA-9212; Ensure LeaderAndIsr 
state updated in controller context during reassignment (#7795)
URL: https://github.com/apache/kafka/pull/7800
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest
> --
>
> Key: KAFKA-9212
> URL: https://issues.apache.org/jira/browse/KAFKA-9212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager
>Affects Versions: 2.3.0, 2.3.1
> Environment: Linux
>Reporter: Yannick
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.4.0, 2.3.2
>
>
> When running the Kafka Connect S3 sink connector (Confluent 5.3.0), after one 
> broker got restarted (leaderEpoch updated at this point), the connect worker 
> crashed with the following error:
> [2019-11-19 16:20:30,097] ERROR [Worker clientId=connect-1, 
> groupId=connect-ls] Uncaught exception in herder work thread, exiting: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253)
>  org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by 
> times in 30003ms
>  
> After investigation, it seems it's because it got fenced when sending 
> ListOffsetRequest in a loop and then timed out, as follows:
> [2019-11-19 16:20:30,020] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Sending ListOffsetRequest (type=ListOffsetRequest, 
> replicaId=-1, partitionTimestamps={connect_ls_config-0={timestamp: -1, 
> maxNumOffsets: 1, currentLeaderEpoch: Optional[1]}}, 
> isolationLevel=READ_UNCOMMITTED) to broker kafka6.fra2.internal:9092 (id: 4 
> rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:905)
> [2019-11-19 16:20:30,044] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher:985)
>  
> The above happens multiple times until timeout.
>  
> According to the debug logs, the consumer always gets a leaderEpoch of 1 for 
> this topic when starting up:
>  
>  [2019-11-19 13:27:30,802] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Updating last seen epoch from null to 1 for partition 
> connect_ls_config-0 (org.apache.kafka.clients.Metadata:178)
>   
>   
>  But according to our broker logs, the leaderEpoch should be 2, as follows:
>   
>  [2019-11-18 14:19:28,988] INFO [Partition connect_ls_config-0 broker=4] 
> connect_ls_config-0 starts at Leader Epoch 2 from offset 22. Previous Leader 
> Epoch was: 1 (kafka.cluster.Partition)
>   
>   
>  This makes it impossible to restart the worker, as it will always get fenced 
> and then finally time out.
>   
>  It is also impossible to consume with a 2.3 kafka-console-consumer, as 
> follows:
>   
>  kafka-console-consumer --bootstrap-server BOOTSTRAPSERVER:9092 --topic 
> connect_ls_config --from-beginning 
>   
>  The above will just hang forever (which is not expected because there is 
> data), and we can see these debug messages:
> [2019-11-19 22:17:59,124] DEBUG [Consumer clientId=consumer-1, 
> groupId=console-consumer-3844] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>   
>   
>  Interestingly, if we subscribe the same way with kafkacat (1.5.0) we can 
> consume without problems (it must be that kafkacat ignores 
> FENCED_LEADER_EPOCH while consuming):
>   
>  kafkacat -b BOOTSTRAPSERVER:9092 -t connect_ls_config -o beginning
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2019-12-09 Thread Raman Gupta (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991711#comment-16991711
 ] 

Raman Gupta commented on KAFKA-8803:


[~guozhang] Yes, the state is indefinite until the brokers are bounced. I 
believe I posted the complete broker logs earlier -- what is missing from them 
that you need?

I've had this issue happen to me three times, but as [~timvanlaer] says, it 
does not happen consistently, nor do I know how to reproduce it.

> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Boyang Chen
>Priority: Major
> Attachments: logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8059) Flaky Test DynamicConnectionQuotaTest #testDynamicConnectionQuota

2019-12-09 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991655#comment-16991655
 ] 

Levani Kokhreidze commented on KAFKA-8059:
--

[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27037/console]

> Flaky Test DynamicConnectionQuotaTest #testDynamicConnectionQuota
> -
>
> Key: KAFKA-8059
> URL: https://issues.apache.org/jira/browse/KAFKA-8059
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0, 2.1.1
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.5.0
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/46/tests]
> {quote}org.scalatest.junit.JUnitTestFailedError: Expected exception 
> java.io.IOException to be thrown, but no exception was thrown
> at 
> org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:100)
> at 
> org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:71)
> at org.scalatest.Assertions$class.intercept(Assertions.scala:822)
> at org.scalatest.junit.JUnitSuite.intercept(JUnitSuite.scala:71)
> at 
> kafka.network.DynamicConnectionQuotaTest.testDynamicConnectionQuota(DynamicConnectionQuotaTest.scala:82){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9279) Silent data loss in Kafka producer

2019-12-09 Thread Alex Mironov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991553#comment-16991553
 ] 

Alex Mironov commented on KAFKA-9279:
-

I agree that this behaviour is unexpected and clearly breaks the transactional 
contract. See the `commitTransaction` javadoc, for example:
{code:java}
* Commits the ongoing transaction. This method will flush any unsent records 
before actually committing the transaction.
*
* Further, if any of the {@link #send(ProducerRecord)} calls which were part of 
the transaction hit irrecoverable
* errors, this method will throw the last received exception immediately and 
the transaction will not be committed.
* So all {@link #send(ProducerRecord)} calls in a transaction must succeed in 
order for this method to succeed.
...
public void commitTransaction() throws ProducerFencedException {{code}
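A rough sketch of the pattern that javadoc describes (assuming a transactional 
producer that has already called initTransactions(), as in the reproduction code 
quoted below; topic and values are placeholders):
{code:java}
// Sketch of the documented contract: commitTransaction() is expected to surface
// the last failed send, and the application reacts by aborting the transaction.
producer.beginTransaction();
try {
    producer.send(new ProducerRecord<>("some-topic", "key".getBytes(), "value".getBytes())); // no callback needed
    producer.commitTransaction(); // should throw if any send failed irrecoverably
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: the producer must be closed.
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort the transaction and retry with a new batch.
    producer.abortTransaction();
}
{code}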

> Silent data loss in Kafka producer
> --
>
> Key: KAFKA-9279
> URL: https://issues.apache.org/jira/browse/KAFKA-9279
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.3.0
>Reporter: Andrew Klopper
>Priority: Major
>
> It appears that it is possible for a producer.commitTransaction() call to 
> succeed even if an individual producer.send() call has failed. The following 
> code demonstrates the issue:
> {code:java}
> package org.example.dataloss;
> import java.nio.charset.StandardCharsets;
> import java.util.Properties;
> import java.util.Random;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public class Main {
> public static void main(final String[] args) {
> final Properties producerProps = new Properties();
> if (args.length != 2) {
> System.err.println("Invalid command-line arguments");
> System.exit(1);
> }
> final String bootstrapServer = args[0];
> final String topic = args[1];
> producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> bootstrapServer);
> producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "50");
> producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
> producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 
> "100");
> producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
> "true");
> producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
> "dataloss_01");
> producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
> "dataloss_01");
> try (final KafkaProducer producer = new 
> KafkaProducer<>(producerProps, new ByteArraySerializer(), new 
> ByteArraySerializer())) {
> producer.initTransactions();
> producer.beginTransaction();
> final Random random = new Random();
> final byte[] largePayload = new byte[200];
> random.nextBytes(largePayload);
> producer.send(
> new ProducerRecord<>(
> topic,
> "large".getBytes(StandardCharsets.UTF_8),
> largePayload
> ),
> (metadata, e) -> {
> if (e == null) {
> System.out.println("INFO: Large payload succeeded");
> } else {
> System.err.printf("ERROR: Large payload failed: 
> %s\n", e.getMessage());
> }
> }
> );
> producer.commitTransaction();
> System.out.println("Commit succeeded");
> } catch (final Exception e) {
> System.err.printf("FATAL ERROR: %s", e.getMessage());
> }
> }
> }
> {code}
> The code prints the following output:
> {code:java}
> ERROR: Large payload failed: The message is 293 bytes when serialized 
> which is larger than the maximum request size you have configured with the 
> max.request.size configuration.
> Commit succeeded{code}
>  
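> A workaround sketch (not part of the original report): keep the Future returned 
> by send() and check it before committing, reusing the producer, topic and 
> largePayload from the snippet above (this also needs java.util.concurrent.Future 
> and org.apache.kafka.clients.producer.RecordMetadata imports):
> {code:java}
> final Future<RecordMetadata> result = producer.send(
>         new ProducerRecord<>(topic, "large".getBytes(StandardCharsets.UTF_8), largePayload));
> try {
>     result.get(); // throws ExecutionException here if the send failed, e.g. record too large
>     producer.commitTransaction();
>     System.out.println("Commit succeeded");
> } catch (final Exception e) {
>     producer.abortTransaction();
>     System.err.printf("Transaction aborted: %s%n", e.getMessage());
> }
> {code}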



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7925) Constant 100% cpu usage by all kafka brokers

2019-12-09 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991524#comment-16991524
 ] 

Rajini Sivaram commented on KAFKA-7925:
---

I have rebased the PR, added a unit test and tested the broker with the fix 
locally. The PR is now ready for review, and hopefully we can merge it for the 
next release (not 2.4.0, since it is too late for that).

> Constant 100% cpu usage by all kafka brokers
> 
>
> Key: KAFKA-7925
> URL: https://issues.apache.org/jira/browse/KAFKA-7925
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
> Environment: Java 11, Kafka v2.1.0, Kafka v2.1.1, Kafka v2.2.0
>Reporter: Abhi
>Priority: Critical
> Attachments: jira-server.log-1, jira-server.log-2, jira-server.log-3, 
> jira-server.log-4, jira-server.log-5, jira-server.log-6, 
> jira_prod.producer.log, threadump20190212.txt
>
>
> Hi,
> I am seeing constant 100% cpu usage on all brokers in our kafka cluster even 
> without any clients connected to any broker.
> This is a bug that we have seen multiple times in our kafka setup that is not 
> yet open to clients. It is becoming a blocker for our deployment now.
> I am seeing a lot of connections to other brokers in CLOSE_WAIT state (see 
> below). In thread usage, I am seeing these threads 
> 'kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-0,kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-1,kafka-network-thread-6-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-2'
>  taking up more than 90% of the cpu time in a 60s interval.
> I have attached a thread dump of one of the brokers in the cluster.
> *Java version:*
> openjdk 11.0.2 2019-01-15
> OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
> OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
> *Kafka version:* v2.1.0
>  
> *connections:*
> java 144319 kafkagod 88u IPv4 3063266 0t0 TCP *:35395 (LISTEN)
> java 144319 kafkagod 89u IPv4 3063267 0t0 TCP *:9144 (LISTEN)
> java 144319 kafkagod 104u IPv4 3064219 0t0 TCP 
> mwkafka-prod-02.tbd:47292->mwkafka-zk-prod-05.tbd:2181 (ESTABLISHED)
> java 144319 kafkagod 2003u IPv4 3055115 0t0 TCP *:9092 (LISTEN)
> java 144319 kafkagod 2013u IPv4 7220110 0t0 TCP 
> mwkafka-prod-02.tbd:60724->mwkafka-zk-prod-04.dr:2181 (ESTABLISHED)
> java 144319 kafkagod 2020u IPv4 30012904 0t0 TCP 
> mwkafka-prod-02.tbd:38988->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2021u IPv4 30012961 0t0 TCP 
> mwkafka-prod-02.tbd:58420->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2027u IPv4 30015723 0t0 TCP 
> mwkafka-prod-02.tbd:58398->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2028u IPv4 30015630 0t0 TCP 
> mwkafka-prod-02.tbd:36248->mwkafka-prod-02.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2030u IPv4 30015726 0t0 TCP 
> mwkafka-prod-02.tbd:39012->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2031u IPv4 30013619 0t0 TCP 
> mwkafka-prod-02.tbd:38986->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2032u IPv4 30015604 0t0 TCP 
> mwkafka-prod-02.tbd:36246->mwkafka-prod-02.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2033u IPv4 30012981 0t0 TCP 
> mwkafka-prod-02.tbd:36924->mwkafka-prod-01.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2034u IPv4 30012967 0t0 TCP 
> mwkafka-prod-02.tbd:39036->mwkafka-prod-02.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2035u IPv4 30012898 0t0 TCP 
> mwkafka-prod-02.tbd:36866->mwkafka-prod-01.dr:9092 (FIN_WAIT2)
> java 144319 kafkagod 2036u IPv4 30004729 0t0 TCP 
> mwkafka-prod-02.tbd:36882->mwkafka-prod-01.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2037u IPv4 30004914 0t0 TCP 
> mwkafka-prod-02.tbd:58426->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2038u IPv4 30015651 0t0 TCP 
> mwkafka-prod-02.tbd:36884->mwkafka-prod-01.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2039u IPv4 30012966 0t0 TCP 
> mwkafka-prod-02.tbd:58422->mwkafka-prod-01.nyc:9092 (ESTABLISHED)
> java 144319 kafkagod 2040u IPv4 30005643 0t0 TCP 
> mwkafka-prod-02.tbd:36252->mwkafka-prod-02.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2041u IPv4 30012944 0t0 TCP 
> mwkafka-prod-02.tbd:36286->mwkafka-prod-02.dr:9092 (ESTABLISHED)
> java 144319 kafkagod 2042u IPv4 30012973 0t0 TCP 
> mwkafka-prod-02.tbd:9092->mwkafka-prod-01.nyc:51924 (ESTABLISHED)
> java 144319 kafkagod 2043u sock 0,7 0t0 30012463 protocol: TCP
> java 144319 kafkagod 2044u IPv4 30012979 0t0 TCP 
> mwkafka-prod-02.tbd:9092->mwkafka-prod-01.dr:39994 (ESTABLISHED)
> java 144319 kafkagod 2045u IPv4 30012899 0t0 TCP 
> mwkafka-prod-02.tbd:9092->mwkafka-prod-02.nyc:34548 (ESTABLISHED)
> java 144319 kafkagod 2046u sock 0,7 0t0 30003437 protocol: TCP
> java 144319 kafkagod 2047u IPv4 30012980 0t0 TCP 
> mwkafka-prod-02.tbd:9092->mw

[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2019-12-09 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991521#comment-16991521
 ] 

ASF GitHub Bot commented on KAFKA-8733:
---

satishd commented on pull request #7802: [WIP]KAFKA-8733 Initial sketch of the 
solution mentioned in KIP-501.
URL: https://github.com/apache/kafka/pull/7802
 
 
   The follower replica on the leader partition maintains pending fetch requests 
   which are to be processed for fetch offsets >= log-end-offset in the earlier 
   fetch request. A follower replica is considered to be in sync if there are 
   pending fetch requests.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found the offline partitions issue multiple times on some of the hosts in 
> our clusters. After going through the broker logs and the hosts' disk stats, 
> it looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where the read time is more than 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated, as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration of more 
> than replica.lag.time.max.ms, then all the replicas will be out of sync and 
> the partition becomes offline if min.isr.replicas > 1 and 
> unclean.leader.election is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = replicaId,
>   fetchOnlyFromLeader = fetchOnlyFromLeader,
>   readOnlyCommitted = fetchOnlyCommitted,
>   fetchMaxBytes = fetchMaxBytes,
>   hardMaxBytesLimit = hardMaxBytesLimit,
>   readPartitionInfo = fetchInfos,
>   quota = quota,
>   isolationLevel = isolationLevel)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result) // 
> fetch time gets updated here, but mayBeShrinkIsr should have been already 
> called and the replica is removed from isr
>  else result
>  }
> val logReadResults = readFromLog()
> {code}
> Attached the graphs of disk weighted io time stats when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9279) Silent data loss in Kafka producer

2019-12-09 Thread Andrew Klopper (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991513#comment-16991513
 ] 

Andrew Klopper edited comment on KAFKA-9279 at 12/9/19 11:53 AM:
-

The transaction is supposed to fail if any of the individual sends fails, which 
is clearly stated in the [Kafka Producer 
documentation|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]:
{quote}
When used as part of a transaction, it is not necessary to define a callback or 
check the result of the future in order to detect errors from {{send}}. If any 
of the send calls failed with an irrecoverable error, the final 
[{{commitTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction--]
 call will fail and throw the exception from the last failed send. When this 
happens, your application should call 
[{{abortTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction--]
 to reset the state and continue to send data.
{quote}
The examples on that page also do not check the result of the individual 
producer send calls that form part of a transaction, which further reinforces 
the idea that failing the transaction on any send failure is the expected 
behaviour.
 


was (Author: andrewrk):
The transaction is supposed to fail if any of the individual sends fails, which 
is clearly stated in the Kafka Producer documentation:

"When used as part of a transaction, it is not necessary to define a callback 
or check the result of the future in order to detect errors from {{send}}. If 
any of the send calls failed with an irrecoverable error, the final 
[{{commitTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction--]
 call will fail and throw the exception from the last failed send. When this 
happens, your application should call 
[{{abortTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction--]
 to reset the state and continue to send data."

The examples on that page also do not check the result of the individual 
producer send calls that form part of a transaction, which further reinforces 
the idea that failing the transaction on any send failure is the expected 
behaviour.

 

 

 

> Silent data loss in Kafka producer
> --
>
> Key: KAFKA-9279
> URL: https://issues.apache.org/jira/browse/KAFKA-9279
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.3.0
>Reporter: Andrew Klopper
>Priority: Major
>
> It appears that it is possible for a producer.commitTransaction() call to 
> succeed even if an individual producer.send() call has failed. The following 
> code demonstrates the issue:
> {code:java}
> package org.example.dataloss;
> import java.nio.charset.StandardCharsets;
> import java.util.Properties;
> import java.util.Random;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public class Main {
> public static void main(final String[] args) {
> final Properties producerProps = new Properties();
> if (args.length != 2) {
> System.err.println("Invalid command-line arguments");
> System.exit(1);
> }
> final String bootstrapServer = args[0];
> final String topic = args[1];
> producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> bootstrapServer);
> producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "50");
> producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
> producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 
> "100");
> producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
> "true");
> producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
> "dataloss_01");
> producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
> "dataloss_01");
> try (final KafkaProducer producer = new 
> KafkaProducer<>(producerProps, new ByteArraySerializer(), new 
> ByteArraySerializer())) {
> producer.initTransactions();
> producer.beginTransaction();
> final Random random = new Random();
> final byte[] largePayload = new byte[200];
> random.nextBytes(largePayload);
> producer.send(
> new ProducerRecord<>(
> topic,
> "large".getBytes(StandardCha

[jira] [Comment Edited] (KAFKA-9279) Silent data loss in Kafka producer

2019-12-09 Thread Andrew Klopper (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991513#comment-16991513
 ] 

Andrew Klopper edited comment on KAFKA-9279 at 12/9/19 11:48 AM:
-

The transaction is supposed to fail if any of the individual sends fails, which 
is clearly stated in the Kafka Producer documentation:

"When used as part of a transaction, it is not necessary to define a callback 
or check the result of the future in order to detect errors from {{send}}. If 
any of the send calls failed with an irrecoverable error, the final 
[{{commitTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction--]
 call will fail and throw the exception from the last failed send. When this 
happens, your application should call 
[{{abortTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction--]
 to reset the state and continue to send data."

The examples on that page also do not check the result of the individual 
producer send calls that form part of a transaction, which further reinforces 
the idea that failing the transaction on any send failure is the expected 
behaviour.

 

 

 


was (Author: andrewrk):
The transaction is supposed to fail if any of the individual sends fails. The 
Kafka Producer documentation states the following:

"When used as part of a transaction, it is not necessary to define a callback 
or check the result of the future in order to detect errors from {{send}}. If 
any of the send calls failed with an irrecoverable error, the final 
[{{commitTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction--]
 call will fail and throw the exception from the last failed send. When this 
happens, your application should call 
[{{abortTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction--]
 to reset the state and continue to send data."

 

 

> Silent data loss in Kafka producer
> --
>
> Key: KAFKA-9279
> URL: https://issues.apache.org/jira/browse/KAFKA-9279
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.3.0
>Reporter: Andrew Klopper
>Priority: Major
>
> It appears that it is possible for a producer.commitTransaction() call to 
> succeed even if an individual producer.send() call has failed. The following 
> code demonstrates the issue:
> {code:java}
> package org.example.dataloss;
> import java.nio.charset.StandardCharsets;
> import java.util.Properties;
> import java.util.Random;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public class Main {
> public static void main(final String[] args) {
> final Properties producerProps = new Properties();
> if (args.length != 2) {
> System.err.println("Invalid command-line arguments");
> System.exit(1);
> }
> final String bootstrapServer = args[0];
> final String topic = args[1];
> producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> bootstrapServer);
> producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "50");
> producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
> producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 
> "100");
> producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
> "true");
> producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
> "dataloss_01");
> producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
> "dataloss_01");
> try (final KafkaProducer producer = new 
> KafkaProducer<>(producerProps, new ByteArraySerializer(), new 
> ByteArraySerializer())) {
> producer.initTransactions();
> producer.beginTransaction();
> final Random random = new Random();
> final byte[] largePayload = new byte[200];
> random.nextBytes(largePayload);
> producer.send(
> new ProducerRecord<>(
> topic,
> "large".getBytes(StandardCharsets.UTF_8),
> largePayload
> ),
> (metadata, e) -> {
> if (e == null) {
> System.out.println("INFO: Large payload succeeded");
> } else {
> System.err.printf("ERROR: Large payload failed: 
> %s\n", e.getM

[jira] [Commented] (KAFKA-9279) Silent data loss in Kafka producer

2019-12-09 Thread Andrew Klopper (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991513#comment-16991513
 ] 

Andrew Klopper commented on KAFKA-9279:
---

The transaction is supposed to fail if any of the individual sends fails. The 
Kafka Producer documentation states the following:

"When used as part of a transaction, it is not necessary to define a callback 
or check the result of the future in order to detect errors from {{send}}. If 
any of the send calls failed with an irrecoverable error, the final 
[{{commitTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction--]
 call will fail and throw the exception from the last failed send. When this 
happens, your application should call 
[{{abortTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction--]
 to reset the state and continue to send data."

 

 

> Silent data loss in Kafka producer
> --
>
> Key: KAFKA-9279
> URL: https://issues.apache.org/jira/browse/KAFKA-9279
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.3.0
>Reporter: Andrew Klopper
>Priority: Major
>
> It appears that it is possible for a producer.commitTransaction() call to 
> succeed even if an individual producer.send() call has failed. The following 
> code demonstrates the issue:
> {code:java}
> package org.example.dataloss;
> import java.nio.charset.StandardCharsets;
> import java.util.Properties;
> import java.util.Random;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public class Main {
> public static void main(final String[] args) {
> final Properties producerProps = new Properties();
> if (args.length != 2) {
> System.err.println("Invalid command-line arguments");
> System.exit(1);
> }
> final String bootstrapServer = args[0];
> final String topic = args[1];
> producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> bootstrapServer);
> producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "50");
> producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
> producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 
> "100");
> producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
> "true");
> producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
> "dataloss_01");
> producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
> "dataloss_01");
> try (final KafkaProducer producer = new 
> KafkaProducer<>(producerProps, new ByteArraySerializer(), new 
> ByteArraySerializer())) {
> producer.initTransactions();
> producer.beginTransaction();
> final Random random = new Random();
> final byte[] largePayload = new byte[200];
> random.nextBytes(largePayload);
> producer.send(
> new ProducerRecord<>(
> topic,
> "large".getBytes(StandardCharsets.UTF_8),
> largePayload
> ),
> (metadata, e) -> {
> if (e == null) {
> System.out.println("INFO: Large payload succeeded");
> } else {
> System.err.printf("ERROR: Large payload failed: 
> %s\n", e.getMessage());
> }
> }
> );
> producer.commitTransaction();
> System.out.println("Commit succeeded");
> } catch (final Exception e) {
> System.err.printf("FATAL ERROR: %s", e.getMessage());
> }
> }
> }
> {code}
> The code prints the following output:
> {code:java}
> ERROR: Large payload failed: The message is 293 bytes when serialized 
> which is larger than the maximum request size you have configured with the 
> max.request.size configuration.
> Commit succeeded{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7251) Add support for TLS 1.3

2019-12-09 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reassigned KAFKA-7251:
-

Assignee: Rajini Sivaram

> Add support for TLS 1.3
> ---
>
> Key: KAFKA-7251
> URL: https://issues.apache.org/jira/browse/KAFKA-7251
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> Java 11 adds support for TLS 1.3. We should support this after we add support 
> for Java 11.
> Related issues:
> [https://bugs.openjdk.java.net/browse/JDK-8206170]
> [https://bugs.openjdk.java.net/browse/JDK-8206178]
> [https://bugs.openjdk.java.net/browse/JDK-8208538]
> [https://bugs.openjdk.java.net/browse/JDK-8207009]
> [https://bugs.openjdk.java.net/browse/JDK-8209893]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9220) TimeoutException when using kafka-preferred-replica-election

2019-12-09 Thread Or Shemesh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991479#comment-16991479
 ] 

Or Shemesh commented on KAFKA-9220:
---

Hi [~huxi_2b] & [~sbellapu],

Thanks. From your experience, when do you think this fix is going to be 
published?

> TimeoutException when using kafka-preferred-replica-election
> 
>
> Key: KAFKA-9220
> URL: https://issues.apache.org/jira/browse/KAFKA-9220
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.0
>Reporter: Or Shemesh
>Priority: Major
>
> When running kafka-preferred-replica-election --bootstrap-server xxx:9092
> I'm getting this error:
> Timeout waiting for election results
> Timeout waiting for election results
> Exception in thread "main" kafka.common.AdminCommandFailedException
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$AdminClientCommand.electPreferredLeaders(PreferredReplicaLeaderElectionCommand.scala:246)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$.run(PreferredReplicaLeaderElectionCommand.scala:78)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$.main(PreferredReplicaLeaderElectionCommand.scala:42)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand.main(PreferredReplicaLeaderElectionCommand.scala)Caused
>  by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
>  
> Because we have a big cluster, getting all the data from ZooKeeper takes more 
> than 30 seconds.
>  
> After searching the code I saw that the 30 seconds is hard-coded. Can you 
> enable us to set the timeout as a parameter?
> [https://github.com/confluentinc/kafka/blob/master/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala]
>  
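> Until the timeout is configurable in the tool, a possible workaround sketch 
> (assumptions: the 2.3 AdminClient API, a placeholder bootstrap server and 
> partition) is to trigger the election directly with a longer timeout:
> {code:java}
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.ElectPreferredLeadersOptions;
> import org.apache.kafka.common.TopicPartition;
> 
> public class ElectWithLongerTimeout {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092"); // placeholder
>         props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "120000");  // raise the client-side request timeout
> 
>         try (AdminClient admin = AdminClient.create(props)) {
>             // The options timeout replaces the tool's hard-coded 30s wait.
>             ElectPreferredLeadersOptions options = new ElectPreferredLeadersOptions().timeoutMs(120_000);
>             admin.electPreferredLeaders(
>                     Collections.singleton(new TopicPartition("my-topic", 0)), options)
>                 .all()
>                 .get();
>         }
>     }
> }
> {code}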



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2019-12-09 Thread VIkram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991419#comment-16991419
 ] 

VIkram commented on KAFKA-9280:
---

Let's say the high watermark for a topic partition is 1000 and the leader and 
follower replicas have exactly the same messages. In this scenario, the producer 
sends a message to the leader and the other replicas, and a consumer sends a 
fetch request to the leader. Is there a possibility here that the consumer's 
fetch request will be served before the other replicas' fetch requests?

> Duplicate messages are observed in ACK mode ALL
> ---
>
> Key: KAFKA-9280
> URL: https://issues.apache.org/jira/browse/KAFKA-9280
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.1
>Reporter: VIkram
>Priority: Major
>
> In ack mode ALL, the leader is sending the message to the consumer even before 
> receiving the acknowledgements from the other replicas. This can lead to 
> *+duplicate messages+*.
>  
> Setup details:
>  * 1 zookeeper, 5 brokers
>  * Producer: Synchronous
>  * Topic: 1 partition, replication factor - 3, min isr - 2
>  
> Say First replica (Leader), Second replica and Third replica are the three 
> replicas of the topic.
>  
> *Sequence of events:*
> a) All brokers are up and running.
> b) Clients started running.
> c) Kill second replica of the topic.
> d) Kill the third replica. Now min isr will not be satisfied.
> e) Bring up third replica. Min isr will be satisfied.
>  
> *Breakdown of step 'd':*
>  # Just before producer sends next message, killed third replica with kill -9 
> (Leader takes time ~5sec to detect that the broker is down).
>  # Producer sent a message to leader.
>  # Before the leader knows that third replica is down, it accepts the message 
> from producer.
>  # Leader forwards the message to third replica.
>  # Before receiving ACK from third replica, leader sent the message to 
> consumer.
>  # Leader doesn't get an ACK from third replica.
>  # Now leader detects that third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
>  # Now leader stops accepting messages from producer.
>  # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION 
> after timeout (2min in our case) .
>  # So far, producer believes that the message was not received by leader 
> whereas the consumer actually received it.
>  # Now producer retries sending the same message. (In our application it is 
> the next integer we send).
>  # Now when second/third replica is up, leader accepts the message and sends 
> the same message to consumer. *Thus sending duplicates.*
>  
>  
> *Logs:*
>  # 2-3 seconds before producer sends next message, killed third replica with 
> kill -9 (Leader takes time ~5sec to detect that the broker is down).
> _{{{_
> _> kill -9 49596_
> _}}}_
>  __ 
>  # Producer sent a message to leader.
> _{{{_
> _[20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: 
> ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = 
> [], isReadOnly = false), key=null, value=p229-4, timestamp=null)_
> _}}}_
>  
>  # Before the leader knows that third replica is down, it accepts the message 
> from producer.
>  # Leader forwards the message to third replica.
>  # Before receiving ACK from third replica, leader sent the message to 
> consumer.
> _{{{_
>  _Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, 
> leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size 
> = -1, serialized value size = 6, headers = RecordHeaders(headers = [], 
> isReadOnly = false), key = null, value = p229-4)_
> _}}}_
>  __ 
>  # Leader doesn't get an ACK from third replica.
>  # Now leader detects that third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> _{{{_
> _[2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing 
> append operation on partition t229-0 (kafka.server.ReplicaManager)_
> _org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 
> for partition t229-0_
> _}}}_
>  
>  # Now leader stops accepting messages from producer.
>  # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION 
> after timeout (2min in our case) .
> _{{{_
>  _java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> t229-0:12 ms_
> _has passed since batch creation_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)_
> _C

[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2019-12-09 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991244#comment-16991244
 ] 

Tim Van Laer commented on KAFKA-8803:
-

[~guozhang] Well, hard to say, but to me it looked like the state persisted 
indefinitely (no recovery at all). Only after restarting the txn coordinator did 
the application start processing again. So it is indeed as [~rocketraman] 
described: without intervention, neither the application nor the broker recovers 
by itself.

I'm sorry, but I already lost the logs from that particular moment. The issue 
occurred only once and has never appeared since...



> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Boyang Chen
>Priority: Major
> Attachments: logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)