[jira] [Commented] (KAFKA-7309) Upgrade Jacoco for Java 11 support
[ https://issues.apache.org/jira/browse/KAFKA-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592468#comment-16592468 ]

ASF GitHub Bot commented on KAFKA-7309:
---------------------------------------

ijuma opened a new pull request #5568: KAFKA-7309: Upgrade Jacoco for Java 11 support
URL: https://github.com/apache/kafka/pull/5568

   Jacoco 0.8.2 adds Java 11 support: https://github.com/jacoco/jacoco/releases/tag/v0.8.2
   Java 11 RC1 is out, so it would be good for us to get a working CI build.

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Upgrade Jacoco for Java 11 support
> ----------------------------------
>
>                 Key: KAFKA-7309
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7309
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: packaging
>            Reporter: Ismael Juma
>            Priority: Major
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592383#comment-16592383 ]

Ayushi edited comment on KAFKA-5998 at 8/25/18 2:32 AM:
--------------------------------------------------------

[~guozhang] [~mjsax] I ran into this issue after running the shared application for 1-2 hours under a load of around 8000 messages/sec. It is not restricted to particular partitions; there are 50 partitions. As a hack, I could set STATE_CLEANUP_DELAY_MS_CONFIG to a large number, but I'm afraid it could cause OOM issues.

was (Author: ayushi0430):
[~guozhang] I ran into this issue after running the shared application for 1-2 hours. As a hack, I could set STATE_CLEANUP_DELAY_MS_CONFIG to a large number, but I'm afraid it could cause OOM issues.

> /.checkpoint.tmp Not found exception
> ------------------------------------
>
>                 Key: KAFKA-5998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5998
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0, 0.11.0.1
>            Reporter: Yogesh BG
>            Priority: Major
>         Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, properties.txt, properties.txt, props.txt, streams.txt, streamsSnippet.txt, streamsSnippet.txt, topology.txt, topology.txt
>
> I have one Kafka broker and one Kafka Streams instance running. It has been running for two days under a load of around 2500 msgs/sec. On the third day I am getting the exception below for some of the partitions; I have 16 partitions and only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or directory)
>         at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
>         at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or directory)
>         at java.io.FileOutputStream.open(Native Method)
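The FileNotFoundException in the trace above comes from the checkpoint being written in two steps: a temp file is created, then atomically renamed over the real checkpoint. A minimal, self-contained sketch of that pattern is below (class and helper names are hypothetical; this is not the actual OffsetCheckpoint code), showing how a concurrently deleted state directory produces exactly this exception:

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Map;

// Sketch of the write-temp-then-rename pattern behind the stack trace above:
// the checkpoint is first written to ".checkpoint.tmp" and then atomically
// renamed to ".checkpoint". If the task directory has been deleted in the
// meantime (e.g. by state cleanup), opening the temp file fails with
// FileNotFoundException ("No such file or directory").
public class CheckpointSketch {

    public static void write(File stateDir, Map<String, Long> offsets) throws IOException {
        File tmp = new File(stateDir, ".checkpoint.tmp");
        // This is the step that fails when stateDir no longer exists.
        try (Writer w = new OutputStreamWriter(new FileOutputStream(tmp), StandardCharsets.UTF_8)) {
            for (Map.Entry<String, Long> e : offsets.entrySet()) {
                w.write(e.getKey() + " " + e.getValue() + "\n");
            }
        }
        // Atomic replace so readers never observe a half-written checkpoint.
        Files.move(tmp.toPath(), new File(stateDir, ".checkpoint").toPath(),
                StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Reproduces the failure mode: writing into a directory that is gone. */
    public static boolean failsWhenDirMissing(File missingDir, Map<String, Long> offsets) {
        try {
            write(missingDir, offsets);
            return false;
        } catch (FileNotFoundException e) {
            return true;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("checkpoint-demo").toFile();
        write(dir, Map.of("topic-0", 42L));
        System.out.println("wrote " + new File(dir, ".checkpoint"));
        System.out.println("missing dir fails: "
                + failsWhenDirMissing(new File(dir, "deleted-task-dir"), Map.of("topic-0", 1L)));
    }
}
```

A plausible reading of the reports in this thread is that the state-cleanup thread removes the task directory while a commit is in flight, which is why raising STATE_CLEANUP_DELAY_MS_CONFIG (as the commenter tried) narrows the race without eliminating it.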
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592383#comment-16592383 ]

Ayushi commented on KAFKA-5998:
-------------------------------

[~guozhang] I ran into this issue after running the shared application for 1-2 hours. As a hack, I could set STATE_CLEANUP_DELAY_MS_CONFIG to a large number, but I'm afraid it could cause OOM issues.

> /.checkpoint.tmp Not found exception
> ------------------------------------
>
>                 Key: KAFKA-5998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5998
>
[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression
[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592262#comment-16592262 ]

ASF GitHub Bot commented on KAFKA-7223:
---------------------------------------

vvcephei opened a new pull request #5567: KAFKA-7223: Suppress API with only immediate emit
URL: https://github.com/apache/kafka/pull/5567

   Part 1 of the suppression API:
   * add the DSL `suppress` method and config objects
   * add the processor, but only in "identity" mode (i.e., it will forward only if the suppression spec says to forward immediately)
   * add tests

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)

> KIP-328: Add in-memory Suppression
> ----------------------------------
>
>                 Key: KAFKA-7223
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7223
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> As described in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>
> This ticket is to implement Suppress, but only for in-memory buffers. (depends on KAFKA-7222)
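For readers following the KIP, "identity mode" means the processor forwards a record immediately whenever the suppression spec says to emit without delay; the buffering modes come in a later part. The shape of that stage can be sketched in plain Java (all names here are hypothetical illustrations, not the actual Kafka Streams `suppress` API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Sketch of a suppression stage with only "emit immediately" implemented.
// In immediate mode it is the identity: records pass straight through.
// Buffered modes (holding records until a time or window bound) are the
// follow-up work described in the ticket.
public class SuppressSketch<K, V> {

    public enum EmitPolicy { IMMEDIATELY, BUFFERED }

    private final EmitPolicy policy;
    private final BiConsumer<K, V> downstream;

    public SuppressSketch(EmitPolicy policy, BiConsumer<K, V> downstream) {
        this.policy = policy;
        this.downstream = downstream;
    }

    public void process(K key, V value) {
        if (policy == EmitPolicy.IMMEDIATELY) {
            downstream.accept(key, value);   // identity: forward right away
        } else {
            // Part 1 deliberately leaves buffering unimplemented.
            throw new UnsupportedOperationException("buffering not implemented in part 1");
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        SuppressSketch<String, Integer> s =
                new SuppressSketch<>(EmitPolicy.IMMEDIATELY, (k, v) -> out.add(k + "=" + v));
        s.process("a", 1);
        s.process("b", 2);
        System.out.println(out);
    }
}
```

The point of landing the identity mode first is that the DSL surface and processor wiring can be reviewed and tested independently of the buffer implementation.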
[jira] [Commented] (KAFKA-7215) Improve LogCleaner behavior on error
[ https://issues.apache.org/jira/browse/KAFKA-7215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592162#comment-16592162 ]

Andrew Olson commented on KAFKA-7215:
-------------------------------------

Agree this would be helpful. We have run into a few instances of the cleaner thread dying and logs growing indefinitely until a broker restart. Would the offending topic-partition be immediately blacklisted? If so, that would still allow the same problem to occur, especially for a high-volume topic such as __consumer_offsets; it might be better to give it some reasonable number of retry attempts.

> Improve LogCleaner behavior on error
> ------------------------------------
>
>                 Key: KAFKA-7215
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7215
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Minor
>
> For more detailed information see [KIP-346|https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error]
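The suggestion in the comment above amounts to bounded retries before quarantine: a partition is only marked uncleanable after failing several times in a row, instead of killing the cleaner thread or blacklisting on the first error. The bookkeeping can be sketched as follows (names are hypothetical; this is not the KIP-346 implementation):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Per-partition failure counter: a partition becomes "uncleanable" only after
// maxRetries consecutive failures, so a transient error on a high-volume topic
// such as __consumer_offsets does not permanently stop cleaning it, while a
// persistently failing partition no longer takes down the whole cleaner thread.
public class CleanerRetrySketch {
    private final int maxRetries;
    private final Map<String, Integer> failures = new HashMap<>();
    private final Set<String> uncleanable = new HashSet<>();

    public CleanerRetrySketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    public void recordFailure(String topicPartition) {
        int n = failures.merge(topicPartition, 1, Integer::sum);
        if (n >= maxRetries) {
            uncleanable.add(topicPartition);   // quarantine only after repeated failures
        }
    }

    public void recordSuccess(String topicPartition) {
        failures.remove(topicPartition);       // a success resets the failure streak
    }

    public boolean isUncleanable(String topicPartition) {
        return uncleanable.contains(topicPartition);
    }

    public static void main(String[] args) {
        CleanerRetrySketch cleaner = new CleanerRetrySketch(3);
        cleaner.recordFailure("__consumer_offsets-0");
        cleaner.recordFailure("__consumer_offsets-0");
        cleaner.recordFailure("__consumer_offsets-0");
        System.out.println(cleaner.isUncleanable("__consumer_offsets-0"));
    }
}
```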
[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592030#comment-16592030 ]

Guozhang Wang commented on KAFKA-7240:
--------------------------------------

Thanks for your contribution [~slendle]!!

> -total metrics in Streams are incorrect
> ---------------------------------------
>
>                 Key: KAFKA-7240
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7240
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics, streams
>    Affects Versions: 2.0.0
>            Reporter: Sam Lendle
>            Assignee: Sam Lendle
>            Priority: Major
>             Fix For: 2.1.0
>
> I noticed the values of total metrics for streams were decreasing periodically when viewed in JMX, for example process-total for each processor-node-id under stream-processor-node-metrics.
> Edit: For processor node metrics, I should have been looking at ProcessorNode, not StreamsMetricsThreadImpl.
> -Looking at StreamsMetricsThreadImpl, I believe this behavior is due to using Count() as the Stat for the *-total metrics. Count() is a SampledStat, so the value it reports is the count in recent time windows, and the value decreases whenever a window is purged.-
>
> -This explains the behavior I saw, but I think the issue is deeper. For example, processTimeSensor attempts to measure process-latency-avg, process-latency-max, process-rate, and process-total. For that sensor, record is called like-
> -streamsMetrics.processTimeSensor.record(computeLatency() / (double) processed, timerStartedMs);-
> -so the value passed to record is the average latency per processed message in this batch, if I understand correctly. That gets pushed through to the call to Count#record, which increments its count by 1, ignoring the value parameter. Whatever stat is recording the total would need to know the number of messages processed. Because of that, I don't think it's possible for one Sensor to measure both latency and total.-
> -That said, it's not clear to me how all the different Stats work and how exactly Sensors work, and I don't actually understand how the process-rate metric is working for similar reasons, but that seems to be correct, so I may be missing something here.-
>
> cc [~guozhang]
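The root cause described in the ticket, a sampled (windowed) stat backing a `-total` metric, can be illustrated without any Kafka code. In the sketch below (hypothetical classes, not Kafka's actual `Count`/`CumulativeCount`), the windowed count drops whenever old samples age out of the retention window, while the cumulative count is monotonic, which is what a `-total` metric should report:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Windowed count vs. cumulative count. A windowed ("sampled") count only
// measures events inside the retention window, so the reported value shrinks
// when a window of old events is purged -- exactly the decreasing -total
// values observed in JMX. A cumulative count never decreases.
public class CountSketch {
    private final long windowMs;
    private final Deque<Long> timestamps = new ArrayDeque<>();
    private long cumulative = 0;

    public CountSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    public void record(long nowMs) {
        timestamps.addLast(nowMs);
        cumulative++;
    }

    public long windowedCount(long nowMs) {
        // Purge expired samples: this is why the windowed value can decrease.
        while (!timestamps.isEmpty() && timestamps.peekFirst() < nowMs - windowMs) {
            timestamps.pollFirst();
        }
        return timestamps.size();
    }

    public long cumulativeCount() {
        return cumulative;   // monotonic: suitable for a -total metric
    }

    public static void main(String[] args) {
        CountSketch c = new CountSketch(100);
        c.record(0);
        c.record(50);
        c.record(60);
        System.out.println("windowed at t=60:  " + c.windowedCount(60));
        System.out.println("windowed at t=140: " + c.windowedCount(140));
        System.out.println("cumulative:        " + c.cumulativeCount());
    }
}
```

This mirrors the fix in the merged PR for this ticket, which swaps the windowed `Count` stat for a cumulative one on the `-total` sensors.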
[jira] [Updated] (KAFKA-7240) -total metrics in Streams are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-7240:
---------------------------------
    Fix Version/s: 2.1.0

> -total metrics in Streams are incorrect
> ---------------------------------------
>
>                 Key: KAFKA-7240
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7240
>
[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592027#comment-16592027 ]

ASF GitHub Bot commented on KAFKA-7240:
---------------------------------------

guozhangwang closed pull request #5467: KAFKA-7240: -total metrics in Streams are incorrect
URL: https://github.com/apache/kafka/pull/5467

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 79df5d158a1..7f3d31fd776 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -41,6 +41,7 @@
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -109,7 +110,7 @@
         );
         parent.add(
             new MetricName("commit-total", group, "The total number of occurrence of commit operations.", allTagMap),
-            new Count()
+            new CumulativeCount()
         );
         // add the operation metrics with additional tags
@@ -129,7 +130,7 @@
         );
         taskCommitTimeSensor.add(
             new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap),
-            new Count()
+            new CumulativeCount()
         );
         // add the metrics for enforced processing
@@ -140,7 +141,7 @@
         );
         taskEnforcedProcessSensor.add(
             new MetricName("enforced-process-total", group, "The total number of occurrence of enforced-process operations.", tagMap),
-            new Count()
+            new CumulativeCount()
         );
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index efd94eaf637..28cedbe4197 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -44,6 +44,7 @@
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
@@ -437,7 +438,7 @@ StreamTask createTask(final Consumer consumer,
             cache,
             time,
             () -> createProducer(taskId),
-            streamsMetrics.tasksClosedSensor);
+            streamsMetrics.taskClosedSensor);
     }

     private Producer createProducer(final TaskId id) {
@@ -518,7 +519,7 @@ StandbyTask createTask(final Consumer consumer,
         private final Sensor processTimeSensor;
         private final Sensor punctuateTimeSensor;
         private final Sensor taskCreatedSensor;
-        private final Sensor tasksClosedSensor;
+        private final Sensor taskClosedSensor;

         StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) {
             super(metrics, threadName);
@@ -532,7 +533,7 @@ StandbyTask createTask(final Consumer consumer,
             addAvgMaxLatency(pollTimeSensor, group, tagMap(), "poll");
             // can't use addInvocationRateAndCount due to non-standard description string
             pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
-            pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tagMap()), new Count());
+            pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tagMap()), new CumulativeCount());

             processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO);
             addAvgMaxLatency(processTimeSensor, group, tagMap(), "process");
@@ -546,9 +547,9 @@
[jira] [Commented] (KAFKA-7335) Store clusterId locally to ensure broker joins the right cluster
[ https://issues.apache.org/jira/browse/KAFKA-7335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591871#comment-16591871 ]

Jason Gustafson commented on KAFKA-7335:
----------------------------------------

[~badai] That is one possibility. I was thinking it would be advantageous to have the cluster identity stored in the log directory with the data, since it is less likely to be mistakenly altered. A server config for the clusterId could be incorrectly specified just like the zookeeper connect string, so I'm not sure there's a strong advantage to having one. I think it's the association of the data with the cluster that we need to protect.

> Store clusterId locally to ensure broker joins the right cluster
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7335
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7335
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Priority: Major
>
> We have seen situations where a broker somehow got the wrong configuration and joined a different cluster than the one it was previously registered in. This can create various kinds of metadata inconsistencies in the cluster and can be difficult to debug. It was suggested in [KIP-78|https://cwiki.apache.org/confluence/display/KAFKA/KIP-78%3A+Cluster+Id] that we could store the clusterId locally after initial registration and verify upon startup that the locally stored value matches what is in zookeeper.
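The check being discussed can be summarized as: persist the cluster id next to the data after first registration, and on startup compare it with the id obtained from ZooKeeper, refusing to start on a mismatch. A hedged sketch of that logic (the file name and layout are hypothetical, not the actual broker implementation):

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Store the cluster id alongside the data it belongs to; on startup, verify
// that the id reported by the cluster matches before serving. This protects
// the association of the data with its cluster: a broker with a misconfigured
// connect string cannot silently join (and pollute) a different cluster.
public class ClusterIdCheckSketch {
    private final Path marker;

    public ClusterIdCheckSketch(File logDir) {
        this.marker = new File(logDir, "cluster.id").toPath();   // hypothetical file name
    }

    /** Returns true if startup may proceed; records the id on first start. */
    public boolean verifyOrRecord(String clusterIdFromZk) throws IOException {
        if (!Files.exists(marker)) {
            // First registration: remember which cluster this data belongs to.
            Files.write(marker, clusterIdFromZk.getBytes(StandardCharsets.UTF_8));
            return true;
        }
        String stored = new String(Files.readAllBytes(marker), StandardCharsets.UTF_8);
        return stored.equals(clusterIdFromZk);   // mismatch => refuse to start
    }

    public static void main(String[] args) throws IOException {
        File logDir = Files.createTempDirectory("broker-logs").toFile();
        ClusterIdCheckSketch check = new ClusterIdCheckSketch(logDir);
        System.out.println("first start ok:   " + check.verifyOrRecord("cluster-abc"));
        System.out.println("same cluster ok:  " + check.verifyOrRecord("cluster-abc"));
        System.out.println("wrong cluster ok: " + check.verifyOrRecord("cluster-xyz"));
    }
}
```

Keeping the marker in the log directory (rather than in a server config) ties the check to the data itself, which is the property the comment above argues for.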
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591870#comment-16591870 ]

Ayushi commented on KAFKA-5998:
-------------------------------

[~guozhang] This issue is there even after changing the state dir to a persistent location. [^exc.txt]

> /.checkpoint.tmp Not found exception
> ------------------------------------
>
>                 Key: KAFKA-5998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5998
>
[jira] [Updated] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ayushi updated KAFKA-5998: -- Attachment: exc.txt > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1 >Reporter: Yogesh BG >Priority: Major > Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, > properties.txt, properties.txt, props.txt, streams.txt, streamsSnippet.txt, > streamsSnippet.txt, topology.txt, topology.txt > > > I have one kafka broker and one kafka stream running... I am running its > since two days under load of around 2500 msgs per second.. On third day am > getting below exception for some of the partitions, I have 16 partitions only > 0_0 and 0_1 gives this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > 
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
> at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
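The exception above is thrown from OffsetCheckpoint.write, which uses the common write-temp-then-rename pattern: if the task's state directory is deleted between commits (for example by concurrent state cleanup), opening the .tmp file fails exactly as in the log. A minimal sketch of that pattern — illustrative Python, not Kafka's actual Java code, and the file layout is assumed:

```python
import os

def write_checkpoint(state_dir, offsets):
    """Write topic-partition offsets atomically: write a .tmp file,
    fsync it, then rename over the final name. If state_dir has been
    removed concurrently, open() raises FileNotFoundError -- the Python
    analogue of the FileNotFoundException in the log above."""
    tmp = os.path.join(state_dir, ".checkpoint.tmp")
    final = os.path.join(state_dir, ".checkpoint")
    with open(tmp, "w") as f:  # fails here if the directory is gone
        for (topic, partition), offset in sorted(offsets.items()):
            f.write(f"{topic} {partition} {offset}\n")
        f.flush()
        os.fsync(f.fileno())
    os.rename(tmp, final)  # atomic on POSIX: readers see old or new file
```

The rename step is what keeps readers from ever observing a half-written checkpoint; the race discussed in this ticket is between this writer and whatever deletes the state directory.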
[jira] [Updated] (KAFKA-7337) Enhance Producer Performance tool to generate keys
[ https://issues.apache.org/jira/browse/KAFKA-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zenifer Cheruveettil updated KAFKA-7337: Priority: Major (was: Minor) > Enhance Producer Performance tool to generate keys > -- > > Key: KAFKA-7337 > URL: https://issues.apache.org/jira/browse/KAFKA-7337 > Project: Kafka > Issue Type: Improvement > Components: tools >Affects Versions: 1.0.0 >Reporter: Zenifer Cheruveettil >Priority: Major > > {{kafka-producer-perf-test.sh}} cannot generate messages with keys. It would > be helpful to have the option to generate messages with keys, especially when > using this tool together with applications such as Kafka streams applications > which expect messages to have a key.
[jira] [Created] (KAFKA-7337) Enhance Producer Performance tool to generate keys
Zenifer Cheruveettil created KAFKA-7337: --- Summary: Enhance Producer Performance tool to generate keys Key: KAFKA-7337 URL: https://issues.apache.org/jira/browse/KAFKA-7337 Project: Kafka Issue Type: Improvement Components: tools Affects Versions: 1.0.0 Reporter: Zenifer Cheruveettil {{kafka-producer-perf-test.sh}} cannot generate messages with keys. It would be helpful to have the option to generate messages with keys, especially when using this tool together with applications such as Kafka streams applications which expect messages to have a key.
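For context on why keys matter to the Streams applications mentioned above: Kafka's default partitioner routes records with equal keys to the same partition, so a keyless perf test cannot exercise keyed topologies realistically. A toy illustration of that property — md5 here is only a stand-in for the murmur2 hash Kafka actually uses:

```python
import hashlib

def toy_partition(key: bytes, num_partitions: int) -> int:
    """Stand-in for Kafka's default partitioner (which really uses
    murmur2). Any deterministic hash shows the property that matters:
    records with equal keys always land on the same partition."""
    digest = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return digest % num_partitions

# Keyed records are co-partitioned; keyless records (all that
# kafka-producer-perf-test.sh can send today) are spread across
# partitions instead, so a keyed Streams app sees unrealistic input.
same = toy_partition(b"user-42", 50) == toy_partition(b"user-42", 50)
```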
[jira] [Commented] (KAFKA-5975) No response when deleting topics and delete.topic.enable=false
[ https://issues.apache.org/jira/browse/KAFKA-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591815#comment-16591815 ] ASF GitHub Bot commented on KAFKA-5975: --- hachikuji closed pull request #3960: KAFKA-5975: No response when deleting topics and delete.topic.enable=false URL: https://github.com/apache/kafka/pull/3960 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java b/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java new file mode 100644 index 000..41577d2a288 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.errors; + +public class TopicDeletionDisabledException extends ApiException { +private static final long serialVersionUID = 1L; + +public TopicDeletionDisabledException() { +} + +public TopicDeletionDisabledException(String message) { +super(message); +} +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 090dca32651..d4610602d1e 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -80,6 +80,7 @@ import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.TopicDeletionDisabledException; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; @@ -110,7 +111,7 @@ * Do not add exceptions that occur only on the client or only on the server here. 
*/ public enum Errors { -UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request", +UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request.", UnknownServerException::new), NONE(0, null, message -> null), OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.", @@ -129,7 +130,7 @@ TimeoutException::new), BROKER_NOT_AVAILABLE(8, "The broker is not available.", BrokerNotAvailableException::new), -REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition", +REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition.", ReplicaNotAvailableException::new), MESSAGE_TOO_LARGE(10, "The request included a message larger than the max message size the server will accept.", RecordTooLargeException::new), @@ -161,7 +162,7 @@ "The group member's supported protocols are incompatible with those of existing members" + " or first group member tried to join with empty protocol type or empty protocol list.", InconsistentGroupProtocolException::new), -INVALID_GROUP_ID(24, "The configured groupId is invalid", +INVALID_GROUP_ID(24, "The configured groupId is invalid.", InvalidGroupIdException::new), UNKNOWN_MEMBER_ID(25, "The coordinator is not aware of this member.", UnknownMemberIdException::new), @@ -171,7 +172,7 @@ InvalidSessionTimeoutException::new), REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is needed.", RebalanceInProgressException::new), -INVALID_COMMIT_OFFSET_SIZE(28, "The committing offset data size is not valid", +
[jira] [Commented] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID
[ https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591643#comment-16591643 ] Jonathan Santilli commented on KAFKA-7165: -- Done [~omkreddy], let's wait, thanks! > Error while creating ephemeral at /brokers/ids/BROKER_ID > > > Key: KAFKA-7165 > URL: https://issues.apache.org/jira/browse/KAFKA-7165 > Project: Kafka > Issue Type: Bug > Components: core, zkclient >Affects Versions: 1.1.0 >Reporter: Jonathan Santilli >Priority: Major > > Kafka version: 1.1.0 > Zookeeper version: 3.4.12 > 4 Kafka Brokers > 4 Zookeeper servers > > In one of the 4 brokers of the cluster, we detect the following error: > [2018-07-14 04:38:23,784] INFO Unable to read additional data from server > sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:24,509] INFO Opening socket connection to server > *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL > (unknown error) (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:24,510] INFO Socket connection established to > *ZOOKEEPER_SERVER_1:PORT*, initiating session > (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:24,513] INFO Unable to read additional data from server > sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:25,287] INFO Opening socket connection to server > *ZOOKEEPER_SERVER_2:PORT*. 
Will not attempt to authenticate using SASL > (unknown error) (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:25,287] INFO Socket connection established to > *ZOOKEEPER_SERVER_2:PORT*, initiating session > (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1|#* > broker=1] Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition) > [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, > session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, > session 0x3000c2420cb458d has expired, closing socket connection > (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:26,445] INFO EventThread shut down for session: > 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. > (kafka.zookeeper.ZooKeeperClient) > [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session > to > *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*. > (kafka.zookeeper.ZooKeeperClient) > [2018-07-14 04:38:26,459] INFO Initiating client connection, > connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT* > sessionTimeout=6000 > watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 > (org.apache.zookeeper.ZooKeeper) > [2018-07-14 04:38:26,465] INFO Opening socket connection to server > *ZOOKEEPER_SERVER_1:PORT*. 
Will not attempt to authenticate using SASL > (unknown error) (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:26,477] INFO Socket connection established to > *ZOOKEEPER_SERVER_1:PORT*, initiating session > (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:26,484] INFO Session establishment complete on server > *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout > = 6000 (org.apache.zookeeper.ClientCnxn) > [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? > false) (kafka.zk.KafkaZkClient) > [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes > (kafka.common.ZkNodeChangeNotificationListener) > *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at > /brokers/ids/1, node already exists and owner '216186131422332301' does not > match current session '288330817911521280' > (kafka.zk.KafkaZkClient$CheckedEphemeral)* > [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 > is: NODEEXISTS* (kafka.zk.KafkaZkClient) > [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task > 'isr-expiration' (kafka.utils.KafkaScheduler) > org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode > = Session expired for > /brokers/topics/*TOPIC_NAME*/partitions/*PARTITION-#*/state > at org.apache.zookeeper.KeeperException.create(KeeperException.java:127) > at org.apache.zookeeper.KeeperException.create(KeeperException.java:51) > at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:465) > at
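The NODEEXISTS error in the log above is the race where the broker's previous ephemeral znode survives until ZooKeeper purges the expired session, so re-registration under the new session collides with the stale node. A toy in-memory simulation of the retry a client needs — this is not Kafka's KafkaZkClient code, and the class and method names are invented for illustration:

```python
import time

class NodeExistsError(Exception):
    pass

class FakeZooKeeper:
    """In-memory stand-in for ZooKeeper ephemeral nodes: each node
    records its owning session, and dead sessions are purged lazily."""
    def __init__(self):
        self.nodes = {}       # path -> owning session id
        self.expired = set()  # sessions ZooKeeper has cleaned up

    def create_ephemeral(self, path, session):
        owner = self.nodes.get(path)
        if owner is not None and owner != session:
            if owner in self.expired:
                del self.nodes[path]  # old session purged: node vanishes
            else:
                raise NodeExistsError(path)  # the error in the log above
        self.nodes[path] = session

def register_broker(zk, path, session, attempts=5, backoff=0.0):
    """Retry registration until the stale ephemeral node is purged."""
    for _ in range(attempts):
        try:
            zk.create_ephemeral(path, session)
            return True
        except NodeExistsError:
            time.sleep(backoff)  # wait for the old session to be reaped
    return False
```

The point of the sketch: until '216186131422332301' (the old session) is reaped, creation under '288330817911521280' (the new one) must fail, and the broker has to retry rather than give up.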
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591460#comment-16591460 ] Russell Ferriday commented on KAFKA-2260: - This would enable full-on eventsourcing on Kafka, without having to restrict to single-thread designs. One example of a great (>250 github star) FOSS project being held back by this: [https://github.com/johnbywater/eventsourcing/issues/108] Can we see this soon? > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Ben Kirwin >Priority: Minor > Attachments: KAFKA-2260.patch, expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. 
> - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitely and be sure that at most one of those attempts > will succeed; and if two producers accidentally write to the end of the > partition at once, we can be certain that at least one of them will fail. > - It's possible to 'bulk load' Kafka this way -- you can write a list of n > messages consecutively to a partition, even if the list is much larger than > the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to > bootstrap, then writing any updates to that same partition -- it can be sure > that it's seen all of the messages in that partition at the moment it does > its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly > the same flavour. > h4. Implementation > The major advantage of this proposal over other suggested transaction / > idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a > currently-unused field, adds no new APIs, and requires very little new code > or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored > by the server. This field could be used for the 'expected offset', with a > sigil value for the current behaviour. (-1 is a natural choice, since it's > already used to mean 'next available offset'.) > - We'd need a new error and error code for a 'CAS failure'. > - The server assigns offsets to produced messages in > {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this > change, this method would assign offsets in the same way -- but if they > don't match the offset in the message, we'd return an error instead of > completing the write. > - To avoid breaking existing clients, this behaviour would need to live > behind some config flag. (Possibly global, but probably more useful > per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer > questions, and if this seems interesting, I'd be glad to flesh this out into > a full KIP or patch. (And apologies if this is the wrong venue for this sort > of thing!)
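The proposed semantics can be sketched concretely: a single-partition log where produce carries an expected offset, with -1 as the sigil for today's accept-anything behaviour. This is a toy model of the proposal, not Kafka's actual protocol; class and error names are illustrative:

```python
class OffsetMismatchError(Exception):
    """Stands in for the proposed 'CAS failure' error code."""

class PartitionLog:
    """Toy partition illustrating expected-offset (check-and-set) produce."""
    def __init__(self):
        self.messages = []

    def produce(self, message, expected_offset=-1):
        next_offset = len(self.messages)
        # -1 keeps the current behaviour: append at whatever offset is next.
        if expected_offset != -1 and expected_offset != next_offset:
            raise OffsetMismatchError(
                f"expected {expected_offset}, next is {next_offset}")
        self.messages.append(message)
        return next_offset
```

With this model, two producers that both expect to append at offset 0 race, and exactly one succeeds — the optimistic-concurrency property the ticket describes.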
[jira] [Updated] (KAFKA-7336) Kafka Connect source task hangs when producing record with invalid topic name
[ https://issues.apache.org/jira/browse/KAFKA-7336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-7336: -- Summary: Kafka Connect source task hangs when producing record with invalid topic name (was: Kafka Connect source task when producing record with invalid topic name) > Kafka Connect source task hangs when producing record with invalid topic name > - > > Key: KAFKA-7336 > URL: https://issues.apache.org/jira/browse/KAFKA-7336 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Gunnar Morling >Priority: Major > > If a Kafka Connect source task returns a {{SourceRecord}} with an invalid > topic name (e.g. "dbserver1.inventory.test@data"), that source task hangs > (presumably indefinitely?) and doesn't continue its polling loop. The log is > flooded with this message: > {code} > connect_1| 2018-08-24 08:47:29,014 WARN || [Producer > clientId=producer-4] Error while fetching metadata with correlation id 833 : > {dbserver1.inventory.test@data=INVALID_TOPIC_EXCEPTION} > [org.apache.kafka.clients.NetworkClient] > {code} > The producer thread is stuck in the loop here: > {code} > KafkaProducer.waitOnMetadata(String, Integer, long) line: 938 > KafkaProducer.doSend(ProducerRecord, Callback) line: 823 > KafkaProducer.send(ProducerRecord, Callback) line: 803 > WorkerSourceTask.sendRecords() line: 318 > WorkerSourceTask.execute() line: 228 > WorkerSourceTask(WorkerTask).doRun() line: 175 > WorkerSourceTask(WorkerTask).run() line: 219 > Executors$RunnableAdapter.call() line: 511 > FutureTask.run() line: 266 > ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149 > ThreadPoolExecutor$Worker.run() line: 624 > Thread.run() line: 748 > {code} > This causes the task to remain in RUNNING state, but no further invocations > of {{poll()}} are done.
> Of course we'll work around this and make sure to not produce records with > invalid topic names, but I think the source task should transition to FAILED > state in this case.
[jira] [Created] (KAFKA-7336) Kafka Connect source task when producing record with invalid topic name
Gunnar Morling created KAFKA-7336: - Summary: Kafka Connect source task when producing record with invalid topic name Key: KAFKA-7336 URL: https://issues.apache.org/jira/browse/KAFKA-7336 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.0.0 Reporter: Gunnar Morling If a Kafka Connect source task returns a {{SourceRecord}} with an invalid topic name (e.g. "dbserver1.inventory.test@data"), that source task hangs (presumably indefinitely?) and doesn't continue its polling loop. The log is flooded with this message: {code} connect_1| 2018-08-24 08:47:29,014 WARN || [Producer clientId=producer-4] Error while fetching metadata with correlation id 833 : {dbserver1.inventory.test@data=INVALID_TOPIC_EXCEPTION} [org.apache.kafka.clients.NetworkClient] {code} The producer thread is stuck in the loop here: {code} KafkaProducer.waitOnMetadata(String, Integer, long) line: 938 KafkaProducer.doSend(ProducerRecord, Callback) line: 823 KafkaProducer.send(ProducerRecord, Callback) line: 803 WorkerSourceTask.sendRecords() line: 318 WorkerSourceTask.execute() line: 228 WorkerSourceTask(WorkerTask).doRun() line: 175 WorkerSourceTask(WorkerTask).run() line: 219 Executors$RunnableAdapter.call() line: 511 FutureTask.run() line: 266 ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149 ThreadPoolExecutor$Worker.run() line: 624 Thread.run() line: 748 {code} This causes the task to remain in RUNNING state, but no further invocations of {{poll()}} are done. Of course we'll work around this and make sure to not produce records with invalid topic names, but I think the source task should transition to FAILED state in this case.
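Until the worker fails such tasks, a connector can fail fast by validating topic names before producing. The sketch below mirrors the rules Kafka itself applies (legal characters a-z, A-Z, 0-9, '.', '_', '-', maximum length 249, and the reserved names "." and ".."); the constants are taken from my reading of org.apache.kafka.common.internals.Topic and should be checked against the Kafka version in use:

```python
import re

# Assumed to match Kafka's Topic.java validation; verify per version.
LEGAL_CHARS = re.compile(r"^[a-zA-Z0-9._-]+$")
MAX_NAME_LENGTH = 249

def is_valid_topic_name(name: str) -> bool:
    """Reject names like "dbserver1.inventory.test@data" up front,
    instead of letting the producer loop on INVALID_TOPIC_EXCEPTION
    metadata errors as in the log above."""
    return (0 < len(name) <= MAX_NAME_LENGTH
            and name not in (".", "..")
            and bool(LEGAL_CHARS.match(name)))
```

A source task could run this check in its poll loop and throw a ConnectException for invalid names, which moves the task to FAILED — the behaviour the reporter asks for.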