[jira] [Commented] (KAFKA-6547) group offset reset and begin_offset ignored/no effect

2018-08-12 Thread Nikazu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577856#comment-16577856
 ] 

Nikazu commented on KAFKA-6547:
---

Any update on this? I seem to be hitting the same issue with Kafka 2.0.

> group offset reset and begin_offset ignored/no effect
> -
>
> Key: KAFKA-6547
> URL: https://issues.apache.org/jira/browse/KAFKA-6547
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0
> Environment: ubuntu 16, java 1.8
>Reporter: Dan
>Priority: Major
>
> Use of kafka-consumer-groups.sh with --reset-offsets --execute <--to-earliest 
> or any other reset option> has no effect in 1.0. When my group client connects 
> and requests a specific offset, or the earliest one, nothing happens: the 
> consumer is unable to poll, so no messages arrive and even new ones are ignored.
> I installed 0.11 and these problems do not manifest there.
> I'm unfamiliar with the internals, so listing the offset manager as the 
> component is only a guess.
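
For illustration, a minimal consumer sketch of the client-side path the report 
describes (broker address, group, and topic names are hypothetical; this is a 
repro sketch, not the reporter's actual code). On 0.11 this polls records from 
the earliest offset; per the report, on 1.0 it returns nothing:

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker
props.put("group.id", "my-group");                 // group whose offsets were reset
props.put("auto.offset.reset", "earliest");        // ask for earliest when no valid offset
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));      // hypothetical topic
ConsumerRecords<String, String> records = consumer.poll(1000);  // expected: records from the beginning
{code}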



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7279) partitionsFor implicitly creates a non-existent topic

2018-08-12 Thread huxihx (JIRA)
huxihx created KAFKA-7279:
-

 Summary: partitionsFor implicitly creates a non-existent topic
 Key: KAFKA-7279
 URL: https://issues.apache.org/jira/browse/KAFKA-7279
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.0.0
Reporter: huxihx


With `auto.create.topics.enable` set to true, a non-existent topic gets created 
when invoking `Consumer#partitionsFor`. Is this deliberate, as designed?
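
A minimal sketch of the behavior in question, assuming a broker with the 
default `auto.create.topics.enable=true` (topic and broker names are 
hypothetical):

{code}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// The topic does not exist yet; the metadata request issued by partitionsFor
// triggers broker-side auto-creation, so the topic exists afterwards.
List<PartitionInfo> partitions = consumer.partitionsFor("no-such-topic");
{code}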



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7211) MM should handle timeouts in commitSync

2018-08-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577844#comment-16577844
 ] 

ASF GitHub Bot commented on KAFKA-7211:
---

huxihx opened a new pull request #5492: KAFKA-7211: MM should handle 
TimeoutException in commitSync
URL: https://github.com/apache/kafka/pull/5492
 
 
   With KIP-266 introduced, MirrorMaker should handle the TimeoutException 
thrown by commitSync(). In addition, MM should only commit offsets for existing 
topics.
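
   A minimal sketch of the intended handling, assuming the KIP-266 consumer API 
   (the exact MirrorMaker change may differ; the helper and its arguments are 
   illustrative):

{code}
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

// Hypothetical helper, not the actual MirrorMaker code.
static void commitWithTimeoutHandling(Consumer<byte[], byte[]> consumer,
                                      Map<TopicPartition, OffsetAndMetadata> offsets) {
    try {
        consumer.commitSync(offsets); // bounded by default.api.timeout.ms since KIP-266
    } catch (TimeoutException e) {
        // The timeout may be caused by a deleted topic: drop offsets for
        // partitions whose topic no longer exists, then retry the rest.
        Set<String> existing = consumer.listTopics().keySet();
        offsets.keySet().removeIf(tp -> !existing.contains(tp.topic()));
        if (!offsets.isEmpty())
            consumer.commitSync(offsets);
    }
}
{code}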
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> MM should handle timeouts in commitSync
> ---
>
> Key: KAFKA-7211
> URL: https://issues.apache.org/jira/browse/KAFKA-7211
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
>
> Now that we have KIP-266, the user can override `default.api.timeout.ms` for 
> the consumer so that commitSync does not block indefinitely. MM needs to be 
> updated to handle TimeoutException. We may also need some logic to handle 
> deleted topics. If MM attempts to commit an offset for a deleted topic, the 
> call will time out, so we should probably check whether the topic exists and 
> remove the offset if it doesn't.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7211) MM should handle timeouts in commitSync

2018-08-12 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-7211:
-

Assignee: huxihx

> MM should handle timeouts in commitSync
> ---
>
> Key: KAFKA-7211
> URL: https://issues.apache.org/jira/browse/KAFKA-7211
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
>
> Now that we have KIP-266, the user can override `default.api.timeout.ms` for 
> the consumer so that commitSync does not block indefinitely. MM needs to be 
> updated to handle TimeoutException. We may also need some logic to handle 
> deleted topics. If MM attempts to commit an offset for a deleted topic, the 
> call will time out, so we should probably check whether the topic exists and 
> remove the offset if it doesn't.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577735#comment-16577735
 ] 

ASF GitHub Bot commented on KAFKA-7278:
---

lindong28 opened a new pull request #5491: KAFKA-7278; replaceSegments() should 
not call asyncDeleteSegment() for segments which have been removed from 
segments list
URL: https://github.com/apache/kafka/pull/5491
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined and the time Log.replaceSegments() is called. If there is a 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist, and the 
> Kafka server may shut down the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given this understanding of the problem, we should be able to fix the issue by 
> only deleting a segment if it can still be found in Log.segments.
>  
>  
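
For illustration, a schematic Java sketch of the proposed guard, not the actual 
Kafka patch (the broker code is Scala; LogSegment here is a hypothetical 
stand-in for kafka.log.LogSegment):

{code}
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

class GuardedSegmentDelete {
    static class LogSegment {
        final long baseOffset;
        LogSegment(long baseOffset) { this.baseOffset = baseOffset; }
    }

    private final ConcurrentSkipListMap<Long, LogSegment> segments = new ConcurrentSkipListMap<>();

    void replaceSegments(LogSegment newSegment, List<LogSegment> oldSegments) {
        segments.put(newSegment.baseOffset, newSegment);
        for (LogSegment old : oldSegments) {
            if (old.baseOffset == newSegment.baseOffset)
                continue; // replaced in place by the put above
            // remove(key, value) succeeds only while this exact segment is still
            // registered, so a segment already removed by a concurrent deletion
            // is skipped instead of being renamed twice (the NoSuchFileException path).
            if (segments.remove(old.baseOffset, old))
                asyncDeleteSegment(old);
        }
    }

    void asyncDeleteSegment(LogSegment segment) {
        // rename to *.deleted and schedule the physical delete (elided)
    }
}
{code}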



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-12 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577707#comment-16577707
 ] 

Dong Lin commented on KAFKA-7278:
-

Yes, `segment.changeFileSuffixes("", Log.DeletedFileSuffix)` is executed while 
the lock is held. But the lock is released between steps 2), 3), and 4) in the 
example sequence provided above.

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined and the time Log.replaceSegments() is called. If there is a 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist, and the 
> Kafka server may shut down the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given this understanding of the problem, we should be able to fix the issue by 
> only deleting a segment if it can still be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-12 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577706#comment-16577706
 ] 

Dong Lin commented on KAFKA-7278:
-

[~ijuma] The exception is probably thrown from `segment.changeFileSuffixes("", 
Log.DeletedFileSuffix)`. Below is the stack trace from the discussion of 
https://issues.apache.org/jira/browse/KAFKA-6188.

{code}

[2018-05-07 16:53:06,721] ERROR Failed to clean up log for 
__consumer_offsets-24 in dir /tmp/kafka-logs due to IOException 
(kafka.server.LogDirFailureChannel)
java.nio.file.NoSuchFileException: 
/tmp/kafka-logs/__consumer_offsets-24/.log
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
at kafka.log.Log.asyncDeleteSegment(Log.scala:1601)
at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653)
at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648)
at scala.collection.immutable.List.foreach(List.scala:389)
at kafka.log.Log.replaceSegments(Log.scala:1648)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
at scala.collection.immutable.List.foreach(List.scala:389)
at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
at kafka.log.Cleaner.clean(LogCleaner.scala:438)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Suppressed: java.nio.file.NoSuchFileException: 
/tmp/kafka-logs/__consumer_offsets-24/.log -> 
/tmp/kafka-logs/__consumer_offsets-24/.log.deleted
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
... 16 more
[2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving 
replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)
[2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs 
(kafka.log.LogManager)
[2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in 
/tmp/kafka-logs have failed (kafka.log.LogManager)

{code}

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined and the time Log.replaceSegments() is called. If there is a 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist, and the 
> Kafka server may shut down the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given this understanding of the problem, we should be able to fix the issue by 
> only deleting a segment if it can still be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7255) Timing issue in SimpleAclAuthorizer with concurrent create/update

2018-08-12 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7255:
--
Reviewer: Jun Rao

> Timing issue in SimpleAclAuthorizer with concurrent create/update
> -
>
> Key: KAFKA-7255
> URL: https://issues.apache.org/jira/browse/KAFKA-7255
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 0.11.0.4, 1.0.3, 1.1.2, 2.0.1, 2.1.0
>
>
> There is a small timing window in SimpleAclAuthorizer where ACL updates may 
> be lost if two brokers create ACLs for a resource at the same time.
> Scenario: An administrator creates new.topic and uses AdminClient to send one 
> request adding an ACL for UserA on new.topic and a second request adding an 
> ACL for UserB on new.topic. These requests may be sent to different brokers 
> by AdminClient. In most cases both ACLs are added for the resource new.topic, 
> but there is a small timing window where one broker may overwrite the ACL 
> written by the other broker, so that only one of the ACLs (either UserA's or 
> UserB's) is actually stored in ZooKeeper. The window itself is very small, 
> but we have seen intermittent failures in 
> SimpleAclAuthorizerTest.testHighConcurrencyModificationOfResourceAcls as a 
> result of it.
> Even though this issue can result in incorrect ACLs affecting security, we 
> have not raised it as a security vulnerability since it is not exploitable: 
> ACLs can only be set by privileged users in Kafka who have Alter access on 
> the Cluster resource, and users without this privileged access cannot use 
> this issue to gain additional access to any resource.
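
For illustration, a schematic sketch (not the actual SimpleAclAuthorizer fix) 
of the read-modify-write race and the conditional-write remedy; the in-memory 
store below stands in for the ZooKeeper ACL node:

{code}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

class AclStoreSketch {
    private final AtomicReference<Set<String>> store =
        new AtomicReference<Set<String>>(new HashSet<String>());

    void addAcl(String acl) {
        while (true) {
            Set<String> current = store.get();       // read the current ACL set
            Set<String> updated = new HashSet<>(current);
            updated.add(acl);                        // modify locally
            // Conditional write: succeeds only if no other broker wrote in
            // between. A plain overwrite here is exactly the window that
            // loses one of two concurrent ACL additions.
            if (store.compareAndSet(current, updated))
                return;
            // conflict: another broker updated first; re-read and retry
        }
    }
}
{code}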



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7261) Request and response total metrics record bytes instead of request count

2018-08-12 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7261.
---
       Resolution: Fixed
         Reviewer: Jun Rao
    Fix Version/s: 2.1.0
                   2.0.1
                   1.1.2
                   1.0.3

> Request and response total metrics record bytes instead of request count
> 
>
> Key: KAFKA-7261
> URL: https://issues.apache.org/jira/browse/KAFKA-7261
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.0.3, 1.1.2, 2.0.1, 2.1.0
>
>
> Request and response total metrics seem to be recording total bytes rather 
> than total requests since they record using a common sensor.
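
For illustration, a sketch of the defect mode using the Meter constructors that 
appear in the PR diff quoted later in this digest (metric and sensor names are 
illustrative): a sensor feeds every stat it holds the same recorded value, so 
when the sensor records a byte count, a Count-backed total also advances by 
bytes rather than by one per request.

{code}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Meter;

Metrics metrics = new Metrics();
Sensor sensor = metrics.sensor("request-size-and-count");

MetricName bytesRate = metrics.metricName("bytes.rate", "grp");
MetricName bytesTotal = metrics.metricName("bytes.total", "grp");
MetricName reqRate = metrics.metricName("request.rate", "grp");
MetricName reqTotal = metrics.metricName("request.total", "grp");

sensor.add(new Meter(TimeUnit.SECONDS, bytesRate, bytesTotal));           // byte metrics
sensor.add(new Meter(TimeUnit.SECONDS, new Count(), reqRate, reqTotal));  // request-count metrics

// One record() per request, carrying the byte count: before the fix, the
// Count-backed Meter's total also accumulated the byte value (512 here)
// instead of 1 per request.
sensor.record(512.0);
{code}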



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7255) Timing issue in SimpleAclAuthorizer with concurrent create/update

2018-08-12 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7255.
---
Resolution: Fixed

> Timing issue in SimpleAclAuthorizer with concurrent create/update
> -
>
> Key: KAFKA-7255
> URL: https://issues.apache.org/jira/browse/KAFKA-7255
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 0.11.0.4, 1.0.3, 1.1.2, 2.0.1, 2.1.0
>
>
> There is a small timing window in SimpleAclAuthorizer where ACL updates may 
> be lost if two brokers create ACLs for a resource at the same time.
> Scenario: An administrator creates new.topic and uses AdminClient to send one 
> request adding an ACL for UserA on new.topic and a second request adding an 
> ACL for UserB on new.topic. These requests may be sent to different brokers 
> by AdminClient. In most cases both ACLs are added for the resource new.topic, 
> but there is a small timing window where one broker may overwrite the ACL 
> written by the other broker, so that only one of the ACLs (either UserA's or 
> UserB's) is actually stored in ZooKeeper. The window itself is very small, 
> but we have seen intermittent failures in 
> SimpleAclAuthorizerTest.testHighConcurrencyModificationOfResourceAcls as a 
> result of it.
> Even though this issue can result in incorrect ACLs affecting security, we 
> have not raised it as a security vulnerability since it is not exploitable: 
> ACLs can only be set by privileged users in Kafka who have Alter access on 
> the Cluster resource, and users without this privileged access cannot use 
> this issue to gain additional access to any resource.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7261) Request and response total metrics record bytes instead of request count

2018-08-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577689#comment-16577689
 ] 

ASF GitHub Bot commented on KAFKA-7261:
---

rajinisivaram closed pull request #5484: KAFKA-7261: Fix request total metric 
to count requests instead of bytes
URL: https://github.com/apache/kafka/pull/5484
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
index 09263cecae8..91d4461d2b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
@@ -61,6 +61,9 @@ public Meter(SampledStat rateStat, MetricName rateMetricName, MetricName totalMe
      * Construct a Meter with provided time unit and provided {@link SampledStat} stats for Rate
      */
     public Meter(TimeUnit unit, SampledStat rateStat, MetricName rateMetricName, MetricName totalMetricName) {
+        if (!(rateStat instanceof SampledTotal) && !(rateStat instanceof Count)) {
+            throw new IllegalArgumentException("Meter is supported only for SampledTotal and Count");
+        }
         this.total = new Total();
         this.rate = new Rate(unit, rateStat);
         this.rateMetricName = rateMetricName;
@@ -77,6 +80,8 @@ public Meter(TimeUnit unit, SampledStat rateStat, MetricName rateMetricName, Met
     @Override
     public void record(MetricConfig config, double value, long timeMs) {
         rate.record(config, value, timeMs);
-        total.record(config, value, timeMs);
+        // Total metrics with Count stat should record 1.0 (as recorded in the count)
+        double totalValue = (rate.stat instanceof Count) ? 1.0 : value;
+        total.record(config, totalValue, timeMs);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 355233125bc..55354ac8d64 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -146,4 +146,12 @@ public ByteBuffer payload() {
         return this.buffer;
     }
 
+    /**
+     * Returns the total size of the receive including payload and size buffer
+     * for use in metrics. This is consistent with {@link NetworkSend#size()}
+     */
+    public int size() {
+        return payload().limit() + size.limit();
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 8ca7fff381a..7e32509933e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -862,7 +862,7 @@ private void addToCompletedReceives() {
     private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
         NetworkReceive networkReceive = stagedDeque.poll();
         this.completedReceives.add(networkReceive);
-        this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
+        this.sensors.recordBytesReceived(channel.id(), networkReceive.size());
     }
 
     // only for testing
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 59bc84e40de..5c75d03b0e6 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -465,8 +465,12 @@ public void testRateWindowing() throws Exception {
         Sensor s = metrics.sensor("test.sensor", cfg);
         MetricName rateMetricName = metrics.metricName("test.rate", "grp1");
         MetricName totalMetricName = metrics.metricName("test.total", "grp1");
+        MetricName countRateMetricName = metrics.metricName("test.count.rate", "grp1");
+        MetricName countTotalMetricName = metrics.metricName("test.count.total", "grp1");
         s.add(new Meter(TimeUnit.SECONDS, rateMetricName, totalMetricName));
-        KafkaMetric totalMetric = metrics.metrics().get(metrics.metricName("test.total", "grp1"));
+        s.add(new Meter(TimeUnit.SECONDS, new Count(), countRateMetricName, countTotalMetricName));
+        KafkaMetric totalMetric = metrics.metrics().get(totalMetricName);
+        KafkaMetric countTotalMetric = metrics.metrics().get(countTotalMetricName);
 
         int sum = 0;
         int 

[jira] [Commented] (KAFKA-7269) KStream.merge is not documented

2018-08-12 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577626#comment-16577626
 ] 

Guozhang Wang commented on KAFKA-7269:
--

[~lucapette] that is right. More specifically, you'll need to add one more row 
to the table of KStream APIs:

https://github.com/apache/kafka/blob/trunk/docs/streams/developer-guide/dsl-api.html#L2866
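
For reference, a minimal usage sketch of the operator being documented (topic 
names are hypothetical). KStream#merge combines two streams into one, with no 
ordering guarantee between records of the two inputs, which is why it belongs 
under stateless transformations:

{code}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> left = builder.stream("left-topic");
KStream<String, String> right = builder.stream("right-topic");

// Stateless: records pass through unchanged; no relative ordering is
// guaranteed between the two input streams.
KStream<String, String> merged = left.merge(right);
merged.to("merged-topic");
{code}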

> KStream.merge is not documented
> ---
>
> Key: KAFKA-7269
> URL: https://issues.apache.org/jira/browse/KAFKA-7269
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0
>Reporter: John Roesler
>Priority: Major
>  Labels: beginner, newbie
>
> If I understand the operator correctly, it should be documented as a 
> stateless transformation at 
> https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7269) KStream.merge is not documented

2018-08-12 Thread Luca Pette (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577507#comment-16577507
 ] 

Luca Pette commented on KAFKA-7269:
---

[~mjsax] I'd be happy to write docs for merge (I agree it fits better than 
union as well). Just to confirm my understanding: 
`docs/streams/developer-guide/dsl-api.html` needs to be changed manually, right?

> KStream.merge is not documented
> ---
>
> Key: KAFKA-7269
> URL: https://issues.apache.org/jira/browse/KAFKA-7269
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0
>Reporter: John Roesler
>Priority: Major
>  Labels: beginner, newbie
>
> If I understand the operator correctly, it should be documented as a 
> stateless transformation at 
> https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-12 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577484#comment-16577484
 ] 

Ismael Juma commented on KAFKA-7278:


Can you please clarify which part of the code is the problem?

{code}
private def asyncDeleteSegment(segment: LogSegment) {
  segment.changeFileSuffixes("", Log.DeletedFileSuffix)
  def deleteSeg() {
    info(s"Deleting segment ${segment.baseOffset}")
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      segment.deleteIfExists()
    }
  }
  scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs)
}
{code}

`segment.deleteIfExists()` should not throw an exception if the file doesn't 
exist (this is the code that I changed some time ago). The rest executes with 
the lock held. That's why I suspected you were seeing an issue that has since 
been fixed. But I might be missing something.

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined and the time Log.replaceSegments() is called. If there is a 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist, and the 
> Kafka server may shut down the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given this understanding of the problem, we should be able to fix the issue by 
> only deleting a segment if it can still be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)