[jira] [Updated] (KAFKA-6287) Inconsistent protocol type for empty consumer groups

2018-01-17 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6287:
-
Fix Version/s: 1.0.1

> Inconsistent protocol type for empty consumer groups
> 
>
> Key: KAFKA-6287
> URL: https://issues.apache.org/jira/browse/KAFKA-6287
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Ryan Leslie
>Assignee: Jason Gustafson
>Priority: Minor
> Fix For: 1.0.1
>
>
> When a consumer is created for a new group, the group metadata's protocol 
> type is set to 'consumer' and this is stored both in __consumer_offsets and 
> in the coordinator's local cache.
> If the consumer leaves the group and the group becomes empty, ListGroups 
> requests will continue to show the group as type 'consumer', and as such 
> kafka-consumer-groups.sh will show it via --list.
> However, if the coordinator (broker) node is killed and a new coordinator is 
> elected, when the GroupMetadataManager loads the group from 
> __consumer_offsets into its cache, it will not set the protocolType if there 
> are no active consumers. As a result, the group's protocolType will now 
> become the empty string (UNKNOWN_PROTOCOL_TYPE), and kafka-consumer-groups.sh 
> will no longer show the group.
> Ideally bouncing a broker should not result in the group's protocol changing. 
> protocolType can be set in GroupMetadataManager.readGroupMessageValue() to 
> always reflect what's present in the persistent metadata (__consumer_offsets), 
> regardless of whether there are active members.
> In general, things can get confusing when distinguishing between 'consumer' 
> and non-consumer groups. For example, DescribeGroups and OffsetFetchRequest 
> do not filter on protocol type, so it's possible for 
> kafka-consumer-groups.sh to describe groups (--describe) without actually 
> being able to list them. In the protocol guide, OffsetFetchRequest / 
> OffsetCommitRequest have their parameters listed as 'ConsumerGroup', but in 
> reality these can be used for groups of unknown type as well. For instance, a 
> consumer group can be copied by finding a coordinator 
> (GroupCoordinatorRequest / FindCoordinatorRequest) and sending an 
> OffsetCommitRequest. The group will be auto-created with an empty protocol. 
> Although this is arguably correct, the group will now exist but not be a 
> proper 'consumer' group until later when there is a JoinGroupRequest. Again, 
> this can be confusing as far as categorization / visibility of the group is 
> concerned. A group can instead be copied more directly by creating a consumer 
> and calling commitSync (as kafka-consumer-groups.sh does), but this involves 
> extra connections / requests and so is a little slower when trying to keep a 
> large number of groups in sync in real time across clusters.
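> As an illustration of that commitSync approach, a minimal sketch (the broker address, 
> group, topic, and offsets below are assumed values, not from this report):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.common.TopicPartition;
> 
> public class CopyGroupOffsets {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");  // assumed address
>         props.put("group.id", "copied-group");              // assumed target group
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> 
>         // Offsets previously read from the source cluster (assumed values).
>         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
>         offsets.put(new TopicPartition("some-topic", 0), new OffsetAndMetadata(42L));
> 
>         // Committing through a consumer auto-creates the group if it does not exist yet.
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             consumer.assign(offsets.keySet());
>             consumer.commitSync(offsets);
>         }
>     }
> }
> {code}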
> If we want to make it easier to keep consumer groups consistent, options 
> include allowing groups to be explicitly created with a protocol type instead 
> of only lazily, or adding a broker config that lets the coordinator assign a 
> default protocol type to groups created outside of JoinGroupRequest. This 
> would default to 'consumer'. This sort of setting might be confusing for 
> users though, since implicitly created groups are certainly not the norm.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6277) Make loadClass thread-safe for class loaders of Connect plugins

2018-01-17 Thread Konstantine Karantasis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-6277:
--
Priority: Blocker  (was: Major)

> Make loadClass thread-safe for class loaders of Connect plugins
> ---
>
> Key: KAFKA-6277
> URL: https://issues.apache.org/jira/browse/KAFKA-6277
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 1.0.1, 0.11.0.3
>
>
> In Connect's classloading isolation framework, {{PluginClassLoader}} class 
> encounters a race condition when several threads corresponding to tasks using 
> a specific plugin (e.g. a Connector) try to load the same class at the same 
> time on a single JVM. 
> The race condition is related to calls to method {{defineClass}} which, 
> contrary to {{findClass}}, is not thread safe for classloaders that override 
> {{loadClass}}. More details here: 
> https://docs.oracle.com/javase/7/docs/technotes/guides/lang/cl-mt.html
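> For reference, a minimal sketch of the kind of parallel-capable, lock-per-class 
> {{loadClass}} that article describes (the class below is illustrative, not the actual 
> Connect {{PluginClassLoader}}):
> {code:java}
> import java.net.URL;
> import java.net.URLClassLoader;
> 
> public class IsolatedPluginClassLoader extends URLClassLoader {
> 
>     static {
>         // Use per-class-name locks instead of locking the entire loader.
>         ClassLoader.registerAsParallelCapable();
>     }
> 
>     public IsolatedPluginClassLoader(URL[] urls, ClassLoader parent) {
>         super(urls, parent);
>     }
> 
>     @Override
>     protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
>         synchronized (getClassLoadingLock(name)) {
>             Class<?> klass = findLoadedClass(name);
>             if (klass == null) {
>                 try {
>                     // Child-first lookup; findClass may end up calling defineClass.
>                     klass = findClass(name);
>                 } catch (ClassNotFoundException e) {
>                     // Fall back to the parent delegation model.
>                     klass = super.loadClass(name, false);
>                 }
>             }
>             if (resolve) {
>                 resolveClass(klass);
>             }
>             return klass;
>         }
>     }
> }
> {code}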



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6302) Topic can not be recreated after it is deleted

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16329547#comment-16329547
 ] 

ASF GitHub Bot commented on KAFKA-6302:
---

mjsax closed pull request #4332: KAFKA-6302: Improve AdminClient JavaDocs
URL: https://github.com/apache/kafka/pull/4332
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index abe666a4387..897e127d557 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -105,7 +105,7 @@ public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
  *
  * This operation is not transactional so it may succeed for some topics while fail for others.
  *
- * It may take several seconds after this method returns
+ * It may take several seconds after {@code CreateTopicsResult} returns
  * success for all the brokers to become aware that the topics have been created.
  * During this time, {@link AdminClient#listTopics()} and {@link AdminClient#describeTopics(Collection)}
  * may not return information about the new topics.
@@ -138,7 +138,7 @@ public DeleteTopicsResult deleteTopics(Collection<String> topics) {
  *
  * This operation is not transactional so it may succeed for some topics while fail for others.
  *
- * It may take several seconds after AdminClient#deleteTopics returns
+ * It may take several seconds after the {@code DeleteTopicsResult} returns
  * success for all the brokers to become aware that the topics are gone.
  * During this time, AdminClient#listTopics and AdminClient#describeTopics
  * may continue to return information about the deleted topics.
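
For illustration, a minimal sketch of how a caller can cope with that propagation delay (the broker address and topic name are assumptions, not from the PR):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class WaitForTopicVisibility {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            // The returned future can complete before every broker knows about the topic.
            admin.createTopics(Collections.singleton(new NewTopic("demo-topic", 1, (short) 1))).all().get();
            // Poll listTopics() until the new topic shows up in the metadata.
            while (!admin.listTopics().names().get().contains("demo-topic")) {
                Thread.sleep(100);
            }
        }
    }
}
{code}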


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Topic can not be recreated after it is deleted
> --
>
> Key: KAFKA-6302
> URL: https://issues.apache.org/jira/browse/KAFKA-6302
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients
>Affects Versions: 1.0.0
>Reporter: kic
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 1.1.0, 1.0.1
>
>
> I use an embedded kafka for unit test. My application relies on the ability 
> to recreate topics programmatically. Currently it is not possible to 
> re-create a topic after it has been deleted.
> {code}
> // needs compile-time dependency 
> 'net.manub:scalatest-embedded-kafka_2.11:1.0.0' and 
> 'org.apache.kafka:kafka-clients:1.0.0'
> package kic.kafka.embedded
> import java.util.Properties
> import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
> import org.scalatest._
> import scala.collection.JavaConverters._
> class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
>   val props = new Properties()
>   val testTopic = "test-topic"
>   "The admin client" should "be able to create, delete and re-create topics" 
> in {
> props.setProperty("bootstrap.servers", "localhost:10001")
> props.setProperty("delete.enable.topic", "true")
> props.setProperty("group.id", "test-client")
> props.setProperty("key.deserializer", 
> "org.apache.kafka.common.serialization.LongDeserializer")
> props.setProperty("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer")
> props.setProperty("clinet.id", "test-client")
> props.setProperty("key.serializer", 
> "org.apache.kafka.common.serialization.LongSerializer")
> props.setProperty("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer")
> EmbeddedKafaJavaWrapper.start(10001, 10002, props)
> try {
>   implicit val admin = AdminClient.create(props)
>   // create topic and confirm it exists
>   createTopic(testTopic)
>   val topics = listTopics()
>   info(s"topics: $topics")
>   topics should contain(testTopic)
>   // now we should be able to send something to this topic
>   // TODO create producer and send something
>   // delete topic
>   deleteTopic(testTopic)
>   listTopics() shouldNot contain(testTopic)
>   // recreate topic
>   createTopic(testTopic)
>   // listTopics() should contain(testTopic)
>   // and 

[jira] [Updated] (KAFKA-4186) Transient failure in KStreamAggregationIntegrationTest

2018-01-17 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4186:
-
Description: 
Saw this running locally off of trunk:

{code}
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldGroupByKey[1] FAILED
java.lang.AssertionError: Condition not met within timeout 6. Did not 
receive 10 number of records
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
at 
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
at 
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
{code}



Re-opening this issue with a different test, but it seems the test suite itself 
has some timing-dependent flakiness.

{code}
Stacktrace
java.lang.AssertionError: Condition not met within timeout 3. metadata for 
topic=output-9 partition=0 not propagated to all brokers
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMetadataIsPropagated(IntegrationTestUtils.java:254)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForTopicPartitions(IntegrationTestUtils.java:246)
at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:209)
at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopics(EmbeddedKafkaCluster.java:168)
at 
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.createTopics(KStreamAggregationIntegrationTest.java:680)
at 
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.before(KStreamAggregationIntegrationTest.java:105)
...
{code}


Example: 
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/10564/testReport/junit/org.apache.kafka.streams.integration/KStreamAggregationIntegrationTest/shouldAggregateWindowed/

  was:
Saw this running locally off of trunk:

{code}
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldGroupByKey[1] FAILED
java.lang.AssertionError: Condition not met within timeout 6. Did not 
receive 10 number of records
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
at 
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
at 
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
{code}


> Transient failure in KStreamAggregationIntegrationTest
> --
>
> Key: KAFKA-4186
> URL: https://issues.apache.org/jira/browse/KAFKA-4186
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Matthias J. Sax
>Priority: Major
>
> Saw this running locally off of trunk:
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
> shouldGroupByKey[1] FAILED
> java.lang.AssertionError: Condition not met within timeout 6. Did not 
> receive 10 number of records
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
> {code}
> 
> Re-opening this issue with a different test, but it seems the test suite 
> itself has some timing-dependent flakiness.
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. metadata 
> for topic=output-9 partition=0 not propagated to all brokers
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMetadataIsPropagated(IntegrationTestUtils.java:254)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForTopicPartitions(IntegrationTestUtils.java:246)
>

[jira] [Reopened] (KAFKA-4186) Transient failure in KStreamAggregationIntegrationTest.shouldGroupByKey

2018-01-17 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-4186:
--
  Assignee: Matthias J. Sax  (was: Damian Guy)

> Transient failure in KStreamAggregationIntegrationTest.shouldGroupByKey
> ---
>
> Key: KAFKA-4186
> URL: https://issues.apache.org/jira/browse/KAFKA-4186
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Matthias J. Sax
>Priority: Major
>
> Saw this running locally off of trunk:
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
> shouldGroupByKey[1] FAILED
> java.lang.AssertionError: Condition not met within timeout 6. Did not 
> receive 10 number of records
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4186) Transient failure in KStreamAggregationIntegrationTest

2018-01-17 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4186:
-
Summary: Transient failure in KStreamAggregationIntegrationTest  (was: 
Transient failure in KStreamAggregationIntegrationTest.shouldGroupByKey)

> Transient failure in KStreamAggregationIntegrationTest
> --
>
> Key: KAFKA-4186
> URL: https://issues.apache.org/jira/browse/KAFKA-4186
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Matthias J. Sax
>Priority: Major
>
> Saw this running locally off of trunk:
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
> shouldGroupByKey[1] FAILED
> java.lang.AssertionError: Condition not met within timeout 6. Did not 
> receive 10 number of records
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6382) Make ProducerConfig and ConsumerConfig constructors public

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16329260#comment-16329260
 ] 

ASF GitHub Bot commented on KAFKA-6382:
---

mjsax closed pull request #4341: KAFKA-6382: Make ProducerConfig and 
ConsumerConfig constructors public
URL: https://github.com/apache/kafka/pull/4341
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index be3077ffe17..3fe58d7576a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -456,8 +456,7 @@
 public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
                                                           Deserializer<?> keyDeserializer,
                                                           Deserializer<?> valueDeserializer) {
-    Map<String, Object> newConfigs = new HashMap<String, Object>();
-    newConfigs.putAll(configs);
+    Map<String, Object> newConfigs = new HashMap<>(configs);
     if (keyDeserializer != null)
         newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
     if (valueDeserializer != null)
@@ -477,7 +476,11 @@ public static Properties addDeserializerToConfig(Properties properties,
     return newProperties;
 }
 
-ConsumerConfig(Map<String, Object> props) {
+public ConsumerConfig(Properties props) {
+    super(CONFIG, props);
+}
+
+public ConsumerConfig(Map<String, Object> props) {
     super(CONFIG, props);
 }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 14f405bca6e..4e67fe8323e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -267,7 +267,7 @@
  * @param configs   The producer configs
  *
  */
-public KafkaProducer(Map<String, Object> configs) {
+public KafkaProducer(final Map<String, Object> configs) {
     this(new ProducerConfig(configs), null, null, null, null);
 }
 
@@ -286,7 +286,10 @@ public KafkaProducer(Map<String, Object> configs) {
  */
 public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
     this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
-        keySerializer, valueSerializer, null, null);
+        keySerializer,
+        valueSerializer,
+        null,
+        null);
 }
 
 /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9bf3f47723..0631814cda9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -347,7 +347,8 @@
 }
 
 public static Properties addSerializerToConfig(Properties properties,
-                                               Serializer<?> keySerializer, Serializer<?> valueSerializer) {
+                                               Serializer<?> keySerializer,
+                                               Serializer<?> valueSerializer) {
     Properties newProperties = new Properties();
     newProperties.putAll(properties);
     if (keySerializer != null)
@@ -357,7 +358,11 @@ public static Properties addSerializerToConfig(Properties properties,
     return newProperties;
 }
 
-ProducerConfig(Map<String, Object> props) {
+public ProducerConfig(Properties props) {
+    super(CONFIG, props);
+}
+
+public ProducerConfig(Map<String, Object> props) {
     super(CONFIG, props);
 }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b4908e85d82..0feb48ddecd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -46,6 +46,7 @@
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
@@ -765,8 +766,8 @@ private void 

[jira] [Created] (KAFKA-6458) ERROR Found invalid messages during fetch for partition [__consumer_offsets,20] offset 0 error Record is corrupt (stored crc = 73, computed crc = 2559213387) (kafka.serve

2018-01-17 Thread VinayKumar (JIRA)
VinayKumar created KAFKA-6458:
-

 Summary: ERROR Found invalid messages during fetch for partition 
[__consumer_offsets,20] offset 0 error Record is corrupt (stored crc = 73, 
computed crc = 2559213387) (kafka.server.ReplicaFetcherThread)
 Key: KAFKA-6458
 URL: https://issues.apache.org/jira/browse/KAFKA-6458
 Project: Kafka
  Issue Type: Bug
  Components: log, offset manager
Affects Versions: 0.10.2.1
 Environment: CentOS Linux release 7.2.1511
Reporter: VinayKumar


I see the ERRORs below in the log file. Restarting the Kafka service does not 
fix them. 
Can someone please advise how to eliminate/fix these errors?

[2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for 
partition [__consumer_offsets,11] offset 0 error Record is corrupt (stored crc 
= 81, computed crc = 1264288837) (kafka.server.ReplicaFetcherThread)
[2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for 
partition [__consumer_offsets,42] offset 0 error Record is corrupt (stored crc 
= 73, computed crc = 1222016777) (kafka.server.ReplicaFetcherThread)
[2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for 
partition [__consumer_offsets,20] offset 0 error Record is corrupt (stored crc 
= 73, computed crc = 2559213387) (kafka.server.ReplicaFetcherThread)
[2018-01-17 13:52:15,777] ERROR Found invalid messages during fetch for 
partition [__consumer_offsets,22] offset 65 error Record is corrupt (stored crc 
= 123, computed crc = 2233168612) (kafka.server.ReplicaFetcherThread)
[2018-01-17 13:52:15,777] ERROR Found invalid messages during fetch for 
partition [__consumer_offsets,39] offset 20 error Record is corrupt (stored crc 
= 71, computed crc = 2065457751) (kafka.server.ReplicaFetcherThread)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6166) Streams configuration requires consumer. and producer. in order to be read

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16329190#comment-16329190
 ] 

ASF GitHub Bot commented on KAFKA-6166:
---

h314to opened a new pull request #4434: KAFKA-6166: Streams configuration 
requires consumer. and producer. in order to be read
URL: https://github.com/apache/kafka/pull/4434
 
 
   * Implement method to get custom properties
   * Add custom properties to getConsumerConfigs and getProducerConfigs
   * Add tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streams configuration requires consumer. and producer. in order to be read
> --
>
> Key: KAFKA-6166
> URL: https://issues.apache.org/jira/browse/KAFKA-6166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: Kafka 0.11.0.0
> JDK 1.8
> CoreOS
>Reporter: Justin Manchester
>Assignee: Filipe Agapito
>Priority: Minor
>  Labels: newbie++, user-experience
>
> Problem:
> In previous releases you could specify a custom metrics reporter like so:
> Properties config = new Properties(); 
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); 
> config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
> "com.mycompany.MetricReporter"); 
> config.put("custom-key-for-metric-reporter", "value");
> From 0.11.0.0 onwards this is no longer possible, as you have to specify 
> consumer.custom-key-for-metric-reporter or 
> producer.custom-key-for-metric-reporter otherwise it's stripped out of the 
> configuration.
> So, if you wish to use a metrics reporter and collect producer and 
> consumer metrics, as well as kafka-streams metrics, you would need to 
> specify 3 distinct configs:
> 1) consumer.custom-key-for-metric-reporter 
> 2) producer.custom-key-for-metric-reporter 
> 3) custom-key-for-metric-reporter
> This appears to be a regression.
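> To illustrate the workaround, a minimal sketch of the three prefixed settings described 
> above (class and key names follow the example; the values are assumptions):
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.streams.StreamsConfig;
> 
> Properties config = new Properties();
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
> config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.mycompany.MetricReporter");
> // The same custom key has to be supplied three times so the reporter sees it
> // for the streams client, the embedded consumer, and the embedded producer.
> config.put("custom-key-for-metric-reporter", "value");
> config.put("consumer.custom-key-for-metric-reporter", "value");
> config.put("producer.custom-key-for-metric-reporter", "value");
> {code}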



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6457) Error: NOT_LEADER_FOR_PARTITION leads to NPE

2018-01-17 Thread Seweryn Habdank-Wojewodzki (JIRA)
Seweryn Habdank-Wojewodzki created KAFKA-6457:
-

 Summary: Error: NOT_LEADER_FOR_PARTITION leads to NPE
 Key: KAFKA-6457
 URL: https://issues.apache.org/jira/browse/KAFKA-6457
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Seweryn Habdank-Wojewodzki


One of our nodes was dead. Then the second one took over all responsibility.

But the streaming application crashed in the meantime due to an NPE caused by 
{{Error: NOT_LEADER_FOR_PARTITION}}.

The stack trace is below.

Is this expected?

{code}

2018-01-17 11:47:21 [my] [WARN ] Sender:251 - [Producer 
clientId=restreamer-my-fef07ca9-b067-45c0-a5af-68b5a1730dac-StreamThread-1-producer]
 Got error produce response with correlation id 768962 on topic-partition 
my_internal_topic-5, retrying (9 attempts left). Error: NOT_LEADER_FOR_PARTITION
2018-01-17 11:47:21 [my] [WARN ] Sender:251 - [Producer 
clientId=restreamer-my-fef07ca9-b067-45c0-a5af-68b5a1730dac-StreamThread-1-producer]
 Got error produce response with correlation id 768962 on topic-partition 
my_internal_topic-7, retrying (9 attempts left). Error: NOT_LEADER_FOR_PARTITION
2018-01-17 11:47:21 [my] [ERROR] AbstractCoordinator:296 - [Consumer 
clientId=restreamer-my-fef07ca9-b067-45c0-a5af-68b5a1730dac-StreamThread-1-consumer,
 groupId=restreamer-my] Heartbeat thread for group restreamer-my failed due to 
unexpected error
java.lang.NullPointerException: null
    at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:436) 
~[my-restreamer.jar:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:395) 
~[my-restreamer.jar:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) 
~[my-restreamer.jar:?]
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
 ~[my-restreamer.jar:?]
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:275)
 ~[my-restreamer.jar:?]
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:934)
 [my-restreamer.jar:?]

{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking

2018-01-17 Thread Gunnar Morling (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-6456:
--
Priority: Minor  (was: Major)

> Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
> --
>
> Key: KAFKA-6456
> URL: https://issues.apache.org/jira/browse/KAFKA-6456
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Minor
>
> The docs of {{poll()}} currently say "This method should block if no data is 
> currently available". This causes the task from transitioning to PAUSED 
> state, if there's no data available for a longer period of time. I'd 
> therefore suggest to reword like this:
> {quote}
> This method should block if no data is currently available but return control 
> to Kafka Connect periodically (by returning null).
> {quote}
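> A minimal sketch of a {{poll()}} implementation following that contract (the queue and 
> timeout below are illustrative assumptions, not part of the Connect API):
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.LinkedBlockingQueue;
> import java.util.concurrent.TimeUnit;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> 
> public class ExampleSourceTask extends SourceTask {
> 
>     // Filled by some background fetcher; illustrative only.
>     private final BlockingQueue<SourceRecord> pendingRecords = new LinkedBlockingQueue<>();
> 
>     @Override
>     public String version() {
>         return "0.1";
>     }
> 
>     @Override
>     public void start(Map<String, String> props) {
>         // start the background fetcher here
>     }
> 
>     @Override
>     public List<SourceRecord> poll() throws InterruptedException {
>         // Block briefly while no data is available, then return null so the
>         // framework regains control and can, e.g., pause the task.
>         SourceRecord first = pendingRecords.poll(1, TimeUnit.SECONDS);
>         if (first == null) {
>             return null;
>         }
>         List<SourceRecord> batch = new ArrayList<>();
>         batch.add(first);
>         pendingRecords.drainTo(batch);
>         return batch;
>     }
> 
>     @Override
>     public void stop() {
>         // stop the background fetcher here
>     }
> }
> {code}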



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6456) Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328601#comment-16328601
 ] 

ASF GitHub Bot commented on KAFKA-6456:
---

gunnarmorling opened a new pull request #4432: KAFKA-6456 JavaDoc clarification 
for SourceTask#poll()
URL: https://github.com/apache/kafka/pull/4432
 
 
   Making clear that implementations of poll() shouldn't block indefinitely in 
order to allow the task instance to transition to PAUSED state.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
> --
>
> Key: KAFKA-6456
> URL: https://issues.apache.org/jira/browse/KAFKA-6456
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> The docs of {{poll()}} currently say "This method should block if no data is 
> currently available". This causes the task from transitioning to PAUSED 
> state, if there's no data available for a longer period of time. I'd 
> therefore suggest to reword like this:
> {quote}
> This method should block if no data is currently available but return control 
> to Kafka Connect periodically (by returning null).
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-01-17 Thread Georgij Cernysiov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328549#comment-16328549
 ] 

Georgij Cernysiov commented on KAFKA-1194:
--

[~manme...@gmail.com] thank you. There is also [~nxmbriggs404]'s [pull 
request|https://github.com/apache/kafka/pull/3283], which fixes the issue, but 
there has been no activity on it since August 2017. 
Recently, I merged the latest trunk into [~nxmbriggs404]'s branch with the fixes 
and opened a [new pull request|https://github.com/apache/kafka/pull/4431] (_I have 
no rights to push to his repository or to fix merge conflicts within pull 
requests_) which is "ready" to be merged into `trunk` (_code review required_).

So far I have had no issues with log retention when running Kafka based on 
[~nxmbriggs404]'s fixes.

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch
> Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open. So we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.
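> A minimal sketch of the "close before rename" idea (the file name is hypothetical and 
> the real fix lives in the broker's log code; a mapped index buffer would additionally 
> need to be unmapped before the move):
> {code:java}
> import java.io.RandomAccessFile;
> import java.nio.channels.FileChannel;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.Paths;
> import java.nio.file.StandardCopyOption;
> 
> public class CloseBeforeRename {
>     public static void main(String[] args) throws Exception {
>         Path segment = Paths.get("00000000000001516723.log"); // hypothetical segment file
>         try (RandomAccessFile raf = new RandomAccessFile(segment.toFile(), "rw");
>              FileChannel channel = raf.getChannel()) {
>             // ... the segment is read and written through this channel while active ...
>         } // the channel is closed here
>         // Only rename once nothing holds the file open, otherwise Windows rejects the move.
>         Files.move(segment, segment.resolveSibling(segment.getFileName() + ".deleted"),
>                    StandardCopyOption.ATOMIC_MOVE);
>     }
> }
> {code}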



--
This message was sent by Atlassian JIRA

[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking

2018-01-17 Thread Gunnar Morling (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-6456:
--
Summary: Improve JavaDoc of SourceTask#poll() to discourage indefinite 
blocking  (was: Improve JavaDoc of SourceTask#poll())

> Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
> --
>
> Key: KAFKA-6456
> URL: https://issues.apache.org/jira/browse/KAFKA-6456
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> The docs of {{poll()}} currently say "This method should block if no data is 
> currently available". This causes the task from transitioning to PAUSED 
> state, if there's no data available for a longer period of time. I'd 
> therefore suggest to reword like this:
> {quote}
> This method should block if no data is currently available but return control 
> to Kafka Connect periodically (by returning null).
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll()

2018-01-17 Thread Gunnar Morling (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-6456:
--
Description: 
The docs of {{poll()}} currently say "This method should block if no data is 
currently available". This causes the task from transitioning to PAUSED state, 
if there's no data available for a longer period of time. I'd therefore suggest 
to reword like this:

{quote}
This method should block if no data is currently available but return control 
to Kafka Connect periodically (by returning null).
{quote}

> Improve JavaDoc of SourceTask#poll()
> 
>
> Key: KAFKA-6456
> URL: https://issues.apache.org/jira/browse/KAFKA-6456
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> The docs of {{poll()}} currently say "This method should block if no data is 
> currently available". This causes the task from transitioning to PAUSED 
> state, if there's no data available for a longer period of time. I'd 
> therefore suggest to reword like this:
> {quote}
> This method should block if no data is currently available but return control 
> to Kafka Connect periodically (by returning null).
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2170) 10 LogTest cases failed for file.renameTo failed under windows

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328534#comment-16328534
 ] 

ASF GitHub Bot commented on KAFKA-2170:
---

GeorgeCGV opened a new pull request #4431: KAFKA-2170: Updated Fixes For 
Windows Platform
URL: https://github.com/apache/kafka/pull/4431
 
 
   **This is a continuation of the [original pull request](https://github.com/apache/kafka/pull/3283) made by [nxmbriggs404](https://github.com/nxmbriggs404).**
   
   I am using Kafka primarily on the Windows platform, but 
[KAFKA-1194](https://issues.apache.org/jira/browse/KAFKA-1194) affects all 
Kafka versions on Windows and makes it unusable without restarts. There are 
several pull requests available to fix this problem, but most of them are 
outdated. [Nxmbriggs404's pull request](https://github.com/apache/kafka/pull/3283) 
is the most recent of them. Unfortunately, there has been no further discussion 
or progress on the aforementioned pull request, so I created a new one.
   
   The pull request contains the most recent merge of the **apache/kafka** **trunk** 
branch into the [original pull request](https://github.com/apache/kafka/pull/3283) 
made by [nxmbriggs404](https://github.com/nxmbriggs404), with all related tests 
and code adaptations.
   
   The only major change is in the `LogManager.scala` file within 
`asyncDelete`. I had to change the order of operations for `removedLog` from
   
   1. Rename directory (atomic move)
   2. Make checkpoint log for offsets and recovery
   3. Mark for deletion
   
   to
   
   1. Make checkpoint log for offsets and recovery
   2. Close the `FileChannel` (this avoids `AccessDeniedException` on Windows)
   3. Rename directory (atomic move)
   4. Mark for deletion
   
   The new order allows `LogManagerTest.testFileReferencesAfterAsyncDelete` to 
succeed on Windows.
   
   Currently I am running a long-lived test with all of the changes mentioned 
above. It seems to function as expected: log retention works.
   
   I think most Windows users would really like to see the old 
[KAFKA-1194](https://issues.apache.org/jira/browse/KAFKA-1194) issue finally 
closed. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> 10 LogTest cases failed for  file.renameTo failed under windows
> ---
>
> Key: KAFKA-2170
> URL: https://issues.apache.org/jira/browse/KAFKA-2170
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.1.0
> Environment: Windows
>Reporter: Honghai Chen
>Assignee: Jay Kreps
>Priority: Major
>
> Get the latest code from trunk, then run the test:
> gradlew -i core:test --tests kafka.log.LogTest
> 10 cases failed for the same reason:
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 0
>   at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:259)
>   at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:756)
>   at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:747)
>   at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
>   at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at kafka.log.Log.deleteOldSegments(Log.scala:514)
>   at kafka.log.LogTest.testAsyncDelete(LogTest.scala:633)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:601)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:180)
>   at 

[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll()

2018-01-17 Thread Gunnar Morling (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-6456:
--
Summary: Improve JavaDoc of SourceTask#poll()  (was: Improve JavaDoc of 
SourceTask)

> Improve JavaDoc of SourceTask#poll()
> 
>
> Key: KAFKA-6456
> URL: https://issues.apache.org/jira/browse/KAFKA-6456
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6456) Improve JavaDoc of SourceTask

2018-01-17 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-6456:
-

 Summary: Improve JavaDoc of SourceTask
 Key: KAFKA-6456
 URL: https://issues.apache.org/jira/browse/KAFKA-6456
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Gunnar Morling






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6449) KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more than request.timeout.ms

2018-01-17 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328505#comment-16328505
 ] 

huxihx commented on KAFKA-6449:
---

Try to increase `session.timeout.ms`
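
For illustration, this could be raised in the quoted test's kafkaParams map, keeping request.timeout.ms larger than session.timeout.ms as the consumer requires (the values below are assumptions, and the broker's group.max.session.timeout.ms must allow them):

{code:java}
kafkaParams.put("session.timeout.ms", "60000");  // longer than the 41s pause in the quoted test
kafkaParams.put("request.timeout.ms", "70000");  // must remain greater than session.timeout.ms
{code}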

> KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more 
> than request.timeout.ms
> ---
>
> Key: KAFKA-6449
> URL: https://issues.apache.org/jira/browse/KAFKA-6449
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: zhaoshijie
>Priority: Major
>
> I use the code below to consume a partition of a Kafka topic, and I get 40s 
> latency on every poll:
> {code:java}
> @Test
> public void testTimeOut() throws Exception {
> String test_topic = "timeOut_test";
> int test_partition = 1;
> Map<String, Object> kafkaParams = new HashMap<String, Object>();
> kafkaParams.put("auto.offset.reset", "earliest");
> kafkaParams.put("enable.auto.commit", false);
> kafkaParams.put("bootstrap.servers", "*");
> kafkaParams.put("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> kafkaParams.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> kafkaParams.put("group.id", "test-consumer-" + 
> System.currentTimeMillis());
> //kafkaParams.put("reconnect.backoff.ms", "0");
> //kafkaParams.put("max.poll.records", "500");
> KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaParams);
> consumer.assign(Arrays.asList(new TopicPartition(test_topic, 
> test_partition)));
> Long offset = 0L;
> while (true) {
> Long startPollTime = System.currentTimeMillis();
> consumer.seek(new TopicPartition(test_topic, test_partition), 
> offset);
> ConsumerRecords<String, String> records = consumer.poll(12);
> logger.info("poll take " + (System.currentTimeMillis() - 
> startPollTime) + "ms, MSGCount is " + records.count());
> Thread.sleep(41000);
> Iterator<ConsumerRecord<String, String>> consumerRecordIterable = 
> records.records(test_topic).iterator();
> while (consumerRecordIterable.hasNext()) {
> offset = consumerRecordIterable.next().offset();
> }
> }
> }
> {code}
> log as follow:
> {code:java}
> 2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 30
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = 
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 4
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> group.id = test-consumer-1516100013868
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> session.timeout.ms = 3
> metrics.num.samples = 2
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> auto.offset.reset = earliest
> | 
> org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178)
> 2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 30
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = 

[jira] [Commented] (KAFKA-6449) KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more than request.timeout.ms

2018-01-17 Thread zhaoshijie (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328478#comment-16328478
 ] 

zhaoshijie commented on KAFKA-6449:
---

`max.poll.interval.ms` is a parameter for Kafka 0.10.1.0 and above. 0.10.0.1 does 
not have this parameter, and heartbeating is not done in a background thread. I do 
not want to trigger a client rebalance, but processing the records takes more than 40s.

> KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more 
> than request.timeout.ms
> ---
>
> Key: KAFKA-6449
> URL: https://issues.apache.org/jira/browse/KAFKA-6449
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: zhaoshijie
>Priority: Major
>
> I use the code below to consume a partition of a Kafka topic, and I get 40s 
> latency on every poll:
> {code:java}
> @Test
> public void testTimeOut() throws Exception {
> String test_topic = "timeOut_test";
> int test_partition = 1;
> Map<String, Object> kafkaParams = new HashMap<String, Object>();
> kafkaParams.put("auto.offset.reset", "earliest");
> kafkaParams.put("enable.auto.commit", false);
> kafkaParams.put("bootstrap.servers", "*");
> kafkaParams.put("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> kafkaParams.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> kafkaParams.put("group.id", "test-consumer-" + 
> System.currentTimeMillis());
> //kafkaParams.put("reconnect.backoff.ms", "0");
> //kafkaParams.put("max.poll.records", "500");
> KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaParams);
> consumer.assign(Arrays.asList(new TopicPartition(test_topic, 
> test_partition)));
> Long offset = 0L;
> while (true) {
> Long startPollTime = System.currentTimeMillis();
> consumer.seek(new TopicPartition(test_topic, test_partition), 
> offset);
> ConsumerRecords<String, String> records = consumer.poll(12);
> logger.info("poll take " + (System.currentTimeMillis() - 
> startPollTime) + "ms, MSGCount is " + records.count());
> Thread.sleep(41000);
> Iterator<ConsumerRecord<String, String>> consumerRecordIterable = 
> records.records(test_topic).iterator();
> while (consumerRecordIterable.hasNext()) {
> offset = consumerRecordIterable.next().offset();
> }
> }
> }
> {code}
> log as follow:
> {code:java}
> 2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 30
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = 
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 4
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> group.id = test-consumer-1516100013868
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> session.timeout.ms = 3
> metrics.num.samples = 2
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> auto.offset.reset = earliest
> | 
> org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178)
> 2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 30
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> 

[jira] [Commented] (KAFKA-6449) KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more than request.timeout.ms

2018-01-17 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328449#comment-16328449
 ] 

huxihx commented on KAFKA-6449:
---

Hmmm... so what's the problem you ran into? Repeated rebalances? Try increasing 
`max.poll.interval.ms` to 40+ seconds to see if it works.

> KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more 
> than request.timeout.ms
> ---
>
> Key: KAFKA-6449
> URL: https://issues.apache.org/jira/browse/KAFKA-6449
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: zhaoshijie
>Priority: Major
>
> I use the code below to consume a partition of a Kafka topic, and I get 40s 
> latency on every poll:
> {code:java}
> @Test
> public void testTimeOut() throws Exception {
> String test_topic = "timeOut_test";
> int test_partition = 1;
> Map<String, Object> kafkaParams = new HashMap<String, Object>();
> kafkaParams.put("auto.offset.reset", "earliest");
> kafkaParams.put("enable.auto.commit", false);
> kafkaParams.put("bootstrap.servers", "*");
> kafkaParams.put("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> kafkaParams.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> kafkaParams.put("group.id", "test-consumer-" + 
> System.currentTimeMillis());
> //kafkaParams.put("reconnect.backoff.ms", "0");
> //kafkaParams.put("max.poll.records", "500");
> KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaParams);
> consumer.assign(Arrays.asList(new TopicPartition(test_topic, 
> test_partition)));
> Long offset = 0L;
> while (true) {
> Long startPollTime = System.currentTimeMillis();
> consumer.seek(new TopicPartition(test_topic, test_partition), 
> offset);
> ConsumerRecords<String, String> records = consumer.poll(12);
> logger.info("poll take " + (System.currentTimeMillis() - 
> startPollTime) + "ms, MSGCount is " + records.count());
> Thread.sleep(41000);
> Iterator<ConsumerRecord<String, String>> consumerRecordIterable = 
> records.records(test_topic).iterator();
> while (consumerRecordIterable.hasNext()) {
> offset = consumerRecordIterable.next().offset();
> }
> }
> }
> {code}
> log as follow:
> {code:java}
> 2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 30
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = 
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 4
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> group.id = test-consumer-1516100013868
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> session.timeout.ms = 3
> metrics.num.samples = 2
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> auto.offset.reset = earliest
> | 
> org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178)
> 2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values: 
> metric.reporters = []
> metadata.max.age.ms = 30
> partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [10.0.52.24:9092,