[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353244#comment-16353244 ] Matthias J. Sax commented on KAFKA-6535: Meta comment: sometimes, people need to manually repartition via `through()` – should we allow users to do the same thing for this case? I mean, setting retention to infinite and telling Kafka Streams to purge data for those topics? > Set default retention ms for Streams repartition topics to infinity > --- > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set their default retention to infinity to allow any records to > be pushed to them with old timestamps (think: bootstrapping, re-processing) > and just rely on the purging API to keep their storage small. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
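For context, "infinite" time-based retention on a Kafka topic is expressed as a negative retention.ms. A sketch of the topic-level config the proposal implies for repartition topics (illustrative values only, not the committed change):

```properties
# Sketch only: hypothetical topic-level config for a Streams repartition topic.
# retention.ms=-1 disables time-based retention, so records carrying old
# timestamps (bootstrapping, re-processing) are never expired by the broker.
# Storage is then bounded only by Streams' explicit purge (delete-records) calls.
cleanup.policy=delete
retention.ms=-1
```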
[jira] [Created] (KAFKA-6536) Streams quickstart pom.xml is missing versions for a bunch of plugins
Ewen Cheslack-Postava created KAFKA-6536: Summary: Streams quickstart pom.xml is missing versions for a bunch of plugins Key: KAFKA-6536 URL: https://issues.apache.org/jira/browse/KAFKA-6536 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.11.0.2, 1.0.0, 1.0.1 Reporter: Ewen Cheslack-Postava There are a bunch of plugins being used that maven helpfully warns you about being unversioned: {code:java} > [INFO] Scanning for projects... > [WARNING] > [WARNING] Some problems were encountered while building the effective model > for org.apache.kafka:streams-quickstart-java:maven-archetype:1.0.1 > [WARNING] 'build.plugins.plugin.version' for > org.apache.maven.plugins:maven-shade-plugin is missing. @ > org.apache.kafka:streams-quickstart:1.0.1, > /Users/ewencp/kafka.git/.release_work_dir/kafka/streams/quickstart/pom.xml, > line 64, column 21 > [WARNING] 'build.plugins.plugin.version' for > com.github.siom79.japicmp:japicmp-maven-plugin is missing. @ > org.apache.kafka:streams-quickstart:1.0.1, > /Users/ewencp/kafka.git/.release_work_dir/kafka/streams/quickstart/pom.xml, > line 74, column 21 > [WARNING] > [WARNING] Some problems were encountered while building the effective model > for org.apache.kafka:streams-quickstart:pom:1.0.1 > [WARNING] 'build.plugins.plugin.version' for > org.apache.maven.plugins:maven-shade-plugin is missing. @ line 64, column 21 > [WARNING] 'build.plugins.plugin.version' for > com.github.siom79.japicmp:japicmp-maven-plugin is missing. @ line 74, column > 21 > [WARNING] > [WARNING] It is highly recommended to fix these problems because they > threaten the stability of your build. > [WARNING] > [WARNING] For this reason, future Maven versions might no longer support > building such malformed projects.{code} Unversioned dependencies are dangerous as they make the build non-reproducible. In fact, a released version may become very difficult to build as the user would have to track down the working versions of the plugins. 
This seems particularly bad for the quickstart as it's likely to be copy/pasted into people's own projects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
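The fix amounts to declaring an explicit version for each plugin Maven warns about. A sketch of the shape of the change in the quickstart pom.xml (the version numbers below are placeholders, not the versions actually chosen by the fix):

```xml
<!-- Sketch only: pin every plugin version instead of relying on Maven's
     resolution. Version numbers here are placeholders. -->
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.1.0</version> <!-- placeholder -->
    </plugin>
    <plugin>
      <groupId>com.github.siom79.japicmp</groupId>
      <artifactId>japicmp-maven-plugin</artifactId>
      <version>0.11.0</version> <!-- placeholder -->
    </plugin>
  </plugins>
</build>
```

Pinning versions makes the build reproducible: anyone building a released tag gets the same plugin behavior the release was tested with.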
[jira] [Created] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity
Guozhang Wang created KAFKA-6535: Summary: Set default retention ms for Streams repartition topics to infinity Key: KAFKA-6535 URL: https://issues.apache.org/jira/browse/KAFKA-6535 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang After KIP-220 / KIP-204, repartition topics in Streams are transient, so it is better to set their default retention to infinity to allow any records to be pushed to them with old timestamps (think: bootstrapping, re-processing) and just rely on the purging API to keep their storage small. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6254) Introduce Incremental FetchRequests to Increase Partition Scalability
[ https://issues.apache.org/jira/browse/KAFKA-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353097#comment-16353097 ] Andrey Falko commented on KAFKA-6254: - Here is the tool that I used to get the results above: [https://github.com/salesforce/kafka-partition-availability-benchmark] Ran it with 10 brokers and 4k topics per producer. i3.4xlarge instances for brokers, i3.2xlarge instances for producers. > Introduce Incremental FetchRequests to Increase Partition Scalability > - > > Key: KAFKA-6254 > URL: https://issues.apache.org/jira/browse/KAFKA-6254 > Project: Kafka > Issue Type: Improvement >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > Fix For: 1.1.0 > > > Introduce Incremental FetchRequests to Increase Partition Scalability. See > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6520) When a Kafka Stream can't communicate with the server, its Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6520: - Labels: newbie user-experience (was: user-experience) > When a Kafka Stream can't communicate with the server, its Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Priority: Major > Labels: newbie, user-experience > > When you execute the following scenario the application is always in RUNNING > state > > 1) start kafka > 2) start app, app connects to kafka and starts processing > 3) kill kafka (stop docker container) > 4) the application doesn't give any indication that it's no longer > connected (Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the Google user forum. > [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a > related issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
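The proposal amounts to adding a new state to the Streams state machine. A self-contained sketch of the idea (all names here are hypothetical – the real enum is KafkaStreams.State, and the actual transition rules would differ):

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch of the proposed DISCONNECTED state in a Streams-like state machine.
// Hypothetical illustration only; not the real KafkaStreams.State enum.
public class StreamStateDemo {
    enum State {
        RUNNING, REBALANCING, DISCONNECTED, ERROR;

        // Which states this state may legally transition to.
        Set<State> validNext() {
            switch (this) {
                case RUNNING:      return EnumSet.of(REBALANCING, DISCONNECTED, ERROR);
                case REBALANCING:  return EnumSet.of(RUNNING, DISCONNECTED, ERROR);
                case DISCONNECTED: return EnumSet.of(RUNNING, REBALANCING, ERROR);
                default:           return EnumSet.noneOf(State.class);
            }
        }
    }

    // A broker heartbeat that keeps failing would move the state to
    // DISCONNECTED instead of silently staying RUNNING.
    static State onHeartbeatResult(State current, boolean heartbeatOk) {
        if (!heartbeatOk && current.validNext().contains(State.DISCONNECTED)) {
            return State.DISCONNECTED;
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(onHeartbeatResult(State.RUNNING, false)); // DISCONNECTED
    }
}
```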
[jira] [Updated] (KAFKA-6520) When a Kafka Stream can't communicate with the server, its Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6520: - Issue Type: Improvement (was: Bug) > When a Kafka Stream can't communicate with the server, its Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Priority: Major > Labels: user-experience > > When you execute the following scenario the application is always in RUNNING > state > > 1) start kafka > 2) start app, app connects to kafka and starts processing > 3) kill kafka (stop docker container) > 4) the application doesn't give any indication that it's no longer > connected (Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the Google user forum. > [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a > related issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5233) Changes to punctuate semantics (KIP-138)
[ https://issues.apache.org/jira/browse/KAFKA-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352966#comment-16352966 ] ASF GitHub Bot commented on KAFKA-5233: --- guozhangwang closed pull request #3678: KAFKA-5233 follow up URL: https://github.com/apache/kafka/pull/3678 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java index d191891afe9..5aa7c7d94bb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Rule; @@ -39,38 +40,73 @@ @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); +private Punctuator punctuator; + +private final TransformerSupplier> transformerSupplier = +new TransformerSupplier>() { +public Transformer> get() { +return new Transformer>() { + +private int total = 0; + +@Override +public void init(final ProcessorContext context) { +punctuator = new Punctuator() { +@Override +public void punctuate(long timestamp) { +context.forward(-1, (int) timestamp); +} +}; +} + +@Override +public KeyValue transform(Number key, Number value) { +total += value.intValue(); +return 
KeyValue.pair(key.intValue() * 2, total); +} + +@Override +public KeyValue punctuate(long timestamp) { +return KeyValue.pair(-1, (int) timestamp); +} + +@Override +public void close() { +} +}; +} +}; + @Test public void testTransform() { StreamsBuilder builder = new StreamsBuilder(); -TransformerSupplier> transformerSupplier = -new TransformerSupplier>() { -public Transformer> get() { -return new Transformer>() { +final int[] expectedKeys = {1, 10, 100, 1000}; + +MockProcessorSupplier processor = new MockProcessorSupplier<>(); +KStream stream = builder.stream(intSerde, intSerde, topicName); +stream.transform(transformerSupplier).process(processor); + +driver.setUp(builder); +for (int expectedKey : expectedKeys) { +driver.process(topicName, expectedKey, expectedKey * 10); +} -private int total = 0; +driver.punctuate(2, punctuator); +driver.punctuate(3, punctuator); -@Override -public void init(ProcessorContext context) { -} +assertEquals(6, processor.processed.size()); -@Override -public KeyValue transform(Number key, Number value) { -total += value.intValue(); -return KeyValue.pair(key.intValue() * 2, total); -} +String[] expected = {"2:10", "20:110", "200:1110", "2000:0", "-1:2", "-1:3"}; -@Override -public KeyValue punctuate(long timestamp) { -return KeyValue.pair(-1, (int) timestamp); -} +for (int i = 0; i < expected.length; i++) { +assertEquals(expected[i], processor.processed.get(i)); +} +} -@Override -public void close() { -} -}; -} -}; +@Test @Deprecated +public void testTransformWithDeprecatedPunctuate() { +StreamsBuilder builder = new StreamsBuilder(); final int[] expectedKeys = {1, 10, 100, 1000}; @@ -83,8 +119,8 @@
[jira] [Updated] (KAFKA-6367) Fix StateRestoreListener To Use Correct Batch Ending Offset
[ https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-6367: --- Description: {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long as the batch ending offset, but {{nextPosition}} is not correct: the batch ending offset should be the offset of the last restored record, whereas {{nextPosition}} is the offset of the first record that has not been restored. We can't automatically use {{nextPosition}} - 1 as this could be a commit marker. was: {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long but the {{nextPosition}} is not correct, it should be the offset of the latest restored offset, but {{nextPosition}} is the offset of the first not restored offset. We can't automatically use {{nextPosition}} - 1 as this could be a commit marker. > Fix StateRestoreListener To Use Correct Batch Ending Offset > --- > > Key: KAFKA-6367 > URL: https://issues.apache.org/jira/browse/KAFKA-6367 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > Fix For: 1.0.2 > > > {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} > long as the batch ending offset, but {{nextPosition}} is not correct: the > batch ending offset should be the offset of the last restored record, whereas > {{nextPosition}} is the offset of the first record that has not been restored. > We can't automatically use {{nextPosition}} - 1 as this could be a commit > marker. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
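The offset semantics can be illustrated with a self-contained sketch (plain Java, not Kafka code; the offsets are invented for illustration). The point is that with EOS the offset just before the fetch position can be a transaction commit marker, not a restored record:

```java
import java.util.Arrays;
import java.util.List;

// Illustration (not Kafka code) of why nextPosition - 1 is the wrong batch
// ending offset: the offset right before nextPosition may be a transaction
// commit marker rather than a restored record.
public class BatchEndOffsetDemo {
    // The correct batch ending offset is the last record actually restored.
    static long lastRestored(List<Long> restoredOffsets) {
        return restoredOffsets.get(restoredOffsets.size() - 1);
    }

    public static void main(String[] args) {
        // Batch restored records at offsets 10, 11, 12; offset 13 is a commit
        // marker, so the fetch position advanced to nextPosition = 14.
        List<Long> restored = Arrays.asList(10L, 11L, 12L);
        long nextPosition = 14L;

        long wrong = nextPosition - 1;        // 13 -> points at the marker
        long right = lastRestored(restored);  // 12 -> last restored record

        System.out.println("nextPosition-1 = " + wrong + ", last restored = " + right);
    }
}
```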
[jira] [Updated] (KAFKA-6367) Fix StateRestoreListener To Use Correct Batch Ending Offset
[ https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-6367: --- Summary: Fix StateRestoreListener To Use Correct Batch Ending Offset (was: Fix StateRestoreListener To Use Correct Ending Offset) > Fix StateRestoreListener To Use Correct Batch Ending Offset > --- > > Key: KAFKA-6367 > URL: https://issues.apache.org/jira/browse/KAFKA-6367 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > Fix For: 1.0.2 > > > {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} > long but the {{nextPosition}} is not correct, it should be the offset of the > latest restored offset, but {{nextPosition}} is the offset of the first not > restored offset. > We can't automatically use {{nextPosition}} - 1 as this could be a commit > marker. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6534) Consumer.poll may not trigger rebalance in time when there is a task migration
[ https://issues.apache.org/jira/browse/KAFKA-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6534: - Description: When Streams detects a task migration event in one of its threads, today it will always trigger a call to {{consumer.poll}}, hoping it will trigger the rebalance and hence clean up the records buffered from the partitions that are no longer owned. However, because the rebalance is based on heartbeat responses, which leave a window for a race, the rebalance is not always guaranteed to be triggered when a task migration happens. As a result, records buffered in the consumer may not be cleaned up and may later be processed by Streams even though they no longer belong to the thread, causing: {code:java} java.lang.IllegalStateException: Record's partition does not belong to this partition-group. {code} Note this issue is only relevant when EOS is turned on, and based on the default heartbeat.interval.ms value (3 sec), the race likelihood should not be high. was: When Streams detects a task migration event in one of its threads, today it will always trigger a call to {{consumer.poll}}, hoping it will trigger the rebalance and hence clean up the records buffered from the partitions that are no longer owned. However, because the rebalance is based on heartbeat responses, which leave a window for a race, the rebalance is not always guaranteed to be triggered when a task migration happens. As a result, records buffered in the consumer may not be cleaned up and may later be processed by Streams even though they no longer belong to the thread, causing: {code:java} java.lang.IllegalStateException: Record's partition does not belong to this partition-group.
{code} > Consumer.poll may not trigger rebalance in time when there is a task migration > -- > > Key: KAFKA-6534 > URL: https://issues.apache.org/jira/browse/KAFKA-6534 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > > When Streams detects a task migration event in one of its threads, today it > will always trigger a call to {{consumer.poll}}, hoping it will trigger the > rebalance and hence clean up the records buffered from the partitions that > are no longer owned. However, because the rebalance is based on heartbeat > responses, which leave a window for a race, the rebalance is not always > guaranteed to be triggered when a task migration happens. As a result, > records buffered in the consumer may not be cleaned up and may later be > processed by Streams even though they no longer belong to the thread, > causing: > {code:java} > java.lang.IllegalStateException: Record's partition does not belong to this > partition-group. > {code} > Note this issue is only relevant when EOS is turned on, and based on the > default heartbeat.interval.ms value (3 sec), the race likelihood should not > be high. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5987) Kafka metrics templates used in document generation should maintain order of tags
[ https://issues.apache.org/jira/browse/KAFKA-5987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352874#comment-16352874 ] ASF GitHub Bot commented on KAFKA-5987: --- ewencp closed pull request #3985: KAFKA-5987: Maintain order of metric tags in generated documentation URL: https://github.com/apache/kafka/pull/3985 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java index acf42ec339f..0a2d43a4b61 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java @@ -18,6 +18,7 @@ import java.util.Arrays; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -98,7 +99,7 @@ public FetcherMetricsRegistry(Set tags, String metricGrpPrefix) { "The maximum throttle time in ms", tags); /* Topic level */ -Set topicTags = new HashSet<>(tags); +Set topicTags = new LinkedHashSet<>(tags); topicTags.add("topic"); this.topicFetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java index 21dbca61830..b01236f6218 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java @@ -17,7 +17,7 @@ 
package org.apache.kafka.clients.producer.internals; import java.util.ArrayList; -import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -70,12 +70,12 @@ private final Metrics metrics; private final Set tags; -private final HashSet topicTags; +private final LinkedHashSet topicTags; public SenderMetricsRegistry(Metrics metrics) { this.metrics = metrics; this.tags = this.metrics.config().tags().keySet(); -this.allTemplates = new ArrayList(); +this.allTemplates = new ArrayList<>(); /* Client level */ @@ -126,7 +126,7 @@ public SenderMetricsRegistry(Metrics metrics) { "The maximum time in ms a request was throttled by a broker"); /* Topic level */ -this.topicTags = new HashSet(tags); +this.topicTags = new LinkedHashSet<>(tags); this.topicTags.add("topic"); // We can't create the MetricName up front for these, because we don't know the topic name yet. diff --git a/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java index e3ea9950ef1..1b1de71037d 100644 --- a/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java +++ b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common; -import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Objects; import java.util.Set; @@ -26,27 +26,45 @@ * A template for a MetricName. It contains a name, group, and description, as * well as all the tags that will be used to create the mBean name. Tag values * are omitted from the template, but are filled in at runtime with their - * specified values. + * specified values. The order of the tags is maintained, if an ordered set + * is provided, so that the mBean names can be compared and sorted lexicographically. 
*/ public class MetricNameTemplate { private final String name; private final String group; private final String description; -private Set tags; +private LinkedHashSet tags; -public MetricNameTemplate(String name, String group, String description, Set tags) { +/** + * Create a new template. Note that the order of the tags will be preserved if the supplied + * {@code tagsNames} set has an order. + * + * @param name the name of the metric; may not be null + * @param group the name of the group; may not be null + * @param description the description of the metric; may not be null + * @param tagsNames the set of metric tag names, which can/should be a set that maintains order; may not be null + */ +
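The change above boils down to HashSet vs LinkedHashSet iteration order: LinkedHashSet preserves the order in which tag names were inserted, so generated MBean names come out in the order the registry declared them. A minimal, self-contained demonstration (the mbeanName helper is hypothetical, loosely modeled on the MBean names quoted in KAFKA-5987):

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Demonstrates the core of the fix: LinkedHashSet iterates tag names in
// insertion order, so MBean names are rendered in the declared order.
// HashSet makes no ordering guarantee at all.
public class TagOrderDemo {
    // Hypothetical helper that renders an MBean-style name from ordered tags.
    static String mbeanName(String type, Set<String> tags) {
        StringBuilder sb = new StringBuilder("kafka.connect:type=").append(type);
        for (String tag : tags) {
            sb.append(',').append(tag).append("={").append(tag).append('}');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Set<String> tags = new LinkedHashSet<>(
                Arrays.asList("connector", "task", "topic", "partition"));
        System.out.println(mbeanName("sink-task-metrics", tags));
        // -> kafka.connect:type=sink-task-metrics,connector={connector},task={task},topic={topic},partition={partition}
    }
}
```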
[jira] [Resolved] (KAFKA-5987) Kafka metrics templates used in document generation should maintain order of tags
[ https://issues.apache.org/jira/browse/KAFKA-5987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-5987. -- Resolution: Fixed Fix Version/s: 1.1.0 1.2.0 Issue resolved by pull request 3985 [https://github.com/apache/kafka/pull/3985] > Kafka metrics templates used in document generation should maintain order of > tags > - > > Key: KAFKA-5987 > URL: https://issues.apache.org/jira/browse/KAFKA-5987 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 1.0.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Blocker > Fix For: 1.2.0, 1.1.0, 1.0.1 > > > KAFKA-5191 recently added a new {{MetricNameTemplate}} that is used to create > the {{MetricName}} objects in the producer and consumer, as well as in the > newly-added generation of metric documentation. The {{MetricNameTemplate}} > and the {{Metric.toHtmlTable}} do not maintain the order of the tags, which > means the resulting HTML documentation will order the table of MBean > attributes based upon the lexicographical ordering of the MBeans, each of > which uses the lexicographical ordering of its tags. This can result in the > following order: > {noformat} > kafka.connect:type=sink-task-metrics,connector={connector},partition={partition},task={task},topic={topic} > kafka.connect:type=sink-task-metrics,connector={connector},task={task} > {noformat} > However, if the MBeans maintained the order of the tags then the > documentation would use the following order: > {noformat} > kafka.connect:type=sink-task-metrics,connector={connector},task={task} > kafka.connect:type=sink-task-metrics,connector={connector},task={task},topic={topic},partition={partition} > {noformat} > This would be more readable, and the code that is creating the templates > would have control over the order of the tags.
> To maintain order, {{MetricNameTemplate}} should use a {{LinkedHashSet}} for > the tags, and the {{Metrics.toHtmlTable}} method should also use a > {{LinkedHashMap}} when building up the tags used in the MBean name. > Note that JMX MBean names use {{ObjectName}}, which does not maintain order, > so this change should have no impact on JMX MBean names. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6534) Consumer.poll may not trigger rebalance in time when there is a task migration
[ https://issues.apache.org/jira/browse/KAFKA-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6534: --- Description: When Streams detects a task migration event in one of its threads, today it will always trigger a call to {{consumer.poll}}, hoping it will trigger the rebalance and hence clean up the records buffered from the partitions that are no longer owned. However, because the rebalance is based on heartbeat responses, which leave a window for a race, the rebalance is not always guaranteed to be triggered when a task migration happens. As a result, records buffered in the consumer may not be cleaned up and may later be processed by Streams even though they no longer belong to the thread, causing: {code:java} java.lang.IllegalStateException: Record's partition does not belong to this partition-group. {code} was: When Streams detects a task migration event in one of its threads, today it will always trigger a call to {{consumer.poll}}, hoping it will trigger the rebalance and hence clean up the records buffered from the partitions that are no longer owned. However, because the rebalance is based on HB responses, which leave a window for a race, the rebalance is not always guaranteed to be triggered when a task migration happens. As a result, records buffered in the consumer may not be cleaned up and may later be processed by Streams even though they no longer belong to the thread, causing: {code} java.lang.IllegalStateException: Record's partition does not belong to this partition-group.
{code} > Consumer.poll may not trigger rebalance in time when there is a task migration > -- > > Key: KAFKA-6534 > URL: https://issues.apache.org/jira/browse/KAFKA-6534 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > > When Streams detects a task migration event in one of its threads, today it > will always trigger a call to {{consumer.poll}}, hoping it will trigger the > rebalance and hence clean up the records buffered from the partitions that > are no longer owned. However, because the rebalance is based on heartbeat > responses, which leave a window for a race, the rebalance is not always > guaranteed to be triggered when a task migration happens. As a result, > records buffered in the consumer may not be cleaned up and may later be > processed by Streams even though they no longer belong to the thread, > causing: > {code:java} > java.lang.IllegalStateException: Record's partition does not belong to this > partition-group. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6534) Consumer.poll may not trigger rebalance in time when there is a task migration
Guozhang Wang created KAFKA-6534: Summary: Consumer.poll may not trigger rebalance in time when there is a task migration Key: KAFKA-6534 URL: https://issues.apache.org/jira/browse/KAFKA-6534 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang When Streams detects a task migration event in one of its threads, today it will always trigger a call to {{consumer.poll}}, hoping it will trigger the rebalance and hence clean up the records buffered from the partitions that are no longer owned. However, because the rebalance is based on HB responses, which leave a window for a race, the rebalance is not always guaranteed to be triggered when a task migration happens. As a result, records buffered in the consumer may not be cleaned up and may later be processed by Streams even though they no longer belong to the thread, causing: {code} java.lang.IllegalStateException: Record's partition does not belong to this partition-group. {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6083) The Fetcher should add the InvalidRecordException as a cause to the KafkaException when invalid record is found.
[ https://issues.apache.org/jira/browse/KAFKA-6083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-6083: - Fix Version/s: (was: 1.0.1) 1.0.2 Seems the PR is still in flight, moving from 1.0.1 to 1.0.2 so we can get 1.0.1 out the door. > The Fetcher should add the InvalidRecordException as a cause to the > KafkaException when invalid record is found. > > > Key: KAFKA-6083 > URL: https://issues.apache.org/jira/browse/KAFKA-6083 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 1.0.0 >Reporter: Jiangjie Qin >Assignee: Adem Efe Gencer >Priority: Major > Labels: newbie++ > Fix For: 1.0.2 > > > In the Fetcher, when an InvalidRecordException is thrown, we convert it to a > KafkaException; we should also add the InvalidRecordException to it as the > cause. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
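The requested change is plain exception chaining. A self-contained sketch with stand-in exception classes (hypothetical stand-ins, not the real org.apache.kafka.common types):

```java
// Illustration of the requested change: when wrapping one exception in
// another, pass the original as the cause so callers can inspect it.
public class ExceptionCauseDemo {
    // Stand-ins for Kafka's exception types (hypothetical, same-named classes).
    static class InvalidRecordException extends RuntimeException {
        InvalidRecordException(String msg) { super(msg); }
    }
    static class KafkaException extends RuntimeException {
        KafkaException(String msg, Throwable cause) { super(msg, cause); }
    }

    static KafkaException wrap(InvalidRecordException e) {
        // Without the fix the cause is dropped and callers only see the
        // wrapper's message; with the fix the original exception travels along.
        return new KafkaException("Received invalid record: " + e.getMessage(), e);
    }

    public static void main(String[] args) {
        KafkaException k = wrap(new InvalidRecordException("bad CRC"));
        System.out.println(k.getCause().getClass().getSimpleName()); // InvalidRecordException
    }
}
```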
[jira] [Updated] (KAFKA-5445) Document exceptions thrown by AdminClient methods
[ https://issues.apache.org/jira/browse/KAFKA-5445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-5445: - Fix Version/s: (was: 1.0.1) 1.0.2 Seems this needs a bit more work to be merged, moving out of 1.0.1 to 1.0.2 > Document exceptions thrown by AdminClient methods > - > > Key: KAFKA-5445 > URL: https://issues.apache.org/jira/browse/KAFKA-5445 > Project: Kafka > Issue Type: Improvement > Components: admin, clients >Reporter: Ismael Juma >Assignee: Andrey Dyachkov >Priority: Major > Fix For: 1.1.0, 1.0.2 > > > AdminClient should document the exceptions that users may have to handle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4972) Kafka 0.10.0 Found a corrupted index file during Kafka broker startup
[ https://issues.apache.org/jira/browse/KAFKA-4972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-4972: - Fix Version/s: (was: 1.0.1) 1.0.2 No progress on this, moving to subsequent release. > Kafka 0.10.0 Found a corrupted index file during Kafka broker startup > -- > > Key: KAFKA-4972 > URL: https://issues.apache.org/jira/browse/KAFKA-4972 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.0.0 > Environment: JDK: HotSpot x64 1.7.0_80 > Tag: 0.10.0 >Reporter: fangjinuo >Priority: Critical > Labels: reliability > Fix For: 1.0.2 > > Attachments: Snap3.png > > > -deleted text-After force-shutting down all kafka brokers one by one and > restarting them one by one, one broker failed to start. > The following WARN level log was found in the log file: > found a corrupted index file, .index , delete it ... > You can view details in the attachment. > ~I looked at some code in the core module and found that > the non-threadsafe method LogSegment.append(offset, messages) has two callers: > 1) Log.append(messages) // here has a synchronized > lock > 2) LogCleaner.cleanInto(topicAndPartition, source, dest, map, retainDeletes, > messageFormatVersion) // here has not > So I guess this may be the reason for the repeated offset in 0xx.log file > (logsegment's .log) ~ > Although this is just my inference, I hope that this problem can be > repaired quickly -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4686) Null Message payload is shutting down broker
[ https://issues.apache.org/jira/browse/KAFKA-4686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-4686: - Fix Version/s: (was: 1.0.1) 1.0.2 Seems there's no movement on this for 1.0.1, moving to 1.0.2 for now. > Null Message payload is shutting down broker > > > Key: KAFKA-4686 > URL: https://issues.apache.org/jira/browse/KAFKA-4686 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.1.1 > Environment: Amazon Linux AMI release 2016.03 kernel > 4.4.19-29.55.amzn1.x86_64 >Reporter: Rodrigo Queiroz Saramago >Priority: Critical > Fix For: 1.0.2 > > Attachments: KAFKA-4686-NullMessagePayloadError.tar.xz, > kafkaServer.out > > > Hello, I have a test environment with 3 brokers and 1 zookeeper node, in > which clients connect using two-way SSL authentication. I use kafka version > 0.10.1.1. The system works as expected for a while, but if the broker goes > down and is then restarted, something gets corrupted and it is not possible > to start the broker again; it always fails with the same error. What does > this error mean? What can I do in this case? Is this the expected behavior? > [2017-01-23 07:03:28,927] ERROR There was an error in one of the threads > during logs loading: kafka.common.KafkaException: Message payload is null: > Message(magic = 0, attributes = 1, crc = 4122289508, key = null, payload = > null) (kafka.log.LogManager) > [2017-01-23 07:03:28,929] FATAL Fatal error during KafkaServer startup.
> Prepare to shutdown (kafka.server.KafkaServer) > kafka.common.KafkaException: Message payload is null: Message(magic = 0, > attributes = 1, crc = 4122289508, key = null, payload = null) > at > kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90) > at > kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85) > at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33) > at kafka.log.LogSegment.recover(LogSegment.scala:223) > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218) > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) > at kafka.log.Log.loadSegments(Log.scala:179) > at kafka.log.Log.<init>(Log.scala:108) > at > kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > [2017-01-23 07:03:28,946] INFO shutting down (kafka.server.KafkaServer) > [2017-01-23 07:03:28,949] INFO Terminate ZkClient event thread. 
> (org.I0Itec.zkclient.ZkEventThread) > [2017-01-23 07:03:28,954] INFO EventThread shut down for session: > 0x159bd458ae70008 (org.apache.zookeeper.ClientCnxn) > [2017-01-23 07:03:28,954] INFO Session: 0x159bd458ae70008 closed > (org.apache.zookeeper.ZooKeeper) > [2017-01-23 07:03:28,957] INFO shut down completed (kafka.server.KafkaServer) > [2017-01-23 07:03:28,959] FATAL Fatal error during KafkaServerStartable > startup. Prepare to shutdown (kafka.server.KafkaServerStartable) > kafka.common.KafkaException: Message payload is null: Message(magic = 0, > attributes = 1, crc = 4122289508, key = null, payload = null) > at > kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90) > at > kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85) > at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33) > at kafka.log.LogSegment.recover(LogSegment.scala:223) > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218) > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLik
[jira] [Commented] (KAFKA-6452) Add documentation for delegation token authentication mechanism
[ https://issues.apache.org/jira/browse/KAFKA-6452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352789#comment-16352789 ] ASF GitHub Bot commented on KAFKA-6452: --- junrao closed pull request #4490: KAFKA-6452: Add documentation for delegation token authentication URL: https://github.com/apache/kafka/pull/4490 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/security.html b/docs/security.html index 4e401aec186..3e3c818571a 100644 --- a/docs/security.html +++ b/docs/security.html @@ -661,6 +661,108 @@ 7.3 Authentication using SASL + +Authentication using Delegation Tokens +Delegation token based authentication is a lightweight authentication mechanism to complement existing SASL/SSL +methods. Delegation tokens are shared secrets between Kafka brokers and clients. Delegation tokens will help processing +frameworks to distribute the workload to available workers in a secure environment without the added cost of distributing +Kerberos TGT/keytabs or keystores when 2-way SSL is used. See KIP-48 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka) +for more details. + +Typical steps for delegation token usage are: + +User authenticates with the Kafka cluster via SASL or SSL, and obtains a delegation token. This can be done +using AdminClient APIs or the kafka-delegation-tokens.sh script. +User securely passes the delegation token to Kafka clients for authenticating with the Kafka cluster. +Token owner/renewer can renew/expire the delegation tokens. + + + +Token Management + A master key/secret is used to generate and verify delegation tokens. This is supplied using the config +option delegation.token.master.key. 
The same secret key must be configured across all the brokers. +If the secret is not set or is set to an empty string, brokers will disable delegation token authentication. + +In the current implementation, token details are stored in ZooKeeper, which is suitable for use in Kafka installations where +ZooKeeper is on a private network. Also, the master key/secret is currently stored as plain text in the server.properties +config file. We intend to make these configurable in a future Kafka release. + +A token has a current life and a maximum renewable life. By default, tokens must be renewed once every 24 hours +for up to 7 days. These can be configured using the delegation.token.expiry.time.ms +and delegation.token.max.lifetime.ms config options. + +Tokens can also be cancelled explicitly. If a token is not renewed by the token’s expiration time, or if the token +is beyond its maximum lifetime, it will be deleted from all broker caches as well as from ZooKeeper. + + +Creating Delegation Tokens +Tokens can be created using AdminClient APIs or the kafka-delegation-tokens.sh script. +Delegation token requests (create/renew/expire/describe) should be issued only on SASL- or SSL-authenticated channels. +Tokens cannot be requested if the initial authentication is done through a delegation token. +kafka-delegation-tokens.sh script examples are given below. 
+Create a delegation token: + +> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1 + +Renew a delegation token: + +> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK + +Expire a delegation token: + +> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire --expiry-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK + +Existing tokens can be described using the --describe option: + +> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties --owner-principal User:user1 + + +Token Authentication +Delegation token authentication piggybacks on the current SASL/SCRAM authentication mechanism. We must enable the +SASL/SCRAM mechanism on the Kafka cluster as described here. + + Configuring Kafka Clients: + +Configure the JAAS configuration property for each client in pr
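The client-configuration text above is cut off in this digest. For context, per KIP-48 a client authenticating with a delegation token supplies the token ID as the SCRAM username and the token HMAC as the password through its SASL/JAAS configuration. A hedged sketch of such a client.properties (the token ID and HMAC below are placeholder values returned by token creation, not real credentials):

```properties
# Sketch of a client configuration for delegation token authentication.
# tokenID and HMAC values are placeholders obtained when the token was created.
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="<tokenID>" \
    password="<tokenHMAC>" \
    tokenauth="true";
```

The `tokenauth="true"` option is what tells the SCRAM login module to treat the credentials as a delegation token rather than a regular user/password pair.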
[jira] [Resolved] (KAFKA-6452) Add documentation for delegation token authentication mechanism
[ https://issues.apache.org/jira/browse/KAFKA-6452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-6452. Resolution: Fixed Fix Version/s: (was: 1.2.0) 1.1.0 The PR is merged to 1.1 and trunk. > Add documentation for delegation token authentication mechanism > --- > > Key: KAFKA-6452 > URL: https://issues.apache.org/jira/browse/KAFKA-6452 > Project: Kafka > Issue Type: Sub-task > Components: documentation >Reporter: Manikumar >Assignee: Manikumar >Priority: Major > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5638) Inconsistency in consumer group related ACLs
[ https://issues.apache.org/jira/browse/KAFKA-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vahid Hashemian updated KAFKA-5638: --- Fix Version/s: (was: 1.1.0) 1.2.0 > Inconsistency in consumer group related ACLs > > > Key: KAFKA-5638 > URL: https://issues.apache.org/jira/browse/KAFKA-5638 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Vahid Hashemian >Assignee: Vahid Hashemian >Priority: Minor > Labels: kip > Fix For: 1.2.0 > > > Users can see all groups in the cluster (using consumer group’s {{--list}} > option) provided that they have {{Describe}} access to the cluster. It would > make more sense to modify that experience and limit what is listed in the > output to only those groups they have {{Describe}} access to. The reason is, > almost everything else is accessible by a user only if the access is > specifically granted (through ACL {{--add}}); and this scenario should not be > an exception. The potential change would be updating the minimum required > permission of {{ListGroup}} from {{Describe (Cluster)}} to {{Describe > (Group)}}. > We can also look at this issue from a different angle: A user with {{Read}} > access to a group can describe the group, but the same user would not see > anything when listing groups (assuming there is no {{Describe}} access to the > cluster). It makes more sense for this user to be able to list all groups > s/he can already describe. > It would be great to know if any user is relying on the existing behavior > (listing all consumer groups using a {{Describe (Cluster)}} ACL). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
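The proposed change, listing only the groups for which the caller holds {{Describe (Group)}} access, amounts to filtering the cluster-wide listing through a per-group authorization check. A minimal illustrative sketch (plain Java, not Kafka's authorizer API; the predicate stands in for the ACL lookup):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative sketch only: filter the cluster-wide group listing down to
// the groups the caller is authorized to describe. The Predicate stands in
// for a per-group Describe (Group) ACL check.
public class GroupListing {
    public static List<String> listAuthorized(List<String> allGroups,
                                              Predicate<String> canDescribe) {
        return allGroups.stream()
                .filter(canDescribe)
                .collect(Collectors.toList());
    }
}
```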
[jira] [Commented] (KAFKA-6254) Introduce Incremental FetchRequests to Increase Partition Scalability
[ https://issues.apache.org/jira/browse/KAFKA-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352719#comment-16352719 ] Jun Rao commented on KAFKA-6254: Some perf results from [~afalko]. Without patch @46k partitions: consume latency- 4 ms; commit latency- 46 ms With patch @68K partitions: consume - 2 ms; commit - 33 ms With patch @46k: consume - 2 ms; commit - 21 ms The improvement in commit latency is due to improved replication of the offsets topic. > Introduce Incremental FetchRequests to Increase Partition Scalability > - > > Key: KAFKA-6254 > URL: https://issues.apache.org/jira/browse/KAFKA-6254 > Project: Kafka > Issue Type: Improvement >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > Fix For: 1.1.0 > > > Introduce Incremental FetchRequests to Increase Partition Scalability. See > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability -- This message was sent by Atlassian JIRA (v7.6.3#76005)
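The reported figures can be read as percentage reductions; a quick sketch of the arithmetic, using only the numbers quoted above:

```java
// Quick arithmetic on the latency figures reported above.
public class FetchPerfMath {
    // Percentage reduction when latency drops from 'before' ms to 'after' ms.
    public static double reductionPercent(double before, double after) {
        return 100.0 * (before - after) / before;
    }

    public static void main(String[] args) {
        // At 46k partitions: consume 4 ms -> 2 ms, commit 46 ms -> 21 ms.
        System.out.printf("consume reduction: %.1f%%%n", reductionPercent(4, 2));
        System.out.printf("commit reduction:  %.1f%%%n", reductionPercent(46, 21));
    }
}
```

That is a 50% reduction in consume latency and roughly a 54% reduction in commit latency at the same partition count.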
[jira] [Resolved] (KAFKA-6254) Introduce Incremental FetchRequests to Increase Partition Scalability
[ https://issues.apache.org/jira/browse/KAFKA-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-6254. Resolution: Fixed Fix Version/s: 1.1.0 The PR is merged. > Introduce Incremental FetchRequests to Increase Partition Scalability > - > > Key: KAFKA-6254 > URL: https://issues.apache.org/jira/browse/KAFKA-6254 > Project: Kafka > Issue Type: Improvement >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > Fix For: 1.1.0 > > > Introduce Incremental FetchRequests to Increase Partition Scalability. See > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6533) Kafka log cleaner stopped due to "cannot allocate memory" error
Law created KAFKA-6533: -- Summary: Kafka log cleaner stopped due to "cannot allocate memory" error Key: KAFKA-6533 URL: https://issues.apache.org/jira/browse/KAFKA-6533 Project: Kafka Issue Type: Bug Affects Versions: 0.10.2.0 Reporter: Law Hi, I am on Kafka 0.10.2.0 and have an issue where the log cleaner is running okay but suddenly stops because of a "cannot allocate memory" error. Here is the error from log-cleaner.log file: [2018-02-04 02:57:41,343] INFO [kafka-log-cleaner-thread-0], Log cleaner thread 0 cleaned log __consumer_offsets-35 (dirty section = [31740820448, 31740820448]) 100.1 MB of log processed in 1.5 seconds (67.5 MB/sec). Indexed 100.0 MB in 0.8 seconds (131.8 Mb/sec, 51.2% of total time) Buffer utilization: 0.0% Cleaned 100.1 MB in 0.7 seconds (138.2 Mb/sec, 48.8% of total time) Start size: 100.1 MB (771,501 messages) End size: 0.1 MB (501 messages) 99.9% size reduction (99.9% fewer messages) (kafka.log.LogCleaner) [2018-02-04 02:57:41,348] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-15. (kafka.log.LogCleaner) [2018-02-04 02:57:41,348] INFO Cleaner 0: Building offset map for __consumer_offsets-15... (kafka.log.LogCleaner) [2018-02-04 02:57:41,359] INFO Cleaner 0: Building offset map for log __consumer_offsets-15 for 1 segments in offset range [19492717509, 19493524087). (kafka.log.LogCleaner) [2018-02-04 02:57:42,067] INFO Cleaner 0: Offset map for log __consumer_offsets-15 complete. (kafka.log.LogCleaner) [2018-02-04 02:57:42,067] INFO Cleaner 0: Cleaning log __consumer_offsets-15 (cleaning prior to Sun Feb 04 02:57:34 GMT 2018, discarding tombstones prior to Sat Feb 03 02:53:31 GMT 2018)... (k [2018-02-04 02:57:42,068] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-15 (largest timestamp Sat Sep 02 15:26:15 GMT 2017) into 0, discarding deletes. (kafka.log.LogCleaner) [2018-02-04 02:57:42,078] INFO Cleaner 0: Swapping in cleaned segment 0 for segment(s) 0 in log __consumer_offsets-15. 
(kafka.log.LogCleaner) [2018-02-04 02:57:42,078] INFO Cleaner 0: Cleaning segment 2148231985 in log __consumer_offsets-15 (largest timestamp Thu Sep 28 15:50:19 GMT 2017) into 2148231985, discarding deletes. (kafka. [2018-02-04 02:57:42,080] INFO Cleaner 0: Swapping in cleaned segment 2148231985 for segment(s) 2148231985 in log __consumer_offsets-15. (kafka.log.LogCleaner) [2018-02-04 02:57:42,081] INFO Cleaner 0: Cleaning segment 4296532622 in log __consumer_offsets-15 (largest timestamp Tue Oct 24 10:33:20 GMT 2017) into 4296532622, discarding deletes. (kafka. [2018-02-04 02:57:42,083] INFO Cleaner 0: Swapping in cleaned segment 4296532622 for segment(s) 4296532622 in log __consumer_offsets-15. (kafka.log.LogCleaner) [2018-02-04 02:57:42,083] INFO Cleaner 0: Cleaning segment 6444525822 in log __consumer_offsets-15 (largest timestamp Mon Nov 20 11:33:30 GMT 2017) into 6444525822, discarding deletes. (kafka. [2018-02-04 02:57:42,085] INFO Cleaner 0: Swapping in cleaned segment 6444525822 for segment(s) 6444525822 in log __consumer_offsets-15. (kafka.log.LogCleaner) [2018-02-04 02:57:42,086] INFO Cleaner 0: Cleaning segment 8592045249 in log __consumer_offsets-15 (largest timestamp Sat Dec 16 06:35:53 GMT 2017) into 8592045249, discarding deletes. (kafka. [2018-02-04 02:57:42,088] INFO Cleaner 0: Swapping in cleaned segment 8592045249 for segment(s) 8592045249 in log __consumer_offsets-15. (kafka.log.LogCleaner) [2018-02-04 02:57:42,088] INFO Cleaner 0: Cleaning segment 10739582585 in log __consumer_offsets-15 (largest timestamp Wed Dec 27 21:15:44 GMT 2017) into 10739582585, discarding deletes. (kafk [2018-02-04 02:57:42,091] INFO Cleaner 0: Swapping in cleaned segment 10739582585 for segment(s) 10739582585 in log __consumer_offsets-15. 
(kafka.log.LogCleaner) [2018-02-04 02:57:42,096] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner) java.io.FileNotFoundException: /kafka/broker1-logs/__consumer_offsets-15/012887210320.log.cleaned (Cannot allocate memory) at java.io.RandomAccessFile.open0(Native Method) at java.io.RandomAccessFile.open(RandomAccessFile.java:316) at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243) at org.apache.kafka.common.record.FileRecords.openChannel(FileRecords.java:428) at org.apache.kafka.common.record.FileRecords.open(FileRecords.java:384) at org.apache.kafka.common.record.FileRecords.open(FileRecords.java:393) at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:394) at kafka.log.Cleaner.$anonfun$clean$6(LogCleaner.scala:363) at kafka.log.Cleaner.$anonfun$clean$6$adapted(LogCleaner.scala:362) at scala.collection.immutable.List.foreach(List.scala:378) at kafka.log.Cleaner.clean(LogCleaner.scala:362) at kafka.l
[jira] [Resolved] (KAFKA-6253) Improve sink connector topic regex validation
[ https://issues.apache.org/jira/browse/KAFKA-6253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-6253. -- Resolution: Fixed Reviewer: Ewen Cheslack-Postava Fix Version/s: 1.2.0 > Improve sink connector topic regex validation > - > > Key: KAFKA-6253 > URL: https://issues.apache.org/jira/browse/KAFKA-6253 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ewen Cheslack-Postava >Assignee: Jeff Klukas >Priority: Major > Fix For: 1.1.0, 1.2.0 > > > KAFKA-3073 adds topic regex support for sink connectors. The addition > requires that you only specify one of topics or topics.regex settings. This > is being validated in one place, but not during submission of connectors. We > should improve this since this means it's possible to get a bad connector > config into the config topic. > For more detailed discussion, see > https://github.com/apache/kafka/pull/4151#pullrequestreview-77300221 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6253) Improve sink connector topic regex validation
[ https://issues.apache.org/jira/browse/KAFKA-6253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352691#comment-16352691 ] ASF GitHub Bot commented on KAFKA-6253: --- ewencp closed pull request #4251: KAFKA-6253: Improve sink connector topic regex validation URL: https://github.com/apache/kafka/pull/4251 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 02465c922f1..b913f9ed65e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -256,9 +256,13 @@ public ConfigInfos validateConnectorConfig(Map connectorProps) { Connector connector = getConnector(connType); ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector); try { -ConfigDef baseConfigDef = (connector instanceof SourceConnector) -? 
SourceConnectorConfig.configDef() -: SinkConnectorConfig.configDef(); +ConfigDef baseConfigDef; +if (connector instanceof SourceConnector) { +baseConfigDef = SourceConnectorConfig.configDef(); +} else { +baseConfigDef = SinkConnectorConfig.configDef(); +SinkConnectorConfig.validate(connectorProps); +} ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false); Map validatedConnectorConfig = validateBasicConnectorConfig( connector, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java index cf5564c25c2..887a4da2dea 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.transforms.util.RegexValidator; @@ -34,7 +35,7 @@ public static final String TOPICS_DEFAULT = ""; private static final String TOPICS_DISPLAY = "Topics"; -private static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG; +public static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG; private static final String TOPICS_REGEX_DOC = "Regular expression giving topics to consume. " + "Under the hood, the regex is compiled to a java.util.regex.Pattern. 
" + "Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " should be specified."; @@ -52,4 +53,34 @@ public static ConfigDef configDef() { public SinkConnectorConfig(Plugins plugins, Map props) { super(plugins, config, props); } + +/** + * Throw an exception if the passed-in properties do not constitute a valid sink. + * @param props sink configuration properties + */ +public static void validate(Map props) { +final boolean hasTopicsConfig = hasTopicsConfig(props); +final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props); + +if (hasTopicsConfig && hasTopicsRegexConfig) { +throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG + +" are mutually exclusive options, but both are set."); +} + +if (!hasTopicsConfig && !hasTopicsRegexConfig) { +throw new ConfigException("Must configure one of " + +SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG); +} +} + +public static boolean hasTopicsConfig(Map props) { +String topicsStr = props.get(TOPICS_CONFIG); +return topicsStr != null && !topicsStr.trim().isEmpty(); +} + +public static boolean hasTopicsRegexConfig(Map props) { +String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG); +return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty(); +} + } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index 9b934f3428a..6
[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress
[ https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352667#comment-16352667 ] Jun Rao commented on KAFKA-6469: [~ambroff], thanks for reporting this. Is the # of children in the isr_change path a problem? Currently, each broker batches the isr changes when writing to ZK. So, the number of child nodes under isr_change should be proportional to the # of brokers. If one follows the best practice by waiting for all replicas to be in sync before restarting the next broker, there should only be one child node in the isr_change typically. > ISR change notification queue can prevent controller from making progress > - > > Key: KAFKA-6469 > URL: https://issues.apache.org/jira/browse/KAFKA-6469 > Project: Kafka > Issue Type: Bug >Reporter: Kyle Ambroff-Kao >Assignee: Kyle Ambroff-Kao >Priority: Major > > When writes to /isr_change_notification in ZooKeeper (which is effectively a > queue of ISR change events for the controller) happen at a rate high enough > that the node with a watch can't dequeue them, the trouble starts. > The watcher kafka.controller.IsrChangeNotificationListener is fired in the > controller when a new entry is written to /isr_change_notification, and the > zkclient library sends a GetChildrenRequest to zookeeper to fetch all child > znodes. > We've seen failures in one of our test clusters as the partition count started to > climb north of 60k per broker. We had brokers writing child nodes under > /isr_change_notification that were larger than the jute.maxbuffer size in > ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's > session, effectively bricking the cluster. > This can be partially mitigated by chunking ISR notifications to increase the > maximum number of partitions a broker can host. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
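The mitigation mentioned at the end, chunking ISR notifications, can be sketched as splitting a batch of change entries so that each znode payload stays under a byte budget well below ZooKeeper's 1 MB jute.maxbuffer default. A self-contained sketch (not Kafka's actual implementation):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Kafka's actual code) of the mitigation described
// above: split a large batch of ISR-change entries into chunks whose
// serialized size stays under a budget, so each sequential znode written
// under /isr_change_notification stays well below jute.maxbuffer.
public class IsrChangeChunker {
    // Split 'entries' into chunks whose joined byte length is <= maxBytes.
    public static List<List<String>> chunk(List<String> entries, int maxBytes) {
        List<List<String>> chunks = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int size = 0;
        for (String e : entries) {
            int len = e.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for a separator
            if (!current.isEmpty() && size + len > maxBytes) {
                chunks.add(current);          // flush the full chunk
                current = new ArrayList<>();
                size = 0;
            }
            current.add(e);
            size += len;
        }
        if (!current.isEmpty()) {
            chunks.add(current);              // flush the final partial chunk
        }
        return chunks;
    }
}
```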
[jira] [Commented] (KAFKA-6312) Add documentation about kafka-consumer-groups.sh's ability to set/change offsets
[ https://issues.apache.org/jira/browse/KAFKA-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352573#comment-16352573 ] ASF GitHub Bot commented on KAFKA-6312: --- tankhiwale opened a new pull request #4527: KAFKA-6312 Add documentation about kafka-consumer-groups.sh's ability… URL: https://github.com/apache/kafka/pull/4527 … to set/change offsets KIP-122 added the ability for kafka-consumer-groups.sh to reset/change consumer offsets, at a fine grained level. There is documentation on it in the kafka-consumer-groups.sh usage text. There is no such documentation on the kafka.apache.org website. We should add some documentation to the website, so that users can read about the functionality without having the tools installed. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add documentation about kafka-consumer-groups.sh's ability to set/change > offsets > > > Key: KAFKA-6312 > URL: https://issues.apache.org/jira/browse/KAFKA-6312 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: James Cheng >Assignee: Mayank Tankhiwale >Priority: Major > Labels: newbie > > KIP-122 added the ability for kafka-consumer-groups.sh to reset/change > consumer offsets, at a fine grained level. > There is documentation on it in the kafka-consumer-groups.sh usage text. > There is no such documentation on the kafka.apache.org website. We should add > some documentation to the website, so that users can read about the > functionality without having the tools installed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6312) Add documentation about kafka-consumer-groups.sh's ability to set/change offsets
[ https://issues.apache.org/jira/browse/KAFKA-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352551#comment-16352551 ] ASF GitHub Bot commented on KAFKA-6312: --- tankhiwale closed pull request #4496: KAFKA-6312: Update website documentation for --reset-offsets option, … URL: https://github.com/apache/kafka/pull/4496 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add documentation about kafka-consumer-groups.sh's ability to set/change > offsets > > > Key: KAFKA-6312 > URL: https://issues.apache.org/jira/browse/KAFKA-6312 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: James Cheng >Assignee: Mayank Tankhiwale >Priority: Major > Labels: newbie > > KIP-122 added the ability for kafka-consumer-groups.sh to reset/change > consumer offsets, at a fine grained level. > There is documentation on it in the kafka-consumer-groups.sh usage text. > There is no such documentation on the kafka.apache.org website. We should add > some documentation to the website, so that users can read about the > functionality without having the tools installed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6528) Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize
[ https://issues.apache.org/jira/browse/KAFKA-6528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352449#comment-16352449 ] ASF GitHub Bot commented on KAFKA-6528: --- rajinisivaram opened a new pull request #4526: KAFKA-6528: Fix transient test failure in testThreadPoolResize URL: https://github.com/apache/kafka/pull/4526 Add locking to access `AbstractFetcherThread#partitionStates` during dynamic thread update. Also make testing of thread updates that trigger retries more resilient. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize > -- > > Key: KAFKA-6528 > URL: https://issues.apache.org/jira/browse/KAFKA-6528 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Rajini Sivaram >Priority: Major > > {code:java} > java.lang.AssertionError: expected:<108> but was:<123> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > kafka.server.DynamicBrokerReconfigurationTest.stopAndVerifyProduceConsume(DynamicBrokerReconfigurationTest.scala:755) > at > kafka.server.DynamicBrokerReconfigurationTest.verifyThreadPoolResize$1(DynamicBrokerReconfigurationTest.scala:443) > at > kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:451){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6528) Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize
[ https://issues.apache.org/jira/browse/KAFKA-6528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram reassigned KAFKA-6528: - Assignee: Rajini Sivaram > Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize > -- > > Key: KAFKA-6528 > URL: https://issues.apache.org/jira/browse/KAFKA-6528 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Rajini Sivaram >Priority: Major > > {code:java} > java.lang.AssertionError: expected:<108> but was:<123> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > kafka.server.DynamicBrokerReconfigurationTest.stopAndVerifyProduceConsume(DynamicBrokerReconfigurationTest.scala:755) > at > kafka.server.DynamicBrokerReconfigurationTest.verifyThreadPoolResize$1(DynamicBrokerReconfigurationTest.scala:443) > at > kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:451){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
[ https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram reassigned KAFKA-6527:
-------------------------------------

    Assignee: Rajini Sivaram

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> ----------------------------------------------------------------------------
>
>         Key: KAFKA-6527
>         URL: https://issues.apache.org/jira/browse/KAFKA-6527
>     Project: Kafka
>  Issue Type: Bug
>    Reporter: Jason Gustafson
>    Assignee: Rajini Sivaram
>    Priority: Major
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>     at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>     at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>     at kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}
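The failing assertion comes from a `TestUtils.waitUntilTrue`-style check: the test polls a condition until a timeout rather than asserting immediately after an asynchronous config change, and the failure means the condition never became true within the deadline. A rough sketch of that polling pattern (illustrative names, not Kafka's actual `TestUtils` API):

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the waitUntilTrue pattern: poll `condition` every
// pollMs milliseconds until it holds or timeoutMs elapses.
class WaitUtil {
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean())
                return true;
            if (System.currentTimeMillis() >= deadline)
                return false; // timed out; the caller fails the test with a message
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A transient failure of such a check usually means either the deadline is too short for the asynchronous operation (here, applying the new log segment size) or the operation genuinely never completes, which is what the fix has to distinguish.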
[jira] [Commented] (KAFKA-6532) Delegation token internals should not impact public interfaces
[ https://issues.apache.org/jira/browse/KAFKA-6532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352293#comment-16352293 ]

ASF GitHub Bot commented on KAFKA-6532:
----------------------------------------

rajinisivaram opened a new pull request #4524: KAFKA-6532: Reduce impact of delegation tokens on public interfaces
URL: https://github.com/apache/kafka/pull/4524

Keep the delegation token implementation internal, without exposing implementation details to pluggable classes:
1. KafkaPrincipal#tokenAuthenticated must always be set by SaslServerAuthenticator, so that custom PrincipalBuilders cannot override it.
2. Replace o.a.k.c.security.scram.DelegationTokenAuthenticationCallback with a more generic ScramExtensionsCallback that can be used to add more extensions in future.
3. Separate ScramCredentialCallback (which KIP-86 makes a public interface) from the delegation token credential callback (which is internal).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

> Delegation token internals should not impact public interfaces
> ---------------------------------------------------------------
>
>         Key: KAFKA-6532
>         URL: https://issues.apache.org/jira/browse/KAFKA-6532
>     Project: Kafka
>  Issue Type: Bug
>  Components: core
>    Reporter: Rajini Sivaram
>    Assignee: Rajini Sivaram
>    Priority: Major
>
> We need to make sure that code related to the internal delegation token implementation doesn't have any impact on public interfaces, including the customizable callback handlers from KIP-86.
> # KafkaPrincipal has a public _tokenAuthenticated()_ method. Principal builders are configurable, and we now expect custom principal builders to set this value. Since we allow the same endpoint to be used for both basic SCRAM and delegation tokens, the configured principal builder needs a way of detecting token authentication. The default principal builder does this using internal SCRAM implementation code. It would be better if configurable principal builders didn't have to set this flag at all.
> # It would be better to replace _o.a.k.c.security.scram.DelegationTokenAuthenticationCallback_ with a more generic _ScramExtensionsCallback_. This would allow us to add more extensions in future, and it would also enable custom SCRAM extensions.
> # _ScramCredentialCallback_ was extended to add _tokenOwner_ and the mechanism. The mechanism is determined during the SASL handshake and shouldn't be configurable in a callback handler. _ScramCredentialCallback_ is being made a public interface in KIP-86 with configurable callback handlers. Since the delegation token implementation is internal and not extensible, _tokenOwner_ should be in a delegation-token-specific callback.
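Point 2 above proposes replacing the token-specific callback with a generic extensions callback. A minimal sketch of what such a callback could look like (a hypothetical shape for illustration, not the actual `ScramExtensionsCallback` from the PR), where delegation-token support becomes just one key/value extension such as `tokenauth=true`:

```java
import javax.security.auth.callback.Callback;
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: instead of a DelegationTokenAuthenticationCallback that
// hard-codes token semantics, the server hands the callback handler arbitrary
// SCRAM extension key/value pairs from the client's first message.
class ExtensionsCallback implements Callback {
    private Map<String, String> extensions = Collections.emptyMap();

    // Invoked by the SASL server to supply the negotiated extensions.
    public void extensions(Map<String, String> extensions) {
        this.extensions = extensions;
    }

    public Map<String, String> extensions() {
        return extensions;
    }
}
```

The design benefit is that new extensions (token authentication or custom ones) need no new public callback type; handlers that don't care about a given key simply ignore it.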
[jira] [Created] (KAFKA-6532) Delegation token internals should not impact public interfaces
Rajini Sivaram created KAFKA-6532:
-------------------------------------

             Summary: Delegation token internals should not impact public interfaces
                 Key: KAFKA-6532
                 URL: https://issues.apache.org/jira/browse/KAFKA-6532
             Project: Kafka
          Issue Type: Bug
          Components: core
            Reporter: Rajini Sivaram
            Assignee: Rajini Sivaram

We need to make sure that code related to the internal delegation token implementation doesn't have any impact on public interfaces, including the customizable callback handlers from KIP-86.
# KafkaPrincipal has a public _tokenAuthenticated()_ method. Principal builders are configurable, and we now expect custom principal builders to set this value. Since we allow the same endpoint to be used for both basic SCRAM and delegation tokens, the configured principal builder needs a way of detecting token authentication. The default principal builder does this using internal SCRAM implementation code. It would be better if configurable principal builders didn't have to set this flag at all.
# It would be better to replace _o.a.k.c.security.scram.DelegationTokenAuthenticationCallback_ with a more generic _ScramExtensionsCallback_. This would allow us to add more extensions in future, and it would also enable custom SCRAM extensions.
# _ScramCredentialCallback_ was extended to add _tokenOwner_ and the mechanism. The mechanism is determined during the SASL handshake and shouldn't be configurable in a callback handler. _ScramCredentialCallback_ is being made a public interface in KIP-86 with configurable callback handlers. Since the delegation token implementation is internal and not extensible, _tokenOwner_ should be in a delegation-token-specific callback.