[jira] [Commented] (KAFKA-7442) forceUnmap mmap on linux when index resize

2018-09-28 Thread huxihx (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632760#comment-16632760
 ] 

huxihx commented on KAFKA-7442:
---

Seems it's a duplicate of KAFKA-4614

> forceUnmap mmap on linux when index resize
> --
>
> Key: KAFKA-7442
> URL: https://issues.apache.org/jira/browse/KAFKA-7442
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.1.1
>Reporter: 翟玉勇
>Priority: Major
>
> When resizing an OffsetIndex or TimeIndex, we should force-unmap the mmap on the 
> Linux platform rather than waiting for a mixed GC or full GC to unmap the 
> MappedByteBuffer objects.
> ### Before full GC
> {code}
> {"request":{"mbean":"java.nio:name=mapped,type=BufferPool","type":"read"},"value":{"TotalCapacity":2434496968,"MemoryUsed":2434496968,"Count":5392,"Name":"mapped","ObjectName":{"objectName":"java.nio:name=mapped,type=BufferPool"}},"timestamp":1537945759,"status":200}
> S0     S1      E      O      M     CCS   YGC   YGCT   FGC   FGCT    GCT
>  0.00 100.00  28.88   4.93  97.64  94.72   24  0.176    0   0.000   0.176
>  0.00 100.00  31.37   4.93  97.64  94.72   24  0.176    0   0.000   0.176
> {code}
> {code}
> jmap -histo:live kafka_pid
> {code}
>  
> ### After full GC
> {code}
> S0     S1      E      O      M     CCS   YGC   YGCT   FGC   FGCT    GCT
>  0.00   0.00  23.22   5.03  97.92  94.93   24  0.176    1   0.617   0.793
>  0.00   0.00  25.70   5.03  97.92  94.93   24  0.176    1   0.617   0.793
>  0.00   0.00  27.86   5.03  97.92  94.93   24  0.176    1   0.617   0.793
> {"request":{"mbean":"java.nio:name=mapped,type=BufferPool","type":"read"},"value":{"TotalCapacity":1868266036,"MemoryUsed":1868266036,"Count":5338,"Name":"mapped","ObjectName":{"objectName":"java.nio:name=mapped,type=BufferPool"}},"timestamp":1537945860,"status":200}
> {code}
> {code}
> def resize(newSize: Int) {
>   inLock(lock) {
>     val raf = new RandomAccessFile(_file, "rw")
>     val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
>     val position = mmap.position
> 
>     /* Windows won't let us modify the file length while the file is mmapped :-( */
>     if(Os.isWindows)
>       forceUnmap(mmap)
>     try {
>       raf.setLength(roundedNewSize)
>       mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
>       _maxEntries = mmap.limit / entrySize
>       mmap.position(position)
>     } finally {
>       CoreUtils.swallow(raf.close())
>     }
>   }
> }
> {code}
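
The forceUnmap referenced above (currently only invoked on Windows) boils down to triggering the mapped buffer's cleaner so the mapping is released immediately instead of waiting for GC. A minimal sketch of that idea, assuming a JDK 8 reflection path; this is an illustration, not the exact Kafka implementation:

{code}
import java.nio.MappedByteBuffer

// Best-effort explicit unmap of a MappedByteBuffer (illustrative sketch).
// On JDK 8 this reaches the internal sun.misc.Cleaner via reflection;
// JDK 9+ would need the Unsafe#invokeCleaner path instead.
def forceUnmap(buffer: MappedByteBuffer): Unit = {
  try {
    val cleanerMethod = buffer.getClass.getMethod("cleaner")
    cleanerMethod.setAccessible(true)
    val cleaner = cleanerMethod.invoke(buffer)
    if (cleaner != null) {
      val cleanMethod = cleaner.getClass.getMethod("clean")
      cleanMethod.setAccessible(true)
      cleanMethod.invoke(cleaner) // releases the mapping immediately
    }
  } catch {
    case _: Throwable => // fall back to letting GC reclaim the mapping
  }
}
{code}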
> {code}
> [2018-09-21 13:12:24,078] INFO Rolled new log segment for 'topic-265' in 2 
> ms. (kafka.log.Log)
> [2018-09-21 13:13:16,436] FATAL [ReplicaFetcherThread-12-15], Disk error 
> while replicating data for topic-264 (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaStorageException: I/O exception in append to log 'topic-264'
> at kafka.log.Log.append(Log.scala:349)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:153)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:141)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:141)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:138)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:138)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:136)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: java.io.IOException: Map failed
> at 

[jira] [Created] (KAFKA-7458) Avoid enforced processing during bootstrap phase

2018-09-28 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7458:


 Summary: Avoid enforced processing during bootstrap phase
 Key: KAFKA-7458
 URL: https://issues.apache.org/jira/browse/KAFKA-7458
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang
Assignee: Guozhang Wang


In KAFKA-3514, we introduced a new config that allows users to delay enforced 
processing when not all input topic partitions have data. This config's default 
value is 0, which means that as long as the first fetch does not contain records 
for all partitions, the task falls into enforced processing immediately, which is 
a high risk especially in the bootstrap case.

We should consider leveraging pause / resume to make sure that, upon starting, a 
partition indeed has no data before we fall into enforced processing.
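
The config in question is max.task.idle.ms (KIP-353 / KAFKA-3514). A minimal sketch of overriding it so that a task waits for data from all of its input partitions before enforced processing kicks in; the application id, bootstrap servers, and the 5-second value are made-up examples, and the StreamsConfig.MAX_TASK_IDLE_MS_CONFIG constant is assumed to be the 2.1.0 name of this config:

{code}
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bootstrap-safe-app")  // illustrative app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // illustrative brokers
// Wait up to 5 seconds for all input partitions to have buffered data before
// falling back to enforced processing (the default of 0 processes immediately).
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "5000")
{code}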



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632669#comment-16632669
 ] 

ASF GitHub Bot commented on KAFKA-3514:
---

guozhangwang opened a new pull request #5714: KAFKA-3514: Upgrade Documentation
URL: https://github.com/apache/kafka/pull/5714
 
 
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: architecture, kip
> Fix For: 2.1.0
>
>
> KIP-353: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization]
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arrived record would keep that stream's timestamp low for 
> a period of time, and hence that stream keeps being processed until the late 
> record is reached. For example, take two partitions within the same task, 
> annotated by their record timestamps:
> {code:java}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code:java}
> Stream B: 2, 3, 4, 5
> {code}
> The late-arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. for the messages with timestamps 5, 6, 7, 8, 9, 
> until the late record itself is dequeued and processed; only then will stream B be 
> selected, starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp not to advance, and hence 
> the task timestamp does not advance either, since it is the smallest among all 
> partitions. This may not be as severe a problem as 1) above, though.
> *Update*
> There is one more thing to consider (full discussion found here: 
> [http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor])
> {quote}Let's assume the following case.
>  - a stream processor that uses the Processor API
>  - context.schedule(1000) is called in the init()
>  - the processor reads only one topic that has one partition
>  - using custom timestamp extractor, but that timestamp is just a wall
>  clock time
>  Imagine the following events:
>  1. For 10 seconds I send 5 messages / second.
>  2. Then I do not send any messages for 3 seconds.
>  3. Then I start sending 5 messages / second again.
> I see that punctuate() is not called during the 3 seconds when I do not 
>  send any messages. This is OK according to the documentation, because 
>  there are no new messages to trigger the punctuate() call. When the 
>  first few messages arrive after I restart the sending (point 3 above), I 
>  see the following sequence of method calls:
> 1. process() on the 1st message
>  2. punctuate() is called 3 times
>  3. process() on the 2nd message
>  4. process() on each following message
> What I would expect instead is that punctuate() is called first and then 
>  process() is called on the messages, because the first message's timestamp 
>  is already 3 seconds older than when the last punctuate() was called, so the 
>  first message belongs after the 3 punctuate() calls.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632668#comment-16632668
 ] 

ASF GitHub Bot commented on KAFKA-3514:
---

guozhangwang closed pull request #5669: KAFKA-3514: Modify pause logic if we 
being enforced processing
URL: https://github.com/apache/kafka/pull/5669
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 35e1f77fd4c..a4e08bad479 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -90,6 +90,7 @@ Upgrade Guide and API Changes
 We have also removed some public APIs that are deprecated prior to 
1.0.x in 2.0.0.
 See below for a detailed list of removed APIs.
 
+
 Streams API changes in 2.1.0
 
 We updated TopologyDescription API to allow for better 
runtime checking.
@@ -99,6 +100,14 @@ Streams API
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes;>KIP-321.
 
 
+
+We've added a new config named max.task.idle.ms to allow 
users specify how to handle out-of-order data within a task that may be 
processing multiple
+topic-partitions (see Out-of-Order
 Handling section for more details).
+The default value is set to 0, to favor minimized latency 
over synchronization between multiple input streams from topic-partitions.
+If users would like to wait for longer time when some of the 
topic-partitions do not have data available to process and hence cannot 
determine its corresponding stream time,
+they can override this config to a larger value.
+
+
 Streams API changes in 2.0.0
 
 We have removed the skippedDueToDeserializationError-rate 
and skippedDueToDeserializationError-total metrics.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2f97b7f27ba..b77a18ef85e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -233,6 +233,7 @@ public StreamTask(final TaskId id,
 partitionQueues.put(partition, queue);
 }
 
+idleStartTime = RecordQueue.UNKNOWN;
 recordInfo = new PartitionGroup.RecordInfo();
 partitionGroup = new PartitionGroup(partitionQueues);
 processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp);
@@ -355,10 +356,19 @@ public boolean process() {
 consumedOffsets.put(partition, record.offset());
 commitNeeded = true;
 
-// after processing this record, if its partition queue's buffered 
size has been
-// decreased to the threshold, we can then resume the consumption 
on this partition
-if (recordInfo.queue().size() == maxBufferedSize) {
-consumer.resume(singleton(partition));
+// if we are not in the enforced processing state, then after 
processing
+// this record, if its partition queue's buffered size has been 
decreased below
+// the threshold, we can then resume the consumption on this 
partition;
+// otherwise, we only resume the consumption on this partition 
after it
+// has been drained.
+if (idleStartTime != RecordQueue.UNKNOWN) {
+if (recordInfo.queue().isEmpty()) {
+consumer.resume(singleton(partition));
+}
+} else {
+if (recordInfo.queue().size() == maxBufferedSize) {
+consumer.resume(singleton(partition));
+}
 }
 } catch (final ProducerFencedException fatal) {
 throw new TaskMigratedException(this, fatal);
@@ -713,10 +723,19 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
-if (newQueueSize > maxBufferedSize) {
-consumer.pause(singleton(partition));
+// if we are not in the enforced processing state, then after adding 
these records,
+// we can then pause the consumption for this partition if its 
partition queue's
+// buffered size has been increased beyond the threshold;
+// otherwise, we will immediately pause the consumption on this 
partition after it
+// has at least some records already
+if (idleStartTime != RecordQueue.UNKNOWN) {
+if (newQueueSize > 0) {
+consumer.pause(singleton(partition));
+}
+ 

[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-28 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632659#comment-16632659
 ] 

Vahid Hashemian commented on KAFKA-7403:


[~jonlee2] Thanks a lot for reporting it.

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jon Lee
>Priority: Blocker
> Fix For: 2.1.0
>
>
> I am currently trying broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
> offsetAndMetadata.expireTimestamp.get){code}
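
In other words, the failure is the standard None.get pattern: the old (pre-KAFKA_2_1_IV0) serialization path unconditionally reads an Option that the new request-handling path may leave empty. A tiny standalone sketch of the mismatch; the names are illustrative, not the actual GroupMetadataManager code:

{code:java}
// The 2.0 broker stores no expire timestamp when the client sent DEFAULT_RETENTION_TIME:
val expireTimestamp: Option[Long] = None

// The pre-KAFKA_2_1_IV0 value format assumes it is always present, so this would throw
// java.util.NoSuchElementException: None.get -- exactly the broker-side stack trace below:
// val v1Field: Long = expireTimestamp.get

// A defensive variant would only write the field when the value is defined:
expireTimestamp.foreach(ts => println(s"would write expire timestamp $ts"))
{code}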
>  
> Here is the stack trace for the broker-side error:
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
> ~[scala-library-2.11.12.jar:?]
> at 

[jira] [Updated] (KAFKA-7446) Better error message to explain the upper limit of TimeWindow

2018-09-28 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7446:
-
Labels: newbie++  (was: )

> Better error message to explain the upper limit of TimeWindow
> -
>
> Key: KAFKA-7446
> URL: https://issues.apache.org/jira/browse/KAFKA-7446
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>  Labels: newbie++
>
> The following code throws an {{IllegalArgumentException}}.
> {code:java}
> import org.apache.kafka.streams.kstream.TimeWindows
> import scala.concurrent.duration._
> val timeWindow = TimeWindows
> .of(1.minute.toMillis)
> .advanceBy(2.minutes.toMillis)
> {code}
> The exception is as follows, and it's not clear why {{60000}} is the upper 
> limit (not to mention that {{AdvanceMs}} with the uppercase {{A}} also 
> confused me).
> {code:java}
> java.lang.IllegalArgumentException: AdvanceMs must lie within interval (0, 
> 60000].
> at 
> org.apache.kafka.streams.kstream.TimeWindows.advanceBy(TimeWindows.java:100)
> ... 44 elided{code}
> I think that the message should be more developer-friendly and explain the 
> boundaries, perhaps with an example (and a link to docs)?
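
For reference, the advance ("hop") must lie in the interval (0, window size], so for a 1-minute window the upper bound is 60000 ms. A minimal sketch of a valid configuration, mirroring the snippet above; the 30-second hop is just an example:

{code:java}
import org.apache.kafka.streams.kstream.TimeWindows
import scala.concurrent.duration._

val hoppingWindow = TimeWindows
  .of(1.minute.toMillis)          // window size = 60000 ms
  .advanceBy(30.seconds.toMillis) // 30000 ms <= 60000 ms, so it is accepted
{code}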



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6620) Documentation about "exactly_once" doesn't mention "transaction.state.log.min.isr"

2018-09-28 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6620.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

> Documentation about "exactly_once" doesn't mention 
> "transaction.state.log.min.isr" 
> ---
>
> Key: KAFKA-6620
> URL: https://issues.apache.org/jira/browse/KAFKA-6620
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Daniel Qian
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.1.0
>
>
> Documentation about "processing.guarantee" says:
> {quote}The processing guarantee that should be used. Possible values are 
> {{at_least_once}}(default) and {{exactly_once}}. Note that exactly-once 
> processing requires a cluster of at least three brokers by default what is 
> the recommended setting for production; *for development you can change this, 
> by adjusting broker setting* 
> `{color:#FF}*transaction.state.log.replication.factor*{color}`
> {quote}
> If one only sets *transaction.state.log.replication.factor=1* but leaves 
> *transaction.state.log.min.isr* at its default value (which is 2), the Streams 
> application will break.
> Hope you guys modify the doc, thanks.
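
For a single-broker development setup, both settings have to be lowered together, since the min ISR cannot be satisfied if it exceeds the replication factor. A minimal sketch of the relevant broker overrides in server.properties; the values are illustrative for development only, not a recommendation for production:

{code}
# Development-only overrides for a one-broker cluster (do not use in production)
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
{code}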



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6620) Documentation about "exactly_once" doesn't mention "transaction.state.log.min.isr"

2018-09-28 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-6620:


Assignee: Lee Dongjin

> Documentation about "exactly_once" doesn't mention 
> "transaction.state.log.min.isr" 
> ---
>
> Key: KAFKA-6620
> URL: https://issues.apache.org/jira/browse/KAFKA-6620
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Daniel Qian
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.1.0
>
>
> Documentation about "processing.guarantee" says:
> {quote}The processing guarantee that should be used. Possible values are 
> {{at_least_once}}(default) and {{exactly_once}}. Note that exactly-once 
> processing requires a cluster of at least three brokers by default what is 
> the recommended setting for production; *for development you can change this, 
> by adjusting broker setting* 
> `{color:#FF}*transaction.state.log.replication.factor*{color}`
> {quote}
> If one only sets *transaction.state.log.replication.factor=1* but leaves 
> *transaction.state.log.min.isr* at its default value (which is 2), the Streams 
> application will break.
> Hope you guys modify the doc, thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6620) Documentation about "exactly_once" doesn't mention "transaction.state.log.min.isr"

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632572#comment-16632572
 ] 

ASF GitHub Bot commented on KAFKA-6620:
---

guozhangwang closed pull request #5434: KAFKA-6620: Documentation about 
'exactly_once' doesn't mention 'transaction.state.log.min.isr'
URL: https://github.com/apache/kafka/pull/5434
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/developer-guide/config-streams.html 
b/docs/streams/developer-guide/config-streams.html
index 75f1045de14..a2265178759 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -452,7 +452,7 @@ num.stream.threadsretries=Integer.MAX_VALUE, enable.idempotence=true,
  and max.in.flight.requests.per.connection=1 per default.
  Note that by default exactly-once processing requires a 
cluster of at least three brokers what is the recommended setting for 
production.
- For development you can change this, by adjusting broker 
setting transaction.state.log.replication.factor to the 
number of broker you want to use.
+ For development you can change this, by adjusting broker 
setting transaction.state.log.replication.factor and transaction.state.log.min.isr to the number of broker 
you want to use.
  For more details see Processing Guarantees.
 
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b9eaaa68f67..06aea215624 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -391,7 +391,8 @@
 @SuppressWarnings("WeakerAccess")
 public static final String PROCESSING_GUARANTEE_CONFIG = 
"processing.guarantee";
 private static final String PROCESSING_GUARANTEE_DOC = "The processing 
guarantee that should be used. Possible values are " + AT_LEAST_ONCE + 
" (default) and " + EXACTLY_ONCE + ". " +
-"Note that exactly-once processing requires a cluster of at least 
three brokers by default what is the recommended setting for production; for 
development you can change this, by adjusting broker setting 
transaction.state.log.replication.factor.";
+"Note that exactly-once processing requires a cluster of at least 
three brokers by default what is the recommended setting for production; for 
development you can change this, by adjusting broker setting " +
+"transaction.state.log.replication.factor and 
transaction.state.log.min.isr.";
 
 /** {@code receive.buffer.bytes} */
 @SuppressWarnings("WeakerAccess")


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Documentation about "exactly_once" doesn't mention 
> "transaction.state.log.min.isr" 
> ---
>
> Key: KAFKA-6620
> URL: https://issues.apache.org/jira/browse/KAFKA-6620
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Daniel Qian
>Priority: Major
>
> Documentation about "processing.guarantee" says:
> {quote}The processing guarantee that should be used. Possible values are 
> {{at_least_once}}(default) and {{exactly_once}}. Note that exactly-once 
> processing requires a cluster of at least three brokers by default what is 
> the recommended setting for production; *for development you can change this, 
> by adjusting broker setting* 
> `{color:#FF}*transaction.state.log.replication.factor*{color}`
> {quote}
> If one only sets *transaction.state.log.replication.factor=1* but leaves 
> *transaction.state.log.min.isr* at its default value (which is 2), the Streams 
> application will break.
> Hope you guys modify the doc, thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7457) AbstractCoordinator struck in Discover

2018-09-28 Thread Joseph Aliase (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph Aliase updated KAFKA-7457:
-
Summary: AbstractCoordinator struck in Discover  (was: AbstractCoordinator 
struck in discover)

> AbstractCoordinator struck in Discover
> --
>
> Key: KAFKA-7457
> URL: https://issues.apache.org/jira/browse/KAFKA-7457
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: Linux
>Reporter: Joseph Aliase
>Priority: Minor
>
> AbstractCoordinator in kafka-client is stuck in coordinator discovery and never 
> rejoins the group. After a restart, the application is able to join the consumer 
> group and consume from the topic.
> We see the logs below every 10 minutes. The sequence of events is:
> a) NetworkClient complains that connection is idle and closes the connection.
> b) Consumer client tries to determine co-ordinator by sending request to Node 
> 2.
> c) Node 2 responds by saying Node 3 is group co-ordinator.
> d) Consumer client connects to group co-ordinator.
> e) There is radio silence for 10 minutes and the sequence gets repeated. 
>  
> 2018-09-28 16:35:59.771 TRACE org.apache.kafka.common.network.Selector 
> [pool-4-thread-50] [active] [wc] About to close the idle connection from 
> 2147483644 due to being idle for 540140 millis
> 2018-09-28 16:35:59.771 DEBUG org.apache.kafka.clients.NetworkClient 
> [pool-4-thread-50] [active] [wc] Node 2147483644 disconnected.
> 2018-09-28 16:35:59.771 INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> [pool-4-thread-50] [active] [wc] Marking the coordinator kafka03-wc.net:9092 
> (id: 2147483644 rack: null) dead for group test
> 2018-09-28 16:35:59.771 DEBUG 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> [pool-4-thread-50] [active] [wc] Sending coordinator request for group test 
> to broker kafka02-wc.net:9092 (id: 2 rack: null)
> 2018-09-28 16:35:59.771 DEBUG org.apache.kafka.clients.NetworkClient 
> [pool-4-thread-50] [active] [wc] Sending metadata request \{topics=[address]} 
> to node 2
> 2018-09-28 16:35:59.796 DEBUG org.apache.kafka.clients.Metadata 
> [pool-4-thread-50] [active] [wc] Updated cluster metadata version 401 to 
> Cluster(id = oC0BPXfOT42WqN7-v6b5Gw, nodes = [kafka02-wc.net:9092 (id: 2 
> rack: null), kafka03-wc.net:9092 (id: 3 rack: null), kafka05-wc.net:9092 (id: 
> 5 rack: null), kafka01-wc.net:9092 (id: 1 rack: null), kafka04-wc.net:9092 
> (id: 4 rack: null)], partitions = [Partition(topic = address, partition = 2, 
> leader = 1, replicas = [1,5,4,], isr = [5,1,4,]), Partition(topic = address, 
> partition = 1, leader = 5, replicas = [5,3,4,], isr = [5,3,4,]), 
> Partition(topic = address, partition = 0, leader = 4, replicas = [2,3,4,], 
> isr = [4,3,2,]), Partition(topic = address, partition = 6, leader = 5, 
> replicas = [1,5,4,], isr = [5,1,4,]), Partition(topic = address, partition = 
> 5, leader = 4, replicas = [5,3,4,], isr = [5,4,3,]), Partition(topic = 
> address, partition = 4, leader = 3, replicas = [1,2,3,], isr = [1,3,2,]), 
> Partition(topic = address, partition = 3, leader = 2, replicas = [1,5,2,], 
> isr = [5,1,2,]), Partition(topic = address, partition = 16, leader = 5, 
> replicas = [5,2,3,], isr = [5,3,2,]), Partition(topic = address, partition = 
> 15, leader = 4, replicas = [1,2,4,], isr = [1,4,2,]), Partition(topic = 
> address, partition = 10, leader = 4, replicas = [1,5,4,], isr = [5,1,4,]), 
> Partition(topic = address, partition = 9, leader = 3, replicas = [2,3,4,], 
> isr = [3,4,2,]), Partition(topic = address, partition = 8, leader = 2, 
> replicas = [1,2,3,], isr = [1,3,2,]), Partition(topic = address, partition = 
> 7, leader = 1, replicas = [1,5,2,], isr = [5,1,2,]), Partition(topic = 
> address, partition = 14, leader = 3, replicas = [5,3,4,], isr = [5,4,3,]), 
> Partition(topic = address, partition = 13, leader = 2, replicas = [2,3,4,], 
> isr = [3,4,2,]), Partition(topic = address, partition = 12, leader = 1, 
> replicas = [1,2,3,], isr = [1,3,2,]), Partition(topic = address, partition = 
> 11, leader = 5, replicas = [1,5,2,], isr = [5,1,2,])])
> 2018-09-28 16:35:59.797 DEBUG 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> [pool-4-thread-50] [active] [wc] Received group coordinator response 
> ClientResponse(receivedTimeMs=1538152559797, disconnected=false, 
> request=ClientRequest(expectResponse=true, 
> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@2531ff5f,
>  
> request=RequestSend(header=\{api_key=10,api_version=0,correlation_id=8354,client_id=active-wc-1-256},
>  body=\{group_id=test}), createdTimeMs=1538152559771, 
> sendTimeMs=1538152559771), 
> 

[jira] [Created] (KAFKA-7457) AbstractCoordinator struck in discover

2018-09-28 Thread Joseph Aliase (JIRA)
Joseph Aliase created KAFKA-7457:


 Summary: AbstractCoordinator struck in discover
 Key: KAFKA-7457
 URL: https://issues.apache.org/jira/browse/KAFKA-7457
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.1.1
 Environment: Linux
Reporter: Joseph Aliase


AbstractCoordinator in kafka-client is stuck in coordinator discovery and never 
rejoins the group. After a restart, the application is able to join the consumer 
group and consume from the topic.

We see the logs below every 10 minutes. The sequence of events is:

a) NetworkClient complains that connection is idle and closes the connection.

b) Consumer client tries to determine co-ordinator by sending request to Node 2.

c) Node 2 responds by saying Node 3 is group co-ordinator.

d) Consumer client connects to group co-ordinator.

e) There is radio silence for 10 minutes and the sequence gets repeated. 

 

2018-09-28 16:35:59.771 TRACE org.apache.kafka.common.network.Selector 
[pool-4-thread-50] [active] [wc] About to close the idle connection from 
2147483644 due to being idle for 540140 millis
2018-09-28 16:35:59.771 DEBUG org.apache.kafka.clients.NetworkClient 
[pool-4-thread-50] [active] [wc] Node 2147483644 disconnected.
2018-09-28 16:35:59.771 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
[pool-4-thread-50] [active] [wc] Marking the coordinator kafka03-wc.net:9092 
(id: 2147483644 rack: null) dead for group test
2018-09-28 16:35:59.771 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
[pool-4-thread-50] [active] [wc] Sending coordinator request for group test to 
broker kafka02-wc.net:9092 (id: 2 rack: null)
2018-09-28 16:35:59.771 DEBUG org.apache.kafka.clients.NetworkClient 
[pool-4-thread-50] [active] [wc] Sending metadata request \{topics=[address]} 
to node 2
2018-09-28 16:35:59.796 DEBUG org.apache.kafka.clients.Metadata 
[pool-4-thread-50] [active] [wc] Updated cluster metadata version 401 to 
Cluster(id = oC0BPXfOT42WqN7-v6b5Gw, nodes = [kafka02-wc.net:9092 (id: 2 rack: 
null), kafka03-wc.net:9092 (id: 3 rack: null), kafka05-wc.net:9092 (id: 5 rack: 
null), kafka01-wc.net:9092 (id: 1 rack: null), kafka04-wc.net:9092 (id: 4 rack: 
null)], partitions = [Partition(topic = address, partition = 2, leader = 1, 
replicas = [1,5,4,], isr = [5,1,4,]), Partition(topic = address, partition = 1, 
leader = 5, replicas = [5,3,4,], isr = [5,3,4,]), Partition(topic = address, 
partition = 0, leader = 4, replicas = [2,3,4,], isr = [4,3,2,]), 
Partition(topic = address, partition = 6, leader = 5, replicas = [1,5,4,], isr 
= [5,1,4,]), Partition(topic = address, partition = 5, leader = 4, replicas = 
[5,3,4,], isr = [5,4,3,]), Partition(topic = address, partition = 4, leader = 
3, replicas = [1,2,3,], isr = [1,3,2,]), Partition(topic = address, partition = 
3, leader = 2, replicas = [1,5,2,], isr = [5,1,2,]), Partition(topic = address, 
partition = 16, leader = 5, replicas = [5,2,3,], isr = [5,3,2,]), 
Partition(topic = address, partition = 15, leader = 4, replicas = [1,2,4,], isr 
= [1,4,2,]), Partition(topic = address, partition = 10, leader = 4, replicas = 
[1,5,4,], isr = [5,1,4,]), Partition(topic = address, partition = 9, leader = 
3, replicas = [2,3,4,], isr = [3,4,2,]), Partition(topic = address, partition = 
8, leader = 2, replicas = [1,2,3,], isr = [1,3,2,]), Partition(topic = address, 
partition = 7, leader = 1, replicas = [1,5,2,], isr = [5,1,2,]), 
Partition(topic = address, partition = 14, leader = 3, replicas = [5,3,4,], isr 
= [5,4,3,]), Partition(topic = address, partition = 13, leader = 2, replicas = 
[2,3,4,], isr = [3,4,2,]), Partition(topic = address, partition = 12, leader = 
1, replicas = [1,2,3,], isr = [1,3,2,]), Partition(topic = address, partition = 
11, leader = 5, replicas = [1,5,2,], isr = [5,1,2,])])
2018-09-28 16:35:59.797 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
[pool-4-thread-50] [active] [wc] Received group coordinator response 
ClientResponse(receivedTimeMs=1538152559797, disconnected=false, 
request=ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@2531ff5f,
 
request=RequestSend(header=\{api_key=10,api_version=0,correlation_id=8354,client_id=active-wc-1-256},
 body=\{group_id=test}), createdTimeMs=1538152559771, 
sendTimeMs=1538152559771), 
responseBody=\{error_code=0,coordinator={node_id=3,host=kafka03-wc.net,port=9092}})
2018-09-28 16:35:59.797 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
[pool-4-thread-50] [active] [wc] Discovered coordinator kafka03-wc.net:9092 
(id: 2147483644 rack: null) for group test.
2018-09-28 16:35:59.797 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
[pool-4-thread-50] [active] [wc] Marking the coordinator kafka03-wc.net:9092 
(id: 

[jira] [Commented] (KAFKA-7453) Enable idle expiry of connections which are never selected

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632509#comment-16632509
 ] 

ASF GitHub Bot commented on KAFKA-7453:
---

rajinisivaram closed pull request #5712: KAFKA-7453: Expire registered channels 
not selected within idle timeout
URL: https://github.com/apache/kafka/pull/5712
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 44223e7ab31..93325d5b4a9 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -319,6 +319,8 @@ private SelectionKey registerChannel(String id, 
SocketChannel socketChannel, int
 SelectionKey key = socketChannel.register(nioSelector, interestedOps);
 KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, 
key);
 this.channels.put(id, channel);
+if (idleExpiryManager != null)
+idleExpiryManager.update(channel.id(), time.nanoseconds());
 return key;
 }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index cfd7fb3af8d..cef7c7fae49 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -363,6 +363,19 @@ public void testCloseOldestConnection() throws Exception {
 assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
 }
 
+@Test
+public void testIdleExpiryWithoutReadyKeys() throws IOException {
+String id = "0";
+selector.connect(id, new InetSocketAddress("localhost", server.port), 
BUFFER_SIZE, BUFFER_SIZE);
+KafkaChannel channel = selector.channel(id);
+channel.selectionKey().interestOps(0);
+
+time.sleep(6000); // The max idle time is 5000ms
+selector.poll(0);
+assertTrue("The idle connection should have been closed", 
selector.disconnected().containsKey(id));
+assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
+}
+
 @Test
 public void testImmediatelyConnectedCleaned() throws Exception {
 Metrics metrics = new Metrics(); // new metrics object to avoid metric 
registration conflicts


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable idle expiry of connections which are never selected
> --
>
> Key: KAFKA-7453
> URL: https://issues.apache.org/jira/browse/KAFKA-7453
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.1.0
>
>
> We add connections to Selector#channels when a connection is registered, but 
> we start idle expiry of connections only when the connection is first  
> selected. In some environments where the channel may never get selected, this 
> could leak memory and sockets.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6051) ReplicaFetcherThread should close the ReplicaFetcherBlockingSend earlier on shutdown

2018-09-28 Thread Zhanxiang (Patrick) Huang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632280#comment-16632280
 ] 

Zhanxiang (Patrick) Huang edited comment on KAFKA-6051 at 9/28/18 7:23 PM:
---

[~mchinavan]

[~junrao]

I have a follow-up on this issue.

In a 2.0 deployment, we saw the following log when shutting down the 
ReplicaManager during a clean broker shutdown:
{noformat}
2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
java.lang.IllegalArgumentException: null
at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) 
~[?:1.8.0_121]
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214)
 ~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164)
 ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) 
~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:739) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:701) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:315) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) 
~[kafka-clients-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
 ~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) 
~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 
~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) 
~[scala-library-2.11.12.jar:?]
at 
kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) 
~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) 
~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616)
 ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) 
~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) 
~[kafka_2.11-2.0.0.22.jar:?]
{noformat}
After that, we noticed that some of the replica fetcher threads failed to shut down:
{noformat}
2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] 
[ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log 
segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) 
~[?:1.8.0_121]
at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) 
~[?:1.8.0_121]
at 
org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) 
~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) 
~[kafka-clients-2.0.0.22.jar:?]
at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) 
~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) 
~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) 

[jira] [Commented] (KAFKA-6051) ReplicaFetcherThread should close the ReplicaFetcherBlockingSend earlier on shutdown

2018-09-28 Thread Zhanxiang (Patrick) Huang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632280#comment-16632280
 ] 

Zhanxiang (Patrick) Huang commented on KAFKA-6051:
--

[~mchinavan]

[~junrao]

I have a follow-up on this issue.

In a 2.0 deployment, we saw the following log when shutting down the 
ReplicaManager during a clean broker shutdown:
{noformat}
2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
java.lang.IllegalArgumentException: null
at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) 
~[?:1.8.0_121]
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214)
 ~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164)
 ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) 
~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:739) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:701) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:315) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) 
~[kafka-clients-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
 ~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) 
~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 
~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) 
~[scala-library-2.11.12.jar:?]
at 
kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) 
~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) 
~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616)
 ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) 
~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) 
~[kafka_2.11-2.0.0.22.jar:?]
{noformat}
After that, we noticed that some of the replica fetcher threads failed to shut down:
{noformat}
2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] 
[ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log 
segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) 
~[?:1.8.0_121]
at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) 
~[?:1.8.0_121]
at 
org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) 
~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) 
~[kafka-clients-2.0.0.22.jar:?]
at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) 
~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) 
~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) 
~[kafka_2.11-2.0.0.22.jar:?]
at 

[jira] [Created] (KAFKA-7456) Serde Inheritance in Streams DSL

2018-09-28 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7456:


 Summary: Serde Inheritance in Streams DSL
 Key: KAFKA-7456
 URL: https://issues.apache.org/jira/browse/KAFKA-7456
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 2.1.0


This is a prerequisite for further topology optimization in the Streams DSL: 
different operators inside the DSL should be able to pass along key and value 
serdes if they are not explicitly specified by users. The serde specification 
precedence should generally be (a sketch follows the list below):

1) Overridden values via control objects (e.g. Materialized, Serialized, 
Consumed, etc)
2) Serdes that can be inferred from the operator itself (e.g. 
groupBy().count(), where value serde can default to `LongSerde`).
3) Serde inherited from parent operator if possible (note if the key / value 
types have been changed, then the corresponding serde cannot be inherited).
4) Default serde specified in the config.
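
A minimal sketch of where each precedence level would come into play under this proposal; the topic name is made up, and the inference/inheritance comments describe the proposed behavior rather than what the DSL does today:

{code}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed

val builder = new StreamsBuilder()

// 1) Explicit serdes passed through a control object (here Consumed) always win.
val input = builder.stream("input-topic",
  Consumed.`with`(Serdes.String(), Serdes.String()))

// 2) count() produces Long values, so the value serde could be inferred as a Long serde,
// 3) while the String key serde could be inherited from the parent stream (the key type
//    is unchanged), instead of 4) falling back to the default serdes from the config.
val counts = input
  .groupByKey()
  .count()
{code}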



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7455) JmxTool cannot connect to an SSL-enabled JMX RMI port

2018-09-28 Thread Attila Sasvari (JIRA)
Attila Sasvari created KAFKA-7455:
-

 Summary: JmxTool cannot connect to an SSL-enabled JMX RMI port
 Key: KAFKA-7455
 URL: https://issues.apache.org/jira/browse/KAFKA-7455
 Project: Kafka
  Issue Type: Bug
  Components: tools
Reporter: Attila Sasvari


When JmxTool tries to connect to an SSL-enabled JMX RMI port via 
JMXConnectorFactory.connect(), the connection attempt results in a 
"java.rmi.ConnectIOException: non-JRMP server at remote endpoint":

{code}
$ export 
KAFKA_OPTS="-Djavax.net.ssl.trustStore=/tmp/kafka.server.truststore.jks 
-Djavax.net.ssl.trustStorePassword=test"

$ bin/kafka-run-class.sh kafka.tools.JmxTool --object-name 
"kafka.server:type=kafka-metrics-count"  --jmx-url 
service:jmx:rmi:///jndi/rmi://localhost:9393/jmxrmi

ConnectIOException: non-JRMP server at remote endpoint].
java.io.IOException: Failed to retrieve RMIServer stub: 
javax.naming.CommunicationException [Root exception is 
java.rmi.ConnectIOException: non-JRMP server at remote endpoint]
at 
javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:369)
at 
javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:270)
at kafka.tools.JmxTool$.main(JmxTool.scala:120)
at kafka.tools.JmxTool.main(JmxTool.scala)
{code}

The problem is that {{JmxTool}} does not specify {{SslRMIClientSocketFactory}} 
when it tries to connect:
https://github.com/apache/kafka/blob/70d90c371833b09cf934c8c2358171433892a085/core/src/main/scala/kafka/tools/JmxTool.scala#L120
{code}  
  jmxc = JMXConnectorFactory.connect(url, null)
{code}
To connect to a secured RMI port, it should pass an environment map that 
contains a {{("com.sun.jndi.rmi.factory.socket", new 
SslRMIClientSocketFactory)}} entry.
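
A minimal sketch of what such a connection could look like; the host, port, and variable names are illustrative, and the truststore is still supplied via the javax.net.ssl.* system properties as in the example above:

{code}
import java.util.{HashMap => JHashMap}
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}
import javax.rmi.ssl.SslRMIClientSocketFactory

val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9393/jmxrmi")

// Tell the JNDI/RMI lookup to use an SSL client socket factory instead of plain RMI.
val env = new JHashMap[String, AnyRef]()
env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory())

val jmxc = JMXConnectorFactory.connect(url, env)
{code}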

More info:
- https://docs.oracle.com/cd/E19698-01/816-7609/security-35/index.html
- https://docs.oracle.com/javase/8/docs/technotes/guides/management/agent.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-28 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632074#comment-16632074
 ] 

Jon Lee commented on KAFKA-7403:


[~vahid] [~hachikuji] Thank you very much for the patch. 

One question: why is the expiration timestamp now always based on the current 
timestamp? Previously, if a custom commit timestamp was provided, it was used to 
compute the expiration timestamp instead of the current timestamp.

 

 

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jon Lee
>Priority: Blocker
> Fix For: 2.1.0
>
>
> I am currently trying broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
> offsetAndMetadata.expireTimestamp.get){code}
>  
> Here is the stack trace for the broker-side error:
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> ~[scala-library-2.11.12.jar:?]
> at 

[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2018-09-28 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631998#comment-16631998
 ] 

Rajini Sivaram commented on KAFKA-7304:
---

[~yuyang08] We came across a similar issue with a large number of clients, for 
which we had a heap dump. There was no leak in `Selector#closingChannels`, which 
was mostly empty, but there were a large number of channels in 
`Selector#channels`, mainly due to client reconnections. I have submitted two 
PRs to reduce memory pressure in this type of scenario:
 * [https://github.com/apache/kafka/pull/5713]
 * [https://github.com/apache/kafka/pull/5712]

It would be good to know whether these help in your scenario. We are also 
considering limiting the number of new connections the broker processes in each 
iteration, to improve fairness between handling new and existing connections, 
but we need to do more testing before submitting a PR for that.
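For illustration, a rough sketch of the kind of per-iteration cap being considered (hypothetical names and limit, not code from either PR):
{code:scala}
import java.nio.channels.SocketChannel
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical: register at most maxNewConnectionsPerIteration newly accepted
// sockets per iteration, leaving the rest queued so that existing connections
// are also served in the same poll loop.
val maxNewConnectionsPerIteration = 20
def registerNewConnections(pending: ConcurrentLinkedQueue[SocketChannel],
                           register: SocketChannel => Unit): Unit = {
  var processed = 0
  while (processed < maxNewConnectionsPerIteration && !pending.isEmpty) {
    register(pending.poll())
    processed += 1
  }
}
{code}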

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at 
> 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot 
> 2018-08-29 at 10.50.47 AM.png
>
>
> We are testing secured writing to Kafka through SSL. At small scale, SSL 
> writing to Kafka was fine. However, when we enabled SSL writing at a larger 
> scale (>40k clients writing concurrently), the Kafka brokers soon hit an 
> OutOfMemory issue with a 4 GB memory setting. We tried increasing the heap 
> size to 10 GB, but encountered the same issue. 
> We took a few heap dumps and found that most of the heap memory is referenced 
> through org.apache.kafka.common.network.Selector objects. There are two 
> channel map fields in Selector. It seems that the objects are somehow not 
> removed from these maps in a timely manner. 
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If a broker restart or similar event in the cluster causes 
> partition leadership changes, the brokers may hit the OOM issue faster. 
> {code}
> private final Map channels;
> private final Map closingChannels;
> {code}
> Please see the  attached images and the following link for sample gc 
> analysis. 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> the command line for running kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use java 1.8.0_102, and has applied a TLS patch on reducing 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7454) Use lazy allocation for SslTransportLayer buffers

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631981#comment-16631981
 ] 

ASF GitHub Bot commented on KAFKA-7454:
---

rajinisivaram opened a new pull request #5713: KAFKA-7454: Use lazy allocation 
for SslTransportLayer buffers
URL: https://github.com/apache/kafka/pull/5713
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use lazy allocation for SslTransportLayer buffers
> -
>
> Key: KAFKA-7454
> URL: https://issues.apache.org/jira/browse/KAFKA-7454
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.1.0
>
>
> At the moment, three heap buffers are allocated for SslTransportLayers at the 
> time the instance is created (before establishing the connection on the 
> client side and when accepting the connection on the broker side). When there 
> are a large number of connections and the broker is overloaded, client 
> reconnections can put unnecessary memory pressure on the broker, since 
> buffers may be allocated for client connections whose handshake is never 
> processed. It would be better to allocate the buffers lazily to reduce memory 
> pressure. On the broker side, buffers will be allocated when the first packet 
> is received from the client, starting the handshake. On the client side, 
> buffers will be allocated once the connection is established and the client 
> initiates the handshake.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7454) Use lazy allocation for SslTransportLayer buffers

2018-09-28 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-7454:
-

 Summary: Use lazy allocation for SslTransportLayer buffers
 Key: KAFKA-7454
 URL: https://issues.apache.org/jira/browse/KAFKA-7454
 Project: Kafka
  Issue Type: Improvement
  Components: security
Affects Versions: 2.0.0, 1.1.1, 1.0.2, 0.11.0.3
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.1.0


At the moment, three heap buffers are allocated for SslTransportLayers at the 
time the instance is created (before establishing the connection on the client 
side and when accepting the connection on the broker side). When there are a 
large number of connections and the broker is overloaded, client reconnections 
can put unnecessary memory pressure on the broker, since buffers may be 
allocated for client connections whose handshake is never processed. It would 
be better to allocate the buffers lazily to reduce memory pressure. On the 
broker side, buffers will be allocated when the first packet is received from 
the client, starting the handshake. On the client side, buffers will be 
allocated once the connection is established and the client initiates the 
handshake.
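
As a rough sketch of the idea (hypothetical class and method names, not the actual patch), each buffer can be created on first use instead of at channel construction:
{code:scala}
import java.nio.ByteBuffer
import javax.net.ssl.SSLEngine

// Hypothetical sketch: each buffer is allocated lazily, so connections that
// never start a handshake never pay for the buffers.
class LazySslBuffers(engine: SSLEngine) {
  private var netReadBuffer: ByteBuffer = _
  private var netWriteBuffer: ByteBuffer = _
  private var appReadBuffer: ByteBuffer = _

  def netRead(): ByteBuffer = {
    if (netReadBuffer == null)
      netReadBuffer = ByteBuffer.allocate(engine.getSession.getPacketBufferSize)
    netReadBuffer
  }

  def netWrite(): ByteBuffer = {
    if (netWriteBuffer == null)
      netWriteBuffer = ByteBuffer.allocate(engine.getSession.getPacketBufferSize)
    netWriteBuffer
  }

  def appRead(): ByteBuffer = {
    if (appReadBuffer == null)
      appReadBuffer = ByteBuffer.allocate(engine.getSession.getApplicationBufferSize)
    appReadBuffer
  }
}
{code}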



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7453) Enable idle expiry of connections which are never selected

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631932#comment-16631932
 ] 

ASF GitHub Bot commented on KAFKA-7453:
---

rajinisivaram opened a new pull request #5712: KAFKA-7453: Expire registered 
channels not selected within idle timeout
URL: https://github.com/apache/kafka/pull/5712
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable idle expiry of connections which are never selected
> --
>
> Key: KAFKA-7453
> URL: https://issues.apache.org/jira/browse/KAFKA-7453
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.1.0
>
>
> We add connections to Selector#channels when a connection is registered, but 
> we start idle expiry of connections only when the connection is first 
> selected. In some environments where the channel may never get selected, this 
> could leak memory and sockets.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7453) Enable idle expiry of connections which are never selected

2018-09-28 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-7453:
-

 Summary: Enable idle expiry of connections which are never selected
 Key: KAFKA-7453
 URL: https://issues.apache.org/jira/browse/KAFKA-7453
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 2.0.0, 1.1.1, 1.0.2, 0.11.0.3, 0.10.2.2
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.1.0


We add connections to Selector#channels when a connection is registered, but we 
start idle expiry of connections only when the connection is first selected. In 
some environments where the channel may never get selected, this could leak 
memory and sockets.
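
A minimal sketch of the intended behaviour (a hypothetical tracker, not the Kafka Selector code): the idle clock starts at registration rather than at first select:
{code:scala}
import scala.collection.mutable

// Hypothetical idle tracker: a channel that is never selected still becomes
// eligible for expiry because its clock starts when it is registered.
class IdleExpiryTracker(connectionsMaxIdleMs: Long,
                        now: () => Long = () => System.currentTimeMillis()) {
  private val lastActivity = mutable.LinkedHashMap[String, Long]()

  def onRegister(connectionId: String): Unit =
    lastActivity += connectionId -> now()

  def onActivity(connectionId: String): Unit = {
    lastActivity -= connectionId          // move to the end to keep LRU order
    lastActivity += connectionId -> now()
  }

  def onClose(connectionId: String): Unit =
    lastActivity -= connectionId

  // Oldest entry is the expiry candidate if it has been idle too long.
  def expiredConnection(): Option[String] =
    lastActivity.headOption.collect {
      case (id, last) if now() - last > connectionsMaxIdleMs => id
    }
}
{code}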



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7451) Missing JMX metrics

2018-09-28 Thread Jork Zijlstra (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631802#comment-16631802
 ] 

Jork Zijlstra commented on KAFKA-7451:
--

I updated my config according to the documentation and noticed two things.

1) The metric "records-lead-min" is missing from 
kafka.consumer:type=consumer-fetch-manager-metrics,client-id="\{client-id}".

I think this is because the entire 
kafka.consumer:type=consumer-fetch-manager-metrics,partition="\{partition}",topic="\{topic}",client-id="\{client-id}"
 set of metrics is missing, so the aggregated "records-lead-min" is never produced.

2) The metric "bufferpool-wait-time" is missing from 
kafka.producer:type=producer-metrics,client-id=([-.\w]+).

We do see "bufferpool-wait-time-total" being exported.

> Missing JMX metrics
> ---
>
> Key: KAFKA-7451
> URL: https://issues.apache.org/jira/browse/KAFKA-7451
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.1
>Reporter: Jork Zijlstra
>Priority: Minor
> Attachments: consumer-metrics.png, consumer-node-metrics.png
>
>
> We are trying to use the JMX metrics that are exported, following the 
> documentation located at: 
> [https://docs.confluent.io/current/kafka/monitoring.html]
> According to the docs, there should be "request-latency-avg" and 
> "request-latency-max" metrics available on "MBean: 
> kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)". However, what we 
> see is that these metrics aren't there.
> What I have noticed is that they are only available on 
> "kafka.consumer:type=consumer-node-metrics,client-id=*,node-id=*", i.e. the 
> per-node metrics, where according to the documentation they shouldn't exist.
> See attached screenshots.
>  
> So I was wondering: is the documentation wrong, or are the metrics not 
> exported properly?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7451) Missing JMX metrics

2018-09-28 Thread Jork Zijlstra (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631673#comment-16631673
 ] 

Jork Zijlstra commented on KAFKA-7451:
--

Thanks, it seems we were using the wrong documentation.

The documentation you provided indeed shows the metrics to be on 
consumer-node-metrics, not on consumer-metrics.

> Missing JMX metrics
> ---
>
> Key: KAFKA-7451
> URL: https://issues.apache.org/jira/browse/KAFKA-7451
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.1
>Reporter: Jork Zijlstra
>Priority: Minor
> Attachments: consumer-metrics.png, consumer-node-metrics.png
>
>
> We are trying to use the JMX metrics that are exported, following the 
> documentation located at: 
> [https://docs.confluent.io/current/kafka/monitoring.html]
> According to the docs, there should be "request-latency-avg" and 
> "request-latency-max" metrics available on "MBean: 
> kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)". However, what we 
> see is that these metrics aren't there.
> What I have noticed is that they are only available on 
> "kafka.consumer:type=consumer-node-metrics,client-id=*,node-id=*", i.e. the 
> per-node metrics, where according to the documentation they shouldn't exist.
> See attached screenshots.
>  
> So I was wondering: is the documentation wrong, or are the metrics not 
> exported properly?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7451) Missing JMX metrics

2018-09-28 Thread huxihx (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631556#comment-16631556
 ] 

huxihx commented on KAFKA-7451:
---

Check out [https://kafka.apache.org/documentation/#common_node_monitoring]. It 
should list the correct MBean names.

> Missing JMX metrics
> ---
>
> Key: KAFKA-7451
> URL: https://issues.apache.org/jira/browse/KAFKA-7451
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.1
>Reporter: Jork Zijlstra
>Priority: Minor
> Attachments: consumer-metrics.png, consumer-node-metrics.png
>
>
> We are trying to use the JMX metrics that are exported, following the 
> documentation located at: 
> [https://docs.confluent.io/current/kafka/monitoring.html]
> According to the docs, there should be "request-latency-avg" and 
> "request-latency-max" metrics available on "MBean: 
> kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)". However, what we 
> see is that these metrics aren't there.
> What I have noticed is that they are only available on 
> "kafka.consumer:type=consumer-node-metrics,client-id=*,node-id=*", i.e. the 
> per-node metrics, where according to the documentation they shouldn't exist.
> See attached screenshots.
>  
> So I was wondering: is the documentation wrong, or are the metrics not 
> exported properly?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7417) Some topics lost / cannot recover their ISR status following broker crash

2018-09-28 Thread Mikhail Khomenko (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631542#comment-16631542
 ] 

Mikhail Khomenko commented on KAFKA-7417:
-

This issue has been fixed for now by adding a new broker to the cluster:
 * a new broker was brought up
 * all partitions were then manually rebalanced (following this manual: 
[https://svn.apache.org/repos/asf/kafka/site/082/ops.html])
 * there are currently NO under-replicated partitions
 * statistics of topics/partitions:
topics = 1113, partitions = 8981

Note: during rebalancing the replication factor temporarily became 4, then 
automatically returned to RF=3.

> Some topics lost / cannot recover their ISR status following broker crash
> -
>
> Key: KAFKA-7417
> URL: https://issues.apache.org/jira/browse/KAFKA-7417
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Mikhail Khomenko
>Priority: Major
>
> Hi,
>  we have run into the following issue: some replicas cannot become in-sync. 
> The distribution of in-sync replicas amongst topics is random. For instance:
> {code:java}
> $ kafka-topics --zookeeper 1.2.3.4:8181 --describe --topic TEST
> Topic:TEST PartitionCount:8 ReplicationFactor:3 Configs:
> Topic: TEST Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 0,1,2
> Topic: TEST Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 0,1,2
> Topic: TEST Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 0,1,2
> Topic: TEST Partition: 3 Leader: 2 Replicas: 0,1,2 Isr: 0,1,2
> Topic: TEST Partition: 4 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2
> Topic: TEST Partition: 5 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
> Topic: TEST Partition: 6 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
> Topic: TEST Partition: 7 Leader: 0 Replicas: 1,0,2 Isr: 0,2{code}
> Files in segment TEST-7 are identical (same md5sum) on all 3 brokers. They 
> were also checked with kafka.tools.DumpLogSegments - the messages are the same.
> We have a 3-broker cluster running Confluent Kafka 5.0.0 (which is 
> Apache Kafka 2.0.0).
>  Each broker has the following configuration:
> {code:java}
> advertised.host.name = null
> advertised.listeners = PLAINTEXT://1.2.3.4:9200
> advertised.port = null
> alter.config.policy.class.name = null
> alter.log.dirs.replication.quota.window.num = 11
> alter.log.dirs.replication.quota.window.size.seconds = 1
> authorizer.class.name = 
> auto.create.topics.enable = true
> auto.leader.rebalance.enable = true
> background.threads = 10
> broker.id = 1
> broker.id.generation.enable = true
> broker.interceptor.class = class 
> org.apache.kafka.server.interceptor.DefaultBrokerInterceptor
> broker.rack = null
> client.quota.callback.class = null
> compression.type = producer
> connections.max.idle.ms = 60
> controlled.shutdown.enable = true
> controlled.shutdown.max.retries = 3
> controlled.shutdown.retry.backoff.ms = 5000
> controller.socket.timeout.ms = 3
> create.topic.policy.class.name = null
> default.replication.factor = 3
> delegation.token.expiry.check.interval.ms = 360
> delegation.token.expiry.time.ms = 8640
> delegation.token.master.key = null
> delegation.token.max.lifetime.ms = 60480
> delete.records.purgatory.purge.interval.requests = 1
> delete.topic.enable = true
> fetch.purgatory.purge.interval.requests = 1000
> group.initial.rebalance.delay.ms = 3000
> group.max.session.timeout.ms = 30
> group.min.session.timeout.ms = 6000
> host.name = 
> inter.broker.listener.name = null
> inter.broker.protocol.version = 2.0
> leader.imbalance.check.interval.seconds = 300
> leader.imbalance.per.broker.percentage = 10
> listener.security.protocol.map = 
> PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> listeners = PLAINTEXT://0.0.0.0:9200
> log.cleaner.backoff.ms = 15000
> log.cleaner.dedupe.buffer.size = 134217728
> log.cleaner.delete.retention.ms = 8640
> log.cleaner.enable = true
> log.cleaner.io.buffer.load.factor = 0.9
> log.cleaner.io.buffer.size = 524288
> log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
> log.cleaner.min.cleanable.ratio = 0.5
> log.cleaner.min.compaction.lag.ms = 0
> log.cleaner.threads = 1
> log.cleanup.policy = [delete]
> log.dir = /tmp/kafka-logs
> log.dirs = /var/lib/kafka/data
> log.flush.interval.messages = 9223372036854775807
> log.flush.interval.ms = null
> log.flush.offset.checkpoint.interval.ms = 6
> log.flush.scheduler.interval.ms = 9223372036854775807
> log.flush.start.offset.checkpoint.interval.ms = 6
> log.index.interval.bytes = 4096
> log.index.size.max.bytes = 10485760
> log.message.downconversion.enable = true
> log.message.format.version = 2.0
> log.message.timestamp.difference.max.ms = 9223372036854775807
> log.message.timestamp.type = CreateTime
> log.preallocate = false
> log.retention.bytes = -1
> log.retention.check.interval.ms = 

[jira] [Assigned] (KAFKA-7452) Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens

2018-09-28 Thread Zhanxiang (Patrick) Huang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanxiang (Patrick) Huang reassigned KAFKA-7452:


Assignee: Zhanxiang (Patrick) Huang

> Deleting snapshot files after check-pointing log recovery offsets can slow 
> down replication when truncation happens
> ---
>
> Key: KAFKA-7452
> URL: https://issues.apache.org/jira/browse/KAFKA-7452
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0, 1.0.1, 1.1.0, 1.1.1, 2.0.0
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>
> After KAFKA-5829, Kafka will try to iterate through all the partition dirs to 
> delete useless snapshot files in "checkpointLogRecoveryOffsetsInDir". 
> Currently, "checkpointLogRecoveryOffsetsInDir" is used in the following 
> places:
>  # Truncation
>  # Log dir deletion and movement
>  # Background thread checkpointing recovery offsets
> In a 2.0 deployment on a cluster with 10k partitions per broker, we found that 
> deleting useless snapshot files in the critical path of log truncation can 
> significantly slow down followers catching up with the leader during a rolling 
> bounce (~2x slower than 0.11). The reason is that we essentially do an "ls -R" 
> over the whole data directory just to potentially delete the snapshot files in 
> one partition directory, because the way we identify snapshot files is to list 
> the directories and check the filename suffix.
> In our case, "listSnapshotFiles" takes ~1ms per partition directory, so it 
> takes ~1ms * 10k = ~10s just to delete snapshot files for one partition after 
> the truncation, which delays future fetches in the fetcher thread.
> Here are the related code snippets:
>  LogManager.scala
>  
> {code:java}
> private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
>   for {
> partitionToLog <- logsByDir.get(dir.getAbsolutePath)
> checkpoint <- recoveryPointCheckpoints.get(dir)
>   } {
> try {
>   checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
>   allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
> } catch {
>   case e: IOException =>
> logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, 
> s"Disk error while writing to recovery point " +
>   s"file in directory $dir", e)
> }
>   }
> }
> {code}
>  
>  ProducerStateManager.scala
>  
> {code:java}
> private[log] def listSnapshotFiles(dir: File): Seq[File] = {
>   if (dir.exists && dir.isDirectory) {
> Option(dir.listFiles).map { files =>
>   files.filter(f => f.isFile && isSnapshotFile(f)).toSeq
> }.getOrElse(Seq.empty)
>   } else Seq.empty
> }
> private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => 
> true) {
>   listSnapshotFiles(dir).filter(file => 
> predicate(offsetFromFile(file))).foreach { file =>
> Files.deleteIfExists(file.toPath)
>   }
> }
> {code}
>  
> There are a few things that can be optimized here:
>  # We can keep an in-memory cache of the snapshot file metadata (filenames) 
> in ProducerStateManager to avoid calling dir.listFiles in 
> "deleteSnapshotFiles", "latestSnapshotFile" and "oldestSnapshotFile".
>  # After truncation, we can try to delete snapshot files only for the 
> truncated partitions (in the replica fetcher thread, we truncate one partition 
> at a time) instead of all partitions. Or maybe we don't even need to delete 
> snapshot files in the critical path of truncation, because the background 
> log-recovery-offset-checkpoint-thread will do it periodically. This can also 
> apply to log deletion/movement.
>  # If we want to further optimize the actual snapshot file deletion, we can 
> make it async. But I am not sure whether that is needed once we have 1) and 2).
> Also, we notice that there is no way to disable transaction/exactly-once 
> support on the broker side, even though it brings in some extra overhead when 
> no clients use this feature. Not sure whether this is a common use case, but 
> it would be useful to have a switch to avoid the extra performance overhead.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7452) Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens

2018-09-28 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-7452:


 Summary: Deleting snapshot files after check-pointing log recovery 
offsets can slow down replication when truncation happens
 Key: KAFKA-7452
 URL: https://issues.apache.org/jira/browse/KAFKA-7452
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0, 1.1.1, 1.1.0, 1.0.1, 1.0.0
Reporter: Zhanxiang (Patrick) Huang


After KAFKA-5829, Kafka will try to iterate through all the partition dirs to 
delete useless snapshot files in "checkpointLogRecoveryOffsetsInDir". 
Currently, "checkpointLogRecoveryOffsetsInDir" is used in the following places:
 # Truncation
 # Log dir deletion and movement
 # Background thread checkpointing recovery offsets

In a 2.0 deployment on a cluster with 10k partitions per broker, we found that 
deleting useless snapshot files in the critical path of log truncation can 
significantly slow down followers catching up with the leader during a rolling 
bounce (~2x slower than 0.11). The reason is that we essentially do an "ls -R" 
over the whole data directory just to potentially delete the snapshot files in 
one partition directory, because the way we identify snapshot files is to list 
the directories and check the filename suffix.

In our case, "listSnapshotFiles" takes ~1ms per partition directory, so it takes 
~1ms * 10k = ~10s just to delete snapshot files for one partition after the 
truncation, which delays future fetches in the fetcher thread.

Here are the related code snippets:

 LogManager.scala

 
{code:java}
private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
  for {
partitionToLog <- logsByDir.get(dir.getAbsolutePath)
checkpoint <- recoveryPointCheckpoints.get(dir)
  } {
try {
  checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
  allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
} catch {
  case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk 
error while writing to recovery point " +
  s"file in directory $dir", e)
}
  }
}
{code}
 

 ProducerStateManager.scala

 
{code:java}
private[log] def listSnapshotFiles(dir: File): Seq[File] = {
  if (dir.exists && dir.isDirectory) {
Option(dir.listFiles).map { files =>
  files.filter(f => f.isFile && isSnapshotFile(f)).toSeq
}.getOrElse(Seq.empty)
  } else Seq.empty
}


private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => 
true) {
  listSnapshotFiles(dir).filter(file => 
predicate(offsetFromFile(file))).foreach { file =>
Files.deleteIfExists(file.toPath)
  }
}

{code}
 

There are a few things that can be optimized here:
 # We can keep an in-memory cache of the snapshot file metadata (filenames) in 
ProducerStateManager to avoid calling dir.listFiles in "deleteSnapshotFiles", 
"latestSnapshotFile" and "oldestSnapshotFile" (a rough sketch follows this list).
 # After truncation, we can try to delete snapshot files only for the truncated 
partitions (in the replica fetcher thread, we truncate one partition at a time) 
instead of all partitions. Or maybe we don't even need to delete snapshot files 
in the critical path of truncation, because the background 
log-recovery-offset-checkpoint-thread will do it periodically. This can also 
apply to log deletion/movement.
 # If we want to further optimize the actual snapshot file deletion, we can 
make it async. But I am not sure whether that is needed once we have 1) and 2).
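
A rough sketch of what the cache in 1) could look like (hypothetical class; the real change may be structured differently):
{code:scala}
import java.io.File
import java.nio.file.Files
import scala.collection.mutable

// Hypothetical in-memory index of snapshot files for one partition directory:
// the directory is listed once, and later deletions consult the map instead of
// calling dir.listFiles again.
class SnapshotFileCache(dir: File) {
  private val SnapshotSuffix = ".snapshot"
  private val snapshots = mutable.Map[Long, File]()

  Option(dir.listFiles).getOrElse(Array.empty[File])
    .filter(f => f.isFile && f.getName.endsWith(SnapshotSuffix))
    .foreach(f => snapshots += offsetFromFile(f) -> f)

  private def offsetFromFile(f: File): Long =
    f.getName.stripSuffix(SnapshotSuffix).toLong

  // Delete snapshots whose offset matches the predicate, keeping the map in sync.
  def deleteSnapshots(predicate: Long => Boolean): Unit =
    snapshots.filter { case (offset, _) => predicate(offset) }.foreach {
      case (offset, file) =>
        Files.deleteIfExists(file.toPath)
        snapshots -= offset
    }
}
{code}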

Also, we notice that there is no way to disable transaction/exactly-once 
support on the broker side, even though it brings in some extra overhead when no 
clients use this feature. Not sure whether this is a common use case, but it 
would be useful to have a switch to avoid the extra performance overhead.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7451) Missing JMX metrics

2018-09-28 Thread Jork Zijlstra (JIRA)
Jork Zijlstra created KAFKA-7451:


 Summary: Missing JMX metrics
 Key: KAFKA-7451
 URL: https://issues.apache.org/jira/browse/KAFKA-7451
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 1.0.1
Reporter: Jork Zijlstra
 Attachments: consumer-metrics.png, consumer-node-metrics.png

We are trying to use the JMX metrics that are exported, following the 
documentation located at: 
[https://docs.confluent.io/current/kafka/monitoring.html]

According to the docs, there should be "request-latency-avg" and 
"request-latency-max" metrics available on "MBean: 
kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)". However, what we see 
is that these metrics aren't there.

What I have noticed is that they are only available on 
"kafka.consumer:type=consumer-node-metrics,client-id=*,node-id=*", i.e. the 
per-node metrics, where according to the documentation they shouldn't exist.

See attached screenshots.

So I was wondering: is the documentation wrong, or are the metrics not 
exported properly?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7450) kafka controller RequestSendThread stuck in infinite loop after SSL handshake failure with peer brokers

2018-09-28 Thread Yu Yang (JIRA)
Yu Yang created KAFKA-7450:
--

 Summary: kafka controller RequestSendThread stuck in infinite loop 
after SSL handshake failure with peer brokers
 Key: KAFKA-7450
 URL: https://issues.apache.org/jira/browse/KAFKA-7450
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 2.0.0
Reporter: Yu Yang


After updating security.inter.broker.protocol to SSL for our cluster, we 
observed that the controller can reach almost 100% CPU usage. 

{code}
listeners=PLAINTEXT://:9092,SSL://:9093
security.inter.broker.protocol=SSL
{code}

There is no obvious error in server.log, but controller.log shows repeated SSL 
handshake failure errors like the one below: 

{code}
[2018-09-28 05:53:10,821] WARN [RequestSendThread controllerId=6042] Controller 
6042's connection to broker datakafka06176.ec2.pin220.com:9093 (id: 6176 rack: 
null) was unsuccessful (kafka.controller.RequestSendThread)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence 
violation, 2
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1487)
at 
sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:813)
at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:781)
at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:468)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:331)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence 
violation, 2
at 
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:196)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416)
at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:473)
... 10 more

{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)