[jira] [Updated] (KAFKA-6287) Inconsistent protocol type for empty consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-6287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ewen Cheslack-Postava updated KAFKA-6287:
-----------------------------------------
    Fix Version/s: 1.0.1

> Inconsistent protocol type for empty consumer groups
> ----------------------------------------------------
>
>                 Key: KAFKA-6287
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6287
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 1.0.0
>            Reporter: Ryan Leslie
>            Assignee: Jason Gustafson
>            Priority: Minor
>             Fix For: 1.0.1
>
> When a consumer is created for a new group, the group metadata's protocol
> type is set to 'consumer' and this is stored both in __consumer_offsets as
> well as in the coordinator's local cache.
> If the consumer leaves the group and the group becomes empty, ListGroups
> requests will continue to show the group as type 'consumer', and as such
> kafka-consumer-groups.sh will show it via --list.
> However, if the coordinator (broker) node is killed and a new coordinator is
> elected, when the GroupMetadataManager loads the group from
> __consumer_offsets into its cache, it will not set the protocolType if there
> are no active consumers. As a result, the group's protocolType will now
> become the empty string (UNKNOWN_PROTOCOL_TYPE), and kafka-consumer-groups.sh
> will no longer show the group.
> Ideally, bouncing a broker should not result in the group's protocol changing.
> protocolType can be set in GroupMetadataManager.readGroupMessageValue() to
> always reflect what's present in the persistent metadata (__consumer_offsets),
> regardless of whether there are active members.
> In general, things can get confusing when distinguishing between 'consumer'
> and non-consumer groups. For example, DescribeGroups and OffsetFetchRequest
> do not filter on protocol type, so it's possible for
> kafka-consumer-groups.sh to describe groups (--describe) without actually
> being able to list them.
> In the protocol guide, OffsetFetchRequest and
> OffsetCommitRequest have their parameters listed as 'ConsumerGroup', but in
> reality these can be used for groups of unknown type as well. For instance, a
> consumer group can be copied by finding a coordinator
> (GroupCoordinatorRequest / FindCoordinatorRequest) and sending an
> OffsetCommitRequest. The group will be auto-created with an empty protocol.
> Although this is arguably correct, the group will now exist but not be a
> proper 'consumer' group until later, when there is a JoinGroupRequest. Again,
> this can be confusing as far as categorization / visibility of the group is
> concerned. A group can instead be copied more directly by creating a consumer
> and calling commitSync (as kafka-consumer-groups.sh does), but this does involve
> extra connections / requests and so is a little slower when trying to keep a
> large number of groups in sync in real time across clusters.
> If we want to make it easier to keep consumer groups consistent, options
> include allowing groups to be created explicitly with a protocol type instead
> of only lazily, or, for groups created outside of JoinGroupRequest, the
> coordinator could gain a broker config to set a default protocol type for
> groups. This would default to 'consumer'. This sort of setting might be
> confusing for users though, since implicitly created groups are certainly not
> the norm.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
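The proposed fix is for {{GroupMetadataManager.readGroupMessageValue()}} to carry the persisted protocol type through even when the member list is empty. A simplified, self-contained before/after sketch of that behaviour (plain Java with made-up method names; not the actual Scala broker code):

```java
import java.util.Collections;
import java.util.List;

public class ProtocolTypeSketch {
    // Buggy load path: the type is only restored when members are present,
    // so an empty group comes back as UNKNOWN_PROTOCOL_TYPE ("").
    public static String loadProtocolTypeBuggy(String storedType, List<String> members) {
        return members.isEmpty() ? "" : storedType;
    }

    // Proposed fix: always reflect what __consumer_offsets has persisted,
    // deliberately ignoring whether any members are active.
    public static String loadProtocolTypeFixed(String storedType, List<String> members) {
        return storedType;
    }

    public static void main(String[] args) {
        List<String> noMembers = Collections.emptyList();
        System.out.println(loadProtocolTypeBuggy("consumer", noMembers)); // ""
        System.out.println(loadProtocolTypeFixed("consumer", noMembers)); // "consumer"
    }
}
```

With the fixed variant, a coordinator failover no longer turns an empty 'consumer' group into an unknown-type group, so --list keeps showing it.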
[jira] [Updated] (KAFKA-6277) Make loadClass thread-safe for class loaders of Connect plugins
[ https://issues.apache.org/jira/browse/KAFKA-6277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantine Karantasis updated KAFKA-6277:
------------------------------------------
    Priority: Blocker  (was: Major)

> Make loadClass thread-safe for class loaders of Connect plugins
> ---------------------------------------------------------------
>
>                 Key: KAFKA-6277
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6277
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0, 0.11.0.2
>            Reporter: Konstantine Karantasis
>            Assignee: Konstantine Karantasis
>            Priority: Blocker
>             Fix For: 1.0.1, 0.11.0.3
>
> In Connect's classloading isolation framework, the {{PluginClassLoader}} class
> encounters a race condition when several threads corresponding to tasks using
> a specific plugin (e.g. a Connector) try to load the same class at the same
> time on a single JVM.
> The race condition is related to calls to the method {{defineClass}} which,
> contrary to {{findClass}}, is not thread-safe for class loaders that override
> {{loadClass}}. More details here:
> https://docs.oracle.com/javase/7/docs/technotes/guides/lang/cl-mt.html
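The standard remedy described in the linked class-loading guide is to register the loader as parallel capable and serialize loads of the same class name on the lock returned by {{getClassLoadingLock}}, so {{defineClass}} is never entered twice for one name. A minimal standalone sketch of that pattern (hypothetical class; not the actual PluginClassLoader fix):

```java
import java.net.URL;
import java.net.URLClassLoader;

public class SafePluginClassLoader extends URLClassLoader {
    static {
        // Let the JVM hand out fine-grained per-class-name locks
        // via getClassLoadingLock instead of locking the whole loader.
        ClassLoader.registerAsParallelCapable();
    }

    public SafePluginClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        // Serialize concurrent loads of the same class name, so defineClass
        // (reached through findClass) runs at most once per name.
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                c = super.loadClass(name, resolve);
            }
            return c;
        }
    }

    public static void main(String[] args) throws Exception {
        SafePluginClassLoader loader =
                new SafePluginClassLoader(new URL[0], SafePluginClassLoader.class.getClassLoader());
        // Loading through the synchronized path delegates safely to the parent.
        Class<?> c = loader.loadClass("java.util.ArrayList", false);
        System.out.println(c.getName()); // java.util.ArrayList
    }
}
```

Two task threads racing to load the same plugin class now contend on one per-name monitor rather than interleaving inside {{defineClass}}.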
[jira] [Commented] (KAFKA-6302) Topic can not be recreated after it is deleted
[ https://issues.apache.org/jira/browse/KAFKA-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16329547#comment-16329547 ]

ASF GitHub Bot commented on KAFKA-6302:
---------------------------------------

mjsax closed pull request #4332: KAFKA-6302: Improve AdminClient JavaDocs
URL: https://github.com/apache/kafka/pull/4332

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index abe666a4387..897e127d557 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -105,7 +105,7 @@ public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
      *
      * This operation is not transactional so it may succeed for some topics while fail for others.
      *
-     * It may take several seconds after this method returns
+     * It may take several seconds after {@code CreateTopicsResult} returns
      * success for all the brokers to become aware that the topics have been created.
      * During this time, {@link AdminClient#listTopics()} and {@link AdminClient#describeTopics(Collection)}
      * may not return information about the new topics.
@@ -138,7 +138,7 @@ public DeleteTopicsResult deleteTopics(Collection<String> topics) {
      *
      * This operation is not transactional so it may succeed for some topics while fail for others.
      *
-     * It may take several seconds after AdminClient#deleteTopics returns
+     * It may take several seconds after the {@code DeleteTopicsResult} returns
      * success for all the brokers to become aware that the topics are gone.
      * During this time, AdminClient#listTopics and AdminClient#describeTopics
      * may continue to return information about the deleted topics.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Topic can not be recreated after it is deleted
> ----------------------------------------------
>
>                 Key: KAFKA-6302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6302
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin, clients
>    Affects Versions: 1.0.0
>            Reporter: kic
>            Assignee: Matthias J. Sax
>            Priority: Major
>             Fix For: 1.1.0, 1.0.1
>
> I use an embedded kafka for unit test. My application relies on the ability
> to recreate topics programmatically. Currently it is not possible to
> re-create a topic after it has been deleted.
> {code}
> // needs compile-time dependencies
> // 'net.manub:scalatest-embedded-kafka_2.11:1.0.0' and
> // 'org.apache.kafka:kafka-clients:1.0.0'
> package kic.kafka.embedded
>
> import java.util.Properties
>
> import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
> import org.scalatest._
>
> import scala.collection.JavaConverters._
>
> class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
>   val props = new Properties()
>   val testTopic = "test-topic"
>
>   "The admin client" should "be able to create, delete and re-create topics" in {
>     props.setProperty("bootstrap.servers", "localhost:10001")
>     props.setProperty("delete.topic.enable", "true")
>     props.setProperty("group.id", "test-client")
>     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
>     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
>     props.setProperty("client.id", "test-client")
>     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
>     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>
>     EmbeddedKafaJavaWrapper.start(10001, 10002, props)
>
>     try {
>       implicit val admin = AdminClient.create(props)
>
>       // create topic and confirm it exists
>       createTopic(testTopic)
>       val topics = listTopics()
>       info(s"topics: $topics")
>       topics should contain(testTopic)
>
>       // now we should be able to send something to this topic
>       // TODO create producer and send something
>
>       // delete topic
>       deleteTopic(testTopic)
>       listTopics() shouldNot contain(testTopic)
>
>       // recreate topic
>       createTopic(testTopic)
>       // listTopics() should contain(testTopic)
>       // and
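As the JavaDoc change above notes, createTopics/deleteTopics can report success before every broker has caught up, so test code like the snippet above generally has to poll listTopics rather than assert immediately after deleteTopic. A broker-free sketch of such a polling helper, in the spirit of Kafka's TestUtils.waitForCondition (names are hypothetical; the topic-existence check is abstracted as a supplier so the sketch runs standalone):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class WaitUntil {
    /** Polls until the condition holds or timeoutMs elapses; true if it was met. */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) return true;
            Thread.sleep(pollMs);
        }
        return condition.getAsBoolean(); // one final check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for "topic no longer in admin.listTopics()":
        // here the condition only becomes true on the third poll.
        AtomicInteger polls = new AtomicInteger();
        boolean gone = waitUntil(() -> polls.incrementAndGet() >= 3, 5_000, 10);
        System.out.println(gone); // true
    }
}
```

In the Scala test above, the real condition would be a lambda wrapping `!listTopics().contains(testTopic)` before attempting the re-create.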
[jira] [Updated] (KAFKA-4186) Transient failure in KStreamAggregationIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-4186:
---------------------------------
    Description:

Saw this running locally off of trunk:

{code}
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > shouldGroupByKey[1] FAILED
    java.lang.AssertionError: Condition not met within timeout 6. Did not receive 10 number of records
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
        at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
        at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
        at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
{code}

Re-opening this issue with a different test, but it seems the test suite itself has a timing-dependent flakiness.

{code}
Stacktrace
java.lang.AssertionError: Condition not met within timeout 3. metadata for topic=output-9 partition=0 not propagated to all brokers
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
    at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMetadataIsPropagated(IntegrationTestUtils.java:254)
    at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForTopicPartitions(IntegrationTestUtils.java:246)
    at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:209)
    at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopics(EmbeddedKafkaCluster.java:168)
    at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.createTopics(KStreamAggregationIntegrationTest.java:680)
    at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.before(KStreamAggregationIntegrationTest.java:105)
    ...
{code}

Example: https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/10564/testReport/junit/org.apache.kafka.streams.integration/KStreamAggregationIntegrationTest/shouldAggregateWindowed/

    was:

Saw this running locally off of trunk:

{code}
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > shouldGroupByKey[1] FAILED
    java.lang.AssertionError: Condition not met within timeout 6. Did not receive 10 number of records
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
        at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
        at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
        at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
{code}

> Transient failure in KStreamAggregationIntegrationTest
> ------------------------------------------------------
>
>                 Key: KAFKA-4186
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4186
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Jason Gustafson
>            Assignee: Matthias J. Sax
>            Priority: Major
>
> Saw this running locally off of trunk:
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > shouldGroupByKey[1] FAILED
>     java.lang.AssertionError: Condition not met within timeout 6. Did not receive 10 number of records
>         at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
>         at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
>         at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
>         at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
> {code}
>
> Re-opening this issue with a different test, but it seems the test suite
> itself has a timing-dependent flakiness.
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. metadata for topic=output-9 partition=0 not propagated to all brokers
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMetadataIsPropagated(IntegrationTestUtils.java:254)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForTopicPartitions(IntegrationTestUtils.java:246)
[jira] [Reopened] (KAFKA-4186) Transient failure in KStreamAggregationIntegrationTest.shouldGroupByKey
[ https://issues.apache.org/jira/browse/KAFKA-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang reopened KAFKA-4186:
----------------------------------
    Assignee: Matthias J. Sax  (was: Damian Guy)

> Transient failure in KStreamAggregationIntegrationTest.shouldGroupByKey
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-4186
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4186
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Jason Gustafson
>            Assignee: Matthias J. Sax
>            Priority: Major
>
> Saw this running locally off of trunk:
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > shouldGroupByKey[1] FAILED
>     java.lang.AssertionError: Condition not met within timeout 6. Did not receive 10 number of records
>         at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
>         at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
>         at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
>         at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
> {code}
[jira] [Updated] (KAFKA-4186) Transient failure in KStreamAggregationIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-4186:
---------------------------------
    Summary: Transient failure in KStreamAggregationIntegrationTest  (was: Transient failure in KStreamAggregationIntegrationTest.shouldGroupByKey)

> Transient failure in KStreamAggregationIntegrationTest
> ------------------------------------------------------
>
>                 Key: KAFKA-4186
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4186
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Jason Gustafson
>            Assignee: Matthias J. Sax
>            Priority: Major
>
> Saw this running locally off of trunk:
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > shouldGroupByKey[1] FAILED
>     java.lang.AssertionError: Condition not met within timeout 6. Did not receive 10 number of records
>         at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
>         at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
>         at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
>         at org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
> {code}
[jira] [Commented] (KAFKA-6382) Make ProducerConfig and ConsumerConfig constructors public
[ https://issues.apache.org/jira/browse/KAFKA-6382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16329260#comment-16329260 ]

ASF GitHub Bot commented on KAFKA-6382:
---------------------------------------

mjsax closed pull request #4341: KAFKA-6382: Make ProducerConfig and ConsumerConfig constructors public
URL: https://github.com/apache/kafka/pull/4341

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index be3077ffe17..3fe58d7576a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -456,8 +456,7 @@ public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
                                                           Deserializer<?> keyDeserializer,
                                                           Deserializer<?> valueDeserializer) {
-        Map<String, Object> newConfigs = new HashMap<>();
-        newConfigs.putAll(configs);
+        Map<String, Object> newConfigs = new HashMap<>(configs);
         if (keyDeserializer != null)
             newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
         if (valueDeserializer != null)
@@ -477,7 +476,11 @@ public static Properties addDeserializerToConfig(Properties properties,
         return newProperties;
     }

-    ConsumerConfig(Map<String, Object> props) {
+    public ConsumerConfig(Properties props) {
+        super(CONFIG, props);
+    }
+
+    public ConsumerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 14f405bca6e..4e67fe8323e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -267,7 +267,7 @@
      * @param configs The producer configs
      *
      */
-    public KafkaProducer(Map<String, Object> configs) {
+    public KafkaProducer(final Map<String, Object> configs) {
         this(new ProducerConfig(configs), null, null, null, null);
     }

@@ -286,7 +286,10 @@ public KafkaProducer(Map<String, Object> configs) {
      */
     public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
-             keySerializer, valueSerializer, null, null);
+             keySerializer,
+             valueSerializer,
+             null,
+             null);
     }

     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9bf3f47723..0631814cda9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -347,7 +347,8 @@
     }

     public static Properties addSerializerToConfig(Properties properties,
-                                                   Serializer<?> keySerializer, Serializer<?> valueSerializer) {
+                                                   Serializer<?> keySerializer,
+                                                   Serializer<?> valueSerializer) {
         Properties newProperties = new Properties();
         newProperties.putAll(properties);
         if (keySerializer != null)
@@ -357,7 +358,11 @@ public static Properties addSerializerToConfig(Properties properties,
         return newProperties;
     }

-    ProducerConfig(Map<String, Object> props) {
+    public ProducerConfig(Properties props) {
+        super(CONFIG, props);
+    }
+
+    public ProducerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b4908e85d82..0feb48ddecd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -46,6 +46,7 @@
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
@@ -765,8 +766,8 @@ private void
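Besides making the constructors public, the diff above also swaps a two-step putAll copy for HashMap's copy constructor. Both yield an independent shallow copy of the config map, which this standalone check illustrates:

```java
import java.util.HashMap;
import java.util.Map;

public class CopyIdiom {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092");

        // Before: allocate, then copy.
        Map<String, Object> viaPutAll = new HashMap<>();
        viaPutAll.putAll(configs);

        // After: single-step copy constructor, same contents.
        Map<String, Object> viaCtor = new HashMap<>(configs);
        System.out.println(viaCtor.equals(viaPutAll)); // true

        // Mutating the copy leaves the source untouched (shallow copy).
        viaCtor.put("client.id", "demo");
        System.out.println(configs.containsKey("client.id")); // false
    }
}
```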
[jira] [Created] (KAFKA-6458) ERROR Found invalid messages during fetch for partition [__consumer_offsets,20] offset 0 error Record is corrupt (stored crc = 73, computed crc = 2559213387) (kafka.serve
VinayKumar created KAFKA-6458:
---------------------------------

             Summary: ERROR Found invalid messages during fetch for partition [__consumer_offsets,20] offset 0 error Record is corrupt (stored crc = 73, computed crc = 2559213387) (kafka.server.ReplicaFetcherThread)
                 Key: KAFKA-6458
                 URL: https://issues.apache.org/jira/browse/KAFKA-6458
             Project: Kafka
          Issue Type: Bug
          Components: log, offset manager
    Affects Versions: 0.10.2.1
         Environment: CentOS Linux release 7.2.1511
            Reporter: VinayKumar

I see the below ERRORs in the log file. Restarting the Kafka service does not
fix them. Can someone please advise how to eliminate/fix these errors?

[2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for partition [__consumer_offsets,11] offset 0 error Record is corrupt (stored crc = 81, computed crc = 1264288837) (kafka.server.ReplicaFetcherThread)
[2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for partition [__consumer_offsets,42] offset 0 error Record is corrupt (stored crc = 73, computed crc = 1222016777) (kafka.server.ReplicaFetcherThread)
[2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for partition [__consumer_offsets,20] offset 0 error Record is corrupt (stored crc = 73, computed crc = 2559213387) (kafka.server.ReplicaFetcherThread)
[2018-01-17 13:52:15,777] ERROR Found invalid messages during fetch for partition [__consumer_offsets,22] offset 65 error Record is corrupt (stored crc = 123, computed crc = 2233168612) (kafka.server.ReplicaFetcherThread)
[2018-01-17 13:52:15,777] ERROR Found invalid messages during fetch for partition [__consumer_offsets,39] offset 20 error Record is corrupt (stored crc = 71, computed crc = 2065457751) (kafka.server.ReplicaFetcherThread)
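For context, the broker reports "Record is corrupt" when the CRC stored alongside a record no longer matches the checksum recomputed over the record's bytes, which usually points at on-disk or in-flight corruption of the segment. The check itself can be reproduced standalone with java.util.zip.CRC32 (illustrative payload only; this is not Kafka's actual record format):

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class CrcCheck {
    public static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue();
    }

    /** Mirrors the validity test: stored CRC must equal the recomputed CRC. */
    public static boolean isValid(byte[] payload, long storedCrc) {
        return crcOf(payload) == storedCrc;
    }

    public static void main(String[] args) {
        byte[] record = "offset-commit".getBytes(StandardCharsets.UTF_8);
        long good = crcOf(record);
        System.out.println(isValid(record, good));     // true
        System.out.println(isValid(record, good + 1)); // false: "Record is corrupt"
    }
}
```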
[jira] [Commented] (KAFKA-6166) Streams configuration requires consumer. and producer. in order to be read
[ https://issues.apache.org/jira/browse/KAFKA-6166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16329190#comment-16329190 ]

ASF GitHub Bot commented on KAFKA-6166:
---------------------------------------

h314to opened a new pull request #4434: KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read
URL: https://github.com/apache/kafka/pull/4434

    * Implement method to get custom properties
    * Add custom properties to getConsumerConfigs and getProducerConfigs
    * Add tests

    ### Committer Checklist (excluded from commit message)
    - [ ] Verify design and implementation
    - [ ] Verify test coverage and CI build status
    - [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Streams configuration requires consumer. and producer. in order to be read
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-6166
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6166
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0
>         Environment: Kafka 0.11.0.0
>                      JDK 1.8
>                      CoreOS
>            Reporter: Justin Manchester
>            Assignee: Filipe Agapito
>            Priority: Minor
>              Labels: newbie++, user-experience
>
> Problem:
> In a previous release you could specify a custom metrics reporter like so:
> Properties config = new Properties();
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
> config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.mycompany.MetricReporter");
> config.put("custom-key-for-metric-reporter", "value");
> From 0.11.0.0 onwards this is no longer possible, as you have to specify
> consumer.custom-key-for-metric-reporter or
> producer.custom-key-for-metric-reporter, otherwise it's stripped out of the
> configuration.
> So, if you wish to use a metrics reporter and to collect producer and
> consumer metrics, as well as kafka-streams metrics, you would need to
> specify 3 distinct configs:
> 1) consumer.custom-key-for-metric-reporter
> 2) producer.custom-key-for-metric-reporter
> 3) custom-key-for-metric-reporter
> This appears to be a regression.
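The behaviour described in the report stems from StreamsConfig keeping only recognized keys when it builds the embedded clients' configs, with the consumer./producer. prefixes stripped off. A simplified, hypothetical model of that prefix filtering (not the actual StreamsConfig code) shows why an unprefixed custom key disappears from the client configs:

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixFilter {
    /** Keeps only entries under the given prefix (e.g. "consumer."), prefix removed. */
    public static Map<String, Object> withPrefix(Map<String, Object> props, String prefix) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Object> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> streamsProps = new HashMap<>();
        streamsProps.put("consumer.custom-key-for-metric-reporter", "value");
        streamsProps.put("custom-key-for-metric-reporter", "value");

        Map<String, Object> consumerConfigs = withPrefix(streamsProps, "consumer.");
        // Only the prefixed entry survives into the consumer's config.
        System.out.println(consumerConfigs.containsKey("custom-key-for-metric-reporter")); // true
        System.out.println(consumerConfigs.size()); // 1
    }
}
```

The unprefixed key never matches the filter, so the only way to reach all three clients is to set it three times, exactly as the report describes.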
[jira] [Created] (KAFKA-6457) Error: NOT_LEADER_FOR_PARTITION leads to NPE
Seweryn Habdank-Wojewodzki created KAFKA-6457:
-------------------------------------------------

             Summary: Error: NOT_LEADER_FOR_PARTITION leads to NPE
                 Key: KAFKA-6457
                 URL: https://issues.apache.org/jira/browse/KAFKA-6457
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.0
            Reporter: Seweryn Habdank-Wojewodzki

One of our nodes was dead. Then the second one took over all responsibility.
But in the meantime the streaming application crashed due to an NPE caused by
{{Error: NOT_LEADER_FOR_PARTITION}}. The stack trace is below. Is this
expected?

{code}
2018-01-17 11:47:21 [my] [WARN ] Sender:251 - [Producer clientId=restreamer-my-fef07ca9-b067-45c0-a5af-68b5a1730dac-StreamThread-1-producer] Got error produce response with correlation id 768962 on topic-partition my_internal_topic-5, retrying (9 attempts left). Error: NOT_LEADER_FOR_PARTITION
2018-01-17 11:47:21 [my] [WARN ] Sender:251 - [Producer clientId=restreamer-my-fef07ca9-b067-45c0-a5af-68b5a1730dac-StreamThread-1-producer] Got error produce response with correlation id 768962 on topic-partition my_internal_topic-7, retrying (9 attempts left). Error: NOT_LEADER_FOR_PARTITION
2018-01-17 11:47:21 [my] [ERROR] AbstractCoordinator:296 - [Consumer clientId=restreamer-my-fef07ca9-b067-45c0-a5af-68b5a1730dac-StreamThread-1-consumer, groupId=restreamer-my] Heartbeat thread for group restreamer-my failed due to unexpected error
java.lang.NullPointerException: null
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:436) ~[my-restreamer.jar:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:395) ~[my-restreamer.jar:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) ~[my-restreamer.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) ~[my-restreamer.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:275) ~[my-restreamer.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:934) [my-restreamer.jar:?]
{code}
[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
[ https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gunnar Morling updated KAFKA-6456:
----------------------------------
    Priority: Minor  (was: Major)

> Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-6456
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6456
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Gunnar Morling
>            Priority: Minor
>
> The docs of {{poll()}} currently say "This method should block if no data is
> currently available". This prevents the task from transitioning to the PAUSED
> state if there's no data available for a longer period of time. I'd therefore
> suggest rewording like this:
> {quote}
> This method should block if no data is currently available but return control
> to Kafka Connect periodically (by returning null).
> {quote}
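The reworded contract (block briefly, then return null so the framework regains control) maps naturally onto a timed queue poll. A minimal sketch of a poll() written in that style, using a plain BlockingQueue and String records rather than the real Connect SourceTask API:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollSketch {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

    /**
     * Blocks for at most one second; returns null when no data arrived, so
     * the framework regains control and can pause or stop the task.
     */
    public List<String> poll() throws InterruptedException {
        String record = pending.poll(1, TimeUnit.SECONDS);
        return record == null ? null : Collections.singletonList(record);
    }

    public void enqueue(String record) {
        pending.add(record);
    }

    public static void main(String[] args) throws InterruptedException {
        PollSketch task = new PollSketch();
        task.enqueue("record-1");
        System.out.println(task.poll()); // [record-1]
        System.out.println(task.poll()); // null, after blocking up to ~1s
    }
}
```

An implementation that instead blocked on `pending.take()` would hold the worker thread hostage indefinitely, which is exactly the pattern the JavaDoc change discourages.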
[jira] [Commented] (KAFKA-6456) Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
[ https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328601#comment-16328601 ]

ASF GitHub Bot commented on KAFKA-6456:
---------------------------------------

gunnarmorling opened a new pull request #4432: KAFKA-6456 JavaDoc clarification for SourceTask#poll()
URL: https://github.com/apache/kafka/pull/4432

    Making clear that implementations of poll() shouldn't block indefinitely
    in order to allow the task instance to transition to PAUSED state.

    ### Committer Checklist (excluded from commit message)
    - [ ] Verify design and implementation
    - [ ] Verify test coverage and CI build status
    - [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-6456
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6456
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Gunnar Morling
>            Priority: Major
>
> The docs of {{poll()}} currently say "This method should block if no data is
> currently available". This prevents the task from transitioning to the PAUSED
> state if there's no data available for a longer period of time. I'd therefore
> suggest rewording like this:
> {quote}
> This method should block if no data is currently available but return control
> to Kafka Connect periodically (by returning null).
> {quote}
[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time
[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328549#comment-16328549 ]

Georgij Cernysiov commented on KAFKA-1194:
------------------------------------------

[~manme...@gmail.com] thank you. There is also [~nxmbriggs404]'s
[pull request|https://github.com/apache/kafka/pull/3283] which fixes the issue,
but there has been no activity on it since August 2017. Recently, I merged the
latest trunk into [~nxmbriggs404]'s branch with the fixes and opened a
[new pull request|https://github.com/apache/kafka/pull/4431] (_I have no rights
to push to his repository or to fix merge conflicts within pull requests_),
which is "ready" to be merged into trunk (_code review required_). So far I have
had no issues with log retention when running Kafka based on [~nxmbriggs404]'s
fixes.

> The kafka broker cannot delete the old log files after the configured time
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-1194
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1194
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
>         Environment: window
>            Reporter: Tao Qin
>            Priority: Critical
>              Labels: features, patch
>         Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch,
>                      kafka-1194-v2.patch, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment, and set log.retention.hours to 24
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file.
> And we get the following exceptions: > [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task > 'kafka-log-retention' (kafka.utils.KafkaScheduler) > kafka.common.KafkaStorageException: Failed to change the log file suffix from > to .deleted for log segment 1516723 > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249) > at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638) > at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418) > at > scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59) > at scala.collection.immutable.List.foreach(List.scala:76) > at kafka.log.Log.deleteOldSegments(Log.scala:418) > at > kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284) > at > kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316) > at > kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743) > at scala.collection.Iterator$class.foreach(Iterator.scala:772) > at > scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:73) > at > scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742) > at kafka.log.LogManager.cleanupLogs(LogManager.scala:314) > at > kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143) > at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) > at > 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:724) > I think this error happens because kafka tries to rename the log file while it > is still open. So we should close the file before renaming it. > The index file uses a special data structure, the MappedByteBuffer. The Javadoc > describes it as: > A mapped byte buffer and the file mapping that it represents remain valid > until the buffer itself is garbage-collected. > Fortunately, I found a forceUnmap function in the kafka code, and perhaps it can > be used to free the MappedByteBuffer.
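The "close before rename" ordering suggested above can be sketched with plain NIO. This is not the actual Kafka patch; `CloseThenRename` and `appendDeletedSuffix` are hypothetical names, and the sketch only shows the ordering: close every handle on the segment file (and unmap any `MappedByteBuffer`), then perform the move. On Windows, moving a file that still has an open handle fails with `AccessDeniedException`.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Sketch of appending a ".deleted" suffix the way the log cleaner renames
// retired segments, with all handles closed before the rename.
public class CloseThenRename {
    static Path appendDeletedSuffix(Path segment) throws IOException {
        Path target = segment.resolveSibling(segment.getFileName() + ".deleted");
        // any open channel/mapping on `segment` must be released before this move
        return Files.move(segment, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path segment = Files.createTempFile("segment", ".log");
        try (FileChannel ch = FileChannel.open(segment, StandardOpenOption.WRITE)) {
            // write, flush, ...; try-with-resources closes the channel here
        }
        Path renamed = appendDeletedSuffix(segment); // safe: no open handle remains
        System.out.println(Files.exists(renamed));   // true
        Files.deleteIfExists(renamed);
    }
}
```

On POSIX systems the rename would succeed even with the channel open, which is why the bug only surfaces on Windows; the closed-first ordering is safe on both.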
[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking
[ https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-6456: -- Summary: Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking (was: Improve JavaDoc of SourceTask#poll()) > Improve JavaDoc of SourceTask#poll() to discourage indefinite blocking > -- > > Key: KAFKA-6456 > URL: https://issues.apache.org/jira/browse/KAFKA-6456 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > > The docs of {{poll()}} currently say "This method should block if no data is > currently available". This prevents the task from transitioning to the PAUSED > state if there's no data available for a longer period of time. I'd > therefore suggest rewording it like this: > {quote} > This method should block if no data is currently available but return control > to Kafka Connect periodically (by returning null). > {quote}
[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll()
[ https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-6456: -- Description: The docs of {{poll()}} currently say "This method should block if no data is currently available". This prevents the task from transitioning to the PAUSED state if there's no data available for a longer period of time. I'd therefore suggest rewording it like this: {quote} This method should block if no data is currently available but return control to Kafka Connect periodically (by returning null). {quote} > Improve JavaDoc of SourceTask#poll() > > > Key: KAFKA-6456 > URL: https://issues.apache.org/jira/browse/KAFKA-6456 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major > > The docs of {{poll()}} currently say "This method should block if no data is > currently available". This prevents the task from transitioning to the PAUSED > state if there's no data available for a longer period of time. I'd > therefore suggest rewording it like this: > {quote} > This method should block if no data is currently available but return control > to Kafka Connect periodically (by returning null). > {quote}
[jira] [Commented] (KAFKA-2170) 10 LogTest cases failed for file.renameTo failed under windows
[ https://issues.apache.org/jira/browse/KAFKA-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328534#comment-16328534 ] ASF GitHub Bot commented on KAFKA-2170: --- GeorgeCGV opened a new pull request #4431: KAFKA-2170: Updated Fixes For Windows Platform URL: https://github.com/apache/kafka/pull/4431 **This is continued development of the [original pull request](https://github.com/apache/kafka/pull/3283) made by [nxmbriggs404](https://github.com/nxmbriggs404).** I use Kafka primarily on the Windows platform, but [KAFKA-1194](https://issues.apache.org/jira/browse/KAFKA-1194) affects all Kafka versions on Windows and makes it unusable without restarts. There are several pull requests available to fix this problem, but most of them are outdated. [Nxmbriggs404's pull request](https://github.com/apache/kafka/pull/3283) is the most recent of them. Unfortunately, there has been no further discussion or progress on the aforementioned pull request, so I created a new one. This pull request contains the most recent merge of the **apache/kafka** **trunk** branch into the [original pull request](https://github.com/apache/kafka/pull/3283) made by [nxmbriggs404](https://github.com/nxmbriggs404), with all related tests and code adaptations. The only major change is in `LogManager.scala`, within `asyncDelete`. I had to change the order of operations for `removedLog` from 1. Rename directory (atomic move) 2. Make checkpoint log for offsets and recovery 3. Mark for deletion to 1. Make checkpoint log for offsets and recovery 2. Close `FileChannel` (this avoids `AccessDeniedException` on Windows) 3. Rename directory (atomic move) 4. Mark for deletion. The new order allows `LogManagerTest.testFileReferencesAfterAsyncDelete` to succeed on Windows. Currently I am running a long-lived test with all of the changes mentioned above. It functions as expected: log retention works. 
I think most Windows users would really like to see the old [KAFKA-1194](https://issues.apache.org/jira/browse/KAFKA-1194) issue closed. > 10 LogTest cases failed for file.renameTo failed under windows > --- > > Key: KAFKA-2170 > URL: https://issues.apache.org/jira/browse/KAFKA-2170 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.1.0 > Environment: Windows >Reporter: Honghai Chen >Assignee: Jay Kreps >Priority: Major > > Get the latest code from trunk, then run the test: > gradlew -i core:test --tests kafka.log.LogTest > 10 cases failed for the same reason: > kafka.common.KafkaStorageException: Failed to change the log file suffix from > to .deleted for log segment 0 > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:259) > at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:756) > at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:747) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514) > at scala.collection.immutable.List.foreach(List.scala:318) > at kafka.log.Log.deleteOldSegments(Log.scala:514) > at kafka.log.LogTest.testAsyncDelete(LogTest.scala:633) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:601) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) > at > 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:180) > at
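The reordered steps described in the pull request above can be sketched with plain Java NIO. This is an illustration of the ordering only, not the actual `LogManager.asyncDelete` code; `AsyncDeleteOrder`, `removeLogDir`, and the marker checkpoint file are hypothetical stand-ins.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Illustrative ordering: checkpoint first, close open handles, then
// atomically rename the log directory, then schedule the actual deletion.
public class AsyncDeleteOrder {
    static Path removeLogDir(Path logDir, FileChannel openSegment) throws IOException {
        // 1. checkpoint recovery/offset state (stood in by a marker file here)
        Files.write(logDir.resolveSibling("recovery-point-offset-checkpoint"),
                    "0".getBytes());
        // 2. close the segment's FileChannel: on Windows an open handle
        //    inside the directory makes the rename below fail
        openSegment.close();
        // 3. atomic rename, so a crash leaves either the old or the new name
        Path deleted = logDir.resolveSibling(logDir.getFileName() + "-delete");
        Files.move(logDir, deleted, StandardCopyOption.ATOMIC_MOVE);
        // 4. "mark for deletion": a background thread would remove `deleted` later
        return deleted;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("topic-0");
        Path seg = Files.createFile(dir.resolve("00000000.log"));
        FileChannel ch = FileChannel.open(seg, StandardOpenOption.WRITE);
        Path deleted = removeLogDir(dir, ch);
        System.out.println(Files.exists(deleted)); // temp artifacts left for inspection
    }
}
```

Doing the checkpoint before the rename means a recovering broker never sees a checkpoint that points into a directory that no longer exists under its old name.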
[jira] [Updated] (KAFKA-6456) Improve JavaDoc of SourceTask#poll()
[ https://issues.apache.org/jira/browse/KAFKA-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-6456: -- Summary: Improve JavaDoc of SourceTask#poll() (was: Improve JavaDoc of SourceTask) > Improve JavaDoc of SourceTask#poll() > > > Key: KAFKA-6456 > URL: https://issues.apache.org/jira/browse/KAFKA-6456 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Gunnar Morling >Priority: Major >
[jira] [Created] (KAFKA-6456) Improve JavaDoc of SourceTask
Gunnar Morling created KAFKA-6456: - Summary: Improve JavaDoc of SourceTask Key: KAFKA-6456 URL: https://issues.apache.org/jira/browse/KAFKA-6456 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 1.0.0 Reporter: Gunnar Morling
[jira] [Commented] (KAFKA-6449) KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more than request.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328505#comment-16328505 ] huxihx commented on KAFKA-6449: --- Try increasing `session.timeout.ms`. > KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more > than request.timeout.ms > --- > > Key: KAFKA-6449 > URL: https://issues.apache.org/jira/browse/KAFKA-6449 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.1 >Reporter: zhaoshijie >Priority: Major > > I use the code below to consume a partition of a Kafka topic; I get 40s > latency on every poll: > {code:java} > @Test > public void testTimeOut() throws Exception { > String test_topic = "timeOut_test"; > int test_partition = 1; > Map<String, Object> kafkaParams = new HashMap<String, Object>(); > kafkaParams.put("auto.offset.reset", "earliest"); > kafkaParams.put("enable.auto.commit", false); > kafkaParams.put("bootstrap.servers", "*"); > kafkaParams.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > kafkaParams.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > kafkaParams.put("group.id", "test-consumer-" + > System.currentTimeMillis()); > //kafkaParams.put("reconnect.backoff.ms", "0"); > //kafkaParams.put("max.poll.records", "500"); > KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaParams); > consumer.assign(Arrays.asList(new TopicPartition(test_topic, > test_partition))); > Long offset = 0L; > while (true) { > Long startPollTime = System.currentTimeMillis(); > consumer.seek(new TopicPartition(test_topic, test_partition), > offset); > ConsumerRecords<String, String> records = consumer.poll(12); > logger.info("poll take " + (System.currentTimeMillis() - > startPollTime) + "ms, MSGCount is " + records.count()); > Thread.sleep(41000); > Iterator<ConsumerRecord<String, String>> consumerRecordIterable = > records.records(test_topic).iterator(); > while (consumerRecordIterable.hasNext()) { > offset = consumerRecordIterable.next().offset(); > } > } > } > {code} 
> log as follow: > {code:java} > 2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values: > metric.reporters = [] > metadata.max.age.ms = 30 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RangeAssignor] > reconnect.backoff.ms = 50 > sasl.kerberos.ticket.renew.window.factor = 0.8 > max.partition.fetch.bytes = 1048576 > bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092] > ssl.keystore.type = JKS > enable.auto.commit = false > sasl.mechanism = GSSAPI > interceptor.classes = null > exclude.internal.topics = true > ssl.truststore.password = null > client.id = > ssl.endpoint.identification.algorithm = null > max.poll.records = 2147483647 > check.crcs = true > request.timeout.ms = 4 > heartbeat.interval.ms = 3000 > auto.commit.interval.ms = 5000 > receive.buffer.bytes = 65536 > ssl.truststore.type = JKS > ssl.truststore.location = null > ssl.keystore.password = null > fetch.min.bytes = 1 > send.buffer.bytes = 131072 > value.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > group.id = test-consumer-1516100013868 > retry.backoff.ms = 100 > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > ssl.trustmanager.algorithm = PKIX > ssl.key.password = null > fetch.max.wait.ms = 500 > sasl.kerberos.min.time.before.relogin = 6 > connections.max.idle.ms = 54 > session.timeout.ms = 3 > metrics.num.samples = 2 > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > ssl.protocol = TLS > ssl.provider = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.keystore.location = null > ssl.cipher.suites = null > security.protocol = PLAINTEXT > ssl.keymanager.algorithm = SunX509 > metrics.sample.window.ms = 3 > auto.offset.reset = earliest > | > org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178) > 2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values: > metric.reporters 
= [] > metadata.max.age.ms = 30 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RangeAssignor] > reconnect.backoff.ms = 50 > sasl.kerberos.ticket.renew.window.factor = 0.8 > max.partition.fetch.bytes = 1048576 > bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092] > ssl.keystore.type = JKS > enable.auto.commit = false > sasl.mechanism =
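One way to avoid the stall described in this report is to keep the gap between successive poll() calls below request.timeout.ms (40 s in the config above) by processing records in bounded slices instead of sleeping 41 s in one stretch. The sketch below is a pure-JDK simulation with no real KafkaConsumer; `BoundedProcessing`, `processSlice`, and the per-record cost are illustrative assumptions.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Pure-JDK sketch: bound the time spent between polls by processing records
// in slices whose total cost stays under a budget, then returning so the
// caller can poll (and thus talk to the broker) again before the timeout.
public class BoundedProcessing {
    static final long MAX_GAP_MS = 40_000; // request.timeout.ms in the report

    // Process at most `budgetMs` worth of work, then return control;
    // `costMs` is the simulated cost of handling one record.
    static int processSlice(Queue<String> pending, long costMs, long budgetMs) {
        int done = 0;
        long spent = 0;
        while (!pending.isEmpty() && spent + costMs <= budgetMs) {
            pending.remove(); // stand-in for real record handling
            spent += costMs;
            done++;
        }
        return done;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>();
        for (int i = 0; i < 100; i++) pending.add("record-" + i);
        // 1 s per record, 30 s budget per slice: 30 records, then poll again
        System.out.println(processSlice(pending, 1_000, 30_000)); // 30
    }
}
```

The budget should be comfortably below `MAX_GAP_MS` so that network round-trips and the poll itself still fit inside the timeout window.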
[jira] [Commented] (KAFKA-6449) KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more than request.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328478#comment-16328478 ] zhaoshijie commented on KAFKA-6449: --- `max.poll.interval.ms` is a parameter for Kafka 0.10.1.0 and above; 0.10.0.1 does not have this parameter, and heartbeating is not done in a background thread. I do not want to trigger a client rebalance, but processing the records takes more than 40s. > KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more > than request.timeout.ms > --- > > Key: KAFKA-6449 > URL: https://issues.apache.org/jira/browse/KAFKA-6449 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.1 >Reporter: zhaoshijie >Priority: Major > > I use the code below to consume a partition of a Kafka topic; I get 40s > latency on every poll: > {code:java} > @Test > public void testTimeOut() throws Exception { > String test_topic = "timeOut_test"; > int test_partition = 1; > Map<String, Object> kafkaParams = new HashMap<String, Object>(); > kafkaParams.put("auto.offset.reset", "earliest"); > kafkaParams.put("enable.auto.commit", false); > kafkaParams.put("bootstrap.servers", "*"); > kafkaParams.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > kafkaParams.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > kafkaParams.put("group.id", "test-consumer-" + > System.currentTimeMillis()); > //kafkaParams.put("reconnect.backoff.ms", "0"); > //kafkaParams.put("max.poll.records", "500"); > KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaParams); > consumer.assign(Arrays.asList(new TopicPartition(test_topic, > test_partition))); > Long offset = 0L; > while (true) { > Long startPollTime = System.currentTimeMillis(); > consumer.seek(new TopicPartition(test_topic, test_partition), > offset); > ConsumerRecords<String, String> records = consumer.poll(12); > logger.info("poll take " + (System.currentTimeMillis() - > startPollTime) + "ms, MSGCount is " + records.count()); > Thread.sleep(41000); > 
Iterator<ConsumerRecord<String, String>> consumerRecordIterable = > records.records(test_topic).iterator(); > while (consumerRecordIterable.hasNext()) { > offset = consumerRecordIterable.next().offset(); > } > } > } > {code} > log as follow: > {code:java} > 2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values: > metric.reporters = [] > metadata.max.age.ms = 30 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RangeAssignor] > reconnect.backoff.ms = 50 > sasl.kerberos.ticket.renew.window.factor = 0.8 > max.partition.fetch.bytes = 1048576 > bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092] > ssl.keystore.type = JKS > enable.auto.commit = false > sasl.mechanism = GSSAPI > interceptor.classes = null > exclude.internal.topics = true > ssl.truststore.password = null > client.id = > ssl.endpoint.identification.algorithm = null > max.poll.records = 2147483647 > check.crcs = true > request.timeout.ms = 4 > heartbeat.interval.ms = 3000 > auto.commit.interval.ms = 5000 > receive.buffer.bytes = 65536 > ssl.truststore.type = JKS > ssl.truststore.location = null > ssl.keystore.password = null > fetch.min.bytes = 1 > send.buffer.bytes = 131072 > value.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > group.id = test-consumer-1516100013868 > retry.backoff.ms = 100 > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > ssl.trustmanager.algorithm = PKIX > ssl.key.password = null > fetch.max.wait.ms = 500 > sasl.kerberos.min.time.before.relogin = 6 > connections.max.idle.ms = 54 > session.timeout.ms = 3 > metrics.num.samples = 2 > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > ssl.protocol = TLS > ssl.provider = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.keystore.location = null > ssl.cipher.suites = null > security.protocol = PLAINTEXT > ssl.keymanager.algorithm = SunX509 > metrics.sample.window.ms = 3 > 
auto.offset.reset = earliest > | > org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178) > 2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values: > metric.reporters = [] > metadata.max.age.ms = 30 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RangeAssignor] > reconnect.backoff.ms = 50 >
[jira] [Commented] (KAFKA-6449) KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more than request.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-6449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328449#comment-16328449 ] huxihx commented on KAFKA-6449: --- Hmm... so what's the problem you ran into? Repeated rebalances? Try increasing `max.poll.interval.ms` to 40+ seconds to see if it works. > KafkaConsumer happen 40s timeOut when poll data after pollThread sleep more > than request.timeout.ms > --- > > Key: KAFKA-6449 > URL: https://issues.apache.org/jira/browse/KAFKA-6449 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.1 >Reporter: zhaoshijie >Priority: Major > > I use the code below to consume a partition of a Kafka topic; I get 40s > latency on every poll: > {code:java} > @Test > public void testTimeOut() throws Exception { > String test_topic = "timeOut_test"; > int test_partition = 1; > Map<String, Object> kafkaParams = new HashMap<String, Object>(); > kafkaParams.put("auto.offset.reset", "earliest"); > kafkaParams.put("enable.auto.commit", false); > kafkaParams.put("bootstrap.servers", "*"); > kafkaParams.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > kafkaParams.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > kafkaParams.put("group.id", "test-consumer-" + > System.currentTimeMillis()); > //kafkaParams.put("reconnect.backoff.ms", "0"); > //kafkaParams.put("max.poll.records", "500"); > KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaParams); > consumer.assign(Arrays.asList(new TopicPartition(test_topic, > test_partition))); > Long offset = 0L; > while (true) { > Long startPollTime = System.currentTimeMillis(); > consumer.seek(new TopicPartition(test_topic, test_partition), > offset); > ConsumerRecords<String, String> records = consumer.poll(12); > logger.info("poll take " + (System.currentTimeMillis() - > startPollTime) + "ms, MSGCount is " + records.count()); > Thread.sleep(41000); > Iterator<ConsumerRecord<String, String>> consumerRecordIterable = > records.records(test_topic).iterator(); > while 
(consumerRecordIterable.hasNext()) { > offset = consumerRecordIterable.next().offset(); > } > } > } > {code} > log as follow: > {code:java} > 2018-01-16 18:53:33,033 |INFO | main | ConsumerConfig values: > metric.reporters = [] > metadata.max.age.ms = 30 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RangeAssignor] > reconnect.backoff.ms = 50 > sasl.kerberos.ticket.renew.window.factor = 0.8 > max.partition.fetch.bytes = 1048576 > bootstrap.servers = [10.0.52.24:9092, 10.0.52.25:9092, 10.0.52.26:9092] > ssl.keystore.type = JKS > enable.auto.commit = false > sasl.mechanism = GSSAPI > interceptor.classes = null > exclude.internal.topics = true > ssl.truststore.password = null > client.id = > ssl.endpoint.identification.algorithm = null > max.poll.records = 2147483647 > check.crcs = true > request.timeout.ms = 4 > heartbeat.interval.ms = 3000 > auto.commit.interval.ms = 5000 > receive.buffer.bytes = 65536 > ssl.truststore.type = JKS > ssl.truststore.location = null > ssl.keystore.password = null > fetch.min.bytes = 1 > send.buffer.bytes = 131072 > value.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > group.id = test-consumer-1516100013868 > retry.backoff.ms = 100 > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > ssl.trustmanager.algorithm = PKIX > ssl.key.password = null > fetch.max.wait.ms = 500 > sasl.kerberos.min.time.before.relogin = 6 > connections.max.idle.ms = 54 > session.timeout.ms = 3 > metrics.num.samples = 2 > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > ssl.protocol = TLS > ssl.provider = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.keystore.location = null > ssl.cipher.suites = null > security.protocol = PLAINTEXT > ssl.keymanager.algorithm = SunX509 > metrics.sample.window.ms = 3 > auto.offset.reset = earliest > | > 
org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:178) > 2018-01-16 18:53:34,034 |INFO | main | ConsumerConfig values: > metric.reporters = [] > metadata.max.age.ms = 30 > partition.assignment.strategy = > [org.apache.kafka.clients.consumer.RangeAssignor] > reconnect.backoff.ms = 50 > sasl.kerberos.ticket.renew.window.factor = 0.8 > max.partition.fetch.bytes = 1048576 > bootstrap.servers = [10.0.52.24:9092,