[jira] [Comment Edited] (KAFKA-6898) org.apache.kafka.common.errors.TimeoutException

2018-05-24 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490189#comment-16490189
 ] 

huxihx edited comment on KAFKA-6898 at 5/25/18 3:58 AM:


Could you check whether the Sender thread, whose name is prefixed with 
`kafka-producer-network-thread-`, is still alive in the background? Also, did 
you share the same producer instance across multiple threads?


was (Author: huxi_2b):
Could you check whether the Sender thread, whose name should be prefixed with 
`kafka-producer-network-thread-`, is still alive in the background? Also, did 
you share the same producer instance across multiple threads?

> org.apache.kafka.common.errors.TimeoutException
> ---
>
> Key: KAFKA-6898
> URL: https://issues.apache.org/jira/browse/KAFKA-6898
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
> Environment: Production
>Reporter: Rishi
>Priority: Major
>
> Getting the error
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory 
> within the configured max blocking time 59927 ms.{code}
> while publishing events to Kafka. We are using Kafka Java client 0.10.2.0 
> with a Kafka 0.10.1.0 broker.
> This issue does not always happen, but after the applications have been 
> running in service for some time, it starts happening, and the applications 
> never recover from this state until the producer instance is restarted.
> The producer and Kafka broker configurations are the defaults and haven't 
> been changed. What should be the course of action for this issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6898) org.apache.kafka.common.errors.TimeoutException

2018-05-24 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490189#comment-16490189
 ] 

huxihx commented on KAFKA-6898:
---

Could you check whether the Sender thread, whose name should be prefixed with 
`kafka-producer-network-thread-`, is still alive in the background? Also, did 
you share the same producer instance across multiple threads?
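
For reference, one way to check this from the application side (a rough sketch; the helper below is illustrative and not a Kafka API, but the name prefix matches the producer's network thread naming):
{code:java}
// Illustrative helper: scan live threads for the producer's Sender thread.
public static boolean senderThreadAlive() {
    for (final Thread t : Thread.getAllStackTraces().keySet()) {
        if (t.getName().startsWith("kafka-producer-network-thread")) {
            return true;
        }
    }
    return false;
}{code}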

> org.apache.kafka.common.errors.TimeoutException
> ---
>
> Key: KAFKA-6898
> URL: https://issues.apache.org/jira/browse/KAFKA-6898
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
> Environment: Production
>Reporter: Rishi
>Priority: Major
>
> Getting the error
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory 
> within the configured max blocking time 59927 ms.{code}
> while publishing events to Kafka. We are using Kafka Java client 0.10.2.0 
> with a Kafka 0.10.1.0 broker.
> This issue does not always happen, but after the applications have been 
> running in service for some time, it starts happening, and the applications 
> never recover from this state until the producer instance is restarted.
> The producer and Kafka broker configurations are the defaults and haven't 
> been changed. What should be the course of action for this issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6941) when passing port = 0 to worker, the advertisedPort still is 0 rather than a random port

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490114#comment-16490114
 ] 

ASF GitHub Bot commented on KAFKA-6941:
---

chia7712 opened a new pull request #5076: KAFKA-6941 when passing port = 0 to 
worker, the advertisedPort still …
URL: https://github.com/apache/kafka/pull/5076
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> when passing port = 0 to worker, the advertisedPort still is 0 rather than a 
> random port
> 
>
> Key: KAFKA-6941
> URL: https://issues.apache.org/jira/browse/KAFKA-6941
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {code:java}
> public URI advertisedUrl() {
>     UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
>     Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
>     if (advertisedPort != null)
>         builder.port(advertisedPort);
>     else if (serverConnector != null)
>         builder.port(serverConnector.getPort()); // should call getLocalPort() instead
>     log.info("Advertised URI: {}", builder.build());
>     return builder.build();
> }{code}
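
For reference, a sketch of the suggested fix (assuming Jetty's ServerConnector API, where getLocalPort() reports the port actually bound when the configured port is 0; a sketch, not the merged patch):
{code:java}
public URI advertisedUrl() {
    UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
    Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
    if (advertisedPort != null)
        builder.port(advertisedPort);
    else if (serverConnector != null)
        // getPort() returns the configured value (0), while getLocalPort()
        // returns the ephemeral port Jetty actually bound after start().
        builder.port(serverConnector.getLocalPort());
    log.info("Advertised URI: {}", builder.build());
    return builder.build();
}{code}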



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6911) Incorrect check for keystore/truststore dynamic update

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489975#comment-16489975
 ] 

ASF GitHub Bot commented on KAFKA-6911:
---

hachikuji closed pull request #5029: KAFKA-6911: Fix dynamic 
keystore/truststore update check
URL: https://github.com/apache/kafka/pull/5029
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 6989349fdbc..055404caab4 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -176,10 +176,10 @@ public void reconfigure(Map configs) throws KafkaException {
     }
 
     private SecurityStore maybeCreateNewKeystore(Map configs) {
-        boolean keystoreChanged = Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), keystore.type) ||
-                Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), keystore.path) ||
-                Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) ||
-                Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword);
+        boolean keystoreChanged = !Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), keystore.type) ||
+                !Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), keystore.path) ||
+                !Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) ||
+                !Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword);
 
         if (keystoreChanged) {
             return createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
@@ -191,9 +191,9 @@ private SecurityStore maybeCreateNewKeystore(Map configs) {
     }
 
     private SecurityStore maybeCreateNewTruststore(Map configs) {
-        boolean truststoreChanged = Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), truststore.type) ||
-                Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), truststore.path) ||
-                Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), truststore.password);
+        boolean truststoreChanged = !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), truststore.type) ||
+                !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), truststore.path) ||
+                !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), truststore.password);
 
         if (truststoreChanged) {
             return createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index f375dd68a75..2df4c4fe90f 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -847,18 +847,14 @@ public void testServerKeystoreDynamicUpdate() throws Exception {
 
         CertStores invalidCertStores = new CertStores(true, "server", "127.0.0.1");
         Map invalidConfigs = invalidCertStores.getTrustingConfig(clientCertStores);
-        try {
-            reconfigurableBuilder.validateReconfiguration(invalidConfigs);
-            fail("Should have failed validation with an exception with different SubjectAltName");
-        } catch (KafkaException e) {
-            // expected exception
-        }
-        try {
-            reconfigurableBuilder.reconfigure(invalidConfigs);
-            fail("Should have failed to reconfigure with different SubjectAltName");
-        } catch (KafkaException e) {
-            // expected exception
-        }
+        verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, "keystore with different SubjectAltName");
+
+        Map missingStoreConfigs = new HashMap<>();
+        missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
+        missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "some.keystore.path");
+        missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, new Password("some.keystore.password"));
+        missingStoreConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("some.key.password"));
+        verifyInvalidReconfigure(r

[jira] [Resolved] (KAFKA-6911) Incorrect check for keystore/truststore dynamic update

2018-05-24 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6911.

Resolution: Fixed

> Incorrect check for keystore/truststore dynamic update
> --
>
> Key: KAFKA-6911
> URL: https://issues.apache.org/jira/browse/KAFKA-6911
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The check to see if the keystore or truststore needs updating is incorrect: 
> it checks whether one of the configs has not changed, rather than whether it 
> has changed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6914) Kafka Connect - Plugins class should have a constructor that can take in parent ClassLoader

2018-05-24 Thread Sriram KS (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489653#comment-16489653
 ] 

Sriram KS commented on KAFKA-6914:
--

[~ewencp] The case I have is that I am building an uber jar which contains all 
of Kafka Connect and its dependent jars, so that I can run it as a jar 
application. I use the Spring Boot Maven plugin to create the uber jar.

The app I am creating is a Spring Boot jar application.

Need: to run Kafka Connect as a jar application instead of via the shell scripts.

Reason: my deployment environment is OpenShift-managed, and the only template 
available is a jar.

Solution: I wrote a wrapper for Distributed.java with exactly the same code in 
it, but managing the application parameters, such as properties and other 
dependencies, using Spring.

Issue: the class loader which the Plugins class creates is a delegating class 
loader whose parent is assigned from the system class loader, and Spring Boot 
has constraints around that because of its own class-loading assumptions:

[https://docs.spring.io/spring-boot/docs/current/reference/html/executable-jar.html]

Current solution: I reimplemented Plugins and DelegatingClassLoader so that I 
create an instance of DelegatingClassLoader by passing in a parent class 
loader, which is the Spring Boot class loader.

Easy solution: a new constructor in Plugins which can take in a parent 
ClassLoader for the DelegatingClassLoader it creates, as sketched below.
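
A hypothetical sketch of that constructor (the helper and internals are illustrative, not the actual Kafka Connect code; the point is only to thread a caller-supplied parent through to the DelegatingClassLoader):
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class Plugins {
    private final DelegatingClassLoader delegatingLoader;

    public Plugins(Map<String, String> props) {
        this(props, Plugins.class.getClassLoader()); // today's implicit parent
    }

    // Requested addition: let managed environments (e.g. Spring Boot) supply
    // the parent class loader instead of defaulting to the system one.
    public Plugins(Map<String, String> props, ClassLoader parent) {
        List<String> pluginPaths = pluginPathsFrom(props); // illustrative helper
        this.delegatingLoader = new DelegatingClassLoader(pluginPaths, parent);
    }

    private static List<String> pluginPathsFrom(Map<String, String> props) {
        // Parse the standard plugin.path worker config (comma-separated).
        return Arrays.asList(props.getOrDefault("plugin.path", "").split(","));
    }
}{code}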

> Kafka Connect - Plugins class should have a constructor that can take in 
> parent ClassLoader
> ---
>
> Key: KAFKA-6914
> URL: https://issues.apache.org/jira/browse/KAFKA-6914
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriram KS
>Priority: Major
> Fix For: 1.1.1
>
>
> Currently the Plugins class has a single constructor that takes in a map of props.
> Please make the Plugins class have a constructor that takes in a ClassLoader 
> as well, and use it to set the DelegatingClassLoader's parent class loader.
> Reason:
> This will be useful if I already have a managed class loader environment, 
> like a Spring Boot app, which resolves my class dependencies using my 
> Maven/Gradle dependency management.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6813) Remove deprecated APIs from KIP-120 and KIP-182 in Streams

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489641#comment-16489641
 ] 

ASF GitHub Bot commented on KAFKA-6813:
---

vvcephei opened a new pull request #5075: KAFKA-6813: return to double-counting 
for count topo names
URL: https://github.com/apache/kafka/pull/5075
 
 
   #4919 unintentionally changed the topology naming scheme. This change 
returns to the prior scheme.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove deprecated APIs from KIP-120 and KIP-182 in Streams
> --
>
> Key: KAFKA-6813
> URL: https://issues.apache.org/jira/browse/KAFKA-6813
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.0.0
>
>
> As we move on to the next major release 2.0, we can consider removing the 
> deprecated APIs from KIP-120 and KIP-182.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled

2018-05-24 Thread Jon Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee updated KAFKA-6946:
---
Summary: Keep the session id for incremental fetch when fetch responses are 
throttled   (was: Keep the fetch session id for incremental fetch when )

> Keep the session id for incremental fetch when fetch responses are throttled 
> -
>
> Key: KAFKA-6946
> URL: https://issues.apache.org/jira/browse/KAFKA-6946
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jon Lee
>Priority: Major
>
> The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with 
> INVALID_SESSION_ID if the response needs to be throttled due to a quota 
> violation. For an incremental fetch, this will make the client reset 
> its session and send a full fetch request next time. This is not a 
> correctness issue, but it may affect performance when fetches are throttled.
> In the case of an incremental fetch, a throttled response should use the same 
> session id as before so that the next unthrottled response can be in the same 
> session. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6946) Keep the fetch session id for incremental fetch when

2018-05-24 Thread Jon Lee (JIRA)
Jon Lee created KAFKA-6946:
--

 Summary: Keep the fetch session id for incremental fetch when 
 Key: KAFKA-6946
 URL: https://issues.apache.org/jira/browse/KAFKA-6946
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.0.0
Reporter: Jon Lee


The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with 
INVALID_SESSION_ID if the response needs to be throttled due to a quota 
violation. For an incremental fetch, this will make the client reset its 
session and send a full fetch request next time. This is not a correctness 
issue, but it may affect performance when fetches are throttled.

In the case of an incremental fetch, a throttled response should use the same 
session id as before so that the next unthrottled response can be in the same 
session. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6945) Add support to allow users to acquire delegation tokens for other users

2018-05-24 Thread Manikumar (JIRA)
Manikumar created KAFKA-6945:


 Summary: Add support to allow users to acquire delegation tokens 
for other users
 Key: KAFKA-6945
 URL: https://issues.apache.org/jira/browse/KAFKA-6945
 Project: Kafka
  Issue Type: Sub-task
Reporter: Manikumar
Assignee: Manikumar


Currently, a user can create delegation tokens only for themselves. We should 
allow users to acquire delegation tokens on behalf of other users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6944) Add system tests testing the new throttling behavior using older clients/brokers

2018-05-24 Thread Jon Lee (JIRA)
Jon Lee created KAFKA-6944:
--

 Summary: Add system tests testing the new throttling behavior 
using older clients/brokers
 Key: KAFKA-6944
 URL: https://issues.apache.org/jira/browse/KAFKA-6944
 Project: Kafka
  Issue Type: Test
  Components: system tests
Affects Versions: 2.0.0
Reporter: Jon Lee


KAFKA-6028 (KIP-219) changes the throttling behavior on quota violation as 
follows:
 * the broker will send out a response with throttle time to the client 
immediately and mute the channel
 * upon receiving a response with a non-zero throttle time, the client will 
also block sending further requests to the broker until the throttle time is 
over.

The current system tests assume that both clients and brokers are of the same 
version. We'll need an additional set of quota tests that test throttling 
behavior between older clients and newer brokers and between newer clients and 
older brokers. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4628) Support KTable/GlobalKTable Joins

2018-05-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489493#comment-16489493
 ] 

Guozhang Wang commented on KAFKA-4628:
--

Hello Adam,

Here is my estimate of the complexity: implementation-wise it would be quite 
similar to {{KStreamImpl#globalTableJoin()}}, but the difference is that the 
result should be a KTable, not a KStream; also, if the resulting KTable is 
materialized, we should update the materialized state store as well (a 
reference for this implementation detail is in `KTableKTableAbstractJoin`'s 
subclasses for inner, outer, and left joins). Depending on your familiarity 
with the existing Kafka Streams code base, I think it would take 5-8 days.

Since this is a new API you'd also need to file a KIP 
(https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) 
for it, and lead the discussion and drive the voting process. That may take you 
another day or two, since it should be a pretty straightforward one.

> Support KTable/GlobalKTable Joins
> -
>
> Key: KAFKA-4628
> URL: https://issues.apache.org/jira/browse/KAFKA-4628
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Priority: Major
>  Labels: needs-kip
>
> In KIP-99 we have added support for GlobalKTables, however we don't currently 
> support KTable/GlobalKTable joins as they require materializing a state store 
> for the join. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4628) Support KTable/GlobalKTable Joins

2018-05-24 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4628:
-
Labels: needs-kip  (was: )

> Support KTable/GlobalKTable Joins
> -
>
> Key: KAFKA-4628
> URL: https://issues.apache.org/jira/browse/KAFKA-4628
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Priority: Major
>  Labels: needs-kip
>
> In KIP-99 we have added support for GlobalKTables, however we don't currently 
> support KTable/GlobalKTable joins as they require materializing a state store 
> for the join. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6943) Have option to shutdown KS cleanly if any task crashes, or if all tasks crash

2018-05-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489485#comment-16489485
 ] 

Guozhang Wang commented on KAFKA-6943:
--

Hi Antony, for this feature request, what are the common scenarios that cause 
a single task to fail but not the whole thread to die?

> Have option to shutdown KS cleanly if any task crashes, or if all tasks crash
> -
>
> Key: KAFKA-6943
> URL: https://issues.apache.org/jira/browse/KAFKA-6943
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Priority: Major
>
> At the moment users have to implement this themselves. It might be nice to 
> have an option to configure that if all tasks crash, or if any crash, a 
> clean shutdown is initiated.
> There is also a gotcha here: at the moment, if you call KS#close without a 
> timeout from the uncaught exception handler, you deadlock.
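
For reference, a hedged sketch of the workaround for the close() gotcha (API as of Kafka 1.1, where KafkaStreams#close(long, TimeUnit) exists; the handler wiring is illustrative):
{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;

public class ShutdownOnCrash {
    public static void register(final KafkaStreams streams) {
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            // Always pass a timeout here: close() without one waits for the
            // very stream thread the handler runs on, and so can deadlock.
            streams.close(10, TimeUnit.SECONDS);
        });
    }
}{code}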



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6862) test toolchain

2018-05-24 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6862.
--
Resolution: Invalid

Please reopen the Jira with more details.

> test toolchain
> --
>
> Key: KAFKA-6862
> URL: https://issues.apache.org/jira/browse/KAFKA-6862
> Project: Kafka
>  Issue Type: Test
>  Components: build
>Reporter: ravi
>Priority: Major
>
> test toolchain



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap

2018-05-24 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489444#comment-16489444
 ] 

Matthias J. Sax commented on KAFKA-4217:


Thanks for picking this up!

Added you to the list of contributors and assigned the Jira to you. You can now 
also assign Jiras to yourself.

> KStream.transform equivalent of flatMap
> ---
>
> Key: KAFKA-4217
> URL: https://issues.apache.org/jira/browse/KAFKA-4217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: api, needs-kip, newbie
>
> {{KStream.transform}} gives you access to state stores while allowing you to 
> return zero or one transformed {{KeyValue}}.  Alas, it is unclear what method 
> you should use if you want to access state stores and return zero or multiple 
> {{KeyValue}}.  Presumably you can use {{transform}}, always return {{null}}, 
> and use {{ProcessorContext.forward}} to emit {{KeyValues}}.
> It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or 
> allow store access from other {{KStream}} methods, such as {{flatMap}} itself.
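
For illustration, a minimal sketch of the workaround described above (assuming the Transformer interface as of Kafka 2.0, which drops the deprecated punctuate(); the splitting logic is invented for the example):
{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// A "flatMap with state access": emit any number of records through
// context.forward() and always return null from transform().
public class SplitTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context; // state stores would be fetched here via context.getStateStore(...)
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        for (final String token : value.split(",")) {
            context.forward(key, token); // zero or more outputs
        }
        return null; // nothing is returned directly
    }

    @Override
    public void close() { }
}{code}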



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-4217) KStream.transform equivalent of flatMap

2018-05-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4217:
--

Assignee: Bruno Cadonna

> KStream.transform equivalent of flatMap
> ---
>
> Key: KAFKA-4217
> URL: https://issues.apache.org/jira/browse/KAFKA-4217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: api, needs-kip, newbie
>
> {{KStream.transform}} gives you access to state stores while allowing you to 
> return zero or one transformed {{KeyValue}}.  Alas, it is unclear what method 
> you should use if you want to access state stores and return zero or multiple 
> {{KeyValue}}.  Presumably you can use {{transform}}, always return {{null}}, 
> and use {{ProcessorContext.forward}} to emit {{KeyValues}}.
> It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or 
> allow store access from other {{KStream}} methods, such as {{flatMap}} itself.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6925) Memory leak in org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl

2018-05-24 Thread John Roesler (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-6925:

Description: 
*Note: this issue was fixed incidentally in 2.0, so it is only present in 
versions 0.x and 1.x.*

 

The retained heap of 
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
 is surprisingly high for long-running jobs: over 100MB of heap for every 
stream after a week of uptime, whereas for the same application the heap takes 
2MB a few hours after start.

For the problematic instance, the majority of StreamsMetricsThreadImpl memory 
is occupied by hash map entries in parentSensors: over 8000 elements of 100+kB 
each. For a fresh instance there are fewer than 200 elements.

Below you can find a retained set report generated from Eclipse MAT, but I'm 
not fully sure about its correctness due to the complex object graph in the 
metrics-related code. Number of objects in a single 
StreamThread$StreamsMetricsThreadImpl instance:

 
{code:java}
Class Name | Objects | Shallow Heap
---
org.apache.kafka.common.metrics.KafkaMetric | 140,476 | 4,495,232
org.apache.kafka.common.MetricName | 140,476 | 4,495,232
org.apache.kafka.common.metrics.stats.SampledStat$Sample | 73,599 | 3,532,752
org.apache.kafka.common.metrics.stats.Meter | 42,104 | 1,347,328
org.apache.kafka.common.metrics.stats.Count | 42,104 | 1,347,328
org.apache.kafka.common.metrics.stats.Rate | 42,104 | 1,010,496
org.apache.kafka.common.metrics.stats.Total | 42,104 | 1,010,496
org.apache.kafka.common.metrics.stats.Max | 28,134 | 900,288
org.apache.kafka.common.metrics.stats.Avg | 28,134 | 900,288
org.apache.kafka.common.metrics.Sensor | 3,164 | 202,496
org.apache.kafka.common.metrics.Sensor[] | 3,164 | 71,088
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl|
 1 | 56
---
{code}
 

  was:
The retained heap of 
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
 is surprisingly high for long-running jobs: over 100MB of heap for every 
stream after a week of uptime, whereas for the same application the heap takes 
2MB a few hours after start.

For the problematic instance, the majority of StreamsMetricsThreadImpl memory 
is occupied by hash map entries in parentSensors: over 8000 elements of 100+kB 
each. For a fresh instance there are fewer than 200 elements.

Below you can find a retained set report generated from Eclipse MAT, but I'm 
not fully sure about its correctness due to the complex object graph in the 
metrics-related code. Number of objects in a single 
StreamThread$StreamsMetricsThreadImpl instance:

 
{code:java}
Class Name | Objects | Shallow Heap
---
org.apache.kafka.common.metrics.KafkaMetric | 140,476 | 4,495,232
org.apache.kafka.common.MetricName | 140,476 | 4,495,232
org.apache.kafka.common.metrics.stats.SampledStat$Sample | 73,599 | 3,532,752
org.apache.kafka.common.metrics.stats.Meter | 42,104 | 1,347,328
org.apache.kafka.common.metrics.stats.Count | 42,104 | 1,347,328
org.apache.kafka.common.metrics.stats.Rate | 42,104 | 1,010,496
org.apache.kafka.common.metrics.stats.Total | 42,104 | 1,010,496
org.apache.kafka.common.metrics.stats.Max | 28,134 | 900,288
org.apache.kafka.common.metrics.stats.Avg | 28,134 | 900,288
org.apache.kafka.common.metrics.Sensor | 3,164 | 202,496
org.apache.kafka.common.metrics.Sensor[] | 3,164 | 71,088
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl|
 1 | 56
---
{code}
 


> Memory leak in 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
> -
>
> Key: KAFKA-6925
> URL: https://issues.apache.org/jira/browse/KAFKA-6925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Marcin Kuthan
>Assignee: John Roesler
>Priority: Major
>
> *Note: this issue was fixed incidentally in 2.0, so it is only present in 
> versions 0.x and 1.x.*
>  
> The retained heap of 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  is surprisingly high for long-running jobs: over 100MB of heap for every 
> stream after a week of uptime, whereas for the same application the heap 
> takes 2MB a few hours after start.
> For the problematic instance, the majority of StreamsMetricsThreadImpl memory 
> is occupied by hash map entries in parentSen

[jira] [Assigned] (KAFKA-6925) Memory leak in org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl

2018-05-24 Thread John Roesler (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-6925:
---

Assignee: John Roesler

> Memory leak in 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
> -
>
> Key: KAFKA-6925
> URL: https://issues.apache.org/jira/browse/KAFKA-6925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Marcin Kuthan
>Assignee: John Roesler
>Priority: Major
>
> The retained heap of 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  is surprisingly high for long-running jobs: over 100MB of heap for every 
> stream after a week of uptime, whereas for the same application the heap 
> takes 2MB a few hours after start.
> For the problematic instance, the majority of StreamsMetricsThreadImpl memory 
> is occupied by hash map entries in parentSensors: over 8000 elements of 
> 100+kB each. For a fresh instance there are fewer than 200 elements.
> Below you can find a retained set report generated from Eclipse MAT, but I'm 
> not fully sure about its correctness due to the complex object graph in the 
> metrics-related code. Number of objects in a single 
> StreamThread$StreamsMetricsThreadImpl instance:
>  
> {code:java}
> Class Name | Objects | Shallow Heap
> ---
> org.apache.kafka.common.metrics.KafkaMetric | 140,476 | 4,495,232
> org.apache.kafka.common.MetricName | 140,476 | 4,495,232
> org.apache.kafka.common.metrics.stats.SampledStat$Sample | 73,599 | 3,532,752
> org.apache.kafka.common.metrics.stats.Meter | 42,104 | 1,347,328
> org.apache.kafka.common.metrics.stats.Count | 42,104 | 1,347,328
> org.apache.kafka.common.metrics.stats.Rate | 42,104 | 1,010,496
> org.apache.kafka.common.metrics.stats.Total | 42,104 | 1,010,496
> org.apache.kafka.common.metrics.stats.Max | 28,134 | 900,288
> org.apache.kafka.common.metrics.stats.Avg | 28,134 | 900,288
> org.apache.kafka.common.metrics.Sensor | 3,164 | 202,496
> org.apache.kafka.common.metrics.Sensor[] | 3,164 | 71,088
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl|
>  1 | 56
> ---
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6925) Memory leak in org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl

2018-05-24 Thread John Roesler (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-6925:

Affects Version/s: 0.11.0.2
   1.1.0

> Memory leak in 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
> -
>
> Key: KAFKA-6925
> URL: https://issues.apache.org/jira/browse/KAFKA-6925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Marcin Kuthan
>Priority: Major
>
> The retained heap of 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  is surprisingly high for long-running jobs: over 100MB of heap for every 
> stream after a week of uptime, whereas for the same application the heap 
> takes 2MB a few hours after start.
> For the problematic instance, the majority of StreamsMetricsThreadImpl memory 
> is occupied by hash map entries in parentSensors: over 8000 elements of 
> 100+kB each. For a fresh instance there are fewer than 200 elements.
> Below you can find a retained set report generated from Eclipse MAT, but I'm 
> not fully sure about its correctness due to the complex object graph in the 
> metrics-related code. Number of objects in a single 
> StreamThread$StreamsMetricsThreadImpl instance:
>  
> {code:java}
> Class Name | Objects | Shallow Heap
> ---
> org.apache.kafka.common.metrics.KafkaMetric | 140,476 | 4,495,232
> org.apache.kafka.common.MetricName | 140,476 | 4,495,232
> org.apache.kafka.common.metrics.stats.SampledStat$Sample | 73,599 | 3,532,752
> org.apache.kafka.common.metrics.stats.Meter | 42,104 | 1,347,328
> org.apache.kafka.common.metrics.stats.Count | 42,104 | 1,347,328
> org.apache.kafka.common.metrics.stats.Rate | 42,104 | 1,010,496
> org.apache.kafka.common.metrics.stats.Total | 42,104 | 1,010,496
> org.apache.kafka.common.metrics.stats.Max | 28,134 | 900,288
> org.apache.kafka.common.metrics.stats.Avg | 28,134 | 900,288
> org.apache.kafka.common.metrics.Sensor | 3,164 | 202,496
> org.apache.kafka.common.metrics.Sensor[] | 3,164 | 71,088
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl|
>  1 | 56
> ---
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap

2018-05-24 Thread Bruno Cadonna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489401#comment-16489401
 ] 

Bruno Cadonna commented on KAFKA-4217:
--

I would like to work on this issue. Could somebody assign this issue to me or 
give me appropriate permissions to do so?  

> KStream.transform equivalent of flatMap
> ---
>
> Key: KAFKA-4217
> URL: https://issues.apache.org/jira/browse/KAFKA-4217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api, needs-kip, newbie
>
> {{KStream.transform}} gives you access to state stores while allowing you to 
> return zero or one transformed {{KeyValue}}.  Alas, it is unclear what method 
> you should use if you want to access state stores and return zero or multiple 
> {{KeyValue}}.  Presumably you can use {{transform}}, always return {{null}}, 
> and use {{ProcessorContext.forward}} to emit {{KeyValues}}.
> It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or 
> allow store access from other {{KStream}} methods, such as {{flatMap}} itself.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6925) Memory leak in org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl

2018-05-24 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489252#comment-16489252
 ] 

John Roesler commented on KAFKA-6925:
-

I read the ticket and took another look at parentSensors in 1.0.


I think my trunk change would fix it, since we no longer have a parentSensors 
map at all, but it might be tricky to make that change surgically and not wind 
up with another 1k+ diff to review and merge.

Alternatively, looking at parentSensors, it seems we only add to the map. I 
think removeSensor should remove the sensor from the parentSensors map after 
it removes it from the registry.

 
{noformat}
public void removeSensor(Sensor sensor) {
    Objects.requireNonNull(sensor, "Sensor is null");
    metrics.removeSensor(sensor.name());

    final Sensor parent = parentSensors.get(sensor);
    if (parent != null) {
        metrics.removeSensor(parent.name());
    }
}{noformat}
Since parentSensors is keyed and valued by Sensor instances, it will retain 
the sensors even after they have been otherwise unloaded. Other memory 
improvements here would be to store the names instead of the whole sensor 
(since we actually just need the name to remove them), or to use weak 
references in the map so that the map alone can't keep the objects alive.

All in all, I think that rather than "backporting" my change (which would 
really be a re-implementation of one aspect of it), I'd recommend making 
parentSensors a Map from child name to parent name and removing entries from 
the map during removeSensor, as sketched below.
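
A minimal sketch of that recommendation (assuming the surrounding StreamsMetricsThreadImpl fields; a sketch, not a tested patch):
{code:java}
// Key parentSensors by sensor name so the map cannot pin Sensor objects,
// and drop the entry when the sensor is removed.
private final Map<String, String> parentSensors = new HashMap<>(); // child name -> parent name

public void removeSensor(final Sensor sensor) {
    Objects.requireNonNull(sensor, "Sensor is null");
    metrics.removeSensor(sensor.name());

    // Remove the mapping so neither name lingers after the sensors are gone.
    final String parentName = parentSensors.remove(sensor.name());
    if (parentName != null) {
        metrics.removeSensor(parentName);
    }
}{code}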

> Memory leak in 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
> -
>
> Key: KAFKA-6925
> URL: https://issues.apache.org/jira/browse/KAFKA-6925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Marcin Kuthan
>Priority: Major
>
> The retained heap of 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  is surprisingly high for long-running jobs: over 100MB of heap for every 
> stream after a week of uptime, whereas for the same application the heap 
> takes 2MB a few hours after start.
> For the problematic instance, the majority of StreamsMetricsThreadImpl memory 
> is occupied by hash map entries in parentSensors: over 8000 elements of 
> 100+kB each. For a fresh instance there are fewer than 200 elements.
> Below you can find a retained set report generated from Eclipse MAT, but I'm 
> not fully sure about its correctness due to the complex object graph in the 
> metrics-related code. Number of objects in a single 
> StreamThread$StreamsMetricsThreadImpl instance:
>  
> {code:java}
> Class Name | Objects | Shallow Heap
> ---
> org.apache.kafka.common.metrics.KafkaMetric | 140,476 | 4,495,232
> org.apache.kafka.common.MetricName | 140,476 | 4,495,232
> org.apache.kafka.common.metrics.stats.SampledStat$Sample | 73,599 | 3,532,752
> org.apache.kafka.common.metrics.stats.Meter | 42,104 | 1,347,328
> org.apache.kafka.common.metrics.stats.Count | 42,104 | 1,347,328
> org.apache.kafka.common.metrics.stats.Rate | 42,104 | 1,010,496
> org.apache.kafka.common.metrics.stats.Total | 42,104 | 1,010,496
> org.apache.kafka.common.metrics.stats.Max | 28,134 | 900,288
> org.apache.kafka.common.metrics.stats.Avg | 28,134 | 900,288
> org.apache.kafka.common.metrics.Sensor | 3,164 | 202,496
> org.apache.kafka.common.metrics.Sensor[] | 3,164 | 71,088
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl|
>  1 | 56
> ---
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4628) Support KTable/GlobalKTable Joins

2018-05-24 Thread Adam Bellemare (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489168#comment-16489168
 ] 

Adam Bellemare commented on KAFKA-4628:
---

[~guozhang] - Do you happen to have an estimate of the complexity of this 
ticket? I would be willing to look into addressing it, as I believe it is 
quite useful in cases where you need a gating pattern. If the left table is 
very large and the right table is very small, repartitioning the left table on 
the right's keys can lead to unbounded growth of the event value's size.

These use cases are especially prevalent in migrations away from 
relational-database-style data using Kafka Connect or other CDC patterns; the 
highly normalized data is difficult to use as events.

If anyone else can offer an estimate of the complexity of this task I would be 
very appreciative. I will do my own investigation too and hopefully come up 
with something.

 

> Support KTable/GlobalKTable Joins
> -
>
> Key: KAFKA-4628
> URL: https://issues.apache.org/jira/browse/KAFKA-4628
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Priority: Major
>
> In KIP-99 we have added support for GlobalKTables, however we don't currently 
> support KTable/GlobalKTable joins as they require materializing a state store 
> for the join. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6940) Kafka Cluster and Zookeeper ensemble configuration with SASL authentication

2018-05-24 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6940:
-
Component/s: (was: producer )
 (was: log)
 (was: KafkaConnect)
 (was: consumer)
 core

> Kafka Cluster and Zookeeper ensemble configuration with SASL authentication
> ---
>
> Key: KAFKA-6940
> URL: https://issues.apache.org/jira/browse/KAFKA-6940
> Project: Kafka
>  Issue Type: Task
>  Components: core, security, zkclient
>Affects Versions: 0.11.0.2
> Environment: PRE Production
>Reporter: Shashank Jain
>Priority: Blocker
>  Labels: security, test
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> Hi All,
>  
> I have a working Kafka cluster and ZooKeeper ensemble, but after integrating 
> SASL authentication I am facing the exceptions below.
>  
>  
> Zookeeper:- 
>  
>  
> 2018-05-23 07:39:59,476 [myid:1] - INFO  [ProcessThread(sid:1 cport:-1):: ] - 
> Got user-level KeeperException when processing sessionid:0x301cae0b3480002 
> type:delete cxid:0x48 zxid:0x2004e txntype:-1 reqpath:n/a Error 
> Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for 
> /admin/preferred_replica_election
> 2018-05-23 07:40:39,240 [myid:1] - INFO  [ProcessThread(sid:1 
> cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when 
> processing sessionid:0x200b4f13c190006 type:create cxid:0x20 zxid:0x20052 
> txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists 
> for /brokers
> 2018-05-23 07:40:39,240 [myid:1] - INFO  [ProcessThread(sid:1 
> cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when 
> processing sessionid:0x200b4f13c190006 type:create cxid:0x21 zxid:0x20053 
> txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = 
> NodeExists for /brokers/ids
> 2018-05-23 07:41:00,864 [myid:1] - INFO  [ProcessThread(sid:1 
> cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when 
> processing sessionid:0x301cae0b3480004 type:create cxid:0x20 zxid:0x20058 
> txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists 
> for /brokers
> 2018-05-23 07:41:00,864 [myid:1] - INFO  [ProcessThread(sid:1 
> cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when 
> processing sessionid:0x301cae0b3480004 type:create cxid:0x21 zxid:0x20059 
> txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = 
> NodeExists for /brokers/ids
> 2018-05-23 07:41:28,456 [myid:1] - INFO  [ProcessThread(sid:1 
> cport:-1)::PrepRequestProcessor@487] - Processed session termination for 
> sessionid: 0x200b4f13c190002
> 2018-05-23 07:41:29,563 [myid:1] - INFO  [ProcessThread(sid:1 
> cport:-1)::PrepRequestProcessor@487] - Processed session termination for 
> sessionid: 0x301cae0b3480002
> 2018-05-23 07:41:29,569 [myid:1] - INFO  [ProcessThread(sid:1 
> cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when 
> processing sessionid:0x200b4f13c190006 type:create cxid:0x2d zxid:0x2005f 
> txntype:-1 reqpath:n/a Error Path:/controller Error:KeeperErrorCode = 
> NodeExists for /controller
> 2018-05-23 07:41:29,679 [myid:1] - INFO  [ProcessThread(sid:1 
> cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when 
> processing sessionid:0x301cae0b3480004 type:delete cxid:0x4e zxid:0x20061 
> txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election 
> Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
>  
>  
> Kafka:- 
>  
> [2018-05-23 09:06:31,969] ERROR [ReplicaFetcherThread-0-1]: Error for 
> partition [23MAY,0] to broker 
> 1:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
>  
>  
>  
> ERROR [ReplicaFetcherThread-0-2]: Current offset 142474 for partition 
> [23MAY,1] out of range; reset offset to 142478 
> (kafka.server.ReplicaFetcherThread)
>  
>  
> ERROR [ReplicaFetcherThread-0-2]: Error for partition [23MAY,2] to broker 
> 2:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
> is not the leader for that topic-partition. 
> (kafka.server.ReplicaFetcherThread)
>  
>  
>  
> Below is my configuration:- 
>  
>  
> Zookeeper:- 
>  
>  java.env
> SERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/zookeeper/conf/ZK_jaas.conf"
>  
>  
> ZK_jaas.conf
> Server
>  
> { org.apache.zookeeper.server.auth.DigestLoginModule required
>   username="admin"
>   password="admin-secret"
>   user_admin="admin-secret";
>  };
>  
> QuorumServer {
>        org.apache.zookeeper.server.auth.DigestLoginModule required
>        

[jira] [Created] (KAFKA-6943) Have option to shutdown KS cleanly if any task crashes, or if all tasks crash

2018-05-24 Thread Antony Stubbs (JIRA)
Antony Stubbs created KAFKA-6943:


 Summary: Have option to shutdown KS cleanly if any task crashes, 
or if all tasks crash
 Key: KAFKA-6943
 URL: https://issues.apache.org/jira/browse/KAFKA-6943
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.1.0
Reporter: Antony Stubbs


At the moment users have to implement this themselves. It might be nice to have 
an option to configure that if all tasks crash, or if any crash, a clean 
shutdown is initiated.

There is also a gotcha here: at the moment, if you call KS#close without a 
timeout from the uncaught exception handler, you deadlock.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6942) Connect connectors api doesn't show versions of connectors

2018-05-24 Thread Antony Stubbs (JIRA)
Antony Stubbs created KAFKA-6942:


 Summary: Connect connectors api doesn't show versions of connectors
 Key: KAFKA-6942
 URL: https://issues.apache.org/jira/browse/KAFKA-6942
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Affects Versions: 1.1.0
Reporter: Antony Stubbs


Would be very useful to have the connector list API response also return the 
version of the installed connectors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6762) log-cleaner thread terminates due to java.lang.IllegalStateException

2018-05-24 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488998#comment-16488998
 ] 

Ismael Juma commented on KAFKA-6762:


This has been fixed in CP 4.1.1; please check the release notes. It only 
happens if transactions are being used, and there's already a JIRA for it.

> log-cleaner thread terminates due to java.lang.IllegalStateException
> 
>
> Key: KAFKA-6762
> URL: https://issues.apache.org/jira/browse/KAFKA-6762
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
> Environment: os: GNU/Linux 
> arch: x86_64 
> Kernel: 4.9.77 
> jvm: OpenJDK 1.8.0
>Reporter: Ricardo Bartolome
>Priority: Major
> Attachments: __consumer_offsets-9_.tar.xz
>
>
> We are experiencing some problems with kafka log-cleaner thread on Kafka 
> 1.0.0. We have planned to update this cluster to 1.1.0 by next week in order 
> to fix KAFKA-6683, but until then we can only confirm that it happens in 
> 1.0.0.
> log-cleaner thread crashes after a while with the following error:
> {code:java}
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-31. (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for 
> __consumer_offsets-31... (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). 
> (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log 
> __consumer_offsets-31 complete. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 
> (cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior 
> to Sat Feb 24 11:04:21 GMT 2018
> )... (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 
> 0, discarding deletes. (kafka.log.LogClea
> ner)
> [2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 262144 bytes to 524288 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 524288 bytes to 1000012 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.lang.IllegalStateException: This log contains a message larger than 
> maximum allowable size of 1000012.
> at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
> at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
> at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
> at scala.collection.immutable.List.foreach(List.scala:389)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
> at kafka.log.Cleaner.clean(LogCleaner.scala:372)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO The cleaning for partition 
> __broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO Compaction for partition 
> __broker-11-health-check-0 is resumed (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,774] INFO The cleaning for partition 
> __broker-11-health-check-0 is aborted (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown 
> completed (kafka.log.LogCleaner)
> {code}
> What we know so far is:
>  * We are unable to reproduce it yet in a consistent manner.
>  * It only happens in the PRO cluster and not in the PRE cluster for the same 
> customer (whose message payloads are very similar).
>  * Checking our Kafka logs, it only happened on the internal topics 
> *__consumer_offsets-**
>  * When we restart the broker process the log-cleaner starts working again, 
> but it can take between 3 minutes and some hours to die again.
>  * We work around it by temporarily increasing the message.max.bytes and 
> replica.fetch.max.bytes values to 10485760 (10MB) from the default 1000012 
> (~1MB).
> ** Before message.max.bytes = 10MB, we tried to mat

[jira] [Resolved] (KAFKA-4505) Cannot get topic lag since kafka upgrade from 0.8.1.0 to 0.10.1.0

2018-05-24 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4505.
--
Resolution: Auto Closed

Closing inactive issue. Please reopen if you think the issue still exists.

> Cannot get topic lag since kafka upgrade from 0.8.1.0 to 0.10.1.0
> -
>
> Key: KAFKA-4505
> URL: https://issues.apache.org/jira/browse/KAFKA-4505
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, metrics, offset manager
>Affects Versions: 0.10.1.0
>Reporter: Romaric Parmentier
>Priority: Critical
>
> We were using Kafka 0.8.1.1 and we just migrated to version 0.10.1.0.
> Since we migrated, we have been using the new script kafka-consumer-groups.sh 
> to retrieve topic lags, but it doesn't seem to work anymore.
> Because the application is using the 0.8 driver, we have added the following 
> configuration to each Kafka server:
> log.message.format.version=0.8.2
> inter.broker.protocol.version=0.10.0.0
> When I use the --list option with kafka-consumer-groups.sh I can see every 
> consumer group I'm using, but --describe is not working:
> /usr/share/kafka$ bin/kafka-consumer-groups.sh --zookeeper ip:2181 --describe 
> --group group_name
> No topic available for consumer group provided
> GROUP  TOPIC  PARTITION  
> CURRENT-OFFSET  LOG-END-OFFSET  LAG OWNER
> When I look into ZooKeeper I can see the offset increasing for this consumer 
> group.
> Any idea?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6616) kafka-merge-pr.py should use GitHub's REST API to merge

2018-05-24 Thread Viktor Popov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488814#comment-16488814
 ] 

Viktor Popov commented on KAFKA-6616:
-

[~ijuma], this one looks really interesting. May I start working on it?

Regards,

Victor

 

> kafka-merge-pr.py should use GitHub's REST API to merge
> ---
>
> Key: KAFKA-6616
> URL: https://issues.apache.org/jira/browse/KAFKA-6616
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>  Labels: newbie
>
> The merge script currently squashes the commits in the pull request locally 
> and then merges it to the target branch. It can also cherry-pick it to other 
> branches. The downside is that GitHub doesn't know that the pull request has 
> been merged. As a workaround, the script includes a keyword in the commit 
> message to close the pull request. Since the merged commit is different to 
> the pull request branch, GitHub assumes that the PR was not merged.
> [~hachikuji] suggested that an API may be available that mimics what the 
> GitHub merge button does. And he is correct. Given our recent transition to 
> GitBox, committers have write access to GitHub, so it's feasible to update 
> the merge script to do this. Rough steps:
>  # Replace local squashing and merging with GitHub REST API for merging 
> ([https://developer.github.com/v3/pulls/#merge-a-pull-request-merge-button)]
>  # After the merge, pull changes from target branch and offer the option to 
> cherry-pick to other branches (i.e. the code may have to be updated a little 
> for the rest of the workflow to work).
>  # Remove the code that adds "Closes #1234..." (e.g. "Closes #3917 from 
> ewencp/stage-docs") to the squashed commit message.
>  # Update wiki documentation and code to state that GITHUB_OAUTH_KEY must be 
> set (it's currently optional since we don't rely on any operations that 
> require authentication).
>  # Update wiki documentation to remove the main downside for using the merge 
> script and perhaps to recommend it.
> Wiki documentation: 
> [https://cwiki.apache.org/confluence/display/KAFKA/Merging+Github+Pull+Requests]
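
For step 1, a hedged sketch of the documented endpoint (PUT /repos/{owner}/{repo}/pulls/{number}/merge, per the GitHub REST API reference linked above; shown in Java for illustration even though the script itself is Python, and the PR number is illustrative):
{code:java}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class MergeButton {
    public static void main(String[] args) throws Exception {
        // Same operation as GitHub's merge button, authenticated with the
        // GITHUB_OAUTH_KEY that the wiki documentation mentions.
        URL url = new URL("https://api.github.com/repos/apache/kafka/pulls/1234/merge");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "token " + System.getenv("GITHUB_OAUTH_KEY"));
        conn.setDoOutput(true);
        byte[] body = "{\"merge_method\":\"squash\"}".getBytes(StandardCharsets.UTF_8);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body);
        }
        System.out.println("HTTP " + conn.getResponseCode()); // 200 on success
    }
}{code}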



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-2036) Consumer and broker have different networks

2018-05-24 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2036.
--
Resolution: Duplicate

Resolving this as a duplicate of KIP-235/KIP-302.

> Consumer and broker have different networks
> ---
>
> Key: KAFKA-2036
> URL: https://issues.apache.org/jira/browse/KAFKA-2036
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.8.2.1
> Environment: oracle java {7,8}, ipv6 only consumer, ipv4 + ipv6 broker
>Reporter: Arsenii Krasikov
>Assignee: Jun Rao
>Priority: Major
> Attachments: patch, patch2
>
>
> If the broker is connected to several networks (for example ipv6 and ipv4) and 
> not all of them are reachable to the consumer, then 
> {{kafka.network.BlockingChannel}} gives up connecting after the first 
> "Network is unreachable" error, not trying the remaining networks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6941) when passing port = 0 to worker, the advertisedPort still is 0 rather than a random port

2018-05-24 Thread Chia-Ping Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488767#comment-16488767
 ] 

Chia-Ping Tsai commented on KAFKA-6941:
---

BTW, the worker id is based on the advertisedHost and advertisedPort. However, 
we have to build the worker id before doing the port binding, so the worker id 
is always host:0 if the port is set to zero. It may cause trouble when invoking 
many workers in a single JVM. I guess the problem can be resolved by making the 
worker id configurable (KAFKA-6931).

> when passing port = 0 to worker, the advertisedPort still is 0 rather than a 
> random port
> 
>
> Key: KAFKA-6941
> URL: https://issues.apache.org/jira/browse/KAFKA-6941
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {code:java}
> public URI advertisedUrl() {
> UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
> Integer advertisedPort = 
> config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
> if (advertisedPort != null)
> builder.port(advertisedPort);
> else if (serverConnector != null)
> builder.port(serverConnector.getPort()); // should call 
> getLocalPort() instead
> log.info("Advertised URI: {}", builder.build());
> return builder.build();
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3793) Kafka Python Consumer library messages gets truncated

2018-05-24 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3793.
--
Resolution: Cannot Reproduce

Closing inactive issue. Please reopen if you think the issue still exists.
 

> Kafka Python Consumer library messages gets truncated
> -
>
> Key: KAFKA-3793
> URL: https://issues.apache.org/jira/browse/KAFKA-3793
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rahul
>Priority: Major
>
> Snippet code is below:
> import json
> from kafka import KafkaConsumer
> consumer = KafkaConsumer('eventdetails_ingestion',
>                          group_id='1',
>                          bootstrap_servers=':9092',
>                          max_partition_fetch_bytes=1055)
> fileErr = open('truncated.log', 'a')  # assumed sink for unparseable payloads
> for msg in consumer:
>     try:
>         jValue = json.loads(str(msg.value))
>     except ValueError:
>         fileErr.write(str(msg.value) + "\n")
> Steps:
> We send/produce large sets of messages to Kafka, each around 20 to 30 KB in 
> JSON format, producing around 200 messages/sec for a 1-hour duration. We have 
> 3 Kafka brokers running, and I am trying to consume the messages from these 3 
> brokers from the same topic using the above code. The problem is that 
> sometimes some of the messages get truncated; I am not sure why this happens.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6941) when passing port = 0 to worker, the advertisedPort still is 0 rather than a random port

2018-05-24 Thread Chia-Ping Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488758#comment-16488758
 ] 

Chia-Ping Tsai commented on KAFKA-6941:
---

I noticed this issue when building the mini connector cluster. After assigning 
port zero to the worker, I tried to get the random port through 
RestServer#advertisedUrl. However, it always fails to return the actual 
randomly assigned port.
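
A minimal sketch of the fix hinted at by the in-line comment in the excerpt 
below, assuming Jetty's ServerConnector: getLocalPort() reports the port 
actually bound (the random one when 0 was requested), while getPort() returns 
the configured value, i.e. 0.

{code:java}
// Sketch only: the same method as in the excerpt, with the one-line change
// the comment suggests, so a worker started with port 0 advertises the real
// randomly assigned port.
public URI advertisedUrl() {
    UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
    Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
    if (advertisedPort != null)
        builder.port(advertisedPort);
    else if (serverConnector != null)
        builder.port(serverConnector.getLocalPort()); // was getPort(), which returns 0
    log.info("Advertised URI: {}", builder.build());
    return builder.build();
}
{code}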

> when passing port = 0 to worker, the advertisedPort still is 0 rather than a 
> random port
> 
>
> Key: KAFKA-6941
> URL: https://issues.apache.org/jira/browse/KAFKA-6941
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {code:java}
> public URI advertisedUrl() {
> UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
> Integer advertisedPort = 
> config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
> if (advertisedPort != null)
> builder.port(advertisedPort);
> else if (serverConnector != null)
> builder.port(serverConnector.getPort()); // should call 
> getLocalPort() instead
> log.info("Advertised URI: {}", builder.build());
> return builder.build();
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6941) when passing port = 0 to worker, the advertisedPort still is 0 rather than a random port

2018-05-24 Thread Chia-Ping Tsai (JIRA)
Chia-Ping Tsai created KAFKA-6941:
-

 Summary: when passing port = 0 to worker, the advertisedPort still 
is 0 rather than a random port
 Key: KAFKA-6941
 URL: https://issues.apache.org/jira/browse/KAFKA-6941
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


{code:java}
public URI advertisedUrl() {
UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());

Integer advertisedPort = 
config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
if (advertisedPort != null)
builder.port(advertisedPort);
else if (serverConnector != null)
builder.port(serverConnector.getPort()); // should call getLocalPort() 
instead

log.info("Advertised URI: {}", builder.build());

return builder.build();
}{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5519) Support for multiple certificates in a single keystore

2018-05-24 Thread sebastien diaz (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488721#comment-16488721
 ] 

sebastien diaz commented on KAFKA-5519:
---

I'm working on a future production deployment with a centralized monitoring 
tool.

We mix different encryption technologies for JMX/RMI/... on a WebLogic server; 
the usage of a single keystore on the same server is not optional and is by 
server design.

Please add a setCertAlias config for clients/producers/consumers.

 

 

> Support for multiple certificates in a single keystore
> --
>
> Key: KAFKA-5519
> URL: https://issues.apache.org/jira/browse/KAFKA-5519
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Affects Versions: 0.10.2.1
>Reporter: Alla Tumarkin
>Priority: Major
>  Labels: upstream-issue
>
> Background
> Currently, we need to have a keystore exclusive to the component with exactly 
> one key in it. Looking at the JSSE Reference guide, it seems like we would 
> need to introduce our own KeyManager into the SSLContext which selects a 
> configurable key alias name.
> https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509KeyManager.html 
> has methods for dealing with aliases.
> The goal here is to use a specific certificate (with proper ACLs set for this 
> client), and not just the first one that matches.
> Looks like it requires a code change to the SSLChannelBuilder
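
A minimal sketch of the approach described above: a delegating 
X509ExtendedKeyManager that pins the configured alias. The class and field 
names here are illustrative, not Kafka's, and a real change would also need to 
wire this into the SSLContext setup.

{code:java}
import java.net.Socket;
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedKeyManager;

public class AliasSelectingKeyManager extends X509ExtendedKeyManager {
    private final X509ExtendedKeyManager delegate;
    private final String alias;

    public AliasSelectingKeyManager(X509ExtendedKeyManager delegate, String alias) {
        this.delegate = delegate;
        this.alias = alias;
    }

    @Override
    public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) {
        return alias; // always present the configured certificate
    }

    @Override
    public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
        return alias;
    }

    @Override
    public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) {
        return alias; // SSLEngine path, used by non-blocking transports
    }

    @Override
    public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) {
        return alias;
    }

    // Everything else is answered by the key manager built from the keystore.
    @Override
    public String[] getClientAliases(String keyType, Principal[] issuers) {
        return delegate.getClientAliases(keyType, issuers);
    }

    @Override
    public String[] getServerAliases(String keyType, Principal[] issuers) {
        return delegate.getServerAliases(keyType, issuers);
    }

    @Override
    public X509Certificate[] getCertificateChain(String alias) {
        return delegate.getCertificateChain(alias);
    }

    @Override
    public PrivateKey getPrivateKey(String alias) {
        return delegate.getPrivateKey(alias);
    }
}
{code}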



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6762) log-cleaner thread terminates due to java.lang.IllegalStateException

2018-05-24 Thread Uwe Eisele (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488588#comment-16488588
 ] 

Uwe Eisele edited comment on KAFKA-6762 at 5/24/18 8:26 AM:


After a restart of the broker, the log-cleaner thread runs into the same 
problem again. While debugging the log cleaner, we saw that the affected log 
segment contains only deletable control batches in at least the first 112 
bytes. In this case the log cleaner states that zero messages have been read 
and finally throws the IllegalStateException. It seems that the log cleaner 
cannot handle larger chunks of a segment that contain only deletable records.

{code:java}
092 class LogCleaner ... {
...
558 private[log] def cleanInto(...) {
595   while (position < sourceRecords.sizeInBytes) {
...
601  sourceRecords.readInto(readBuffer, position)
602  val records = MemoryRecords.readableRecords(readBuffer)
...
604  val result = records.filterTo(topicPartition, logCleanerFilter, 
writeBuffer, maxLogMessageSize, decompressionBufferSupplier) //in this case an 
empty result with zero read messages is returned, though the read buffer 
contains records (as mentioned above all of them are deletable control batches)
...
626  if (readBuffer.limit() > 0 && result.messagesRead == 0)
627growBuffers(maxLogMessageSize) // throws the IllegalStateException 
if the readBuffer has reached the max messages size 
628   }
...
{code}

The reason that no records are returned in the filter method is the following 
code block in the MemoryRecords class:
{code:java}
141 private static FilterResult filterTo(...) {
...
154   for (MutableRecordBatch batch : batches) {
155 bytesRead += batch.sizeInBytes();
156
157 BatchRetention batchRetention = filter.checkBatchRetention(batch);
158 if (batchRetention == BatchRetention.DELETE)
159   continue;
{code}

The segment which caused this problem can be found here 
[^__consumer_offsets-9_.tar.xz] .

We run Confluent Kafka 4.1.0 with Kafka commitId : 93e03414f72c2485


was (Author: ueisele):
After a restart of the broker, the log-cleaner thread runs into the same 
problem again. While debugging the log cleaner, we saw that the affected log 
segment contains only deletable control batches in at least the first 112 
bytes. In this case the log cleaner states that zero messages have been read 
and finally throws the IllegalStateException. It seems that the log cleaner 
cannot handle larger chunks of a segment that contain only deletable records.

{code:java}
092 class LogCleaner ... {
...
558 private[log] def cleanInto(...) {
595   while (position < sourceRecords.sizeInBytes) {
...
601  sourceRecords.readInto(readBuffer, position)
602  val records = MemoryRecords.readableRecords(readBuffer)
...
604  val result = records.filterTo(topicPartition, logCleanerFilter, 
writeBuffer, maxLogMessageSize, decompressionBufferSupplier) //in this case an 
empty result with zero read messages is returned, though the read buffer 
contains records (as mentioned above all of them are deletable control batches)
...
626  if (readBuffer.limit() > 0 && result.messagesRead == 0)
627growBuffers(maxLogMessageSize) // throws the IllegalStateException 
if the readBuffer has reached the max messages size 
628   }
...
{code}

The reason that no records are returned in the filter method is the following 
code block in the MemoryRecords class:
{code:java}
141 private static FilterResult filterTo(...) {
...
154   for (MutableRecordBatch batch : batches) {
155 bytesRead += batch.sizeInBytes();
156
157 BatchRetention batchRetention = filter.checkBatchRetention(batch);
158 if (batchRetention == BatchRetention.DELETE)
159   continue;
{code}

The segment which caused this problem can be found here 
[^__consumer_offsets-9_.tar.xz] .

We run Confluent Kafka 4.1.0 with Kafka commitId : 93e03414f72c2485
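
To make the counting flaw concrete, here is a self-contained toy model of the 
loop above (plain Java, not the Kafka classes; batch sizes are arbitrary):

{code:java}
import java.util.Arrays;
import java.util.List;

public class FilterCountingSketch {
    enum BatchRetention { DELETE, RETAIN }

    static class Batch {
        final int sizeInBytes;
        final BatchRetention retention;

        Batch(int sizeInBytes, BatchRetention retention) {
            this.sizeInBytes = sizeInBytes;
            this.retention = retention;
        }
    }

    public static void main(String[] args) {
        // A read buffer that holds only deletable control batches, as in the
        // attached __consumer_offsets-9 segment.
        List<Batch> batches = Arrays.asList(
                new Batch(56, BatchRetention.DELETE),
                new Batch(56, BatchRetention.DELETE));

        int bytesRead = 0;
        int messagesRead = 0;
        for (Batch batch : batches) {
            bytesRead += batch.sizeInBytes;
            if (batch.retention == BatchRetention.DELETE)
                continue; // skipped entirely, so messagesRead never increments
            messagesRead++;
        }

        // bytesRead > 0 while messagesRead == 0: cleanInto misreads this as
        // "one message larger than the read buffer" and grows its buffers
        // until the IllegalStateException above is thrown.
        System.out.println("bytesRead=" + bytesRead + " messagesRead=" + messagesRead);
    }
}
{code}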

> log-cleaner thread terminates due to java.lang.IllegalStateException
> 
>
> Key: KAFKA-6762
> URL: https://issues.apache.org/jira/browse/KAFKA-6762
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
> Environment: os: GNU/Linux 
> arch: x86_64 
> Kernel: 4.9.77 
> jvm: OpenJDK 1.8.0
>Reporter: Ricardo Bartolome
>Priority: Major
> Attachments: __consumer_offsets-9_.tar.xz
>
>
> We are experiencing some problems with kafka log-cleaner thread on Kafka 
> 1.0.0. We have planned to update this cluster to 1.1.0 by next week in order 
> to fix KAFKA-6683, but until then we can only confirm that it happens in 
> 1.0.0.
> log-cleaner thread crashes after a while with the following error:
> {code:java}
> [2018-03-28 

[jira] [Comment Edited] (KAFKA-6762) log-cleaner thread terminates due to java.lang.IllegalStateException

2018-05-24 Thread Uwe Eisele (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488588#comment-16488588
 ] 

Uwe Eisele edited comment on KAFKA-6762 at 5/24/18 8:25 AM:


After a restart of the broker, the log-cleaner thread runs into the same 
problem again. While debugging the log cleaner, we saw that the affected log 
segment contains only deletable control batches in at least the first 112 
bytes. In this case the log cleaner states that zero messages have been read 
and finally throws the IllegalStateException. It seems that the log cleaner 
cannot handle larger chunks of a segment that contain only deletable records.

{code:java}
092 class LogCleaner ... {
...
558 private[log] def cleanInto(...) {
595   while (position < sourceRecords.sizeInBytes) {
...
601  sourceRecords.readInto(readBuffer, position)
602  val records = MemoryRecords.readableRecords(readBuffer)
...
604  val result = records.filterTo(topicPartition, logCleanerFilter, 
writeBuffer, maxLogMessageSize, decompressionBufferSupplier) //in this case an 
empty result with zero read messages is returned, though the read buffer 
contains records (as mentioned above all of them are deletable control batches)
...
626  if (readBuffer.limit() > 0 && result.messagesRead == 0)
627growBuffers(maxLogMessageSize) // throws the IllegalStateException 
if the readBuffer has reached the max messages size 
628   }
...
{code}

The reason that no records are returned in the filter method is the following 
code block in the MemoryRecords class:
{code:java}
141 private static FilterResult filterTo(...) {
...
154   for (MutableRecordBatch batch : batches) {
155 bytesRead += batch.sizeInBytes();
156
157 BatchRetention batchRetention = filter.checkBatchRetention(batch);
158 if (batchRetention == BatchRetention.DELETE)
159   continue;
{code}

The segment which caused this problem can be found here 
[^__consumer_offsets-9_.tar.xz] .

We run Confluent Kafka 4.1.0 with Kafka commitId : 93e03414f72c2485


was (Author: ueisele):
After a restart of the broker, the log-cleaner thread runs into the same 
problem again. While debugging the log cleaner, we saw that the affected log 
segment contains only deletable control batches in at least the first 112 
bytes. In this case the log cleaner states that zero messages have been read 
and finally throws the IllegalStateException. It seems that the log cleaner 
cannot handle larger chunks of a segment that contain only deletable records.


{code:java}
092 class LogCleaner ... {
...
558 private[log] def cleanInto(...) {
595   while (position < sourceRecords.sizeInBytes) {
...
601  sourceRecords.readInto(readBuffer, position)
602  val records = MemoryRecords.readableRecords(readBuffer)
...
604  val result = records.filterTo(topicPartition, logCleanerFilter, 
writeBuffer, maxLogMessageSize, decompressionBufferSupplier) //in this case an 
empty result with zero read messages is returned, though the read buffer 
contains records (as mentioned above all of them are deletable control batches)
...
626  if (readBuffer.limit() > 0 && result.messagesRead == 0)
627growBuffers(maxLogMessageSize) // throws the IllegalStateException 
if the readBuffer has reached the max messages size 
628   }
...
{code}

The reason that no records are returned in the filter method is the following 
code block in the MemoryRecords class:
{code:java}
141 private static FilterResult filterTo(...) {
...
154   for (MutableRecordBatch batch : batches) {
155 bytesRead += batch.sizeInBytes();
156
157 BatchRetention batchRetention = filter.checkBatchRetention(batch);
158 if (batchRetention == BatchRetention.DELETE)
159   continue;
{code}

The segment which caused this problem can be found here 
[^__consumer_offsets-9_.tar.xz] .

> log-cleaner thread terminates due to java.lang.IllegalStateException
> 
>
> Key: KAFKA-6762
> URL: https://issues.apache.org/jira/browse/KAFKA-6762
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
> Environment: os: GNU/Linux 
> arch: x86_64 
> Kernel: 4.9.77 
> jvm: OpenJDK 1.8.0
>Reporter: Ricardo Bartolome
>Priority: Major
> Attachments: __consumer_offsets-9_.tar.xz
>
>
> We are experiencing some problems with kafka log-cleaner thread on Kafka 
> 1.0.0. We have planned to update this cluster to 1.1.0 by next week in order 
> to fix KAFKA-6683, but until then we can only confirm that it happens in 
> 1.0.0.
> log-cleaner thread crashes after a while with the following error:
> {code:java}
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_of

[jira] [Commented] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2018-05-24 Thread pradeep kumbhar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488628#comment-16488628
 ] 

pradeep kumbhar commented on KAFKA-3410:


Does the fix for KAFKA-1211 solve this issue?

> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>Priority: Major
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shut down 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?
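
For reference, the broker setting the scenario turns on and off, as a 
server.properties sketch (values as described in the steps above):

{code}
# Steps 1-2: out-of-sync replicas may not become leader, so the partition goes
# offline in step 9 and broker2 halts in step 10 rather than truncate its log.
# Setting this to true allowed recovery, at the cost of the data loss
# described above.
unclean.leader.election.enable=false
{code}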



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6762) log-cleaner thread terminates due to java.lang.IllegalStateException

2018-05-24 Thread Uwe Eisele (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488588#comment-16488588
 ] 

Uwe Eisele commented on KAFKA-6762:
---

After a restart of the broker, the log-cleaner thread runs into the same 
problem again. While debugging the log cleaner, we saw that the affected log 
segment contains only deletable control batches in at least the first 112 
bytes. In this case the log cleaner states that zero messages have been read 
and finally throws the IllegalStateException. It seems that the log cleaner 
cannot handle larger chunks of a segment that contain only deletable records.


{code:java}
092 class LogCleaner ... {
...
558 private[log] def cleanInto(...) {
595   while (position < sourceRecords.sizeInBytes) {
...
601  sourceRecords.readInto(readBuffer, position)
602  val records = MemoryRecords.readableRecords(readBuffer)
...
604  val result = records.filterTo(topicPartition, logCleanerFilter, 
writeBuffer, maxLogMessageSize, decompressionBufferSupplier) //in this case an 
empty result with zero read messages is returned, though the read buffer 
contains records (as mentioned above all of them are deletable control batches)
...
626  if (readBuffer.limit() > 0 && result.messagesRead == 0)
627growBuffers(maxLogMessageSize) // throws the IllegalStateException 
if the readBuffer has reached the max messages size 
628   }
...
{code}

The reason that no records are returned in the filter method is the following 
code block in the MemoryRecords class:
{code:java}
141 private static FilterResult filterTo(...) {
...
154   for (MutableRecordBatch batch : batches) {
155 bytesRead += batch.sizeInBytes();
156
157 BatchRetention batchRetention = filter.checkBatchRetention(batch);
158 if (batchRetention == BatchRetention.DELETE)
159   continue;
{code}

The segment which caused this problem can be found here 
[^__consumer_offsets-9_.tar.xz] .

> log-cleaner thread terminates due to java.lang.IllegalStateException
> 
>
> Key: KAFKA-6762
> URL: https://issues.apache.org/jira/browse/KAFKA-6762
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
> Environment: os: GNU/Linux 
> arch: x86_64 
> Kernel: 4.9.77 
> jvm: OpenJDK 1.8.0
>Reporter: Ricardo Bartolome
>Priority: Major
> Attachments: __consumer_offsets-9_.tar.xz
>
>
> We are experiencing some problems with kafka log-cleaner thread on Kafka 
> 1.0.0. We have planned to update this cluster to 1.1.0 by next week in order 
> to fix KAFKA-6683, but until then we can only confirm that it happens in 
> 1.0.0.
> log-cleaner thread crashes after a while with the following error:
> {code:java}
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-31. (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for 
> __consumer_offsets-31... (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). 
> (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log 
> __consumer_offsets-31 complete. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 
> (cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior 
> to Sat Feb 24 11:04:21 GMT 2018
> )... (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 
> 0, discarding deletes. (kafka.log.LogClea
> ner)
> [2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 262144bytes to 524288 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 524288bytes to 112 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.lang.IllegalStateException: This log contains a message larger than 
> maximum allowable size of 112.
> at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
> at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
> at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
> at scala.collection.immutable.List.foreach(List.scala:389)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
> at kafka.log.Cleaner.clean(LogCleaner.scala:372)
> at 
> kafka.log.LogCleaner$CleanerThrea

[jira] [Updated] (KAFKA-6762) log-cleaner thread terminates due to java.lang.IllegalStateException

2018-05-24 Thread Uwe Eisele (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uwe Eisele updated KAFKA-6762:
--
Attachment: __consumer_offsets-9_.tar.xz

> log-cleaner thread terminates due to java.lang.IllegalStateException
> 
>
> Key: KAFKA-6762
> URL: https://issues.apache.org/jira/browse/KAFKA-6762
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
> Environment: os: GNU/Linux 
> arch: x86_64 
> Kernel: 4.9.77 
> jvm: OpenJDK 1.8.0
>Reporter: Ricardo Bartolome
>Priority: Major
> Attachments: __consumer_offsets-9_.tar.xz
>
>
> We are experiencing some problems with kafka log-cleaner thread on Kafka 
> 1.0.0. We have planned to update this cluster to 1.1.0 by next week in order 
> to fix KAFKA-6683, but until then we can only confirm that it happens in 
> 1.0.0.
> log-cleaner thread crashes after a while with the following error:
> {code:java}
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-31. (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for 
> __consumer_offsets-31... (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). 
> (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log 
> __consumer_offsets-31 complete. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 
> (cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior 
> to Sat Feb 24 11:04:21 GMT 2018
> )... (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 
> 0, discarding deletes. (kafka.log.LogClea
> ner)
> [2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 262144bytes to 524288 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 524288bytes to 112 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.lang.IllegalStateException: This log contains a message larger than 
> maximum allowable size of 112.
> at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
> at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
> at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
> at scala.collection.immutable.List.foreach(List.scala:389)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
> at kafka.log.Cleaner.clean(LogCleaner.scala:372)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO The cleaning for partition 
> __broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO Compaction for partition 
> __broker-11-health-check-0 is resumed (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,774] INFO The cleaning for partition 
> __broker-11-health-check-0 is aborted (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown 
> completed (kafka.log.LogCleaner)
> {code}
> What we know so far is:
>  * We are unable to reproduce it yet in a consistent manner.
>  * It only happens in the PRO cluster and not in the PRE cluster for the same 
> customer (whose message payloads are very similar).
>  * Checking our Kafka logs, it only happened on the internal topics 
> *__consumer_offsets-**.
>  * When we restart the broker process, the log cleaner starts working again, 
> but it can take between 3 minutes and some hours to die again.
>  * We work around it by temporarily increasing the message.max.bytes and 
> replica.fetch.max.bytes values to 10485760 (10MB) from the default 112 (~1MB).
> ** Before message.max.bytes = 10MB, we tried to match message.max.size with 
> the value of replica.fetch.max.size (1048576), but the log cleaner died with 
> the same error and a different limit.
>  

[jira] [Commented] (KAFKA-6936) Scala API Wrapper for Streams uses default serializer for table aggregate

2018-05-24 Thread Daniel Heinrich (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488550#comment-16488550
 ] 

Daniel Heinrich commented on KAFKA-6936:


Ah, I didn't see that PR. Yes, this will fix the issue.

> Scala API Wrapper for Streams uses default serializer for table aggregate
> -
>
> Key: KAFKA-6936
> URL: https://issues.apache.org/jira/browse/KAFKA-6936
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Daniel Heinrich
>Priority: Major
>
> One of the goals of the Scala API is to not fall back on the configured 
> default serializer, but to let the compiler provide serdes through implicits.
> The aggregate method on KGroupedStream fails to achieve this goal.
> Compared to the Java API this behavior is very surprising, because no other 
> stream operation falls back to the default serializer, and a developer 
> assumes that the compiler checks for the correct serializer type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)