[jira] [Commented] (KAFKA-5600) Group loading regression causing stale metadata/offsets cache

2018-09-17 Thread Girish Aher (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618569#comment-16618569
 ] 

Girish Aher commented on KAFKA-5600:


[~bjrke] I see that you included 0.10.2.1 as an affected version for this bug; 
however, the bug that this fix addresses (repositioning the closing brace of 
the while loop) was only introduced in 0.11. So how could this bug be present 
in 0.10.2.1?

I ask because I am facing this issue of offsets being reset with 0.10.1.1, and 
I stumbled upon this bug. I can't quite find the root cause yet.

PS: I understand it has been about a year since you fixed this, but it would be 
really helpful to know whether this bug existed (perhaps in a different form) 
in 0.10.2.1, or maybe in 0.10.1.1 too. I appreciate your response.

> Group loading regression causing stale metadata/offsets cache
> -
>
> Key: KAFKA-5600
> URL: https://issues.apache.org/jira/browse/KAFKA-5600
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1, 0.11.0.0
> Environment: any
>Reporter: Jan Burkhardt
>Assignee: Jan Burkhardt
>Priority: Critical
>  Labels: regression, reliability
> Fix For: 0.11.0.1, 1.0.0
>
> Attachments: KafkaErrorConsumer.java, KafkaErrorProducer.java
>
>
> After a long investigation we found a problem in Kafka.
> When a __consumer_offsets partition gets segmented and Kafka is restarted and 
> needs to reload offsets, consumers will start at a wrong position when 
> metadata and offset events are present in both segments.
> Reproduction:
> 1.) Start ZooKeeper and Kafka as-is from the archive
> {code}
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/zookeeper-server-start.sh config/zookeeper.properties
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/kafka-server-start.sh config/server.properties
> {code}
> 2.) Start [^KafkaErrorProducer.java], which adds 1M log entries to the topic test
> 3.) Start [^KafkaErrorConsumer.java], which starts a consumer, reads 100 
> entries one by one, and then closes the consumer. This leads to a 2nd segment 
> in /tmp/kafka-logs/__consumer_offsets-27. This step takes some time (around 
> 5 mins). Closing the consumer is needed so that metadata events end up in the 
> segments too.
> 4.) Stop and restart the Kafka broker
> 5.) Start any consumer on topic test and group testgroup
> {code}
> bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic test --consumer-property group.id=testgroup
> {code}
> Actual:
> the consumer starts at the segment boundary
> Expected:
> the consumer starts at the end
> The reason for this behavior is that the closing brace of the while loop in 
> GroupMetadataManager#loadGroupsAndOffsets is at the wrong position, introduced 
> with commit 
> https://github.com/apache/kafka/commit/5bd06f1d542e6b588a1d402d059bc24690017d32
> I will prepare a pull request.
> *Edit*: The issue can happen if there are multiple reads from the same 
> segment, see https://github.com/apache/kafka/pull/3538#discussion_r127759694
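> For illustration, a minimal sketch of the bug pattern (hypothetical helper 
> names, not the actual loadGroupsAndOffsets code): everything that applies a 
> batch of records to the cache must stay inside the read loop.
> {code}
> var currOffset = log.logStartOffset
> while (currOffset < highWatermark) {
>   val batch = readBatch(currOffset)       // a segment may require several reads
>   batch.foreach(applyOffsetOrGroupEvent)  // must run for every read; with the
>   currOffset = batch.nextOffset           // misplaced closing brace this step
> }                                         // covered only part of the reads,
>                                           // leaving stale entries in the cache
> {code}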



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7322) Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618552#comment-16618552
 ] 

ASF GitHub Bot commented on KAFKA-7322:
---

lindong28 closed pull request #5591: KAFKA-7322: Fix race condition between log 
cleaner thread and log retention thread when topic cleanup policy is updated
URL: https://github.com/apache/kafka/pull/5591
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 91ddbf09305..0b4abe80ef1 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
-import scala.collection.{Set, mutable}
+import scala.collection.{Iterable, Set, mutable}
 
 /**
  * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy.
@@ -219,10 +219,10 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   *  Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
-   */
-  def resumeCleaning(topicPartition: TopicPartition) {
-    cleanerManager.resumeCleaning(topicPartition)
+   *  Resume the cleaning of paused partitions.
+   */
+  def resumeCleaning(topicPartitions: Iterable[TopicPartition]) {
+    cleanerManager.resumeCleaning(topicPartitions)
   }
 
   /**
@@ -246,6 +246,15 @@ class LogCleaner(initialConfig: CleanerConfig,
     isCleaned
   }
 
+  /**
+   * To prevent race between retention and compaction,
+   * retention threads need to make this call to obtain:
+   * @return A list of log partitions that retention threads can safely work on
+   */
+  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
+    cleanerManager.pauseCleaningForNonCompactedPartitions()
+  }
+
   // Only for testing
   private[kafka] def currentConfig: CleanerConfig = config
 
@@ -315,14 +324,16 @@ class LogCleaner(initialConfig: CleanerConfig,
         true
     }
     val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
-    deletable.foreach {
-      case (topicPartition, log) =>
-        try {
+
+    try {
+      deletable.foreach {
+        case (_, log) =>
           log.deleteOldSegments()
-        } finally {
-          cleanerManager.doneDeleting(topicPartition)
-        }
+      }
+    } finally {
+      cleanerManager.doneDeleting(deletable.map(_._1))
     }
+
     if (!cleaned)
       pause(config.backOffMs, TimeUnit.MILLISECONDS)
   }
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index ba8d7c7e9c0..83d902f952a 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.errors.KafkaStorageException
 
-import scala.collection.{immutable, mutable}
+import scala.collection.{Iterable, immutable, mutable}
 
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
@@ -148,6 +148,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
+  /**
+   * Pause logs cleaning for logs that do not have compaction enabled
+   * and do not have other deletion or compaction in progress.
+   * This is to handle potential race between retention and cleaner threads when users
+   * switch topic configuration between compacted and non-compacted topic.
+   * @return retention logs that have log cleaning successfully paused
+   */
+  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
+    inLock(lock) {
+      val deletableLogs = logs.filter {
+        case (_, log) => !log.config.compact // pick non-compacted logs
+      }.filterNot {
+        case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
+      }
+
+      deletableLogs.foreach {
+        case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused)
+      }
+      deletableLogs
+    }
+  }
+
   /**
    * Find any logs that have compact and delete enabled
    */
@@ -170,7 +192,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   def abortCleaning(topicPartition: TopicPartition) {
     inLock(lock) {
       abortAndPauseCleaning(topicPartition)

[jira] [Updated] (KAFKA-7322) Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated

2018-09-17 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7322:

Fix Version/s: 2.1.0

> Fix race condition between log cleaner thread and log retention thread when 
> topic cleanup policy is updated
> ---
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
> Fix For: 2.1.0
>
>
> The deletion thread grabs log.lock when it tries to rename a log segment and 
> schedule it for actual deletion.
> The compaction thread only grabs log.lock when it tries to replace the 
> original segments with the cleaned segment. It does not grab the lock when it 
> reads records from the original segments to build the offset map and the new 
> segments. As a result, if the deletion and compaction threads work on the 
> same log partition, we have a race condition.
> This race happens when the topic cleanup policy is updated on the fly.
> One case that hits this race condition:
> 1: the topic cleanup policy is "compact" initially
> 2: the log cleaner (compaction) thread picks up the partition for compaction 
> and is still in progress
> 3: the topic cleanup policy is updated to "delete"
> 4: the retention thread picks up the topic partition and deletes some old 
> segments
> 5: the log cleaner thread reads from the deleted log and raises an IO exception
>  
> The proposed solution is to use the "in progress" map that the cleaner 
> manager has, to protect against such a race.
>  
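> For illustration, a minimal sketch (using the method names added in the PR 
> above; simplified, not the actual broker code) of how the cleanup path uses 
> the pause/delete/done protocol:
> {code:java}
> // Pause cleaning for all non-compacted partitions up front, so the cleaner
> // cannot start compacting them while old segments are being deleted.
> val deletable: Iterable[(TopicPartition, Log)] =
>   cleanerManager.pauseCleaningForNonCompactedPartitions()
> try {
>   deletable.foreach { case (_, log) => log.deleteOldSegments() }
> } finally {
>   // Mark deletion done for every paused partition, even if deletion threw,
>   // so cleaning can resume and no partition is left paused forever.
>   cleanerManager.doneDeleting(deletable.map(_._1))
> }
> {code}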



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7322) Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated

2018-09-17 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7322.
-
Resolution: Fixed

> Fix race condition between log cleaner thread and log retention thread when 
> topic cleanup policy is updated
> ---
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
> Fix For: 2.1.0
>
>
> The deletion thread grabs log.lock when it tries to rename a log segment and 
> schedule it for actual deletion.
> The compaction thread only grabs log.lock when it tries to replace the 
> original segments with the cleaned segment. It does not grab the lock when it 
> reads records from the original segments to build the offset map and the new 
> segments. As a result, if the deletion and compaction threads work on the 
> same log partition, we have a race condition.
> This race happens when the topic cleanup policy is updated on the fly.
> One case that hits this race condition:
> 1: the topic cleanup policy is "compact" initially
> 2: the log cleaner (compaction) thread picks up the partition for compaction 
> and is still in progress
> 3: the topic cleanup policy is updated to "delete"
> 4: the retention thread picks up the topic partition and deletes some old 
> segments
> 5: the log cleaner thread reads from the deleted log and raises an IO exception
>  
> The proposed solution is to use the "in progress" map that the cleaner 
> manager has, to protect against such a race.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-17 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618550#comment-16618550
 ] 

Jon Lee commented on KAFKA-7403:


[~vahid] The code below is from GroupMetadataManager.offsetCommitValue(), and 
it looks to me like the condition "apiVersion < KAFKA_2_1_IV0" is problematic. 
When inter.broker.protocol.version is set to a version below 2.1, it will 
always pick V1 regardless of the existence of expireTimestamp, which may cause 
the error mentioned above.

I think we should get rid of this condition and determine the version solely 
based on "offsetAndMetadata.expireTimestamp.nonEmpty". What do you think? 

 
{code:java}
val (version, value) = {
  if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty)
    // if an older version of the API is used, or if an explicit expiration is provided, use the older schema
    (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
  else
    (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
}
{code}
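In other words, the version selection would become something like this 
(illustrative sketch of the proposal, not the committed fix):
{code:java}
val (version, value) = {
  if (offsetAndMetadata.expireTimestamp.nonEmpty)
    // an explicit expiration was provided by the client, so use the older
    // schema that can carry it
    (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
  else
    // no explicit expiration: broker-side retention applies, use V2
    (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
}
{code}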
 

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Jon Lee
>Priority: Major
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get){code}
>  
> Here is the stack trace for the broker side error
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
>  ~[ka

[jira] [Updated] (KAFKA-7322) Fix race condition between compaction thread and retention thread when topic cleanup policy is updated

2018-09-17 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7322:

Summary: Fix race condition between compaction thread and retention thread 
when topic cleanup policy is updated  (was: race between compaction thread and 
retention thread when changing topic cleanup policy)

> Fix race condition between compaction thread and retention thread when topic 
> cleanup policy is updated
> --
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> The deletion thread grabs log.lock when it tries to rename a log segment and 
> schedule it for actual deletion.
> The compaction thread only grabs log.lock when it tries to replace the 
> original segments with the cleaned segment. It does not grab the lock when it 
> reads records from the original segments to build the offset map and the new 
> segments. As a result, if the deletion and compaction threads work on the 
> same log partition, we have a race condition.
> This race happens when the topic cleanup policy is updated on the fly.
> One case that hits this race condition:
> 1: the topic cleanup policy is "compact" initially
> 2: the log cleaner (compaction) thread picks up the partition for compaction 
> and is still in progress
> 3: the topic cleanup policy is updated to "delete"
> 4: the retention thread picks up the topic partition and deletes some old 
> segments
> 5: the log cleaner thread reads from the deleted log and raises an IO exception
>  
> The proposed solution is to use the "in progress" map that the cleaner 
> manager has, to protect against such a race.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7322) Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated

2018-09-17 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7322:

Summary: Fix race condition between log cleaner thread and log retention 
thread when topic cleanup policy is updated  (was: Fix race condition between 
compaction thread and retention thread when topic cleanup policy is updated)

> Fix race condition between log cleaner thread and log retention thread when 
> topic cleanup policy is updated
> ---
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> The deletion thread grabs log.lock when it tries to rename a log segment and 
> schedule it for actual deletion.
> The compaction thread only grabs log.lock when it tries to replace the 
> original segments with the cleaned segment. It does not grab the lock when it 
> reads records from the original segments to build the offset map and the new 
> segments. As a result, if the deletion and compaction threads work on the 
> same log partition, we have a race condition.
> This race happens when the topic cleanup policy is updated on the fly.
> One case that hits this race condition:
> 1: the topic cleanup policy is "compact" initially
> 2: the log cleaner (compaction) thread picks up the partition for compaction 
> and is still in progress
> 3: the topic cleanup policy is updated to "delete"
> 4: the retention thread picks up the topic partition and deletes some old 
> segments
> 5: the log cleaner thread reads from the deleted log and raises an IO exception
>  
> The proposed solution is to use the "in progress" map that the cleaner 
> manager has, to protect against such a race.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup

2018-09-17 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-3359:
---
Description: 
On startup, the log segments within a logDir are currently loaded sequentially 
when there is an unclean shutdown. Loading takes a long time because 
logSegment.recover(..) is called for every segment, and for brokers with many 
partitions the total time can be very high (we have observed ~40 mins for 2k 
partitions).

[https://github.com/apache/kafka/pull/1035]

This pull request makes the log-segment loading parallel, with two configurable 
properties: "log.recovery.threads" and "log.recovery.max.interval.ms".

Logic:
1. Define a thread pool of fixed size (log.recovery.threads)
2. Submit each logSegment recovery as a job to the thread pool and add the 
returned future to a job list
3. Wait until all the jobs are done within the required time 
(log.recovery.max.interval.ms, default Long.MaxValue)
4. If they are done and the futures are all null (meaning the jobs completed 
successfully), recovery is considered done
5. If any recovery job failed, it is logged and a LogRecoveryFailedException is 
thrown
6. If the timeout is reached, a LogRecoveryFailedException is thrown

The logic is backward compatible with the current sequential implementation, as 
the default thread count is set to 1.

PS: I am new to Scala and the code might look Java-ish, but I will be happy to 
modify the code per review comments.

  was:
On startup, the log segments within a logDir are currently loaded sequentially 
when there is an unclean shutdown. Loading takes a long time because 
logSegment.recover(..) is called for every segment, and for brokers with many 
partitions the total time can be very high (we have observed ~40 mins for 2k 
partitions).

https://github.com/apache/kafka/pull/1035

This pull request makes the log-segment loading parallel, with two configurable 
properties: "log.recovery.threads" and "log.recovery.max.interval.ms".

Logic:
1. Define a thread pool of fixed size (log.recovery.threads)
2. Submit each logSegment recovery as a job to the thread pool and add the 
returned future to a job list
3. Wait until all the jobs are done within the required time 
(log.recovery.max.interval.ms, default Long.MaxValue)
4. If they are done and the futures are all null (meaning the jobs completed 
successfully), recovery is considered done
5. If any recovery job failed, it is logged and a LogRecoveryFailedException is 
thrown
6. If the timeout is reached, a LogRecoveryFailedException is thrown

The logic is backward compatible with the current sequential implementation, as 
the default thread count is set to 1.

PS: I am new to Scala and the code might look Java-ish, but I will be happy to 
modify the code per review comments.


> Parallel log-recovery of un-flushed segments on startup
> ---
>
> Key: KAFKA-3359
> URL: https://issues.apache.org/jira/browse/KAFKA-3359
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Vamsi Subhash Achanta
>Assignee: Jay Kreps
>Priority: Major
>
> On startup, the log segments within a logDir are currently loaded 
> sequentially when there is an unclean shutdown. Loading takes a long time 
> because logSegment.recover(..) is called for every segment, and for brokers 
> with many partitions the total time can be very high (we have observed ~40 
> mins for 2k partitions).
> [https://github.com/apache/kafka/pull/1035]
> This pull request makes the log-segment loading parallel, with two 
> configurable properties: "log.recovery.threads" and 
> "log.recovery.max.interval.ms".
> Logic (see the sketch after this description):
> 1. Define a thread pool of fixed size (log.recovery.threads)
> 2. Submit each logSegment recovery as a job to the thread pool and add the 
> returned future to a job list
> 3. Wait until all the jobs are done within the required time 
> (log.recovery.max.interval.ms, default Long.MaxValue)
> 4. If they are done and the futures are all null (meaning the jobs completed 
> successfully), recovery is considered done
> 5. If any recovery job failed, it is logged and a LogRecoveryFailedException 
> is thrown
> 6. If the timeout is reached, a LogRecoveryFailedException is thrown
> The logic is backward compatible with the current sequential implementation, 
> as the default thread count is set to 1.
> PS: I am new to Scala and the code might look Java-ish, but I will be happy 
> to modify the code per review comments.
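> For illustration, a minimal sketch of the thread-pool logic described above 
> (hypothetical shape; names such as log.recovery.threads come from this 
> proposal, not from current Kafka code):
> {code:java}
> import java.util.concurrent.{Executors, TimeUnit}
>
> def runRecoveryJobs(jobs: Seq[Runnable], numThreads: Int, maxIntervalMs: Long): Unit = {
>   val pool = Executors.newFixedThreadPool(numThreads)  // log.recovery.threads
>   try {
>     val futures = jobs.map(job => pool.submit(job))  // one job per logSegment.recover(..)
>     pool.shutdown()
>     // log.recovery.max.interval.ms, default Long.MaxValue
>     if (!pool.awaitTermination(maxIntervalMs, TimeUnit.MILLISECONDS))
>       throw new RuntimeException("log recovery timed out")  // LogRecoveryFailedException in the proposal
>     // get() returns null for a Runnable that completed successfully and
>     // rethrows the cause if the job failed.
>     futures.foreach(_.get())
>   } finally {
>     pool.shutdownNow()
>   }
> }
> {code}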



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-17 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618458#comment-16618458
 ] 

Jon Lee edited comment on KAFKA-7403 at 9/18/18 4:41 AM:
-

[~vahid] Did you set inter.broker.protocol.version to something lower than 2.1 
for your brokers and commit an offset from a consumer? 

BTW, it looks to me that the problem is not specific to the 0.10.2 consumer. 
Any consumer that sends an OffsetCommitRequest with the retentionTime field 
set to DEFAULT_RETENTION_TIME will experience this error.

I was able to reproduce the issue with the following setup:
 * 2.0 broker with KAFKA-4682 patch
 ** inter.broker.protocol.version = 1.0
 * 2.0 client
 ** enable.auto.commit = true
 ** auto.commit.interval.ms = 1000

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 
--consumer.config config/consumer.properties --topic t --from-beginning
:

:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, 
groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: 
The server experienced an unexpected error when processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, 
groupId=test-consumer-group] Asynchronous auto-commit of offsets 
{t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in 
commit: The server experienced an unexpected error when processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code}
 

As mentioned above, to reproduce this in unit tests, pass KAFKA_0_11_0_IV2 
instead of ApiVersion.latestVersion as the second parameter of the 
GroupMetadataManager constructor in GroupMetadataManagerTest.scala. Then, 
multiple tests including testCommitOffset fail.
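For example, the construction in the test setup would change roughly as 
follows (the remaining constructor arguments are assumed from context, not 
copied from the test source):
{code:java}
groupMetadataManager = new GroupMetadataManager(
  0,                 // brokerId
  KAFKA_0_11_0_IV2,  // instead of ApiVersion.latestVersion
  offsetConfig, replicaManager, zkClient, time)
{code}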


was (Author: jonlee2):
[~vahid] Did you set inter.broker.protocol.version to something lower than 2.1 
for your brokers and commit an offset from a consumer? 

BTW, it looks to me that the problem is not specific to the 0.10.2 consumer. 
Any consumer that sends an OffsetCommitRequest with the retentionTime field 
set to DEFAULT_RETENTION_TIME will experience this error.

I was able to reproduce the issue with the following setup:
 * 2.0 broker with KAFKA-4682 patch
 ** inter.broker.protocol.version = 1.0
 * 2.0 client
 ** enable.auto.commit = true
 ** auto.commit.interval.ms = 1000

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 
--consumer.config config/consumer.properties --topic t --from-beginning
:

:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, 
groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: 
The server experienced an unexpected error when processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, 
groupId=test-consumer-group] Asynchronous auto-commit of offsets 
{t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in 
commit: The server experienced an unexpected error when processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code}
 

As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as 
the ApiVersion (the second parameter) to the constructor of 
GroupMetadataManager in GroupMetadataManagerTest.scala.

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Jon Lee
>Priority: Major
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.8

[jira] [Comment Edited] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-17 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618458#comment-16618458
 ] 

Jon Lee edited comment on KAFKA-7403 at 9/18/18 4:32 AM:
-

[~vahid] Did you set inter.broker.protocol.version to something lower than 2.1 
for your brokers and commit an offset from a consumer? 

BTW, it looks to me that the problem is not specific to the 0.10.2 consumer. 
Any consumer that sends an OffsetCommitRequest with the retentionTime field 
set to DEFAULT_RETENTION_TIME will experience this error.

I was able to reproduce the issue with the following setup:
 * 2.0 broker with KAFKA-4682 patch
 ** inter.broker.protocol.version = 1.0
 * 2.0 client
 ** enable.auto.commit = true
 ** auto.commit.interval.ms = 1000

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 
--consumer.config config/consumer.properties --topic t --from-beginning
:

:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, 
groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: 
The server experienced an unexpected error when processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, 
groupId=test-consumer-group] Asynchronous auto-commit of offsets 
{t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in 
commit: The server experienced an unexpected error when processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code}
 

As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as 
the ApiVersion (the second parameter) to the constructor of 
GroupMetadataManager in GroupMetadataManagerTest.scala.


was (Author: jonlee2):
[~vahid] It looks to me that the problem is not specific to the 0.10.2 
consumer. Any consumer that sends an OffsetCommitRequest with the 
retentionTime field set to DEFAULT_RETENTION_TIME will experience this error.

I was able to reproduce the issue with the following setup:
 * 2.0 broker with KAFKA-4682 patch
 ** inter.broker.protocol.version = 1.0
 * 2.0 client
 ** enable.auto.commit = true
 ** auto.commit.interval.ms = 1000

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 
--consumer.config config/consumer.properties --topic t --from-beginning
:

:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, 
groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: 
The server experienced an unexpected error when processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, 
groupId=test-consumer-group] Asynchronous auto-commit of offsets 
{t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in 
commit: The server experienced an unexpected error when processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code}
 

As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as 
the ApiVersion (the second parameter) to the constructor of 
GroupMetadataManager in GroupMetadataManagerTest.scala.

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Jon Lee
>Priority: Major
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafk

[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-17 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618458#comment-16618458
 ] 

Jon Lee commented on KAFKA-7403:


[~vahid] It looks to me that the problem is not specific to the 0.10.2 
consumer. Any consumer that sends an OffsetCommitRequest with the 
retentionTime field set to DEFAULT_RETENTION_TIME will experience this error.

I was able to reproduce the issue with the following setup:
 * 2.0 broker with KAFKA-4682 patch
 ** inter.broker.protocol.version = 1.0
 * 2.0 client
 ** enable.auto.commit = true
 ** auto.commit.interval.ms = 1000

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 
--consumer.config config/consumer.properties --topic t --from-beginning
:

:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, 
groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: 
The server experienced an unexpected error when processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, 
groupId=test-consumer-group] Asynchronous auto-commit of offsets 
{t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in 
commit: The server experienced an unexpected error when processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code}
 

As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as 
the ApiVersion (the second parameter) to the constructor of 
GroupMetadataManager in GroupMetadataManagerTest.scala.

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Jon Lee
>Priority: Major
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error b

[jira] [Commented] (KAFKA-7235) Use brokerZkNodeVersion to prevent broker from processing outdated controller request

2018-09-17 Thread Zhanxiang (Patrick) Huang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618368#comment-16618368
 ] 

Zhanxiang (Patrick) Huang commented on KAFKA-7235:
--

We cannot simply use the broker znode zkVersion because the broker znode is an 
ephemeral znode and its zkVersion is always 0. We can use czxid instead, which 
is the monotonically increasing ZooKeeper transaction id of the znode's create 
operation. I will open a KIP for this.
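For illustration, a sketch using the plain ZooKeeper client API (assumed here; 
Kafka's own zkClient wrapper differs): an ephemeral znode recreated after a 
broker restart has zkVersion 0 again, but its czxid is strictly larger than 
before.
{code:java}
import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.data.Stat

// Read the broker's ephemeral registration znode and return its creation
// transaction id (czxid), which grows monotonically across re-creations.
def brokerCreationTxnId(zk: ZooKeeper, brokerId: Int): Long = {
  val stat = new Stat()
  zk.getData(s"/brokers/ids/$brokerId", false, stat)
  stat.getCzxid
}
{code}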

> Use brokerZkNodeVersion to prevent broker from processing outdated controller 
> request
> -
>
> Key: KAFKA-7235
> URL: https://issues.apache.org/jira/browse/KAFKA-7235
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>
> Currently a broker can process controller requests that were sent before the 
> broker was restarted. This can cause a few problems. Here is one example:
> Let's assume partitions p1 and p2 exist on broker1.
> 1) The controller generates a LeaderAndIsrRequest with p1 to be sent to broker1.
> 2) Before the controller sends the request, broker1 is quickly restarted.
> 3) The LeaderAndIsrRequest with p1 is delivered to broker1.
> 4) After processing the first LeaderAndIsrRequest, broker1 starts to 
> checkpoint the high watermark for all partitions that it owns. Thus it may 
> overwrite the high watermark checkpoint file with only the hw for partition 
> p1. The hw for partition p2 is now lost, which could be a problem.
> In general, the correctness of the broker logic currently relies on a few 
> assumptions, e.g. that the first LeaderAndIsrRequest received by a broker 
> contains all partitions hosted by that broker, which could break if the 
> broker can receive controller requests that were generated before it 
> restarted.
> One reasonable solution to the problem is to include the 
> expectedBrokerNodeZkVersion in the controller requests. The broker should 
> remember its broker znode zkVersion after it registers itself in ZooKeeper, 
> and can then reject controller requests whose expectedBrokerNodeZkVersion 
> differs from its own broker znode zkVersion.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7370) Enhance FileConfigProvider to read a directory

2018-09-17 Thread Robert Yokota (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Yokota resolved KAFKA-7370.
--
Resolution: Won't Do

> Enhance FileConfigProvider to read a directory
> --
>
> Key: KAFKA-7370
> URL: https://issues.apache.org/jira/browse/KAFKA-7370
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 2.0.0
>Reporter: Robert Yokota
>Assignee: Robert Yokota
>Priority: Minor
>
> Currently FileConfigProvider can read a Properties file as a set of key-value 
> pairs.  This enhancement is to augment FileConfigProvider so that it can also 
> read a directory, where the file names are the keys and the corresponding 
> file contents are the values.
> This will allow for easier integration with secret management systems where 
> each secret is often an individual file, such as in Docker and Kubernetes.
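> For illustration, a usage sketch of the existing Properties-file behavior 
> (the path and key here are made up; the directory variant described above 
> was resolved Won't Do):
> {code:java}
> import org.apache.kafka.common.config.provider.FileConfigProvider
> import scala.collection.JavaConverters._
>
> val provider = new FileConfigProvider()
> provider.configure(Map.empty[String, AnyRef].asJava)
> // Keys are property names inside the file; with the proposed directory
> // support they would instead have been file names inside the directory.
> val data = provider.get("/etc/kafka/secrets.properties", Set("db.password").asJava)
> println(data.data().get("db.password"))
> provider.close()
> {code}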



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7370) Enhance FileConfigProvider to read a directory

2018-09-17 Thread Robert Yokota (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618240#comment-16618240
 ] 

Robert Yokota commented on KAFKA-7370:
--

The recommendation was to add a DirectoryConfigProvider instead. That would 
likely require a KIP if we decide to add it.

> Enhance FileConfigProvider to read a directory
> --
>
> Key: KAFKA-7370
> URL: https://issues.apache.org/jira/browse/KAFKA-7370
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 2.0.0
>Reporter: Robert Yokota
>Assignee: Robert Yokota
>Priority: Minor
>
> Currently FileConfigProvider can read a Properties file as a set of key-value 
> pairs.  This enhancement is to augment FileConfigProvider so that it can also 
> read a directory, where the file names are the keys and the corresponding 
> file contents are the values.
> This will allow for easier integration with secret management systems where 
> each secret is often an individual file, such as in Docker and Kubernetes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7370) Enhance FileConfigProvider to read a directory

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618239#comment-16618239
 ] 

ASF GitHub Bot commented on KAFKA-7370:
---

rayokota closed pull request #5596: KAFKA-7370: Enhance FileConfigProvider to 
read a dir
URL: https://github.com/apache/kafka/pull/5596
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java
index 4e376ecdeed..7b059af9faf 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java
@@ -22,15 +22,22 @@
 import java.io.IOException;
 import java.io.Reader;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 /**
- * An implementation of {@link ConfigProvider} that represents a Properties file.
+ * An implementation of {@link ConfigProvider} that can read from either a file or a directory.
+ * If the given path is a file, it is interpreted as a Properties file containing key-value pairs.
+ * If the given path is a directory, the keys are the file names contained in the directory and the values are
+ * the corresponding contents of the files.
  * All property keys and values are stored as cleartext.
  */
 public class FileConfigProvider implements ConfigProvider {
@@ -39,16 +46,55 @@ public void configure(Map<String, ?> configs) {
     }
 
     /**
-     * Retrieves the data at the given Properties file.
+     * Retrieves the data at the given path.
      *
-     * @param path the file where the data resides
+     * @param path the path corresponding to either a directory or a Properties file
      * @return the configuration data
      */
     public ConfigData get(String path) {
-        Map<String, String> data = new HashMap<>();
         if (path == null || path.isEmpty()) {
+            return new ConfigData(new HashMap<>());
+        }
+        Path p = Paths.get(path);
+        return Files.isDirectory(p) ? getFromDirectory(p) : getFromPropertiesFile(p);
+    }
+
+    /**
+     * Retrieves the data with the given keys at the given path.
+     *
+     * @param path the path corresponding to either a directory or a Properties file
+     * @param keys the keys whose values will be retrieved.  In the case of a directory, these are the file names.
+     * @return the configuration data
+     */
+    public ConfigData get(String path, Set<String> keys) {
+        if (path == null || path.isEmpty()) {
+            return new ConfigData(new HashMap<>());
+        }
+        Path p = Paths.get(path);
+        return Files.isDirectory(p) ? getFromDirectory(p, keys) : getFromPropertiesFile(p, keys);
+    }
+
+    private ConfigData getFromDirectory(Path path) {
+        try {
+            Map<String, String> data = Files.list(path)
+                    .filter(Files::isRegularFile)
+                    .collect(Collectors.toMap(file -> file.getFileName().toString(), this::readAll));
             return new ConfigData(data);
+        } catch (IOException e) {
+            throw new ConfigException("Could not read from directory " + path);
         }
+    }
+
+    private ConfigData getFromDirectory(Path path, Set<String> keys) {
+        Map<String, String> data = keys.stream()
+                .map(path::resolve)
+                .filter(Files::isRegularFile)
+                .collect(Collectors.toMap(file -> file.getFileName().toString(), this::readAll));
+        return new ConfigData(data);
+    }
+
+    private ConfigData getFromPropertiesFile(Path path) {
+        Map<String, String> data = new HashMap<>();
         try (Reader reader = reader(path)) {
             Properties properties = new Properties();
             properties.load(reader);
@@ -66,18 +112,8 @@ public ConfigData get(String path) {
         }
     }
 
-    /**
-     * Retrieves the data with the given keys at the given Properties file.
-     *
-     * @param path the file where the data resides
-     * @param keys the keys whose values will be retrieved
-     * @return the configuration data
-     */
-    public ConfigData get(String path, Set<String> keys) {
+    private ConfigData getFromPropertiesFile(Path path, Set<String> keys) {
         Map<String, String> data = new HashMap<>();
-        if (path == null || path.isEmpty()) {
-            return new ConfigData(data);
-        }
        try (Reader 

[jira] [Resolved] (KAFKA-7150) Error in processing fetched data for one partition may stop follower fetching other partitions

2018-09-17 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7150.

Resolution: Not A Problem

This is no longer a problem, since we have removed the logic that raised a 
fatal error from the out-of-range error handling.

> Error in processing fetched data for one partition may stop follower fetching 
> other partitions
> --
>
> Key: KAFKA-7150
> URL: https://issues.apache.org/jira/browse/KAFKA-7150
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.0
>Reporter: Anna Povzner
>Priority: Major
>
> If the follower fails to process data for one topic partition, e.g. due to an 
> out-of-order offsets error, the whole ReplicaFetcherThread is killed, which 
> also stops fetching for the other topic partitions serviced by this fetcher 
> thread. This may result in unnecessary under-replicated partitions. I think 
> it would be better to continue fetching for the other topic partitions, and 
> just remove the partition with an error from the responsibility of the 
> fetcher thread.
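> For illustration, a hedged sketch of the suggested handling (helper names 
> are hypothetical; the real fetcher logic differs):
> {code:java}
> // Collect partitions whose fetched data fails to process instead of letting
> // a single failure kill the whole ReplicaFetcherThread.
> val partitionsWithError = scala.collection.mutable.Set.empty[TopicPartition]
> responses.foreach { case (tp, data) =>
>   try
>     processPartitionData(tp, data)  // hypothetical per-partition handler
>   catch {
>     case e: Throwable =>
>       error(s"Error processing fetched data for partition $tp", e)
>       partitionsWithError += tp
>   }
> }
> // Drop only the failing partitions from this fetcher; keep fetching the rest.
> removePartitions(partitionsWithError)
> {code}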



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7414) Do not fail broker on out of range offsets in replica fetcher

2018-09-17 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7414.

   Resolution: Fixed
Fix Version/s: 2.1.0

> Do not fail broker on out of range offsets in replica fetcher
> -
>
> Key: KAFKA-7414
> URL: https://issues.apache.org/jira/browse/KAFKA-7414
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.1.0
>
>
> In the replica fetcher, we have logic to detect the case when the follower's 
> offset is ahead of the leader's. If unclean leader election is not enabled, 
> we raise a fatal error and kill the broker. 
> This behavior is inconsistent depending on the message format. With 
> KIP-101/KIP-279, upon becoming a follower, the replica would use leader epoch 
> information to reconcile the end of the log with the leader and simply 
> truncate. Additionally, with the old format, the check is not really 
> bulletproof for detecting data loss since the unclean leader's end offset 
> might have already caught up to the follower's offset at the time of its 
> initial fetch or when it queries for the current log end offset.
> To make the logic consistent, we could raise a fatal error whenever the 
> follower has to truncate below the high watermark. However, the fatal error 
> is probably overkill, and it would be better to log a warning, since most of 
> the damage is already done once the leader has been elected, and failing the 
> broker has a huge blast radius.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7414) Do not fail broker on out of range offsets in replica fetcher

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618057#comment-16618057
 ] 

ASF GitHub Bot commented on KAFKA-7414:
---

hachikuji closed pull request #5654: KAFKA-7414; Out of range errors should 
never be fatal for follower
URL: https://github.com/apache/kafka/pull/5654
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 44137cf35c3..4a2719e36c7 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -38,9 +38,9 @@ import java.util.concurrent.atomic.AtomicLong
 import com.yammer.metrics.core.Gauge
 import kafka.log.LogAppendInfo
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
+import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, ListOffsetRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
 
 import scala.math._
 
@@ -77,8 +77,6 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]
 
-  protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean
-
   protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
 
   protected def logEndOffset(topicPartition: TopicPartition): Long
@@ -289,7 +287,6 @@ abstract class AbstractFetcherThread(name: String,
                   info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
                     s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
                 } catch {
-                  case e: FatalExitError => throw e
                   case e: Throwable =>
                     error(s"Error getting offset for partition $topicPartition", e)
                     partitionsWithError += topicPartition
@@ -458,16 +455,6 @@ abstract class AbstractFetcherThread(name: String,
      */
     val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition)
     if (leaderEndOffset < replicaEndOffset) {
-      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
-      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
-      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
-      if (!isUncleanLeaderElectionAllowed(topicPartition)) {
-        // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
-        fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
-          s"latest offset $leaderEndOffset is less than replica's latest offset $replicaEndOffset}")
-        throw new FatalExitError
-      }
-
       warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
         s"leader's latest offset $leaderEndOffset")
       truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset))
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index dc585ebd926..2244771d14c 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -125,8 +125,6 @@ class ReplicaAlterLogDirsThread(name: String,
     logAppendInfo
   }
 
-  override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = true
-
   override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long = {
     replicaMgr.getReplicaOrException(topicPartition).logStartOffset
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5dcd29b473d..bdbadd9b731 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -21,9 +21,8 @@ import java.util

[jira] [Created] (KAFKA-7417) Some replicas cannot become in-sync

2018-09-17 Thread Mikhail Khomenko (JIRA)
Mikhail Khomenko created KAFKA-7417:
---

 Summary: Some replicas cannot become in-sync
 Key: KAFKA-7417
 URL: https://issues.apache.org/jira/browse/KAFKA-7417
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 2.0.0
Reporter: Mikhail Khomenko


Hi,
we have run into the following issue: some replicas cannot become in-sync.
The distribution of in-sync replicas across topics appears random. For instance:
{code:java}
$ kafka-topics --zookeeper 1.2.3.4:8181 --describe --topic TEST
Topic:TEST PartitionCount:8 ReplicationFactor:3 Configs:
Topic: TEST Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 0,1,2
Topic: TEST Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 0,1,2
Topic: TEST Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 0,1,2
Topic: TEST Partition: 3 Leader: 2 Replicas: 0,1,2 Isr: 0,1,2
Topic: TEST Partition: 4 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2
Topic: TEST Partition: 5 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
Topic: TEST Partition: 6 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
Topic: TEST Partition: 7 Leader: 0 Replicas: 1,0,2 Isr: 0,2{code}
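The same ISR view can be pulled programmatically. A minimal sketch using the Java AdminClient (Kafka 2.0 API); the bootstrap address is the one advertised below, and the output format is illustrative:
{code:java}
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.AdminClient
import scala.collection.JavaConverters._

object IsrCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "1.2.3.4:9200")
    val admin = AdminClient.create(props)
    // Describe the topic and print replica vs ISR membership per partition.
    val desc = admin.describeTopics(Collections.singleton("TEST")).all().get().get("TEST")
    desc.partitions().asScala.foreach { p =>
      val replicas = p.replicas().asScala.map(_.id()).mkString(",")
      val isr = p.isr().asScala.map(_.id()).mkString(",")
      println(s"partition ${p.partition()} replicas=$replicas isr=$isr")
    }
    admin.close()
  }
}
{code}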
The files in the TEST-7 partition directory are identical (same md5sum) on all 3 brokers. The messages were also compared with kafka.tools.DumpLogSegments and are the same.
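A hedged sketch of that verification (the segment path is illustrative; DumpLogSegments is the tool mentioned above, invoked through its main method):
{code:java}
import java.nio.file.{Files, Paths}
import java.security.MessageDigest

object SegmentCheck {
  // MD5 of a whole segment file, for comparing the same segment across brokers.
  def md5Hex(path: String): String =
    MessageDigest.getInstance("MD5")
      .digest(Files.readAllBytes(Paths.get(path)))
      .map("%02x".format(_)).mkString

  def main(args: Array[String]): Unit = {
    val segment = "/var/lib/kafka/data/TEST-7/00000000000000000000.log" // illustrative path
    println(md5Hex(segment))
    // Dump the records for a message-level comparison.
    kafka.tools.DumpLogSegments.main(Array("--files", segment, "--print-data-log"))
  }
}
{code}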


We have a 3-broker cluster running Confluent Kafka 5.0.0 (which is Apache Kafka 2.0.0).
Each broker has the following configuration:
{code:java}
advertised.host.name = null
advertised.listeners = PLAINTEXT://1.2.3.4:9200
advertised.port = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name = 
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 1
broker.id.generation.enable = true
broker.interceptor.class = class 
org.apache.kafka.server.interceptor.DefaultBrokerInterceptor
broker.rack = null
client.quota.callback.class = null
compression.type = producer
connections.max.idle.ms = 60
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 3
create.topic.policy.class.name = null
default.replication.factor = 3
delegation.token.expiry.check.interval.ms = 360
delegation.token.expiry.time.ms = 8640
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 60480
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 3000
group.max.session.timeout.ms = 30
group.min.session.timeout.ms = 6000
host.name = 
inter.broker.listener.name = null
inter.broker.protocol.version = 2.0
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = 
PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = PLAINTEXT://0.0.0.0:9200
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 8640
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /var/lib/kafka/data
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 6
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 6
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.downconversion.enable = true
log.message.format.version = 2.0
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 30
log.retention.hours = 8760
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 6
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides = 
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 112
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
min.insync.replicas = 2
num.io.threads = 8
num.network.threads = 8
num.partitions = 8
num.recovery.threads.per.data.dir = 1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 4
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 60
offsets.retention.minutes = 525600
offsets.t

[jira] [Resolved] (KAFKA-5690) kafka-acls command should be able to list per principal

2018-09-17 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-5690.
-
Resolution: Fixed

> kafka-acls command should be able to list per principal
> ---
>
> Key: KAFKA-5690
> URL: https://issues.apache.org/jira/browse/KAFKA-5690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Koelli Mungee
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently the `kafka-acls` command has a `--list` option that can list per 
> resource which is --topic  or --group  or --cluster. In order 
> to look at the ACLs for a particular principal the user needs to iterate 
> through the entire list to figure out what privileges a particular principal 
> has been granted. An option to list the ACL per principal would simplify this 
> process.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5690) kafka-acls command should be able to list per principal

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617784#comment-16617784
 ] 

ASF GitHub Bot commented on KAFKA-5690:
---

lindong28 closed pull request #5633: KAFKA-5690: Add support to list ACLs for a 
given principal (KIP-357)
URL: https://github.com/apache/kafka/pull/5633
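
A self-contained sketch of the filtering semantics this PR adds (types simplified to plain strings, not the actual AclCommand code; it mirrors the per-principal filter in the diff below):
{code:java}
object PrincipalFilterSketch {
  // Keep only ACL entries whose principal matches, then drop resources that
  // end up with no matching entries.
  def filterByPrincipal(resourceToAcls: Map[String, Set[String]],
                        principal: String): Map[String, Set[String]] =
    resourceToAcls
      .map { case (resource, acls) => resource -> acls.filter(_ == principal) }
      .filter { case (_, acls) => acls.nonEmpty }

  def main(args: Array[String]): Unit = {
    val acls = Map(
      "Topic:test" -> Set("User:alice", "User:bob"),
      "Group:grp1" -> Set("User:bob"))
    // Prints: Map(Topic:test -> Set(User:alice))
    println(filterByPrincipal(acls, "User:alice"))
  }
}
{code}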
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index c2dda33d5ab..ad375d20572 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -138,10 +138,22 @@ object AclCommand extends Logging {
     def listAcls(): Unit = {
       withAdminClient(opts) { adminClient =>
         val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+        val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
         val resourceToAcls = getAcls(adminClient, filters)
 
-        for ((resource, acls) <- resourceToAcls)
-          println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+        if (listPrincipals.isEmpty) {
+          for ((resource, acls) <- resourceToAcls)
+            println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+        } else {
+          listPrincipals.foreach(principal => {
+            println(s"ACLs for principal `$principal`")
+            val filteredResourceToAcls = resourceToAcls.mapValues(acls =>
+              acls.filter(acl => principal.toString.equals(acl.principal))).filter(entry => entry._2.nonEmpty)
+
+            for ((resource, acls) <- filteredResourceToAcls)
+              println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+          })
+        }
       }
     }
 
@@ -237,13 +249,20 @@ object AclCommand extends Logging {
     def listAcls(): Unit = {
       withAuthorizer() { authorizer =>
         val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
+        val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
 
-        val resourceToAcls: Iterable[(Resource, Set[Acl])] =
-          if (filters.isEmpty) authorizer.getAcls()
-          else filters.flatMap(filter => getAcls(authorizer, filter))
-
-        for ((resource, acls) <- resourceToAcls)
-          println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+        if (listPrincipals.isEmpty) {
+          val resourceToAcls = getFilteredResourceToAcls(authorizer, filters)
+          for ((resource, acls) <- resourceToAcls)
+            println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+        } else {
+          listPrincipals.foreach(principal => {
+            println(s"ACLs for principal `$principal`")
+            val resourceToAcls = getFilteredResourceToAcls(authorizer, filters, Some(principal))
+            for ((resource, acls) <- resourceToAcls)
+              println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
+          })
+        }
       }
     }
 
@@ -256,9 +275,23 @@ object AclCommand extends Logging {
       )
     }
 
-    private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource, Set[Acl]] =
-      authorizer.getAcls()
-        .filter { case (resource, acl) => filter.matches(resource.toPattern) }
+    private def getFilteredResourceToAcls(authorizer: Authorizer, filters: Set[ResourcePatternFilter],
+                                          listPrincipal: Option[KafkaPrincipal] = None): Iterable[(Resource, Set[Acl])] = {
+      if (filters.isEmpty)
+        if (listPrincipal.isEmpty)
+          authorizer.getAcls()
+        else
+          authorizer.getAcls(listPrincipal.get)
+      else filters.flatMap(filter => getAcls(authorizer, filter, listPrincipal))
+    }
+
+    private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter,
+                        listPrincipal: Option[KafkaPrincipal] = None): Map[Resource, Set[Acl]] =
+      if (listPrincipal.isEmpty)
+        authorizer.getAcls().filter { case (resource, acl) => filter.matches(resource.toPattern) }
+      else
+        authorizer.getAcls(listPrincipal.get).filter { case (resource, acl) => filter.matches(resource.toPattern) }
+
   }
 
   private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
@@ -521,6 +554,12 @@ object AclCommand extends Logging {
       .describedAs

[jira] [Resolved] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-09-17 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7316.
--
Resolution: Fixed

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Fix For: 2.0.1, 2.1.0
>
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> index 3d1bab5..e0a06f2 100644
> --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7276) Consider using re2j to speed up regex operations

2018-09-17 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617636#comment-16617636
 ] 

John Roesler commented on KAFKA-7276:
-

Hi [~chenyuyun-emc] and [~yuzhih...@gmail.com],

The license on the project you linked seems not to be a standard one: 
[https://github.com/google/re2j/blob/master/LICENSE]

Before doing any software work, you would have to verify that its license is compatible with ours.

Also, it's not clear whether you're talking about using this on the broker side or the client side.

On the broker side, we can be more flexible, but on the client side, we need to 
be extremely skeptical of new dependencies. Since our client code is a library 
that people pull in, we transitively expose them to all of our dependencies, 
setting them up for the Java equivalent of "DLL hell" if they happen to 
(transitively) depend on the same library at a different version.

As much as I like algorithmic efficiency, I would hesitate to bring in a change 
that introduces a new dependency unless there was a benchmark that shows a 
compelling performance improvement in production code.

Would you all consider pursuing these tasks in the following order:
 # verify that we are legally allowed to use this code, with respect to our 
mutual licenses
 # put together some experiments to determine what, if any, real performance 
improvement will result from this change

Thanks,

-John
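
A minimal sketch of such an experiment, assuming re2j is on the classpath as com.google.re2j (a real comparison should use JMH and production-shaped patterns rather than this deliberately pathological one):
{code:java}
import java.util.concurrent.TimeUnit

object RegexBench {
  def timeMs(label: String)(f: => Boolean): Unit = {
    val start = System.nanoTime()
    val matched = f
    val elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    println(s"$label: matched=$matched in $elapsed ms")
  }

  def main(args: Array[String]): Unit = {
    // Worst case for a backtracking engine: runtime grows exponentially here,
    // while re2j stays linear in the input length.
    val input = "a" * 25 + "b"
    val pattern = "(a+)+$"
    timeMs("java.util.regex")(java.util.regex.Pattern.compile(pattern).matcher(input).matches())
    timeMs("re2j")(com.google.re2j.Pattern.compile(pattern).matcher(input).matches())
  }
}
{code}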

> Consider using re2j to speed up regex operations
> 
>
> Key: KAFKA-7276
> URL: https://issues.apache.org/jira/browse/KAFKA-7276
> Project: Kafka
>  Issue Type: Task
>  Components: packaging
>Reporter: Ted Yu
>Assignee: kevin.chen
>Priority: Major
>
> https://github.com/google/re2j
> re2j claims to do linear time regular expression matching in Java.
> Its benefit is most obvious for deeply nested regex (such as a | b | c | d).
> We should consider using re2j to speed up regex operations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread

2018-09-17 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7280:
--
Affects Version/s: (was: 1.1.2)
Fix Version/s: 1.1.2

> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --
>
> Key: KAFKA-7280
> URL: https://issues.apache.org/jira/browse/KAFKA-7280
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Request/response handling in FetchSessionHandler is not thread-safe. But we 
> are using it in Kafka consumer without any synchronization even though poll() 
> from heartbeat thread can process responses. Heartbeat thread holds the 
> coordinator lock while processing its poll and responses, making other 
> operations involving the group coordinator safe. We also need to lock 
> FetchSessionHandler for the operations that update or read 
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run on trunk of 
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
> {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
> groupId=group] Heartbeat thread failed due to unexpected error 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  java.util.ConcurrentModificationException
>  at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>  at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>  
> The logs just prior to the exception show that a partition was removed from 
> the session:
> {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Skipping fetch for partition test_topic-1 because there is an 
> in-flight request to worker4:9095 (id: 3 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Completed receive from node 2 for FETCH with correlation id 
> 417, received 
> {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Added READ_UNCOMMITTED fetch request for partition 
> test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for 
> node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) 
> (org.apache.kafka.clients.FetchSessionHandler)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), 
> toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: 
> 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Sending FETCH 
> {r
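
A minimal, self-contained sketch of the locking pattern the description calls for (names are illustrative, not Kafka's actual Fetcher code): one lock guards every read and mutation of the session-partition state shared between the fetch path and the heartbeat thread.
{code:java}
import scala.collection.mutable

object SessionLockSketch {
  private val lock = new Object
  // Hypothetical stand-in for FetchSessionHandler#sessionPartitions.
  private val sessionPartitions = mutable.LinkedHashMap[String, Long]()

  // Fetch path: snapshot under the lock instead of iterating live state.
  def snapshotForRequest(): Seq[(String, Long)] = lock.synchronized {
    sessionPartitions.toSeq
  }

  // Heartbeat thread: mutations take the same lock, so the snapshot above can
  // never observe a concurrent modification.
  def handleResponse(removedPartition: String): Unit = lock.synchronized {
    sessionPartitions.remove(removedPartition)
  }
}
{code}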

[jira] [Commented] (KAFKA-7384) Compatibility issues between Kafka Brokers 1.1.0 and older kafka clients

2018-09-17 Thread hackerwin7 (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617228#comment-16617228
 ] 

hackerwin7 commented on KAFKA-7384:
---

After upgrading the brokers from 0.10.2.1 to 1.1.0, I do not see the error above with a 1.1.0 producer and a 0.10.2.1 consumer.

> Compatibility issues between Kafka Brokers 1.1.0 and older kafka clients
> 
>
> Key: KAFKA-7384
> URL: https://issues.apache.org/jira/browse/KAFKA-7384
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.0
>Reporter: Vasilis Tsamis
>Priority: Blocker
> Attachments: logs2.txt
>
>
> Hello
> After upgrading the Kafka Brokers from 0.10.2.1 to 1.1.0, I am getting the 
> following error logs thrown by the kafka clients 0.10.2.1 & 0.10.0.1. This 
> seems to be some kind of incompatibility issue for the older clients although 
> this shouldn't be true according to the following [doc 
> 1|https://docs.confluent.io/current/installation/upgrade.html#preparation], 
> [doc2|https://cwiki-test.apache.org/confluence/display/KAFKA/KIP-35+-+Retrieving+protocol+version]
>  and 
> [thread|https://lists.apache.org/thread.html/9bc87a2c683d13fda27f01a635dba822520113cfd8fb50f3a3e82fcf@%3Cusers.kafka.apache.org%3E].
> Can someone please help on this issue? Does this mean that I have to upgrade 
> all kafka-clients to 1.1.0?
>  
> (Please also check the attached log, some extra compression type ids are also 
> occurring)
>  
> {noformat}
> java.lang.IllegalArgumentException: Unknown compression type id: 4
> at 
> org.apache.kafka.common.record.CompressionType.forId(CompressionType.java:46)
> at 
> org.apache.kafka.common.record.Record.compressionType(Record.java:260)
> at 
> org.apache.kafka.common.record.LogEntry.isCompressed(LogEntry.java:89)
> at 
> org.apache.kafka.common.record.RecordsIterator.makeNext(RecordsIterator.java:70)
> at 
> org.apache.kafka.common.record.RecordsIterator.makeNext(RecordsIterator.java:34)
> at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:785)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:480)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> at 
> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:130)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> --- Another kind of exception due to same reason
> java.lang.IndexOutOfBoundsException: null
> at java.nio.Buffer.checkIndex(Buffer.java:546)
> at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:365) 
> at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:784)
> at org.apache.kafka.common.record.Record.value(Record.java:268)
> at 
> org.apache.kafka.common.record.RecordsIterator$DeepRecordsIterator.(RecordsIterator.java:149)
> at 
> org.apache.kafka.common.record.RecordsIterator.makeNext(RecordsIterator.java:79)
> at 
> org.apache.kafka.common.record.RecordsIterator.makeNext(RecordsIterator.java:34)
> at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:785)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:480)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> at 
> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:130)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.j

[jira] [Updated] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread

2018-09-17 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7280:
--
Affects Version/s: 1.1.2

> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --
>
> Key: KAFKA-7280
> URL: https://issues.apache.org/jira/browse/KAFKA-7280
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.1, 1.1.2, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.0.1, 2.1.0
>
>
> Request/response handling in FetchSessionHandler is not thread-safe. But we 
> are using it in Kafka consumer without any synchronization even though poll() 
> from heartbeat thread can process responses. Heartbeat thread holds the 
> coordinator lock while processing its poll and responses, making other 
> operations involving the group coordinator safe. We also need to lock 
> FetchSessionHandler for the operations that update or read 
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run on trunk of 
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
> {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
> groupId=group] Heartbeat thread failed due to unexpected error 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  java.util.ConcurrentModificationException
>  at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>  at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
>  at 
> org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>  
> The logs just prior to the exception show that a partition was removed from 
> the session:
> {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Skipping fetch for partition test_topic-1 because there is an 
> in-flight request to worker4:9095 (id: 3 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Completed receive from node 2 for FETCH with correlation id 
> 417, received 
> {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Added READ_UNCOMMITTED fetch request for partition 
> test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for 
> node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) 
> (org.apache.kafka.clients.FetchSessionHandler)
>  [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
> groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), 
> toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: 
> 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>  [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
> groupId=group] Sending FETCH 
> {replica_id=-1,max_wait_time=500,min_bytes

[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-09-17 Thread Joan Goyeau (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617175#comment-16617175
 ] 

Joan Goyeau commented on KAFKA-7316:


The PR #5539 is now merged.

There is no documentation change needed here since it's an internal change to 
fix the issue.
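
For readers tracking down this class of bug, a self-contained illustration (not Kafka's actual code) of how a Scala wrapper method can accidentally resolve back to itself, for example through an implicit conversion, instead of delegating to the wrapped Java object:
{code:java}
object WrapperRecursionSketch {
  // Hypothetical stand-in for the underlying Java KTable.
  class JavaTable { def filter(p: (String, Long) => Boolean): JavaTable = this }

  class ScalaTable(val inner: JavaTable) {
    // Correct: delegate to the wrapped Java object.
    def filter(p: (String, Long) => Boolean): ScalaTable =
      new ScalaTable(inner.filter(p))
    // The buggy variant calls this same method again (directly, or via an
    // implicit conversion back to ScalaTable), recursing until StackOverflowError.
  }

  def main(args: Array[String]): Unit = {
    val filtered = new ScalaTable(new JavaTable).filter((_, _) => true)
    println(s"filter returned: ${filtered.inner}")
  }
}
{code}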

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Fix For: 2.0.1, 2.1.0
>
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> index 3d1bab5..e0a06f2 100644
> --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)