[jira] [Assigned] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`

2018-06-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6966:
--

Assignee: Nishanth Pradeep

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
> 
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Assignee: Nishanth Pradeep
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> With KIP-303, a dynamic routing feature was added and 
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the `TopicNameExtractor` class that is actually used from 
> the `TopologyDescription`.
> We suggest adding `Class<? extends TopicNameExtractor> TopologyDescription.Sink#topicNameExtractor()` 
> and letting it return `null` if the dynamic routing feature is not used.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
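A hedged sketch of what the proposed accessor could look like, using the names suggested 
above (this is not a finalized API, just an illustration against the existing 
`TopologyDescription.Sink` and `TopicNameExtractor` types):

{code:java}
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.processor.TopicNameExtractor;

// Sketch only: the proposed accessor added next to the existing Sink#topic().
interface SinkWithExtractor extends TopologyDescription.Sink {
    // Would return null when a static topic name is used instead of a dynamic
    // TopicNameExtractor (mirroring how topic() returns null today when
    // dynamic routing is used).
    Class<? extends TopicNameExtractor> topicNameExtractor();
}
{code}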



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7045:

Description: 
When down-conversion is required, the consumer might fail to consume messages 
under certain conditions. A couple of such cases are outlined below:

 

(1) When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, 
there are no messages in the log following offset 6. In such cases, we return 
the `FileRecords` itself, which causes the consumer to throw an exception 
because it does not understand the stored message format.

 

(2) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

  was:
When down-conversion is required, the consumer might fail to consume messages 
under certain conditions. A couple of such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, 
there are no messages in the log following offset 6. In such cases, we return 
the `FileRecords` itself, which causes the consumer to throw an exception 
because it does not understand the stored message format.

(1) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

(2) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.


> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 2.0.0, 1.0.1
>Reporter: Dhruvil Shah
>Priority: Major
> Fix For: 2.1.0
>
>
> When down-conversion is required, the consumer might fail to consume messages 
> under certain conditions. A couple of such cases are outlined below:
>  
> (1) When consuming from a compacted topic, it is possible that the consumer 
> wants to fetch messages that fall in the middle of a batch but the messages 
> have been compacted by the cl

[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7045:

Description: 
When down-conversion is required, the consumer might fail to consume messages 
under certain conditions. A couple of such cases are outlined below:

(1) When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, 
there are no messages in the log following offset 6. In such cases, we return 
the `FileRecords` itself, which causes the consumer to throw an exception 
because it does not understand the stored message format.

(2) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these do not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

  was:
When down-conversion is required, the consumer might fail to consume messages 
under certain conditions. A couple of such cases are outlined below:

 

(1) When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, 
there are no messages in the log following offset 6. In such cases, we return 
the `FileRecords` itself, which causes the consumer to throw an exception 
because it does not understand the stored message format.

 

(2) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.


> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 2.0.0, 1.0.1
>Reporter: Dhruvil Shah
>Priority: Major
> Fix For: 2.1.0
>
>
> When down-conversion is required, the consumer might fail to consume messages 
> under certain conditions. A couple of such cases are outlined below:
> (1) When consuming from a compacted topic, it is possible that the consumer 
> wants to fetch messages that fall in the middle of a batch but the messages 
> have been compacted by the cleaner. For example, let's say we have the 
> following two segments. The brackets indicate a single batch of messages and 
> the numbers within are the message offsets.
> Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> If the cleaner were to come in now and clean up messages with offsets 7 and 
> 8, the segments would look like the following:
> Segment #1: [0, 1, 2], [3, 4, 5], [6]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> A consumer atte

[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7045:

Attachment: log-cleaner-test.zip

> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 2.0.0, 1.0.1
>Reporter: Dhruvil Shah
>Priority: Major
> Fix For: 2.1.0
>
> Attachments: log-cleaner-test.zip
>
>
> When down-conversion is required, the consumer might fail to consume messages 
> under certain conditions. A couple of such cases are outlined below:
> (1) When consuming from a compacted topic, it is possible that the consumer 
> wants to fetch messages that fall in the middle of a batch but the messages 
> have been compacted by the cleaner. For example, let's say we have the 
> following two segments. The brackets indicate a single batch of messages and 
> the numbers within are the message offsets.
> Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> If the cleaner were to come in now and clean up messages with offsets 7 and 
> 8, the segments would look like the following:
> Segment #1: [0, 1, 2], [3, 4, 5], [6]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> A consumer attempting to fetch messages at offset 7 will start reading the 
> batch starting at offset 6. During down-conversion, we will drop the record 
> starting at 6 because it is less than the current fetch start offset. However, 
> there are no messages in the log following offset 6. In such cases, we return 
> the `FileRecords` itself, which causes the consumer to throw an exception 
> because it does not understand the stored message format.
> (2) When consuming from a topic with transactional messages, down-conversion 
> usually drops control batches because these do not exist in V0 and V1 message 
> formats. If there are no message batches following the control batch in the 
> particular segment (or if we are at the end of the log), we would again get 
> no records after down-conversion and will return the `FileRecords`. Because 
> the consumer is not able to interpret control batches, it will again throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`

2018-06-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509072#comment-16509072
 ] 

Matthias J. Sax commented on KAFKA-6966:


Thanks for your interest in contributing to Kafka! I added you to the list of 
contributors and assigned the ticket to you. You can also self-assign tickets 
now. Please prepare a KIP for this – you can also work on a PR in parallel – 
whatever works best for you. The KIP must be accepted before we can accept the 
PR. If you don't have a wiki account yet, please create one and provide your 
wiki ID so we can grant you write access to the wiki (otherwise you cannot 
create a KIP page).

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
> 
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Assignee: Nishanth Pradeep
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> With KIP-303, a dynamic routing feature was added and 
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the `TopicNameExtractor` class that is actually used from 
> the `TopologyDescription`.
> We suggest adding `Class<? extends TopicNameExtractor> TopologyDescription.Sink#topicNameExtractor()` 
> and letting it return `null` if the dynamic routing feature is not used.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509076#comment-16509076
 ] 

Dhruvil Shah commented on KAFKA-7045:
-

Attached topic data that reproduces case (1) described in the JIRA, where 
messages in the first segment starting at offset 70 have been cleaned out. These 
messages were part of the batch starting at offset 69.

Credit to [~omkreddy] for finding this issue and providing a reproducible test 
case.
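
For reference, a minimal, self-contained illustration (plain Java, not Kafka code and 
not taken from the attached data) of why case (1) leaves nothing to down-convert: the 
fetch at offset 7 starts at the batch whose base offset is 6, but every record remaining 
in that batch is below the fetch start offset.

{code:java}
import java.util.List;
import java.util.stream.Collectors;

public class DownConversionGapExample {
    public static void main(String[] args) {
        // After compaction, the last batch of segment #1 holds only offset 6
        // (offsets 7 and 8 were cleaned out) and segment #2 starts at offset 9.
        List<Long> lastBatchOfSegment1 = List.of(6L);
        long fetchStartOffset = 7L; // the consumer's fetch position

        List<Long> recordsToDownConvert = lastBatchOfSegment1.stream()
                .filter(offset -> offset >= fetchStartOffset)
                .collect(Collectors.toList());

        // Prints an empty list: nothing from this batch can be down-converted,
        // which is the situation where the broker falls back to returning the
        // raw FileRecords that an old-format consumer cannot parse.
        System.out.println("records to down-convert: " + recordsToDownConvert);
    }
}
{code}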

> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 2.0.0, 1.0.1
>Reporter: Dhruvil Shah
>Priority: Major
> Fix For: 2.1.0
>
> Attachments: log-cleaner-test.zip
>
>
> When down-conversion is required, the consumer might fail consuming messages 
> under certain conditions. Couple such cases are outlined below:
> (1) When consuming from a compacted topic, it is possible that the consumer 
> wants to fetch messages that fall in the middle of a batch but the messages 
> have been compacted by the cleaner. For example, let's say we have the 
> following two segments. The brackets indicate a single batch of messages and 
> the numbers within are the message offsets.
> Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> If the cleaner were to come in now and clean up messages with offsets 7 and 
> 8, the segments would look like the following:
> Segment #1: [0, 1, 2], [3, 4, 5], [6]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> A consumer attempting to fetch messages at offset 7 will start reading the 
> batch starting at offset 6. During down-conversion, we will drop the record 
> starting at 6 it is less than the current fetch start offset. However, there 
> are no messages in the log following offset 6. In such cases, we return the 
> `FileRecords` itself which would cause the consumer to throw an exception 
> because it does not understand the stored message format.
> (2) When consuming from a topic with transactional messages, down-conversion 
> usually drops control batches because these do not exist in V0 and V1 message 
> formats. If there are no message batches following the control batch in the 
> particular segment (or if we are at the end of the log), we would again get 
> no records after down-conversion and will return the `FileRecords`. Because 
> the consumer is not able to interpret control batches, it will again throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7043) Connect isolation whitelist does not include new primitive converters (KIP-305)

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509082#comment-16509082
 ] 

ASF GitHub Bot commented on KAFKA-7043:
---

rhauch opened a new pull request #5198: KAFKA-7043: Modified plugin isolation 
whitelist with recently added converters (KIP-305)
URL: https://github.com/apache/kafka/pull/5198
 
 
   Several recently-added converters are now included in the plugin isolation 
whitelist, similarly to the `StringConverter`. This is a change in the 
implementation, and does not affect the approved KIP. Several unit tests were 
added to verify they are being loaded in isolation, again similarly to 
`StringConverter`.
   
   These changes should be applied only to `trunk`, since these converters were 
added as part of KIP-305 for AK 2.0.
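   
   For context, a hedged sketch of exercising one of the KIP-305 primitive converters 
directly (the class name and schema below are assumptions based on the KIP, not taken 
from this PR):

{code:java}
import org.apache.kafka.connect.converters.IntegerConverter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

import java.util.Collections;

public class PrimitiveConverterExample {
    public static void main(String[] args) {
        IntegerConverter converter = new IntegerConverter();
        converter.configure(Collections.emptyMap(), false); // configure as a value converter

        // Round-trip an Integer through the converter.
        byte[] serialized = converter.fromConnectData("my-topic", Schema.INT32_SCHEMA, 42);
        SchemaAndValue roundTripped = converter.toConnectData("my-topic", serialized);
        System.out.println(roundTripped.value()); // 42
    }
}
{code}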
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect isolation whitelist does not include new primitive converters 
> (KIP-305)
> ---
>
> Key: KAFKA-7043
> URL: https://issues.apache.org/jira/browse/KAFKA-7043
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Blocker
> Fix For: 2.0.0
>
>
> KIP-305 added several new primitive converters, but the PR did not add them 
> to the plugin isolation whitelist.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6881) Kafka 1.1 Broker version crashes when deleting log

2018-06-11 Thread xiaojing zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509191#comment-16509191
 ] 

xiaojing zhou commented on KAFKA-6881:
--

I have the same issue; the __consumer_offsets-7/019668089841.log file does 
exist in our NAS folder. My Kafka version is 1.0.1.

{code}

[2018-06-11 00:04:23,282] ERROR Failed to clean up log for __consumer_offsets-7 
in dir /nas/kafka_logs/lvsp01hkf001 due to IOException 
(kafka.server.LogDirFailureChannel)

java.nio.file.NoSuchFileException: 
/nas/kafka_logs/lvsp01hkf001/__consumer_offsets-7/019668089841.log

--

java.nio.file.NoSuchFileException: 
/nas/kafka_logs/lvsp01hkf001/__consumer_offsets-7/019668089841.log

 at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)

 at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)

 at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)

 at java.nio.file.Files.move(Files.java:1395)

 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:682)

 at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)

 at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:398)

 at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1592)

 at kafka.log.Log$$anonfun$replaceSegments$1.apply(Log.scala:1644)

 at kafka.log.Log$$anonfun$replaceSegments$1.apply(Log.scala:1639)

 at scala.collection.immutable.List.foreach(List.scala:392)

{code}

> Kafka 1.1 Broker version crashes when deleting log
> --
>
> Key: KAFKA-6881
> URL: https://issues.apache.org/jira/browse/KAFKA-6881
> Project: Kafka
>  Issue Type: Bug
> Environment: Linux
>Reporter: K B Parthasarathy
>Priority: Critical
>
> Hello
> We have been running Kafka 1.1 on Linux for the past 3 weeks. Today Kafka 
> crashed. When we checked the server.log file, the following log was found:
> [2018-05-07 16:53:06,721] ERROR Failed to clean up log for 
> __consumer_offsets-24 in dir /tmp/kafka-logs due to IOException 
> (kafka.server.LogDirFailureChannel)
>  java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/__consumer_offsets-24/.log
>  at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>  at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
>  at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>  at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>  at kafka.log.Log.asyncDeleteSegment(Log.scala:1601)
>  at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653)
>  at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648)
>  at scala.collection.immutable.List.foreach(List.scala:389)
>  at kafka.log.Log.replaceSegments(Log.scala:1648)
>  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
>  at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
>  at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
>  at scala.collection.immutable.List.foreach(List.scala:389)
>  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Suppressed: java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/__consumer_offsets-24/.log -> 
> /tmp/kafka-logs/__consumer_offsets-24/.log.deleted
>  at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>  at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
>  at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
>  ... 16 more
>  [2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)
>  [2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs 
> (kafka.log.LogManager)
>  [2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in 
> /tmp/kafka-logs h

[jira] [Commented] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required

2018-06-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509256#comment-16509256
 ] 

Dong Lin commented on KAFKA-6806:
-

Hey [~rhauch], this issue has been marked as a blocker for 1.1.1, but there 
does not seem to be anyone actively working on it. Do you know who will be 
working on this issue, or do we actually need to block the 1.1.1 release until 
it is fixed?

 

> Unable to validate sink connectors without "topics" component which is not 
> required
> ---
>
> Key: KAFKA-6806
> URL: https://issues.apache.org/jira/browse/KAFKA-6806
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
> Environment: CP4.1., Centos7
>Reporter: Ivan Majnarić
>Priority: Blocker
> Fix For: 2.0.0, 1.1.1
>
>
> The bug happens when you try to create a new connector through, for example, 
> kafka-connect-ui.
> Previously, both source and sink connectors could be validated through REST 
> with only "connector.class" and without "topics", like this:
> {code:java}
> PUT 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"
> }{code}
> In the new version of CP4.1 you can still validate *source connectors* but 
> not *sink connectors*. If you want to validate sink connectors, you need to 
> add the "topics" config to the request, like:
> {code:java}
> PUT 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
>     "topics": "test-topic"
> }{code}
> So there is a little mismatch in the ways connectors are validated, which I 
> think happened accidentally.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6948) Avoid overflow in timestamp comparison

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507808#comment-16507808
 ] 

ASF GitHub Bot commented on KAFKA-6948:
---

thisthat opened a new pull request #5183: KAFKA-6948 - Change comparison to 
avoid overflow inconsistencies
URL: https://github.com/apache/kafka/pull/5183
 
 
   Change the timestamp comparison following what the Java documentation recommends, 
to help prevent such errors.
   
   @guozhangwang I created the new PR as requested 😄 
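   
   For illustration, a minimal sketch (not taken from this PR) of the overflow-safe 
style the System.nanoTime() javadoc recommends: compare the signed difference of two 
timestamps instead of the timestamps themselves.

{code:java}
public class TimestampComparisonExample {
    public static void main(String[] args) {
        long now = Long.MAX_VALUE - 1_000L; // pretend clock value near the overflow point
        long deadline = now + 5_000L;       // 5,000 ns later; wraps to a negative value

        // Unsafe: comparing absolute values; the wrapped deadline looks like it is in the past.
        boolean expiredUnsafe = now >= deadline;     // true, which is wrong

        // Safe: the signed difference tolerates the wrap-around.
        boolean expiredSafe = now - deadline >= 0;   // false, which is correct

        System.out.println("unsafe comparison says expired: " + expiredUnsafe);
        System.out.println("safe comparison says expired:   " + expiredSafe);
    }
}
{code}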
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid overflow in timestamp comparison
> --
>
> Key: KAFKA-6948
> URL: https://issues.apache.org/jira/browse/KAFKA-6948
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Giovanni Liva
>Priority: Major
>
> Some comparisons with timestamp values are not safe. This comparisons can 
> trigger errors that were found in some other issues, e.g. KAFKA-4290 or 
> KAFKA-6608.
> The following classes contains some comparison between timestamps that can 
> overflow.
>  * org.apache.kafka.clients.NetworkClientUtils
>  * org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
>  * org.apache.kafka.common.security.kerberos.KerberosLogin
>  * org.apache.kafka.connect.runtime.WorkerSinkTask
>  * org.apache.kafka.connect.tools.MockSinkTask
>  * org.apache.kafka.connect.tools.MockSourceTask
>  * org.apache.kafka.streams.processor.internals.GlobalStreamThread
>  * org.apache.kafka.streams.processor.internals.StateDirectory
>  * org.apache.kafka.streams.processor.internals.StreamThread
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7033) Modify AbstractOptions's timeoutMs as Long type

2018-06-11 Thread darion yaphet (JIRA)
darion yaphet created KAFKA-7033:


 Summary: Modify AbstractOptions's timeoutMs as Long type
 Key: KAFKA-7033
 URL: https://issues.apache.org/jira/browse/KAFKA-7033
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 1.1.0
Reporter: darion yaphet


Currently, AbstractOptions's timeoutMs is an Integer; using a Long to represent 
the timeout in milliseconds may be better.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7033) Modify AbstractOptions's timeoutMs as Long type

2018-06-11 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507850#comment-16507850
 ] 

Stanislav Kozlovski commented on KAFKA-7033:


2147483647 milliseconds (the max value of an Integer) is equivalent to about 24.85 
days. This is beyond any practical maximum value - I believe an integer is enough 
:)
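
A quick check of the arithmetic, for reference:

{code:java}
public class TimeoutRangeCheck {
    public static void main(String[] args) {
        // Integer.MAX_VALUE milliseconds expressed in days.
        double days = Integer.MAX_VALUE / 1000.0 / 60 / 60 / 24;
        System.out.println(days); // ~24.855, matching the estimate above
    }
}
{code}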

 

> Modify AbstractOptions's timeoutMs as Long type
> ---
>
> Key: KAFKA-7033
> URL: https://issues.apache.org/jira/browse/KAFKA-7033
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 1.1.0
>Reporter: darion yaphet
>Priority: Minor
>
> Currently, AbstractOptions's timeoutMs is an Integer; using a Long to represent 
> the timeout in milliseconds may be better.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning

2018-06-11 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6975:
---
Priority: Blocker  (was: Major)

> AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
> -
>
> Key: KAFKA-6975
> URL: https://issues.apache.org/jira/browse/KAFKA-6975
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Blocker
> Fix For: 2.0.0
>
>
> AdminClient.deleteRecords(beforeOffset(offset)) will set log start offset to 
> the requested offset. If the requested offset is in the middle of the batch, 
> the replica will not be able to fetch from that offset (because it is in the 
> middle of the batch). 
> One use-case where this could cause problems is replica re-assignment. 
> Suppose we have a topic partition with 3 initial replicas, and at some point 
> the user issues  AdminClient.deleteRecords() for the offset that falls in the 
> middle of the batch. It now becomes log start offset for this topic 
> partition. Suppose at some later time, the user starts partition 
> re-assignment to 3 new replicas. The new replicas (followers) will start with 
> HW = 0, will try to fetch from 0, then get "out of order offset" because 0 < 
> log start offset (LSO); the follower will be able to reset its offset to the 
> leader's LSO and fetch from there; the leader will send a batch in response with 
> a base offset smaller than the fetch offset, which will cause the follower to stop 
> the fetcher thread. The end result is that the new replicas will not be able to 
> start fetching unless the LSO moves to an offset that is not in the middle of a 
> batch, and the re-assignment may be stuck for a very long time.
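
For reference, a hedged usage sketch of the call described above; the broker address, 
topic, and offset are placeholders. If the requested offset falls in the middle of a 
batch, it still becomes the partition's log start offset after this call.

{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class DeleteRecordsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // Delete everything before offset 42; 42 becomes the new log start offset
            // even if it points into the middle of a record batch.
            admin.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(42L)))
                 .all()
                 .get();
        }
    }
}
{code}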



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7032) The TimeUnit is neglected by KafkaConsumer#close(long, TimeUnit)

2018-06-11 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7032:
---
Priority: Blocker  (was: Major)

> The TimeUnit is neglected by KafkaConsumer#close(long, TimeUnit)
> 
>
> Key: KAFKA-7032
> URL: https://issues.apache.org/jira/browse/KAFKA-7032
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.0.0
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 2.0.0
>
>
> {code:java}
> @Deprecated
> @Override
> public void close(long timeout, TimeUnit timeUnit) {
> close(Duration.ofMillis(TimeUnit.MILLISECONDS.toMillis(timeout)));
> }{code}
>  
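
A minimal sketch of the presumable fix (not a committed patch): honor the 
caller-supplied unit when converting to a Duration.

{code:java}
@Deprecated
@Override
public void close(long timeout, TimeUnit timeUnit) {
    // Use the provided TimeUnit instead of hard-coding TimeUnit.MILLISECONDS.
    close(Duration.ofMillis(timeUnit.toMillis(timeout)));
}
{code}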



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6979) Add max.block.ms to consumer for default timeout behavior

2018-06-11 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6979:
---
Priority: Blocker  (was: Major)

> Add max.block.ms to consumer for default timeout behavior
> -
>
> Key: KAFKA-6979
> URL: https://issues.apache.org/jira/browse/KAFKA-6979
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Dhruvil Shah
>Priority: Blocker
> Fix For: 2.0.0
>
>
> Implement max.block.ms as described in KIP-266: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica

2018-06-11 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6949:
---
Priority: Blocker  (was: Major)

> alterReplicaLogDirs() should grab partition lock when accessing log of the 
> future replica
> -
>
> Key: KAFKA-6949
> URL: https://issues.apache.org/jira/browse/KAFKA-6949
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dong Lin
>Priority: Blocker
> Fix For: 2.0.0
>
>
> I found this in a failed execution of 
> kafka.admin.ReassignPartitionsClusterTest.shouldExpandCluster. Looks like 
> we're missing some option checking.
> {code}
> [2018-05-25 08:03:53,310] ERROR [ReplicaManager broker=100] Error while 
> changing replica dir for partition my-topic-2 (kafka.server.ReplicaManager:76)
> java.util.NoSuchElementException: None.get
>   at scala.None$.get(Option.scala:347)
>   at scala.None$.get(Option.scala:345)
>   at 
> kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:584)
>   at 
> kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:576)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.ReplicaManager.alterReplicaLogDirs(ReplicaManager.scala:576)
>   at 
> kafka.server.KafkaApis.handleAlterReplicaLogDirsRequest(KafkaApis.scala:2037)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:138)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7034) Remove the duplicated listTopics from Consumer

2018-06-11 Thread darion yaphet (JIRA)
darion yaphet created KAFKA-7034:


 Summary: Remove the duplicated listTopics from Consumer 
 Key: KAFKA-7034
 URL: https://issues.apache.org/jira/browse/KAFKA-7034
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: darion yaphet


Both AdminClient and Consumer include a listTopics method, and they both use the 
Cluster instance to get the topic names. They are very similar.

So I think we should remove the Consumer's listTopics method.
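
For illustration, a hedged sketch of the overlap between the two public calls (the 
broker address is a placeholder):

{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class ListTopicsOverlapExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (AdminClient admin = AdminClient.create(props);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            System.out.println(admin.listTopics().names().get()); // AdminClient variant
            System.out.println(consumer.listTopics().keySet());   // Consumer variant
        }
    }
}
{code}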



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6697) JBOD configured broker should not die if log directory is invalid

2018-06-11 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6697:
---
Priority: Major  (was: Blocker)

> JBOD configured broker should not die if log directory is invalid
> -
>
> Key: KAFKA-6697
> URL: https://issues.apache.org/jira/browse/KAFKA-6697
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 2.0.0
>
>
> Currently JBOD configured broker will still die on startup if 
> dir.getCanonicalPath() throws IOException. We should mark such log directory 
> as offline and broker should still run if there is good disk.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6697) JBOD configured broker should not die if log directory is invalid

2018-06-11 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6697:
---
Priority: Blocker  (was: Major)

> JBOD configured broker should not die if log directory is invalid
> -
>
> Key: KAFKA-6697
> URL: https://issues.apache.org/jira/browse/KAFKA-6697
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Blocker
> Fix For: 2.0.0
>
>
> Currently JBOD configured broker will still die on startup if 
> dir.getCanonicalPath() throws IOException. We should mark such log directory 
> as offline and broker should still run if there is good disk.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6780) log cleaner shouldn't clean messages beyond high watermark

2018-06-11 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507860#comment-16507860
 ] 

Ismael Juma commented on KAFKA-6780:


[~apovzner], this currently has 2.0.0 and 1.1.1 as the target release. Both 
releases are nearing completion. Shall we update the fix version?

> log cleaner shouldn't clean messages beyond high watermark
> --
>
> Key: KAFKA-6780
> URL: https://issues.apache.org/jira/browse/KAFKA-6780
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Assignee: Anna Povzner
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> Currently, the firstUncleanableDirtyOffset computed by the log cleaner is 
> bounded by the first offset in the active segment. It's possible for the high 
> watermark to be smaller than that. This may cause a committed record to be 
> removed because of an uncommitted record.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5905) Remove PrincipalBuilder and DefaultPrincipalBuilder

2018-06-11 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5905:
---
Fix Version/s: (was: 2.0.0)
   3.0.0

> Remove PrincipalBuilder and DefaultPrincipalBuilder
> ---
>
> Key: KAFKA-5905
> URL: https://issues.apache.org/jira/browse/KAFKA-5905
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Manikumar
>Priority: Major
> Fix For: 3.0.0
>
>
> These classes were deprecated after KIP-189: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-189%3A+Improve+principal+builder+interface+and+add+support+for+SASL,
>  which is part of 1.0.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)

2018-06-11 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5886:
---
Fix Version/s: (was: 2.0.0)

> Introduce delivery.timeout.ms producer config (KIP-91)
> --
>
> Key: KAFKA-5886
> URL: https://issues.apache.org/jira/browse/KAFKA-5886
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
>Priority: Major
>
> We propose adding a new timeout delivery.timeout.ms. The window of 
> enforcement includes batching in the accumulator, retries, and the inflight 
> segments of the batch. With this config, the user has a guaranteed upper 
> bound on when a record will either get sent, fail or expire from the point 
> when send returns. In other words we no longer overload request.timeout.ms to 
> act as a weak proxy for accumulator timeout and instead introduce an explicit 
> timeout that users can rely on without exposing any internals of the producer 
> such as the accumulator. 
> See 
> [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer]
>  for more details.
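
For illustration, a hedged sketch of how the proposed config might be set once 
available; the config name comes from KIP-91, and the broker address, topic, and 
value are placeholders.

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class DeliveryTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Proposed upper bound from the moment send() returns until the record
        // is sent, fails, or expires (covers batching, retries, and in-flight time).
        props.put("delivery.timeout.ms", "120000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}
{code}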



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3438) Rack Aware Replica Reassignment should warn of overloaded brokers

2018-06-11 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3438:
---
Fix Version/s: (was: 2.0.0)

> Rack Aware Replica Reassignment should warn of overloaded brokers
> -
>
> Key: KAFKA-3438
> URL: https://issues.apache.org/jira/browse/KAFKA-3438
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Ben Stopford
>Assignee: Vahid Hashemian
>Priority: Major
>
> We've changed the replica reassignment code to be rack aware.
> One problem that might catch users out would be that they rebalance the 
> cluster using kafka-reassign-partitions.sh but their rack configuration means 
> that some high proportion of replicas are pushed onto a single, or small 
> number of, brokers. 
> This should be an easy problem to avoid, by changing the rack assignment 
> information, but we should probably warn users if they are going to create 
> something that is unbalanced. 
> So imagine I have a Kafka cluster of 12 nodes spread over two racks with rack 
> awareness enabled. If I add a 13th machine, on a new rack, and run the 
> rebalance tool, that new machine will get ~6x as many replicas as the least 
> loaded broker. 
> Suggest a warning be added to the tool output when --generate is called: 
> "The most loaded broker has 2.3x as many replicas as the least loaded 
> broker. This is likely due to an uneven distribution of brokers across racks. 
> You're advised to alter the rack config so there are approximately the same 
> number of brokers per rack" and displays the individual rack→#brokers and 
> broker→#replicas data for the proposed move.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5914) Return MessageFormatVersion and MessageMaxBytes in MetadataResponse

2018-06-11 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5914:
---
Fix Version/s: (was: 2.0.0)

> Return MessageFormatVersion and MessageMaxBytes in MetadataResponse
> ---
>
> Key: KAFKA-5914
> URL: https://issues.apache.org/jira/browse/KAFKA-5914
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
>
> As part of KIP-192, we want to send two additional fields in the 
> {{TopicMetadata}} which is part of the {{MetadataResponse}}. These fields are 
> the {{MessageFormatVersion}} and the {{MessageMaxBytes}}.
> The {{MessageFormatVersion}} is required to implement 
> https://issues.apache.org/jira/browse/KAFKA-5794 . The latter will be 
> implemented in a future release, but with the changes proposed here, the said 
> future release will be backward compatible with 1.0.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6800) Update documentation for SASL/PLAIN and SCRAM to use callbacks

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507897#comment-16507897
 ] 

ASF GitHub Bot commented on KAFKA-6800:
---

rajinisivaram closed pull request #4890: KAFKA-6800: Update SASL/PLAIN and 
SCRAM docs to use KIP-86 callbacks
URL: https://github.com/apache/kafka/pull/4890
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 848d12c8e4d..911ba09edec 100644
--- a/build.gradle
+++ b/build.gradle
@@ -859,9 +859,11 @@ project(':clients') {
 include "**/org/apache/kafka/common/serialization/*"
 include "**/org/apache/kafka/common/config/*"
 include "**/org/apache/kafka/common/security/auth/*"
-include "**/org/apache/kafka/server/policy/*"
+include "**/org/apache/kafka/common/security/plain/*"
+include "**/org/apache/kafka/common/security/scram/*"
 include "**/org/apache/kafka/common/security/token/delegation/*"
 include "**/org/apache/kafka/common/security/oauthbearer/*"
+include "**/org/apache/kafka/server/policy/*"
   }
 }
 
diff --git a/docs/security.html b/docs/security.html
index 57bba4775a3..877ca579a52 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -547,27 +547,12 @@ 7.3 
Authentication using SASL
 SASL/PLAIN should be used only with SSL as transport layer to 
ensure that clear passwords are not transmitted on the wire without 
encryption.
 The default implementation of SASL/PLAIN in Kafka specifies 
usernames and passwords in the JAAS configuration file as shown
-here. To avoid 
storing passwords on disk, you can plug in your own implementation of
-javax.security.auth.spi.LoginModule that provides 
usernames and passwords from an external source. The login module 
implementation should
-provide username as the public credential and password as the 
private credential of the Subject. The default implementation
-
org.apache.kafka.common.security.plain.PlainLoginModule can be 
used as an example.
-In production systems, external authentication servers may 
implement password authentication. Kafka brokers can be integrated with these 
servers by adding
-your own implementation of 
javax.security.sasl.SaslServer. The default implementation 
included in Kafka in the package
-org.apache.kafka.common.security.plain can be 
used as an example to get started.
-
-New providers must be installed and registered in the JVM. 
Providers can be installed by adding provider classes to
-the normal CLASSPATH or bundled as a jar file and 
added to JAVA_HOME/lib/ext.
-Providers can be registered statically by adding a 
provider to the security properties file
-JAVA_HOME/lib/security/java.security.
-security.provider.n=providerClassName
-where providerClassName is the fully qualified name of 
the new provider and n is the preference order with
-lower numbers indicating higher preference.
-Alternatively, you can register providers dynamically at 
runtime by invoking Security.addProvider at the beginning of the 
client
-application or in a static initializer in the login module. 
For example:
-Security.addProvider(new 
PlainSaslServerProvider());
-For more details, see http://docs.oracle.com/javase/8/docs/technotes/guides/security/crypto/CryptoSpec.html";>JCA
 Reference.
-
-
+here. From 
Kafka version 2.0 onwards, you can avoid storing clear passwords on disk
+by configuring your own callback handlers that obtain username 
and password from an external source using the configuration options
+sasl.server.callback.handler.class and 
sasl.client.callback.handler.class.
+In production systems, external authentication servers may 
implement password authentication. From Kafka version 2.0 onwards,
+you can plug in your own callback handlers that use external 
authentication servers for password verification by configuring
+sasl.server.callback.handler.class.
 
 
 
@@ -667,8 +652,8 @@ 7.3 
Authentication using SASL
 SCRAM should be used only with TLS-encryption to prevent 
interception of SCRAM exchanges. This
 protects against dictionary or brute force attacks and against 
impersonation if Zookeeper is compromised.
-The default SASL/SCRAM implementation may be overridden usin

[jira] [Created] (KAFKA-7035) Kafka Processor's init() method sometimes is not called

2018-06-11 Thread Oleksandr Konopko (JIRA)
Oleksandr Konopko created KAFKA-7035:


 Summary: Kafka Processor's init() method sometimes is not called
 Key: KAFKA-7035
 URL: https://issues.apache.org/jira/browse/KAFKA-7035
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Oleksandr Konopko


Scenario:

1. We have processing of Kafka Topic which is implemented with Processor API

2. We want to collect metrics (let's say just counting the number of processed 
entities, for simplicity)

3. How we tried to organize this
 * process data with the process() method and send it down the stream with the context
 * on each call of the process() method, update the counter
 * schedule a punctuate function which will send a metric to a special topic. The 
metric is built from the counter

You can find the code (we removed all business-sensitive code out of it, so it 
should be easy to read) in the attachment; a simplified sketch of the same pattern 
is shown below.

 

Problematic Kafka Streams behaviour that I can see by logging every step:

1. We have 80 messages in the input topic

2. Kafka Streams creates 4 Processor instances. Let's name them: ProcessorA, 
ProcessorB, ProcessorC and ProcessorD

3. ProcessorA and ProcessorB receive 1-5% of the data. Data is processed correctly, 
results are sent down the stream. The counter is updated

4. init() method was not called for ProcessorA and ProcessorB

5. ProcessorC and ProcessorD are created and they start to receive all the rest 
of the data, 95-99%

6. init() method is called for both ProcessorC and ProcessorD. It initiates 
punctuation, which causes a Metrics message to be created and sent down the metric 
stream periodically

7. ProcessorA and ProcessorB are closed. init() was never called for them, so the 
Metric entity was not sent to the metrics topic

8. Processing is finished.

 

In the end:

Expected:
 * 80 entities were processed and sent to the Sink
 * Metrics entities contain counters which sum up to 80

Actual results:
 * 80 entities were processed and sent to the Sink
 * Metrics entities contain counters which sum up to some number 3-6% less than 
80, for example 786543

 

Problem:
 * init() method call is not guaranteed
 * there is no way to guarantee that all work was done by punctuate method 
before close()
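
A hedged, simplified sketch of the pattern described above (not the attached 
TransformProcessor.java), assuming the 1.0-era Processor API; the forwarded key and 
the punctuation interval are placeholders.

{code:java}
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class CountingProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private long counter = 0;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // Periodically emit the running count. If init() is never invoked,
        // this punctuation is never scheduled and the count is never reported,
        // which is the behaviour described in this report.
        context.schedule(10_000L, PunctuationType.WALL_CLOCK_TIME,
                timestamp -> this.context.forward("processed-count", Long.toString(counter)));
    }

    @Override
    public void process(String key, String value) {
        counter++;
        context.forward(key, value); // pass the entity downstream unchanged
    }

    @Deprecated
    @Override
    public void punctuate(long timestamp) {
        // no-op; this deprecated callback is still declared by the 1.0-era interface
    }

    @Override
    public void close() { }
}
{code}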

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7035) Kafka Processor's init() method sometimes is not called

2018-06-11 Thread Oleksandr Konopko (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Konopko updated KAFKA-7035:
-
Attachment: TransformProcessor.java

> Kafka Processor's init() method sometimes is not called
> ---
>
> Key: KAFKA-7035
> URL: https://issues.apache.org/jira/browse/KAFKA-7035
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Oleksandr Konopko
>Priority: Critical
> Attachments: TransformProcessor.java
>
>
> Scenario:
> 1. We have processing of Kafka Topic which is implemented with Processor API
> 2. We want to collect metrics (let's say just counting the number of processed 
> entities, for simplicity)
> 3. How we tried to organize this
>  * process data with the process() method and send it down the stream with the context
>  * on each call of the process() method, update the counter
>  * schedule a punctuate function which will send a metric to a special topic. The 
> metric is built from the counter
> You can find the code (we removed all business-sensitive code out of it, so 
> it should be easy to read) in the attachment
>  
> Problematic Kafka Streams behaviour that I can see by logging every step:
> 1. We have 80 messages in the input topic
> 2. Kafka Streams creates 4 Processor instances. Let's name them: ProcessorA, 
> ProcessorB, ProcessorC and ProcessorD
> 3. ProcessorA and ProcessorB receive 1-5% of the data. Data is processed 
> correctly, results are sent down the stream. The counter is updated
> 4. init() method was not called for ProcessorA and ProcessorB
> 5. ProcessorC and ProcessorD are created and they start to receive all the 
> rest of the data, 95-99%
> 6. init() method is called for both ProcessorC and ProcessorD. It initiates 
> punctuation, which causes a Metrics message to be created and sent down the metric 
> stream periodically
> 7. ProcessorA and ProcessorB are closed. init() was never called for them, so 
> the Metric entity was not sent to the metrics topic
> 8. Processing is finished.
>  
> In the end:
> Expected:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to 80
> Actual results:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to some number 3-6% less 
> than 80, for example 786543
>  
> Problem:
>  * init() method call is not guaranteed
>  * there is no way to guarantee that all work was done by punctuate method 
> before close()
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7036) Complete the docs of KafkaConsumer#poll

2018-06-11 Thread Chia-Ping Tsai (JIRA)
Chia-Ping Tsai created KAFKA-7036:
-

 Summary: Complete the docs of KafkaConsumer#poll
 Key: KAFKA-7036
 URL: https://issues.apache.org/jira/browse/KAFKA-7036
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


KafkaConsumer#poll has nice docs about the expected exceptions. However, it 
lacks a description of SerializationException. Another minor issue is that 
KafkaConsumer doesn't catch all types of exceptions which may be thrown by the 
deserializer (see below). We should use Throwable to replace the 
RuntimeException so as to catch all exceptions and then wrap them in a 
SerializationException.
{code:java}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                    timestamp, timestampType,
                                    record.checksumOrNull(),
                                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                    key, value, headers);
    } catch (RuntimeException e) {
        throw new SerializationException("Error deserializing key/value for partition " + partition +
                " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
    }
}{code}
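For reference, a minimal sketch of the proposed wrapping (not a committed patch): catching 
Throwable instead of RuntimeException means that errors and checked exceptions thrown by a 
Deserializer are also reported as a SerializationException pointing at the offending partition 
and offset.
{code:java}
import java.util.concurrent.Callable;
import org.apache.kafka.common.errors.SerializationException;

public final class DeserializationWrapper {
    // Wraps any failure from the given deserialization step, mirroring the message format used above.
    public static <T> T deserializeOrWrap(Callable<T> deserialization, String partition, long offset) {
        try {
            return deserialization.call();
        } catch (Throwable t) {
            throw new SerializationException("Error deserializing key/value for partition " + partition
                    + " at offset " + offset
                    + ". If needed, please seek past the record to continue consumption.", t);
        }
    }
}
{code}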



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6952) LogCleaner stopped due to org.apache.kafka.common.errors.CorruptRecordException

2018-06-11 Thread evan einst (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507918#comment-16507918
 ] 

evan einst commented on KAFKA-6952:
---

This error occurs with both log.message.format.version=0.10.2 and 0.11.0.

In the end I deleted the offset files for which the error occurred and copied the same 
files from another broker, which skipped past this problem.

 

> LogCleaner stopped due to 
> org.apache.kafka.common.errors.CorruptRecordException
> ---
>
> Key: KAFKA-6952
> URL: https://issues.apache.org/jira/browse/KAFKA-6952
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.2
>Reporter: evan einst
>Priority: Major
> Attachments: __consumer_offsets-30-2.zip, 
> __consumer_offsets-30-3.zip, __consumer_offsets-30_1.zip
>
>
> This error occurred on our production cluster while upgrading from 0.10.2.1 to 0.11.0.2. 
> Since https://issues.apache.org/jira/browse/KAFKA-5431 says
> this error is fixed in 0.11.0.0, we decided to upgrade to 
> 0.11.0.2, but after upgrading the code on one server the problem 
> still occurred.
> 
> {{[2018-05-26 13:23:58,029] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-42. (kafka.log.LogCleaner) [2018-05-26 13:23:58,029] INFO 
> Cleaner 0: Building offset map for __consumer_offsets-42... 
> (kafka.log.LogCleaner) [2018-05-26 13:23:58,050] INFO Cleaner 0: Building 
> offset map for log __consumer_offsets-42 for 19 segments in offset range [0, 
> 6919353). (kafka.log.LogCleaner) [2018-05-26 13:23:58,300] ERROR 
> [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner) 
> org.apache.kafka.common.errors.CorruptRecordException: Record size is less 
> than the minimum record overhead (14) [2018-05-26 13:23:58,301] INFO 
> [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)}}
> 
> Please help me resolve this problem, thank you very much!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2018-06-11 Thread Sampath Kumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507929#comment-16507929
 ] 

Sampath Kumar commented on KAFKA-2857:
--

We are still observing the "group coordinator not available" issue; see 
https://issues.apache.org/jira/browse/KAFKA-7017 

> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown if all replicas for an offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish the case where a coordinator is not 
> available from the case where a coordinator doesn't exist.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6964) Add ability to print all internal topic names

2018-06-11 Thread Jagadesh Adireddi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507936#comment-16507936
 ] 

Jagadesh Adireddi commented on KAFKA-6964:
--

Hi [~bbejeck],
Just to make sure I understand correctly: if we want to print internal topic names, can 
we use `InternalTopologyBuilder#getSourceTopicNames()` to get all of the topic names?

> Add ability to print all internal topic names
> -
>
> Key: KAFKA-6964
> URL: https://issues.apache.org/jira/browse/KAFKA-6964
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Jagadesh Adireddi
>Priority: Major
>  Labels: needs-kip
>
> For security access reasons some streams users need to build all internal 
> topics before deploying their streams application.  While it's possible to 
> get all internal topic names from the {{Topology#describe()}} method, it 
> would be nice to have a separate method that prints out only the internal 
> topic names to ease the process.
> I think this change will require a KIP, so I've added the appropriate label.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7037) Kafka doesn't allow to delete topic with '+' in the name

2018-06-11 Thread Sandeep Nemuri (JIRA)
Sandeep Nemuri created KAFKA-7037:
-

 Summary: Kafka doesn't allow to delete topic with '+' in the name
 Key: KAFKA-7037
 URL: https://issues.apache.org/jira/browse/KAFKA-7037
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.0
Reporter: Sandeep Nemuri


 
{code:java}
[kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create 
--zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic 
test+topic
Created topic "test+topic".
[kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --list
__consumer_offsets
test+topic

[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --delete --topic test+topic
Error while executing topic command : Topic test+topic does not exist on ZK 
path ssltester-3.openstacklocal:2181
[2018-06-11 09:36:32,989] ERROR java.lang.IllegalArgumentException: Topic 
test+topic does not exist on ZK path ssltester-3.openstacklocal:2181
 at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:166)
 at kafka.admin.TopicCommand$.main(TopicCommand.scala:68)
 at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
{code}
The major issue is that while executing a delete command the Kafka CLI tool is 
effectively dropping the "+" symbol and deleting the wrong topic. In the case below, if 
_"*test+topic"*_ is deleted, Kafka deletes _*testtopic*_.
{code:java}
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create 
--zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic 
testtopic
Created topic "testtopic".
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --topic test+topic --delete
Topic testtopic is marked for deletion.{code}
 

It seems that topic creation doesn't check for '+', while topic deletion strips the '+' from 
the topic name.

Create Topic: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L77-L85]
 

Delete Topic : 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33]
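For what it's worth, the observed behaviour is consistent with the delete path treating the 
given name as a regular expression, in which '+' is a quantifier rather than a literal 
character. A minimal sketch, independent of Kafka's code:
{code:java}
import java.util.regex.Pattern;

public final class PlusTopicDemo {
    public static void main(String[] args) {
        Pattern p = Pattern.compile("test+topic");
        System.out.println(p.matcher("test+topic").matches()); // false: the real topic is not matched
        System.out.println(p.matcher("testtopic").matches());  // true:  the wrong topic is matched
    }
}
{code}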

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7037) Kafka doesn't allow to delete topic with '+' in the name

2018-06-11 Thread Sandeep Nemuri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507941#comment-16507941
 ] 

Sandeep Nemuri commented on KAFKA-7037:
---

Two options: 
 # Do not allow creating topic names which contain '+'
 # Allow deleting topics whose names contain '+'

I'd be interested in working on this jira. Kindly provide your inputs/comments on 
how to proceed further.

> Kafka doesn't allow to delete topic with '+' in the name
> 
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Sandeep Nemuri
>Priority: Major
>
>  
> {code:java}
> [kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic test+topic
> Created topic "test+topic".
> [kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --list
> __consumer_offsets
> test+topic
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --delete --topic test+topic
> Error while executing topic command : Topic test+topic does not exist on ZK 
> path ssltester-3.openstacklocal:2181
> [2018-06-11 09:36:32,989] ERROR java.lang.IllegalArgumentException: Topic 
> test+topic does not exist on ZK path ssltester-3.openstacklocal:2181
>  at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:166)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:68)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> {code}
> The major issue is that while executing a delete command kafka cli tool is 
> removing the "+" symbol and deleting the incorrect topic. In below case if 
> _"*test+topic"*_ is deleted kafka deletes _*testtopic.*_
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  
> It seems create topic doesn't check for '+' and delete topic replaces '+' 
> from the topic name.
> Create Topic: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L77-L85]
>  
> Delete Topic : 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7037) Kafka doesn't allow to delete topic with '+' in the name

2018-06-11 Thread Sandeep Nemuri (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandeep Nemuri updated KAFKA-7037:
--
Affects Version/s: 1.0.0

> Kafka doesn't allow to delete topic with '+' in the name
> 
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Priority: Major
>
>  
> {code:java}
> [kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic test+topic
> Created topic "test+topic".
> [kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --list
> __consumer_offsets
> test+topic
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --delete --topic test+topic
> Error while executing topic command : Topic test+topic does not exist on ZK 
> path ssltester-3.openstacklocal:2181
> [2018-06-11 09:36:32,989] ERROR java.lang.IllegalArgumentException: Topic 
> test+topic does not exist on ZK path ssltester-3.openstacklocal:2181
>  at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:166)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:68)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> {code}
> The major issue is that while executing a delete command kafka cli tool is 
> removing the "+" symbol and deleting the incorrect topic. In below case if 
> _"*test+topic"*_ is deleted kafka deletes _*testtopic.*_
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  
> It seems create topic doesn't check for '+' and delete topic replaces '+' 
> from the topic name.
> Create Topic: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L77-L85]
>  
> Delete Topic : 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7005) Remove duplicate Java Resource class.

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507956#comment-16507956
 ] 

ASF GitHub Bot commented on KAFKA-7005:
---

big-andy-coates opened a new pull request #5184: KAFKA-7005: Remove duplicate 
resource class.
URL: https://github.com/apache/kafka/pull/5184
 
 
   Fix for [KAFKA-7005](https://issues.apache.org/jira/browse/KAFKA-7005).
   
   This is a follow-on change requested as part of the initial PR for KIP-290 
#5117.  @cmccabe requested that the `resource.Resource` class be factored out 
in favour of `ConfigResource` to avoid confusion between all the `Resource` 
implementations.
   
   cc @cmccabe, @junrao 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove duplicate Java Resource class.
> -
>
> Key: KAFKA-7005
> URL: https://issues.apache.org/jira/browse/KAFKA-7005
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Relating to one of the outstanding work items in PR 
> [#5117|[https://github.com/apache/kafka/pull/5117]...]
> The o.a.k.c.request.Resource class could be dropped in favour of 
> o.a.k.c..config.ConfigResource.
> This will remove the duplication of `Resource` classes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-11 Thread Sandeep Nemuri (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandeep Nemuri updated KAFKA-7037:
--
Summary: delete topic command replaces '+' from the topic name which leads 
incorrect topic deletion  (was: Kafka doesn't allow to delete topic with '+' in 
the name)

> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Priority: Major
>
>  
> {code:java}
> [kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic test+topic
> Created topic "test+topic".
> [kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --list
> __consumer_offsets
> test+topic
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --delete --topic test+topic
> Error while executing topic command : Topic test+topic does not exist on ZK 
> path ssltester-3.openstacklocal:2181
> [2018-06-11 09:36:32,989] ERROR java.lang.IllegalArgumentException: Topic 
> test+topic does not exist on ZK path ssltester-3.openstacklocal:2181
>  at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:166)
>  at kafka.admin.TopicCommand$.main(TopicCommand.scala:68)
>  at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> {code}
> The major issue is that while executing a delete command kafka cli tool is 
> removing the "+" symbol and deleting the incorrect topic. In below case if 
> _"*test+topic"*_ is deleted kafka deletes _*testtopic.*_
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  
> It seems create topic doesn't check for '+' and delete topic replaces '+' 
> from the topic name.
> Create Topic: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L77-L85]
>  
> Delete Topic : 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-11 Thread Sandeep Nemuri (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandeep Nemuri updated KAFKA-7037:
--
Description: 
While executing a delete command, the Kafka CLI tool is effectively dropping the "+" symbol and 
deleting the wrong topic. In the case below, if _"*test+topic"*_ is deleted, 
Kafka deletes _*testtopic*_.
{code:java}
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create 
--zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic 
testtopic
Created topic "testtopic".
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --topic test+topic --delete
Topic testtopic is marked for deletion.{code}
Topic deletion strips '+' and a few other special characters from the topic name: 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33]

 

  was:
 
{code:java}
[kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create 
--zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic 
test+topic
Created topic "test+topic".
[kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --list
__consumer_offsets
test+topic

[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --delete --topic test+topic
Error while executing topic command : Topic test+topic does not exist on ZK 
path ssltester-3.openstacklocal:2181
[2018-06-11 09:36:32,989] ERROR java.lang.IllegalArgumentException: Topic 
test+topic does not exist on ZK path ssltester-3.openstacklocal:2181
 at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:166)
 at kafka.admin.TopicCommand$.main(TopicCommand.scala:68)
 at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
{code}
The major issue is that while executing a delete command kafka cli tool is 
removing the "+" symbol and deleting the incorrect topic. In below case if 
_"*test+topic"*_ is deleted kafka deletes _*testtopic.*_
{code:java}
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create 
--zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic 
testtopic
Created topic "testtopic".
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --topic test+topic --delete
Topic testtopic is marked for deletion.{code}
 

It seems create topic doesn't check for '+' and delete topic replaces '+' from 
the topic name.

Create Topic: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L77-L85]
 

Delete Topic : 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33]

 


> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Priority: Major
>
> While executing a delete command kafka cli tool is removing the "+" symbol 
> and deleting the incorrect topic. In below case if  _"*test+topic"*_ is 
> deleted kafka deletes  _*testtopic.*_
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  delete topic replaces '+' and few other special characters from the topic 
> name : 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-11 Thread Sandeep Nemuri (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandeep Nemuri updated KAFKA-7037:
--
Comment: was deleted

(was: Two options: 
 # Do not allow to create topic names which contains '+'
 # Allow deleting topics where topic name contains '+'

I'd be interested to work on this jira, Kindly provide your inputs/comments on 
how to proceed further.)

> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Priority: Major
>
> While executing a delete command kafka cli tool is removing the "+" symbol 
> and deleting the incorrect topic. In below case if  _"*test+topic"*_ is 
> deleted kafka deletes  _*testtopic.*_
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  delete topic replaces '+' and few other special characters from the topic 
> name : 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3575) Use console consumer access topic that does not exist, can not use "Control + C" to exit process

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3575:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for almost a year, moving this out to 
2.1.0.

> Use console consumer access topic that does not exist, can not use "Control + 
> C" to exit process
> 
>
> Key: KAFKA-3575
> URL: https://issues.apache.org/jira/browse/KAFKA-3575
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: SUSE Linux Enterprise Server 11 SP3
>Reporter: NieWang
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: patch-available
> Fix For: 2.1.0
>
>
> 1. Use "sh kafka-console-consumer.sh --zookeeper 10.252.23.133:2181 --topic 
> topic_02" to start the console consumer. topic_02 does not exist.
> 2. You cannot use "Control + C" to exit the console consumer process. The 
> process is blocked.
> 3. Checking the process stack with jstack shows the following:
> linux:~ # jstack 122967
> 2016-04-18 15:46:06
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):
> "Attach Listener" #29 daemon prio=9 os_prio=0 tid=0x01781800 
> nid=0x1e0c8 waiting on condition [0x]
>java.lang.Thread.State: RUNNABLE
> "Thread-4" #27 prio=5 os_prio=0 tid=0x018a4000 nid=0x1e08a waiting on 
> condition [0x7ffbe5ac]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe00ed3b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.tools.ConsoleConsumer$$anon$1.run(ConsoleConsumer.scala:101)
> "SIGINT handler" #28 daemon prio=9 os_prio=0 tid=0x019d5800 
> nid=0x1e089 in Object.wait() [0x7ffbe5bc1000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.$$YJP$$wait(Native Method)
> at java.lang.Object.wait(Object.java)
> at java.lang.Thread.join(Thread.java:1245)
> - locked <0xe71fd4e8> (a kafka.tools.ConsoleConsumer$$anon$1)
> at java.lang.Thread.join(Thread.java:1319)
> at 
> java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
> at 
> java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
> at java.lang.Shutdown.runHooks(Shutdown.java:123)
> at java.lang.Shutdown.sequence(Shutdown.java:167)
> at java.lang.Shutdown.exit(Shutdown.java:212)
> - locked <0xe00abfd8> (a java.lang.Class for 
> java.lang.Shutdown)
> at java.lang.Terminator$1.handle(Terminator.java:52)
> at sun.misc.Signal$1.run(Signal.java:212)
> at java.lang.Thread.run(Thread.java:745)
> "metrics-meter-tick-thread-2" #20 daemon prio=5 os_prio=0 
> tid=0x7ffbec77a800 nid=0x1e079 waiting on condition [0x7ffbe66c8000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xe6fa6438> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> "metrics-meter-tick-thread-1" #19 daemon prio=5 os_prio=0 
> tid=0x7ffbec783000 nid=0x1e078 waiting on condition [0x7ffbe67c9000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(

[jira] [Updated] (KAFKA-3733) Avoid long command lines by setting CLASSPATH in environment

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3733:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Avoid long command lines by setting CLASSPATH in environment
> 
>
> Key: KAFKA-3733
> URL: https://issues.apache.org/jira/browse/KAFKA-3733
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Adrian Muraru
>Assignee: Adrian Muraru
>Priority: Minor
> Fix For: 2.1.0
>
>
> {{kafka-run-class.sh}} sets the JVM classpath in the command line via {{-cp}}.
> This generates long command lines that get truncated in the output of commands 
> like ps, pgrep, etc.
> An alternative is to set CLASSPATH in the environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4126) No relevant log when the topic is non-existent

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4126:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0

> No relevant log when the topic is non-existent
> --
>
> Key: KAFKA-4126
> URL: https://issues.apache.org/jira/browse/KAFKA-4126
> Project: Kafka
>  Issue Type: Bug
>Reporter: Balázs Barnabás
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 2.1.0
>
>
> When a producer sends a ProducerRecord to a Kafka topic that doesn't 
> exist, there is no relevant debug/error log that points out the error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4931) stop script fails due 4096 ps output limit

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4931:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> stop script fails due 4096 ps output limit
> --
>
> Key: KAFKA-4931
> URL: https://issues.apache.org/jira/browse/KAFKA-4931
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0
>Reporter: Amit Jain
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: patch-available
> Fix For: 2.1.0
>
>
> When run, the script bin/zookeeper-server-stop.sh fails to stop the zookeeper 
> server process if the ps output exceeds the 4096-character limit of Linux. I 
> think that instead of ps we can use ${JAVA_HOME}/bin/jps -vl | grep QuorumPeerMain, 
> which would correctly stop the zookeeper process. Currently we find the PIDs to kill with 
> PIDS=$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk 
> '{print $1}')



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4794:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> Currently the offsets storage is only accessible from SourceTask, so that tasks can be 
> properly initialized after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progress of 
> each task, it would be helpful to have access to an OffsetStorageReader instance 
> from the SourceConnector.
> That way, we could have a background thread that requests a task 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage to detect and 
> stream new files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new input files are detected, a task reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores source offsets for the last sent record. The source offset 
> data are:
>  - the size of the file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when their assigned files are completed (i.e. when 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> The connector should then be able to track offsets for each assigned file. 
> When all tasks have finished, the connector can stop them or assign new files 
> by requesting a task reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is being able to detect slow or failed tasks and, if necessary, to restart all 
> tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,
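> For discussion, a purely hypothetical sketch of what the proposal would enable. The 
> offsetReader parameter below stands in for whatever accessor a KIP would add (it does not 
> exist in the current SourceConnector API), and the partition/offset keys are illustrative only:
> {code:java}
> import java.util.Collections;
> import java.util.Map;
> import org.apache.kafka.connect.storage.OffsetStorageReader;
> 
> public class FileProgressMonitor implements Runnable {
>     private final OffsetStorageReader offsetReader; // hypothetical: handed to the connector by the framework
>     private final Runnable requestReconfiguration;  // e.g. context()::requestTaskReconfiguration
> 
>     public FileProgressMonitor(OffsetStorageReader offsetReader, Runnable requestReconfiguration) {
>         this.offsetReader = offsetReader;
>         this.requestReconfiguration = requestReconfiguration;
>     }
> 
>     @Override
>     public void run() {
>         // called periodically from the connector's background thread
>         Map<String, Object> offset = offsetReader.offset(Collections.singletonMap("file", "input-1.csv"));
>         if (offset != null
>                 && (Long) offset.get("bytesOffset") + (Long) offset.get("bytesSize") >= (Long) offset.get("fileSize")) {
>             requestReconfiguration.run(); // the assigned file is done: hand out new files
>         }
>     }
> }
> {code}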



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5359) Exceptions from RequestFuture lack parts of the stack trace

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5359:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Exceptions from RequestFuture lack parts of the stack trace
> ---
>
> Key: KAFKA-5359
> URL: https://issues.apache.org/jira/browse/KAFKA-5359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Magnus Reftel
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 2.1.0
>
>
> When an exception occurs within a task that reports its result using a 
> RequestFuture, that exception is stored in a field on the RequestFuture using 
> the {{raise}} method. In many places in the code where such futures are 
> completed, that exception is then thrown directly using {{throw 
> future.exception();}} (see e.g. 
> [Fetcher.getTopicMetadata|https://github.com/apache/kafka/blob/aebba89a2b9b5ea6a7cab2599555232ef3fe21ad/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L316]).
> This means that the exception that ends up in client code only has stack 
> traces related to the original exception, but nothing leading up to the 
> completion of the future. The client therefore gets no indication of what was 
> going on in the client code - only that it somehow ended up in the Kafka 
> libraries, and that a task failed at some point.
> One solution to this is to use the exceptions from the future as causes for 
> chained exceptions, so that the client gets a stack trace that shows what the 
> client was doing, in addition to getting the stack traces for the exception 
> in the task.
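> For illustration, a minimal sketch of that chaining (not the actual client code): instead of 
> rethrowing the stored exception directly, wrap it, so the rethrow site's stack frames are 
> visible in addition to the task's.
> {code:java}
> import org.apache.kafka.common.KafkaException;
> 
> public final class ChainedRethrowSketch {
>     public static void rethrowChained(RuntimeException storedException) {
>         // before: throw storedException;  // only the original task's stack frames survive
>         throw new KafkaException("request future failed", storedException); // both stack traces visible
>     }
> }
> {code}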



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5517) Support linking to particular configuration parameters

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5517:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Support linking to particular configuration parameters
> --
>
> Key: KAFKA-5517
> URL: https://issues.apache.org/jira/browse/KAFKA-5517
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: patch-available
> Fix For: 2.1.0
>
>
> Currently the configuration parameters are documented in long tables, and it's 
> only possible to link to the heading before a particular table. When 
> discussing configuration parameters on forums it would be helpful to be able 
> to link to the particular parameter under discussion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6448) Mx4jLoader#props.getBoolean("kafka_mx4jenable", false) conflict with the annotation

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6448:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Mx4jLoader#props.getBoolean("kafka_mx4jenable", false) conflict with the 
> annotation
> ---
>
> Key: KAFKA-6448
> URL: https://issues.apache.org/jira/browse/KAFKA-6448
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Hongyuan Li
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: KAFKA-6448-1.patch, KAFKA-6448-2.patch
>
>
> In the annotation, it said 
> {code}*This feature must be enabled with -Dmx4jenable=true*{code}
> *which is not compatible with the code* 
> {code}
> props.getBoolean("kafka_mx4jenable", false)
> {code}
> patch KAFKA-6448-1.patch modifies the code, and KAFKA-6448-2.patch modifies 
> the annotation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3999) Consumer bytes-fetched metric uses decompressed message size

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3999:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Looks like the KIP associated with this JIRA is still under discussion, moving 
out to 2.1.0.

> Consumer bytes-fetched metric uses decompressed message size
> 
>
> Key: KAFKA-3999
> URL: https://issues.apache.org/jira/browse/KAFKA-3999
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 2.1.0
>
>
> It looks like the computation for the bytes-fetched metrics uses the size of 
> the decompressed message set. I would have expected it to be based off of the 
> raw size of the fetch responses. Perhaps it would be helpful to expose both 
> the raw and decompressed fetch sizes? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4893) async topic deletion conflicts with max topic length

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4893:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several weeks, moving this out to 
2.1.0.

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 2.1.0
>
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
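> (Spelling out the arithmetic from the numbers above: a 249-character topic name plus a dash 
> and an up-to-5-digit partition id already uses up to 255 characters for the folder name; the 
> delete rename then appends a dot, a 32-character UUID with the dashes stripped, and the 
> 7-character "-delete" suffix, i.e. 40 more characters, so the renamed folder name can reach 
> roughly 295 characters and blow past the 255-character limit.)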
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic deletion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe LinkedIn has any topic name this long, but I'm filing the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4203) Java producer default max message size does not align with broker default

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4203:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Java producer default max message size does not align with broker default
> -
>
> Key: KAFKA-4203
> URL: https://issues.apache.org/jira/browse/KAFKA-4203
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1
>Reporter: Grant Henke
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.1.0
>
>
> The Java producer sets max.request.size = 1048576 (the base 2 version of 1 MB 
> (MiB)).
> The broker sets max.message.bytes = 1000012 (the base 10 value of 1 MB + 12 
> bytes for overhead).
> This means that by default the producer can try to produce messages larger 
> than the broker will accept, resulting in RecordTooLargeExceptions.
> There were no similar issues with the old producer because it sets 
> max.message.size = 1000000 (the base 10 value of 1 MB).
> I propose we increase the broker default for max.message.bytes to 1048588 
> (the base 2 value of 1 MB (MiB) + 12 bytes for overhead) so that any message 
> produced with default configs from either producer does not result in a 
> RecordTooLargeException.
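> (Spelling out the arithmetic: the producer default is 2^20 = 1,048,576 bytes, the broker 
> default is 10^6 + 12 = 1,000,012 bytes, and the proposed broker default of 1,048,588 bytes 
> is 2^20 + 12, i.e. a full 1 MiB producer request plus the 12 bytes of overhead.)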



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3554) Generate actual data with specific compression ratio and add multi-thread support in the ProducerPerformance tool.

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3554:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several weeks, moving this out to 
2.1.0.

> Generate actual data with specific compression ratio and add multi-thread 
> support in the ProducerPerformance tool.
> --
>
> Key: KAFKA-3554
> URL: https://issues.apache.org/jira/browse/KAFKA-3554
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently ProducerPerformance always generates the payload with the same 
> bytes. This does not work well for testing compressed data because the 
> payload is extremely compressible no matter how big it is.
> We can make some changes to make the tool more useful for compressed messages. 
> Currently I am generating payloads containing integers from a given range. 
> By adjusting the range of the integers, we can get different compression 
> ratios. 
> API-wise, we can either let the user specify the integer range or the expected 
> compression ratio (we would do some probing to find the corresponding range for 
> the user).
> Besides that, in many cases it is useful to have multiple producer threads 
> when the producer threads themselves are the bottleneck. Admittedly people can 
> run multiple ProducerPerformance instances to achieve a similar result, but that is still 
> different from the real case when people actually use the producer.
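> For illustration, a minimal sketch of the payload generation described above (not the actual 
> ProducerPerformance change; the method and parameter names are made up): a narrow integer 
> range yields a highly repetitive, very compressible payload, while a wide range compresses 
> poorly.
> {code:java}
> import java.nio.charset.StandardCharsets;
> import java.util.concurrent.ThreadLocalRandom;
> 
> public final class PayloadGenerator {
>     public static byte[] payload(int sizeBytes, int integerRange) {
>         StringBuilder sb = new StringBuilder(sizeBytes);
>         while (sb.length() < sizeBytes) {
>             // e.g. integerRange = 10 compresses far better than integerRange = 1_000_000_000
>             sb.append(ThreadLocalRandom.current().nextInt(integerRange));
>         }
>         return sb.substring(0, sizeBytes).getBytes(StandardCharsets.UTF_8);
>     }
> }
> {code}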



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4249) Document how to customize GC logging options for broker

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4249:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Document how to customize GC logging options for broker
> ---
>
> Key: KAFKA-4249
> URL: https://issues.apache.org/jira/browse/KAFKA-4249
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 0.10.0.1
>Reporter: Jim Hoagland
>Assignee: Tom Bentley
>Priority: Major
> Fix For: 2.1.0
>
>
> We wanted to enable GC logging for the Kafka broker and saw that you can set 
> GC_LOG_ENABLED=true.  However, this didn't do what we wanted.  For example, 
> the GC log will be overwritten every time the broker gets restarted.  It 
> wasn't clear how we could customize that (no documentation of it that I can find), 
> so I did some research by looking at the source code and did some testing, and 
> found that KAFKA_GC_LOG_OPTS could be set with alternate JVM options prior to 
> starting the broker.  I posted my solution to StackOverflow:
>   
> http://stackoverflow.com/questions/39854424/how-to-enable-gc-logging-for-apache-kafka-brokers-while-preventing-log-file-ove
> (feel free to critique)
> That solution is now public, but it seems like the Kafka documentation should 
> say how to do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4307) Inconsistent parameters between console producer and consumer

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4307:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Inconsistent parameters between console producer and consumer
> -
>
> Key: KAFKA-4307
> URL: https://issues.apache.org/jira/browse/KAFKA-4307
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.1.0
>Reporter: Gwen Shapira
>Assignee: Balint Molnar
>Priority: Major
>  Labels: newbie
> Fix For: 2.1.0
>
>
> kafka-console-producer uses --broker-list while kafka-console-consumer uses 
> --bootstrap-server.
> Let's add --bootstrap-server to the producer for some consistency?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4665) Inconsistent handling of non-existing topics in offset fetch handling

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4665:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Inconsistent handling of non-existing topics in offset fetch handling
> -
>
> Key: KAFKA-4665
> URL: https://issues.apache.org/jira/browse/KAFKA-4665
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.1.0
>
>
> For version 0 of the offset fetch API, the broker returns 
> UNKNOWN_TOPIC_OR_PARTITION for any topics/partitions which do not exist at 
> the time of fetching. In later versions, we skip this check. We do, however, 
> continue to return UNKNOWN_TOPIC_OR_PARTITION for authorization errors (i.e. 
> if the principal does not have Describe access to the corresponding topic). 
> We should probably make this behavior consistent across versions.
> Note also that currently the consumer raises {{KafkaException}} when it 
> encounters an UNKNOWN_TOPIC_OR_PARTITION error in the offset fetch response, 
> which is inconsistent with how we usually handle this error. This probably 
> doesn't cause any problems currently only because of the inconsistency 
> mentioned in the first paragraph above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4862) Kafka client connect to a shutdown node will block for a long time

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4862:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

The KIP associated with this JIRA has not been accepted yet, moving out to 2.1.0

> Kafka client connect to a shutdown node will block for a long time
> --
>
> Key: KAFKA-4862
> URL: https://issues.apache.org/jira/browse/KAFKA-4862
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.2.0
>Reporter: Pengwei
>Assignee: Pengwei
>Priority: Major
> Fix For: 2.1.0
>
>
> In our test environment we found that after one of the broker nodes crashes (reboot 
> or OS crash), the client may still try to connect to the crashed node to send 
> metadata or other requests, and it takes several minutes for the client to 
> notice that the connection has timed out and to try another node for the 
> request.  So the client may still be unaware of the metadata change after 
> several minutes.
> We don't have a connection timeout for the network client; we should add a 
> connection timeout for the client.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5403) Transactions system test should dedup consumed messages by offset

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5403:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Transactions system test should dedup consumed messages by offset
> -
>
> Key: KAFKA-5403
> URL: https://issues.apache.org/jira/browse/KAFKA-5403
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
> Fix For: 2.1.0
>
>
> In KAFKA-5396, we saw that the consumers which verify the data in multiple 
> topics could read the same offsets multiple times, for instance when a 
> rebalance happens. 
> This caused spurious duplicates to be detected, causing the test to fail. We should 
> dedup the consumed messages by offset and only fail the test if we have 
> duplicate values for a unique set of offsets.
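> For illustration, a minimal sketch of that dedup rule (not the actual system-test code; the 
> helper name is made up): keep at most one value per (topic, partition, offset), so re-reads 
> caused by a rebalance no longer show up as duplicate values.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> 
> public final class OffsetDedup {
>     public static Map<String, String> dedupByOffset(Iterable<ConsumerRecord<String, String>> records) {
>         Map<String, String> valueByOffset = new HashMap<>();
>         for (ConsumerRecord<String, String> record : records) {
>             // one key per consumed position; later re-reads of the same offset overwrite, not duplicate
>             valueByOffset.put(record.topic() + "-" + record.partition() + "@" + record.offset(), record.value());
>         }
>         return valueByOffset;
>     }
> }
> {code}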



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5951) Autogenerate Producer RecordAccumulator metrics

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5951:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Autogenerate Producer RecordAccumulator metrics
> ---
>
> Key: KAFKA-5951
> URL: https://issues.apache.org/jira/browse/KAFKA-5951
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
>Priority: Major
> Fix For: 2.1.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5952) Refactor Consumer Fetcher metrics

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5952:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on the PR for several months, moving this out 
to 2.1.0.

> Refactor Consumer Fetcher metrics
> -
>
> Key: KAFKA-5952
> URL: https://issues.apache.org/jira/browse/KAFKA-5952
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
>Priority: Major
> Fix For: 2.1.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3297:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on the PR for several months, moving this out 
to 2.1.0.

> More optimally balanced partition assignment strategy (new consumer)
> 
>
> Key: KAFKA-3297
> URL: https://issues.apache.org/jira/browse/KAFKA-3297
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Fix For: 2.1.0
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the new consumer. For the original high-level consumer, 
> see KAFKA-2435.
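
To make the greedy idea above concrete, here is a rough, illustrative sketch (hypothetical class and method names, not the code from the actual PR):

{code:java}
import java.util.*;

// Sketch: topics ordered by fewest interested consumers (ties: more partitions first),
// then each partition goes to the interested consumer that currently owns the fewest partitions.
public class BalancedAssignmentSketch {
    public static Map<String, List<String>> assign(final Map<String, Integer> partitionsPerTopic,
                                                   final Map<String, Set<String>> subscriptions) {
        final Map<String, List<String>> assignment = new HashMap<>();
        subscriptions.keySet().forEach(c -> assignment.put(c, new ArrayList<>()));

        final List<String> topics = new ArrayList<>(partitionsPerTopic.keySet());
        topics.sort(Comparator
            .comparingLong((String t) -> subscriptions.values().stream().filter(s -> s.contains(t)).count())
            .thenComparing(Comparator.comparingInt((String t) -> -partitionsPerTopic.get(t))));

        for (final String topic : topics) {
            for (int p = 0; p < partitionsPerTopic.get(topic); p++) {
                final int partition = p;
                // give the partition to the interested consumer with the fewest partitions so far
                subscriptions.entrySet().stream()
                    .filter(e -> e.getValue().contains(topic))
                    .min(Comparator.comparingInt((Map.Entry<String, Set<String>> e) ->
                        assignment.get(e.getKey()).size()))
                    .ifPresent(e -> assignment.get(e.getKey()).add(topic + "-" + partition));
            }
        }
        return assignment;
    }
}
{code}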



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4850) RocksDb cannot use Bloom Filters

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4850:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on the PR for several months, moving out to 
2.1.0.

> RocksDb cannot use Bloom Filters
> 
>
> Key: KAFKA-4850
> URL: https://issues.apache.org/jira/browse/KAFKA-4850
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Bharat Viswanadham
>Priority: Major
> Fix For: 2.1.0
>
>
> Bloom Filters would speed up RocksDb lookups. However they currently do not 
> work in RocksDb 5.0.2. This has been fixed in trunk, but we'll have to wait 
> until that is released and tested. 
> Then we can add the line in RocksDbStore.java in openDb:
> tableConfig.setFilter(new BloomFilter(10));
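
For reference, enabling the filter in the RocksDB Java options would look roughly like the following; this is illustrative only, and the exact API may differ between RocksDB versions:

{code:java}
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Options;

public class BloomFilterOptionsSketch {
    // Build Options with a block-based table config that uses a Bloom filter (~10 bits per key).
    public static Options optionsWithBloomFilter() {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setFilter(new BloomFilter(10));
        return new Options().setTableFormatConfig(tableConfig);
    }
}
{code}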



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4808) send of null key to a compacted topic should throw error back to user

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4808:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on the PR for several months, moving this out 
to 2.1.0.

> send of null key to a compacted topic should throw error back to user
> -
>
> Key: KAFKA-4808
> URL: https://issues.apache.org/jira/browse/KAFKA-4808
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
>Reporter: Ismael Juma
>Assignee: Mayuresh Gharat
>Priority: Major
> Fix For: 2.1.0
>
>
> If a message with a null key is produced to a compacted topic, the broker 
> returns `CorruptRecordException`, which is a retriable exception. As such, 
> the producer keeps retrying until retries are exhausted or request.timeout.ms 
> expires and eventually throws a TimeoutException. This is confusing and not 
> user-friendly.
> We should throw a meaningful error back to the user. From an implementation 
> perspective, we would have to use a non-retriable error code to avoid this 
> issue.
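
A minimal repro sketch of the confusing behaviour described above (topic name and bootstrap address are placeholders; the compacted topic is assumed to already exist):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NullKeyRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // null key on a compacted topic: the broker rejects the record, but the producer
            // keeps retrying and eventually surfaces a TimeoutException rather than a clear
            // "null key not allowed" error.
            producer.send(new ProducerRecord<>("my-compacted-topic", null, "value")).get();
        }
    }
}
{code}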



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4701) Allow kafka brokers to dynamically reload truststore without restarting.

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4701:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

We have added dynamic update of truststore using AdminClient/kafka-configs.sh 
with new truststore files. Moving this out to 2.1.0 to see if we need to 
support updates without filename changes.

> Allow kafka brokers to dynamically reload truststore without restarting.
> 
>
> Key: KAFKA-4701
> URL: https://issues.apache.org/jira/browse/KAFKA-4701
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Allen Xiang
>Priority: Major
>  Labels: security
> Fix For: 2.1.0
>
>
> Right now, in order to add SSL clients (i.e., update broker truststores), a rolling 
> restart of all brokers is required. This is very time consuming and 
> unnecessary. A dynamic truststore manager is needed to reload truststore from 
> file system without restarting brokers.
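
As a rough illustration of the AdminClient-based update mentioned above: broker id, listener name, and truststore path are placeholders, and the per-listener config name shown follows the documented dynamic broker config pattern but should be treated as an assumption here, not the exact supported workflow.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TruststoreUpdateSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Point broker 0's EXTERNAL listener at a new truststore file.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            ConfigEntry entry = new ConfigEntry(
                "listener.name.external.ssl.truststore.location",
                "/etc/kafka/ssl/new-truststore.jks");
            admin.alterConfigs(Collections.singletonMap(broker, new Config(Collections.singleton(entry))))
                 .all().get();
        }
    }
}
{code}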



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3190) KafkaProducer should not invoke callback in send()

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3190:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on the PR for several months, moving this out 
to 2.1.0.

> KafkaProducer should not invoke callback in send()
> --
>
> Key: KAFKA-3190
> URL: https://issues.apache.org/jira/browse/KAFKA-3190
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 2.1.0
>
>
> Currently KafkaProducer will invoke callback.onComplete() if it receives an 
> ApiException during send(). This breaks the guarantee that callbacks will be 
> invoked in order. It seems ApiException in send() only comes from metadata 
> refresh. If so, we can probably simply throw it instead of invoking 
> callback().
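
To illustrate the ordering guarantee at stake, a small sketch (topic and bootstrap address are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallbackOrderingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Callbacks for records sent to the same partition are expected to fire in send order.
            // If send() itself invokes a callback on a metadata error, callback #2 could fire
            // inline before callback #1 completes, breaking that expectation.
            producer.send(new ProducerRecord<>("my-topic", "k1", "v1"),
                (metadata, exception) -> System.out.println("callback #1: " + exception));
            producer.send(new ProducerRecord<>("my-topic", "k1", "v2"),
                (metadata, exception) -> System.out.println("callback #2: " + exception));
        }
    }
}
{code}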



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5054:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for a long time, moving out to 2.1.0.

> ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
> 
>
> Key: KAFKA-5054
> URL: https://issues.apache.org/jira/browse/KAFKA-5054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Critical
> Fix For: 2.1.0
>
>
> {{putIfAbsent}} and {{delete}} should be synchronized as they involve at 
> least 2 operations on the underlying store and may result in inconsistent 
> results if someone were to query via IQ
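
Roughly the shape of the fix being suggested, as an illustrative sketch rather than the actual ChangeLoggingKeyValueBytesStore code:

{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch: both methods touch the inner store more than once (a read plus a write/delete),
// so they are synchronized to keep IQ readers from observing an intermediate state.
public class SynchronizedStoreSketch {
    private final KeyValueStore<Bytes, byte[]> inner;

    public SynchronizedStoreSketch(KeyValueStore<Bytes, byte[]> inner) {
        this.inner = inner;
    }

    public synchronized byte[] putIfAbsent(Bytes key, byte[] value) {
        byte[] previous = inner.get(key);   // operation 1
        if (previous == null) {
            inner.put(key, value);          // operation 2
        }
        return previous;
    }

    public synchronized byte[] delete(Bytes key) {
        byte[] previous = inner.get(key);   // operation 1
        inner.delete(key);                  // operation 2
        return previous;
    }
}
{code}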



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4682:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0.

> Committed offsets should not be deleted if a consumer is still active 
> (KIP-211)
> ---
>
> Key: KAFKA-4682
> URL: https://issues.apache.org/jira/browse/KAFKA-4682
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: kip
> Fix For: 2.1.0
>
>
> Kafka will delete committed offsets that are older than 
> offsets.retention.minutes
> If there is an active consumer on a low traffic partition, it is possible 
> that Kafka will delete the committed offset for that consumer. Once the 
> offset is deleted, a restart or a rebalance of that consumer will cause the 
> consumer to not find any committed offset and start consuming from 
> earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker 
> failover might also cause you to start reading from auto.offset.reset (due to 
> broker restart, or coordinator failover).
> I think that Kafka should only delete offsets for inactive consumers. The 
> timer should only start after a consumer group goes inactive. For example, if 
> a consumer group goes inactive, then after 1 week, delete the offsets for 
> that consumer group. This is a solution that [~junrao] mentioned in 
> https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521
> The current workarounds are to:
> # Commit an offset on every partition you own on a regular basis, making sure 
> that it is more frequent than offsets.retention.minutes (a broker-side 
> setting that a consumer might not be aware of)
> or
> # Turn the value of offsets.retention.minutes up really really high. You have 
> to make sure it is higher than any valid low-traffic rate that you want to 
> support. For example, if you want to support a topic where someone produces 
> once a month, you would have to set offsets.retention.minutes to 1 month. 
> or
> # Turn on enable.auto.commit (this is essentially #1, but easier to 
> implement).
> None of these are ideal. 
> #1 can be spammy. It requires your consumers know something about how the 
> brokers are configured. Sometimes it is out of your control. Mirrormaker, for 
> example, only commits offsets on partitions where it receives data. And it is 
> duplication that you need to put into all of your consumers.
> #2 has disk-space impact on the broker (in __consumer_offsets) as well as 
> memory-size on the broker (to answer OffsetFetch).
> #3 I think has the potential for message loss (the consumer might commit on 
> messages that are not yet fully processed)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3042:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since there is no fix yet.

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.0.0
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>Priority: Major
>  Labels: reliability
> Fix For: 2.1.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact 
> it's a follower.
> So after several failed tries, it needs to find out who the leader is.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7033) Modify AbstractOptions's timeoutMs as Long type

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508205#comment-16508205
 ] 

ASF GitHub Bot commented on KAFKA-7033:
---

darionyaphet opened a new pull request #5186: [KAFKA-7033] Modify 
AbstractOptions's timeoutMs as Long type
URL: https://github.com/apache/kafka/pull/5186
 
 
   [KAFKA-7033 | Modify AbstractOptions's timeoutMs as Long 
type](https://issues.apache.org/jira/browse/KAFKA-7033)
   
   Currently AbstractOptions's timeoutMs is an Integer; using Long to represent the 
timeout in milliseconds may be better.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Modify AbstractOptions's timeoutMs as Long type
> ---
>
> Key: KAFKA-7033
> URL: https://issues.apache.org/jira/browse/KAFKA-7033
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 1.1.0
>Reporter: darion yaphet
>Priority: Minor
>
> Currently AbstractOptions's timeoutMs is an Integer; using Long to represent the 
> timeout in milliseconds may be better.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508215#comment-16508215
 ] 

ASF GitHub Bot commented on KAFKA-6788:
---

cyrusv closed pull request #4878: KAFKA-6788: Combine queries for describe and 
delete groups in AdminCl…
URL: https://github.com/apache/kafka/pull/4878
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index fa3f943555b..cd79453d29a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2252,14 +2252,17 @@ public DescribeConsumerGroupsResult 
describeConsumerGroups(final Collection describedGroupIds = new HashSet<>();
+
 for (final Map.Entry> entry : futures.entrySet()) {
 // skip sending request for those futures that already failed.
 if (entry.getValue().isCompletedExceptionally())
 continue;
 
 final String groupId = entry.getKey();
+if (describedGroupIds.contains(groupId)) {
+continue;
+}
 
 final long startFindCoordinatorMs = time.milliseconds();
 final long deadline = calcDeadlineMs(startFindCoordinatorMs, 
options.timeoutMs());
@@ -2274,53 +2277,82 @@ public DescribeConsumerGroupsResult 
describeConsumerGroups(final Collection future = 
futures.get(groupId);
-final DescribeGroupsResponse.GroupMetadata 
groupMetadata = response.groups().get(groupId);
+final Set groupIdsToDescribe = new 
HashSet<>();
+for (ListGroupsResponse.Group group : 
listResponse.groups()) {
+groupIdsToDescribe.add(group.groupId());
+}
 
-final Errors groupError = groupMetadata.error();
-if (groupError != Errors.NONE) {
-// TODO: KAFKA-6789, we can retry based on the 
error code
-
future.completeExceptionally(groupError.exception());
-} else {
-final String protocolType = 
groupMetadata.protocolType();
-if 
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) 
{
-final 
List members = groupMetadata.members();
-final List consumers = 
new ArrayList<>(members.size());
-
-for (DescribeGroupsResponse.GroupMember 
groupMember : members) {
-final PartitionAssignor.Assignment 
assignment =
-
ConsumerProtocol.deserializeAssignment(
-
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment(;
-
-final MemberDescription 
memberDescription =
-new MemberDescription(
-groupMember.memberId(),
-groupMember.clientId(),
-
groupMember.clientHost(),
-new 
MemberAssignment(assignment.partitions()));
-consumers.add(memberDescription);
+runnable.call(new Call("describeConsumerGroups", 
deadline, new ConstantNodeIdProvider(nodeId)) {
+
+@Override
+AbstractRequest.Builder createRequest(int 
timeoutMs) {
+return new 
DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
+}
+
+@Override
+void handleResponse(AbstractResponse 
abstractResponse) {
+final DescribeGroupsResponse response = 
(DescribeGroupsResponse) abstractResponse;
+for (String describedCandidate : 
groupIdsToDescribe) {
+if 
(response.groups().containsKey(describedCandidate)) {
+
describedGroupIds.add(describedCandidate);
+  

[jira] [Updated] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-11 Thread Sandeep Nemuri (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandeep Nemuri updated KAFKA-7037:
--
Description: 
While executing a delete command, the kafka cli tool removes the "+" symbol and 
deletes the wrong topic. In the case below, if _"*test+topic"*_ is deleted, 
kafka deletes _*testtopic*_.
{code:java}
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create 
--zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic 
testtopic
Created topic "testtopic".
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --topic test+topic --delete
Topic testtopic is marked for deletion.{code}
 delete topic replaces '+' from the topic name  

  was:
While executing a delete command, the kafka cli tool removes the "+" symbol and 
deletes the wrong topic. In the case below, if _"*test+topic"*_ is deleted, 
kafka deletes _*testtopic*_.
{code:java}
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create 
--zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic 
testtopic
Created topic "testtopic".
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --topic test+topic --delete
Topic testtopic is marked for deletion.{code}
 delete topic replaces '+' and a few other special characters from the topic name: 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33]

 


> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Priority: Major
>
> While executing a delete command, the kafka cli tool removes the "+" symbol 
> and deletes the wrong topic. In the case below, if _"*test+topic"*_ is 
> deleted, kafka deletes _*testtopic*_.
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  delete topic replaces '+' from the topic name  
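
Purely as an illustration of how a literal '+' can silently match a different topic name if the --topic argument is interpreted as a regular expression (this may or may not be the exact code path in TopicFilter):

{code:java}
import java.util.regex.Pattern;

public class TopicRegexExample {
    public static void main(String[] args) {
        // As a regex, '+' in "test+topic" means "one or more of the preceding 't'",
        // so the pattern matches "testtopic" but not the literal name "test+topic".
        System.out.println(Pattern.matches("test+topic", "testtopic"));   // true
        System.out.println(Pattern.matches("test+topic", "test+topic"));  // false
    }
}
{code}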



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508243#comment-16508243
 ] 

Guozhang Wang commented on KAFKA-6889:
--

Confluent jenkins has read access but not write access: i.e. you can check the 
results of a certain job, but cannot create new jobs.

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests, that do not use those log 
> files, during the test run WARN messages are produced:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses to avoid the WARN message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7038) Support AdminClient Example

2018-06-11 Thread darion yaphet (JIRA)
darion yaphet created KAFKA-7038:


 Summary: Support AdminClient Example
 Key: KAFKA-7038
 URL: https://issues.apache.org/jira/browse/KAFKA-7038
 Project: Kafka
  Issue Type: New Feature
  Components: admin
Reporter: darion yaphet






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-6889:
---
Attachment: (was: Screen Shot 2018-06-11 at 7.15.02 PM.png)

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
> Attachments: Screen Shot 2018-06-11 at 7.15.02 PM.png
>
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests, that do not use those log 
> files, during the test run WARN messages are produced:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses to avoid the WARN message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-6889:
---
Attachment: Screen Shot 2018-06-11 at 7.15.02 PM.png

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
> Attachments: Screen Shot 2018-06-11 at 7.15.02 PM.png
>
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests, that do not use those log 
> files, during the test run WARN messages are produced:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses to avoid the WARN message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-6889:
---
Attachment: Screen Shot 2018-06-11 at 7.15.02 PM.png

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
> Attachments: Screen Shot 2018-06-11 at 7.15.02 PM.png
>
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests, that do not use those log 
> files, during the test run WARN messages are produced:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses to avoid the WARN message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-6889:
---
Attachment: (was: Screen Shot 2018-06-11 at 7.15.02 PM.png)

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests, that do not use those log 
> files, during the test run WARN messages are produced:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses to avoid the WARN message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508285#comment-16508285
 ] 

Stanislav Kozlovski commented on KAFKA-6889:


What do you mean the Jenkins has read access? 

I personally cannot open the link sent by Matthias -  https://imgur.com/yt7Su2e

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests, that do not use those log 
> files, during the test run WARN messages are produced:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses to avoid the WARN message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508317#comment-16508317
 ] 

Guozhang Wang commented on KAFKA-6860:
--

I see. Thanks for the explanation [~mjsax]. And your proposed fix makes sense 
to me. I think a more general solution would involve also fixing the double 
checkpointing for the non-EOS case: today we checkpoint in `suspend` if EOS is not 
turned on and in `closeSuspended` always. So for EOS, we only checkpoint in 
`closeSuspended`, while in non-EOS we checkpoint in both, hence we 
unnecessarily write the checkpoint twice when closing. But for this 
general fix, I think it may be better to consider fixing with some refactoring 
on the ProcessorStateManager code, and hence not necessarily to be included for 
this JIRA.

> NPE when reinitializeStateStores with eos enabled
> -
>
> Key: KAFKA-6860
> URL: https://issues.apache.org/jira/browse/KAFKA-6860
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: mac, kafka1.1
>Reporter: ko byoung kwon
>Priority: Major
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> *Symptom*
>  With EOS enabled, reinitializing state stores gets an NPE because the checkpoint 
> is null.
> {code:java}
> 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1]
>  Encountered the following error during processing:
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>  ~[kafka-streams-1.1.0.jar:na]
> {code}
> *How to reproduce*
> *configure as*
>  - changelog topic with short `retention.ms` and `delete` policy (just to 
> reproduce the symptom easily)
>  ex) retention.ms=3,cleanup.policy=delete 
>  - exactly-once semantics enabled
>  - no cleanup
> *Step*
>  - two tasks [0_0], [0_1]; two Spring Boot instances (assignment: was#1:task[0_0], 
> was#2:task[0_1])
>  - write some data to each state store (the changelog topic will soon erase those 
> messages, due to the short "retention.ms")
>  - when was#2 is killed, then was#1 will restore task[0_1]'s data on its own 
> rocksDB
>  - In the process, it finds a checkpoint and an error 
> occurs.(AbstractStateManager #66)
> {code:java}
> // My code
> Map<String, String> topicConfiguration = new HashMap<>();
> topicConfiguration.putIfAbsent("cleanup.policy", "delete");
> topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
> topicConfiguration.putIfAbsent("retention.ms", "3000");
> builder.stream(properties.getSourceTopic(),
>Consumed.with(Serdes.Long(), Serdes.String()))
>.groupByKey()
>.count(Materialized
>   .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
>   .withKeySerde(Serdes.Long())
>   .withValueSerde(Serdes.Long())
>   .withLoggingEnabled(topicConfiguration));
> {code}
> *Suggestion*
> When EOS is enabled, the checkpoint will be null.
>  I think we need to add some code to create a Checkpoint. 
>  As follows
> {code:java}
> // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
> // # suggestion start
> if (checkpoint == null) {
> checkpoint = new OffsetCheckpoint(new File(baseDir, 
> CHECKPOINT_FILE_NAME));
> }
> // # suggestion end
> try {
> checkpoint.write(checkpointableOffsets);
> } catch (final IOException fatalException) {
> log.error("Failed to write offset checkpoint file to {} while 
> re-initializing {}: {}", checkpoint, stateStores, fatalException);
>  throw new StreamsException("Failed to reinitialize global store.", 
> fatalException);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6860:
--

Assignee: Matthias J. Sax

> NPE when reinitializeStateStores with eos enabled
> -
>
> Key: KAFKA-6860
> URL: https://issues.apache.org/jira/browse/KAFKA-6860
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: mac, kafka1.1
>Reporter: ko byoung kwon
>Assignee: Matthias J. Sax
>Priority: Major
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> *Symptom*
>  With EOS enabled, reinitializing state stores gets an NPE because the checkpoint 
> is null.
> {code:java}
> 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1]
>  Encountered the following error during processing:
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>  ~[kafka-streams-1.1.0.jar:na]
> {code}
> *How to reproduce*
> *configure as*
>  - changelog topic with short `retention.ms` and `delete` policy (just to 
> reproduce the symptom easily)
>  ex) retention.ms=3,cleanup.policy=delete 
>  - exactly-once semantics enabled
>  - no cleanup
> *Step*
>  - two tasks [0_0], [0_1]; two Spring Boot instances (assignment: was#1:task[0_0], 
> was#2:task[0_1])
>  - write some data to each state store (the changelog topic will soon erase those 
> messages, due to the short "retention.ms")
>  - when was#2 is killed, then was#1 will restore task[0_1]'s data on its own 
> rocksDB
>  - In the process, it finds a checkpoint and an error 
> occurs.(AbstractStateManager #66)
> {code:java}
> // My code
> Map<String, String> topicConfiguration = new HashMap<>();
> topicConfiguration.putIfAbsent("cleanup.policy", "delete");
> topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
> topicConfiguration.putIfAbsent("retention.ms", "3000");
> builder.stream(properties.getSourceTopic(),
>Consumed.with(Serdes.Long(), Serdes.String()))
>.groupByKey()
>.count(Materialized
>   .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
>   .withKeySerde(Serdes.Long())
>   .withValueSerde(Serdes.Long())
>   .withLoggingEnabled(topicConfiguration));
> {code}
> *Suggestion*
> When EOS is enabled, the checkpoint will be null.
>  I think we need to add some code to create a Checkpoint. 
>  As follows
> {code:java}
> // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
> // # suggestion start
> if (checkpoint == null) {
> checkpoint = new OffsetCheckpoint(new File(baseDir, 
> CHECKPOINT_FILE_NAME));
> }
> // # suggestion end
> try {
> checkpoint.write(checkpointableOffsets);
> } catch (final IOException fatalException) {
> log.error("Failed to write offset checkpoint file to {} while 
> re-initializing {}: {}", checkpoint, stateStores, fatalException);
>  throw new StreamsException("Failed to reinitialize global store.", 
> fatalException);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6860:
---
Fix Version/s: 2.0.0

> NPE when reinitializeStateStores with eos enabled
> -
>
> Key: KAFKA-6860
> URL: https://issues.apache.org/jira/browse/KAFKA-6860
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: mac, kafka1.1
>Reporter: ko byoung kwon
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 2.0.0
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> *Symptom*
>  With EOS enabled, reinitializing state stores gets an NPE because the checkpoint 
> is null.
> {code:java}
> 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1]
>  Encountered the following error during processing:
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>  ~[kafka-streams-1.1.0.jar:na]
> {code}
> *How to reproduce*
> *configure as*
>  - changelog topic with short `retention.ms` and `delete` policy (just to 
> reproduce the symptom easily)
>  ex) retention.ms=3,cleanup.policy=delete 
>  - exactly-once semantics enabled
>  - no cleanup
> *Step*
>  - two tasks [0_0], [0_1]; two Spring Boot instances (assignment: was#1:task[0_0], 
> was#2:task[0_1])
>  - write some data to each state store (the changelog topic will soon erase those 
> messages, due to the short "retention.ms")
>  - when was#2 is killed, then was#1 will restore task[0_1]'s data on its own 
> rocksDB
>  - In the process, it finds a checkpoint and an error 
> occurs.(AbstractStateManager #66)
> {code:java}
> // My code
> Map<String, String> topicConfiguration = new HashMap<>();
> topicConfiguration.putIfAbsent("cleanup.policy", "delete");
> topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
> topicConfiguration.putIfAbsent("retention.ms", "3000");
> builder.stream(properties.getSourceTopic(),
>Consumed.with(Serdes.Long(), Serdes.String()))
>.groupByKey()
>.count(Materialized
>   .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
>   .withKeySerde(Serdes.Long())
>   .withValueSerde(Serdes.Long())
>   .withLoggingEnabled(topicConfiguration));
> {code}
> *Suggestion*
> When EOS is enabled, the checkpoint will be null.
>  I think we need to add some code to create a Checkpoint. 
>  As follows
> {code:java}
> // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
> // # suggestion start
> if (checkpoint == null) {
> checkpoint = new OffsetCheckpoint(new File(baseDir, 
> CHECKPOINT_FILE_NAME));
> }
> // # suggestion end
> try {
> checkpoint.write(checkpointableOffsets);
> } catch (final IOException fatalException) {
> log.error("Failed to write offset checkpoint file to {} while 
> re-initializing {}: {}", checkpoint, stateStores, fatalException);
>  throw new StreamsException("Failed to reinitialize global store.", 
> fatalException);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7035) Kafka Processor's init() method sometimes is not called

2018-06-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508402#comment-16508402
 ] 

Guozhang Wang commented on KAFKA-7035:
--

Hi [~akonopko], processor.init() should always be called when the task was 
initialized for the first time. I've just checked the code in 1.0.0 and 
confirmed it is still the case. 

BTW, could you elaborate on the scenario: 1) why were four processors created, and 
why did processorC and D take over the data from processorA and B? Are there only two 
input partitions available? How many total num.threads existed for this 
application?

> Kafka Processor's init() method sometimes is not called
> ---
>
> Key: KAFKA-7035
> URL: https://issues.apache.org/jira/browse/KAFKA-7035
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Oleksandr Konopko
>Priority: Critical
> Attachments: TransformProcessor.java
>
>
> Scenario:
> 1. We have processing of Kafka Topic which is implemented with Processor API
> 2. We want to collect metrics (lets say just count number of processed 
> entities for simplicity)
> 3. How we tried to organize this
>  * process data with process() method and send it down the stream with context
>  * on each call of process() method update the counter
>  * schedule a punctuate function which will send the metric to a special topic. The metric 
> is built from the counter
> You can find the code (we removed all business sensitive code out of it, so 
> it should be easy to read) in attachment
>  
> Problematic Kafka Streams behaviour that i can see by logging every step:
> 1. We have 80 messages in the input topic
> 2. Kafka Streams creates 4 Processor instances. Lets name them: ProcessorA, 
> ProcessorB, ProcessorC and ProcessorD
> 3. ProcessorA and ProcessorB receive 1-5% of data. Data is processed 
> correctly, results are sent down the stream. Counter is updated
> 4. init() method was not called for ProcessorA and ProcessorB
> 5. ProcessorC and ProcessorD are created and they start to receive all the 
> rest of data. 95-99%
> 6. init() method is called for both ProcessorC and ProcessorD. It initiates 
> punctuation, which causes Metrics message be created and sent down the metric 
> stream periodically
> 7. ProcessorA and ProcessorB are closed. init() was never called for them. So 
> Metric entity was not sent to metrics topic
> 8. Processing is finished.
>  
> In the end:
> Expected:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to 80
> Actual results:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to some number 3-6% less 
> than 80, for example 786543
>  
> Problem:
>  * init() method call is not guaranteed
>  * there is no way to guarantee that all work was done by punctuate method 
> before close()
>  
>  
>  
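
For context, roughly the shape of the processor described above (a simplified sketch with hypothetical names, not the attached TransformProcessor.java):

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class CountingProcessor extends AbstractProcessor<String, String> {
    private long processed = 0L;

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // Emit the counter to a metrics sink every 10s; this only happens if init() ran.
        // "metrics-sink" is a hypothetical child node name.
        context.schedule(10_000L, PunctuationType.WALL_CLOCK_TIME,
            timestamp -> context.forward("metrics-key", Long.toString(processed), "metrics-sink"));
    }

    @Override
    public void process(final String key, final String value) {
        processed++;                       // update the counter
        context().forward(key, value);     // pass the record downstream
    }
}
{code}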



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508420#comment-16508420
 ] 

ASF GitHub Bot commented on KAFKA-6946:
---

lindong28 closed pull request #5164: KAFKA-6946: Keep the session id for 
incremental fetch when fetch responses are throttled
URL: https://github.com/apache/kafka/pull/5164
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/FetchSession.scala 
b/core/src/main/scala/kafka/server/FetchSession.scala
index 7a47780a135..68f79cace38 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -290,6 +290,12 @@ trait FetchContext extends Logging {
 
   def partitionsToLogString(partitions: util.Collection[TopicPartition]): 
String =
 FetchSession.partitionsToLogString(partitions, isTraceEnabled)
+
+  /**
+* Return an empty throttled response due to quota violation.
+*/
+  def getThrottledResponse(throttleTimeMs: Int): FetchResponse[Records] =
+new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, throttleTimeMs, 
INVALID_SESSION_ID)
 }
 
 /**
@@ -474,6 +480,21 @@ class IncrementalFetchContext(private val time: Time,
   }
 }
   }
+
+  override def getThrottledResponse(throttleTimeMs: Int): 
FetchResponse[Records] = {
+session.synchronized {
+  // Check to make sure that the session epoch didn't change in between
+  // creating this fetch context and generating this response.
+  val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch())
+  if (session.epoch != expectedEpoch) {
+info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, but " +
+  s"got ${session.epoch}.  Possible duplicate request.")
+new FetchResponse(Errors.INVALID_FETCH_SESSION_EPOCH, new 
FetchSession.RESP_MAP, throttleTimeMs, session.id)
+  } else {
+new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, 
throttleTimeMs, session.id)
+  }
+}
+  }
 }
 
 case class LastUsedKey(val lastUsedMs: Long,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6d9e3d115b8..f7e9ec98fdb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -655,8 +655,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 quotas.request.throttle(request, requestThrottleTimeMs, 
sendResponse)
   }
   // If throttling is required, return an empty response.
-  unconvertedFetchResponse = new FetchResponse(Errors.NONE, new 
util.LinkedHashMap[TopicPartition,
-FetchResponse.PartitionData[Records]](), maxThrottleTimeMs, 
INVALID_SESSION_ID)
+  unconvertedFetchResponse = 
fetchContext.getThrottledResponse(maxThrottleTimeMs)
 } else {
   // Get the actual response. This will update the fetch context.
   unconvertedFetchResponse = 
fetchContext.updateAndGenerateResponseData(partitions)
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index 84efa6b684d..b79692d69d9 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -201,25 +201,34 @@ class FetchSessionTest {
 assertEquals(Errors.INVALID_FETCH_SESSION_EPOCH,
   context6.updateAndGenerateResponseData(respData2).error())
 
+// Test generating a throttled response for the incremental fetch session
+val reqData7 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+val context7 = fetchManager.newContext(
+  new JFetchMetadata(resp2.sessionId(), 2), reqData7, EMPTY_PART_LIST, 
false)
+val resp7 = context7.getThrottledResponse(100)
+assertEquals(Errors.NONE, resp7.error())
+assertEquals(resp2.sessionId(), resp7.sessionId())
+assertEquals(100, resp7.throttleTimeMs())
+
 // Close the incremental fetch session.
 val prevSessionId = resp5.sessionId
 var nextSessionId = prevSessionId
 do {
-  val reqData7 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-  reqData7.put(new TopicPartition("bar", 0), new 
FetchRequest.PartitionData(0, 0, 100))
-  reqData7.put(new TopicPartition("bar", 1), new 
FetchRequest.PartitionData(10, 0, 100))
-  val context7 = fetchManager.newContext(
-new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData7, 
EMPTY_PART_LIST, false)
-  assertEquals(classOf[SessionlessFetchContext], context7.getClass)
+  val 

[jira] [Commented] (KAFKA-7035) Kafka Processor's init() method sometimes is not called

2018-06-11 Thread Oleksandr Konopko (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508425#comment-16508425
 ] 

Oleksandr Konopko commented on KAFKA-7035:
--

ok, but the processor constructor and processor.init() are not an atomic piece of 
logic, right?
 * data processed by A and B is not reprocessed by C and D
 * will check...

> Kafka Processor's init() method sometimes is not called
> ---
>
> Key: KAFKA-7035
> URL: https://issues.apache.org/jira/browse/KAFKA-7035
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Oleksandr Konopko
>Priority: Critical
> Attachments: TransformProcessor.java
>
>
> Scenario:
> 1. We have processing of Kafka Topic which is implemented with Processor API
> 2. We want to collect metrics (lets say just count number of processed 
> entities for simplicity)
> 3. How we tried to organize this
>  * process data with process() method and send it down the stream with context
>  * on each call of process() method update the counter
>  * schedule a punctuate function which will send the metric to a special topic. The metric 
> is built from the counter
> You can find the code (we removed all business sensitive code out of it, so 
> it should be easy to read) in attachment
>  
> Problematic Kafka Streams behaviour that i can see by logging every step:
> 1. We have 80 messages in the input topic
> 2. Kafka Streams creates 4 Processor instances. Lets name them: ProcessorA, 
> ProcessorB, ProcessorC and ProcessorD
> 3. ProcessorA and ProcessorB receive 1-5% of data. Data is processed 
> correctly, results are sent down the stream. Counter is updated
> 4. init() method was not called for ProcessorA and ProcessorB
> 5. ProcessorC and ProcessorD are created and they start to receive all the 
> rest of data. 95-99%
> 6. init() method is called for both ProcessorC and ProcessorD. It initiates 
> punctuation, which causes Metrics message be created and sent down the metric 
> stream periodically
> 7. ProcessorA and ProcessorB are closed. init() was never called for them. So 
> Metric entity was not sent to metrics topic
> 8. Processing is finished.
>  
> In the end:
> Expected:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to 80
> Actual results:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to some number 3-6% less 
> than 80, for example 786543
>  
> Problem:
>  * init() method call is not guaranteed
>  * there is no way to guarantee that all work was done by punctuate method 
> before close()
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508424#comment-16508424
 ] 

ASF GitHub Bot commented on KAFKA-6860:
---

mjsax opened a new pull request #5187: KAFKA-6860: Fix NPE in Kafka Streams 
with EOS enabled
URL: https://github.com/apache/kafka/pull/5187
 
 
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NPE when reinitializeStateStores with eos enabled
> -
>
> Key: KAFKA-6860
> URL: https://issues.apache.org/jira/browse/KAFKA-6860
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: mac, kafka1.1
>Reporter: ko byoung kwon
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 2.0.0
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> *Symptom*
>  With EOS enabled, reinitializing state stores gets an NPE because the checkpoint 
> is null.
> {code:java}
> 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1]
>  Encountered the following error during processing:
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>  ~[kafka-streams-1.1.0.jar:na]
> {code}
> *How to reproduce*
> *configure as*
>  - changelog topic with short `retention.ms` and `delete` policy (just to 
> reproduce the symptom easily)
>  ex) retention.ms=3,cleanup.policy=delete 
>  - exactly-once semantics enabled
>  - no cleanup
> *Step*
>  - two tasks [0_0], [0_1]; two Spring Boot instances (assignment: was#1:task[0_0], 
> was#2:task[0_1])
>  - write some data to each state store (the changelog topic will soon erase those 
> messages, due to the short "retention.ms")
>  - when was#2 is killed, then was#1 will restore task[0_1]'s data on its own 
> rocksDB
>  - In the process, it finds a checkpoint and an error 
> occurs.(AbstractStateManager #66)
> {code:java}
> // My code
> Map<String, String> topicConfiguration = new HashMap<>();
> topicConfiguration.putIfAbsent("cleanup.policy", "delete");
> topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
> topicConfiguration.putIfAbsent("retention.ms", "3000");
> builder.stream(properties.getSourceTopic(),
>Consumed.with(Serdes.Long(), Serdes.String()))
>.groupByKey()
>.count(Materialized
>   .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
>   .withKeySerde(Serdes.Long())
>   .withValueSerde(Serdes.Long())
>   .withLoggingEnabled(topicConfiguration));
> {code}
> *Suggestion*
> When EOS is enabled, the checkpoint will be null.
>  I think we need to add some code to create a Checkpoint. 
>  As follows
> {code:java}
> // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
> // # suggestion start
> if (checkpoint == null) {
> checkpoint = new OffsetCheckpoint(new File(baseDir, 
> CHECKPOINT_FILE_NAME));
> }
> // # suggestion end
> try {
> checkpoint.write(checkpointableOffsets);
> } catch (final IOException fatalException) {
> log.error("Failed to write offset checkpoint file to {} while 
> re-initializing {}: {}", checkpoint, stateStores, fatalException);
>  throw new StreamsException("Failed to reinitialize globa

[jira] [Resolved] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled

2018-06-11 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6946.
-
Resolution: Fixed

> Keep the session id for incremental fetch when fetch responses are throttled 
> -
>
> Key: KAFKA-6946
> URL: https://issues.apache.org/jira/browse/KAFKA-6946
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jon Lee
>Priority: Major
>
> The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with 
> INVALID_SESSION_ID if the response needs to be throttled due to quota 
> violation. If it is for incremental fetch, this will make the client reset 
> its session and send a full fetch request next time. This is not a 
> correctness issue, but it may affect performance when fetches are throttled.
> In case of incremental fetch, a throttled response should use the same 
> session id as before so that the next unthrottled response can be in the same 
> session. 
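
To make the intended behavior concrete, here is a minimal sketch; the helper and constant 
below are illustrative only, not the actual broker code, and 0 is used as the invalid/empty 
session id of the fetch protocol:
{code:java}
// Sketch only: choose the session id for a fetch response that has to be throttled.
// Returning INVALID_SESSION_ID forces the client to drop its incremental-fetch session
// and issue a full fetch next time; keeping the current id preserves the session.
public final class ThrottledFetchSessionSketch {
    static final int INVALID_SESSION_ID = 0; // invalid/empty session id in the fetch protocol

    static int sessionIdForThrottledResponse(boolean isIncrementalFetch, int currentSessionId) {
        // Proposed behavior: a throttled response to an incremental fetch keeps the
        // existing session id, so the next unthrottled fetch can stay incremental.
        return isIncrementalFetch ? currentSessionId : INVALID_SESSION_ID;
    }
}
{code}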



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-11 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-7037:
--

Assignee: Vahid Hashemian

> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Assignee: Vahid Hashemian
>Priority: Major
>
> While executing a delete command, the Kafka CLI tool removes the "+" symbol 
> and deletes the wrong topic. In the case below, deleting  _"*test+topic"*_ 
> makes Kafka delete  _*testtopic*_ instead.
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  delete topic replaces '+' from the topic name  
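
A plausible root cause (an assumption on my part, not confirmed in this ticket) is that the 
tool interprets the --topic argument as a regular expression, where '+' means "one or more of 
the preceding character", so the pattern test+topic matches the literal name testtopic. The 
regex behavior itself is easy to verify:
{code:java}
import java.util.regex.Pattern;

public class PlusInTopicNameDemo {
    public static void main(String[] args) {
        // As a regex, "test+topic" is "tes", one or more 't', then "opic",
        // which matches the literal topic name "testtopic".
        System.out.println(Pattern.matches("test+topic", "testtopic"));    // true
        // The literal name "test+topic" only matches when the '+' is escaped.
        System.out.println(Pattern.matches("test\\+topic", "test+topic")); // true
    }
}
{code}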



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6697) JBOD configured broker should not die if log directory is invalid

2018-06-11 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6697:

Issue Type: Bug  (was: Improvement)

> JBOD configured broker should not die if log directory is invalid
> -
>
> Key: KAFKA-6697
> URL: https://issues.apache.org/jira/browse/KAFKA-6697
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 2.0.0
>
>
> Currently a JBOD-configured broker will still die on startup if 
> dir.getCanonicalPath() throws an IOException. We should mark such a log directory 
> as offline, and the broker should keep running as long as there is a good disk.
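
As a rough sketch of the suggested behavior (hypothetical helper, not the actual LogManager 
code), the startup path could resolve each configured directory and record failures instead 
of dying:
{code:java}
import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: resolve each configured log directory; an IOException from
// getCanonicalPath() marks that directory offline instead of killing the broker.
final class LogDirHealthSketch {
    static Map<String, Boolean> resolve(List<String> logDirs) {
        Map<String, Boolean> online = new LinkedHashMap<>();
        for (String dir : logDirs) {
            try {
                new File(dir).getCanonicalPath(); // may throw for an invalid directory
                online.put(dir, true);
            } catch (IOException e) {
                online.put(dir, false);           // offline; keep running on the remaining good disks
            }
        }
        return online;
    }
}
{code}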



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7029) ReplicaVerificationTool should not use the deprecated SimpleConsumer

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508472#comment-16508472
 ] 

ASF GitHub Bot commented on KAFKA-7029:
---

omkreddy opened a new pull request #5188: KAFKA-7029: Update 
ReplicaVerificationTool to use Java Consumer
URL: https://github.com/apache/kafka/pull/5188
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ReplicaVerificationTool should not use the deprecated SimpleConsumer
> 
>
> Key: KAFKA-7029
> URL: https://issues.apache.org/jira/browse/KAFKA-7029
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Manikumar
>Priority: Major
>
> Unless there's a reason not to, the simplest would be to use KafkaConsumer.
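
For reference, a minimal sketch of how a tool can fetch from a specific partition with the 
Java consumer instead of SimpleConsumer; the broker address, topic, and partition below are 
placeholders:
{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReplicaFetchSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            consumer.assign(Collections.singletonList(tp)); // manual assignment, no group management
            consumer.seek(tp, 0L);                          // start from a chosen offset
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            System.out.println("fetched " + records.count() + " records");
        }
    }
}
{code}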



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6546) Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508476#comment-16508476
 ] 

ASF GitHub Bot commented on KAFKA-6546:
---

rajinisivaram opened a new pull request #5189: KAFKA-6546: Use 
LISTENER_NOT_FOUND_ON_LEADER error for missing listener
URL: https://github.com/apache/kafka/pull/5189
 
 
   For metadata request version 6 and above, use a different error code to 
indicate missing listener on leader broker to enable diagnosis of listener 
configuration issues.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener
> 
>
> Key: KAFKA-6546
> URL: https://issues.apache.org/jira/browse/KAFKA-6546
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.1.0
>
>
> In 1.1, if an endpoint is available on the broker processing a metadata 
> request, but the corresponding listener is not available on the leader of a 
> partition, LEADER_NOT_AVAILABLE is returned (earlier versions returned 
> UNKNOWN_SERVER_ERROR). This could indicate broker misconfiguration where some 
> brokers are not configured with all listeners, or it could indicate a 
> transient error when listeners are dynamically added. We want to treat the 
> error as a transient error to process dynamic updates, but we should notify 
> clients of the actual error. This change should be made when the MetadataRequest 
> version is updated so that LEADER_NOT_AVAILABLE is still returned to older clients.
> See 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration]
>  and  [https://github.com/apache/kafka/pull/4539] for details.
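
To illustrate why a distinct code helps clients, a sketch only; the enum below is local to 
the example, not Kafka's Errors class, and the final constant name followed the pull request 
above rather than the JIRA title:
{code:java}
// Sketch only: with two distinct codes a client can give actionable advice instead of
// blindly retrying. The enum is local to this example, not Kafka's Errors class.
public class ListenerErrorHandlingSketch {
    enum PartitionError { LEADER_NOT_AVAILABLE, LISTENER_NOT_FOUND_ON_LEADER }

    static String advise(PartitionError error) {
        switch (error) {
            case LISTENER_NOT_FOUND_ON_LEADER:
                // Metadata v6+: the leader is alive but exposes no endpoint for the requested
                // listener, which usually means inconsistent listener configuration.
                return "check listener configuration on the leader broker";
            case LEADER_NOT_AVAILABLE:
                // Returned to older clients and for genuinely transient conditions.
                return "refresh metadata and retry";
            default:
                return "unexpected error";
        }
    }
}
{code}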



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned

2018-06-11 Thread Magesh kumar Nandakumar (JIRA)
Magesh kumar Nandakumar created KAFKA-7039:
--

 Summary: DelegatingClassLoader creates plugin instance even if its 
not Versioned
 Key: KAFKA-7039
 URL: https://issues.apache.org/jira/browse/KAFKA-7039
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Magesh kumar Nandakumar
Assignee: Magesh kumar Nandakumar
 Fix For: 2.0.0


The versioned interface was introduced as part of 
[KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin].
 DelegatingClassLoader is now attempting to create an instance of all the 
plugins, even if it's not required.
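
A rough sketch of the intended check follows; the package of Connect's Versioned interface 
and the "undefined" placeholder for non-versioned plugins are assumptions here, and this is 
not the actual DelegatingClassLoader code:
{code:java}
import org.apache.kafka.connect.components.Versioned;

// Sketch only: instantiate the plugin class solely to call version(), and only
// when the class actually implements Versioned; otherwise skip instantiation.
public class PluginVersionSketch {
    static String versionFor(Class<?> pluginKlass) throws ReflectiveOperationException {
        if (Versioned.class.isAssignableFrom(pluginKlass)) {
            Versioned plugin = (Versioned) pluginKlass.getDeclaredConstructor().newInstance();
            return plugin.version();
        }
        return "undefined"; // no instance is created for non-Versioned plugins
    }
}
{code}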



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508556#comment-16508556
 ] 

ASF GitHub Bot commented on KAFKA-7039:
---

mageshn opened a new pull request #5191: KAFKA-7039 : Create an instance of the 
plugin only it's a Versioned Plugin
URL: https://github.com/apache/kafka/pull/5191
 
 
   Create an instance of the plugin only if it's a Versioned Plugin.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behavior change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DelegatingClassLoader creates plugin instance even if its not Versioned
> ---
>
> Key: KAFKA-7039
> URL: https://issues.apache.org/jira/browse/KAFKA-7039
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Critical
> Fix For: 2.0.0
>
>
> The versioned interface was introduced as part of 
> [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin].
>  DelegatingClassLoader is now attempting to create an instance of all the 
> plugins, even if it's not required.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508566#comment-16508566
 ] 

Eugen Feller commented on KAFKA-6990:
-

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
particular job does the following logic:

 
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
 

Can it be that writes to Mongo take too long and that this leads to problems? I was 
under the impression that by setting the heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that sends heartbeats (so the consumer is 
considered alive) and one that actually calls poll(). In that case, blocking too 
long inside foreach should not kick us out of the consumer group, right?

I will try to collect DEBUG level logs today.
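
A quick back-of-the-envelope check, using the settings and snippet above and treating the 
5-second Await as the per-record worst case, shows how the poll loop can exceed 
max.poll.interval.ms even though the background heartbeat thread keeps the session alive; 
since KIP-62 the heartbeat only covers session.timeout.ms, not the time between poll() calls:
{code:java}
public class PollBudgetCheck {
    public static void main(String[] args) {
        long maxPollIntervalMs = 20 * 60 * 1000L; // 20 minutes, as configured above
        long maxPollRecords = 1000L;              // 1000 in the defaults above (50 for this particular job)
        long perRecordMs = 5_000L;                // worst case: each buffered Mongo write awaits 5 seconds

        long worstCaseLoopMs = maxPollRecords * perRecordMs; // ~83 minutes for 1000 records
        // If the work between two poll() calls exceeds max.poll.interval.ms, the consumer
        // proactively leaves the group and later commits fail with CommitFailedException,
        // regardless of the heartbeat thread.
        System.out.println(worstCaseLoopMs > maxPollIntervalMs); // true
    }
}
{code}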

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38,
>  metadata=''}, sightings_sighting_byclientmac_0-1=OffsetAndMetadata{offset=1, 
> metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_2] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-2=OffsetAndMetadata{offset=8,
>  metadata=''}, 
> sightings_sighting_byclientmac_0-2=OffsetAndMetadata{offset=24, metadata=''}} 
> due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_2 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_3] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-3=OffsetAndMetadata{offset=21, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-3=OffsetAndMetadata{offset=102,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.in

[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508566#comment-16508566
 ] 

Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:08 PM:
--

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? For 
this particular job max poll records is set to 50. This job implements the 
following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
 

Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that does heartbeat sending (to 
considered alive) and one that actually calls poll(). In that case blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 


was (Author: efeller):
Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
particular job does the following logic:

 
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
 

Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that does heartbeat sending (to 
considered alive) and one that actually calls poll(). In that case blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=Offset

[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508566#comment-16508566
 ] 

Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:08 PM:
--

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? For 
this particular job max poll records is set to 50. This job implements the 
following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that does heartbeat sending (to 
considered alive) and one that actually calls poll(). In that case blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 


was (Author: efeller):
Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? For 
this particular job max poll records is set to 50. This job implements the 
following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
 

Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that does heartbeat sending (to 
considered alive) and one that actually calls poll(). In that case blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactm

[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508566#comment-16508566
 ] 

Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:09 PM:
--

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 50,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
job implements the following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that does heartbeat sending (to 
considered alive) and one that actually calls poll(). In that case blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 


was (Author: efeller):
Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? For 
this particular job max poll records is set to 50. This job implements the 
following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that does heartbeat sending (to 
considered alive) and one that actually calls poll(). In that case blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offs

[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508566#comment-16508566
 ] 

Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:10 PM:
--

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 50,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
job implements the following logic:
{code:java}
stream().foreach(record => {
// Buffered write this record to MongoDB 
// Await.result(record, 5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that does heartbeat sending (to 
considered alive) and one that actually calls poll(). In that case blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 


was (Author: efeller):
Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 50,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
job implements the following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that does heartbeat sending (to 
considered alive) and one that actually calls poll(). In that case blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38,
>  metadata=''}, sightings_sigh

[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508566#comment-16508566
 ] 

Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:11 PM:
--

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 50,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
job implements the following logic:
{code:java}
stream().foreach(record => {
// Buffered write this record to MongoDB 
// Await.result(record, 5 seconds)
}){code}
Can it be that writes to Mongo take too long and that this leads to problems? I was 
under the impression that by setting the heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that sends heartbeats (in order 
for the consumer to be considered alive) and one that actually calls poll(). In 
that case, blocking too long inside foreach should not kick us out of the 
consumer group, right?

I will try to collect DEBUG level logs today.

 


was (Author: efeller):
Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 50,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
job implements the following logic:
{code:java}
stream().foreach(record => {
// Buffered write this record to MongoDB 
// Await.result(record, 5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that does heartbeat sending (to 
considered alive) and one that actually calls poll(). In that case blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMeta

[jira] [Comment Edited] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508661#comment-16508661
 ] 

Eugen Feller edited comment on KAFKA-6977 at 6/11/18 8:13 PM:
--

Hi [~mjsax]. Interesting. Thanks a lot. I think the topic itself is likely 
consumed by multiple downstream consumers. However, only this job actually 
consumes it for the purposes of writing out to a database. Just tested 
consuming the topic with plain KafkaConsumer and it works. I also went over all 
the stack traces seen in that context and found the following:
{code:java}
KafkaException: Record for partition our_stats_topic_0-17 at offset 217641273 
is invalid, cause: Record is corrupt (stored crc = 3302026163, computed crc = 
3432237873)
1
at maybeEnsureValid 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1002)
2
at nextFetchedRecord 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1059)
3
at fetchRecords 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1090)
4
at access$1200 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
5
at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
6
at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
7
at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
8
at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
9
at pollRequests 
(org.apache.kafka.streams.processor.internals.StreamThread.java:536)
10
at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
11
at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
12
at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
KafkaException: Received exception when fetching the next record from 
our_stats_topic_0-17. If needed, please seek past the record to continue 
consumption.
1
at fetchRecords 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1110)
2
at access$1200 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
3
at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
4
at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
5
at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
6
at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
7
at pollRequests 
(org.apache.kafka.streams.processor.internals.StreamThread.java:536)
8
at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
9
at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
10
at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
{code}
In this particular instance I think the following stack trace was seen at the 
same time with error code 2:
{code:java}
CorruptRecordException: Record size is less than the minimum record overhead 
(14)
KafkaException: Received exception when fetching the next record from 
our_stats_topic_0-16. If needed, please seek past the record to continue 
consumption.
1
at fetchRecords 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1076)
2
at access$1200 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
3
at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
4
at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
5
at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
6
at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
7
at pollRequests 
(org.apache.kafka.streams.processor.internals.StreamThread.java:536)
8
at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
9
at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
10
at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
{code}
I am wondering what could have led to error code 2 and perhaps the above 
CorruptRecordException, and what would be the best way to mitigate them? Do 
the records get corrupted on the wire somehow?

 


was (Author: efeller):
Hi [~mjsax]. Interesting. Thanks a lot. I think the topic itself is likely 
consumed by multiple downstream consumers. However, only this job actually 
consumes it for the purposes of writing out to a database. Just tested 
consuming the topic with plain KafkaConsumer and it works. I also went over all 
the stack traces seen in that context and found the following:
{code:java}
KafkaException: Record for partition our_stats_topic_0-17 at offset 217641273 
is invalid, cause: Record is corrupt (stored crc = 3302026163, computed crc = 
3432237873)
1
at maybeEnsureValid 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1002)
2
at nextFetc

[jira] [Commented] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508661#comment-16508661
 ] 

Eugen Feller commented on KAFKA-6977:
-

Hi [~mjsax]. Interesting. Thanks a lot. I think the topic itself is likely 
consumed by multiple downstream consumers. However, only this job actually 
consumes it for the purposes of writing out to a database. Just tested 
consuming the topic with plain KafkaConsumer and it works. I also went over all 
the stack traces seen in that context and found the following:
{code:java}
KafkaException: Record for partition our_stats_topic_0-17 at offset 217641273 
is invalid, cause: Record is corrupt (stored crc = 3302026163, computed crc = 
3432237873)
1
at maybeEnsureValid 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1002)
2
at nextFetchedRecord 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1059)
3
at fetchRecords 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1090)
4
at access$1200 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
5
at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
6
at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
7
at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
8
at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
9
at pollRequests 
(org.apache.kafka.streams.processor.internals.StreamThread.java:536)
10
at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
11
at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
12
at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
KafkaException: Received exception when fetching the next record from 
our_stats_topic_0-17. If needed, please seek past the record to continue 
consumption.
1
at fetchRecords 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1110)
2
at access$1200 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
3
at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
4
at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
5
at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
6
at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
7
at pollRequests 
(org.apache.kafka.streams.processor.internals.StreamThread.java:536)
8
at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
9
at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
10
at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
{code}
In this particular instance I think the following stack trace was seen at the 
same time with error code 2:

 
{code:java}
CorruptRecordException: Record size is less than the minimum record overhead 
(14)
KafkaException: Received exception when fetching the next record from 
our_stats_topic_0-16. If needed, please seek past the record to continue 
consumption.
1
at fetchRecords 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1076)
2
at access$1200 
(org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
3
at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
4
at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
5
at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
6
at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
7
at pollRequests 
(org.apache.kafka.streams.processor.internals.StreamThread.java:536)
8
at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
9
at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
10
at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
{code}
I am wondering what could have led to error code 2 and perhaps the above 
CorruptRecordException, and what would be the best way to mitigate them? Do 
the records get corrupted on the wire somehow?
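
For completeness, the "seek past the record" workaround mentioned in the exception text looks 
roughly like the sketch below with a plain consumer; the topic, partition, and offset are 
taken from the logs above, the consumer is assumed to already be assigned to the partition, 
and Kafka Streams itself does not expose seek, so this only applies to the plain-KafkaConsumer 
experiment:
{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

// Sketch only: skip a corrupt record with a plain consumer by seeking one offset past it.
public class SkipCorruptRecordSketch {
    static void skip(KafkaConsumer<byte[], byte[]> consumer) {
        // Assumes consumer.assign(...) has already been called for this partition.
        TopicPartition tp = new TopicPartition("our_stats_topic_0", 17);
        try {
            consumer.poll(1000L); // throws when a fetched record fails its CRC check
        } catch (KafkaException e) {
            // e.g. "Record for partition our_stats_topic_0-17 at offset 217641273 is invalid"
            long badOffset = 217641273L;
            consumer.seek(tp, badOffset + 1); // continue consumption after the corrupt record
        }
    }
}
{code}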

 

>  Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 
> while fetching data
> -
>
> Key: KAFKA-6977
> URL: https://issues.apache.org/jira/browse/KAFKA-6977
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>  Labels: streams
>
> We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and 
> constantly run into the following exception: 
> {code:java}
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
>

[jira] [Created] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-7040:
-

 Summary: The replica fetcher thread may truncate accepted messages 
during multiple fast leadership transitions
 Key: KAFKA-7040
 URL: https://issues.apache.org/jira/browse/KAFKA-7040
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang


Problem Statement:
Consider the scenario where there are two brokers, broker0, and broker1, and 
there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
the leader and broker0 as the follower. The following sequence of events 
happened on broker0

1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
broker1 and waits for the response
2. A LeaderAndISR request causes broker0 to become the leader for one partition 
t1p0, which in turn will remove the partition t1p0 from the replica fetcher 
thread
3. Broker0 accepts some messages from a producer
4. A 2nd LeaderAndISR request causes broker1 to become the leader, and broker0 
to become the follower for partition t1p0. This will cause the partition t1p0 
to be added back to the replica fetcher thread on broker0.
5. The replica fetcher thread on broker0 receives a response for the 
LeaderEpoch request issued in step 1, and truncates the messages accepted in 
step 3.

The issue can be reproduced with the test from 
https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea

[1] Initially we set up broker0 to be the follower of two partitions instead of 
just one, to avoid shutting down the replica fetcher thread when it 
becomes idle.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2018-06-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508683#comment-16508683
 ] 

Matthias J. Sax commented on KAFKA-6977:


Hard to tell where the corruption occurs. It could be on the wire... Maybe 
[~hachikuji] can help out here. It does not seem to be a Kafka Streams issue. 
Did you use a 0.11.0 or 0.10.2 consumer? Did the consumer run in the same 
environment (i.e., AWS ECS as mentioned in the description)? Can you consume the 
data with Kafka Streams in a different environment?

>  Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 
> while fetching data
> -
>
> Key: KAFKA-6977
> URL: https://issues.apache.org/jira/browse/KAFKA-6977
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>  Labels: streams
>
> We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and 
> constantly run into the following exception: 
> {code:java}
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> partition assignment took 40 ms.
> current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 
> 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, 
> 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31]
> current standby tasks: []
> previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 
> 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28]
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> State transition from PARTITIONS_ASSIGNED to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State 
> transition from REBALANCING to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> ERROR org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> Encountered the following error during processing:
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Shutting down
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from RUNNING to PENDING_SHUTDOWN.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Stream thread shutdown complete
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from PENDING_SHUTDOWN to DEAD.
> [visitstats-mongowriter-3727734b-3c2f-4775

[jira] [Commented] (KAFKA-7005) Remove duplicate Java Resource class.

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508684#comment-16508684
 ] 

ASF GitHub Bot commented on KAFKA-7005:
---

junrao closed pull request #5184: KAFKA-7005: Remove duplicate resource class.
URL: https://github.com/apache/kafka/pull/5184
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 450de06fcd1..495095a9276 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -112,8 +112,6 @@
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
 import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
-import org.apache.kafka.common.requests.Resource;
-import org.apache.kafka.common.requests.ResourceType;
 import org.apache.kafka.common.security.token.delegation.DelegationToken;
 import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -1683,19 +1681,19 @@ public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configRe
 
         // The BROKER resources which we want to describe.  We must make a separate DescribeConfigs
         // request for every BROKER resource we want to describe.
-        final Collection<Resource> brokerResources = new ArrayList<>();
+        final Collection<ConfigResource> brokerResources = new ArrayList<>();
 
         // The non-BROKER resources which we want to describe.  These resources can be described by a
         // single, unified DescribeConfigs request.
-        final Collection<Resource> unifiedRequestResources = new ArrayList<>(configResources.size());
+        final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(configResources.size());
 
         for (ConfigResource resource : configResources) {
             if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
-                brokerFutures.put(resource, new KafkaFutureImpl<Config>());
-                brokerResources.add(configResourceToResource(resource));
+                brokerFutures.put(resource, new KafkaFutureImpl<>());
+                brokerResources.add(resource);
             } else {
-                unifiedRequestFutures.put(resource, new KafkaFutureImpl<Config>());
-                unifiedRequestResources.add(configResourceToResource(resource));
+                unifiedRequestFutures.put(resource, new KafkaFutureImpl<>());
+                unifiedRequestResources.add(resource);
             }
         }
 
@@ -1716,7 +1714,7 @@ void handleResponse(AbstractResponse abstractResponse) {
                     for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : unifiedRequestFutures.entrySet()) {
                         ConfigResource configResource = entry.getKey();
                         KafkaFutureImpl<Config> future = entry.getValue();
-                        DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
+                        DescribeConfigsResponse.Config config = response.config(configResource);
                         if (config == null) {
                             future.completeExceptionally(new UnknownServerException(
                                 "Malformed broker response: missing config for " + configResource));
@@ -1746,7 +1744,7 @@ void handleFailure(Throwable throwable) {
 
         for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : brokerFutures.entrySet()) {
             final KafkaFutureImpl<Config> brokerFuture = entry.getValue();
-            final Resource resource = configResourceToResource(entry.getKey());
+            final ConfigResource resource = entry.getKey();
             final int nodeId = Integer.parseInt(resource.name());
             runnable.call(new Call("describeBrokerConfigs", calcDeadlineMs(now, options.timeoutMs()),
                 new ConstantNodeIdProvider(nodeId)) {
@@ -1792,21 +1790,6 @@ void handleFailure(Throwable throwable) {
         return new DescribeConfigsResult(allFutures);
     }
 
-    private Resource configResourceToResource(ConfigResource configResource) {
-        ResourceType resourceType;
-        switch (configResource.type()) {
-            case TOPIC:
-                resourceType = ResourceType.TOPIC;
-                break;
-            case BROKER:
-                resourceType = ResourceType.BROKER;
-                break;
-            default:
-                throw new IllegalArgumentException("Unexpected resource type " + configResource.ty

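For context on the API the patch touches: callers already pass the public ConfigResource type to AdminClient#describeConfigs, so removing the internal requests.Resource translation does not change user code. Below is a minimal, hypothetical usage sketch; the broker address and topic name are placeholders, not taken from the PR.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeTopicConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (AdminClient admin = AdminClient.create(props)) {
            // The public API works in terms of ConfigResource; the removed
            // requests.Resource class was only an internal wire-level translation.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder topic
            Map<ConfigResource, Config> configs =
                admin.describeConfigs(Collections.singleton(topic)).all().get();
            configs.get(topic).entries().forEach(entry ->
                System.out.println(entry.name() + " = " + entry.value()));
        }
    }
}

The result is keyed by the same ConfigResource instances that were passed in, which is the mapping the removed configResourceToResource() helper previously had to translate back and forth.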