[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2018-09-04 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602839#comment-16602839
 ] 

Rajini Sivaram commented on KAFKA-7304:
---

[~yuyang08] Thank you. Would it be possible to share the heap dump from an OOM 
(either the original one reported or a test scenario that resulted in OOM)? I 
am hoping it would compress well, but I am not sure whether it will be possible 
to upload it to the JIRA. The difference in memory usage between JDK 10.0.2 and 
1.8 may be GC timing, since the peak usage is much lower than the allocated 
heap. But it would be good if you could generate a heap dump with live objects 
and check whether the number of KafkaChannels referenced from the Selector is 
much higher than expected.
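
In case it helps, here is a minimal sketch (host, JMX port and output path are 
placeholders) of one way to trigger a live-objects-only heap dump over the 
broker's existing JMX remote port; running 
jmap -dump:live,format=b,file=live.hprof <pid> on the broker host is the simpler 
alternative:

{code:java}
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import com.sun.management.HotSpotDiagnosticMXBean;

public class RemoteLiveHeapDump {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port; the broker command line in the description already
        // enables com.sun.management.jmxremote, so its configured port can be used.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url, null)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            HotSpotDiagnosticMXBean diagnostic = JMX.newMXBeanProxy(
                connection,
                new ObjectName("com.sun.management:type=HotSpotDiagnostic"),
                HotSpotDiagnosticMXBean.class);
            // live=true forces a full GC first, so the dump contains only reachable
            // objects; the file is written on the broker host.
            diagnostic.dumpHeap("/tmp/kafka-live.hprof", true);
        }
    }
}
{code}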

[~ijuma] You mentioned in a comment: "The thing we need to figure out is why we 
have a connection in both `channels` and `closingChannels`. It should be in one 
or the other and we do the right thing if that is the case." Where did you see 
the same channel in both?


> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at 
> 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot 
> 2018-08-29 at 10.50.47 AM.png
>
>
> We are testing secured writing to Kafka through SSL. At small scale, SSL 
> writing to Kafka was fine. However, when we enabled SSL writing at a larger 
> scale (>40k clients writing concurrently), the Kafka brokers soon hit an 
> OutOfMemory error with a 4G heap setting. We tried increasing the heap size 
> to 10GB, but encountered the same issue. 
> We took a few heap dumps and found that most of the heap memory is referenced 
> through org.apache.kafka.common.network.Selector objects. There are two 
> channel map fields in Selector. It seems that somehow the objects are not 
> removed from these maps in a timely manner. 
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If there is a broker restart or similar event in the cluster 
> that causes a partition leadership change, the brokers may hit the OOM issue 
> faster. 
> {code}
> private final Map<String, KafkaChannel> channels;
> private final Map<String, KafkaChannel> closingChannels;
> {code}
> Please see the attached images and the following link for a sample GC 
> analysis: 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> The command line for running Kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use Java 1.8.0_102 and have applied a TLS patch that reduces the 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7374) Tiered Storage

2018-09-04 Thread JIRA
Maciej Bryński created KAFKA-7374:
-

 Summary: Tiered Storage
 Key: KAFKA-7374
 URL: https://issues.apache.org/jira/browse/KAFKA-7374
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.0.0
Reporter: Maciej Bryński


Both Pravega and Pulsar give the possibility to use tiered storage, so old 
messages can be stored on a different FS such as S3 or HDFS.

Kafka should offer a similar possibility.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7258) Different offset numbers in Windows and Linux

2018-09-04 Thread Ansel Zandegran (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ansel Zandegran updated KAFKA-7258:
---
Affects Version/s: 2.0.0

> Different offset numbers in Windows and Linux
> -
>
> Key: KAFKA-7258
> URL: https://issues.apache.org/jira/browse/KAFKA-7258
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1, 2.0.0
> Environment: windows10, Cent OS, Java8
>Reporter: Ansel Zandegran
>Priority: Major
>
> We are building a metrics reporter, and one of the tests finds a different 
> offset (6) on Linux. The following test passes on Windows:
> {code:java}
> createTopic(TOPIC1, 1, 1); // 1 partition 1 replication
> runConsumer(TOPIC1); 
> runProducer(TOPIC1, "key", "value", 5); // 5 messages
> zkMetricsReporter.sendConsumerLagMetrics(Clock.defaultClock().time() / 1000);
> PartionOffsets partionOffsets = getPartionOffset(); // For topic1-0
> assertEquals(5, partionOffsets.getCurrentOffset());
> assertEquals(5, partionOffsets.getEndOffset());
> {code}
> and this is how we get the offsets:
> {code:java}
> Map logEndOffset = getLogEndOffset(topic, host);
> KafkaConsumer consumer = createNewConsumer(groupId, host);
> BinaryOperator mergeFunction = (a, b) -> {
>     throw new IllegalStateException();
> };
> Map result = logEndOffset.entrySet()
>     .stream()
>     .collect(Collectors.toMap(entry -> (entry.getKey()),
>         entry -> {
>             OffsetAndMetadata committed = consumer.committed(entry.getKey());
>             return new PartionOffsets(entry.getValue(),
>                 committed.offset(),
>                 entry.getKey().partition(),
>                 topic);
>         },
>         mergeFunction));
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7375) Improve error messages verbosity

2018-09-04 Thread Seweryn Habdank-Wojewodzki (JIRA)
Seweryn Habdank-Wojewodzki created KAFKA-7375:
-

 Summary: Improve error messages verbosity
 Key: KAFKA-7375
 URL: https://issues.apache.org/jira/browse/KAFKA-7375
 Project: Kafka
  Issue Type: Task
Affects Versions: 1.1.1
Reporter: Seweryn Habdank-Wojewodzki


Dears,

Very often, when clients are trying to connect, we see the following in the Kafka logs:

{code}
“org.apache.kafka.common.network.SslTransportLayer  - Failed to send SSL Close 
message“
{code}

The problem is that there is no indication of who: no IP, no client address, 
nothing.

It would be great if every error or warning report like this one included 
precise information about which client has the problem, so that it can be 
solved. When the number of clients is more than 10, this message is completely 
useless, and with even more clients it really spams the logs.

Thanks in advance for help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7376) After Kafka upgrade to v2.0.0 , Controller unable to communicate with brokers on SASL_SSL

2018-09-04 Thread Sridhar (JIRA)
Sridhar created KAFKA-7376:
--

 Summary: After Kafka upgrade to v2.0.0 , Controller unable to 
communicate with brokers on SASL_SSL
 Key: KAFKA-7376
 URL: https://issues.apache.org/jira/browse/KAFKA-7376
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 2.0.0
Reporter: Sridhar


Hi ,

We upgraded our Kafka cluster (3 nodes running on AWS) to version 2.0.0 and 
enabled security with SASL_SSL (PLAIN) for inter-broker and client 
connections. 

But there are a lot of errors in the controller log for the inter-broker 
communication. I followed exactly the same steps as mentioned in the 
documentation and set all Kafka brokers' FQDN hostnames in the SAN 
(SubjectAlternativeName) of my self-signed server certificate.

http://kafka.apache.org/documentation.html#security

 

openssl s_client -connect kafka-reghub-d2-3:9093
CONNECTED(0003)
depth=1

I noticed someone else facing a similar problem:

[https://github.com/confluentinc/common/issues/158]

 

 
{noformat}
Server Configuration : 
listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
advertised.listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
#Security
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin
ssl.client.auth=required
ssl.endpoint.identification.algorithm=
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=
ssl.key.password=

#Zookeeper
zookeeper.connect=zk-reghub-d2-1:2181,zk-reghub-d2-2:2181,zk-reghub-d2-3:2181
zookeeper.connection.timeout.ms=6000
{noformat}
 

 
{code:java}
 
[2018-09-04 12:02:57,289] WARN [RequestSendThread controllerId=2] Controller 
2's connection to broker kafka-3:9093 (id: 3 rack: eu-central-1c) was 
unsuccessful (kafka.controller.RequestSendThread)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1529)
at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:134)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:330)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:322)
at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1614)
at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1052)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:992)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:989)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1467)
at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:473)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:331)
... 9 more
Caused by: java.security.cert.CertificateException: No name matching kafka-3 
found
at sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:231)
at sun.security.util.HostnameChecker.match(HostnameChecker.java:96)
at 
sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
at 
sun.security.ssl.X509T

[jira] [Updated] (KAFKA-7376) After Kafka upgrade to v2.0.0 , Controller unable to communicate with brokers on SASL_SSL

2018-09-04 Thread Sridhar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sridhar updated KAFKA-7376:
---
Description: 
Hi ,

We upgraded our Kafka cluster (3 nodes running on AWS) to version 2.0.0 and 
enabled security with SASL_SSL (PLAIN) for inter-broker and client 
connections. 

But there are a lot of errors in the controller log for the inter-broker 
communication. I followed exactly the same steps as mentioned in the 
documentation and set all Kafka brokers' FQDN hostnames in the SAN 
(SubjectAlternativeName) of my self-signed server certificate.

http://kafka.apache.org/documentation.html#security

 

openssl s_client -connect kafka-3:9093
 CONNECTED(0003)
 depth=1

I noticed someone else facing a similar problem:

[https://github.com/confluentinc/common/issues/158]

 

 
{noformat}
Server Configuration : 
listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
advertised.listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
#Security
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin
ssl.client.auth=required
ssl.endpoint.identification.algorithm=
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=
ssl.key.password=

#Zookeeper
zookeeper.connect=zk-reghub-d2-1:2181,zk-reghub-d2-2:2181,zk-reghub-d2-3:2181
zookeeper.connection.timeout.ms=6000
{noformat}
 

 
{code:java}
 
[2018-09-04 12:02:57,289] WARN [RequestSendThread controllerId=2] Controller 
2's connection to broker kafka-3:9093 (id: 3 rack: eu-central-1c) was 
unsuccessful (kafka.controller.RequestSendThread)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1529)
at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:134)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:330)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:322)
at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1614)
at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1052)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:992)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:989)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1467)
at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:473)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:331)
... 9 more
Caused by: java.security.cert.CertificateException: No name matching kafka-3 
found
at sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:231)
at sun.security.util.HostnameChecker.match(HostnameChecker.java:96)
at 
sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
at 
sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436)
at 
sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252)
at 
sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.

[jira] [Updated] (KAFKA-7376) After Kafka upgrade to v2.0.0 , Controller unable to communicate with brokers on SASL_SSL

2018-09-04 Thread Sridhar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sridhar updated KAFKA-7376:
---
Description: 
Hi ,

We upgraded our Kafka cluster (3 nodes running on AWS) to version 2.0.0 and 
enabled security with SASL_SSL (PLAIN) for inter-broker and client 
connections. 

But there are a lot of errors in the controller log for the inter-broker 
communication. I followed exactly the same steps as mentioned in the 
documentation and set all Kafka brokers' FQDN hostnames in the SAN 
(SubjectAlternativeName) of my self-signed server certificate.

http://kafka.apache.org/documentation.html#security

 

openssl s_client -connect kafka-3:9093
 CONNECTED(0003)
 depth=1

I noticed someone else facing a similar problem:

[https://github.com/confluentinc/common/issues/158]

 

 
{noformat}
Server Configuration : 
listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
advertised.listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
#Security
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin
ssl.client.auth=required
ssl.endpoint.identification.algorithm=
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=
ssl.key.password=

#Zookeeper
zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181
zookeeper.connection.timeout.ms=6000
{noformat}
 

 
{code:java}
 
[2018-09-04 12:02:57,289] WARN [RequestSendThread controllerId=2] Controller 
2's connection to broker kafka-3:9093 (id: 3 rack: eu-central-1c) was 
unsuccessful (kafka.controller.RequestSendThread)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1529)
at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:134)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:330)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:322)
at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1614)
at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1052)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:992)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:989)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1467)
at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:473)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:331)
... 9 more
Caused by: java.security.cert.CertificateException: No name matching kafka-3 
found
at sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:231)
at sun.security.util.HostnameChecker.match(HostnameChecker.java:96)
at 
sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
at 
sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436)
at 
sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252)
at 
sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
at 
sun.security.ssl

[jira] [Resolved] (KAFKA-949) Integrate kafka into YARN

2018-09-04 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-949.
-
Resolution: Resolved

Resolving as duplicate of KAFKA-1754

> Integrate kafka into YARN
> -
>
> Key: KAFKA-949
> URL: https://issues.apache.org/jira/browse/KAFKA-949
> Project: Kafka
>  Issue Type: New Feature
>  Components: contrib
>Affects Versions: 0.8.0
> Environment: hadoop 2-0.X
>Reporter: Kam Kasravi
>Priority: Major
>
> kafka is being added to bigtop (BIGTOP-989). Having kafka services available 
> under YARN will enable a number of cluster operations for kafka that YARN 
> handles.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7376) After Kafka upgrade to v2.0.0 , Controller unable to communicate with brokers on SASL_SSL

2018-09-04 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603209#comment-16603209
 ] 

Ismael Juma commented on KAFKA-7376:


Probably related to the ssl.endpoint.identification.algorithm change; the 
following is from the release notes:

{code}
The default value for ssl.endpoint.identification.algorithm was changed to 
https, which performs hostname verification (man-in-the-middle attacks are 
possible otherwise). Set ssl.endpoint.identification.algorithm to an empty 
string to restore the previous behaviour.
{code}
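
For completeness, a minimal sketch of the client-side equivalent of that 
workaround (the server.properties in the report already contains the 
corresponding empty ssl.endpoint.identification.algorithm= line); the bootstrap 
address is a placeholder, and adding the advertised hostnames to the certificate 
SAN is the safer long-term fix:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class RestorePre20SslBehaviour {
    public static Properties clientSslOverrides() {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "kafka-3:9093"); // placeholder
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // An empty string disables hostname verification again, as described in the
        // release note above.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        return props;
    }
}
{code}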

> After Kafka upgrade to v2.0.0 , Controller unable to communicate with brokers 
> on SASL_SSL
> -
>
> Key: KAFKA-7376
> URL: https://issues.apache.org/jira/browse/KAFKA-7376
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sridhar
>Priority: Major
>
> Hi ,
> We upgraded our Kafka cluster (3 nodes running on AWS) to version 2.0.0 and 
> enabled security with SASL_SSL (PLAIN) for inter-broker and client 
> connections. 
> But there are a lot of errors in the controller log for the inter-broker 
> communication. I followed exactly the same steps as mentioned in the 
> documentation and set all Kafka brokers' FQDN hostnames in the SAN 
> (SubjectAlternativeName) of my self-signed server certificate.
> http://kafka.apache.org/documentation.html#security
>  
> openssl s_client -connect kafka-3:9093
>  CONNECTED(0003)
>  depth=1
> I noticed someone else facing a similar problem:
> [https://github.com/confluentinc/common/issues/158]
>  
>  
> {noformat}
> Server Configuration : 
> listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
> advertised.listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
> #Security
> authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
> allow.everyone.if.no.acl.found=false
> security.inter.broker.protocol=SASL_SSL
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> super.users=User:admin
> ssl.client.auth=required
> ssl.endpoint.identification.algorithm=
> ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
> ssl.truststore.password=
> ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
> ssl.keystore.password=
> ssl.key.password=
> #Zookeeper
> zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181
> zookeeper.connection.timeout.ms=6000
> {noformat}
>  
>  
> {code:java}
>  
> [2018-09-04 12:02:57,289] WARN [RequestSendThread controllerId=2] Controller 
> 2's connection to broker kafka-3:9093 (id: 3 rack: eu-central-1c) was 
> unsuccessful (kafka.controller.RequestSendThread)
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
> at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1529)
> at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
> at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
> at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
> at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
> at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:439)
> at 
> org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:304)
> at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
> at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:134)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
> at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
> at 
> kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279)
> at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
> at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
> at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
> at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:330)
> at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:322)
> at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1614)
> at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
> at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1052)
> at sun.security.ssl.Handshaker$1.run(Handshaker.java:992)
> at sun.sec

[jira] [Resolved] (KAFKA-1950) Expose methods from SerializationTestUtils to all tests

2018-09-04 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1950.
--
Resolution: Auto Closed

Closing, as the SerializationTestUtils class has been removed from the codebase.

> Expose methods from SerializationTestUtils to all tests
> ---
>
> Key: KAFKA-1950
> URL: https://issues.apache.org/jira/browse/KAFKA-1950
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Priority: Major
>
> There are some super-useful setup methods in SerializationTestUtils. They can 
> be used to make a lot of our test code simpler to write and read.
> I suggest pulling them into a more general test utils object (outside the api 
> package) and exposing more of the methods as public.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (KAFKA-7363) How num.stream.threads in streaming application influence memory consumption?

2018-09-04 Thread Seweryn Habdank-Wojewodzki (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki closed KAFKA-7363.
-

> How num.stream.threads in streaming application influence memory consumption?
> -
>
> Key: KAFKA-7363
> URL: https://issues.apache.org/jira/browse/KAFKA-7363
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> Dears,
> How does the option _num.stream.threads_ in a streaming application influence 
> memory consumption?
> I see that by increasing num.stream.threads my application needs more memory.
> This is obvious, but it is not obvious how much I need to give it. Trial and 
> error does not work, as it seems to be highly dependent on the forced 
> throughput.
> I mean: the higher the load, the more memory is needed.
> Thanks for help and regards,
> Seweryn.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7363) How num.stream.threads in streaming application influence memory consumption?

2018-09-04 Thread Seweryn Habdank-Wojewodzki (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603250#comment-16603250
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-7363:
---

Thanks for the hints.

Honestly speaking, I would expect more extensive information in the 
documentation, as it is currently completely impossible to plan server 
resources when using Kafka Streams.

I was experimenting quite a lot, and I still cannot build a clear resource 
model.

I mean that I get information from operations and customers about how many 
messages per second they will send, and either how many MB per second they 
will send or what the average message size is.

Then I need to allocate more resources - clear. But how much?

I was guessing, and then giving it more memory or CPUs. 
Cool, but this is not really engineering, it is speculation.
When the next user comes with yet another set of requirements, I will guess 
again - not very effective.

But I accept the proposal to send a mail to the mailing list, and I am closing 
the ticket.
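
For what it is worth, here is a rough sketch (not an official sizing formula; 
all numbers are placeholders) of the configuration knobs that, as far as I 
understand, dominate the per-instance heap footprint and would go into such a 
resource model:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsMemoryKnobsSketch {
    public static Properties memoryRelevantConfigs() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sizing-experiment"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");    // placeholder
        // Each stream thread runs its own consumer and task set, so heap usage grows
        // roughly with this number (matching the observation in this ticket).
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        // Upper bound for the record caches shared by all threads of this instance.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64L * 1024 * 1024);
        // Per-consumer fetch buffer; effectively multiplied by the number of threads.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                  8 * 1024 * 1024);
        return props;
    }
}
{code}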

> How num.stream.threads in streaming application influence memory consumption?
> -
>
> Key: KAFKA-7363
> URL: https://issues.apache.org/jira/browse/KAFKA-7363
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> Dears,
> How does the option _num.stream.threads_ in a streaming application influence 
> memory consumption?
> I see that by increasing num.stream.threads my application needs more memory.
> This is obvious, but it is not obvious how much I need to give it. Trial and 
> error does not work, as it seems to be highly dependent on the forced 
> throughput.
> I mean: the higher the load, the more memory is needed.
> Thanks for help and regards,
> Seweryn.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3971) Consumers drop from coordinator and cannot reconnect

2018-09-04 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3971.
--
Resolution: Auto Closed

Closing inactive issue. Please reopen if the issue still exists in newer 
versions.

> Consumers drop from coordinator and cannot reconnect
> 
>
> Key: KAFKA-3971
> URL: https://issues.apache.org/jira/browse/KAFKA-3971
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
> Environment: version 0.9.0.1
>Reporter: Lei Wang
>Priority: Major
>  Labels: reliability
> Attachments: KAFKA-3971.txt
>
>
> From time to time, we create new topics, and all consumers will pick up 
> those new topics. When starting to consume from these new topics, we often 
> see that some random consumers cannot connect to the coordinator. The log is 
> flooded with the following message tens of thousands of times every second:
> {noformat}
> 16/07/18 18:18:36.003 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.004 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> 16/07/18 18:18:36.005 INFO (AbstractCoordinator.java:529): Marking the 
> coordinator 2147483645 dead.
> {noformat}
> The servers seem to be working fine, and other consumers are also happy.
> From the log, it looks like it is retrying multiple times every millisecond 
> but always failing.
> The same process is consuming from many topics; some of them are still 
> working well, but those random topics fail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4949) Calling kafka-consumer-groups.sh to get the consumer offset throws OOM with heap space error

2018-09-04 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4949.
--
Resolution: Duplicate

Resolving as duplicate of KAFKA-4090

> Calling kafka-consumer-groups.sh to get the consumer offset throws OOM with 
> heap space error
> --
>
> Key: KAFKA-4949
> URL: https://issues.apache.org/jira/browse/KAFKA-4949
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.10.1.0
>Reporter: T Rao
>Priority: Major
>
> Command --> 
> bin/kafka-consumer-groups.sh --bootstrap-server 
> broker1:9092,broker2:9092,broker3:9092 --describe --group testgroups
> Error 
> -
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>   at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>   at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> kafka.admin.AdminClient.kafka$admin$AdminClient$$send(AdminClient.scala:49)
>   at 
> kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:61)
>   at 
> kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:58)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:58)
>   at kafka.admin.AdminClient.findCoordinator(AdminClient.scala:72)
>   at kafka.admin.AdminClient.describeGroup(AdminClient.scala:125)
>   at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:147)
>   at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:308)
>   at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
>   at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:296)
>   at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
>   at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7211) MM should handle timeouts in commitSync

2018-09-04 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7211.
-
Resolution: Fixed

> MM should handle timeouts in commitSync
> ---
>
> Key: KAFKA-7211
> URL: https://issues.apache.org/jira/browse/KAFKA-7211
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
> Fix For: 2.1.0
>
>
> Now that we have KIP-266, the user can override `default.api.timeout.ms` for 
> the consumer so that commitSync does not block indefinitely. MM needs to be 
> updated to handle TimeoutException. We may also need some logic to handle 
> deleted topics. If MM attempts to commit an offset for a deleted topic, the 
> call will time out and we should probably check if the topic exists and remove 
> the offset if it doesn't.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7211) MM should handle timeouts in commitSync

2018-09-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603591#comment-16603591
 ] 

ASF GitHub Bot commented on KAFKA-7211:
---

lindong28 closed pull request #5492: KAFKA-7211: MM should handle 
TimeoutException in commitSync
URL: https://github.com/apache/kafka/pull/5492
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala 
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d7e09e4efdb..1ddcedbd487 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -33,17 +33,18 @@ import 
org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
ByteArraySerializer}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.errors.{TimeoutException, WakeupException}
 import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
+import scala.util.{Failure, Success, Try}
 import scala.util.control.ControlThrowable
 
 /**
  * The mirror maker has the following architecture:
- * - There are N mirror maker thread shares one ZookeeperConsumerConnector and 
each owns a Kafka stream.
+ * - There are N mirror maker thread, each of which is equipped with a 
separate KafkaConsumer instance.
  * - All the mirror maker threads share one producer.
  * - Each mirror maker thread periodically flushes the producer and then 
commits all offsets.
  *
@@ -69,6 +70,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   private var offsetCommitIntervalMs = 0
   private var abortOnSendFailure: Boolean = true
   @volatile private var exitingOnSendFailure: Boolean = false
+  private var lastSuccessfulCommitTime = -1L
+  private val time = Time.SYSTEM
 
   // If a message send failed after retries are exhausted. The offset of the 
messages will also be removed from
   // the unacked offset list to avoid offset commit being stuck on that 
offset. In this case, the offset of that
@@ -267,24 +270,45 @@ object MirrorMaker extends Logging with KafkaMetricsGroup 
{
 consumers.map(consumer => new ConsumerWrapper(consumer, 
customRebalanceListener, whitelist))
   }
 
-  def commitOffsets(consumerWrapper: ConsumerWrapper) {
+  def commitOffsets(consumerWrapper: ConsumerWrapper): Unit = {
 if (!exitingOnSendFailure) {
-  trace("Committing offsets.")
-  try {
-consumerWrapper.commit()
-  } catch {
-case e: WakeupException =>
-  // we only call wakeup() once to close the consumer,
-  // so if we catch it in commit we can safely retry
-  // and re-throw to break the loop
+  var retry = 0
+  var retryNeeded = true
+  while (retryNeeded) {
+trace("Committing offsets.")
+try {
   consumerWrapper.commit()
-  throw e
+  lastSuccessfulCommitTime = time.milliseconds
+  retryNeeded = false
+} catch {
+  case e: WakeupException =>
+// we only call wakeup() once to close the consumer,
+// so if we catch it in commit we can safely retry
+// and re-throw to break the loop
+commitOffsets(consumerWrapper)
+throw e
+
+  case _: TimeoutException =>
+Try(consumerWrapper.consumer.listTopics) match {
+  case Success(visibleTopics) =>
+consumerWrapper.offsets.retain((tp, _) => 
visibleTopics.containsKey(tp.topic))
+  case Failure(e) =>
+warn("Failed to list all authorized topics after committing 
offsets timed out: ", e)
+}
 
-case _: CommitFailedException =>
-  warn("Failed to commit offsets because the consumer group has 
rebalanced and assigned partitions to " +
-"another instance. If you see this regularly, it could indicate 
that you need to either increase " +
-s"the consumer's ${ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or 
reduce the number of records " +
-s"handled on each iteration with 
${ConsumerConfig.MAX_POLL_RECORDS_CONFIG}")
+retry += 1
+warn("Failed to commit offsets because the offset commit request 
processing can not be completed in time. " +
+  s"If you see

[jira] [Updated] (KAFKA-7211) MM should handle timeouts in commitSync

2018-09-04 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7211:

Fix Version/s: 2.1.0

> MM should handle timeouts in commitSync
> ---
>
> Key: KAFKA-7211
> URL: https://issues.apache.org/jira/browse/KAFKA-7211
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
> Fix For: 2.1.0
>
>
> Now that we have KIP-266, the user can override `default.api.timeout.ms` for 
> the consumer so that commitSync does not block indefinitely. MM needs to be 
> updated to handle TimeoutException. We may also need some logic to handle 
> deleted topics. If MM attempts to commit an offset for a deleted topic, the 
> call will time out and we should probably check if the topic exists and remove 
> the offset if it doesn't.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7377) update metrics module from yammer to dropwizard

2018-09-04 Thread RAJKUMAR NATARAJAN (JIRA)
RAJKUMAR NATARAJAN created KAFKA-7377:
-

 Summary: update metrics module from yammer to dropwizard
 Key: KAFKA-7377
 URL: https://issues.apache.org/jira/browse/KAFKA-7377
 Project: Kafka
  Issue Type: Improvement
  Components: metrics
Affects Versions: 2.0.0, 1.1.1, 1.0.0
Reporter: RAJKUMAR NATARAJAN
 Fix For: 1.0.3, 1.1.2, 2.0.1, 2.1.0


Current Kafka metrics depend on Yammer. Please see the dependencies below: 

[https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.12/2.0.0]

 

Yammer has been outdated for a long time. It would be good to update the 
metrics module to Dropwizard Metrics.

 

[https://mvnrepository.com/artifact/com.codahale.metrics/metrics-core]

 

[https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core]

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

2018-09-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603656#comment-16603656
 ] 

ASF GitHub Bot commented on KAFKA-7321:
---

xiowu0 opened a new pull request #5611: KAFKA-7321: time based log compaction 
(KIP-354)
URL: https://github.com/apache/kafka/pull/5611
 
 
   This is to implement KIP-354.
   More detailed information can be found in KIP-354
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ensure timely processing of deletion requests in Kafka topic (Time-based log 
> compaction)
> 
>
> Key: KAFKA-7321
> URL: https://issues.apache.org/jira/browse/KAFKA-7321
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> _Compaction enables Kafka to remove old messages that are flagged for 
> deletion while other messages can be retained for a relatively longer time.  
> Today, a log segment may remain un-compacted for a long time since the 
> eligibility for log compaction is determined based on compaction ratio 
> (“min.cleanable.dirty.ratio”) and min compaction lag 
> ("min.compaction.lag.ms") setting.  Ability to delete a log message through 
> compaction in a timely manner has become an important requirement in some use 
> cases (e.g., GDPR).  For example,  one use case is to delete PII (Personal 
> Identifiable information) data within 7 days while keeping non-PII 
> indefinitely in compacted format.  The goal of this change is to provide a 
> time-based compaction policy that ensures the cleanable section is compacted 
> after the specified time interval regardless of dirty ratio and “min 
> compaction lag”.  However, dirty ratio and “min compaction lag” are still 
> honored if the time based compaction rule is not violated. In other words, if 
> Kafka receives a deletion request on a key (e.g., a key with a null value), the 
> corresponding log segment will be picked up for compaction after the 
> configured time interval to remove the key._
>  
> _This is to track effort in KIP 354:_
> _https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7363) How num.stream.threads in streaming application influence memory consumption?

2018-09-04 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603670#comment-16603670
 ] 

Matthias J. Sax commented on KAFKA-7363:


[~habdank] We are always happy to improve the documentation. Please feel free 
to contribute and open a PR to add a section about sizing.

> How num.stream.threads in streaming application influence memory consumption?
> -
>
> Key: KAFKA-7363
> URL: https://issues.apache.org/jira/browse/KAFKA-7363
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> Dears,
> How does the option _num.stream.threads_ in a streaming application influence 
> memory consumption?
> I see that by increasing num.stream.threads my application needs more memory.
> This is obvious, but it is not obvious how much I need to give it. Trial and 
> error does not work, as it seems to be highly dependent on the forced 
> throughput.
> I mean: the higher the load, the more memory is needed.
> Thanks for help and regards,
> Seweryn.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7365) max.poll.records setting in Kafka Consumer is not working

2018-09-04 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603760#comment-16603760
 ] 

Matthias J. Sax commented on KAFKA-7365:


This might help to understand the session.timeout.ms and max.poll.interval.ms 
configs: 
[https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10-0/]

Also, I am not sure what you mean by "has processed all the 4 messages in 
single poll". Note that fetching from Kafka and poll() are independent. A 
single fetch returns data based on `fetch.max.bytes` – this can be more than 
`max.poll.records` – however, a single poll() call only returns 
`max.poll.records` records in the iterator while all others are buffered 
within the consumer. If you call poll() again, the next records are returned 
from the buffer without sending another fetch request to the broker.
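
To make the buffering visible, here is a minimal self-contained sketch 
(bootstrap address, group id and topic are placeholders): with 
max.poll.records=1 each poll() should hand back at most one record, even when 
the underlying fetch carried several:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MaxPollRecordsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "max-poll-records-demo");     // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));        // placeholder
            // With 4 messages in the topic, expect 4 polls returning 1 record each:
            // records beyond the first are buffered in the consumer, not re-fetched.
            for (int i = 0; i < 4; i++) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                System.out.printf("poll %d returned %d record(s)%n", i, records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("  offset=%d value=%s%n", record.offset(), record.value());
                }
                consumer.commitSync();
            }
        }
    }
}
{code}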

Also, what version do you use?

> max.poll.records setting in Kafka Consumer is not working
> -
>
> Key: KAFKA-7365
> URL: https://issues.apache.org/jira/browse/KAFKA-7365
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Kashyap Ivaturi
>Priority: Major
>
> Hi,
> I have a requirement where I consume messages one by one; each message needs 
> additional processing, and then I manually commit the offset.
> Things work well most of the time until I get a big batch of records that 
> takes longer to process, and I encounter a CommitFailed exception for the 
> last set of records even though they were processed. While I am able to 
> reconnect, it picks up some messages that I had already processed. I don't 
> want this to happen, as it creates duplicates in the target systems that I 
> integrate with while processing the messages.
>  
> I decided that even though there are more messages in the queue, I would 
> like to have control over how many records I process per poll.
> I tried to replicate a scenario where I started the consumer with 
> 'max.poll.records' set to '1' and then pushed 4 messages into the topic the 
> consumer is listening to.
> I expected that the consumer would only process 1 message because of my 
> 'max.poll.records' setting, but the consumer processed all 4 messages in a 
> single poll. Any idea why it did not honor the 'max.poll.records' setting, 
> or is some other setting overriding it? I appreciate your help or guidance 
> in troubleshooting this issue.
> Here is the log of my Consumer config when it starts:
>  
> 2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [messaging-rtp3.cisco.com:9093]
> check.crcs = true
> client.id = 
> connections.max.idle.ms = 54
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = empestor
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 30
> max.poll.records = 1
> metadata.max.age.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 3
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 4
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 6
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = SSL
> send.buffer.bytes = 131072
> session.timeout.ms = 1
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = [hidden]
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = 
> /kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jk