[jira] [Created] (KAFKA-7468) KafkaController no need to startup the inner kafkaScheducer when auto.leader.rebalance.enable is false
ZhangShuai created KAFKA-7468: - Summary: KafkaController no need to startup the inner kafkaScheducer when auto.leader.rebalance.enable is false Key: KAFKA-7468 URL: https://issues.apache.org/jira/browse/KAFKA-7468 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 2.0.0 Reporter: ZhangShuai KafkaController has its inner member KafkaScheduler instance to schedule auto leader rebalance tasks. But when the auto rebalance config set false, no need to startup the kafkaScheduler, because that created a non-used ScheduledThreadPoolExecutor with one thread. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value
[ https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benjamin Bianchi updated KAFKA-3832: Comment: was deleted (was: Hi, I'd like to try this ticket!) > Kafka Connect's JSON Converter never outputs a null value > - > > Key: KAFKA-3832 > URL: https://issues.apache.org/jira/browse/KAFKA-3832 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Assignee: Prasanna Subburaj >Priority: Major > Labels: newbie > > Kafka Connect's JSON Converter will never output a null value when > {{enableSchemas=true}}. This means that when a connector outputs a > {{SourceRecord}} with a null value, the JSON Converter will always produce a > message value with: > {code:javascript} > { > "schema": null, > "payload": null > } > {code} > And, this means that while Kafka log compaction will always be able to remove > earlier messages with the same key, log compaction will never remove _all_ of > the messages with the same key. > The JSON Connector's {{fromConnectData(...)}} should always return null when > it is supplied a null value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value
[ https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634894#comment-16634894 ] Benjamin Bianchi commented on KAFKA-3832: - Hi, I'd like to try this ticket! > Kafka Connect's JSON Converter never outputs a null value > - > > Key: KAFKA-3832 > URL: https://issues.apache.org/jira/browse/KAFKA-3832 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Assignee: Prasanna Subburaj >Priority: Major > Labels: newbie > > Kafka Connect's JSON Converter will never output a null value when > {{enableSchemas=true}}. This means that when a connector outputs a > {{SourceRecord}} with a null value, the JSON Converter will always produce a > message value with: > {code:javascript} > { > "schema": null, > "payload": null > } > {code} > And, this means that while Kafka log compaction will always be able to remove > earlier messages with the same key, log compaction will never remove _all_ of > the messages with the same key. > The JSON Connector's {{fromConnectData(...)}} should always return null when > it is supplied a null value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty
[ https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634878#comment-16634878 ] Ismael Juma commented on KAFKA-7467: How come this is considered minor? It seems severe since it kills the cleaner. > NoSuchElementException is raised because controlBatch is empty > -- > > Key: KAFKA-7467 > URL: https://issues.apache.org/jira/browse/KAFKA-7467 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.0 >Reporter: Badai Aqrandista >Assignee: Bob Barrett >Priority: Minor > > Somehow, log cleaner died because of NoSuchElementException when it calls > onControlBatchRead: > {noformat} > [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log > __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into > 0, discarding deletes. (kafka.log.LogCleaner) > [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to > (kafka.log.LogCleaner) > java.util.NoSuchElementException > at java.util.Collections$EmptyIterator.next(Collections.java:4189) > at > kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945) > at > kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636) > at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573) > at > org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157) > at > org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138) > at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604) > at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518) > at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462) > at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461) > at scala.collection.immutable.List.foreach(List.scala:392) > at kafka.log.Cleaner.doClean(LogCleaner.scala:461) > at kafka.log.Cleaner.clean(LogCleaner.scala:438) > at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped > (kafka.log.LogCleaner) > {noformat} > The following code does not seem to expect the controlBatch to be empty: > [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946] > {noformat} > def onControlBatchRead(controlBatch: RecordBatch): Boolean = { > consumeAbortedTxnsUpTo(controlBatch.lastOffset) > val controlRecord = controlBatch.iterator.next() > val controlType = ControlRecordType.parse(controlRecord.key) > val producerId = controlBatch.producerId > {noformat} > MemoryRecords.filterTo copies the original control attribute for empty > batches, which results in empty control batches. Trying to read the control > type of an empty batch causes the error. > {noformat} > else if (batchRetention == BatchRetention.RETAIN_EMPTY) { > if (batchMagic < RecordBatch.MAGIC_VALUE_V2) > throw new IllegalStateException("Empty batches are only supported for > magic v2 and above"); > > bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); > DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), > batchMagic, batch.producerId(), > batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), > batch.lastOffset(), > batch.partitionLeaderEpoch(), batch.timestampType(), > batch.maxTimestamp(), > batch.isTransactional(), batch.isControlBatch()); > filterResult.updateRetainedBatchMetadata(batch, 0, true); > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty
[ https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bob Barrett updated KAFKA-7467: --- Description: Somehow, log cleaner died because of NoSuchElementException when it calls onControlBatchRead: {noformat} [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner) [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner) java.util.NoSuchElementException at java.util.Collections$EmptyIterator.next(Collections.java:4189) at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945) at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636) at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573) at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157) at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138) at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604) at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518) at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462) at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461) at scala.collection.immutable.List.foreach(List.scala:392) at kafka.log.Cleaner.doClean(LogCleaner.scala:461) at kafka.log.Cleaner.clean(LogCleaner.scala:438) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner) {noformat} The following code does not seem to expect the controlBatch to be empty: [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946] {noformat} def onControlBatchRead(controlBatch: RecordBatch): Boolean = { consumeAbortedTxnsUpTo(controlBatch.lastOffset) val controlRecord = controlBatch.iterator.next() val controlType = ControlRecordType.parse(controlRecord.key) val producerId = controlBatch.producerId {noformat} MemoryRecords.filterTo copies the original control attribute for empty batches, which results in empty control batches. Trying to read the control type of an empty batch causes the error. {noformat} else if (batchRetention == BatchRetention.RETAIN_EMPTY) { if (batchMagic < RecordBatch.MAGIC_VALUE_V2) throw new IllegalStateException("Empty batches are only supported for magic v2 and above"); bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(), batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(), batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(), batch.isTransactional(), batch.isControlBatch()); filterResult.updateRetainedBatchMetadata(batch, 0, true); {noformat} was: Somehow, log cleaner died because of NoSuchElementException when it calls onControlBatchRead: {noformat} [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner) [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner) java.util.NoSuchElementException at java.util.Collections$EmptyIterator.next(Collections.java:4189) at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945) at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636) at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573) at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157) at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138) at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604) at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518) at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462) at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461) at scala.collection.immutable.List.foreach(List.scala:392) at kafka.log.Cleaner.doClean(LogCleaner.scala:461) at kafka.log.Cleaner.clean(LogCleaner.scala:438) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner) {noformat} The following code does not seem to expect the controlBatch to be empty:
[jira] [Assigned] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty
[ https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bob Barrett reassigned KAFKA-7467: -- Assignee: Bob Barrett > NoSuchElementException is raised because controlBatch is empty > -- > > Key: KAFKA-7467 > URL: https://issues.apache.org/jira/browse/KAFKA-7467 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.0 >Reporter: Badai Aqrandista >Assignee: Bob Barrett >Priority: Minor > > Somehow, log cleaner died because of NoSuchElementException when it calls > onControlBatchRead: > {noformat} > [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log > __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into > 0, discarding deletes. (kafka.log.LogCleaner) > [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to > (kafka.log.LogCleaner) > java.util.NoSuchElementException > at java.util.Collections$EmptyIterator.next(Collections.java:4189) > at > kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945) > at > kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636) > at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573) > at > org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157) > at > org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138) > at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604) > at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518) > at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462) > at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461) > at scala.collection.immutable.List.foreach(List.scala:392) > at kafka.log.Cleaner.doClean(LogCleaner.scala:461) > at kafka.log.Cleaner.clean(LogCleaner.scala:438) > at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped > (kafka.log.LogCleaner) > {noformat} > The following code does not seem to expect the controlBatch to be empty: > [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946] > {noformat} > def onControlBatchRead(controlBatch: RecordBatch): Boolean = { > consumeAbortedTxnsUpTo(controlBatch.lastOffset) > val controlRecord = controlBatch.iterator.next() > val controlType = ControlRecordType.parse(controlRecord.key) > val producerId = controlBatch.producerId > {noformat} > MemoryRecords.filterTo copies the original control attribute for empty > batches, which results in empty control batches. Trying to read the control > type of an empty batch causes the error. > {noformat} > else if (batchRetention == BatchRetention.RETAIN_EMPTY) { > if (batchMagic < RecordBatch.MAGIC_VALUE_V2) > throw new IllegalStateException("Empty batches are only > supported for magic v2 and above"); > > bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); > > DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, > batch.producerId(), > batch.producerEpoch(), batch.baseSequence(), > batch.baseOffset(), batch.lastOffset(), > batch.partitionLeaderEpoch(), batch.timestampType(), > batch.maxTimestamp(), > batch.isTransactional(), batch.isControlBatch()); > filterResult.updateRetainedBatchMetadata(batch, 0, true); > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty
[ https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bob Barrett updated KAFKA-7467: --- Description: Somehow, log cleaner died because of NoSuchElementException when it calls onControlBatchRead: {noformat} [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner) [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner) java.util.NoSuchElementException at java.util.Collections$EmptyIterator.next(Collections.java:4189) at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945) at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636) at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573) at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157) at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138) at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604) at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518) at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462) at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461) at scala.collection.immutable.List.foreach(List.scala:392) at kafka.log.Cleaner.doClean(LogCleaner.scala:461) at kafka.log.Cleaner.clean(LogCleaner.scala:438) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner) {noformat} The following code does not seem to expect the controlBatch to be empty: [https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946] {noformat} def onControlBatchRead(controlBatch: RecordBatch): Boolean = { consumeAbortedTxnsUpTo(controlBatch.lastOffset) val controlRecord = controlBatch.iterator.next() val controlType = ControlRecordType.parse(controlRecord.key) val producerId = controlBatch.producerId {noformat} MemoryRecords.filterTo copies the original control attribute for empty batches, which results in empty control batches. Trying to read the control type of an empty batch causes the error. {noformat} else if (batchRetention == BatchRetention.RETAIN_EMPTY) { if (batchMagic < RecordBatch.MAGIC_VALUE_V2) throw new IllegalStateException("Empty batches are only supported for magic v2 and above"); bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(), batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(), batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(), batch.isTransactional(), batch.isControlBatch()); filterResult.updateRetainedBatchMetadata(batch, 0, true); {noformat} was: Somehow, log cleaner died because of NoSuchElementException when it calls onControlBatchRead: {noformat} [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner) [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner) java.util.NoSuchElementException at java.util.Collections$EmptyIterator.next(Collections.java:4189) at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945) at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636) at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573) at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157) at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138) at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604) at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518) at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462) at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461) at scala.collection.immutable.List.foreach(List.scala:392) at kafka.log.Cleaner.doClean(LogCleaner.scala:461) at kafka.log.Cleaner.clean(LogCleaner.scala:438) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner) {noformat} The
[jira] [Commented] (KAFKA-4218) Enable access to key in ValueTransformer
[ https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634781#comment-16634781 ] Matthias J. Sax commented on KAFKA-4218: [~lindong] There was no progress on KIP-149 for 2.1 release. > Enable access to key in ValueTransformer > > > Key: KAFKA-4218 > URL: https://issues.apache.org/jira/browse/KAFKA-4218 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Jeyhun Karimov >Priority: Major > Labels: api, kip > Fix For: 1.1.0 > > > While transforming values via {{KStream.transformValues}} and > {{ValueTransformer}}, the key associated with the value may be needed, even > if it is not changed. For instance, it may be used to access stores. > As of now, the key is not available within these methods and interfaces, > leading to the use of {{KStream.transform}} and {{Transformer}}, and the > unnecessary creation of new {{KeyValue}} objects. > KIP-149: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7320) Provide ability to disable auto topic creation in KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-7320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634780#comment-16634780 ] Dhruvil Shah commented on KAFKA-7320: - PR is ready for review: https://github.com/apache/kafka/pull/5542 > Provide ability to disable auto topic creation in KafkaConsumer > --- > > Key: KAFKA-7320 > URL: https://issues.apache.org/jira/browse/KAFKA-7320 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Dhruvil Shah >Assignee: Dhruvil Shah >Priority: Major > > Consumers should have a configuration to control whether subscribing to > non-existent topics should automatically create the topic or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7456) Serde Inheritance in Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-7456. Resolution: Fixed > Serde Inheritance in Streams DSL > > > Key: KAFKA-7456 > URL: https://issues.apache.org/jira/browse/KAFKA-7456 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.1.0 > > > This is a prerequisite for further topology optimization in the Streams DSL: > we should let different operators inside the DSL to be able to pass along key > and value serdes if they are not explicitly specified by users. The serde > specification precedence should generally be: > 1) Overridden values via control objects (e.g. Materialized, Serialized, > Consumed, etc) > 2) Serdes that can be inferred from the operator itself (e.g. > groupBy().count(), where value serde can default to `LongSerde`). > 3) Serde inherited from parent operator if possible (note if the key / value > types have been changed, then the corresponding serde cannot be inherited). > 4) Default serde specified in the config. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7456) Serde Inheritance in Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634766#comment-16634766 ] ASF GitHub Bot commented on KAFKA-7456: --- mjsax closed pull request #5521: KAFKA-7456: Serde Inheritance in DSL URL: https://github.com/apache/kafka/pull/5521 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java index 6a851a10d9d..2474860a049 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java @@ -24,7 +24,7 @@ static public class TimeWindowedSerde extends Serdes.WrapperSerde> { // Default constructor needed for reflection object creation public TimeWindowedSerde() { -super(new TimeWindowedSerializer(), new TimeWindowedDeserializer()); +super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>()); } public TimeWindowedSerde(final Serde inner) { @@ -35,7 +35,7 @@ public TimeWindowedSerde(final Serde inner) { static public class SessionWindowedSerde extends Serdes.WrapperSerde> { // Default constructor needed for reflection object creation public SessionWindowedSerde() { -super(new SessionWindowedSerializer(), new SessionWindowedDeserializer()); +super(new SessionWindowedSerializer<>(), new SessionWindowedDeserializer<>()); } public SessionWindowedSerde(final Serde inner) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index a0724ebc6d6..e8707513a07 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; @@ -31,32 +32,48 @@ import java.util.Objects; import java.util.Set; -public abstract class AbstractStream { +/* + * Any classes (KTable, KStream, etc) extending this class should follow the serde specification precedence ordering as: + * + * 1) Overridden values via control objects (e.g. Materialized, Serialized, Consumed, etc) + * 2) Serdes that can be inferred from the operator itself (e.g. groupBy().count(), where value serde can default to `LongSerde`). + * 3) Serde inherited from parent operator if possible (note if the key / value types have been changed, then the corresponding serde cannot be inherited). + * 4) Default serde specified in the config. + */ +public abstract class AbstractStream { -protected final InternalStreamsBuilder builder; protected final String name; +protected final Serde keySerde; +protected final Serde valSerde; protected final Set sourceNodes; protected final StreamsGraphNode streamsGraphNode; +protected final InternalStreamsBuilder builder; // This copy-constructor will allow to extend KStream // and KTable APIs with new methods without impacting the public interface. -public AbstractStream(final AbstractStream stream) { -this.builder = stream.builder; +public AbstractStream(final AbstractStream stream) { this.name = stream.name; +this.builder = stream.builder; +this.keySerde = stream.keySerde; +this.valSerde = stream.valSerde; this.sourceNodes = stream.sourceNodes; this.streamsGraphNode = stream.streamsGraphNode; } -AbstractStream(final InternalStreamsBuilder builder, - final String name, +AbstractStream(final String name, + final Serde keySerde, + final Serde valSerde, final Set sourceNodes, - final StreamsGraphNode streamsGraphNode) { + final StreamsGraphNode streamsGraphNode, + final InternalStreamsBuilder builder) { if (sourceNodes == null || sourceNodes.isEmpty()) { throw new IllegalArgumentException("parameter must not be null or empty"); } -this.builder = builder; this.name = name; +this.builder = builder; +this.keySerde = keySerde; +
[jira] [Reopened] (KAFKA-4218) Enable access to key in ValueTransformer
[ https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin reopened KAFKA-4218: - Thanks for working on this [~jeyhunkarimov]! For KIP-149, currently there are 4 PRs: - [https://github.com/apache/kafka/pull/3570] - [https://github.com/apache/kafka/pull/3599] - [https://github.com/apache/kafka/pull/3600] - [https://github.com/apache/kafka/pull/3601] And 3 JIRAs: - KAFKA-4218 - KAFKA-3745 - KAFKA-4726 Can we create an umbrella Jira for KIP-149 to include these PRs and JIRAs, so that we can easily reference and track the entire progress for KIP-149 in e.g. Apache Kafka 2.1.0 release https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=91554044? > Enable access to key in ValueTransformer > > > Key: KAFKA-4218 > URL: https://issues.apache.org/jira/browse/KAFKA-4218 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Jeyhun Karimov >Priority: Major > Labels: api, kip > Fix For: 1.1.0 > > > While transforming values via {{KStream.transformValues}} and > {{ValueTransformer}}, the key associated with the value may be needed, even > if it is not changed. For instance, it may be used to access stores. > As of now, the key is not available within these methods and interfaces, > leading to the use of {{KStream.transform}} and {{Transformer}}, and the > unnecessary creation of new {{KeyValue}} objects. > KIP-149: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7276) Consider using re2j to speed up regex operations
[ https://issues.apache.org/jira/browse/KAFKA-7276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634727#comment-16634727 ] Ted Yu commented on KAFKA-7276: --- For #1, see the following: http://hadoop.apache.org/docs/r3.0.2/api/org/apache/hadoop/metrics2/filter/GlobFilter.html#compile-java.lang.String- It is used by hadoop. > Consider using re2j to speed up regex operations > > > Key: KAFKA-7276 > URL: https://issues.apache.org/jira/browse/KAFKA-7276 > Project: Kafka > Issue Type: Task > Components: packaging >Reporter: Ted Yu >Assignee: kevin.chen >Priority: Major > > https://github.com/google/re2j > re2j claims to do linear time regular expression matching in Java. > Its benefit is most obvious for deeply nested regex (such as a | b | c | d). > We should consider using re2j to speed up regex operations. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread
[ https://issues.apache.org/jira/browse/KAFKA-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333251#comment-16333251 ] Ted Yu edited comment on KAFKA-6303 at 10/1/18 10:42 PM: - +1 from me. was (Author: yuzhih...@gmail.com): +1 from me . > Potential lack of synchronization in NioEchoServer#AcceptorThread > - > > Key: KAFKA-6303 > URL: https://issues.apache.org/jira/browse/KAFKA-6303 > Project: Kafka > Issue Type: Bug > Components: network >Reporter: Ted Yu >Assignee: siva santhalingam >Priority: Minor > > In the run() method: > {code} > SocketChannel socketChannel = > ((ServerSocketChannel) key.channel()).accept(); > socketChannel.configureBlocking(false); > newChannels.add(socketChannel); > {code} > Modification to newChannels should be protected by synchronized block. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4436) Provide builder pattern for StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-4436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634702#comment-16634702 ] ASF GitHub Bot commented on KAFKA-4436: --- wykapedia opened a new pull request #5722: [WIP] KAFKA-4436: Configurtion Builders URL: https://github.com/apache/kafka/pull/5722 - Added `ProducerConfigBuilder` to `ProducerConfig` to add easier dynamic creation of `ProducerConfig` properties `Map`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide builder pattern for StreamsConfig > - > > Key: KAFKA-4436 > URL: https://issues.apache.org/jira/browse/KAFKA-4436 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: needs-kip > > Currently, {{StreamsConfig}} parameters must be set "manually" as key value > pairs. This has multiple disadvantages from a user point of view: > - mandatory arguments could be missing > - data types might be wrong > - producer/consumer config parameters could conflict as they might have the > same name (user needs to know to prefix them to avoid conflict) > Those problems have different impact: either a runtime exception is thrown if > the problem is detected (e.g. missing parameter or wrong type) or the > application is just not configured correctly (producer/consumer has wrong > config). > A builder pattern would avoid those problems by forcing the user in the first > place to specify thing correctly (otherwise, it won't compile). For example > something like this: > {noformat} > StreamsConfig config = StreamsConfig.builder() > .setApplicationId(String appId) > .addBootstrapServer(String host, int port) > .addBootstrapServer(String host, int port) > .addZookeeper(String host, int port) > .addZookeeper(String host, int port) > .setStateDirectory(File path) > .setConsumerConfig( > ConsumerConfig.builder() > .setAutoOffsetReset(...) > .build() > ) > .build(); > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7465) kafka-topics.sh command not honouring --disable-rack-aware property when adding partitions.
[ https://issues.apache.org/jira/browse/KAFKA-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634618#comment-16634618 ] Suman B N commented on KAFKA-7465: -- Fixed. Pull request is [here|https://github.com/apache/kafka/pull/5721]. > kafka-topics.sh command not honouring --disable-rack-aware property when > adding partitions. > --- > > Key: KAFKA-7465 > URL: https://issues.apache.org/jira/browse/KAFKA-7465 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.0, 1.0.0, 1.1.0, 2.0.0 >Reporter: Suman B N >Priority: Minor > Fix For: 2.0.1 > > > kafka-topics.sh command not honouring --disable-rack-aware property when > adding partitions. > Create topic honours --disable-rack-aware property. Where as alter topic > always honours default rackAwarMode(Enforced) to add partitions. > Steps: > * Start brokers. > 0 -> r1, 1 -> r1, 2 -> r2, 3 -> r2 > * Create topic _topic1_ with 8 partitions and 2 RF. No partition should have > replicas on same rack brokers. Ex: 0,1 and 2,3 replica should never be > created. Create topic _topic2_ with 8 partitions and 2 RF with > --disable-rack-aware. Partition can have replicas on same rack brokers. Ex: > 0,1 and 2,3 replica if present is acceptable. > * Add 8 more partitions to _topic1._ No newly added partition should have > replicas on same rack brokers. Ex: 0,1 and 2,3 replica should never be > created. > * Add 8 more partitions to _topic2_ with --disable-rack-aware_._ Newly added > partition can have replicas on same rack brokers. Ex: 0,1 and 2,3 replica if > present is acceptable. Try repeating this step, no matter what, partitions > are always rack-aware. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty
[ https://issues.apache.org/jira/browse/KAFKA-7467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Badai Aqrandista updated KAFKA-7467: Priority: Minor (was: Major) > NoSuchElementException is raised because controlBatch is empty > -- > > Key: KAFKA-7467 > URL: https://issues.apache.org/jira/browse/KAFKA-7467 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.0 >Reporter: Badai Aqrandista >Priority: Minor > > Somehow, log cleaner died because of NoSuchElementException when it calls > onControlBatchRead: > {noformat} > [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log > __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into > 0, discarding deletes. (kafka.log.LogCleaner) > [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to > (kafka.log.LogCleaner) > java.util.NoSuchElementException > at java.util.Collections$EmptyIterator.next(Collections.java:4189) > at > kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945) > at > kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636) > at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573) > at > org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157) > at > org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138) > at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604) > at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518) > at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462) > at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461) > at scala.collection.immutable.List.foreach(List.scala:392) > at kafka.log.Cleaner.doClean(LogCleaner.scala:461) > at kafka.log.Cleaner.clean(LogCleaner.scala:438) > at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped > (kafka.log.LogCleaner) > {noformat} > The following code does not seem to expect the controlBatch to be empty: > https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946 > {noformat} > def onControlBatchRead(controlBatch: RecordBatch): Boolean = { > consumeAbortedTxnsUpTo(controlBatch.lastOffset) > val controlRecord = controlBatch.iterator.next() > val controlType = ControlRecordType.parse(controlRecord.key) > val producerId = controlBatch.producerId > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7467) NoSuchElementException is raised because controlBatch is empty
Badai Aqrandista created KAFKA-7467: --- Summary: NoSuchElementException is raised because controlBatch is empty Key: KAFKA-7467 URL: https://issues.apache.org/jira/browse/KAFKA-7467 Project: Kafka Issue Type: Bug Components: core Affects Versions: 1.1.0 Reporter: Badai Aqrandista Somehow, log cleaner died because of NoSuchElementException when it calls onControlBatchRead: {noformat} [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner) [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner) java.util.NoSuchElementException at java.util.Collections$EmptyIterator.next(Collections.java:4189) at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945) at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636) at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573) at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157) at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138) at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604) at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518) at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462) at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461) at scala.collection.immutable.List.foreach(List.scala:392) at kafka.log.Cleaner.doClean(LogCleaner.scala:461) at kafka.log.Cleaner.clean(LogCleaner.scala:438) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner) {noformat} The following code does not seem to expect the controlBatch to be empty: https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946 {noformat} def onControlBatchRead(controlBatch: RecordBatch): Boolean = { consumeAbortedTxnsUpTo(controlBatch.lastOffset) val controlRecord = controlBatch.iterator.next() val controlType = ControlRecordType.parse(controlRecord.key) val producerId = controlBatch.producerId {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7465) kafka-topics.sh command not honouring --disable-rack-aware property when adding partitions.
[ https://issues.apache.org/jira/browse/KAFKA-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634538#comment-16634538 ] Suman B N commented on KAFKA-7465: -- Working on this fix. Please assgin the jira to me. [~cmccabe] [~omkreddy] > kafka-topics.sh command not honouring --disable-rack-aware property when > adding partitions. > --- > > Key: KAFKA-7465 > URL: https://issues.apache.org/jira/browse/KAFKA-7465 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.0, 1.0.0, 1.1.0, 2.0.0 >Reporter: Suman B N >Priority: Minor > Fix For: 2.0.1 > > > kafka-topics.sh command not honouring --disable-rack-aware property when > adding partitions. > Create topic honours --disable-rack-aware property. Where as alter topic > always honours default rackAwarMode(Enforced) to add partitions. > Steps: > * Start brokers. > 0 -> r1, 1 -> r1, 2 -> r2, 3 -> r2 > * Create topic _topic1_ with 8 partitions and 2 RF. No partition should have > replicas on same rack brokers. Ex: 0,1 and 2,3 replica should never be > created. Create topic _topic2_ with 8 partitions and 2 RF with > --disable-rack-aware. Partition can have replicas on same rack brokers. Ex: > 0,1 and 2,3 replica if present is acceptable. > * Add 8 more partitions to _topic1._ No newly added partition should have > replicas on same rack brokers. Ex: 0,1 and 2,3 replica should never be > created. > * Add 8 more partitions to _topic2_ with --disable-rack-aware_._ Newly added > partition can have replicas on same rack brokers. Ex: 0,1 and 2,3 replica if > present is acceptable. Try repeating this step, no matter what, partitions > are always rack-aware. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7465) kafka-topics.sh command not honouring --disable-rack-aware property when adding partitions.
Suman B N created KAFKA-7465: Summary: kafka-topics.sh command not honouring --disable-rack-aware property when adding partitions. Key: KAFKA-7465 URL: https://issues.apache.org/jira/browse/KAFKA-7465 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.0.0, 1.1.0, 1.0.0, 0.11.0.0 Reporter: Suman B N Fix For: 2.0.1 kafka-topics.sh command not honouring --disable-rack-aware property when adding partitions. Create topic honours --disable-rack-aware property. Where as alter topic always honours default rackAwarMode(Enforced) to add partitions. Steps: * Start brokers. 0 -> r1, 1 -> r1, 2 -> r2, 3 -> r2 * Create topic _topic1_ with 8 partitions and 2 RF. No partition should have replicas on same rack brokers. Ex: 0,1 and 2,3 replica should never be created. Create topic _topic2_ with 8 partitions and 2 RF with --disable-rack-aware. Partition can have replicas on same rack brokers. Ex: 0,1 and 2,3 replica if present is acceptable. * Add 8 more partitions to _topic1._ No newly added partition should have replicas on same rack brokers. Ex: 0,1 and 2,3 replica should never be created. * Add 8 more partitions to _topic2_ with --disable-rack-aware_._ Newly added partition can have replicas on same rack brokers. Ex: 0,1 and 2,3 replica if present is acceptable. Try repeating this step, no matter what, partitions are always rack-aware. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6914) Kafka Connect - Plugins class should have a constructor that can take in parent ClassLoader
[ https://issues.apache.org/jira/browse/KAFKA-6914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634495#comment-16634495 ] ASF GitHub Bot commented on KAFKA-6914: --- kkonstantine opened a new pull request #5720: KAFKA-6914: Set parent classloader of DelegatingClassLoader same as the worker's URL: https://github.com/apache/kafka/pull/5720 The parent classloader of the DelegatingClassLoader and therefore the classloading scheme used by Connect does not have to be fixed to the System classloader. Setting it the same as the one that was used to load the DelegatingClassLoader class itself is more flexible and, while in most cases will result in the System classloader to be used, it will also work in othr managed environments that control classloading differently (OSGi, and others). The fix is minimal and the mainstream use is tested via system tests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka Connect - Plugins class should have a constructor that can take in > parent ClassLoader > --- > > Key: KAFKA-6914 > URL: https://issues.apache.org/jira/browse/KAFKA-6914 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sriram KS >Assignee: Konstantine Karantasis >Priority: Minor > > Currently Plugins class has a single constructor that takes in map of props. > Please make Plugin class to have a constructor that takes in a classLoader as > well and use it to set DelegationClassLoader's parent classLoader. > Reason: > This will be useful if i am already having a managed class Loader environment > like a Spring boot app which resolves my class dependencies using my > maven/gradle dependency management. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7464: --- Affects Version/s: 2.0.0 > Fail to shutdown ReplicaManager during broker cleaned shutdown > -- > > Key: KAFKA-7464 > URL: https://issues.apache.org/jira/browse/KAFKA-7464 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.0 >Reporter: Zhanxiang (Patrick) Huang >Assignee: Zhanxiang (Patrick) Huang >Priority: Critical > Fix For: 2.0.1, 2.1.0 > > > In 2.0 deployment, we saw the following log when shutting down the > ReplicaManager in broker cleaned shutdown: > {noformat} > 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null > java.lang.IllegalArgumentException: null > at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121] > at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121] > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > ~[?:1.8.0_121] > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.Selector.doClose(Selector.java:751) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:739) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:701) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:315) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) > ~[kafka-clients-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) > ~[kafka_2.11-2.0.0.22.jar:?] > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) > ~[scala-library-2.11.12.jar:?] > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > ~[scala-library-2.11.12.jar:?] > at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) > ~[scala-library-2.11.12.jar:?] > at > kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) > ~[kafka_2.11-2.0.0.22.jar:?] > {noformat} > After that, we noticed that some of the replica fetcher thread fail to > shutdown: > {noformat} > 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] > [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log > segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches > java.nio.channels.ClosedChannelException: null > at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) > ~[?:1.8.0_121] > at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) > ~[?:1.8.0_121] > at >
[jira] [Updated] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7464: --- Priority: Critical (was: Major) > Fail to shutdown ReplicaManager during broker cleaned shutdown > -- > > Key: KAFKA-7464 > URL: https://issues.apache.org/jira/browse/KAFKA-7464 > Project: Kafka > Issue Type: Bug >Reporter: Zhanxiang (Patrick) Huang >Assignee: Zhanxiang (Patrick) Huang >Priority: Critical > Fix For: 2.0.1, 2.1.0 > > > In 2.0 deployment, we saw the following log when shutting down the > ReplicaManager in broker cleaned shutdown: > {noformat} > 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null > java.lang.IllegalArgumentException: null > at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121] > at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121] > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > ~[?:1.8.0_121] > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.Selector.doClose(Selector.java:751) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:739) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:701) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:315) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) > ~[kafka-clients-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) > ~[kafka_2.11-2.0.0.22.jar:?] > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) > ~[scala-library-2.11.12.jar:?] > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > ~[scala-library-2.11.12.jar:?] > at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) > ~[scala-library-2.11.12.jar:?] > at > kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) > ~[kafka_2.11-2.0.0.22.jar:?] > {noformat} > After that, we noticed that some of the replica fetcher thread fail to > shutdown: > {noformat} > 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] > [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log > segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches > java.nio.channels.ClosedChannelException: null > at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) > ~[?:1.8.0_121] > at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) > ~[?:1.8.0_121] > at >
[jira] [Updated] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7464: --- Fix Version/s: 2.0.1 > Fail to shutdown ReplicaManager during broker cleaned shutdown > -- > > Key: KAFKA-7464 > URL: https://issues.apache.org/jira/browse/KAFKA-7464 > Project: Kafka > Issue Type: Bug >Reporter: Zhanxiang (Patrick) Huang >Assignee: Zhanxiang (Patrick) Huang >Priority: Critical > Fix For: 2.0.1, 2.1.0 > > > In 2.0 deployment, we saw the following log when shutting down the > ReplicaManager in broker cleaned shutdown: > {noformat} > 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null > java.lang.IllegalArgumentException: null > at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121] > at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121] > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > ~[?:1.8.0_121] > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.Selector.doClose(Selector.java:751) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:739) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:701) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:315) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) > ~[kafka-clients-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) > ~[kafka_2.11-2.0.0.22.jar:?] > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) > ~[scala-library-2.11.12.jar:?] > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > ~[scala-library-2.11.12.jar:?] > at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) > ~[scala-library-2.11.12.jar:?] > at > kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) > ~[kafka_2.11-2.0.0.22.jar:?] > {noformat} > After that, we noticed that some of the replica fetcher thread fail to > shutdown: > {noformat} > 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] > [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log > segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches > java.nio.channels.ClosedChannelException: null > at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) > ~[?:1.8.0_121] > at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) > ~[?:1.8.0_121] > at >
[jira] [Updated] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-7464: --- Fix Version/s: 2.1.0 > Fail to shutdown ReplicaManager during broker cleaned shutdown > -- > > Key: KAFKA-7464 > URL: https://issues.apache.org/jira/browse/KAFKA-7464 > Project: Kafka > Issue Type: Bug >Reporter: Zhanxiang (Patrick) Huang >Assignee: Zhanxiang (Patrick) Huang >Priority: Major > Fix For: 2.0.1, 2.1.0 > > > In 2.0 deployment, we saw the following log when shutting down the > ReplicaManager in broker cleaned shutdown: > {noformat} > 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null > java.lang.IllegalArgumentException: null > at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121] > at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121] > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > ~[?:1.8.0_121] > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.common.network.Selector.doClose(Selector.java:751) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:739) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:701) > ~[kafka-clients-2.0.0.22.jar:?] > at org.apache.kafka.common.network.Selector.close(Selector.java:315) > ~[kafka-clients-2.0.0.22.jar:?] > at > org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) > ~[kafka-clients-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) > ~[kafka_2.11-2.0.0.22.jar:?] > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) > ~[scala-library-2.11.12.jar:?] > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > ~[scala-library-2.11.12.jar:?] > at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) > ~[scala-library-2.11.12.jar:?] > at > kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) > ~[kafka_2.11-2.0.0.22.jar:?] > at > kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) > ~[kafka_2.11-2.0.0.22.jar:?] > at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) > ~[kafka_2.11-2.0.0.22.jar:?] > {noformat} > After that, we noticed that some of the replica fetcher thread fail to > shutdown: > {noformat} > 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] > [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log > segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches > java.nio.channels.ClosedChannelException: null > at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) > ~[?:1.8.0_121] > at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) > ~[?:1.8.0_121] > at >
[jira] [Commented] (KAFKA-6051) ReplicaFetcherThread should close the ReplicaFetcherBlockingSend earlier on shutdown
[ https://issues.apache.org/jira/browse/KAFKA-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634331#comment-16634331 ] Zhanxiang (Patrick) Huang commented on KAFKA-6051: -- [~ijuma] Sure. I have created KAFKA-7464 for the issue. > ReplicaFetcherThread should close the ReplicaFetcherBlockingSend earlier on > shutdown > > > Key: KAFKA-6051 > URL: https://issues.apache.org/jira/browse/KAFKA-6051 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, > 0.10.2.1, 0.11.0.0 >Reporter: Maytee Chinavanichkit >Assignee: Maytee Chinavanichkit >Priority: Major > Fix For: 1.1.0 > > > The ReplicaFetcherBlockingSend works as designed and will blocks until it is > able to get data. This becomes a problem when we are gracefully shutting down > a broker. The controller will attempt to shutdown the fetchers and elect new > leaders. When the last fetch of partition is removed, as part of the > {{replicaManager.becomeLeaderOrFollower}} call will proceed to shut down any > idle ReplicaFetcherThread. The shutdown process here can block up to until > the last fetch request completes. This blocking delay is a big problem > because the {{replicaStateChangeLock}}, and {{mapLock}} in > {{AbstractFetcherManager}} is still locked causing latency spikes on multiple > brokers. > At this point in time, we do not need the last response as the fetcher is > shutting down. We should close the leaderEndpoint early during > {{initiateShutdown()}} instead of after {{super.shutdown()}}. > For example we see here the shutdown blocked the broker from processing more > replica changes for ~500 ms > {code} > [2017-09-01 18:11:42,879] INFO [ReplicaFetcherThread-0-2], Shutting down > (kafka.server.ReplicaFetcherThread) > [2017-09-01 18:11:43,314] INFO [ReplicaFetcherThread-0-2], Stopped > (kafka.server.ReplicaFetcherThread) > [2017-09-01 18:11:43,314] INFO [ReplicaFetcherThread-0-2], Shutdown completed > (kafka.server.ReplicaFetcherThread) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown
Zhanxiang (Patrick) Huang created KAFKA-7464: Summary: Fail to shutdown ReplicaManager during broker cleaned shutdown Key: KAFKA-7464 URL: https://issues.apache.org/jira/browse/KAFKA-7464 Project: Kafka Issue Type: Bug Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang In 2.0 deployment, we saw the following log when shutting down the ReplicaManager in broker cleaned shutdown: {noformat} 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null java.lang.IllegalArgumentException: null at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121] at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121] at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_121] at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) ~[kafka-clients-2.0.0.22.jar:?] at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) ~[kafka-clients-2.0.0.22.jar:?] at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) ~[kafka-clients-2.0.0.22.jar:?] at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) ~[kafka-clients-2.0.0.22.jar:?] at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) ~[kafka-clients-2.0.0.22.jar:?] at org.apache.kafka.common.network.Selector.close(Selector.java:739) ~[kafka-clients-2.0.0.22.jar:?] at org.apache.kafka.common.network.Selector.close(Selector.java:701) ~[kafka-clients-2.0.0.22.jar:?] at org.apache.kafka.common.network.Selector.close(Selector.java:315) ~[kafka-clients-2.0.0.22.jar:?] at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) ~[kafka-clients-2.0.0.22.jar:?] at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) ~[kafka_2.11-2.0.0.22.jar:?] at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) ~[kafka_2.11-2.0.0.22.jar:?] at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) ~[kafka_2.11-2.0.0.22.jar:?] at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?] at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?] at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?] at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?] at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) ~[scala-library-2.11.12.jar:?] at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) ~[scala-library-2.11.12.jar:?] at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) ~[scala-library-2.11.12.jar:?] at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?] at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?] at kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) ~[kafka_2.11-2.0.0.22.jar:?] at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) ~[kafka_2.11-2.0.0.22.jar:?] at kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?] at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) ~[kafka_2.11-2.0.0.22.jar:?] at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?] {noformat} After that, we noticed that some of the replica fetcher thread fail to shutdown: {noformat} 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches java.nio.channels.ClosedChannelException: null at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) ~[?:1.8.0_121] at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) ~[?:1.8.0_121] at org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) ~[kafka-clients-2.0.0.22.jar:?] at org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) ~[kafka-clients-2.0.0.22.jar:?] at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) ~[kafka_2.11-2.0.0.22.jar:?] at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?] at
[jira] [Commented] (KAFKA-7463) ConsoleProducer rejects producer property values with equals signs
[ https://issues.apache.org/jira/browse/KAFKA-7463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634303#comment-16634303 ] Manikumar commented on KAFKA-7463: -- This might have fixed in KAFKA-7388. > ConsoleProducer rejects producer property values with equals signs > -- > > Key: KAFKA-7463 > URL: https://issues.apache.org/jira/browse/KAFKA-7463 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.0 >Reporter: Magnus Reftel >Priority: Major > > The --producer-property option of ConsoleProducer rejects proprty values with > equals signs with the message "Invalid command line properties". This makes > specifying SASL configuration via the the sasl.jaas.config property > difficult, since these values often contain equals signs. It's possible to > work around the issue by writing the SASL configuration to a separate file, > and then specifying it via > {code:java} > KAFKA_OPTS=-Djava.security.auth.login.config=$filename{code} > but there seems to be no good reason to not allow the more direct route. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7445) Branch one Stream in multiple Streams
[ https://issues.apache.org/jira/browse/KAFKA-7445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633953#comment-16633953 ] Dennis Reiter commented on KAFKA-7445: -- [~mjsax]: Thanks for the hint! I will give it a try. [~guozhang]: I didn't want to change the branching logic, just using it as an example. > Branch one Stream in multiple Streams > - > > Key: KAFKA-7445 > URL: https://issues.apache.org/jira/browse/KAFKA-7445 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Dennis Reiter >Priority: Minor > > Hi, > I need to branch/split KStreams in multiple independent KStreams. I thought, > {{org.apache.kafka.streams.kstream.internals.KStreamImpl#branch}} is the > right one but in fact, its designed for another purpose. > In contrast to {{branch}} I need to assign the record to *all* matching > streams, not only one stream. > Speaking in code > ({{org.apache.kafka.streams.kstream.internals.KStreamBranch}}): > {code:java} > if (predicates[i].test(key, value)) { >// use forward with childIndex here >// and pipe the record to multiple streams >context().forward(key, value, i); > } > {code} > My question: is this still possible with features already included in > Streams? Or shall I propose a change? > Thanks in advance > Dennis -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7458) Avoid enforced processing during bootstrap phase
[ https://issues.apache.org/jira/browse/KAFKA-7458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633951#comment-16633951 ] Antony Stubbs commented on KAFKA-7458: -- Related to KAFKA-4113 > Avoid enforced processing during bootstrap phase > > > Key: KAFKA-7458 > URL: https://issues.apache.org/jira/browse/KAFKA-7458 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > In KAFKA-3514, we introduced a new config for allowing users to delay > enforcing processing without all input topic partitions to have data. This > config's default value is 0, which means that as long as the first fetch does > not contains some records for all the partitions it will fall into enforced > processing immediately, which is a high risk especially under bootstrap case. > We should consider leveraging on pause / resume to make sure that upon > starting, some partition indeed does not have any data before we fall into > enforced processing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7463) ConsoleProducer rejects producer property values with equals signs
Magnus Reftel created KAFKA-7463: Summary: ConsoleProducer rejects producer property values with equals signs Key: KAFKA-7463 URL: https://issues.apache.org/jira/browse/KAFKA-7463 Project: Kafka Issue Type: Bug Affects Versions: 2.0.0 Reporter: Magnus Reftel The --producer-property option of ConsoleProducer rejects proprty values with equals signs with the message "Invalid command line properties". This makes specifying SASL configuration via the the sasl.jaas.config property difficult, since these values often contain equals signs. It's possible to work around the issue by writing the SASL configuration to a separate file, and then specifying it via {code:java} KAFKA_OPTS=-Djava.security.auth.login.config=$filename{code} but there seems to be no good reason to not allow the more direct route. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5682) Consumer should include partition in exceptions raised during record parsing/validation
[ https://issues.apache.org/jira/browse/KAFKA-5682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-5682: --- Fix Version/s: (was: 2.1.0) 2.2.0 > Consumer should include partition in exceptions raised during record > parsing/validation > --- > > Key: KAFKA-5682 > URL: https://issues.apache.org/jira/browse/KAFKA-5682 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Jason Gustafson >Assignee: Stanislav Kozlovski >Priority: Minor > Labels: needs-kip > Fix For: 2.2.0 > > > When we encounter an exception when validating a fetched record or when > deserializing it, we raise it to the user and keep the consumer's current > position at the offset of the failed record. The expectation is that the user > will either propagate the exception and shutdown or seek past the failed > record. However, in the latter case, there is no way for the user to know > which topic partition had the failed record. We should consider exposing an > exception type to expose this information which users can catch. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector
[ https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633793#comment-16633793 ] Rajini Sivaram commented on KAFKA-7304: --- [~yuyang08] We saw the OOM during a load test, so CPU usage would have been high anyway. With a large number of clients producing a large amount of traffic, we would expect high CPU usage with SSL. We haven't done any comparisons with other SSL implementations like OpenSSL though to see if we can improve on this. > memory leakage in org.apache.kafka.common.network.Selector > -- > > Key: KAFKA-7304 > URL: https://issues.apache.org/jira/browse/KAFKA-7304 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.0, 1.1.1 >Reporter: Yu Yang >Priority: Critical > Fix For: 1.1.2, 2.0.1, 2.1.0 > > Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at > 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot > 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, > Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 > AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at > 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot > 2018-08-29 at 10.50.47 AM.png, Screen Shot 2018-09-29 at 10.38.12 PM.png, > Screen Shot 2018-09-29 at 10.38.38 PM.png, Screen Shot 2018-09-29 at 8.34.50 > PM.png > > > We are testing secured writing to kafka through ssl. Testing at small scale, > ssl writing to kafka was fine. However, when we enabled ssl writing at a > larger scale (>40k clients write concurrently), the kafka brokers soon hit > OutOfMemory issue with 4G memory setting. We have tried with increasing the > heap size to 10Gb, but encountered the same issue. > We took a few heap dumps , and found that most of the heap memory is > referenced through org.apache.kafka.common.network.Selector objects. There > are two Channel maps field in Selector. It seems that somehow the objects is > not deleted from the map in a timely manner. > One observation is that the memory leak seems relate to kafka partition > leader changes. If there is broker restart etc. in the cluster that caused > partition leadership change, the brokers may hit the OOM issue faster. > {code} > private final Map channels; > private final Map closingChannels; > {code} > Please see the attached images and the following link for sample gc > analysis. > http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0 > the command line for running kafka: > {code} > java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m > -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC > -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 > -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 > -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps > -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log > -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M > -Djava.awt.headless=true > -Dlog4j.configuration=file:/etc/kafka/log4j.properties > -Dcom.sun.management.jmxremote > -Dcom.sun.management.jmxremote.authenticate=false > -Dcom.sun.management.jmxremote.ssl=false > -Dcom.sun.management.jmxremote.port= > -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/* > kafka.Kafka /etc/kafka/server.properties > {code} > We use java 1.8.0_102, and has applied a TLS patch on reducing > X509Factory.certCache map size from 750 to 20. > {code} > java -version > java version "1.8.0_102" > Java(TM) SE Runtime Environment (build 1.8.0_102-b14) > Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7450) "Handshake message sequence violation" related ssl handshake failure leads to high cpu usage
[ https://issues.apache.org/jira/browse/KAFKA-7450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633779#comment-16633779 ] Rajini Sivaram commented on KAFKA-7450: --- The exceptions indicate an SSL configuration issue. If this is an issue only for inter-broker communication and clients have been connecting successfully, then it would be worth checking truststore etc. Enabling javax.net.debug=ssl should help identify the cause if it is a configuration issue. > "Handshake message sequence violation" related ssl handshake failure leads to > high cpu usage > > > Key: KAFKA-7450 > URL: https://issues.apache.org/jira/browse/KAFKA-7450 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 2.0.0 >Reporter: Yu Yang >Priority: Major > > After updating security.inter.broker.protocol to SSL for our cluster, we > observed that the controller can get into almost 100% cpu usage. > {code} > listeners=PLAINTEXT://:9092,SSL://:9093 > security.inter.broker.protocol=SSL > {code} > There is no obvious error in server.log. But in controller.log, there is > repetitive SSL handshare failure error as below: > {code} > [2018-09-28 05:53:10,821] WARN [RequestSendThread controllerId=6042] > Controller 6042's connection to broker datakafka06176.ec2.pin220.com:9093 > (id: 6176 rack: null) was unsuccessful (kafka.controller.RequestSendThread) > org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake > failed > Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence > violation, 2 > at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1487) > at > sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535) > at > sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:813) > at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:781) > at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624) > at > org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:468) > at > org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:331) > at > org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) > at org.apache.kafka.common.network.Selector.poll(Selector.java:425) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73) > at > kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279) > at > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence > violation, 2 > at > sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:196) > at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026) > at sun.security.ssl.Handshaker$1.run(Handshaker.java:966) > at sun.security.ssl.Handshaker$1.run(Handshaker.java:963) > at java.security.AccessController.doPrivileged(Native Method) > at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416) > at > org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393) > at > org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:473) > ... 10 more > {code} > {code} > [2018-09-30 00:30:13,609] WARN [ReplicaFetcher replicaId=59, leaderId=66, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=59, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={the_test_topic-18=(offset=462333447, logStartOffset=462286948, > maxBytes=4194304), the_test_topic-58=(offset=462312762, > logStartOffset=462295078, maxBytes=4194304)}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1991153671, > epoch=INITIAL)) (kafka.server.ReplicaFetcherThread) > org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake > failed > Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence > violation, 2 > at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1538) > at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535) > at sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:813) > at
[jira] [Created] (KAFKA-7462) Kafka brokers cannot provide OAuth without a token
Rajini Sivaram created KAFKA-7462: - Summary: Kafka brokers cannot provide OAuth without a token Key: KAFKA-7462 URL: https://issues.apache.org/jira/browse/KAFKA-7462 Project: Kafka Issue Type: Bug Components: security Affects Versions: 2.0.0 Reporter: Rajini Sivaram Fix For: 2.1.0 Like with all other SASL mechanisms, OAUTHBEARER uses the same LoginModule class on both server-side and the client-side. But unlike PLAIN or SCRAM where client credentials are optional, OAUTHBEARER requires always requires a token. So while with PLAIN/SCRAM, broker only needs to specify client credentials if the mechanism is used for inter-broker communication, with OAuth, broker requires client credentials even if OAuth is not used for inter-broker communication. This is an issue with the default `OAuthBearerUnsecuredLoginCallbackHandler` used on both client-side and server-side. But more critically, it is an issue with `OAuthBearerLoginModule` which doesn't commit if token == null (commit() returns false). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-5836) Kafka Streams - API for specifying internal stream name on join
[ https://issues.apache.org/jira/browse/KAFKA-5836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-5836. Resolution: Duplicate > Kafka Streams - API for specifying internal stream name on join > --- > > Key: KAFKA-5836 > URL: https://issues.apache.org/jira/browse/KAFKA-5836 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.11.0.0 >Reporter: Lovro Pandžić >Priority: Major > Labels: api, needs-kip > > Automatic topic name can be problematic in case of streams operation > change/migration. > I'd like to be able to specify name of an internal topic so I can avoid > creation of new stream and data "loss" when changing the Stream building. -- This message was sent by Atlassian JIRA (v7.6.3#76005)