[jira] [Created] (KAFKA-7468) KafkaController does not need to start the inner KafkaScheduler when auto.leader.rebalance.enable is false

2018-10-01 Thread ZhangShuai (JIRA)
ZhangShuai created KAFKA-7468:
-

 Summary: KafkaController does not need to start the inner 
KafkaScheduler when auto.leader.rebalance.enable is false
 Key: KAFKA-7468
 URL: https://issues.apache.org/jira/browse/KAFKA-7468
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 2.0.0
Reporter: ZhangShuai


KafkaController has an inner KafkaScheduler member that schedules the auto 
leader rebalance task. But when auto.leader.rebalance.enable is set to false, 
there is no need to start the KafkaScheduler, because starting it creates an 
unused ScheduledThreadPoolExecutor with one thread.
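
A minimal sketch of the proposed guard (illustrative only, not the actual 
KafkaController code; class and method names are invented here):

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: create and start the single-threaded scheduler only
// when auto leader rebalancing is enabled, so a broker with the feature
// disabled never holds an idle ScheduledThreadPoolExecutor thread.
public class RebalanceScheduler {
    private final ScheduledExecutorService executor;

    public RebalanceScheduler(boolean autoLeaderRebalanceEnable) {
        this.executor = autoLeaderRebalanceEnable
            ? Executors.newScheduledThreadPool(1)
            : null;
    }

    public void maybeSchedule(Runnable task, long intervalSeconds) {
        if (executor != null)
            executor.scheduleAtFixedRate(task, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    public void shutdown() {
        if (executor != null)
            executor.shutdown();
    }
}
{code}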



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value

2018-10-01 Thread Benjamin Bianchi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benjamin Bianchi updated KAFKA-3832:

Comment: was deleted

(was: Hi,

 

I'd like to try this ticket!)

> Kafka Connect's JSON Converter never outputs a null value
> -
>
> Key: KAFKA-3832
> URL: https://issues.apache.org/jira/browse/KAFKA-3832
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Prasanna Subburaj
>Priority: Major
>  Labels: newbie
>
> Kafka Connect's JSON Converter will never output a null value when 
> {{enableSchemas=true}}. This means that when a connector outputs a 
> {{SourceRecord}} with a null value, the JSON Converter will always produce a 
> message value with:
> {code:javascript}
> {
>   "schema": null,
>   "payload": null
> }
> {code}
> And, this means that while Kafka log compaction will always be able to remove 
> earlier messages with the same key, log compaction will never remove _all_ of 
> the messages with the same key. 
> The JSON Converter's {{fromConnectData(...)}} should always return null when 
> it is supplied a null value.
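
For illustration, a hedged sketch of the proposed behavior (the class below is 
a stand-in, not the real Connect JsonConverter):

{code:java}
import java.nio.charset.StandardCharsets;

// Stand-in sketch, not the actual Connect JsonConverter: a null value should
// serialize to a null byte array (a Kafka tombstone) instead of the
// {"schema": null, "payload": null} envelope, so log compaction can remove
// every earlier message with the same key.
public class NullAwareJsonConverter {
    public byte[] fromConnectData(String topic, String schemaJson, String payloadJson) {
        if (payloadJson == null)
            return null; // tombstone: lets compaction drop the key entirely
        String envelope = "{\"schema\":" + schemaJson + ",\"payload\":" + payloadJson + "}";
        return envelope.getBytes(StandardCharsets.UTF_8);
    }
}
{code}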



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value

2018-10-01 Thread Benjamin Bianchi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634894#comment-16634894
 ] 

Benjamin Bianchi commented on KAFKA-3832:
-

Hi,

 

I'd like to try this ticket!

> Kafka Connect's JSON Converter never outputs a null value
> -
>
> Key: KAFKA-3832
> URL: https://issues.apache.org/jira/browse/KAFKA-3832
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Prasanna Subburaj
>Priority: Major
>  Labels: newbie
>
> Kafka Connect's JSON Converter will never output a null value when 
> {{enableSchemas=true}}. This means that when a connector outputs a 
> {{SourceRecord}} with a null value, the JSON Converter will always produce a 
> message value with:
> {code:javascript}
> {
>   "schema": null,
>   "payload": null
> }
> {code}
> And, this means that while Kafka log compaction will always be able to remove 
> earlier messages with the same key, log compaction will never remove _all_ of 
> the messages with the same key. 
> The JSON Converter's {{fromConnectData(...)}} should always return null when 
> it is supplied a null value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty

2018-10-01 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634878#comment-16634878
 ] 

Ismael Juma commented on KAFKA-7467:


How come this is considered minor? It seems severe since it kills the cleaner.

> NoSuchElementException is raised because controlBatch is empty
> --
>
> Key: KAFKA-7467
> URL: https://issues.apache.org/jira/browse/KAFKA-7467
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Badai Aqrandista
>Assignee: Bob Barrett
>Priority: Minor
>
> Somehow, the log cleaner died with a NoSuchElementException when it called 
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>   at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>   at 
> kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>   at 
> kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>   at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>   at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>   at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>   at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>   at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>   at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
>   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
> consumeAbortedTxnsUpTo(controlBatch.lastOffset)
> val controlRecord = controlBatch.iterator.next()
> val controlType = ControlRecordType.parse(controlRecord.key)
> val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty 
> batches, which results in empty control batches. Trying to read the control 
> type of an empty batch causes the error.
> {noformat}
>   else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
>       if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
>           throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
>
>       bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
>       DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
>           batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
>           batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
>           batch.isTransactional(), batch.isControlBatch());
>       filterResult.updateRetainedBatchMetadata(batch, 0, true);
> {noformat}
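
A simplified illustration of the failure mode and the obvious defensive fix, 
checking the batch for emptiness before reading its first record (the types 
below are stand-ins, not the real org.apache.kafka.common.record API):

{code:java}
import java.util.Iterator;

// Simplified stand-ins: calling next() on an empty batch's iterator throws
// NoSuchElementException, so the reader must check hasNext() (or the batch's
// record count) before parsing the control record.
public class ControlBatchGuard {
    interface RecordLike { byte[] key(); }
    interface BatchLike { Iterator<RecordLike> iterator(); }

    static boolean onControlBatchRead(BatchLike controlBatch) {
        Iterator<RecordLike> it = controlBatch.iterator();
        if (!it.hasNext())
            return false; // empty control batch (e.g. retained by RETAIN_EMPTY): nothing to parse
        byte[] key = it.next().key();
        // ... parse the control type from the key, as the real code does ...
        return key != null;
    }
}
{code}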



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty

2018-10-01 Thread Bob Barrett (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Barrett updated KAFKA-7467:
---
Description: 
Somehow, the log cleaner died with a NoSuchElementException when it called 
onControlBatchRead:
{noformat}
[2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
__consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, 
discarding deletes. (kafka.log.LogCleaner)
[2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
(kafka.log.LogCleaner)
java.util.NoSuchElementException
  at java.util.Collections$EmptyIterator.next(Collections.java:4189)
  at 
kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
  at 
kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
  at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
  at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
(kafka.log.LogCleaner)
{noformat}
The following code does not seem to expect the controlBatch to be empty:

[https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
{noformat}
  def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
consumeAbortedTxnsUpTo(controlBatch.lastOffset)

val controlRecord = controlBatch.iterator.next()
val controlType = ControlRecordType.parse(controlRecord.key)
val producerId = controlBatch.producerId
{noformat}
MemoryRecords.filterTo copies the original control attribute for empty batches, 
which results in empty control batches. Trying to read the control type of an 
empty batch causes the error.
{noformat}
  else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
      if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
          throw new IllegalStateException("Empty batches are only supported for magic v2 and above");

      bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
      DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
          batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
          batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
          batch.isTransactional(), batch.isControlBatch());
      filterResult.updateRetainedBatchMetadata(batch, 0, true);
{noformat}

  was:
Somehow, the log cleaner died with a NoSuchElementException when it called 
onControlBatchRead:
{noformat}
[2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
__consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, 
discarding deletes. (kafka.log.LogCleaner)
[2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
(kafka.log.LogCleaner)
java.util.NoSuchElementException
  at java.util.Collections$EmptyIterator.next(Collections.java:4189)
  at 
kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
  at 
kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
  at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
  at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
(kafka.log.LogCleaner)
{noformat}
The following code does not seem to expect the controlBatch to be empty:


[jira] [Assigned] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty

2018-10-01 Thread Bob Barrett (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Barrett reassigned KAFKA-7467:
--

Assignee: Bob Barrett

> NoSuchElementException is raised because controlBatch is empty
> --
>
> Key: KAFKA-7467
> URL: https://issues.apache.org/jira/browse/KAFKA-7467
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Badai Aqrandista
>Assignee: Bob Barrett
>Priority: Minor
>
> Somehow, the log cleaner died with a NoSuchElementException when it called 
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>   at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>   at 
> kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>   at 
> kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>   at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>   at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>   at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>   at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>   at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>   at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
> {noformat}
>   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
> consumeAbortedTxnsUpTo(controlBatch.lastOffset)
> val controlRecord = controlBatch.iterator.next()
> val controlType = ControlRecordType.parse(controlRecord.key)
> val producerId = controlBatch.producerId
> {noformat}
> MemoryRecords.filterTo copies the original control attribute for empty 
> batches, which results in empty control batches. Trying to read the control 
> type of an empty batch causes the error.
> {noformat}
>   else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
>       if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
>           throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
>
>       bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
>       DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
>           batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
>           batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
>           batch.isTransactional(), batch.isControlBatch());
>       filterResult.updateRetainedBatchMetadata(batch, 0, true);
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty

2018-10-01 Thread Bob Barrett (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Barrett updated KAFKA-7467:
---
Description: 
Somehow, the log cleaner died with a NoSuchElementException when it called 
onControlBatchRead:
{noformat}
[2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
__consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, 
discarding deletes. (kafka.log.LogCleaner)
[2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
(kafka.log.LogCleaner)
java.util.NoSuchElementException
  at java.util.Collections$EmptyIterator.next(Collections.java:4189)
  at 
kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
  at 
kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
  at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
  at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
(kafka.log.LogCleaner)
{noformat}
The following code does not seem to expect the controlBatch to be empty:

[https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946]
{noformat}
  def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
consumeAbortedTxnsUpTo(controlBatch.lastOffset)

val controlRecord = controlBatch.iterator.next()
val controlType = ControlRecordType.parse(controlRecord.key)
val producerId = controlBatch.producerId
{noformat}
MemoryRecords.filterTo copies the original control attribute for empty batches, 
which results in empty control batches. Trying to read the control type of an 
empty batch causes the error.
{noformat}
  else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
      if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
          throw new IllegalStateException("Empty batches are only supported for magic v2 and above");

      bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
      DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
          batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
          batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
          batch.isTransactional(), batch.isControlBatch());
      filterResult.updateRetainedBatchMetadata(batch, 0, true);
{noformat}

  was:
Somehow, the log cleaner died with a NoSuchElementException when it called 
onControlBatchRead:

{noformat}
[2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
__consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, 
discarding deletes. (kafka.log.LogCleaner)
[2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
(kafka.log.LogCleaner)
java.util.NoSuchElementException
  at java.util.Collections$EmptyIterator.next(Collections.java:4189)
  at 
kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
  at 
kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
  at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
  at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
(kafka.log.LogCleaner)
{noformat}

The 

[jira] [Commented] (KAFKA-4218) Enable access to key in ValueTransformer

2018-10-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634781#comment-16634781
 ] 

Matthias J. Sax commented on KAFKA-4218:


[~lindong] There was no progress on KIP-149 for the 2.1 release.

> Enable access to key in ValueTransformer
> 
>
> Key: KAFKA-4218
> URL: https://issues.apache.org/jira/browse/KAFKA-4218
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>Priority: Major
>  Labels: api, kip
> Fix For: 1.1.0
>
>
> While transforming values via {{KStream.transformValues}} and 
> {{ValueTransformer}}, the key associated with the value may be needed, even 
> if it is not changed.  For instance, it may be used to access stores.  
> As of now, the key is not available within these methods and interfaces, 
> leading to the use of {{KStream.transform}} and {{Transformer}}, and the 
> unnecessary creation of new {{KeyValue}} objects.
> KIP-149: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
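
A minimal sketch of the key-aware shape KIP-149 proposes (simplified and 
illustrative; the real Kafka Streams interface also carries init/close 
lifecycle methods):

{code:java}
// Simplified sketch of a key-aware value transformer: the key is read-only
// input and only the value is replaced, so no new KeyValue object is needed.
public class KeyAwareTransformSketch {
    interface ValueTransformerWithKey<K, V, VR> {
        VR transform(K readOnlyKey, V value);
    }

    public static void main(String[] args) {
        ValueTransformerWithKey<String, Long, String> t =
            (key, value) -> key + ":" + value; // key is available but unchanged
        System.out.println(t.transform("user-42", 7L)); // prints user-42:7
    }
}
{code}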



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7320) Provide ability to disable auto topic creation in KafkaConsumer

2018-10-01 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634780#comment-16634780
 ] 

Dhruvil Shah commented on KAFKA-7320:
-

PR is ready for review: https://github.com/apache/kafka/pull/5542

> Provide ability to disable auto topic creation in KafkaConsumer
> ---
>
> Key: KAFKA-7320
> URL: https://issues.apache.org/jira/browse/KAFKA-7320
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> Consumers should have a configuration to control whether subscribing to 
> non-existent topics should automatically create the topic or not.
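
For illustration, a consumer using such a switch might be configured like the 
sketch below; the property name "allow.auto.create.topics" is an assumption 
here, not something taken from this PR:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch only: "allow.auto.create.topics" is an assumed property name.
public class NoAutoCreateConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put("allow.auto.create.topics", "false"); // subscribing would no longer create the topic
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("maybe-missing-topic"));
        }
    }
}
{code}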



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7456) Serde Inheritance in Streams DSL

2018-10-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7456.

Resolution: Fixed

> Serde Inheritance in Streams DSL
> 
>
> Key: KAFKA-7456
> URL: https://issues.apache.org/jira/browse/KAFKA-7456
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.1.0
>
>
> This is a prerequisite for further topology optimization in the Streams DSL: 
> we should allow different operators inside the DSL to pass along key and 
> value serdes when they are not explicitly specified by users. The serde 
> specification precedence should generally be (a short sketch follows the list):
> 1) Overridden values via control objects (e.g. Materialized, Serialized, 
> Consumed, etc)
> 2) Serdes that can be inferred from the operator itself (e.g. 
> groupBy().count(), where value serde can default to `LongSerde`).
> 3) Serde inherited from parent operator if possible (note if the key / value 
> types have been changed, then the corresponding serde cannot be inherited).
> 4) Default serde specified in the config.
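
An illustration of the intended effect (topic names are placeholders):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

// Illustrative sketch: the String key serde declared in Consumed flows
// through groupByKey() (rule 3), and count() can infer a Long value serde
// (rule 2), so neither step needs an explicit Serialized/Materialized
// override.
public class SerdeInheritanceSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        input.groupByKey()  // inherits the String key serde from the source
             .count()       // value serde inferred as Long
             .toStream()
             .to("counts-topic");
        builder.build();
    }
}
{code}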



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7456) Serde Inheritance in Streams DSL

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634766#comment-16634766
 ] 

ASF GitHub Bot commented on KAFKA-7456:
---

mjsax closed pull request #5521: KAFKA-7456: Serde Inheritance in DSL
URL: https://github.com/apache/kafka/pull/5521
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
index 6a851a10d9d..2474860a049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
@@ -24,7 +24,7 @@
     static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> {
         // Default constructor needed for reflection object creation
         public TimeWindowedSerde() {
-            super(new TimeWindowedSerializer(), new TimeWindowedDeserializer());
+            super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>());
         }
 
         public TimeWindowedSerde(final Serde<T> inner) {
@@ -35,7 +35,7 @@ public TimeWindowedSerde(final Serde<T> inner) {
     static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> {
         // Default constructor needed for reflection object creation
         public SessionWindowedSerde() {
-            super(new SessionWindowedSerializer(), new SessionWindowedDeserializer());
+            super(new SessionWindowedSerializer<>(), new SessionWindowedDeserializer<>());
         }
 
         public SessionWindowedSerde(final Serde<T> inner) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index a0724ebc6d6..e8707513a07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
@@ -31,32 +32,48 @@
 import java.util.Objects;
 import java.util.Set;
 
-public abstract class AbstractStream<K> {
+/*
+ * Any classes (KTable, KStream, etc) extending this class should follow the serde specification precedence ordering as:
+ *
+ * 1) Overridden values via control objects (e.g. Materialized, Serialized, Consumed, etc)
+ * 2) Serdes that can be inferred from the operator itself (e.g. groupBy().count(), where value serde can default to `LongSerde`).
+ * 3) Serde inherited from parent operator if possible (note if the key / value types have been changed, then the corresponding serde cannot be inherited).
+ * 4) Default serde specified in the config.
+ */
+public abstract class AbstractStream<K, V> {
 
-    protected final InternalStreamsBuilder builder;
     protected final String name;
+    protected final Serde<K> keySerde;
+    protected final Serde<V> valSerde;
     protected final Set<String> sourceNodes;
     protected final StreamsGraphNode streamsGraphNode;
+    protected final InternalStreamsBuilder builder;
 
     // This copy-constructor will allow to extend KStream
     // and KTable APIs with new methods without impacting the public interface.
-    public AbstractStream(final AbstractStream<K> stream) {
-        this.builder = stream.builder;
+    public AbstractStream(final AbstractStream<K, V> stream) {
         this.name = stream.name;
+        this.builder = stream.builder;
+        this.keySerde = stream.keySerde;
+        this.valSerde = stream.valSerde;
         this.sourceNodes = stream.sourceNodes;
         this.streamsGraphNode = stream.streamsGraphNode;
     }
 
-    AbstractStream(final InternalStreamsBuilder builder,
-                   final String name,
+    AbstractStream(final String name,
+                   final Serde<K> keySerde,
+                   final Serde<V> valSerde,
                    final Set<String> sourceNodes,
-                   final StreamsGraphNode streamsGraphNode) {
+                   final StreamsGraphNode streamsGraphNode,
+                   final InternalStreamsBuilder builder) {
         if (sourceNodes == null || sourceNodes.isEmpty()) {
             throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty");
         }
 
-        this.builder = builder;
         this.name = name;
+        this.builder = builder;
+        this.keySerde = keySerde;
+   

[jira] [Reopened] (KAFKA-4218) Enable access to key in ValueTransformer

2018-10-01 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reopened KAFKA-4218:
-

Thanks for working on this [~jeyhunkarimov]!

For KIP-149, currently there are 4 PRs:

- [https://github.com/apache/kafka/pull/3570]

- [https://github.com/apache/kafka/pull/3599]

- [https://github.com/apache/kafka/pull/3600]

- [https://github.com/apache/kafka/pull/3601]

And 3 JIRAs:

- KAFKA-4218

- KAFKA-3745

- KAFKA-4726

Can we create an umbrella Jira for KIP-149 to include these PRs and JIRAs, so 
that we can easily reference and track the entire progress of KIP-149 in, e.g., 
the Apache Kafka 2.1.0 release plan 
(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=91554044)?

 

> Enable access to key in ValueTransformer
> 
>
> Key: KAFKA-4218
> URL: https://issues.apache.org/jira/browse/KAFKA-4218
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>Priority: Major
>  Labels: api, kip
> Fix For: 1.1.0
>
>
> While transforming values via {{KStream.transformValues}} and 
> {{ValueTransformer}}, the key associated with the value may be needed, even 
> if it is not changed.  For instance, it may be used to access stores.  
> As of now, the key is not available within these methods and interfaces, 
> leading to the use of {{KStream.transform}} and {{Transformer}}, and the 
> unnecessary creation of new {{KeyValue}} objects.
> KIP-149: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7276) Consider using re2j to speed up regex operations

2018-10-01 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634727#comment-16634727
 ] 

Ted Yu commented on KAFKA-7276:
---

For #1, see the following:
http://hadoop.apache.org/docs/r3.0.2/api/org/apache/hadoop/metrics2/filter/GlobFilter.html#compile-java.lang.String-

It is used by Hadoop.



> Consider using re2j to speed up regex operations
> 
>
> Key: KAFKA-7276
> URL: https://issues.apache.org/jira/browse/KAFKA-7276
> Project: Kafka
>  Issue Type: Task
>  Components: packaging
>Reporter: Ted Yu
>Assignee: kevin.chen
>Priority: Major
>
> https://github.com/google/re2j
> re2j claims to do linear time regular expression matching in Java.
> Its benefit is most obvious for deeply nested regex (such as a | b | c | d).
> We should consider using re2j to speed up regex operations.
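
For context, a minimal example of what a drop-in switch to re2j looks like 
(assuming the com.google.re2j artifact is on the classpath):

{code:java}
import com.google.re2j.Pattern;

// Small sketch of re2j usage: its Pattern/Matcher surface mirrors
// java.util.regex for common cases, while matching runs in linear time
// instead of backtracking.
public class Re2jSketch {
    public static void main(String[] args) {
        Pattern p = Pattern.compile("a|b|c|d");
        System.out.println(p.matcher("c").matches()); // true
    }
}
{code}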



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread

2018-10-01 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333251#comment-16333251
 ] 

Ted Yu edited comment on KAFKA-6303 at 10/1/18 10:42 PM:
-

+1 from me.


was (Author: yuzhih...@gmail.com):
+1 from me .

> Potential lack of synchronization in NioEchoServer#AcceptorThread
> -
>
> Key: KAFKA-6303
> URL: https://issues.apache.org/jira/browse/KAFKA-6303
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Ted Yu
>Assignee: siva santhalingam
>Priority: Minor
>
> In the run() method:
> {code}
> SocketChannel socketChannel = 
> ((ServerSocketChannel) key.channel()).accept();
> socketChannel.configureBlocking(false);
> newChannels.add(socketChannel);
> {code}
> Modifications to newChannels should be protected by a synchronized block.
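
A minimal sketch of the requested change (field names mirror the snippet 
above; this is illustrative, not the actual NioEchoServer code):

{code:java}
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: guard mutations of the shared list with the same lock
// that its readers use, so channels accepted by the acceptor thread are
// safely published to the consuming thread.
public class AcceptorSketch {
    private final List<SocketChannel> newChannels = new ArrayList<>();

    void onAccept(SocketChannel socketChannel) throws IOException {
        socketChannel.configureBlocking(false);
        synchronized (newChannels) { // protect the shared list
            newChannels.add(socketChannel);
        }
    }
}
{code}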



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4436) Provide builder pattern for StreamsConfig

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634702#comment-16634702
 ] 

ASF GitHub Bot commented on KAFKA-4436:
---

wykapedia opened a new pull request #5722: [WIP] KAFKA-4436: Configuration 
Builders
URL: https://github.com/apache/kafka/pull/5722
 
 
   - Added `ProducerConfigBuilder` to `ProducerConfig` to allow easier dynamic 
creation of the `ProducerConfig` properties `Map`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide builder pattern for StreamsConfig
> -
>
> Key: KAFKA-4436
> URL: https://issues.apache.org/jira/browse/KAFKA-4436
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Currently, {{StreamsConfig}} parameters must be set "manually" as key value 
> pairs. This has multiple disadvantages from a user point of view:
>  - mandatory arguments could be missing
>  - data types might be wrong
>  - producer/consumer config parameters could conflict as they might have the 
> same name (user needs to know to prefix them to avoid conflict)
> Those problems have different impact: either a runtime exception is thrown if 
> the problem is detected (e.g. missing parameter or wrong type) or the 
> application is just not configured correctly (producer/consumer has wrong 
> config).
> A builder pattern would avoid those problems by forcing the user to specify 
> things correctly in the first place (otherwise, it won't compile). For example 
> something like this:
> {noformat}
> StreamsConfig config = StreamsConfig.builder()
> .setApplicationId(String appId)
> .addBootstrapServer(String host, int port)
> .addBootstrapServer(String host, int port)
> .addZookeeper(String host, int port)
> .addZookeeper(String host, int port)
> .setStateDirectory(File path)
> .setConsumerConfig(
> ConsumerConfig.builder()
> .setAutoOffsetReset(...)
> .build()
> )
> .build();
> {noformat}
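
A hypothetical skeleton of such a builder (all names invented for 
illustration): mandatory fields are constructor arguments, so omitting them 
fails at compile time rather than at runtime:

{code:java}
import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not an existing Kafka API: mandatory settings are
// required up front, optional ones are chained, and build() yields the plain
// properties map the existing StreamsConfig constructor accepts.
public class StreamsConfigBuilder {
    private final Map<String, Object> props = new HashMap<>();

    private StreamsConfigBuilder(String applicationId, String bootstrapServers) {
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
    }

    public static StreamsConfigBuilder builder(String applicationId, String bootstrapServers) {
        return new StreamsConfigBuilder(applicationId, bootstrapServers);
    }

    public StreamsConfigBuilder setStateDirectory(File path) {
        props.put("state.dir", path.getAbsolutePath());
        return this;
    }

    public Map<String, Object> build() {
        return new HashMap<>(props);
    }
}
{code}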



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7465) kafka-topics.sh command not honouring --disable-rack-aware property when adding partitions.

2018-10-01 Thread Suman B N (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634618#comment-16634618
 ] 

Suman B N commented on KAFKA-7465:
--

Fixed. Pull request is [here|https://github.com/apache/kafka/pull/5721].

> kafka-topics.sh command not honouring --disable-rack-aware property when 
> adding partitions.
> ---
>
> Key: KAFKA-7465
> URL: https://issues.apache.org/jira/browse/KAFKA-7465
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0, 1.0.0, 1.1.0, 2.0.0
>Reporter: Suman B N
>Priority: Minor
> Fix For: 2.0.1
>
>
> kafka-topics.sh command not honouring --disable-rack-aware property when 
> adding partitions.
> Topic creation honours the --disable-rack-aware property, whereas altering a 
> topic always uses the default RackAwareMode (Enforced) when adding partitions.
> Steps:
>  * Start brokers. 
>     0 -> r1, 1 -> r1, 2 -> r2, 3 -> r2
>  * Create topic _topic1_ with 8 partitions and replication factor 2. No 
> partition should have replicas on same-rack brokers, e.g. replica sets 0,1 
> and 2,3 should never be created. Create topic _topic2_ with 8 partitions and 
> replication factor 2 with --disable-rack-aware. Partitions may have replicas 
> on same-rack brokers, e.g. replica sets 0,1 and 2,3 are acceptable if present.
>  * Add 8 more partitions to _topic1_. No newly added partition should have 
> replicas on same-rack brokers, e.g. replica sets 0,1 and 2,3 should never be 
> created.
>  * Add 8 more partitions to _topic2_ with --disable-rack-aware. Newly added 
> partitions may have replicas on same-rack brokers, e.g. replica sets 0,1 and 
> 2,3 are acceptable if present. Repeating this step makes no difference: the 
> newly added partitions are always rack-aware.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty

2018-10-01 Thread Badai Aqrandista (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Badai Aqrandista updated KAFKA-7467:

Priority: Minor  (was: Major)

> NoSuchElementException is raised because controlBatch is empty
> --
>
> Key: KAFKA-7467
> URL: https://issues.apache.org/jira/browse/KAFKA-7467
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Badai Aqrandista
>Priority: Minor
>
> Somehow, the log cleaner died with a NoSuchElementException when it called 
> onControlBatchRead:
> {noformat}
> [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.util.NoSuchElementException
>   at java.util.Collections$EmptyIterator.next(Collections.java:4189)
>   at 
> kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
>   at 
> kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
>   at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
>   at 
> org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
>   at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
>   at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
>   at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>   at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>   at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>   at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> {noformat}
> The following code does not seem to expect the controlBatch to be empty:
> https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946
> {noformat}
>   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
> consumeAbortedTxnsUpTo(controlBatch.lastOffset)
> val controlRecord = controlBatch.iterator.next()
> val controlType = ControlRecordType.parse(controlRecord.key)
> val producerId = controlBatch.producerId
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty

2018-10-01 Thread Badai Aqrandista (JIRA)
Badai Aqrandista created KAFKA-7467:
---

 Summary: NoSuchElementException is raised because controlBatch is 
empty
 Key: KAFKA-7467
 URL: https://issues.apache.org/jira/browse/KAFKA-7467
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.1.0
Reporter: Badai Aqrandista


Somehow, the log cleaner died with a NoSuchElementException when it called 
onControlBatchRead:

{noformat}
[2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log 
__consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, 
discarding deletes. (kafka.log.LogCleaner)
[2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to 
(kafka.log.LogCleaner)
java.util.NoSuchElementException
  at java.util.Collections$EmptyIterator.next(Collections.java:4189)
  at 
kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
  at 
kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
  at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
  at 
org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
  at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
  at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped 
(kafka.log.LogCleaner)
{noformat}

The following code does not seem to expect the controlBatch to be empty:

https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946

{noformat}
  def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
consumeAbortedTxnsUpTo(controlBatch.lastOffset)

val controlRecord = controlBatch.iterator.next()
val controlType = ControlRecordType.parse(controlRecord.key)
val producerId = controlBatch.producerId
{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7465) kafka-topics.sh command not honouring --disable-rack-aware property when adding partitions.

2018-10-01 Thread Suman B N (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634538#comment-16634538
 ] 

Suman B N commented on KAFKA-7465:
--

Working on this fix. Please assign the Jira to me. [~cmccabe] [~omkreddy]

> kafka-topics.sh command not honouring --disable-rack-aware property when 
> adding partitions.
> ---
>
> Key: KAFKA-7465
> URL: https://issues.apache.org/jira/browse/KAFKA-7465
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0, 1.0.0, 1.1.0, 2.0.0
>Reporter: Suman B N
>Priority: Minor
> Fix For: 2.0.1
>
>
> kafka-topics.sh command not honouring --disable-rack-aware property when 
> adding partitions.
> Topic creation honours the --disable-rack-aware property, whereas altering a 
> topic always uses the default RackAwareMode (Enforced) when adding partitions.
> Steps:
>  * Start brokers. 
>     0 -> r1, 1 -> r1, 2 -> r2, 3 -> r2
>  * Create topic _topic1_ with 8 partitions and replication factor 2. No 
> partition should have replicas on same-rack brokers, e.g. replica sets 0,1 
> and 2,3 should never be created. Create topic _topic2_ with 8 partitions and 
> replication factor 2 with --disable-rack-aware. Partitions may have replicas 
> on same-rack brokers, e.g. replica sets 0,1 and 2,3 are acceptable if present.
>  * Add 8 more partitions to _topic1_. No newly added partition should have 
> replicas on same-rack brokers, e.g. replica sets 0,1 and 2,3 should never be 
> created.
>  * Add 8 more partitions to _topic2_ with --disable-rack-aware. Newly added 
> partitions may have replicas on same-rack brokers, e.g. replica sets 0,1 and 
> 2,3 are acceptable if present. Repeating this step makes no difference: the 
> newly added partitions are always rack-aware.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7465) kafka-topics.sh command not honouring --disable-rack-aware property when adding partitions.

2018-10-01 Thread Suman B N (JIRA)
Suman B N created KAFKA-7465:


 Summary: kafka-topics.sh command not honouring 
--disable-rack-aware property when adding partitions.
 Key: KAFKA-7465
 URL: https://issues.apache.org/jira/browse/KAFKA-7465
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.0.0, 1.1.0, 1.0.0, 0.11.0.0
Reporter: Suman B N
 Fix For: 2.0.1


kafka-topics.sh command not honouring --disable-rack-aware property when adding 
partitions.

Topic creation honours the --disable-rack-aware property, whereas altering a 
topic always uses the default RackAwareMode (Enforced) when adding partitions.

Steps:
 * Start brokers. 
    0 -> r1, 1 -> r1, 2 -> r2, 3 -> r2
 * Create topic _topic1_ with 8 partitions and replication factor 2. No 
partition should have replicas on same-rack brokers, e.g. replica sets 0,1 and 
2,3 should never be created. Create topic _topic2_ with 8 partitions and 
replication factor 2 with --disable-rack-aware. Partitions may have replicas on 
same-rack brokers, e.g. replica sets 0,1 and 2,3 are acceptable if present.
 * Add 8 more partitions to _topic1_. No newly added partition should have 
replicas on same-rack brokers, e.g. replica sets 0,1 and 2,3 should never be 
created.
 * Add 8 more partitions to _topic2_ with --disable-rack-aware. Newly added 
partitions may have replicas on same-rack brokers, e.g. replica sets 0,1 and 
2,3 are acceptable if present. Repeating this step makes no difference: the 
newly added partitions are always rack-aware.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6914) Kafka Connect - Plugins class should have a constructor that can take in parent ClassLoader

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634495#comment-16634495
 ] 

ASF GitHub Bot commented on KAFKA-6914:
---

kkonstantine opened a new pull request #5720: KAFKA-6914: Set parent 
classloader of DelegatingClassLoader same as the worker's
URL: https://github.com/apache/kafka/pull/5720
 
 
   The parent classloader of the DelegatingClassLoader and therefore the 
classloading scheme used by Connect does not have to be fixed to the System 
classloader.
   
   Setting it the same as the one that was used to load the 
DelegatingClassLoader class itself is more flexible and, while in most cases it 
will result in the System classloader being used, it will also work in other 
managed environments that control classloading differently (OSGi, and others).
   
   The fix is minimal and the mainstream use is tested via system tests.
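
In classloader terms the change amounts to the following (a simplified sketch, 
not the actual DelegatingClassLoader code):

{code:java}
// Simplified sketch: pick the parent from the loader that defined this class
// rather than hard-coding the system classloader, so the scheme also works
// under OSGi/Spring Boot style managed classloading.
public class DelegatingLoaderSketch extends ClassLoader {
    public DelegatingLoaderSketch() {
        super(DelegatingLoaderSketch.class.getClassLoader()); // not getSystemClassLoader()
    }
}
{code}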


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect - Plugins class should have a constructor that can take in 
> parent ClassLoader
> ---
>
> Key: KAFKA-6914
> URL: https://issues.apache.org/jira/browse/KAFKA-6914
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Sriram KS
>Assignee: Konstantine Karantasis
>Priority: Minor
>
> Currently the Plugins class has a single constructor that takes in a map of 
> props.
> Please give the Plugins class a constructor that also takes a ClassLoader, 
> and use it to set the DelegatingClassLoader's parent classloader.
> Reason:
> This will be useful when running in a managed classloader environment, such 
> as a Spring Boot app that resolves class dependencies through Maven/Gradle 
> dependency management.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown

2018-10-01 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7464:
---
Affects Version/s: 2.0.0

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --
>
> Key: KAFKA-7464
> URL: https://issues.apache.org/jira/browse/KAFKA-7464
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Critical
> Fix For: 2.0.1, 2.1.0
>
>
> In a 2.0 deployment, we saw the following log when shutting down the 
> ReplicaManager during a clean broker shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
> at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
> at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) 
> ~[?:1.8.0_121]
> at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214)
>  ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164)
>  ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.Selector.doClose(Selector.java:751) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:739) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:701) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:315) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  ~[scala-library-2.11.12.jar:?]
> at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher threads failed to 
> shut down:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] 
> [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log 
> segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
> at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) 
> ~[?:1.8.0_121]
> at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) 
> ~[?:1.8.0_121]
> at 
> 

[jira] [Updated] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown

2018-10-01 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7464:
---
Priority: Critical  (was: Major)

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --
>
> Key: KAFKA-7464
> URL: https://issues.apache.org/jira/browse/KAFKA-7464
> Project: Kafka
>  Issue Type: Bug
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Critical
> Fix For: 2.0.1, 2.1.0
>
>
> In a 2.0 deployment, we saw the following log when shutting down the 
> ReplicaManager during a clean broker shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
> at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
> at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) 
> ~[?:1.8.0_121]
> at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214)
>  ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164)
>  ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.Selector.doClose(Selector.java:751) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:739) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:701) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:315) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  ~[scala-library-2.11.12.jar:?]
> at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher thread fail to 
> shutdown:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] 
> [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log 
> segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
> at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) 
> ~[?:1.8.0_121]
> at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) 
> ~[?:1.8.0_121]
> at 
> 

[jira] [Updated] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown

2018-10-01 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7464:
---
Fix Version/s: 2.0.1

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --
>
> Key: KAFKA-7464
> URL: https://issues.apache.org/jira/browse/KAFKA-7464
> Project: Kafka
>  Issue Type: Bug
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Critical
> Fix For: 2.0.1, 2.1.0
>

[jira] [Updated] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown

2018-10-01 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7464:
---
Fix Version/s: 2.1.0

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --
>
> Key: KAFKA-7464
> URL: https://issues.apache.org/jira/browse/KAFKA-7464
> Project: Kafka
>  Issue Type: Bug
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.0.1, 2.1.0
>

[jira] [Commented] (KAFKA-6051) ReplicaFetcherThread should close the ReplicaFetcherBlockingSend earlier on shutdown

2018-10-01 Thread Zhanxiang (Patrick) Huang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634331#comment-16634331
 ] 

Zhanxiang (Patrick) Huang commented on KAFKA-6051:
--

[~ijuma]

Sure. I have created KAFKA-7464 for the issue.

> ReplicaFetcherThread should close the ReplicaFetcherBlockingSend earlier on 
> shutdown
> 
>
> Key: KAFKA-6051
> URL: https://issues.apache.org/jira/browse/KAFKA-6051
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, 
> 0.10.2.1, 0.11.0.0
>Reporter: Maytee Chinavanichkit
>Assignee: Maytee Chinavanichkit
>Priority: Major
> Fix For: 1.1.0
>
>
> The ReplicaFetcherBlockingSend works as designed and blocks until it is able 
> to get data. This becomes a problem when we are gracefully shutting down a 
> broker. The controller will attempt to shut down the fetchers and elect new 
> leaders. When the last fetched partition is removed, the 
> {{replicaManager.becomeLeaderOrFollower}} call proceeds to shut down any idle 
> ReplicaFetcherThread. That shutdown can block until the last fetch request 
> completes. This blocking delay is a big problem because the 
> {{replicaStateChangeLock}} and {{mapLock}} in {{AbstractFetcherManager}} are 
> still held, causing latency spikes on multiple brokers.
> At this point in time, we do not need the last response, as the fetcher is 
> shutting down. We should close the leaderEndpoint early during 
> {{initiateShutdown()}} instead of after {{super.shutdown()}} (a sketch 
> follows the log excerpt below).
> For example, we see here that the shutdown blocked the broker from processing 
> more replica changes for ~500 ms:
> {code}
> [2017-09-01 18:11:42,879] INFO [ReplicaFetcherThread-0-2], Shutting down 
> (kafka.server.ReplicaFetcherThread) 
> [2017-09-01 18:11:43,314] INFO [ReplicaFetcherThread-0-2], Stopped 
> (kafka.server.ReplicaFetcherThread) 
> [2017-09-01 18:11:43,314] INFO [ReplicaFetcherThread-0-2], Shutdown completed 
> (kafka.server.ReplicaFetcherThread)
> {code}
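
A minimal sketch of the proposed reordering, written in Java for illustration 
(the actual classes are Scala; the names mirror the originals, but this is not 
the committed patch):
{code:java}
// Hedged sketch: close the blocking network client while initiating
// shutdown, so any in-flight fetch fails fast, instead of closing it only
// after the thread has fully stopped.
@Override
public boolean initiateShutdown() {
    boolean justShutdown = super.initiateShutdown(); // signal the run loop to exit
    if (justShutdown)
        leaderEndpoint.close(); // unblocks a fetch currently waiting on the network
    return justShutdown;
}
{code}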



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown

2018-10-01 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-7464:


 Summary: Fail to shutdown ReplicaManager during broker cleaned 
shutdown
 Key: KAFKA-7464
 URL: https://issues.apache.org/jira/browse/KAFKA-7464
 Project: Kafka
  Issue Type: Bug
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


In a 2.0 deployment, we saw the following log when shutting down the 
ReplicaManager during a clean broker shutdown:
{noformat}
2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
java.lang.IllegalArgumentException: null
at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) 
~[?:1.8.0_121]
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214)
 ~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164)
 ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) 
~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:739) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:701) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:315) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) 
~[kafka-clients-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
 ~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) 
~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 
~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) 
~[scala-library-2.11.12.jar:?]
at 
kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) 
~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) 
~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616)
 ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) 
~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) 
~[kafka_2.11-2.0.0.22.jar:?]
{noformat}
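
As an aside on the trace above: a hedged illustration of how 
{{Buffer.position(int)}} can surface this bare IllegalArgumentException (our 
reading of the trace, not a confirmed root cause). A close() racing with an 
in-flight write can leave the buffer's limit below the position being 
restored, and JDK 8 throws the exception without a message string, which is 
why the log prints "null":
{code:java}
import java.nio.ByteBuffer;

public class PositionDemo {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.limit(4);    // shrink the limit first
        buf.position(8); // throws java.lang.IllegalArgumentException (no message on JDK 8)
    }
}
{code}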
After that, we noticed that some of the replica fetcher threads failed to shut down:
{noformat}
2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] 
[ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log 
segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) 
~[?:1.8.0_121]
at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) 
~[?:1.8.0_121]
at 
org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) 
~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) 
~[kafka-clients-2.0.0.22.jar:?]
at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) 
~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) 
~[kafka_2.11-2.0.0.22.jar:?]
at 

[jira] [Commented] (KAFKA-7463) ConsoleProducer rejects producer property values with equals signs

2018-10-01 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634303#comment-16634303
 ] 

Manikumar commented on KAFKA-7463:
--

This might have been fixed by KAFKA-7388.

> ConsoleProducer rejects producer property values with equals signs
> --
>
> Key: KAFKA-7463
> URL: https://issues.apache.org/jira/browse/KAFKA-7463
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Magnus Reftel
>Priority: Major
>
> The --producer-property option of ConsoleProducer rejects property values 
> with equals signs with the message "Invalid command line properties". This 
> makes specifying SASL configuration via the sasl.jaas.config property 
> difficult, since these values often contain equals signs. It's possible to 
> work around the issue by writing the SASL configuration to a separate file, 
> and then specifying it via
> {code:java}
> KAFKA_OPTS=-Djava.security.auth.login.config=$filename{code}
> but there seems to be no good reason not to allow the more direct route.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7445) Branch one Stream in multiple Streams

2018-10-01 Thread Dennis Reiter (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633953#comment-16633953
 ] 

Dennis Reiter commented on KAFKA-7445:
--

[~mjsax]: Thanks for the hint! I will give it a try.

[~guozhang]: I didn't want to change the branching logic; I was just using it 
as an example.
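
For anyone finding this thread later, a minimal sketch of one way to route a 
record to *all* matching streams with the existing API, by applying 
{{filter}} once per predicate instead of {{branch}} (topic name, value type, 
and predicates are made up for illustration):
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> source = builder.stream("input-topic");

// Unlike branch(), every filter() sees every record, so a record matching
// several predicates flows into several downstream streams.
KStream<String, Integer> evens = source.filter((key, value) -> value % 2 == 0);
KStream<String, Integer> large = source.filter((key, value) -> value > 100);
{code}
This trades one extra predicate evaluation per stream for the all-matches 
semantics asked for above.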

> Branch one Stream in multiple Streams
> -
>
> Key: KAFKA-7445
> URL: https://issues.apache.org/jira/browse/KAFKA-7445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Dennis Reiter
>Priority: Minor
>
> Hi,
> I need to branch/split a KStream into multiple independent KStreams. I thought 
> {{org.apache.kafka.streams.kstream.internals.KStreamImpl#branch}} was the 
> right one, but in fact it's designed for another purpose.
> In contrast to {{branch}}, I need to assign the record to *all* matching 
> streams, not only one stream.
> Speaking in code 
> ({{org.apache.kafka.streams.kstream.internals.KStreamBranch}}):
> {code:java}
> if (predicates[i].test(key, value)) {
>// use forward with childIndex here
>// and pipe the record to multiple streams
>context().forward(key, value, i);
> }
> {code}
> My question: is this still possible with features already included in 
> Streams? Or shall I propose a change?
> Thanks in advance
> Dennis



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7458) Avoid enforced processing during bootstrap phase

2018-10-01 Thread Antony Stubbs (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633951#comment-16633951
 ] 

Antony Stubbs commented on KAFKA-7458:
--

Related to KAFKA-4113

> Avoid enforced processing during bootstrap phase
> 
>
> Key: KAFKA-7458
> URL: https://issues.apache.org/jira/browse/KAFKA-7458
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> In KAFKA-3514, we introduced a new config that allows users to delay enforced 
> processing when not all input topic partitions have data. This config's 
> default value is 0, which means that if the first fetch does not contain 
> records for all partitions, the task falls into enforced processing 
> immediately; this is a high risk, especially during the bootstrap phase.
> We should consider leveraging pause/resume to make sure that, upon starting, 
> a partition truly has no data before we fall into enforced processing (see 
> the config sketch below).
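
As a point of reference, a hedged sketch of raising that delay via 
configuration (assuming the config introduced for this purpose is 
{{max.task.idle.ms}}, exposed as {{StreamsConfig.MAX_TASK_IDLE_MS_CONFIG}} in 
2.1+; the app id and broker address are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Wait up to 500 ms for data on all input partitions before falling back
// to enforced processing (the default of 0 enforces immediately).
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
{code}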



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7463) ConsoleProducer rejects producer property values with equals signs

2018-10-01 Thread Magnus Reftel (JIRA)
Magnus Reftel created KAFKA-7463:


 Summary: ConsoleProducer rejects producer property values with 
equals signs
 Key: KAFKA-7463
 URL: https://issues.apache.org/jira/browse/KAFKA-7463
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Magnus Reftel


The --producer-property option of ConsoleProducer rejects property values with 
equals signs with the message "Invalid command line properties". This makes 
specifying SASL configuration via the sasl.jaas.config property difficult, 
since these values often contain equals signs. It's possible to work around the 
issue by writing the SASL configuration to a separate file, and then specifying 
it via
{code:java}
KAFKA_OPTS=-Djava.security.auth.login.config=$filename{code}
but there seems to be no good reason not to allow the more direct route (a 
fuller sketch of the workaround follows below).
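
For concreteness, a hedged sketch of that file-based workaround (file path, 
login module, and credentials are hypothetical):
{code:java}
// /etc/kafka/client_jaas.conf (hypothetical):
//   KafkaClient {
//     org.apache.kafka.common.security.plain.PlainLoginModule required
//     username="alice"
//     password="alice-secret";
//   };
//
// Then point the JVM at the file instead of using --producer-property:
//   KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/client_jaas.conf \
//     bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{code}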



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5682) Consumer should include partition in exceptions raised during record parsing/validation

2018-10-01 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-5682:
---
Fix Version/s: (was: 2.1.0)
   2.2.0

> Consumer should include partition in exceptions raised during record 
> parsing/validation
> ---
>
> Key: KAFKA-5682
> URL: https://issues.apache.org/jira/browse/KAFKA-5682
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Stanislav Kozlovski
>Priority: Minor
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> When we encounter an exception while validating a fetched record or while 
> deserializing it, we raise it to the user and keep the consumer's current 
> position at the offset of the failed record. The expectation is that the user 
> will either propagate the exception and shut down, or seek past the failed 
> record. However, in the latter case, there is no way for the user to know 
> which topic partition had the failed record. We should consider introducing 
> an exception type that exposes this information and that users can catch (a 
> sketch follows below).
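
To make the gap concrete, a hedged sketch of the seek-past pattern as it 
stands today (types and topic are illustrative; the accessor in the comment 
is the *proposed* API, not an existing one):
{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.SerializationException;

// `consumer` is an already-configured KafkaConsumer<String, String>.
void pollSkippingBadRecords(KafkaConsumer<String, String> consumer) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // process records ...
    } catch (SerializationException e) {
        // Today the exception does not carry the failed partition, so the
        // caller cannot reliably seek past the bad record. With the proposed
        // exception type this could look roughly like:
        //   TopicPartition tp = e.topicPartition();
        //   consumer.seek(tp, consumer.position(tp) + 1);
    }
}
{code}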



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2018-10-01 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633793#comment-16633793
 ] 

Rajini Sivaram commented on KAFKA-7304:
---

[~yuyang08] We saw the OOM during a load test, so CPU usage would have been 
high anyway. With a large number of clients producing a large amount of 
traffic, we would expect high CPU usage with SSL. We haven't done any 
comparisons with other SSL implementations such as OpenSSL, though, to see 
whether we can improve on this.

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at 
> 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot 
> 2018-08-29 at 10.50.47 AM.png, Screen Shot 2018-09-29 at 10.38.12 PM.png, 
> Screen Shot 2018-09-29 at 10.38.38 PM.png, Screen Shot 2018-09-29 at 8.34.50 
> PM.png
>
>
> We are testing secured writing to Kafka through SSL. At small scale, SSL 
> writing to Kafka was fine. However, when we enabled SSL writing at a larger 
> scale (>40k clients writing concurrently), the Kafka brokers soon hit an 
> OutOfMemory issue with a 4 GB heap setting. We tried increasing the heap 
> size to 10 GB but encountered the same issue.
> We took a few heap dumps and found that most of the heap memory is 
> referenced through org.apache.kafka.common.network.Selector objects. There 
> are two channel map fields in Selector. It seems that the objects are 
> somehow not removed from the maps in a timely manner.
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If a broker restart or similar event in the cluster causes 
> partition leadership changes, the brokers may hit the OOM issue faster.
> {code}
> private final Map<String, KafkaChannel> channels;
> private final Map<String, KafkaChannel> closingChannels;
> {code}
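
To illustrate the lifecycle being described, a heavily simplified, hedged 
sketch (not the actual Selector code; the helper method is invented):
{code:java}
// On close, a channel leaves `channels`; if it still has staged receives it
// is parked in `closingChannels` until drained. If that second removal never
// happens (e.g. after a partition leadership change), the map keeps the
// channel and everything it references alive, matching the heap dumps above.
private void close(KafkaChannel channel) throws java.io.IOException {
    channels.remove(channel.id());
    if (hasStagedReceives(channel))              // hypothetical helper
        closingChannels.put(channel.id(), channel);
    else
        channel.close();                         // fully released
}
{code}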
> Please see the attached images and the following link for a sample GC 
> analysis. 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> The command line for running Kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use Java 1.8.0_102 and have applied a TLS patch reducing the 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7450) "Handshake message sequence violation" related ssl handshake failure leads to high cpu usage

2018-10-01 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633779#comment-16633779
 ] 

Rajini Sivaram commented on KAFKA-7450:
---

The exceptions indicate an SSL configuration issue. If this is an issue only 
for inter-broker communication and clients have been connecting successfully, 
then it would be worth checking the truststore, etc. Enabling javax.net.debug=ssl 
should help identify the cause if it is a configuration issue (example below).
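
For example, a hedged sketch of turning that on for a broker, following the 
same KAFKA_OPTS pattern used elsewhere in these threads (the script and config 
paths are placeholders):
{code:java}
KAFKA_OPTS="-Djavax.net.debug=ssl,handshake" bin/kafka-server-start.sh config/server.properties
{code}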

> "Handshake message sequence violation" related ssl handshake failure leads to 
> high cpu usage
> 
>
> Key: KAFKA-7450
> URL: https://issues.apache.org/jira/browse/KAFKA-7450
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Yu Yang
>Priority: Major
>
> After updating security.inter.broker.protocol to SSL for our cluster, we 
> observed that the controller can get to almost 100% CPU usage. 
> {code}
> listeners=PLAINTEXT://:9092,SSL://:9093
> security.inter.broker.protocol=SSL
> {code}
> There is no obvious error in server.log, but in controller.log there are 
> repetitive SSL handshake failure errors as below: 
> {code}
> [2018-09-28 05:53:10,821] WARN [RequestSendThread controllerId=6042] 
> Controller 6042's connection to broker datakafka06176.ec2.pin220.com:9093 
> (id: 6176 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
> Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence 
> violation, 2
> at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1487)
> at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
> at 
> sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:813)
> at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:781)
> at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624)
> at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:468)
> at 
> org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:331)
> at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
> at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
> at 
> kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279)
> at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence 
> violation, 2
> at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:196)
> at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
> at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
> at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
> at java.security.AccessController.doPrivileged(Native Method)
> at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416)
> at 
> org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393)
> at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:473)
> ... 10 more
> {code}
> {code}
> [2018-09-30 00:30:13,609] WARN [ReplicaFetcher replicaId=59, leaderId=66, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=59, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={the_test_topic-18=(offset=462333447, logStartOffset=462286948, 
> maxBytes=4194304), the_test_topic-58=(offset=462312762, 
> logStartOffset=462295078, maxBytes=4194304)}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1991153671, 
> epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
> Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence 
> violation, 2
> at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1538)
> at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
> at sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:813)
> at 

[jira] [Created] (KAFKA-7462) Kafka brokers cannot provide OAuth without a token

2018-10-01 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-7462:
-

 Summary: Kafka brokers cannot provide OAuth without a token
 Key: KAFKA-7462
 URL: https://issues.apache.org/jira/browse/KAFKA-7462
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 2.0.0
Reporter: Rajini Sivaram
 Fix For: 2.1.0


As with all other SASL mechanisms, OAUTHBEARER uses the same LoginModule class 
on both the server side and the client side. But unlike PLAIN or SCRAM, where 
client credentials are optional, OAUTHBEARER always requires a token. So while 
with PLAIN/SCRAM a broker only needs to specify client credentials if the 
mechanism is used for inter-broker communication, with OAuth a broker requires 
client credentials even if OAuth is not used for inter-broker communication. 
This is an issue with the default `OAuthBearerUnsecuredLoginCallbackHandler` 
used on both the client side and the server side. But more critically, it is 
an issue with `OAuthBearerLoginModule`, which doesn't commit if token == null 
(commit() returns false); a sketch follows below.
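
To make the failure mode concrete, a hedged sketch of the JAAS contract at 
play (illustrative only, not the actual `OAuthBearerLoginModule` source):
{code:java}
// Per the JAAS LoginModule contract, commit() returning false means "ignore
// this module". If no configured module commits, the overall login fails, so
// a broker that merely accepts OAUTHBEARER from clients (without using it
// for inter-broker auth) cannot complete its server-side login unless it
// also supplies a token of its own.
public boolean commit() {
    if (token == null)
        return false; // a broker without a client token ends up here
    subject.getPrivateCredentials().add(token);
    return true;
}
{code}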



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5836) Kafka Streams - API for specifying internal stream name on join

2018-10-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5836.

Resolution: Duplicate

> Kafka Streams - API for specifying internal stream name on join
> ---
>
> Key: KAFKA-5836
> URL: https://issues.apache.org/jira/browse/KAFKA-5836
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Lovro Pandžić
>Priority: Major
>  Labels: api, needs-kip
>
> Automatically generated topic names can be problematic when a streams 
> operation changes or is migrated.
> I'd like to be able to specify the name of an internal topic so I can avoid 
> creating a new stream and "losing" data when changing how the stream is 
> built (a sketch of the naming API that later addressed this follows below).
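
Since this was resolved as a duplicate, a pointer for readers landing here 
later: newer releases added explicit naming for join state via 
{{StreamJoined}} (KIP-479). A hedged sketch, with all names and streams 
illustrative:
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

// `left` and `right` are pre-built KStream<String, String> instances.
// Pinning the store name also pins the derived internal topic names, so a
// later change to how the topology is built does not silently move state
// onto fresh topics.
KStream<String, String> joined = left.join(
    right,
    (l, r) -> l + "|" + r,
    JoinWindows.of(Duration.ofMinutes(5)),
    StreamJoined.as("orders-clicks-join"));
{code}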



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)