[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2019-01-10 Thread Ashish Surana (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16740052#comment-16740052
 ] 

Ashish Surana commented on KAFKA-7149:
--

Is this change on track for the 2.2.0 release? I believe this is a very 
important improvement for anybody using kafka-streams at decent scale.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Navinder Brar
>Priority: Major
> Fix For: 2.2.0
>
>
> We observed that with a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows very fast, and we start getting 
> a RecordTooLargeException at the kafka-broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, this limits the scalability of Kafka Streams, as moving around 100 MB of 
> assignment data for each rebalance hurts performance and reliability 
> (timeout exceptions start appearing) as well. It also caps Kafka Streams 
> scale even with a high max.message.bytes setting, because the data size grows 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we now send the assignment data 
> compressed. We saw the assignment-data size reduced by 8x-10x. This improved 
> Kafka Streams scalability drastically for us, and we can now run with 
> more than 8,000 partitions.
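The compression approach described above can be sketched with the JDK's built-in GZIP streams. This is purely an illustrative example of compressing serialized assignment bytes, not the actual Kafka Streams patch; the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class AssignmentCompression {

    // Compress the serialized assignment bytes before sending them to the broker.
    public static byte[] compress(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        }
        return out.toByteArray();
    }

    // Decompress on the receiving side before decoding the assignment.
    public static byte[] decompress(byte[] packed) throws IOException {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(packed))) {
            return gzip.readAllBytes();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assignment data is highly repetitive (topic names repeated once per
        // partition), which is why gzip shrinks it so effectively.
        StringBuilder assignment = new StringBuilder();
        for (int p = 0; p < 10_000; p++) {
            assignment.append("stream-app-topic-").append(p % 100).append(':').append(p).append(';');
        }
        byte[] raw = assignment.toString().getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(raw);
        System.out.println("raw=" + raw.length + " bytes, compressed=" + packed.length + " bytes");
    }
}
```

On repetitive topic-partition strings like this, the ratio can plausibly reach the 8x-10x range reported above, though the exact figure depends on the data.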



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3980) JmxReporter uses excessive memory causing OutOfMemoryException

2019-01-10 Thread flying (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16740039#comment-16740039
 ] 

flying commented on KAFKA-3980:
---

Version 2.1.0 has the same issue.

!issue.jpg!

> JmxReporter uses excessive memory causing OutOfMemoryException
> --
>
> Key: KAFKA-3980
> URL: https://issues.apache.org/jira/browse/KAFKA-3980
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.9.0.1
>Reporter: Andrew Jorgensen
>Priority: Major
> Attachments: heap_img.png, issue.jpg
>
>
> I have some nodes in a kafka cluster that occasionally run out of memory 
> whenever I restart the producers. I was able to take a heap dump from both a 
> recently restarted Kafka node, which weighed in at about 20 MB, and a node 
> that has been running for 2 months, which is using over 700 MB of memory. 
> Looking at the heap dump, it appears the JmxReporter is holding on to metrics 
> and causing them to build up over time. 
> !http://imgur.com/N6Cd0Ku.png!
> !http://imgur.com/kQBqA2j.png!
> The ultimate problem this causes is that there is a chance that when I restart 
> the producers, a node will hit a Java heap space exception and OOM. The nodes 
> then fail to start up correctly and write a -1 as the leader number to the 
> partitions they were responsible for, effectively resetting the offset and 
> rendering those partitions unavailable. The kafka process then needs to be 
> restarted in order to re-assign the node to the partitions it owns.
> I have a few questions:
> 1. I am not quite sure why there are so many client id entries in that 
> JmxReporter map.
> 2. Is there a way to have the JmxReporter release metrics after a set amount 
> of time, or a way to turn certain high-cardinality metrics like these off?
> I can provide any logs or heap dumps if more information is needed.
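On question 2, one generic way to bound metric retention is an LRU eviction policy. The sketch below is purely illustrative of the size-bounding idea using a stdlib access-ordered `LinkedHashMap`; the names are hypothetical and this is not the actual JmxReporter API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: a size-bounded registry that evicts the
// least-recently-used client id instead of holding every client id
// it has ever seen. Not actual Kafka code.
public class BoundedMetricRegistry {
    private final int maxEntries;
    private final Map<String, Long> metricsByClientId;

    public BoundedMetricRegistry(int maxEntries) {
        this.maxEntries = maxEntries;
        // accessOrder=true makes iteration order least-recently-used first,
        // so removeEldestEntry drops the stalest client id when over capacity.
        this.metricsByClientId = new LinkedHashMap<String, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                return size() > BoundedMetricRegistry.this.maxEntries;
            }
        };
    }

    public void record(String clientId, long value) {
        metricsByClientId.put(clientId, value);
    }

    public int size() {
        return metricsByClientId.size();
    }
}
```

A time-based expiry (dropping metrics untouched for N minutes) would address restarted-producer churn similarly; the size bound is just the simplest form to show.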





[jira] [Updated] (KAFKA-3980) JmxReporter uses excessive memory causing OutOfMemoryException

2019-01-10 Thread flying (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

flying updated KAFKA-3980:
--
Attachment: issue.jpg

> JmxReporter uses excessive memory causing OutOfMemoryException
> --
>
> Key: KAFKA-3980
> URL: https://issues.apache.org/jira/browse/KAFKA-3980
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.9.0.1
>Reporter: Andrew Jorgensen
>Priority: Major
> Attachments: heap_img.png, issue.jpg
>
>





[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2019-01-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739909#comment-16739909
 ] 

ASF GitHub Bot commented on KAFKA-7040:
---

apovzner commented on pull request #6122: KAFKA-7040: Ignore 
OffsetsForLeaderEpoch response if leader epoch changed while request in flight
URL: https://github.com/apache/kafka/pull/6122
 
 
   This is a "backport" of https://github.com/apache/kafka/pull/6101 + relevant 
parts of https://github.com/apache/kafka/pull/5661 for 2.0 branch.
   
   cc @hachikuji 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: Lucas Wang
>Assignee: Anna Povzner
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0 and broker1, and 
> two partitions, "t1p0" and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happens on broker0:
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
> broker1 and waits for the response.
> 2. A LeaderAndIsr request causes broker0 to become the leader for partition 
> t1p0, which in turn removes t1p0 from the replica fetcher thread.
> 3. Broker0 accepts some messages from a producer.
> 4. A second LeaderAndIsr request causes broker1 to become the leader, and 
> broker0 to become the follower, for partition t1p0. This causes t1p0 to be 
> added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives the response for the 
> LeaderEpoch request issued in step 1 and truncates the messages accepted in 
> step 3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 as the follower of two partitions instead 
> of just one, to avoid shutting down the replica fetcher thread when it 
> becomes idle.
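The PR referenced above ignores the OffsetsForLeaderEpoch response if the leader epoch changed while the request was in flight. As a hedged sketch of that idea only (hypothetical names, not the actual replica fetcher code), the guard amounts to comparing the epoch recorded at send time against the current one:

```java
// Illustrative sketch of the stale-response guard. In the scenario above,
// steps 2 and 4 each bump the partition's leader epoch, so the response
// from step 1 no longer matches and the truncation in step 5 is skipped.
public class EpochGuard {
    private int currentLeaderEpoch;

    public EpochGuard(int initialEpoch) {
        this.currentLeaderEpoch = initialEpoch;
    }

    // Called whenever a LeaderAndIsr request changes this partition's leadership.
    public void onLeaderChange(int newEpoch) {
        this.currentLeaderEpoch = newEpoch;
    }

    /** Apply a response only if the epoch it was sent under is still current. */
    public boolean shouldApply(int epochAtSendTime) {
        return epochAtSendTime == currentLeaderEpoch;
    }
}
```

For instance, a response sent under epoch 5 is discarded after two leadership transitions have moved the partition to epoch 7.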





[jira] [Reopened] (KAFKA-7741) Bad dependency via SBT

2019-01-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reopened KAFKA-7741:
-

> Bad dependency via SBT
> --
>
> Key: KAFKA-7741
> URL: https://issues.apache.org/jira/browse/KAFKA-7741
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1, 2.1.0
> Environment: Windows 10 professional, IntelliJ IDEA 2017.1
>Reporter: sacha barber
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> I am using the Kafka-Streams-Scala 2.1.0 JAR.
> If I create a new Scala project using SBT with these dependencies 
> {code}
> name := "ScalaKafkaStreamsDemo"
> version := "1.0"
> scalaVersion := "2.12.1"
> libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.0"
> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"
> libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.0.0"
> libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
> //TEST
> libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test
> libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % 
> "2.0.0" % Test
> {code}
> I get this error
>  
> {code}
> SBT 'ScalaKafkaStreamsDemo' project refresh failed
> Error:Error while importing SBT project:...[info] Resolving 
> jline#jline;2.14.1 ...
> [warn] [FAILED ] 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}: (0ms)
> [warn]  local: tried
> [warn] 
> C:\Users\sacha\.ivy2\local\javax.ws.rs\javax.ws.rs-api\2.1.1\${packaging.type}s\javax.ws.rs-api.${packaging.type}
> [warn]  public: tried
> [warn] 
> https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.${packaging.type}
> [info] downloading 
> https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-test-utils/2.1.0/kafka-streams-test-utils-2.1.0.jar
>  ...
> [info] [SUCCESSFUL ] 
> org.apache.kafka#kafka-streams-test-utils;2.1.0!kafka-streams-test-utils.jar 
> (344ms)
> [warn] ::
> [warn] :: FAILED DOWNLOADS ::
> [warn] :: ^ see resolution messages for details ^ ::
> [warn] ::
> [warn] :: javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [warn] ::
> [trace] Stack trace suppressed: run 'last *:ssExtractDependencies' for the 
> full output.
> [trace] Stack trace suppressed: run 'last *:update' for the full output.
> [error] (*:ssExtractDependencies) sbt.ResolveException: download failed: 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [error] (*:update) sbt.ResolveException: download failed: 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [error] Total time: 8 s, completed 16-Dec-2018 19:27:21
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=384M; 
> support was removed in 8.0. See complete log in 
> file:/C:/Users/sacha/.IdeaIC2017.1/system/log/sbt.last.log
> {code}
> This seems to be a common issue with a bad dependency from Kafka on 
> javax.ws.rs-api.
> If I drop the Kafka version down to 2.0.0 and add this line to my SBT file, 
> the error goes away:
> {code}
> libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" 
> artifacts(Artifact("javax.ws.rs-api", "jar", "jar"))
> {code}
>  
> However, I would like to work with the 2.1.0 version.
>  
>  





[jira] [Commented] (KAFKA-7741) Bad dependency via SBT

2019-01-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739897#comment-16739897
 ] 

John Roesler commented on KAFKA-7741:
-

The fix is merged, so I'll update the fix versions. I still need to update the 
documentation with a workaround for the versions that are already released.

> Bad dependency via SBT
> --
>
> Key: KAFKA-7741
> URL: https://issues.apache.org/jira/browse/KAFKA-7741
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1, 2.1.0
> Environment: Windows 10 professional, IntelliJ IDEA 2017.1
>Reporter: sacha barber
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>





[jira] [Resolved] (KAFKA-7741) Bad dependency via SBT

2019-01-10 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7741.
--
   Resolution: Fixed
Fix Version/s: 2.0.2
   2.1.1
   2.2.0

> Bad dependency via SBT
> --
>
> Key: KAFKA-7741
> URL: https://issues.apache.org/jira/browse/KAFKA-7741
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1, 2.1.0
> Environment: Windows 10 professional, IntelliJ IDEA 2017.1
>Reporter: sacha barber
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>





[jira] [Commented] (KAFKA-7749) confluent does not provide option to set consumer properties at connector level

2019-01-10 Thread Paul Davidson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739888#comment-16739888
 ] 

Paul Davidson commented on KAFKA-7749:
--

Hi [~sliebau]. You can probably ignore the comment about task level settings 
now. It was actually motivated by the problem described here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Add+option+to+make+Kafka+Connect+task+client+ID+values+unique
 and here https://issues.apache.org/jira/browse/KAFKA-5061 . We seem to be 
close to resolving that particular issue by simply including the task id in the 
default client ID. I can't think of any other specific cases where task-level 
settings would be particularly useful.

> confluent does not provide option to set consumer properties at connector 
> level
> ---
>
> Key: KAFKA-7749
> URL: https://issues.apache.org/jira/browse/KAFKA-7749
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Manjeet Duhan
>Priority: Major
>
> _We want to increase consumer.max.poll.records to improve performance, but 
> this value can only be set in the worker properties, which apply to all 
> connectors in a given cluster._
>  __ 
> _Operative situation: We have one project which communicates with 
> Elasticsearch, and we set consumer.max.poll.records=500 after multiple 
> performance tests; this worked fine for a year._
>  _Then another project was onboarded into the same cluster, which required 
> consumer.max.poll.records=5000 based on its performance tests. This 
> configuration was moved to production._
>   _Admetric started failing, as it was taking more than 5 minutes to process 
> 5000 polled records and started throwing CommitFailedException, which is a 
> vicious cycle as the same data is processed over and over again._
>  __ 
> _We can control the above when starting a consumer from plain Java, but this 
> control is not available per consumer in the Confluent connector._
> _I have overridden the Kafka code to accept connector-level properties, which 
> are applied to a single connector while the others keep using the default 
> properties. These changes have been running in production for more than 5 
> months._
> _Some of the properties that were useful for us:_
> max.poll.records
> max.poll.interval.ms
> request.timeout.ms
> key.deserializer
> value.deserializer
> heartbeat.interval.ms
> session.timeout.ms
> auto.offset.reset
> connections.max.idle.ms
> enable.auto.commit
>  
> auto.commit.interval.ms
>  
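The per-connector override described above boils down to a config merge: connector-level `consumer.`-prefixed settings shadow the worker defaults for that one connector. The helper below is a hypothetical sketch of that behavior, not Kafka Connect's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: strip the "consumer." prefix from connector-level
// settings and overlay them on the worker's consumer defaults, so only
// this connector sees the override.
public class ConsumerConfigMerge {
    public static Map<String, String> effectiveConsumerConfig(
            Map<String, String> workerDefaults,
            Map<String, String> connectorConfig) {
        Map<String, String> merged = new HashMap<>(workerDefaults);
        for (Map.Entry<String, String> e : connectorConfig.entrySet()) {
            if (e.getKey().startsWith("consumer.")) {
                merged.put(e.getKey().substring("consumer.".length()), e.getValue());
            }
        }
        return merged;
    }
}
```

With worker default `max.poll.records=500` and a connector setting `consumer.max.poll.records=5000`, the Elasticsearch connector from the example would poll 5000 records while every other connector keeps 500.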





[jira] [Commented] (KAFKA-7741) Bad dependency via SBT

2019-01-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739876#comment-16739876
 ] 

ASF GitHub Bot commented on KAFKA-7741:
---

guozhangwang commented on pull request #6121: KAFKA-7741: Streams exclude javax 
dependency
URL: https://github.com/apache/kafka/pull/6121
 
 
   
 



> Bad dependency via SBT
> --
>
> Key: KAFKA-7741
> URL: https://issues.apache.org/jira/browse/KAFKA-7741
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1, 2.1.0
> Environment: Windows 10 professional, IntelliJ IDEA 2017.1
>Reporter: sacha barber
>Assignee: John Roesler
>Priority: Major
>





[jira] [Commented] (KAFKA-4453) add request prioritization

2019-01-10 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739822#comment-16739822
 ] 

Mayuresh Gharat commented on KAFKA-4453:


Hi [~sriharsha], 
Yes, the PR is already up. I got a +1 from [~jjkoshy] yesterday. We are waiting 
to see if [~junrao] has any other concerns with the patch.


> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority, so a 
> backlog of requests ahead of a controller request will delay the processing 
> of controller requests. This causes the requests in front of the controller 
> request to be processed based on stale state.
> Side effects may include giving clients stale metadata\[1\], rejecting 
> ProduceRequests and FetchRequests\[2\], and data loss (for some 
> unofficial\[3\] definition of data loss in terms of messages beyond the high 
> watermark)\[4\].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests are processed before regular 
> queued-up requests, so requests can be processed with up-to-date state.
> \[1\] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest in a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes, and that these leadership changes are being broadcast 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a round trip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> \[2\] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Depending on how long the client 
> acts on the stale metadata, the impact may or may not be visible to the 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application is not 
> impacted. On the other hand, if the retries are exhausted, the failed produce 
> is visible to the producer application.
> \[3\] The official definition of data loss in Kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in-sync 
> replicas for that partition have applied it to their log.
> \[4\] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest in a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded, 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.
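The prioritization idea can be sketched as two queues, with the controller queue always drained first. This is an illustrative data-structure sketch only, not the broker's actual implementation (which must also handle threading and backpressure).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch: controller requests bypass the backlog of
// data-plane requests, so LeaderAndIsr/UpdateMetadata are never stuck
// behind queued ProduceRequests or MetadataRequests.
public class PrioritizedRequestQueue {
    private final Deque<String> controllerQueue = new ArrayDeque<>();
    private final Deque<String> dataQueue = new ArrayDeque<>();

    public void enqueueController(String request) {
        controllerQueue.addLast(request);
    }

    public void enqueueData(String request) {
        dataQueue.addLast(request);
    }

    /** Next request to process; controller requests always win. Null if empty. */
    public String poll() {
        return !controllerQueue.isEmpty() ? controllerQueue.pollFirst() : dataQueue.pollFirst();
    }
}
```

In scenario \[4\] above, the LeaderAndIsrRequest would then be handled before the backlogged ProduceRequests, so the broker stops acting as leader before accepting the doomed writes.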





[jira] [Commented] (KAFKA-4453) add request prioritization

2019-01-10 Thread Sriharsha Chintalapani (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739798#comment-16739798
 ] 

Sriharsha Chintalapani commented on KAFKA-4453:
---

Hi [~mgharat], checking to see if you are working on this JIRA. We are 
interested in this feature as well.

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests infront of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata\[1\], rejecting 
> ProduceRequests and FetchRequests\[2\], and data loss (for some 
> unofficial\[3\] definition of data loss in terms of messages beyond the high 
> watermark)\[4\].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
> \[1\] Say a client's MetadataRequest is sitting infront of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcasted 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> \[2\] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Depending on how long the client 
> acts on the stale metadata, the impact may or may not be visible to a 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application would not be 
> impacted. On the other hand, if the retries are exhausted, the failed produce 
> will be visible to the producer application.
> \[3\] The official definition of data loss in kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
> \[4\] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.
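One way to picture the requested behavior: controller requests jump ahead of already-queued client requests. Below is a minimal, dependency-free sketch of that ordering idea using a priority queue; all names are hypothetical, and the actual implementation may well take a different shape (for example, a separate controller request queue):

```java
import java.util.concurrent.PriorityBlockingQueue;

// Illustrative sketch only: controller requests dequeue before regular ones,
// with FIFO ordering preserved within each priority class.
class RequestQueueSketch {
    enum Priority { CONTROLLER, REGULAR }   // ordinal order = dequeue order

    static final class Request implements Comparable<Request> {
        final Priority priority;
        final long seq;                     // arrival order, for FIFO tie-break
        Request(Priority priority, long seq) { this.priority = priority; this.seq = seq; }

        @Override
        public int compareTo(Request other) {
            int byPriority = priority.compareTo(other.priority);
            return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Request> queue = new PriorityBlockingQueue<>();
        queue.put(new Request(Priority.REGULAR, 1));    // e.g. a ProduceRequest
        queue.put(new Request(Priority.REGULAR, 2));    // e.g. a MetadataRequest
        queue.put(new Request(Priority.CONTROLLER, 3)); // e.g. a LeaderAndIsrRequest
        // The controller request is handled first despite arriving last,
        // so subsequent requests are processed with up-to-date state.
        System.out.println(queue.take().priority);      // prints CONTROLLER
    }
}
```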



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7741) Bad dependency via SBT

2019-01-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739745#comment-16739745
 ] 

ASF GitHub Bot commented on KAFKA-7741:
---

vvcephei commented on pull request #6121: KAFKA-7741: Streams exclude javax 
dependency
URL: https://github.com/apache/kafka/pull/6121
 
 
   As documented in https://issues.apache.org/jira/browse/KAFKA-7741,
   the javax dependency we receive transitively from connect is incompatible
   with SBT builds.
   
   Streams doesn't use the portion of Connect that needs the dependency,
   so we can fix the builds by simply excluding it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Bad dependency via SBT
> --
>
> Key: KAFKA-7741
> URL: https://issues.apache.org/jira/browse/KAFKA-7741
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1, 2.1.0
> Environment: Windows 10 professional, IntelliJ IDEA 2017.1
>Reporter: sacha barber
>Assignee: John Roesler
>Priority: Major
>
> I am using the Kafka-Streams-Scala 2.1.0 JAR.
> And if I create a new Scala project using SBT with these dependencies 
> {code}
> name := "ScalaKafkaStreamsDemo"
> version := "1.0"
> scalaVersion := "2.12.1"
> libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.0"
> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"
> libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.0.0"
> libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
> //TEST
> libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test
> libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % 
> "2.0.0" % Test
> {code}
> I get this error
>  
> {code}
> SBT 'ScalaKafkaStreamsDemo' project refresh failed
> Error:Error while importing SBT project:...[info] Resolving 
> jline#jline;2.14.1 ...
> [warn] [FAILED ] 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}: (0ms)
> [warn]  local: tried
> [warn] 
> C:\Users\sacha\.ivy2\local\javax.ws.rs\javax.ws.rs-api\2.1.1\${packaging.type}s\javax.ws.rs-api.${packaging.type}
> [warn]  public: tried
> [warn] 
> https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.${packaging.type}
> [info] downloading 
> https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-test-utils/2.1.0/kafka-streams-test-utils-2.1.0.jar
>  ...
> [info] [SUCCESSFUL ] 
> org.apache.kafka#kafka-streams-test-utils;2.1.0!kafka-streams-test-utils.jar 
> (344ms)
> [warn] ::
> [warn] :: FAILED DOWNLOADS ::
> [warn] :: ^ see resolution messages for details ^ ::
> [warn] ::
> [warn] :: javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [warn] ::
> [trace] Stack trace suppressed: run 'last *:ssExtractDependencies' for the 
> full output.
> [trace] Stack trace suppressed: run 'last *:update' for the full output.
> [error] (*:ssExtractDependencies) sbt.ResolveException: download failed: 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [error] (*:update) sbt.ResolveException: download failed: 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [error] Total time: 8 s, completed 16-Dec-2018 19:27:21
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=384M; 
> support was removed in 8.0. See complete log in 
> file:/C:/Users/sacha/.IdeaIC2017.1/system/log/sbt.last.log
> {code}
> This seems to be a common issue with bad dependency from Kafka to 
> javax.ws.rs-api.
> if I drop the Kafka version down to 2.0.0 and add this line to my SBT file 
> this error goes away
> {code}
> libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" 
> artifacts(Artifact("javax.ws.rs-api", "jar", "jar"))
> {code}
>  
> However I would like to work with 2.1.0 version.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7806) Windowed Aggregations should wrap default key serde if none is specified

2019-01-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739701#comment-16739701
 ] 

John Roesler edited comment on KAFKA-7806 at 1/10/19 7:16 PM:
--

Thanks [~bchen225242], I hadn't looked at KIP-393 yet, but I just read it 
through.

But, if I understand the proposal correctly, it should be orthogonal to this 
issue.

The issue I've reported here isn't specific to the changelog topic, but rather 
just propagating the correct key serde down the topology.

They're clearly related to the same part of Streams, but I didn't follow how 
they would overlap. Can you elaborate?

Thanks,

-John


was (Author: vvcephei):
Thanks [~bchen225242], I hadn't looked at KIP-393 yet, but I just read it 
through. I'll leave my comments on the mailing list.

But, if I understand the proposal correctly, it should be orthogonal to this 
issue.

The issue I've reported here isn't specific to the changelog topic, but rather 
just propagating the correct key serde down the topology.

They're clearly related to the same part of Streams, but I didn't follow how 
they would overlap. Can you elaborate?

Thanks,

-John

> Windowed Aggregations should wrap default key serde if none is specified
> 
>
> Key: KAFKA-7806
> URL: https://issues.apache.org/jira/browse/KAFKA-7806
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> In Streams, windowing a stream by either time or session windows causes the 
> stream's keys to be transformed from `K` to `Windowed<K>`.
> Since this is a well defined transition, it's not necessary for developers to 
> explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which 
> already knows the key serde (`Serde<K>`), automatically wraps it in case it's 
> needed by downstream operators.
> However, this automatic wrapping only takes place if the key serde has been 
> explicitly provided in the topology. If the topology relies on the 
> `default.key.serde` configuration, no wrapping takes place, and downstream 
> operators will encounter a ClassCastException trying to cast a `Windowed<K>` 
> (the windowed key) to whatever type the default serde handles (which is the 
> key wrapped inside the windowed key).
> Specifically, the key serde forwarding logic is:
> in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new 
> FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
> null`
> and in 
> `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new 
> WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null`
>  
> This pattern of not "solidifying" the default key serde is common in Streams. 
> Not all operators need a serde, and the default serde may not be applicable 
> to all operators. So, it would be a mistake for arbitrary operators to grab 
> the default serde and pass it downstream as if it had been explicitly set.
>  
> However, in this case specifically, all windowed aggregations are stateful, 
> so if we don't have an explicit key serde at this point, we know that we have 
> used the default serde in the window store. If the default serde were 
> incorrect, an exception would be thrown by the windowed aggregation itself. 
> So it actually is safe to wrap the default serde in a windowed serde and pass 
> it downstream, which would result in a better development experience.
>  
> Unfortunately, the default serde is set via config, but the windowed serde 
> wrapping happens during DSL building, when the config is not generally 
> available. Therefore, we would need a special windowed serde wrapper that 
> signals that it wraps the default serde, which would be fully resolved during 
> operators' init call.
> For example, something of this nature:
> `materializedInternal.keySerde() != null ? new 
> FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
> FullTimeWindowedSerde.wrapDefault(windows.size())`
> etc.
>  
> Complicating the situation slightly, all the windowed serializers and 
> deserializers will resolve a runtime inner class using 
> `default.windowed.key.serde.inner` if given a null inner serde to wrap. 
> However, at this point in the topology build, we do know that the windowed 
> aggregation has specifically used the `default.key.serde`, not the 
> `default.windowed.key.serde.inner` to persist its state to the window store, 
> therefore, it should be correct to wrap the default key serde specifically 
> and not use the `default.windowed.key.serde.inner`.
>  
> In addition to fixing this for TimeWindowed and SessionWindowed streams, we 
> need to have good test coverage of the new code. There is clearly a blind 
> spot in the tests, or we would have noticed this sooner.

[jira] [Comment Edited] (KAFKA-7806) Windowed Aggregations should wrap default key serde if none is specified

2019-01-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739701#comment-16739701
 ] 

John Roesler edited comment on KAFKA-7806 at 1/10/19 7:16 PM:
--

Thanks [~bchen225242], I hadn't looked at KIP-393 yet, but I just read it 
through.

If I understand the proposal correctly, it should be orthogonal to this issue.

The issue I've reported here isn't specific to the changelog topic, but rather 
just propagating the correct key serde down the topology.

They're clearly related to the same part of Streams, but I didn't follow how 
they would overlap. Can you elaborate?

Thanks,

-John


was (Author: vvcephei):
Thanks [~bchen225242], I hadn't looked at KIP-393 yet, but I just read it 
through.

But, if I understand the proposal correctly, it should be orthogonal to this 
issue.

The issue I've reported here isn't specific to the changelog topic, but rather 
just propagating the correct key serde down the topology.

They're clearly related to the same part of Streams, but I didn't follow how 
they would overlap. Can you elaborate?

Thanks,

-John

> Windowed Aggregations should wrap default key serde if none is specified
> 
>
> Key: KAFKA-7806
> URL: https://issues.apache.org/jira/browse/KAFKA-7806
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> In Streams, windowing a stream by either time or session windows causes the 
> stream's keys to be transformed from `K` to `Windowed<K>`.
> Since this is a well defined transition, it's not necessary for developers to 
> explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which 
> already knows the key serde (`Serde<K>`), automatically wraps it in case it's 
> needed by downstream operators.
> However, this automatic wrapping only takes place if the key serde has been 
> explicitly provided in the topology. If the topology relies on the 
> `default.key.serde` configuration, no wrapping takes place, and downstream 
> operators will encounter a ClassCastException trying to cast a `Windowed<K>` 
> (the windowed key) to whatever type the default serde handles (which is the 
> key wrapped inside the windowed key).
> Specifically, the key serde forwarding logic is:
> in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new 
> FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
> null`
> and in 
> `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new 
> WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null`
>  
> This pattern of not "solidifying" the default key serde is common in Streams. 
> Not all operators need a serde, and the default serde may not be applicable 
> to all operators. So, it would be a mistake for arbitrary operators to grab 
> the default serde and pass it downstream as if it had been explicitly set.
>  
> However, in this case specifically, all windowed aggregations are stateful, 
> so if we don't have an explicit key serde at this point, we know that we have 
> used the default serde in the window store. If the default serde were 
> incorrect, an exception would be thrown by the windowed aggregation itself. 
> So it actually is safe to wrap the default serde in a windowed serde and pass 
> it downstream, which would result in a better development experience.
>  
> Unfortunately, the default serde is set via config, but the windowed serde 
> wrapping happens during DSL building, when the config is not generally 
> available. Therefore, we would need a special windowed serde wrapper that 
> signals that it wraps the default serde, which would be fully resolved during 
> operators' init call.
> For example, something of this nature:
> `materializedInternal.keySerde() != null ? new 
> FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
> FullTimeWindowedSerde.wrapDefault(windows.size())`
> etc.
>  
> Complicating the situation slightly, all the windowed serializers and 
> deserializers will resolve a runtime inner class using 
> `default.windowed.key.serde.inner` if given a null inner serde to wrap. 
> However, at this point in the topology build, we do know that the windowed 
> aggregation has specifically used the `default.key.serde`, not the 
> `default.windowed.key.serde.inner` to persist its state to the window store, 
> therefore, it should be correct to wrap the default key serde specifically 
> and not use the `default.windowed.key.serde.inner`.
>  
> In addition to fixing this for TimeWindowed and SessionWindowed streams, we 
> need to have good test coverage of the new code. There is clearly a blind 
> spot in the tests, or we would have noticed this sooner.

[jira] [Commented] (KAFKA-7806) Windowed Aggregations should wrap default key serde if none is specified

2019-01-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739701#comment-16739701
 ] 

John Roesler commented on KAFKA-7806:
-

Thanks [~bchen225242], I hadn't looked at KIP-393 yet, but I just read it 
through. I'll leave my comments on the mailing list.

But, if I understand the proposal correctly, it should be orthogonal to this 
issue.

The issue I've reported here isn't specific to the changelog topic, but rather 
just propagating the correct key serde down the topology.

They're clearly related to the same part of Streams, but I didn't follow how 
they would overlap. Can you elaborate?

Thanks,

-John

> Windowed Aggregations should wrap default key serde if none is specified
> 
>
> Key: KAFKA-7806
> URL: https://issues.apache.org/jira/browse/KAFKA-7806
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> In Streams, windowing a stream by either time or session windows causes the 
> stream's keys to be transformed from `K` to `Windowed<K>`.
> Since this is a well defined transition, it's not necessary for developers to 
> explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which 
> already knows the key serde (`Serde<K>`), automatically wraps it in case it's 
> needed by downstream operators.
> However, this automatic wrapping only takes place if the key serde has been 
> explicitly provided in the topology. If the topology relies on the 
> `default.key.serde` configuration, no wrapping takes place, and downstream 
> operators will encounter a ClassCastException trying to cast a `Windowed<K>` 
> (the windowed key) to whatever type the default serde handles (which is the 
> key wrapped inside the windowed key).
> Specifically, the key serde forwarding logic is:
> in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new 
> FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
> null`
> and in 
> `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`:
> `materializedInternal.keySerde() != null ? new 
> WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null`
>  
> This pattern of not "solidifying" the default key serde is common in Streams. 
> Not all operators need a serde, and the default serde may not be applicable 
> to all operators. So, it would be a mistake for arbitrary operators to grab 
> the default serde and pass it downstream as if it had been explicitly set.
>  
> However, in this case specifically, all windowed aggregations are stateful, 
> so if we don't have an explicit key serde at this point, we know that we have 
> used the default serde in the window store. If the default serde were 
> incorrect, an exception would be thrown by the windowed aggregation itself. 
> So it actually is safe to wrap the default serde in a windowed serde and pass 
> it downstream, which would result in a better development experience.
>  
> Unfortunately, the default serde is set via config, but the windowed serde 
> wrapping happens during DSL building, when the config is not generally 
> available. Therefore, we would need a special windowed serde wrapper that 
> signals that it wraps the default serde, which would be fully resolved during 
> operators' init call.
> For example, something of this nature:
> `materializedInternal.keySerde() != null ? new 
> FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
> FullTimeWindowedSerde.wrapDefault(windows.size())`
> etc.
>  
> Complicating the situation slightly, all the windowed serializers and 
> deserializers will resolve a runtime inner class using 
> `default.windowed.key.serde.inner` if given a null inner serde to wrap. 
> However, at this point in the topology build, we do know that the windowed 
> aggregation has specifically used the `default.key.serde`, not the 
> `default.windowed.key.serde.inner` to persist its state to the window store, 
> therefore, it should be correct to wrap the default key serde specifically 
> and not use the `default.windowed.key.serde.inner`.
>  
> In addition to fixing this for TimeWindowed and SessionWindowed streams, we 
> need to have good test coverage of the new code. There is clearly a blind 
> spot in the tests, or we would have noticed this sooner.
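To make the proposed "wrap the default serde lazily" idea concrete, here is a dependency-free sketch of a windowed serde that defers resolution of the default key serde until configuration time. The `Serde` interface below is a simplified stand-in for the real Kafka one, and all class and method names are hypothetical:

```java
import java.util.Map;

class WindowedSerdeSketch {
    // Simplified stand-in for org.apache.kafka.common.serialization.Serde.
    interface Serde<T> { String name(); }

    // A windowed serde that may be constructed without an inner serde; a null
    // inner serde means "wrap default.key.serde", resolved at configure() time.
    static final class TimeWindowedSerde<K> implements Serde<K> {
        private Serde<K> inner;

        TimeWindowedSerde(final Serde<K> inner) { this.inner = inner; }

        // Marker factory: signals that the default key serde should be wrapped.
        static <K> TimeWindowedSerde<K> wrapDefault() {
            return new TimeWindowedSerde<>(null);
        }

        // Called from the operator's init(), once the config is available.
        @SuppressWarnings("unchecked")
        void configure(final Map<String, Object> config) {
            if (inner == null) {
                inner = (Serde<K>) config.get("default.key.serde");
            }
        }

        @Override
        public String name() { return "TimeWindowed(" + inner.name() + ")"; }
    }

    public static void main(String[] args) {
        Serde<String> defaultSerde = () -> "StringSerde";
        TimeWindowedSerde<String> serde = TimeWindowedSerde.wrapDefault();
        serde.configure(Map.<String, Object>of("default.key.serde", defaultSerde));
        System.out.println(serde.name()); // prints TimeWindowed(StringSerde)
    }
}
```

The key design point is that the placeholder created during DSL building carries no serde at all; it only commits to the default key serde when `configure` runs, which matches the point in the lifecycle where the config actually exists.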



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7790) Fix Bugs in Trogdor Task Expiration

2019-01-10 Thread Colin P. McCabe (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-7790:
---
Description: If an Agent process is restarted, it will be re-sent the 
worker specifications for any tasks that are not DONE.  The agent will run 
these tasks for the original time period.  It should be fixed to run them only 
for the remaining task time.  There is also a bug where the coordinator can 
sometimes re-create a worker even when the task is DONE.  (was: If an Agent 
process is restarted, it will be re-sent the worker specifications for any 
tasks that are not DONE.  The agent will run these tasks for the original time 
period.  It should be fixed to run them only for the remaining task time.)

> Fix Bugs in Trogdor Task Expiration
> ---
>
> Key: KAFKA-7790
> URL: https://issues.apache.org/jira/browse/KAFKA-7790
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> If an Agent process is restarted, it will be re-sent the worker 
> specifications for any tasks that are not DONE.  The agent will run these 
> tasks for the original time period.  It should be fixed to run them only for 
> the remaining task time.  There is also a bug where the coordinator can 
> sometimes re-create a worker even when the task is DONE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7790) Fix Bugs in Trogdor Task Expiration

2019-01-10 Thread Colin P. McCabe (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-7790:
---
Description: If an Agent process is restarted, it will be re-sent the 
worker specifications for any tasks that are not DONE.  The agent will run 
these tasks for the original time period.  It should be fixed to run them only 
for the remaining task time.  (was: If an Agent process is restarted, it will 
be re-sent the worker specifications for any tasks that are not DONE.  The 
agent will run these tasks for the original time period.  It should be fixed to 
run them only for the remaining time.)

> Fix Bugs in Trogdor Task Expiration
> ---
>
> Key: KAFKA-7790
> URL: https://issues.apache.org/jira/browse/KAFKA-7790
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> If an Agent process is restarted, it will be re-sent the worker 
> specifications for any tasks that are not DONE.  The agent will run these 
> tasks for the original time period.  It should be fixed to run them only for 
> the remaining task time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7790) Fix Bugs in Trogdor Task Expiration

2019-01-10 Thread Colin P. McCabe (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-7790:
---
Description: If an Agent process is restarted, it will be re-sent the 
worker specifications for any tasks that are not DONE.  The agent will run 
these tasks for the original time period.  It should be fixed to run them only 
for the remaining time.  (was: All Trogdor task specifications have a defined 
`startMs` and `durationMs`. Under conditions of task failure and restarts, it 
is intuitive to assume that a task would not be re-run after a certain time 
period.

Let's best illustrate the issue with an example:
{code:java}
startMs = 12PM; durationMs = 1hour;
# 12:02 - Coordinator schedules a task to run on agent-0
# 12:45 - agent-0 process dies. Coordinator's heartbeats to agent-0 fail.
# 12:47 - agent-0 comes back up. Coordinator's heartbeats pass and it 
re-schedules tasks that are not running in agent-0
# 13:20 - agent-0 process dies.
# 13:22 - agent-0 comes back up. Coordinator re-schedules task{code}
This can result in an endless loop of task rescheduling. If there are more 
tasks scheduled on agent-0 (e.g. a task scheduled to start each hour), we can 
end up in a scenario where we overwhelm the agent with tasks that we would 
rather have dropped.
h2. Changes

We propose that the Trogdor Coordinator does not re-schedule a task if the 
current time of re-scheduling is greater than the start time of the task and 
its duration combined. More specifically:
{code:java}
if (currentTimeMs > startTimeMs + durationTimeMs)
  failTask()
else
  scheduleTask(){code}
 

 

 )

> Fix Bugs in Trogdor Task Expiration
> ---
>
> Key: KAFKA-7790
> URL: https://issues.apache.org/jira/browse/KAFKA-7790
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> If an Agent process is restarted, it will be re-sent the worker 
> specifications for any tasks that are not DONE.  The agent will run these 
> tasks for the original time period.  It should be fixed to run them only for 
> the remaining time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7790) Fix Bugs in Trogdor Task Expiration

2019-01-10 Thread Colin P. McCabe (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-7790:
---
Summary: Fix Bugs in Trogdor Task Expiration  (was: Trogdor - Does not time 
out tasks in time)

> Fix Bugs in Trogdor Task Expiration
> ---
>
> Key: KAFKA-7790
> URL: https://issues.apache.org/jira/browse/KAFKA-7790
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> All Trogdor task specifications have a defined `startMs` and `durationMs`. 
> Under conditions of task failure and restarts, it is intuitive to assume that 
> a task would not be re-run after a certain time period.
> Let's best illustrate the issue with an example:
> {code:java}
> startMs = 12PM; durationMs = 1hour;
> # 12:02 - Coordinator schedules a task to run on agent-0
> # 12:45 - agent-0 process dies. Coordinator's heartbeats to agent-0 fail.
> # 12:47 - agent-0 comes back up. Coordinator's heartbeats pass and it 
> re-schedules tasks that are not running in agent-0
> # 13:20 - agent-0 process dies.
> # 13:22 - agent-0 comes back up. Coordinator re-schedules task{code}
> This can result in an endless loop of task rescheduling. If there are more 
> tasks scheduled on agent-0 (e.g. a task scheduled to start each hour), we 
> can end up in a scenario where we overwhelm the agent with tasks that we 
> would rather have dropped.
> h2. Changes
> We propose that the Trogdor Coordinator does not re-schedule a task if the 
> current time of re-scheduling is greater than the start time of the task and 
> its duration combined. More specifically:
> {code:java}
> if (currentTimeMs > startTimeMs + durationTimeMs)
>   failTask()
> else
>   scheduleTask(){code}
>  
>  
>  
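The expiry decision described above, including the "run only for the remaining time" behavior for restarted agents, can be sketched as follows. The helper names are hypothetical and this is not actual Trogdor code:

```java
// Sketch of the proposed task-expiry logic: a restarted task should run only
// for its *remaining* time, and must not be rescheduled at all once
// startMs + durationMs has passed.
class TaskExpirySketch {
    // True if the task still has time left and may be (re)scheduled.
    static boolean shouldReschedule(long currentTimeMs, long startTimeMs, long durationMs) {
        return currentTimeMs < startTimeMs + durationMs;
    }

    // Remaining run time for a rescheduled task, clamped at zero.
    static long remainingMs(long currentTimeMs, long startTimeMs, long durationMs) {
        return Math.max(0, startTimeMs + durationMs - currentTimeMs);
    }

    public static void main(String[] args) {
        long start = 0, duration = 3_600_000;                         // 12:00, one hour
        System.out.println(shouldReschedule(2_700_000, start, duration)); // true  (12:45)
        System.out.println(remainingMs(2_700_000, start, duration));      // 900000 (15 min left)
        System.out.println(shouldReschedule(4_920_000, start, duration)); // false (13:22)
    }
}
```

In the example timeline from the description, the 12:47 restart would re-run the task for only the remaining 15 minutes, and the 13:22 restart would fail the task instead of rescheduling it, breaking the endless-rescheduling loop.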



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7748) Add wall clock TimeDefinition for suppression of intermediate events

2019-01-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739601#comment-16739601
 ] 

John Roesler edited comment on KAFKA-7748 at 1/10/19 4:50 PM:
--

This has been a big stumbling block for users of Suppress. It would be good to 
get it implemented ASAP.

This feature requires a KIP, but it would be a very small one.


was (Author: vvcephei):
This has been a big stumbling block for users of Suppress. It would be good to 
get it implemented ASAP.

> Add wall clock TimeDefinition for suppression of intermediate events
> 
>
> Key: KAFKA-7748
> URL: https://issues.apache.org/jira/browse/KAFKA-7748
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jonathan Gordon
>Priority: Major
>  Labels: needs-kip
>
> Currently, Kafka Streams offers the ability to suppress intermediate events 
> based on either RecordTime or WindowEndTime, which are in turn defined by 
> stream time:
> {{Suppressed.untilTimeLimit(final Duration timeToWaitForMoreEvents, final 
> BufferConfig bufferConfig)}}
> It would be helpful to have another option that would allow suppression of 
> intermediate events based on wall clock time. This would allow us to only 
> produce a limited number of aggregates independent of their stream time 
> (which in our case is event time).
> For reference, here's the relevant KIP:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables#KIP-328:AbilitytosuppressupdatesforKTables-Best-effortratelimitperkey]
> And here's the relevant Confluent Slack thread:
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1544468349201700
>  
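To illustrate what a wall-clock time definition would buy, here is a small, library-free sketch of per-key rate limiting driven by a wall clock rather than stream time. All names are hypothetical; this is not the proposed Streams API, just the underlying idea:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Emit at most one update per key per interval, measured in wall-clock time,
// independent of the records' event time.
class WallClockSuppressSketch {
    private final long intervalMs;
    private final LongSupplier clock;                 // injectable for testing
    private final Map<String, Long> lastEmit = new HashMap<>();

    WallClockSuppressSketch(long intervalMs, LongSupplier clock) {
        this.intervalMs = intervalMs;
        this.clock = clock;
    }

    // Returns true if the update for this key should be forwarded now;
    // otherwise the update is suppressed (the latest value would be buffered).
    boolean shouldEmit(String key) {
        long now = clock.getAsLong();
        Long last = lastEmit.get(key);
        if (last == null || now - last >= intervalMs) {
            lastEmit.put(key, now);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] now = {0};
        WallClockSuppressSketch s = new WallClockSuppressSketch(100, () -> now[0]);
        System.out.println(s.shouldEmit("k"));  // true: first update for "k"
        now[0] = 50;
        System.out.println(s.shouldEmit("k"));  // false: still within the interval
        now[0] = 150;
        System.out.println(s.shouldEmit("k"));  // true: interval has elapsed
    }
}
```

Because the clock is wall time, the number of emitted aggregates per key is bounded even when many records share the same event time, which is exactly the behavior the existing stream-time-based `untilTimeLimit` cannot provide.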



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7748) Add wall clock TimeDefinition for suppression of intermediate events

2019-01-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7748:

Labels: needs-kip  (was: )

> Add wall clock TimeDefinition for suppression of intermediate events
> 
>
> Key: KAFKA-7748
> URL: https://issues.apache.org/jira/browse/KAFKA-7748
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jonathan Gordon
>Priority: Major
>  Labels: needs-kip
>
> Currently, Kafka Streams offers the ability to suppress intermediate events 
> based on either RecordTime or WindowEndTime, which are in turn defined by 
> stream time:
> {{Suppressed.untilTimeLimit(final Duration timeToWaitForMoreEvents, final 
> BufferConfig bufferConfig)}}
> It would be helpful to have another option that would allow suppression of 
> intermediate events based on wall clock time. This would allow us to only 
> produce a limited number of aggregates independent of their stream time 
> (which in our case is event time).
> For reference, here's the relevant KIP:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables#KIP-328:AbilitytosuppressupdatesforKTables-Best-effortratelimitperkey]
> And here's the relevant Confluent Slack thread:
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1544468349201700
>  
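The requested behavior can be illustrated outside of Kafka Streams with a small self-contained sketch: a buffer that keeps only the latest value per key and flushes on a wall-clock interval, independent of record (stream) time. The class and method names below (`WallClockSuppressBuffer`, `offer`) are hypothetical illustrations, not the Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch: a suppression buffer keyed on wall-clock time
// rather than stream time. Names are illustrative, not Kafka Streams API.
public class WallClockSuppressBuffer {
    private final long intervalMs;
    private final LongSupplier clock;          // injectable wall clock (for testing)
    private final Map<String, String> latest = new LinkedHashMap<>();
    private long lastFlushMs;

    public WallClockSuppressBuffer(long intervalMs, LongSupplier clock) {
        this.intervalMs = intervalMs;
        this.clock = clock;
        this.lastFlushMs = clock.getAsLong();
    }

    /** Buffer the newest value per key; emit buffered aggregates once the wall-clock interval elapses. */
    public Map<String, String> offer(String key, String value) {
        latest.put(key, value);                // keep only the latest update per key
        long now = clock.getAsLong();
        if (now - lastFlushMs >= intervalMs) { // wall-clock trigger, not event-time
            lastFlushMs = now;
            Map<String, String> out = new LinkedHashMap<>(latest);
            latest.clear();
            return out;
        }
        return Map.of();                       // suppress: nothing emitted yet
    }
}
```

With an interval of 100 ms, intermediate updates at t=0 and t=50 are suppressed and only the latest value per key is emitted once the interval elapses, regardless of the records' own timestamps.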



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7748) Add wall clock TimeDefinition for suppression of intermediate events

2019-01-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739601#comment-16739601
 ] 

John Roesler commented on KAFKA-7748:
-

This has been a big stumbling block for users of Suppress. It would be good to 
get it implemented ASAP.

> Add wall clock TimeDefinition for suppression of intermediate events
> 
>
> Key: KAFKA-7748
> URL: https://issues.apache.org/jira/browse/KAFKA-7748
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jonathan Gordon
>Priority: Major
>
> Currently, Kafka Streams offers the ability to suppress intermediate events 
> based on either RecordTime or WindowEndTime, which are in turn defined by 
> stream time:
> {{Suppressed.untilTimeLimit(final Duration timeToWaitForMoreEvents, final 
> BufferConfig bufferConfig)}}
> It would be helpful to have another option that would allow suppression of 
> intermediate events based on wall clock time. This would allow us to only 
> produce a limited number of aggregates independent of their stream time 
> (which in our case is event time).
> For reference, here's the relevant KIP:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables#KIP-328:AbilitytosuppressupdatesforKTables-Best-effortratelimitperkey]
> And here's the relevant Confluent Slack thread:
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1544468349201700
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7802) Connection to Broker Disconnected Taking Down the Whole Cluster

2019-01-10 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739567#comment-16739567
 ] 

Christoffer Hammarström commented on KAFKA-7802:


Sounds like you might be affected by 
https://jira.apache.org/jira/browse/KAFKA-7697 ?

> Connection to Broker Disconnected Taking Down the Whole Cluster
> ---
>
> Key: KAFKA-7802
> URL: https://issues.apache.org/jira/browse/KAFKA-7802
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Candice Wan
>Priority: Critical
> Attachments: thread_dump.log
>
>
> We recently upgraded to 2.1.0. Since then, several times per day, we observe 
> some brokers were disconnected when other brokers were trying to fetch the 
> replicas. This issue took down the whole cluster, making all the producers 
> and consumers not able to publish or consume messages. It could be quickly 
> fixed by restarting the problematic broker.
> Here is an example of what we're seeing in the broker which was trying to 
> send fetch request to the problematic one:
> 2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] INFO 
> o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=0] Error sending fetch request (sessionId=937967566, epoch=1599941) 
> to node 3: java.io.IOException: Connection to 3 was disconnected before the 
> response was read.
>  2019-01-09 08:05:10.445 [ReplicaFetcherThread-1-3] INFO 
> o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=1] Error sending fetch request (sessionId=506217047, epoch=1375749) 
> to node 3: java.io.IOException: Connection to 3 was disconnected before the 
> response was read.
>  2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] WARN 
> kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-11=(offset=421032847, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[178])}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=937967566, 
> epoch=1599941))
>  java.io.IOException: Connection to 3 was disconnected before the response 
> was read
>  at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:199)
>  at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  
>  
>  Below is the suspicious log of the problematic broker when the issue 
> happened:
> 2019-01-09 08:04:50.177 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Member 
> consumer-2-7d46fda9-afef-4705-b632-17f0255d5045 in group talon-instance1 has 
> failed, rem
>  oving it from the group
>  2019-01-09 08:04:50.177 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Preparing to 
> rebalance group talon-instance1 in state PreparingRebalance with old 
> generation 27
>  0 (__consumer_offsets-47) (reason: removing member 
> consumer-2-7d46fda9-afef-4705-b632-17f0255d5045 on heartbeat expiration)
>  2019-01-09 08:04:50.297 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Member 
> consumer-5-94b7eb6d-bc39-48ed-99b8-2e0f55edd60b in group 
> Notifications.ASIA1546980352799 has failed, removing it from the group
>  2019-01-09 08:04:50.297 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Preparing to 
> rebalance group Notifications.ASIA1546980352799 in state PreparingRebalance 
> with old generation 1 (__consumer_offsets-44) (reason: removing member 
> consumer-5-94b7eb6d-bc39-48ed-99b8-2e0f55edd60b on heartbeat expiration)
>  2019-01-09 08:04:50.297 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Group 
> Notifications.ASIA1546980352799 with generation 2 is now empty 
> (__consumer_offsets-44)
>  2019-01-09 08:04:50.388 

[jira] [Comment Edited] (KAFKA-7802) Connection to Broker Disconnected Taking Down the Whole Cluster

2019-01-10 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739567#comment-16739567
 ] 

Christoffer Hammarström edited comment on KAFKA-7802 at 1/10/19 4:15 PM:
-

Sounds like you might be affected by KAFKA-7697 ?


was (Author: kreiger):
Sounds like you might be affected by 
https://jira.apache.org/jira/browse/KAFKA-7697 ?

> Connection to Broker Disconnected Taking Down the Whole Cluster
> ---
>
> Key: KAFKA-7802
> URL: https://issues.apache.org/jira/browse/KAFKA-7802
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Candice Wan
>Priority: Critical
> Attachments: thread_dump.log
>
>
> We recently upgraded to 2.1.0. Since then, several times per day, we observe 
> some brokers were disconnected when other brokers were trying to fetch the 
> replicas. This issue took down the whole cluster, making all the producers 
> and consumers not able to publish or consume messages. It could be quickly 
> fixed by restarting the problematic broker.
> Here is an example of what we're seeing in the broker which was trying to 
> send fetch request to the problematic one:
> 2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] INFO 
> o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=0] Error sending fetch request (sessionId=937967566, epoch=1599941) 
> to node 3: java.io.IOException: Connection to 3 was disconnected before the 
> response was read.
>  2019-01-09 08:05:10.445 [ReplicaFetcherThread-1-3] INFO 
> o.a.k.clients.FetchSessionHandler - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=1] Error sending fetch request (sessionId=506217047, epoch=1375749) 
> to node 3: java.io.IOException: Connection to 3 was disconnected before the 
> response was read.
>  2019-01-09 08:05:10.445 [ReplicaFetcherThread-0-3] WARN 
> kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-11=(offset=421032847, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[178])}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=937967566, 
> epoch=1599941))
>  java.io.IOException: Connection to 3 was disconnected before the response 
> was read
>  at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
>  at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:199)
>  at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  
>  
>  Below is the suspicious log of the problematic broker when the issue 
> happened:
> 2019-01-09 08:04:50.177 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Member 
> consumer-2-7d46fda9-afef-4705-b632-17f0255d5045 in group talon-instance1 has 
> failed, rem
>  oving it from the group
>  2019-01-09 08:04:50.177 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Preparing to 
> rebalance group talon-instance1 in state PreparingRebalance with old 
> generation 27
>  0 (__consumer_offsets-47) (reason: removing member 
> consumer-2-7d46fda9-afef-4705-b632-17f0255d5045 on heartbeat expiration)
>  2019-01-09 08:04:50.297 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Member 
> consumer-5-94b7eb6d-bc39-48ed-99b8-2e0f55edd60b in group 
> Notifications.ASIA1546980352799 has failed, removing it from the group
>  2019-01-09 08:04:50.297 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Preparing to 
> rebalance group Notifications.ASIA1546980352799 in state PreparingRebalance 
> with old generation 1 (__consumer_offsets-44) (reason: removing member 
> consumer-5-94b7eb6d-bc39-48ed-99b8-2e0f55edd60b on heartbeat expiration)
>  2019-01-09 08:04:50.297 [executor-Heartbeat] INFO 
> k.coordinator.group.GroupCoordinator - [GroupCoordinator 3]: Group 
> 

[jira] [Updated] (KAFKA-7811) Avoid unnecessary lock acquire when KafkaConsumer commit offsets

2019-01-10 Thread lambdaliu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lambdaliu updated KAFKA-7811:
-
Description: 
In KafkaConsumer#commitAsync, we have the following logic:
{code:java}
public void commitAsync(OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
commitAsync(subscriptions.allConsumed(), callback);
} finally {
release();
}
}
{code}
This convenience overload delegates to another commitAsync, which also calls 
`acquireAndEnsureOpen`, so the lock is acquired twice.

 

  was:
In KafkaConsumer#commitSync, we have the following logic:

 
{code:java}
public void commitAsync(OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
commitAsync(subscriptions.allConsumed(), callback);
} finally {
release();
}
}
{code}
This function calls another commitAsync which also call `acquireAndEnsureOpen`.

 


> Avoid unnecessary lock acquire when KafkaConsumer commit offsets
> 
>
> Key: KAFKA-7811
> URL: https://issues.apache.org/jira/browse/KAFKA-7811
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, 2.0.1, 2.1.0
>Reporter: lambdaliu
>Assignee: lambdaliu
>Priority: Major
>
> In KafkaConsumer#commitAsync, we have the following logic:
> {code:java}
> public void commitAsync(OffsetCommitCallback callback) {
> acquireAndEnsureOpen();
> try {
> commitAsync(subscriptions.allConsumed(), callback);
> } finally {
> release();
> }
> }
> {code}
> This convenience overload delegates to another commitAsync, which also calls 
> `acquireAndEnsureOpen`, so the lock is acquired twice.
>  
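A minimal, self-contained sketch of the proposed fix (not the actual KafkaConsumer code): each public overload acquires the light-weight lock once and delegates to a private helper that assumes the lock is already held, so the convenience overload no longer acquires twice. `ConsumerSketch` and `doCommitAsync` are illustrative names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch (not the real KafkaConsumer): public overloads acquire
// once, then delegate to a private method that assumes the lock is held.
public class ConsumerSketch {
    final AtomicInteger acquisitions = new AtomicInteger();

    private void acquireAndEnsureOpen() { acquisitions.incrementAndGet(); }
    private void release() { }

    /** Convenience overload: acquires once, then delegates without re-acquiring. */
    public void commitAsync() {
        acquireAndEnsureOpen();
        try {
            doCommitAsync("allConsumed");   // no second acquire here
        } finally {
            release();
        }
    }

    /** Full overload still acquires for direct external callers. */
    public void commitAsync(String offsets) {
        acquireAndEnsureOpen();
        try {
            doCommitAsync(offsets);
        } finally {
            release();
        }
    }

    private void doCommitAsync(String offsets) {
        // actual commit logic would go here
    }
}
```

Calling the no-argument `commitAsync()` then performs exactly one acquire instead of two.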



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-5994) Improve transparency of broker user ACL misconfigurations

2019-01-10 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar reassigned KAFKA-5994:


Assignee: Manikumar

> Improve transparency of broker user ACL misconfigurations
> -
>
> Key: KAFKA-5994
> URL: https://issues.apache.org/jira/browse/KAFKA-5994
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.2.1
>Reporter: Dustin Cote
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.2.0
>
>
> When the user for inter broker communication is not a super user and ACLs are 
> configured with allow.everyone.if.no.acl.found=false, the cluster will not 
> serve data. This is extremely confusing to debug because there is no security 
> negotiation problem or indication of an error other than no data can make it 
> in or out of the broker. If one knew to look in the authorizer log, it would 
> be more clear, but that didn't make it into my workflow at least. Here's an 
> example of a problematic debugging scenario:
> SASL_SSL, SSL, SASL_PLAINTEXT ports on the brokers
> SASL user specified in `super.users`
> SSL specified as the inter broker protocol
> The only way I could figure out ACLs were an issue without gleaning it 
> through configuration inspection was that controlled shutdown indicated that 
> a cluster action had failed. 
> It would be good if we could be more transparent about the failure here. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5994) Improve transparency of broker user ACL misconfigurations

2019-01-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739407#comment-16739407
 ] 

ASF GitHub Bot commented on KAFKA-5994:
---

omkreddy commented on pull request #5021: KAFKA-5994: Log 
ClusterAuthorizationException for all ClusterAction requests
URL: https://github.com/apache/kafka/pull/5021
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve transparency of broker user ACL misconfigurations
> -
>
> Key: KAFKA-5994
> URL: https://issues.apache.org/jira/browse/KAFKA-5994
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.2.1
>Reporter: Dustin Cote
>Priority: Major
> Fix For: 2.2.0
>
>
> When the user for inter broker communication is not a super user and ACLs are 
> configured with allow.everyone.if.no.acl.found=false, the cluster will not 
> serve data. This is extremely confusing to debug because there is no security 
> negotiation problem or indication of an error other than no data can make it 
> in or out of the broker. If one knew to look in the authorizer log, it would 
> be more clear, but that didn't make it into my workflow at least. Here's an 
> example of a problematic debugging scenario:
> SASL_SSL, SSL, SASL_PLAINTEXT ports on the brokers
> SASL user specified in `super.users`
> SSL specified as the inter broker protocol
> The only way I could figure out ACLs were an issue without gleaning it 
> through configuration inspection was that controlled shutdown indicated that 
> a cluster action had failed. 
> It would be good if we could be more transparent about the failure here. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5994) Improve transparency of broker user ACL misconfigurations

2019-01-10 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5994.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

Issue resolved by pull request 5021
[https://github.com/apache/kafka/pull/5021]

> Improve transparency of broker user ACL misconfigurations
> -
>
> Key: KAFKA-5994
> URL: https://issues.apache.org/jira/browse/KAFKA-5994
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.2.1
>Reporter: Dustin Cote
>Priority: Major
> Fix For: 2.2.0
>
>
> When the user for inter broker communication is not a super user and ACLs are 
> configured with allow.everyone.if.no.acl.found=false, the cluster will not 
> serve data. This is extremely confusing to debug because there is no security 
> negotiation problem or indication of an error other than no data can make it 
> in or out of the broker. If one knew to look in the authorizer log, it would 
> be more clear, but that didn't make it into my workflow at least. Here's an 
> example of a problematic debugging scenario:
> SASL_SSL, SSL, SASL_PLAINTEXT ports on the brokers
> SASL user specified in `super.users`
> SSL specified as the inter broker protocol
> The only way I could figure out ACLs were an issue without gleaning it 
> through configuration inspection was that controlled shutdown indicated that 
> a cluster action had failed. 
> It would be good if we could be more transparent about the failure here. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7811) Avoid unnecessary lock acquire when KafkaConsumer commit offsets

2019-01-10 Thread lambdaliu (JIRA)
lambdaliu created KAFKA-7811:


 Summary: Avoid unnecessary lock acquire when KafkaConsumer commit 
offsets
 Key: KAFKA-7811
 URL: https://issues.apache.org/jira/browse/KAFKA-7811
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.1.0, 2.0.1, 1.1.1, 1.0.2, 0.11.0.3, 0.10.2.2
Reporter: lambdaliu
Assignee: lambdaliu


In KafkaConsumer#commitAsync, we have the following logic:

 
{code:java}
public void commitAsync(OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
commitAsync(subscriptions.allConsumed(), callback);
} finally {
release();
}
}
{code}
This convenience overload delegates to another commitAsync, which also calls 
`acquireAndEnsureOpen`, so the lock is acquired twice.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7804) Update the docs for KIP-377

2019-01-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739349#comment-16739349
 ] 

ASF GitHub Bot commented on KAFKA-7804:
---

viktorsomogyi commented on pull request #6118: KAFKA-7804: Update docs for 
topic-command related KIP-377
URL: https://github.com/apache/kafka/pull/6118
 
 
   This PR adds a upgrade notes and changes examples to use the 
bootstrap-server.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update the docs for KIP-377
> ---
>
> Key: KAFKA-7804
> URL: https://issues.apache.org/jira/browse/KAFKA-7804
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>Priority: Major
>
> KIP-377 introduced the {{--bootstrap-server}} option to the 
> {{kafka-topics.sh}} command. The documentation (examples and notable changes) 
> should be updated accordingly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-01-10 Thread Daniele Ascione (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniele Ascione updated KAFKA-7794:
---
Description: 
For some timestamp inputs (other than the special values -1 and -2), 
GetOffsetShell is not able to retrieve the offset.

For example, if _x_ is the timestamp in that "not working range", and you 
execute:
{code:java}
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_ADDRESS 
--topic $MY_TOPIC --time x
{code}
The output is:
{code:java}
MY_TOPIC:8:
MY_TOPIC:2:
MY_TOPIC:5:
MY_TOPIC:4:
MY_TOPIC:7:
MY_TOPIC:1:
MY_TOPIC:9:{code}
while after the last ":" an integer representing the offset is expected.

Steps to reproduce it:
 # Consume all the messages from the beginning and print the timestamp:
{code:java}
bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
$KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
messages{code}

 # Sort the messages by timestamp and get some of the oldest messages:
{code:java}
 awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}

 # Take (for example) the timestamp of the 10th oldest message, and see if 
GetOffsetShell is not able to print the offset:
{code:java}
timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_ADDRESS 
--topic $MY_TOPIC --time $timestamp
# The output should be something like:
# MY_TOPIC:1:
# MY_TOPIC:2:
(repeated for every partition){code}

 # Verify that the message with that timestamp is still in Kafka:
{code:java}
bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
$KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
"CreateTime:$timestamp" {code}
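For reference, the lookup semantics one would expect here are roughly "return the earliest offset whose timestamp is >= the requested timestamp, or nothing when the timestamp is newer than every record". The self-contained sketch below models those semantics over synthetic data; it is an illustration of the expected behavior, not the broker's actual implementation.

```java
import java.util.List;
import java.util.OptionalLong;

// Illustrative model of timestamp-to-offset lookup semantics:
// earliest offset whose record timestamp is >= the requested timestamp.
public class OffsetLookup {
    /** timestamps.get(i) is the (non-decreasing) timestamp of the record at offset i. */
    public static OptionalLong offsetForTime(List<Long> timestamps, long target) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= target) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();  // target is newer than every record
    }
}
```

Under these semantics a timestamp that falls between two records' timestamps should still resolve to an offset, which is what makes the empty output above look like a bug rather than expected behavior.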

 

  was:
For some input for the timestamps (different from -1 or -2) the GetOffset is 
not able to retrieve the offset.

For example, if _x_ is the timestamp in that "not working range", and you 
execute:
{code:java}
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_ADDRESS 
--topic $MY_TOPIC--time x
{code}
The output is:
{code:java}
MY_TOPIC:8:
MY_TOPIC:2:
MY_TOPIC:5:
MY_TOPIC:4:
MY_TOPIC:7:
MY_TOPIC:1:
MY_TOPIC:9:{code}
while after the last ":" an integer representing the offset is expected.

Steps to reproduce it:
 # Consume all the messages from the beginning and print the timestamp:
{code:java}
bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
$KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
messages{code}
 # Sort the messages by timestamp and get some of the oldest messages:
{code:java}
 awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
 # Take (for example) the timestamp of the 10th oldest message, and see if 
GetOffsetShell is not able to print the offset:
{code:java}
timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_ADDRESS 
--topic $MY_TOPIC--time $timestamp
# The output should be something like:
# MY_TOPIC:1:
# MY_TOPIC:2:
(repeated for every partition){code}
 # Verify that the message with that timestamp is still in Kafka:
{code:java}
bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
$KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
"CreateTime:$timestamp" {code}

 


> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
>
> For some timestamp inputs (other than the special values -1 and -2), 
> GetOffsetShell is not able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for 

[jira] [Commented] (KAFKA-7810) AdminClient not found flink consumer group

2019-01-10 Thread huxihx (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739280#comment-16739280
 ] 

huxihx commented on KAFKA-7810:
---

It seems you are using kafka.admin.AdminClient, which is already marked as 
deprecated. That class cannot track offsets for stand-alone consumers. Try 
using org.apache.kafka.clients.admin.AdminClient and its `listConsumerGroups` 
method instead.

> AdminClient not found flink consumer group
> --
>
> Key: KAFKA-7810
> URL: https://issues.apache.org/jira/browse/KAFKA-7810
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients, consumer
>Affects Versions: 2.1.0
>Reporter: dengjie
>Priority: Major
>
> After starting the Flink (v1.7.1) application, the *listAllConsumerGroups()* 
> method of the Kafka *AdminClient* class does not find the Flink consumer group name.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7810) AdminClient not found flink consumer group

2019-01-10 Thread dengjie (JIRA)
dengjie created KAFKA-7810:
--

 Summary: AdminClient not found flink consumer group
 Key: KAFKA-7810
 URL: https://issues.apache.org/jira/browse/KAFKA-7810
 Project: Kafka
  Issue Type: Bug
  Components: admin, clients, consumer
Affects Versions: 2.1.0
Reporter: dengjie


After starting the Flink (v1.7.1) application, the *listAllConsumerGroups()* method 
of the Kafka *AdminClient* class does not find the Flink consumer group name.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7809) Getting uncaught exception in kafka-producer-network-thread | producer-7686: java.lang.OutOfMemoryError: Java heap space

2019-01-10 Thread Suman Kalyan Ghosh (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739218#comment-16739218
 ] 

Suman Kalyan Ghosh commented on KAFKA-7809:
---

Can someone help with this?

This is a blocker for us.

> Getting uncaught exception in kafka-producer-network-thread | producer-7686: 
> java.lang.OutOfMemoryError: Java heap space
> 
>
> Key: KAFKA-7809
> URL: https://issues.apache.org/jira/browse/KAFKA-7809
> Project: Kafka
>  Issue Type: Bug
>  Components: log
> Environment: Performance
>Reporter: Suman Kalyan Ghosh
>Priority: Major
>
> Getting uncaught exception in kafka-producer-network-thread | producer-7686: 
> java.lang.OutOfMemoryError: Java heap space 
> Env: Performance environment
>  
> Kafka Verison : 0.9.0.1
> Kafka Set up : it is a  3 nodes single cluster environment.
> Appserver : Weblogic 
> Logs : 
>  failed because of an error: java.lang.OutOfMemoryError: Java heap space
>  java.lang.OutOfMemoryError: Java heap space
>  >
>  Jan 09, 2019 5:09:09 AM org.apache.kafka.common.utils.KafkaThread$1 
> uncaughtException
>  SEVERE: Uncaught exception in kafka-producer-network-thread | producer-7686:
>  java.lang.OutOfMemoryError: Java heap space
>  at java.lang.AbstractStringBuilder.&lt;init&gt;(AbstractStringBuilder.java:68)
>  at java.lang.StringBuilder.&lt;init&gt;(StringBuilder.java:89)
>  at java.net.InetAddress.toString(InetAddress.java:725)
>  at 
> org.apache.kafka.common.network.KafkaChannel.socketDescription(KafkaChannel.java:117)
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:305)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>  at java.lang.Thread.run(Thread.java:748)
>   
>in kafka-producer-network-thread | producer-7686:
>  java.lang.OutOfMemoryError: Java heap space
>  at java.lang.AbstractStringBuilder.&lt;init&gt;(AbstractStringBuilder.java:68)
>  at java.lang.StringBuilder.&lt;init&gt;(StringBuilder.java:89)
>  at java.net.InetAddress.toString(InetAddress.java:725)
>  at 
> org.apache.kafka.common.network.KafkaChannel.socketDescription(KafkaChannel.java:117)
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:305)
>  Truncated. see log file for complete stacktrace
>  >



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists

2019-01-10 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739197#comment-16739197
 ] 

Patrik Kleindl commented on KAFKA-6767:
---

[~guozhang] Thanks, a restart of the whole application "fixed" it for the 
moment, but I'll keep an eye on it.

And I'm pretty sure we are/were affected by 
https://issues.apache.org/jira/browse/KAFKA-7672 too.

> OffsetCheckpoint write assumes parent directory exists
> --
>
> Key: KAFKA-6767
> URL: https://issues.apache.org/jira/browse/KAFKA-6767
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an 
> instance dies it is created from scratch, rather than reusing the existing 
> RocksDB.)
> We routinely see:
> {code:java}
> 2018-04-09T19:14:35.004Z WARN <> 
> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset 
> checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
> java.io.FileNotFoundException: 
> /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.(FileOutputStream.java:213)
> at java.io.FileOutputStream.(FileOutputStream.java:162)
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
> Inspecting the state store directory, I can indeed see that {{chat/0_11}} 
> does not exist (although many other partitions do).
>  
> Looking at the OffsetCheckpoint write method, it seems to try to open a new 
> checkpoint file without first ensuring that the parent directory exists.
>  
> {code:java}
>     public void write(final Map&lt;TopicPartition, Long&gt; offsets) throws 
> IOException {
>     // if there is no offsets, skip writing the file to save disk IOs
>     if (offsets.isEmpty()) {
>     return;
>     }
>     synchronized (lock) {
>     // write to temp file and then swap with the existing file
>     final File temp = new File(file.getAbsolutePath() + ".tmp");{code}
>  
> Either the OffsetCheckpoint class should initialize the directories if 
> needed, or some precondition of it being called should ensure that is the 
> case.
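A minimal sketch of the first suggested remedy, creating the parent directories before opening the `.tmp` file; `CheckpointWriter` is an illustrative stand-in, not the actual OffsetCheckpoint code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of the suggested fix: ensure the checkpoint's parent directory
// exists before opening the ".tmp" file, then swap it into place.
public class CheckpointWriter {
    /** Write content via a temp file, creating parent directories first. */
    public static void writeCheckpoint(Path file, String content) throws IOException {
        Path target = file.toAbsolutePath();
        Path parent = target.getParent();
        if (parent != null) {
            Files.createDirectories(parent);   // no-op when it already exists
        }
        Path temp = Path.of(target + ".tmp");  // write to temp, then atomically rename
        Files.writeString(temp, content);
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

With this ordering, a missing `chat/0_11` directory on an ephemeral disk is recreated instead of causing a FileNotFoundException.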



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)