[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-04-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814015#comment-16814015
 ] 

ASF GitHub Bot commented on KAFKA-7965:
---

huxihx commented on pull request #6557: KAFKA-7965: Fix 
testRollingBrokerRestartsWithSmallerMaxGroup failure
URL: https://github.com/apache/kafka/pull/6557
 
 
   https://issues.apache.org/jira/browse/KAFKA-7965
   
   Most of the time, the group coordinator runs on broker 1. Occasionally the 
group coordinator will be placed on broker 2. If that's the case, the loop 
starting at line 320 has no chance to check and update `kickedOutConsumerIdx`. 
A quick fix is to safely do another round of the loop to ensure 
`kickedOutConsumerIdx` is always checked after the last broker restart.
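   For illustration, here is a minimal sketch of that pattern (the actual test 
   is Scala; the helper names below are hypothetical, not the real test code):

{code:java}
// Hedged sketch of the fix pattern, not the actual ConsumerBounceTest code.
// If the group coordinator lives on the last restarted broker, the check
// inside the loop can run before the eviction takes effect, so one extra
// verification pass after the loop guarantees the kicked-out consumer is
// observed. restartBroker and checkForKickedOutConsumer are hypothetical.
for (int brokerId : brokersToRestart) {
    restartBroker(brokerId);
    checkForKickedOutConsumer(); // may miss the effect of the last restart
}
checkForKickedOutConsumer();     // extra round ensures the check happens
{code}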
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: huxihx
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for the `2.2` release, I am creating tickets for 
> all observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}





[jira] [Comment Edited] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-04-09 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813990#comment-16813990
 ] 

Dhruvil Shah edited comment on KAFKA-7362 at 4/10/19 2:12 AM:
--

[~xiongqiwu] thank you for working on this! I agree that we need to figure out 
the appropriate time to initiate cleanup of orphaned partitions. Perhaps we 
can discuss the implementation further after you open the PR for review.


was (Author: dhruvilshah):
[~xiongqiwu] from my understanding, we could only have orphan partitions when 
an offline broker comes back online, so doing this cleanup once on startup 
should be sufficient. We could talk more about the implementation when you open 
the PR for review. Thank you for working on this!

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from an offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up either the data 
> or the folders that belong to orphan partitions. The log manager will scan 
> all dirs during startup, but the time-based retention policy on a topic 
> partition will not kick in until the broker is either a follower or a leader 
> of the partition. In addition, we do not have logic today to delete folders 
> that belong to orphan partitions. 
> Open this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.
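For illustration, a minimal sketch of such a startup cleanup, using hypothetical 
helpers (this is not the actual LogManager code):

{code:java}
// Hedged sketch: on startup, treat any on-disk partition directory that is
// not part of the broker's current assignment as an orphan and schedule it
// for deletion. scanLogDirsForPartitions, fetchAssignedPartitions and
// scheduleForDeletion are illustrative names, not real Kafka APIs.
Set<TopicPartition> onDisk = scanLogDirsForPartitions();
Set<TopicPartition> assigned = fetchAssignedPartitions();
onDisk.removeAll(assigned);
for (TopicPartition orphan : onDisk) {
    scheduleForDeletion(orphan); // e.g. rename the dir and purge it later
}
{code}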





[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-04-09 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813990#comment-16813990
 ] 

Dhruvil Shah commented on KAFKA-7362:
-

[~xiongqiwu] from my understanding, we could only have orphan partitions when 
an offline broker comes back online, so doing this cleanup once on startup 
should be sufficient. We could talk more about the implementation when you open 
the PR for review. Thank you for working on this!

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from an offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up either the data 
> or the folders that belong to orphan partitions. The log manager will scan 
> all dirs during startup, but the time-based retention policy on a topic 
> partition will not kick in until the broker is either a follower or a leader 
> of the partition. In addition, we do not have logic today to delete folders 
> that belong to orphan partitions. 
> Open this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.





[jira] [Commented] (KAFKA-7093) Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0

2019-04-09 Thread Gene Yi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813971#comment-16813971
 ] 

Gene Yi commented on KAFKA-7093:


It's indeed the bug mentioned by [~gwenshap], 
https://issues.apache.org/jira/browse/KAFKA-7415. After we upgraded to 2.0.1, we 
can see the leader-epoch-checkpoint for each topic updated to the correct 
values (in sync with the epoch number from ZooKeeper).

> Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0
> 
>
> Key: KAFKA-7093
> URL: https://issues.apache.org/jira/browse/KAFKA-7093
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.1.0
>Reporter: Suleyman
>Priority: Major
>
> I upgraded Kafka from version 0.11.0.1 to 1.1.0. After the upgrade, I'm 
> getting the warning message below very frequently.
> WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. 
> This implies messages have arrived out of order. New: \{epoch:0, 
> offset:793868383}, Current: \{epoch:4, offset:792201264} for Partition: 
> __consumer_offsets-42 (kafka.server.epoch.LeaderEpochFileCache) 
> How can I resolve these warning messages? And why am I getting them?





[jira] [Commented] (KAFKA-7093) Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0

2019-04-09 Thread Gene Yi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813937#comment-16813937
 ] 

Gene Yi commented on KAFKA-7093:


It seems that emptying the leader-epoch-checkpoint file for each topic will 
resolve this.

> Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0
> 
>
> Key: KAFKA-7093
> URL: https://issues.apache.org/jira/browse/KAFKA-7093
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.1.0
>Reporter: Suleyman
>Priority: Major
>
> I upgraded Kafka from version 0.11.0.1 to 1.1.0. After the upgrade, I'm 
> getting the warning message below very frequently.
> WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. 
> This implies messages have arrived out of order. New: \{epoch:0, 
> offset:793868383}, Current: \{epoch:4, offset:792201264} for Partition: 
> __consumer_offsets-42 (kafka.server.epoch.LeaderEpochFileCache) 
> How can I resolve these warning messages? And why am I getting them?





[jira] [Updated] (KAFKA-7897) Invalid use of epoch cache with old message format versions

2019-04-09 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-7897:
---
Fix Version/s: 1.1.2

> Invalid use of epoch cache with old message format versions
> ---
>
> Key: KAFKA-7897
> URL: https://issues.apache.org/jira/browse/KAFKA-7897
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2
>
>
> Message format downgrades are not supported, but they generally work as long 
> as brokers and clients can at least continue to parse both message formats. 
> After a downgrade, the truncation logic should revert to using the high 
> watermark, but currently we use the existence of any cached epoch as the sole 
> prerequisite for leveraging OffsetsForLeaderEpoch. This has the effect 
> of causing a massive truncation after startup, which causes re-replication.
> I think our options to fix this are to either 1) clear the cache when we 
> notice a downgrade, or 2) forbid downgrades and raise an error.
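For illustration, option 1 could look roughly like this (illustrative names 
only, not the actual broker code):

{code:java}
// Hedged sketch of option 1: when the configured message format predates
// leader epochs, clear the cache so truncation falls back to the high
// watermark instead of OffsetsForLeaderEpoch. messageFormatSupportsEpochs
// and leaderEpochCache are illustrative names.
if (!messageFormatSupportsEpochs) {
    leaderEpochCache.clear();               // downgrade detected
}
boolean useEpochTruncation = !leaderEpochCache.isEmpty();
{code}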





[jira] [Commented] (KAFKA-8200) TopologyTestDriver should offer an iterable signature of readOutput

2019-04-09 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813848#comment-16813848
 ] 

Patrik Kleindl commented on KAFKA-8200:
---

[~mjsax] Does this require a KIP? I took the liberty of trying this out; it 
seems to work fine. Feedback on the PR is welcome.

> TopologyTestDriver should offer an iterable signature of readOutput
> ---
>
> Key: KAFKA-8200
> URL: https://issues.apache.org/jira/browse/KAFKA-8200
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Drogalis
>Priority: Minor
>  Labels: needs-kip
>
> When using the TopologyTestDriver, one examines the output on a topic with 
> the readOutput method. This method returns one record at a time, until no 
> more records can be found, at which point it returns null.
> Many times, the usage pattern around readOutput involves writing a loop 
> to extract a number of records from the topic, building up a list of records, 
> until it returns null.
> It would be helpful to offer an iterable signature of readOutput, which 
> returns either an iterator or a list over the records that are currently 
> available in the topic. This would effectively remove the loop that a user 
> needs to write themselves each time.
> Such a signature might look like:
> {code:java}
> public Iterable<ProducerRecord<K, V>> readOutput(java.lang.String 
> topic);
> {code}
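For reference, the manual drain loop the description refers to typically looks 
like this (a sketch using the existing readOutput overload with deserializers; 
the driver instance, topic name, and types are illustrative):

{code:java}
// Drain all currently available records by looping until readOutput
// returns null -- the pattern an iterable signature would remove.
List<ProducerRecord<String, Long>> records = new ArrayList<>();
ProducerRecord<String, Long> record;
while ((record = driver.readOutput("output-topic",
        new StringDeserializer(), new LongDeserializer())) != null) {
    records.add(record);
}
{code}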





[jira] [Commented] (KAFKA-8200) TopologyTestDriver should offer an iterable signature of readOutput

2019-04-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813847#comment-16813847
 ] 

ASF GitHub Bot commented on KAFKA-8200:
---

pkleindl commented on pull request #6556: KAFKA-8200: added Iterator methods 
for output to TopologyTestDriver
URL: https://github.com/apache/kafka/pull/6556
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> TopologyTestDriver should offer an iterable signature of readOutput
> ---
>
> Key: KAFKA-8200
> URL: https://issues.apache.org/jira/browse/KAFKA-8200
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Drogalis
>Priority: Minor
>  Labels: needs-kip
>
> When using the TopologyTestDriver, one examines the output on a topic with 
> the readOutput method. This method returns one record at a time, until no 
> more records can be found, at which point it returns null.
> Many times, the usage pattern around readOutput involves writing a loop 
> to extract a number of records from the topic, building up a list of records, 
> until it returns null.
> It would be helpful to offer an iterable signature of readOutput, which 
> returns either an iterator or a list over the records that are currently 
> available in the topic. This would effectively remove the loop that a user 
> needs to write themselves each time.
> Such a signature might look like:
> {code:java}
> public Iterable<ProducerRecord<K, V>> readOutput(java.lang.String 
> topic);
> {code}





[jira] [Commented] (KAFKA-8204) Streams may flush state stores in the incorrect order

2019-04-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813748#comment-16813748
 ] 

ASF GitHub Bot commented on KAFKA-8204:
---

vvcephei commented on pull request #6555: KAFKA-8204: fix Streams store flush 
order
URL: https://github.com/apache/kafka/pull/6555
 
 
   Streams previously flushed stores in the order of their registration,
   which is arbitrary. Because stores may forward values upon flush
   (as in cached state stores), we must flush stores in topological order.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streams may flush state stores in the incorrect order
> -
>
> Key: KAFKA-8204
> URL: https://issues.apache.org/jira/browse/KAFKA-8204
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Cached state stores may forward records during a flush call, so Streams 
> should flush the stores in topological order. Otherwise, Streams may flush a 
> downstream store before an upstream one, resulting in sink results being 
> committed without the corresponding state changelog updates being committed.
> This behavior is partly responsible for the bug reported in KAFKA-7895.
> The fix is simply to flush the stores in topological order; then, when an 
> upstream store forwards records to a downstream stateful processor, the 
> corresponding state changes will be correctly flushed as well.
> An alternative would be to repeatedly call flush on all state stores until 
> they report there is nothing left to flush, but this requires a public API 
> change to enable state stores to report whether they need a flush or not.
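Conceptually, the fix amounts to something like the following (a simplified 
sketch, not the actual Streams internals):

{code:java}
// Flushing in topological order: any records a cached store forwards
// during flush() reach downstream stores before those stores are
// flushed, so the corresponding changelog writes land in the same
// commit. storesInTopologicalOrder is an illustrative name.
for (StateStore store : storesInTopologicalOrder) {
    store.flush();
}
{code}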





[jira] [Updated] (KAFKA-8206) A consumer can't discover new group coordinator when the cluster was partly restarted

2019-04-09 Thread alex gabriel (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

alex gabriel updated KAFKA-8206:

Description: 
*A consumer can't discover new group coordinator when the cluster was partly 
restarted*

Preconditions:
I use the Kafka server and the Java kafka-client lib, version 2.2.
I have 2 Kafka nodes running locally (localhost:9092, localhost:9093) and 1 
ZK (localhost:2181).
I have replication factor 2 for all my topics and 
'_unclean.leader.election.enable=true_' on both Kafka nodes.

Steps to reproduce:

1) Start 2 nodes (localhost:9092/localhost:9093)
2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093'
{noformat}
// discovered group coordinator (0-node)
2019-04-09 16:23:18,963 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>

...metadatacache is updated (2 nodes in the cluster list)
2019-04-09 16:23:18,928 DEBUG 
[org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - 
[Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending 
metadata request (type=MetadataRequest, topics=) to node localhost:9092 
(id: -1 rack: null)>
2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), 
localhost:9093 (id: 1 rack: null)], partitions = [], controller = 
localhost:9092 (id: 0 rack: null))}>
{noformat}
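For reference, the consumer setup in step 2 might look like this (a hedged 
sketch; the deserializer types are assumed):

{code:java}
// Minimal consumer configuration matching the reproduction steps.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "events-group-gabriel");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
{code}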
3) Shutdown 1-node (localhost:9093)
{noformat}
// metadata was updated to version 4 (but for some reason it still had 2 
alive nodes in the cluster)
2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), 
localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = 
events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], 
offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader = 
0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = 
localhost:9092 (id: 0 rack: null))}>

// consumer thinks that node 1 is still alive and tries to send a coordinator 
lookup to it, but fails
2019-04-09 16:23:46,981 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)>
2019-04-09 16:23:46,981 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group 
coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or 
invalid, will attempt rediscovery>
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.>
2019-04-09 16:24:01,117 WARN 
[org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 
(localhost:9093) could not be established. Broker may not be available.>

// refreshing metadata again
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Cancelled 
request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, 
clientId=events-consumer0, correlationId=112) due to node 1 being disconnected>
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Coordinator discovery failed, refreshing metadata>

// metadata was updated to version 5, where the cluster had only node 0 
(localhost:9092), as expected.
2019-04-09 16:24:01,131 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 5 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null)], partitions 
= [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = 
[0,1], isr = [0], offlineReplicas = [1]), Partition(topic = events-sorted, 
partition = 0, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = 
[1])], controller = localhost:9092 (id: 0 rack: null))}>

// 0-node discovered as coordinator
2019-04-09 16:24:01,132 INFO 


[jira] [Created] (KAFKA-8206) A consumer can't discover new group coordinator when the cluster was partly restarted

2019-04-09 Thread alex gabriel (JIRA)
alex gabriel created KAFKA-8206:
---

 Summary: A consumer can't discover new group coordinator when the 
cluster was partly restarted
 Key: KAFKA-8206
 URL: https://issues.apache.org/jira/browse/KAFKA-8206
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.2.0, 2.0.0, 1.0.0
Reporter: alex gabriel


*A consumer can't discover new group coordinator when the cluster was partly 
restarted*

Preconditions:
I use the Kafka server and the Java kafka-client lib, version 2.2.
I have 2 Kafka nodes running locally (localhost:9092, localhost:9093) and 1 
ZK (localhost:2181).
I have replication factor 2 for all my topics and 
'_unclean.leader.election.enable=true_' on both Kafka nodes.

Steps to reproduce:

1) Start 2 nodes (localhost:9092/localhost:9093)
2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093'
{noformat}
// discovered group coordinator (0-node)
2019-04-09 16:23:18,963 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>

...metadatacache is updated (2 nodes in the cluster list)
2019-04-09 16:23:18,928 DEBUG 
[org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - 
[Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending 
metadata request (type=MetadataRequest, topics=) to node localhost:9092 
(id: -1 rack: null)>
2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), 
localhost:9093 (id: 1 rack: null)], partitions = [], controller = 
localhost:9092 (id: 0 rack: null))}>
{noformat}
3) Shutdown 1-node (localhost:9093)
{noformat}
// metadata was updated to version 4 (but for some reason it still had 2 
alive nodes in the cluster)
2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), 
localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = 
events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], 
offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader = 
0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = 
localhost:9092 (id: 0 rack: null))}>

// consumer thinks that node 1 is still alive and tries to send a coordinator 
lookup to it, but fails
2019-04-09 16:23:46,981 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)>
2019-04-09 16:23:46,981 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group 
coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or 
invalid, will attempt rediscovery>
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.>
2019-04-09 16:24:01,117 WARN 
[org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 
(localhost:9093) could not be established. Broker may not be available.>

// refreshing metadata again
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Cancelled 
request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, 
clientId=events-consumer0, correlationId=112) due to node 1 being disconnected>
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Coordinator discovery failed, refreshing metadata>

// metadata was updated to version 5, where the cluster had only node 0 
(localhost:9092), as expected.
2019-04-09 16:24:01,131 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 5 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null)], partitions 
= [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = 
[0,1], isr = [0], offlineReplicas = [1]), Partition(topic = events-sorted, 
partition = 0, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = 

[jira] [Commented] (KAFKA-8205) Kafka SSL encryption of data at rest

2019-04-09 Thread Niten Aggarwal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813672#comment-16813672
 ] 

Niten Aggarwal commented on KAFKA-8205:
---

Hi [~sriharsha],

Sure, next time I will submit the question to the mailing list.

We are using String serializers.

props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");

Not sure about disk encryption, but if I use the same cluster with a non-SSL 
security protocol, we don't see encrypted data in the log files. 

Am I missing some configuration?

> Kafka SSL encryption of data at rest
> 
>
> Key: KAFKA-8205
> URL: https://issues.apache.org/jira/browse/KAFKA-8205
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 1.0.1
> Environment: All
>Reporter: Niten Aggarwal
>Priority: Major
>
> Recently we enabled SSL on our Kafka cluster, which earlier had SASL 
> PLAINTEXT. Everything works fine from both the producer and consumer 
> standpoint, as expected, with one strange behavior: we noticed data in the 
> log file is also encrypted, which we didn't expect, because SSL is meant for 
> transport-level security, not for encrypting data at rest.
> It doesn't mean we have any issues with that, but we would like to understand 
> what enables encrypting data at rest. Do we have a way to:
> 1) turn it off
> 2) extend the encryption algorithm, if a company would like to use its own 
> key management system and a different algorithm?
> After going through the Kafka docs, we realized there is a KIP already in 
> discussion, but how come it's implemented without being approved?
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+transparent+data+encryption+functionality]
>  





[jira] [Comment Edited] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2019-04-09 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813657#comment-16813657
 ] 

Jonathan Santilli edited comment on KAFKA-7656 at 4/9/19 5:38 PM:
--

Currently 2.2; we updated from 2.0 recently.

This error started showing up since we updated to version 2.2. [~jagsancio]


was (Author: pachilo):
Currently 2.2; we updated from 2.0 recently.

This error started showing up since we updated to version 2.2.

> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1
> Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 
> EDT 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Patrick Haas
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> (Note: From 2.0.1-cp1 from confluent distribution)
> {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error 
> processing fetch operation on partition __consumer_offsets-20, offset 0 
> (kafka.server.ReplicaManager)}}
> {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log 
> read from segment FileRecords(file= 
> /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
> start=0, end=2147483647)}}
> {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
> {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
> {{ at kafka.log.Log.read(Log.scala:1114)}}
> {{ at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
> {{ at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
> {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
> {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
> {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
> {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
> {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
> {{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
> {{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}
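For context, the negative max size in the stack trace is characteristic of a 
long-to-int narrowing overflow, as this small illustration shows (this is the 
arithmetic only, not the actual broker code path):

{code:java}
// Casting a long just past Integer.MAX_VALUE to int wraps around to
// Integer.MIN_VALUE, the -2147483648 seen in the error message.
long maxSize = Integer.MAX_VALUE + 1L;  // 2147483648
int truncated = (int) maxSize;          // -2147483648
{code}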





[jira] [Commented] (KAFKA-8205) Kafka SSL encryption of data at rest

2019-04-09 Thread Sriharsha Chintalapani (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813659#comment-16813659
 ] 

Sriharsha Chintalapani commented on KAFKA-8205:
---

[~nitena2019] This question should be asked on the mailing list rather than by 
opening a JIRA.

Kafka doesn't have data-at-rest encryption yet. Kafka SSL provides wire 
encryption only.

What do you mean by your data in the logs being encrypted? Is it possible that 
what you are seeing is serialized data from producers?

Also, make sure you don't have disk encryption from the OS or another 
third party turned on.
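To illustrate the serialization point: StringSerializer emits plain UTF-8 bytes, 
so string payloads should normally be human-readable in the log segments unless 
something else (e.g. OS-level disk encryption) intervenes:

{code:java}
// StringSerializer output is just the UTF-8 encoding of the string.
byte[] bytes = new StringSerializer().serialize("any-topic", "hello kafka");
System.out.println(new String(bytes, StandardCharsets.UTF_8)); // hello kafka
{code}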

> Kafka SSL encryption of data at rest
> 
>
> Key: KAFKA-8205
> URL: https://issues.apache.org/jira/browse/KAFKA-8205
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 1.0.1
> Environment: All
>Reporter: Niten Aggarwal
>Priority: Major
>
> Recently we enabled SSL on our Kafka cluster, which earlier had SASL 
> PLAINTEXT. Everything works fine from both the producer and consumer 
> standpoint, as expected, with one strange behavior: we noticed data in the 
> log file is also encrypted, which we didn't expect, because SSL is meant for 
> transport-level security, not for encrypting data at rest.
> It doesn't mean we have any issues with that, but we would like to understand 
> what enables encrypting data at rest. Do we have a way to:
> 1) turn it off
> 2) extend the encryption algorithm, if a company would like to use its own 
> key management system and a different algorithm?
> After going through the Kafka docs, we realized there is a KIP already in 
> discussion, but how come it's implemented without being approved?
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+transparent+data+encryption+functionality]
>  





[jira] [Commented] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2019-04-09 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813657#comment-16813657
 ] 

Jonathan Santilli commented on KAFKA-7656:
--

Currently 2.2; we updated from 2.0 recently.

This error started showing up since we updated to version 2.2.

> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1
> Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 
> EDT 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Patrick Haas
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> (Note: From 2.0.1-cp1 from confluent distribution)
> {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error 
> processing fetch operation on partition __consumer_offsets-20, offset 0 
> (kafka.server.ReplicaManager)}}
> {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log 
> read from segment FileRecords(file= 
> /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
> start=0, end=2147483647)}}
> {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
> {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
> {{ at kafka.log.Log.read(Log.scala:1114)}}
> {{ at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
> {{ at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
> {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
> {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
> {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
> {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
> {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
> {{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
> {{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}





[jira] [Commented] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2019-04-09 Thread Jose Armando Garcia Sancio (JIRA)


[jira] [Updated] (KAFKA-8205) Kafka SSL encryption of data at rest

2019-04-09 Thread Niten Aggarwal (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niten Aggarwal updated KAFKA-8205:
--
Summary: Kafka SSL encryption of data at rest  (was: Kafka SSL encryption 
of dataat rest)

> Kafka SSL encryption of data at rest
> 
>
> Key: KAFKA-8205
> URL: https://issues.apache.org/jira/browse/KAFKA-8205
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 1.0.1
> Environment: All
>Reporter: Niten Aggarwal
>Priority: Major
>
> Recently we enabled SSL on our Kafka cluster, which earlier had SASL 
> PLAINTEXT. Everything works fine from both the producer and consumer 
> standpoint, as expected, with one strange behavior: we noticed data in the 
> log file is also encrypted, which we didn't expect, because SSL is meant for 
> transport-level security, not for encrypting data at rest.
> It doesn't mean we have any issues with that, but we would like to understand 
> what enables encrypting data at rest. Do we have a way to:
> 1) turn it off
> 2) extend the encryption algorithm, if a company would like to use its own 
> key management system and a different algorithm?
> After going through the Kafka docs, we realized there is a KIP already in 
> discussion, but how come it's implemented without being approved?
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+transparent+data+encryption+functionality]
>  





[jira] [Created] (KAFKA-8205) Kafka SSL encryption of dataat rest

2019-04-09 Thread Niten Aggarwal (JIRA)
Niten Aggarwal created KAFKA-8205:
-

 Summary: Kafka SSL encryption of dataat rest
 Key: KAFKA-8205
 URL: https://issues.apache.org/jira/browse/KAFKA-8205
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 1.0.1
 Environment: All
Reporter: Niten Aggarwal


Recently we enabled SSL on our Kafka cluster, which earlier had SASL PLAINTEXT. 
Everything works fine from both the producer and consumer standpoint, as 
expected, with one strange behavior: we noticed data in the log file is also 
encrypted, which we didn't expect, because SSL is meant for transport-level 
security, not for encrypting data at rest.

It doesn't mean we have any issues with that, but we would like to understand 
what enables encrypting data at rest. Do we have a way to:

1) turn it off

2) extend the encryption algorithm, if a company would like to use its own key 
management system and a different algorithm?

After going through the Kafka docs, we realized there is a KIP already in 
discussion, but how come it's implemented without being approved?

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+transparent+data+encryption+functionality]

 





[jira] [Comment Edited] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-04-09 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813626#comment-16813626
 ] 

John Roesler edited comment on KAFKA-7895 at 4/9/19 5:08 PM:
-

I'm tracking the incorrect flushing behavior in a separate ticket (KAFKA-8204). 
This is the root cause of the continued duplicate results, even when EOS is 
enabled, when the Streams process undergoes an abrupt stop.


was (Author: vvcephei):
I'm tracking the incorrect flushing behavior in a separate ticket. This is the 
root cause of the continued duplicate results, even when EOS is enabled, when 
the Streams process undergoes an abrupt stop.

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, we are using kstreams to get the aggregated counts per vendor (key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944





[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-04-09 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813626#comment-16813626
 ] 

John Roesler commented on KAFKA-7895:
-

I'm tracking the incorrect flushing behavior in a separate ticket. This is the 
root cause of the continued duplicate results, even when EOS is enabled, when 
the Streams process undergoes an abrupt stop.

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, we are using kstreams to get the aggregated counts per vendor (key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944





[jira] [Created] (KAFKA-8204) Streams may flush state stores in the incorrect order

2019-04-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-8204:
---

 Summary: Streams may flush state stores in the incorrect order
 Key: KAFKA-8204
 URL: https://issues.apache.org/jira/browse/KAFKA-8204
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


Cached state stores may forward records during a flush call, so Streams should 
flush the stores in topological order. Otherwise, Streams may flush a 
downstream store before an upstream one, resulting in sink results being 
committed without the corresponding state changelog updates being committed.

This behavior is partly responsible for the bug reported in KAFKA-7895.

The fix is simply to flush the stores in topological order; then, when an 
upstream store forwards records to a downstream stateful processor, the 
corresponding state changes will be correctly flushed as well.

An alternative would be to repeatedly call flush on all state stores until they 
report there is nothing left to flush, but this requires a public API change to 
enable state stores to report whether they need a flush or not.





[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers

2019-04-09 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813603#comment-16813603
 ] 

Paul Davidson commented on KAFKA-5061:
--

Voting is now underway on 
[KIP-411|https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct].
  Please vote if you would like this bug resolved! 

> client.id should be set for Connect producers/consumers
> ---
>
> Key: KAFKA-5061
> URL: https://issues.apache.org/jira/browse/KAFKA-5061
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip, newbie++
>
> In order to properly monitor individual tasks using the producer and consumer 
> metrics, we need to have the framework disambiguate them. Currently when we 
> create producers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362)
>  and create consumers 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394)
>  the client ID is not being set. You can override it for the entire worker 
> via worker-level producer/consumer overrides, but you can't get per-task 
> metrics.
> There are a couple of things we might want to consider doing here:
> 1. Provide default client IDs based on the worker group ID + task ID 
> (providing uniqueness for multiple connect clusters up to the scope of the 
> Kafka cluster they are operating on). This seems ideal since it's a good 
> default; however it is a public-facing change and may need a KIP. Normally I 
> would be less worried about this, but some folks may be relying on picking up 
> metrics without this being set, in which case such a change would break their 
> monitoring.
> 2. Allow overriding client.id on a per-connector basis. I'm not sure if this 
> will really be useful or not -- it lets you differentiate between metrics for 
> different connectors' tasks, but within a connector, all metrics would go to 
> a single client.id. On the other hand, this makes the tasks act as a single 
> group from the perspective of broker handling of client IDs.
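
A hypothetical sketch of option 1 above (the variable names and the construction 
site are illustrative, not the actual Worker code): derive a default client.id 
from the worker group id and task id, while still letting an explicit override win:

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

class ClientIdDefaults {
    // 'workerGroupId' and 'taskId' are illustrative parameters.
    static Map<String, Object> withDefaultClientId(Map<String, Object> producerProps,
                                                   String workerGroupId,
                                                   String taskId) {
        Map<String, Object> props = new HashMap<>(producerProps);
        // Only fill in a default; a worker-level override still wins.
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, workerGroupId + "-" + taskId);
        return props;
    }
}
{code}

This keeps client IDs unique per task within the scope of the Kafka cluster, 
which is what per-task metrics need.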



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8203) plaintext connections to SSL secured broker can be handled more elegantly

2019-04-09 Thread Sriharsha Chintalapani (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani reassigned KAFKA-8203:
-

Assignee: Satish Duggana

> plaintext connections to SSL secured broker can be handled more elegantly
> -
>
> Key: KAFKA-8203
> URL: https://issues.apache.org/jira/browse/KAFKA-8203
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.1.1
>Reporter: Jorg Heymans
>Assignee: Satish Duggana
>Priority: Major
>
> Mailing list thread: 
> [https://lists.apache.org/thread.html/39935157351c0ad590e6cf02027816d664f1fd3724a25c1133a3bba6@%3Cusers.kafka.apache.org%3E]
> -reproduced here
> We have our brokers secured with these standard properties
>  
> {code:java}
> listeners=SSL://a.b.c:9030 
> ssl.truststore.location=... 
> ssl.truststore.password=... 
> ssl.keystore.location=... 
> ssl.keystore.password=... 
> ssl.key.password=... 
> ssl.client.auth=required 
> ssl.enabled.protocols=TLSv1.2 {code}
> It's a bit surprising to see that when a (java) client attempts to connect 
> without having SSL configured, so doing a PLAINTEXT connection instead, it 
> does not get a TLS exception indicating that SSL is required. Somehow I would 
> have expected a hard transport-level exception making it clear that non-SSL 
> connections are not allowed, instead the client sees this (when debug logging 
> is enabled)
> {code:java}
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
> [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=my-test-group] Kafka consumer initialized
> [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=my-test-group] Subscribed to topic(s): events
> [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-test-group] Sending FindCoordinator request to broker a.b.c:9030 (id: -1 rack: null)
> [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Initiating connection to node a.b.c:9030 (id: -1 rack: null) using address /a.b.c
> [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
> [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
> [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
> [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, groupId=my-test-group] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
> [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Completed connection to node -1. Fetching API versions.
> [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Initiating API versions fetch from node -1.
> [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, groupId=my-test-group] Connection with /a.b.c disconnected
> java.io.EOFException
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
>     at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
>     at eu.europa.ec.han.TestConsumer.main(TestConsumer.java:22)
> [main] DEBUG org.apache.kafka.clients.NetworkClient - 

[jira] [Commented] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes

2019-04-09 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813573#comment-16813573
 ] 

Bill Bejeck commented on KAFKA-8201:


Hi Anders Aagaard,

I'm not sure this is a bug for Kafka Streams since this is known behavior (as 
you referenced in the above Jira) and is controlled by the broker.

However, there is a workaround you can do.

Kafka Streams users have control over the settings for repartition (internal) 
topics. When setting up your application you can adjust any of the settings 
you've listed above by using the StreamsConfig.topicPrefix method along with 
the relevant topic configuration value.

For example:

 
{noformat}
final Properties props = new Properties();

props.put(StreamsConfig.topicPrefix(TopicConfig.CLEANUP_POLICY_CONFIG),
TopicConfig.CLEANUP_POLICY_COMPACT);

props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), "XX");

props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), 
"XXX");{noformat}
Just note that any settings set this way will apply to all Kafka Streams 
internal topics.

 

 

HTH,

Bill

> Kafka streams repartitioning topic settings crashing multiple nodes
> ---
>
> Key: KAFKA-8201
> URL: https://issues.apache.org/jira/browse/KAFKA-8201
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Anders Aagaard
>Priority: Major
>
> We had an incident in a setup using kafka streams version 2.0.0 and kafka 
> version 2.0.0 protocol version 2.0-IV1. The reason for it is a combination of 
> kafka streams defaults and a bug in kafka.
> Info about the setup: Streams application reading a log compacted input 
> topic, and performing a groupby operation requiring repartitioning.
> Kafka streams automatically creates a repartitioning topic with 24 partitions 
> and the following options:
> segment.bytes=52428800, retention.ms=9223372036854775807, 
> segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60.
>  
> This should mean we roll out a new segment when the active one reaches 50 MB 
> or is older than 10 minutes. However, the different timestamps coming into 
> the topic due to log compaction (sometimes varying by multiple days) mean 
> the server will see a message which is older than segment.ms and 
> automatically trigger a new segment roll-out. This causes a segment 
> explosion, where new segments are continuously rolled out.
> There seems to be a bug report for this server side here : 
> https://issues.apache.org/jira/browse/KAFKA-4336.
> This effectively took down several nodes and a broker in our cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-09 Thread Dmitry Minkovsky (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813550#comment-16813550
 ] 

Dmitry Minkovsky commented on KAFKA-5998:
-

[~guozhang] I am using kafka_2.12-2.1.0. The behavior seems identical on my 
local dev environment macOS 10.14/APFS and whatever Linux GKE runs, with Kafka 
inside a container using data on ext4. 
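
For context, the failing frame in the quoted stack trace below is 
OffsetCheckpoint.write, which writes a .checkpoint.tmp file and renames it into 
place; the "No such file or directory" on creating the temp file suggests the 
task's state directory itself was gone at that moment. A minimal sketch of that 
write-to-temp-then-rename pattern (simplified, not the actual OffsetCheckpoint 
code):

{code:java}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

class CheckpointWriter {
    void write(File checkpointFile, byte[] contents) throws IOException {
        File tmp = new File(checkpointFile.getAbsolutePath() + ".tmp");
        // This is the call that throws FileNotFoundException when the
        // parent (state) directory no longer exists.
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            out.write(contents);
            out.getFD().sync(); // force contents to disk before the rename
        }
        // Atomically replace the old checkpoint with the new one.
        Files.move(tmp.toPath(), checkpointFile.toPath(),
                StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}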

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... I have been 
> running them for two days under a load of around 2500 msgs per second. On the 
> third day I am getting the exception below for some of the partitions; I have 
> 16 partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  

[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-04-09 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813425#comment-16813425
 ] 

Bill Bejeck commented on KAFKA-7965:


Saw this failure again 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20786/]
{noformat}
Error Message
org.scalatest.junit.JUnitTestFailedError: Should have received an class 
org.apache.kafka.common.errors.GroupMaxSizeReachedException during the cluster 
roll
Stacktrace
org.scalatest.junit.JUnitTestFailedError: Should have received an class 
org.apache.kafka.common.errors.GroupMaxSizeReachedException during the cluster 
roll
at 
org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:100)
at 
org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:71)
at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
at org.scalatest.junit.JUnitSuite.fail(JUnitSuite.scala:71)
at 
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:344)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 

[jira] [Updated] (KAFKA-8194) MessagesInPerSec incorrect value when transactional messaging is enabled

2019-04-09 Thread Odyldzhon Toshbekov (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Odyldzhon Toshbekov updated KAFKA-8194:
---
Summary: MessagesInPerSec incorrect value when transactional messaging is 
enabled  (was: MessagesInPerSec incorrect value when Stream produce 
messages)

> MessagesInPerSec incorrect value when transactional messaging is enabled
> --
>
> Key: KAFKA-8194
> URL: https://issues.apache.org/jira/browse/KAFKA-8194
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 1.1.0, 2.2.0
>Reporter: Odyldzhon Toshbekov
>Priority: Trivial
> Attachments: Screen Shot 2019-04-05 at 17.51.03.png, Screen Shot 
> 2019-04-05 at 17.52.22.png
>
>
> Looks like metric
> {code:java}
> kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec{code}
> has an incorrect value when messages come via the Kafka Streams API.
> I noticed that the offset for every message from Kafka Streams can be increased 
> by 1, 2, ... However, if messages come to the broker from a Kafka producer it 
> is always incremented by 1.
> Unfortunately, the metric mentioned above is calculated based on offset 
> changes, and as a result we cannot use Streams because the metric will always 
> be incorrect.
> For Kafka 2.2.0
> !Screen Shot 2019-04-05 at 17.51.03.png|width=100%!
>  
> [https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/server/ReplicaManager.scala]
> And this is the method used to get "numAppendedMessages"
>  !Screen Shot 2019-04-05 at 17.52.22.png|width=100%!
> https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/log/Log.scala
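
A hypothetical illustration of the offset arithmetic (numbers invented): with 
transactions enabled, the commit marker is a control record that occupies an 
offset but is not a user message, so a count derived from the offset delta 
over-counts by one per transaction:

{code:java}
// Hypothetical numbers: a transaction appends 2 user records plus 1
// commit marker (a control record), so offsets advance by 3.
long firstOffset = 100L;
long lastOffset = 102L;                                     // includes the marker
long countedFromOffsetDelta = lastOffset - firstOffset + 1; // 3
long actualUserMessages = 2L;
System.out.println(countedFromOffsetDelta - actualUserMessages); // 1 extra
{code}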



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8203) plaintext connections to SSL secured broker can be handled more elegantly

2019-04-09 Thread Jorg Heymans (JIRA)
Jorg Heymans created KAFKA-8203:
---

 Summary: plaintext connections to SSL secured broker can be 
handled more elegantly
 Key: KAFKA-8203
 URL: https://issues.apache.org/jira/browse/KAFKA-8203
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.1.1
Reporter: Jorg Heymans


Mailing list thread: 
[https://lists.apache.org/thread.html/39935157351c0ad590e6cf02027816d664f1fd3724a25c1133a3bba6@%3Cusers.kafka.apache.org%3E]

-reproduced here

We have our brokers secured with these standard properties

 
{code:java}
listeners=SSL://a.b.c:9030 
ssl.truststore.location=... 
ssl.truststore.password=... 
ssl.keystore.location=... 
ssl.keystore.password=... 
ssl.key.password=... 
ssl.client.auth=required 
ssl.enabled.protocols=TLSv1.2 {code}
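
For reference, the client in this scenario is presumably missing 
security.protocol=SSL, since the Java clients default to PLAINTEXT. A minimal 
consumer-side fragment that selects the SSL listener might look like this 
(paths and passwords are placeholders):

{code:java}
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "a.b.c:9030");
props.put("security.protocol", "SSL");                           // default is PLAINTEXT
props.put("ssl.truststore.location", "/path/to/truststore.jks"); // placeholder
props.put("ssl.truststore.password", "...");
props.put("ssl.keystore.location", "/path/to/keystore.jks");     // needed: ssl.client.auth=required
props.put("ssl.keystore.password", "...");
props.put("ssl.key.password", "...");
{code}
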
It's a bit surprising to see that when a (java) client attempts to connect 
without having SSL configured, so doing a PLAINTEXT connection instead, it does 
not get a TLS exception indicating that SSL is required. Somehow I would have 
expected a hard transport-level exception making it clear that non-SSL 
connections are not allowed, instead the client sees this (when debug logging 
is enabled)


{code:java}
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
[main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=my-test-group] Kafka consumer initialized
[main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=my-test-group] Subscribed to topic(s): events
[main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-test-group] Sending FindCoordinator request to broker a.b.c:9030 (id: -1 rack: null)
[main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Initiating connection to node a.b.c:9030 (id: -1 rack: null) using address /a.b.c
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
[main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, groupId=my-test-group] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
[main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Completed connection to node -1. Fetching API versions.
[main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Initiating API versions fetch from node -1.
[main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, groupId=my-test-group] Connection with /a.b.c disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at eu.europa.ec.han.TestConsumer.main(TestConsumer.java:22)
[main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Node -1 disconnected.
[main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-1, correlationId=0) due to node -1 being disconnected
[main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 

[jira] [Commented] (KAFKA-6675) Connect workers should log plugin path and available plugins more clearly

2019-04-09 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813204#comment-16813204
 ] 

Valeria Vasylieva commented on KAFKA-6675:
--

[~rhauch] thank you for the clarification, I got it and will proceed with the task.

> Connect workers should log plugin path and available plugins more clearly
> -
>
> Key: KAFKA-6675
> URL: https://issues.apache.org/jira/browse/KAFKA-6675
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.1
>Reporter: Randall Hauch
>Assignee: Valeria Vasylieva
>Priority: Minor
>
> Users struggle with setting the plugin path and properly installing plugins. 
> If users get any of this wrong, they get strange errors only after they run 
> the worker and attempt to deploy connectors or use transformations. 
> The Connect worker should more obviously output the plugin path directories 
> and the available plugins. For example, if the {{plugin.path}} were:
> {code}
> plugin.path=/usr/local/share/java,/usr/local/plugins
> {code}
> then the worker might output something like the following information in the 
> log:
> {noformat}
> Looking for plugins on classpath and inside plugin.path directories:
>   /usr/local/share/java
>   /usr/local/plugins
>  
> Source Connector(s):
>   FileStreamSource  (org.apache.kafka.connect.file.FileStreamSourceConnector) 
>   @ classpath
>   FileStreamSink(org.apache.kafka.connect.file.FileStreamSinkConnector)   
>   @ classpath
>   JdbcSource(io.confluent.connect.jdbc.JdbcSourceConnector)   
>   @ /usr/local/share/java/kafka-connect-jdbc
>   MySql (io.debezium.connector.mysql.MySqlConnector)  
>   @ /usr/local/plugins/debezium-connector-mysql
> Converter(s):
>   JsonConverter (org.apache.kafka.connect.json.JsonConverter) 
>   @ classpath
>   ByteArrayConverter
> (org.apache.kafka.connect.converters.ByteArrayConverter)@ classpath
>   SimpleHeaderConverter 
> (org.apache.kafka.connect.converters.SimpleHeaderConverter) @ classpath
>   AvroConverter (io.confluent.connect.avro.AvroConverter) 
>   @ /usr/local/share/java/kafka-serde-tools
> Transformation(s):
>   InsertField   (org.apache.kafka.connect.transforms.InsertField) 
>   @ classpath
>   ReplaceField  (org.apache.kafka.connect.transforms.ReplaceField)
>   @ classpath
>   MaskField (org.apache.kafka.connect.transforms.MaskField)   
>   @ classpath
>   ValueToKey(org.apache.kafka.connect.transforms.ValueToKey)  
>   @ classpath
>   HoistField(org.apache.kafka.connect.transforms.HoistField)  
>   @ classpath
>   ExtractField  (org.apache.kafka.connect.transforms.ExtractField)
>   @ classpath
>   SetSchemaMetadata (org.apache.kafka.connect.transforms.SetSchemaMetadata)   
>   @ classpath
>   RegexRouter   (org.apache.kafka.connect.transforms.RegexRouter) 
>   @ classpath
>   TimestampRouter   (org.apache.kafka.connect.transforms.TimestampRouter) 
>   @ classpath
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6755) MaskField SMT should optionally take a literal value to use instead of using null

2019-04-09 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813197#comment-16813197
 ] 

Valeria Vasylieva commented on KAFKA-6755:
--

[~rhauch] the discussion did not gain a lot of interest from the community. 
Should I proceed to a vote, so that we finally decide whether this change 
should be merged or discarded?

> MaskField SMT should optionally take a literal value to use instead of using 
> null
> -
>
> Key: KAFKA-6755
> URL: https://issues.apache.org/jira/browse/KAFKA-6755
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Valeria Vasylieva
>Priority: Major
>  Labels: needs-kip, newbie
>   Original Estimate: 8h
>  Remaining Estimate: 8h
>
> The existing {{org.apache.kafka.connect.transforms.MaskField}} SMT always 
> uses the null value for the type of field. It'd be nice to *optionally* be 
> able to specify a literal value for the type, where the SMT would convert the 
> literal string value in the configuration to the desired type (using the new 
> {{Values}} methods).
> Use cases: mask out the IP address, or SSN, or other personally identifiable 
> information (PII).
> Since this changes the API, it will require a KIP.
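
A rough sketch of the proposed conversion (the helper and its name are 
hypothetical until a KIP settles the details), using the public Values methods 
to turn the configured literal string into the masked field's type:

{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Values;

class MaskLiteral {
    static Object maskedValue(Schema fieldSchema, String configuredLiteral) {
        if (configuredLiteral == null) {
            return null; // current behavior: mask with the type's null value
        }
        switch (fieldSchema.type()) {
            case STRING:  return configuredLiteral;
            case INT32:   return Values.convertToInteger(Schema.STRING_SCHEMA, configuredLiteral);
            case INT64:   return Values.convertToLong(Schema.STRING_SCHEMA, configuredLiteral);
            case FLOAT64: return Values.convertToDouble(Schema.STRING_SCHEMA, configuredLiteral);
            case BOOLEAN: return Values.convertToBoolean(Schema.STRING_SCHEMA, configuredLiteral);
            default:
                throw new IllegalArgumentException(
                        "Cannot mask type " + fieldSchema.type() + " with a literal");
        }
    }
}
{code}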



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8202) StackOverflowError on producer when splitting batches

2019-04-09 Thread Daniel Krawczyk (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Krawczyk updated KAFKA-8202:
---
Description: 
Hello,

Recently we came across a StackOverflowError in the Kafka producer Java 
library. The error caused the Kafka producer to stop (we had to restart our 
service due to: IllegalStateException: Cannot perform operation after producer 
has been closed).

The stack trace was as follows:
{code:java}
java.lang.StackOverflowError: null
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
// […]
{code}
The piece of code responsible for the error:
{code:java}
/**
 * This method is used when we have to split a large batch in smaller ones. A
 * chained metadata will allow the future that has already returned to the
 * users to wait on the newly created split batches even after the old big
 * batch has been deemed as done.
 */
void chain(FutureRecordMetadata futureRecordMetadata) {
    if (nextRecordMetadata == null)
        nextRecordMetadata = futureRecordMetadata;
    else
        nextRecordMetadata.chain(futureRecordMetadata);
}
{code}
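
For illustration, one way to avoid the unbounded recursion would be an 
iterative walk to the tail of the chain (a sketch, not the project's actual fix):

{code:java}
// Iterative chain(): constant stack depth regardless of how many times
// a batch has been split and re-chained.
void chain(FutureRecordMetadata futureRecordMetadata) {
    FutureRecordMetadata current = this;
    while (current.nextRecordMetadata != null) {
        current = current.nextRecordMetadata;
    }
    current.nextRecordMetadata = futureRecordMetadata;
}
{code}
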
Before the error occurred we observed a large number of log messages related to 
record batches being split (caused by the MESSAGE_TOO_LARGE error) on one of our 
topics (logged by org.apache.kafka.clients.producer.internals.Sender):
{code:java}
[Producer clientId=producer-1] Got error produce response in correlation id 
158621342 on topic-partition , splitting and retrying (2147483647 
attempts left). Error: MESSAGE_TOO_LARGE
{code}
All logs had different correlation ids but the same count of attempts left 
(2147483647), so it looked like they were related to different requests, and 
all of them were succeeding with no further retries.

We are using kafka-clients java library in version 2.0.0, the brokers are 2.1.1.

Thanks in advance.

  was:
Hello,

recently we came across a StackOverflowError error in the Kafka producer java 
library. The error caused the Kafka producer to stop.

The stack trace was as follows:
{code:java}
java.lang.StackOverflowError: null
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
// […]
{code}
The piece of code responsible for the error:
{code:java}
/**
 * This method is used when we have to split a large batch in smaller ones. A 
chained metadata will allow the
 * future that 

[jira] [Created] (KAFKA-8202) StackOverflowError on producer when splitting batches

2019-04-09 Thread Daniel Krawczyk (JIRA)
Daniel Krawczyk created KAFKA-8202:
--

 Summary: StackOverflowError on producer when splitting batches
 Key: KAFKA-8202
 URL: https://issues.apache.org/jira/browse/KAFKA-8202
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Daniel Krawczyk


Hello,

Recently we came across a StackOverflowError in the Kafka producer Java 
library. The error caused the Kafka producer to stop.

The stack trace was as follows:
{code:java}
java.lang.StackOverflowError: null
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89)
// […]
{code}
The piece of code responsible for the error:
{code:java}
/**
 * This method is used when we have to split a large batch in smaller ones. A
 * chained metadata will allow the future that has already returned to the
 * users to wait on the newly created split batches even after the old big
 * batch has been deemed as done.
 */
void chain(FutureRecordMetadata futureRecordMetadata) {
    if (nextRecordMetadata == null)
        nextRecordMetadata = futureRecordMetadata;
    else
        nextRecordMetadata.chain(futureRecordMetadata);
}
{code}
Before the error occurred we observed a large number of log messages related to 
record batches being split (caused by the MESSAGE_TOO_LARGE error) on one of our 
topics (logged by org.apache.kafka.clients.producer.internals.Sender):
{code:java}
[Producer clientId=producer-1] Got error produce response in correlation id 
158621342 on topic-partition , splitting and retrying (2147483647 
attempts left). Error: MESSAGE_TOO_LARGE
{code}
All logs had different correlation ids but the same count of attempts left 
(2147483647), so it looked like they were related to different requests, and 
all of them were succeeding with no further retries.

We are using kafka-clients java library in version 2.0.0, the brokers are 2.1.1.

Thanks in advance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2019-04-09 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813140#comment-16813140
 ] 

Jonathan Santilli edited comment on KAFKA-7656 at 4/9/19 8:53 AM:
--

Hello [~jagsancio], this is the only log I see (on the leader):

 
{code:java}
 [2019-04-08 22:35:27,145] ERROR [ReplicaManager broker=1] Error processing 
fetch with max size -2147483648 from consumer on partition 
__consumer_offsets-14: (fetchOffset=0, logStartOffset=-1, maxBytes=-2147483648, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) 
java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/opt/kafka/logdata/__consumer_offsets-14/.log, start=0, 
end=2147483647) at kafka.log.LogSegment.read(LogSegment.scala:274) at 
kafka.log.Log.$anonfun$read$2(Log.scala:1245) at 
kafka.log.Log.maybeHandleIOException(Log.scala:2013) at 
kafka.log.Log.read(Log.scala:1200) at 
kafka.cluster.Partition.$anonfun$readRecords$1(Partition.scala:805) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) at 
kafka.cluster.Partition.readRecords(Partition.scala:781) at 
kafka.server.ReplicaManager.read$1(ReplicaManager.scala:926) at 
kafka.server.ReplicaManager.$anonfun$readFromLocalLog$4(ReplicaManager.scala:991)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:990) at 
kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:840) at 
kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:845) at 
kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:723) at 
kafka.server.KafkaApis.handle(KafkaApis.scala:109) at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at 
java.lang.Thread.run(Thread.java:748){code}
 

It is the only partition with the problem so far.


was (Author: pachilo):
Hello [~jagsancio], this is the only log I see (on the leader):

 
{noformat}
[2019-04-08 22:35:27,145] ERROR [ReplicaManager broker=1] Error processing 
fetch with max size -2147483648 from consumer on partition 
__consumer_offsets-14: (fetchOffset=0, logStartOffset=-1, maxBytes=-2147483648, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) 
java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/opt/kafka/logdata/__consumer_offsets-14/.log, start=0, 
end=2147483647) at kafka.log.LogSegment.read(LogSegment.scala:274) at 
kafka.log.Log.$anonfun$read$2(Log.scala:1245) at 
kafka.log.Log.maybeHandleIOException(Log.scala:2013) at 
kafka.log.Log.read(Log.scala:1200) at 
kafka.cluster.Partition.$anonfun$readRecords$1(Partition.scala:805) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) at 
kafka.cluster.Partition.readRecords(Partition.scala:781) at 
kafka.server.ReplicaManager.read$1(ReplicaManager.scala:926) at 
kafka.server.ReplicaManager.$anonfun$readFromLocalLog$4(ReplicaManager.scala:991)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:990) at 
kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:840) at 
kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:845) at 
kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:723) at 
kafka.server.KafkaApis.handle(KafkaApis.scala:109) at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at 
java.lang.Thread.run(Thread.java:748)
{noformat}
 

It is the only partition with the problem so far.

> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1
> Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 
> EDT 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Patrick Haas
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> (Note: From 2.0.1-cp1 from confluent distribution)
> {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error 
> processing fetch operation on partition __consumer_offsets-20, offset 0 
> (kafka.server.ReplicaManager)}}
> {{java.lang.IllegalArgumentException: Invalid max 

[jira] [Commented] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2019-04-09 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813140#comment-16813140
 ] 

Jonathan Santilli commented on KAFKA-7656:
--

Hello [~jagsancio], this is the only log I see (on the leader):

 
{noformat}
[2019-04-08 22:35:27,145] ERROR [ReplicaManager broker=1] Error processing 
fetch with max size -2147483648 from consumer on partition 
__consumer_offsets-14: (fetchOffset=0, logStartOffset=-1, maxBytes=-2147483648, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) 
java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/opt/kafka/logdata/__consumer_offsets-14/.log, start=0, 
end=2147483647) at kafka.log.LogSegment.read(LogSegment.scala:274) at 
kafka.log.Log.$anonfun$read$2(Log.scala:1245) at 
kafka.log.Log.maybeHandleIOException(Log.scala:2013) at 
kafka.log.Log.read(Log.scala:1200) at 
kafka.cluster.Partition.$anonfun$readRecords$1(Partition.scala:805) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) at 
kafka.cluster.Partition.readRecords(Partition.scala:781) at 
kafka.server.ReplicaManager.read$1(ReplicaManager.scala:926) at 
kafka.server.ReplicaManager.$anonfun$readFromLocalLog$4(ReplicaManager.scala:991)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:990) at 
kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:840) at 
kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:845) at 
kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:723) at 
kafka.server.KafkaApis.handle(KafkaApis.scala:109) at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at 
java.lang.Thread.run(Thread.java:748)
{noformat}
 

It is the only partition with the problem so far.
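
As an illustration of the suspected mechanism (values invented, apart from the 
-2147483648 seen above): a fetch size one past Integer.MAX_VALUE narrowed from 
long to int wraps to Integer.MIN_VALUE in two's complement, which 
LogSegment.read then rejects:

{code:java}
long requestedMaxBytes = 2147483648L;   // Integer.MAX_VALUE + 1
int narrowed = (int) requestedMaxBytes; // wraps to Integer.MIN_VALUE
System.out.println(narrowed);           // prints -2147483648
{code}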

> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1
> Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 
> EDT 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Patrick Haas
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> (Note: From 2.0.1-cp1 from confluent distribution)
> {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error 
> processing fetch operation on partition __consumer_offsets-20, offset 0 
> (kafka.server.ReplicaManager)}}
> {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log 
> read from segment FileRecords(file= 
> /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
> start=0, end=2147483647)}}
> {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
> {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
> {{ at kafka.log.Log.read(Log.scala:1114)}}
> {{ at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
> {{ at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
> {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
> {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
> {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
> {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
> {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
> {{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
> {{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes

2019-04-09 Thread Anders Aagaard (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anders Aagaard updated KAFKA-8201:
--
Description: 
We had an incident in a setup using kafka streams version 2.0.0 and kafka 
version 2.0.0 protocol version 2.0-IV1. The reason for it is a combination of 
kafka streams defaults and a bug in kafka.

Info about the setup: Streams application reading a log compacted input topic, 
and performing a groupby operation requiring repartitioning.

Kafka streams automatically creates a repartitioning topic with 24 partitions 
and the following options:

segment.bytes=52428800, retention.ms=9223372036854775807, 
segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60.

 

This should mean we roll out a new segment when the active one reaches 50 MB or 
is older than 10 minutes. However, the different timestamps coming into the 
topic due to log compaction (sometimes varying by multiple days) mean the 
server will see a message which is older than segment.ms and automatically 
trigger a new segment roll-out. This causes a segment explosion, where new 
segments are continuously rolled out.

There seems to be a bug report for this server side here : 
https://issues.apache.org/jira/browse/KAFKA-4336.

This effectively took down several nodes and a broker in our cluster.

  was:
We had an incident in a setup using kafka streams version 2.0.0 and kafka 
version 2.0.0. The reason for it is a combination of kafka streams defaults and 
a bug in kafka.

Info about the setup: Streams application reading a log compacted input topic, 
and performing a groupby operation requiring repartitioning.

Kafka streams automatically creates a repartitioning topic with 24 partitions 
and the following options:

segment.bytes=52428800, retention.ms=9223372036854775807, 
segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60.

 

This should mean we roll out a new segment when the active one reaches 50 MB or 
is older than 10 minutes. However, the different timestamps coming into the 
topic due to log compaction (sometimes varying by multiple days) mean the 
server will see a message which is older than segment.ms and automatically 
trigger a new segment roll-out. This causes a segment explosion, where new 
segments are continuously rolled out.

There seems to be a bug report for this server side here : 
https://issues.apache.org/jira/browse/KAFKA-4336.

This effectively took down several nodes and a broker in our cluster.


> Kafka streams repartitioning topic settings crashing multiple nodes
> ---
>
> Key: KAFKA-8201
> URL: https://issues.apache.org/jira/browse/KAFKA-8201
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Anders Aagaard
>Priority: Major
>
> We had an incident in a setup using kafka streams version 2.0.0 and kafka 
> version 2.0.0 protocol version 2.0-IV1. The reason for it is a combination of 
> kafka streams defaults and a bug in kafka.
> Info about the setup: Streams application reading a log compacted input 
> topic, and performing a groupby operation requiring repartitioning.
> Kafka streams automatically creates a repartitioning topic with 24 partitions 
> and the following options:
> segment.bytes=52428800, retention.ms=9223372036854775807, 
> segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60.
>  
> This should mean we roll out a new segment when the active one reaches 50 MB 
> or is older than 10 minutes. However, the different timestamps coming into 
> the topic due to log compaction (sometimes varying by multiple days) mean 
> the server will see a message which is older than segment.ms and 
> automatically trigger a new segment roll-out. This causes a segment 
> explosion, where new segments are continuously rolled out.
> There seems to be a bug report for this server side here : 
> https://issues.apache.org/jira/browse/KAFKA-4336.
> This effectively took down several nodes and a broker in our cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes

2019-04-09 Thread Anders Aagaard (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813069#comment-16813069
 ] 

Anders Aagaard commented on KAFKA-8201:
---

Another question here: why use infinite retention and cleanup.policy=delete 
instead of log compaction for this case?

> Kafka streams repartitioning topic settings crashing multiple nodes
> ---
>
> Key: KAFKA-8201
> URL: https://issues.apache.org/jira/browse/KAFKA-8201
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Anders Aagaard
>Priority: Major
>
> We had an incident in a setup using kafka streams version 2.0.0 and kafka 
> version 2.0.0. The reason for it is a combination of kafka streams defaults 
> and a bug in kafka.
> Info about the setup: Streams application reading a log compacted input 
> topic, and performing a groupby operation requiring repartitioning.
> Kafka streams automatically creates a repartitioning topic with 24 partitions 
> and the following options:
> segment.bytes=52428800, retention.ms=9223372036854775807, 
> segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60.
>  
> This should mean we roll out a new segment when the active one reaches 50 MB 
> or is older than 10 minutes. However, the different timestamps coming into 
> the topic due to log compaction (sometimes varying by multiple days) mean 
> the server will see a message which is older than segment.ms and 
> automatically trigger a new segment roll-out. This causes a segment 
> explosion, where new segments are continuously rolled out.
> There seems to be a bug report for this server side here : 
> https://issues.apache.org/jira/browse/KAFKA-4336.
> This effectively took down several nodes and a broker in our cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes

2019-04-09 Thread Anders Aagaard (JIRA)
Anders Aagaard created KAFKA-8201:
-

 Summary: Kafka streams repartitioning topic settings crashing 
multiple nodes
 Key: KAFKA-8201
 URL: https://issues.apache.org/jira/browse/KAFKA-8201
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Anders Aagaard


We had an incident in a setup using kafka streams version 2.0.0 and kafka 
version 2.0.0. The reason for it is a combination of kafka streams defaults and 
a bug in kafka.

Info about the setup: Streams application reading a log compacted input topic, 
and performing a groupby operation requiring repartitioning.

Kafka streams automatically creates a repartitioning topic with 24 partitions 
and the following options:

segment.bytes=52428800, retention.ms=9223372036854775807, 
segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60.

 

This should mean we roll out a new segment when the active one reaches 50 MB or 
is older than 10 minutes. However, the different timestamps coming into the 
topic due to log compaction (sometimes varying by multiple days) mean the 
server will see a message which is older than segment.ms and automatically 
trigger a new segment roll-out. This causes a segment explosion, where new 
segments are continuously rolled out.

There seems to be a bug report for this server side here : 
https://issues.apache.org/jira/browse/KAFKA-4336.

This effectively took down several nodes and a broker in our cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes

2019-04-09 Thread Anders Aagaard (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anders Aagaard updated KAFKA-8201:
--
Affects Version/s: 2.0.0

> Kafka streams repartitioning topic settings crashing multiple nodes
> ---
>
> Key: KAFKA-8201
> URL: https://issues.apache.org/jira/browse/KAFKA-8201
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Anders Aagaard
>Priority: Major
>
> We had an incident in a setup using kafka streams version 2.0.0 and kafka 
> version 2.0.0. The reason for it is a combination of kafka streams defaults 
> and a bug in kafka.
> Info about the setup: Streams application reading a log compacted input 
> topic, and performing a groupby operation requiring repartitioning.
> Kafka streams automatically creates a repartitioning topic with 24 partitions 
> and the following options:
> segment.bytes=52428800, retention.ms=9223372036854775807, 
> segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60.
>  
> This should mean we roll out a new segment when the active one reaches 50 MB 
> or is older than 10 minutes. However, the different timestamps coming into 
> the topic due to log compaction (sometimes varying by multiple days) mean 
> the server will see a message which is older than segment.ms and 
> automatically trigger a new segment roll-out. This causes a segment 
> explosion, where new segments are continuously rolled out.
> There seems to be a bug report for this server side here : 
> https://issues.apache.org/jira/browse/KAFKA-4336.
> This effectively took down several nodes and a broker in our cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)