[jira] [Commented] (KAFKA-7309) Upgrade Jacoco for Java 11 support

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592468#comment-16592468
 ] 

ASF GitHub Bot commented on KAFKA-7309:
---

ijuma opened a new pull request #5568: KAFKA-7309: Upgrade Jacoco for Java 11 
support
URL: https://github.com/apache/kafka/pull/5568
 
 
   Jacoco 0.8.2 adds Java 11 support:
   
   https://github.com/jacoco/jacoco/releases/tag/v0.8.2
   
   Java 11 RC1 is out so it would be good for us to
   get a working CI build.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Upgrade Jacoco for Java 11 support
> --
>
> Key: KAFKA-7309
> URL: https://issues.apache.org/jira/browse/KAFKA-7309
> Project: Kafka
>  Issue Type: Sub-task
>  Components: packaging
>Reporter: Ismael Juma
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2018-08-24 Thread Ayushi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592383#comment-16592383
 ] 

Ayushi edited comment on KAFKA-5998 at 8/25/18 2:32 AM:


[~guozhang] [~mjsax] I ran into this issue after running the shared application 
for 1-2 hours, under a load of around 8000 messages/sec.

Also, it is not restricted to particular partitions. There are 50 partitions.

As a hack, I could set STATE_CLEANUP_DELAY_MS_CONFIG to a large number, but I'm 
afraid that could cause OOM issues.


was (Author: ayushi0430):
[~guozhang] I ran into this issue after running the shared application for 1-2 
hours.

As a hack, I could set STATE_CLEANUP_DELAY_MS_CONFIG to a large number, but I'm 
afraid that could cause OOM issues.
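The workaround described above can be sketched in plain Java. This is a minimal, illustrative sketch: the class name is hypothetical, and it assumes the Streams config key behind `STATE_CLEANUP_DELAY_MS_CONFIG` is `state.cleanup.delay.ms`:

```java
import java.util.Properties;

public class CleanupDelayConfig {
    /** Builds Streams properties that effectively disable cleanup of idle
     *  task state directories by pushing the delay out to Long.MAX_VALUE. */
    public static Properties streamsProps() {
        Properties props = new Properties();
        // "state.cleanup.delay.ms" is the key behind
        // StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG (default: 10 minutes).
        // A very large value keeps idle state dirs from being deleted, at the
        // cost of disk (and, as feared above, possibly memory) growth.
        props.put("state.cleanup.delay.ms", Long.toString(Long.MAX_VALUE));
        return props;
    }
}
```

The trade-off the comment worries about is real: state that is never cleaned up accumulates for as long as the instance runs.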

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> properties.txt, properties.txt, props.txt, streams.txt, streamsSnippet.txt, 
> streamsSnippet.txt, topology.txt, topology.txt
>
>
> I have one Kafka broker and one Kafka Streams application running. It had 
> been running for two days under a load of around 2500 msgs per second. On the 
> third day I started getting the exception below for some of the partitions; I 
> have 16 partitions and only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2018-08-24 Thread Ayushi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592383#comment-16592383
 ] 

Ayushi commented on KAFKA-5998:
---

[~guozhang] I ran into this issue after running the shared application for 1-2 
hours.

As a hack, I could set STATE_CLEANUP_DELAY_MS_CONFIG to a large number, but I'm 
afraid that could cause OOM issues.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> properties.txt, properties.txt, props.txt, streams.txt, streamsSnippet.txt, 
> streamsSnippet.txt, topology.txt, topology.txt

[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592262#comment-16592262
 ] 

ASF GitHub Bot commented on KAFKA-7223:
---

vvcephei opened a new pull request #5567: KAFKA-7223: Suppress API with only 
immediate emit
URL: https://github.com/apache/kafka/pull/5567
 
 
   Part 1 of the suppression API.
   
   * add the DSL `suppress` method and config objects
   * add the processor, but only in "identity" mode
 (i.e., it will forward only if the suppression spec says to forward 
immediately)
   * add tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> KIP-328: Add in-memory Suppression
> --
>
> Key: KAFKA-7223
> URL: https://issues.apache.org/jira/browse/KAFKA-7223
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)
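A rough picture of the "identity mode" described in the PR, using hypothetical plain-Java types rather than the actual Streams suppress API (which was still being added at this point):

```java
import java.util.ArrayList;
import java.util.List;

public class IdentitySuppress {
    /** Simplified suppression spec: part 1 of KAFKA-7223 only supports
     *  emitting immediately; later parts add real buffering. */
    public enum EmitPolicy { IMMEDIATELY, AFTER_WINDOW_CLOSES }

    /** Forwards records only when the spec says to emit immediately,
     *  i.e. the processor behaves as an identity pass-through. */
    public static <T> List<T> process(List<T> input, EmitPolicy policy) {
        List<T> forwarded = new ArrayList<>();
        if (policy == EmitPolicy.IMMEDIATELY) {
            forwarded.addAll(input); // identity: pass everything through
        }
        // else: records would be held in an in-memory buffer (later part of KIP-328)
        return forwarded;
    }
}
```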





[jira] [Commented] (KAFKA-7215) Improve LogCleaner behavior on error

2018-08-24 Thread Andrew Olson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592162#comment-16592162
 ] 

Andrew Olson commented on KAFKA-7215:
-

Agree this would be helpful. We have run into a few instances of the cleaner 
thread dying and logs growing indefinitely until a broker restart. Would the 
offending topic-partition be immediately blacklisted? If so, the same problem 
could still occur, especially for a high-volume topic such as 
__consumer_offsets; it might be worth giving it some reasonable number of 
retry attempts.
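The bounded-retry idea in the comment could be sketched as follows. This is illustrative only, not the actual LogCleaner code, and the class, method names, and threshold are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class CleanerRetryPolicy {
    // Instead of marking a partition uncleanable on the first failure,
    // allow a bounded number of retries before blacklisting it.
    private final Map<String, Integer> failures = new HashMap<>();
    private final int maxRetries;

    public CleanerRetryPolicy(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Records a cleaning failure; returns true once the partition has
     *  exhausted its retries and should be blacklisted. */
    public boolean recordFailure(String topicPartition) {
        int n = failures.merge(topicPartition, 1, Integer::sum);
        return n > maxRetries;
    }

    /** A successful clean resets the failure count for that partition. */
    public void recordSuccess(String topicPartition) {
        failures.remove(topicPartition);
    }
}
```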

> Improve LogCleaner behavior on error
> 
>
> Key: KAFKA-7215
> URL: https://issues.apache.org/jira/browse/KAFKA-7215
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> For more detailed information see 
> [KIP-346|https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error]





[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect

2018-08-24 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592030#comment-16592030
 ] 

Guozhang Wang commented on KAFKA-7240:
--

Thanks for your contribution [~slendle]!!

> -total metrics in Streams are incorrect
> ---
>
> Key: KAFKA-7240
> URL: https://issues.apache.org/jira/browse/KAFKA-7240
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Sam Lendle
>Assignee: Sam Lendle
>Priority: Major
> Fix For: 2.1.0
>
>
> I noticed the values of total metrics for streams were decreasing 
> periodically when viewed in JMX, for example process-total for each 
> processor-node-id under stream-processor-node-metrics. 
> Edit: For processor node metrics, I should have been looking at 
> ProcessorNode, not StreamsMetricsThreadImpl.
>  -Looking at StreamsMetricsThreadImpl, I believe this behavior is due to 
> using Count() as the Stat for the *-total metrics. Count() is a SampledStat, 
> so the value it reports is the count in recent time windows, and the value 
> decreases whenever a window is purged.-
> 
> -This explains the behavior I saw, but I think the issue is deeper. For 
> example, processTimeSensor attempts to measure process-latency-avg, 
> process-latency-max, process-rate, and process-total. For that sensor, record 
> is called like-
> -streamsMetrics.processTimeSensor.record(computeLatency() / (double) 
> processed, timerStartedMs);-
> -so the value passed to record is the average latency per processed message 
> in this batch, if I understand correctly. That gets pushed through to the 
> call to Count#record, which increments its count by 1, ignoring the value 
> parameter. What the stat recording the total would actually need to know is 
> the number of messages processed. Because of that, I don't think it's 
> possible for one Sensor to measure both latency and total.-
> -That said, it's not clear to me how all the different Stats and Sensors 
> work exactly, and I don't actually understand how the process-rate metric is 
> working for similar reasons, but that seems correct, so I may be missing 
> something here.-
>   
> cc [~guozhang]
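The windowed-versus-cumulative distinction behind this bug can be illustrated with a toy sketch. These are hypothetical classes, not the real `SampledStat` or `CumulativeCount` implementations; they only model why a windowed count can decrease while a cumulative one cannot:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class Counts {
    /** Windowed count, loosely modeling a SampledStat: old windows are
     *  purged, so the reported value can *decrease* over time -- the
     *  behavior seen for the -total metrics in JMX. */
    public static class WindowedCount {
        private final Deque<long[]> windows = new ArrayDeque<>(); // {windowStart, count}
        private final long windowMs;
        private final int maxWindows;

        public WindowedCount(long windowMs, int maxWindows) {
            this.windowMs = windowMs;
            this.maxWindows = maxWindows;
        }

        public void record(long nowMs) {
            long start = (nowMs / windowMs) * windowMs;
            if (windows.isEmpty() || windows.peekLast()[0] != start) {
                windows.addLast(new long[]{start, 0});
                if (windows.size() > maxWindows)
                    windows.removeFirst(); // purge oldest window
            }
            windows.peekLast()[1]++;
        }

        public long measure() {
            return windows.stream().mapToLong(w -> w[1]).sum();
        }
    }

    /** Cumulative count: never decreases; what a "-total" metric needs. */
    public static class CumulativeCount {
        private long total;
        public void record(long nowMs) { total++; }
        public long measure() { return total; }
    }
}
```

Recording events across three windows with `maxWindows = 2` makes the windowed total drop when the first window is purged, while the cumulative total keeps growing.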





[jira] [Updated] (KAFKA-7240) -total metrics in Streams are incorrect

2018-08-24 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7240:
-
Fix Version/s: 2.1.0

> -total metrics in Streams are incorrect
> ---
>
> Key: KAFKA-7240
> URL: https://issues.apache.org/jira/browse/KAFKA-7240
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Sam Lendle
>Assignee: Sam Lendle
>Priority: Major
> Fix For: 2.1.0





[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592027#comment-16592027
 ] 

ASF GitHub Bot commented on KAFKA-7240:
---

guozhangwang closed pull request #5467: KAFKA-7240: -total metrics in Streams 
are incorrect
URL: https://github.com/apache/kafka/pull/5467
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 79df5d158a1..7f3d31fd776 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -41,6 +41,7 @@
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -109,7 +110,7 @@
 );
 parent.add(
 new MetricName("commit-total", group, "The total number of occurrence of commit operations.", allTagMap),
-new Count()
+new CumulativeCount()
 );
 
 // add the operation metrics with additional tags
@@ -129,7 +130,7 @@
 );
 taskCommitTimeSensor.add(
 new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap),
-new Count()
+new CumulativeCount()
 );
 
 // add the metrics for enforced processing
@@ -140,7 +141,7 @@
 );
 taskEnforcedProcessSensor.add(
 new MetricName("enforced-process-total", group, "The total number of occurrence of enforced-process operations.", tagMap),
-new Count()
+new CumulativeCount()
 );
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index efd94eaf637..28cedbe4197 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -44,6 +44,7 @@
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
@@ -437,7 +438,7 @@ StreamTask createTask(final Consumer<byte[], byte[]> consumer,
 cache,
 time,
 () -> createProducer(taskId),
-streamsMetrics.tasksClosedSensor);
+streamsMetrics.taskClosedSensor);
 }
 
 private Producer<byte[], byte[]> createProducer(final TaskId id) {
@@ -518,7 +519,7 @@ StandbyTask createTask(final Consumer<byte[], byte[]> consumer,
 private final Sensor processTimeSensor;
 private final Sensor punctuateTimeSensor;
 private final Sensor taskCreatedSensor;
-private final Sensor tasksClosedSensor;
+private final Sensor taskClosedSensor;
 
 StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) {
 super(metrics, threadName);
@@ -532,7 +533,7 @@ StandbyTask createTask(final Consumer<byte[], byte[]> consumer,
 addAvgMaxLatency(pollTimeSensor, group, tagMap(), "poll");
 // can't use addInvocationRateAndCount due to non-standard description string
 pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
-pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tagMap()), new Count());
+pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tagMap()), new CumulativeCount());
 
 processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO);
 addAvgMaxLatency(processTimeSensor, group, tagMap(), "process");
@@ -546,9 +547,9 @@

[jira] [Commented] (KAFKA-7335) Store clusterId locally to ensure broker joins the right cluster

2018-08-24 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591871#comment-16591871
 ] 

Jason Gustafson commented on KAFKA-7335:


[~badai] That is one possibility. I was thinking it would be advantageous to 
have the cluster identity stored in the log directory with the data since it is 
less likely to be mistakenly altered. A server config for the clusterId could 
be incorrectly specified just like the zookeeper connect string, so I'm not 
sure there's a strong advantage to having one. I think it's the association of 
the data with the cluster that we need to protect. 

> Store clusterId locally to ensure broker joins the right cluster
> 
>
> Key: KAFKA-7335
> URL: https://issues.apache.org/jira/browse/KAFKA-7335
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> We have seen situations where a broker somehow got the wrong configuration 
> and joined a different cluster than the one it was previously registered in. 
> This can create various kinds of metadata inconsistencies in the cluster and 
> can be difficult to debug.  It was suggested in 
> [KIP-78|https://cwiki.apache.org/confluence/display/KAFKA/KIP-78%3A+Cluster+Id]
>  that we could store the clusterId locally after initial registration and 
> verify upon startup that the locally stored value matches what is in 
> zookeeper.
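The proposal in the ticket, store the cluster id alongside the data and verify it at startup, might look roughly like this. The file name and method are illustrative only, not Kafka's actual metadata-file handling:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ClusterIdCheck {
    /** On first registration, persists the cluster id into the log
     *  directory; on every later startup, verifies the stored id matches,
     *  refusing to start a broker against another cluster's data. */
    public static void verifyOrStore(Path logDir, String clusterId) throws IOException {
        Path meta = logDir.resolve("cluster.id"); // hypothetical file name
        if (Files.exists(meta)) {
            String stored = Files.readAllLines(meta).get(0);
            if (!stored.equals(clusterId))
                throw new IllegalStateException(
                    "Log dir belongs to cluster " + stored + ", not " + clusterId);
        } else {
            Files.write(meta, clusterId.getBytes()); // first registration
        }
    }
}
```

This is the association Jason describes: the check travels with the data in the log directory rather than with a config value that could itself be mis-specified.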





[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2018-08-24 Thread Ayushi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591870#comment-16591870
 ] 

Ayushi commented on KAFKA-5998:
---

[~guozhang] This issue persists even after changing the state dir to a 
persistent location.

[^exc.txt]
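For context, the failing code path is a write-to-temp-then-rename checkpoint. A simplified stdlib-only sketch of that pattern (not the actual OffsetCheckpoint implementation) shows why a task directory that disappears, e.g. via state cleanup, surfaces as a FileNotFoundException on the `.tmp` file:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CheckpointWriter {
    /** Writes checkpoint contents to ".checkpoint.tmp" and atomically
     *  renames it to ".checkpoint", so readers never see a partial file.
     *  If dir has been deleted between attempts, opening the .tmp file
     *  fails -- which is the exception reported in this ticket. */
    public static Path writeCheckpoint(Path dir, String contents) throws IOException {
        Path tmp = dir.resolve(".checkpoint.tmp"); // fails here if dir is gone
        Files.write(tmp, contents.getBytes());
        Path target = dir.resolve(".checkpoint");
        // Atomic rename within the same directory.
        return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```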

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> properties.txt, properties.txt, props.txt, streams.txt, streamsSnippet.txt, 
> streamsSnippet.txt, topology.txt, topology.txt

[jira] [Updated] (KAFKA-5998) /.checkpoint.tmp Not found exception

2018-08-24 Thread Ayushi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayushi updated KAFKA-5998:
--
Attachment: exc.txt

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> properties.txt, properties.txt, props.txt, streams.txt, streamsSnippet.txt, 
> streamsSnippet.txt, topology.txt, topology.txt
>
>
> I have one kafka broker and one kafka stream running... I am running its 
> since two days under load of around 2500 msgs per second.. On third day am 
> getting below exception for some of the partitions, I have 16 partitions only 
> 0_0 and 0_1 gives this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 
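The failure pattern in the stack trace above — writing a `.checkpoint.tmp` file and then moving it into place — can be re-created in a few lines. The sketch below is a hypothetical minimal model, not Kafka's actual `OffsetCheckpoint` code; it shows how a concurrently deleted state directory surfaces as exactly this `FileNotFoundException` on the `.tmp` file:

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical re-creation of the write-tmp-then-rename checkpoint pattern;
// names are illustrative, not Kafka's real implementation.
public class CheckpointSketch {

    static void writeCheckpoint(File checkpointFile, String content) throws IOException {
        File tmp = new File(checkpointFile.getAbsolutePath() + ".tmp");
        // Opening the tmp file fails with FileNotFoundException if the parent
        // directory (the task's state dir) was removed concurrently,
        // e.g. by state-directory cleanup.
        try (Writer w = new OutputStreamWriter(new FileOutputStream(tmp), StandardCharsets.UTF_8)) {
            w.write(content);
        }
        // Atomic rename so readers never observe a half-written checkpoint.
        Files.move(tmp.toPath(), checkpointFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
    }

    // Returns true when the write fails with FileNotFoundException once the
    // state dir is gone -- the same failure mode as in the log above.
    static boolean failsWhenDirMissing() throws IOException {
        Path stateDir = Files.createTempDirectory("0_1");
        File checkpoint = stateDir.resolve(".checkpoint").toFile();
        writeCheckpoint(checkpoint, "topic 0 42\n");
        Files.delete(checkpoint.toPath());
        Files.delete(stateDir); // simulate the cleaner removing the state dir
        try {
            writeCheckpoint(checkpoint, "topic 0 43\n");
            return false;
        } catch (FileNotFoundException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(failsWhenDirMissing()); // → true
    }
}
```

This suggests the race is between the commit path writing the checkpoint and whatever deletes the task directory, which matches the reports above that lowering cleanup frequency only hides the problem.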

[jira] [Updated] (KAFKA-7337) Enhance Producer Performance tool to generate keys

2018-08-24 Thread Zenifer Cheruveettil (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zenifer Cheruveettil updated KAFKA-7337:

Priority: Major  (was: Minor)

> Enhance Producer Performance tool to generate keys
> --
>
> Key: KAFKA-7337
> URL: https://issues.apache.org/jira/browse/KAFKA-7337
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 1.0.0
>Reporter: Zenifer Cheruveettil
>Priority: Major
>
> {{kafka-producer-perf-test.sh}} cannot generate messages with keys. It would 
> be helpful to have the option to generate messages with keys, especially when 
> using this tool together with applications, such as Kafka Streams 
> applications, that expect messages to have a key.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7337) Enhance Producer Performance tool to generate keys

2018-08-24 Thread Zenifer Cheruveettil (JIRA)
Zenifer Cheruveettil created KAFKA-7337:
---

 Summary: Enhance Producer Performance tool to generate keys
 Key: KAFKA-7337
 URL: https://issues.apache.org/jira/browse/KAFKA-7337
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Affects Versions: 1.0.0
Reporter: Zenifer Cheruveettil


{{kafka-producer-perf-test.sh}} cannot generate messages with keys. It would be 
helpful to have the option to generate messages with keys, especially when 
using this tool together with applications, such as Kafka Streams applications, 
that expect messages to have a key.
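Until the tool grows such an option, a workaround is a small driver that generates keyed records itself. The sketch below is illustrative only (hypothetical class and key scheme, not part of any Kafka tool): it derives keys from a bounded key space so that downstream keyed operations see repeated keys, and notes where a real producer send would go:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical keyed-load generator sketch; the key scheme is illustrative.
public class KeyedPerfSketch {

    // Deterministic key for record i, drawn from a bounded key space so that
    // keyed operations downstream (joins, aggregations) see repeated keys.
    static byte[] keyFor(long recordIndex, int keySpace) {
        return ("key-" + (recordIndex % keySpace)).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Against a real broker one would send
        //   new ProducerRecord<>(topic, keyFor(i, 100), payload)
        // through org.apache.kafka.clients.producer.KafkaProducer; that part is
        // omitted here so the sketch stays self-contained.
        for (long i = 0; i < 5; i++) {
            System.out.println(new String(keyFor(i, 3), StandardCharsets.UTF_8));
        }
        // → key-0, key-1, key-2, key-0, key-1
    }
}
```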

 





[jira] [Commented] (KAFKA-5975) No response when deleting topics and delete.topic.enable=false

2018-08-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591815#comment-16591815
 ] 

ASF GitHub Bot commented on KAFKA-5975:
---

hachikuji closed pull request #3960: KAFKA-5975: No response when deleting 
topics and delete.topic.enable=false
URL: https://github.com/apache/kafka/pull/3960
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java
new file mode 100644
index 000..41577d2a288
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class TopicDeletionDisabledException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public TopicDeletionDisabledException() {
+    }
+
+    public TopicDeletionDisabledException(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 090dca32651..d4610602d1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -80,6 +80,7 @@
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
@@ -110,7 +111,7 @@
  * Do not add exceptions that occur only on the client or only on the server 
here.
  */
 public enum Errors {
-UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when 
processing the request",
+UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when 
processing the request.",
 UnknownServerException::new),
 NONE(0, null, message -> null),
 OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of 
offsets maintained by the server.",
@@ -129,7 +130,7 @@
 TimeoutException::new),
 BROKER_NOT_AVAILABLE(8, "The broker is not available.",
 BrokerNotAvailableException::new),
-REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested 
topic-partition",
+REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested 
topic-partition.",
 ReplicaNotAvailableException::new),
 MESSAGE_TOO_LARGE(10, "The request included a message larger than the max 
message size the server will accept.",
 RecordTooLargeException::new),
@@ -161,7 +162,7 @@
 "The group member's supported protocols are incompatible with 
those of existing members" +
 " or first group member tried to join with empty protocol type 
or empty protocol list.",
 InconsistentGroupProtocolException::new),
-INVALID_GROUP_ID(24, "The configured groupId is invalid",
+INVALID_GROUP_ID(24, "The configured groupId is invalid.",
 InvalidGroupIdException::new),
 UNKNOWN_MEMBER_ID(25, "The coordinator is not aware of this member.",
 UnknownMemberIdException::new),
@@ -171,7 +172,7 @@
 InvalidSessionTimeoutException::new),
 REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is 
needed.",
 RebalanceInProgressException::new),
-INVALID_COMMIT_OFFSET_SIZE(28, "The committing offset data size is not 
valid",
+ 
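The hunk above follows the existing `Errors` pattern: each enum constant pairs an error code and message with a function that builds the matching exception. A stripped-down sketch of that pattern (toy types and an illustrative code value, not Kafka's real enum):

```java
import java.util.function.Function;

// Minimal re-creation of the Errors-enum pattern: code + message + exception factory.
public class ErrorsSketch {
    static class ApiException extends RuntimeException {
        ApiException(String message) { super(message); }
    }
    static class TopicDeletionDisabledException extends ApiException {
        TopicDeletionDisabledException(String message) { super(message); }
    }

    enum Errors {
        NONE(0, null, m -> null),
        // Code value is illustrative, not the real wire-protocol code.
        TOPIC_DELETION_DISABLED(99, "Topic deletion is disabled.",
                TopicDeletionDisabledException::new);

        final short code;
        final String message;
        final Function<String, ApiException> builder;

        Errors(int code, String message, Function<String, ApiException> builder) {
            this.code = (short) code;
            this.message = message;
            this.builder = builder;
        }

        // NONE maps to no exception; every other constant builds its own type.
        ApiException exception() {
            return message == null ? null : builder.apply(message);
        }
    }

    public static void main(String[] args) {
        System.out.println(Errors.TOPIC_DELETION_DISABLED.exception().getMessage());
        // → Topic deletion is disabled.
    }
}
```

The functional-constructor column is what lets the broker turn an error code from the wire back into a typed exception on the client without a switch statement.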

[jira] [Commented] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-08-24 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591643#comment-16591643
 ] 

Jonathan Santilli commented on KAFKA-7165:
--

Done [~omkreddy], let's wait, thanks!

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1] 
> Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
> /brokers/ids/1, node already exists and owner '216186131422332301' does not 
> match current session '288330817911521280' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)*
>  [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 
> is: NODEEXISTS* (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
> 'isr-expiration' (kafka.utils.KafkaScheduler)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired for 
> /brokers/topics/*TOPIC_NAME*/partitions/*PARTITION-#*/state
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>  at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:465)
>  at 
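The ERROR above is the broker refusing to adopt an ephemeral znode still owned by the previous, expired session: until ZooKeeper reaps the old session's ephemerals, the new session's create returns NODEEXISTS. A toy model of that ownership check (hypothetical, not the real `kafka.zk.KafkaZkClient` or ZooKeeper client):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of ephemeral-node creation with an ownership check, mimicking the
// NODEEXISTS outcome in the log above; illustrative only.
public class EphemeralSketch {
    enum Result { OK, NODEEXISTS }

    // path -> owning session id
    final Map<String, Long> owners = new HashMap<>();

    Result createEphemeral(String path, long sessionId) {
        Long owner = owners.get(path);
        if (owner != null && owner != sessionId) {
            // "node already exists and owner '...' does not match current session '...'"
            return Result.NODEEXISTS;
        }
        owners.put(path, sessionId);
        return Result.OK;
    }

    // When a session expires, ZooKeeper eventually deletes its ephemerals;
    // until that happens, the new session's create keeps failing.
    void expireSession(long sessionId) {
        owners.values().removeIf(owner -> owner == sessionId);
    }

    public static void main(String[] args) {
        EphemeralSketch zk = new EphemeralSketch();
        zk.createEphemeral("/brokers/ids/1", 216186131422332301L);      // old session
        System.out.println(zk.createEphemeral("/brokers/ids/1", 288330817911521280L));
        // → NODEEXISTS (old session's node not yet reaped)
        zk.expireSession(216186131422332301L);
        System.out.println(zk.createEphemeral("/brokers/ids/1", 288330817911521280L));
        // → OK
    }
}
```

This is why the broker can fail to re-register under its own id right after its own session expiry: the race is between the new session's registration attempt and ZooKeeper's cleanup of the old one.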

[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2018-08-24 Thread Russell Ferriday (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591460#comment-16591460
 ] 

Russell Ferriday commented on KAFKA-2260:
-

This would enable full-on event sourcing on Kafka without having to restrict 
oneself to single-threaded designs.
One example of a great (>250 GitHub stars) FOSS project being held back by this:

    [https://github.com/johnbywater/eventsourcing/issues/108]

Can we see this soon?

> Allow specifying expected offset on produce
> ---
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Ben Kirwin
>Priority: Minor
> Attachments: KAFKA-2260.patch, expected-offsets.patch
>
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the 
> Kafka producer. This update has a small footprint, but enables a bunch of 
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the 
> expected offset doesn't match the actual offset, the server should fail the 
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous 
> check-and-set -- but instead of checking the current value of some state, it 
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low 
> contention. Happily, when Kafka is used as a commit log or as a 
> stream-processing transport, it's common to have just one producer (or a 
> small number) for a given partition -- and in many of these cases, predicting 
> offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer 
> can retry a write indefinitely and be sure that at most one of those attempts 
> will succeed; and if two producers accidentally write to the end of the 
> partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
> messages consecutively to a partition, even if the list is much larger than 
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to 
> bootstrap, then writing any updates to that same partition -- it can be sure 
> that it's seen all of the messages in that partition at the moment it does 
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly 
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction / 
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
> currently-unused field, adds no new APIs, and requires very little new code 
> or additional work from the server.
> - Produced messages already carry an offset field, which is currently ignored 
> by the server. This field could be used for the 'expected offset', with a 
> sigil value for the current behaviour. (-1 is a natural choice, since it's 
> already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in 
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
> changed, this method would assign offsets in the same way -- but if they 
> don't match the offset in the message, we'd return an error instead of 
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live 
> behind some config flag. (Possibly global, but probably more useful 
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer 
> questions, and if this seems interesting, I'd be glad to flesh this out into 
> a full KIP or patch. (And apologies if this is the wrong venue for this sort 
> of thing!)
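The proposed check is easy to model: the log assigns the next offset as usual but rejects the append when a non-sentinel expected offset disagrees. A toy sketch of those semantics (illustrative API; -1 as the "any offset" sentinel per the proposal, and -2 standing in for the proposed CAS-failure error code):

```java
import java.util.ArrayList;
import java.util.List;

// Toy log with the proposed CAS-on-offset semantics; names are illustrative.
public class ExpectedOffsetSketch {
    static final long ANY_OFFSET = -1L; // current behaviour: server assigns freely

    final List<String> log = new ArrayList<>();

    // Returns the assigned offset, or -2 to signal the proposed 'CAS failure'.
    long append(String message, long expectedOffset) {
        long next = log.size();
        if (expectedOffset != ANY_OFFSET && expectedOffset != next) {
            return -2L; // would map to a new error code in the real protocol
        }
        log.add(message);
        return next;
    }

    public static void main(String[] args) {
        ExpectedOffsetSketch p = new ExpectedOffsetSketch();
        System.out.println(p.append("a", ANY_OFFSET)); // → 0
        System.out.println(p.append("b", 1));          // → 1 (expectation met)
        System.out.println(p.append("b-retry", 1));    // → -2 (already written: retry is safe)
    }
}
```

The retry case shows the idempotence claim from the motivation: a producer that resends with the same expected offset can never duplicate its write, because at most one attempt can match the log's head.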





[jira] [Updated] (KAFKA-7336) Kafka Connect source task hangs when producing record with invalid topic name

2018-08-24 Thread Gunnar Morling (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-7336:
--
Summary: Kafka Connect source task hangs when producing record with invalid 
topic name  (was: Kafka Connect source task when producing record with invalid 
topic name)

> Kafka Connect source task hangs when producing record with invalid topic name
> -
>
> Key: KAFKA-7336
> URL: https://issues.apache.org/jira/browse/KAFKA-7336
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> If a Kafka Connect source task returns a {{SourceRecord}} with an invalid 
> topic name (e.g. "dbserver1.inventory.test@data"), that source task hangs 
> (presumably indefinitely?) and doesn't continue it's polling loop. The log is 
> flooded with this message:
> {code}
> connect_1| 2018-08-24 08:47:29,014 WARN   ||  [Producer 
> clientId=producer-4] Error while fetching metadata with correlation id 833 : 
> {dbserver1.inventory.test@data=INVALID_TOPIC_EXCEPTION}   
> [org.apache.kafka.clients.NetworkClient]
> {code}
> The producer thread is stuck in the loop here:
> {code}
> KafkaProducer.waitOnMetadata(String, Integer, long) line: 938
> KafkaProducer.doSend(ProducerRecord, Callback) line: 823
> KafkaProducer.send(ProducerRecord, Callback) line: 803  
> WorkerSourceTask.sendRecords() line: 318  
> WorkerSourceTask.execute() line: 228  
> WorkerSourceTask(WorkerTask).doRun() line: 175
> WorkerSourceTask(WorkerTask).run() line: 219  
> Executors$RunnableAdapter.call() line: 511 
> FutureTask.run() line: 266 
> ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149
> ThreadPoolExecutor$Worker.run() line: 624 
> Thread.run() line: 748
> {code}
> This causes the task to remain in RUNNING state, but no further invocations 
> of {{poll()}} are done.
> Of course we'll work around this and make sure to not produce records with 
> invalid topic names, but I think the source task should transition to FAILED 
> state in this case.





[jira] [Created] (KAFKA-7336) Kafka Connect source task when producing record with invalid topic name

2018-08-24 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-7336:
-

 Summary: Kafka Connect source task when producing record with 
invalid topic name
 Key: KAFKA-7336
 URL: https://issues.apache.org/jira/browse/KAFKA-7336
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Gunnar Morling


If a Kafka Connect source task returns a {{SourceRecord}} with an invalid topic 
name (e.g. "dbserver1.inventory.test@data"), that source task hangs (presumably 
indefinitely?) and doesn't continue its polling loop. The log is flooded with 
this message:

{code}
connect_1| 2018-08-24 08:47:29,014 WARN   ||  [Producer 
clientId=producer-4] Error while fetching metadata with correlation id 833 : 
{dbserver1.inventory.test@data=INVALID_TOPIC_EXCEPTION}   
[org.apache.kafka.clients.NetworkClient]
{code}

The producer thread is stuck in the loop here:

{code}
KafkaProducer.waitOnMetadata(String, Integer, long) line: 938  
KafkaProducer.doSend(ProducerRecord, Callback) line: 823  
KafkaProducer.send(ProducerRecord, Callback) line: 803
WorkerSourceTask.sendRecords() line: 318
WorkerSourceTask.execute() line: 228
WorkerSourceTask(WorkerTask).doRun() line: 175  
WorkerSourceTask(WorkerTask).run() line: 219
Executors$RunnableAdapter.call() line: 511   
FutureTask.run() line: 266   
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149  
ThreadPoolExecutor$Worker.run() line: 624   
Thread.run() line: 748  
{code}

This causes the task to remain in RUNNING state, but no further invocations of 
{{poll()}} are done.

Of course we'll work around this and make sure to not produce records with 
invalid topic names, but I think the source task should transition to FAILED 
state in this case.
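Until Connect fails the task itself, a connector can defensively validate topic names before handing records to the producer. The sketch below is a best-effort mirror of Kafka's published topic-name constraints (at most 249 characters from `[a-zA-Z0-9._-]`, and not `.` or `..`); treat the exact rules as an assumption to verify against `org.apache.kafka.common.internals.Topic`:

```java
import java.util.regex.Pattern;

// Defensive topic-name check a source connector could run before emitting a
// SourceRecord; rules mirror Kafka's documented constraints as best I know them.
public class TopicNameCheck {
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    static boolean isValidTopicName(String name) {
        return name != null
                && !name.isEmpty()
                && !name.equals(".")
                && !name.equals("..")
                && name.length() <= MAX_LENGTH
                && LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidTopicName("dbserver1.inventory.testdata"));  // → true
        System.out.println(isValidTopicName("dbserver1.inventory.test@data")); // → false: '@' is illegal
    }
}
```

Rejecting (or sanitizing) such names up front lets the connector raise a proper exception instead of blocking forever inside `waitOnMetadata`.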


