[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468303#comment-16468303 ] Chia-Ping Tsai commented on KAFKA-6870:
---
After reading Sensor and KafkaMetric, I think we should make KafkaMetric#measurableValue thread-safe to resolve this issue. A lock is already passed to KafkaMetric, so KafkaMetric should hold that lock when it reads the measured value from metricValueProvider. [~ijuma] [~rsivaram] Any suggestions?

> Concurrency conflicts in SampledStat
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
> Issue Type: Bug
> Reporter: Chia-Ping Tsai
> Priority: Major
> Fix For: 2.0.0, 1.1.1
>
> The samples stored in SampledStat are not thread-safe. However, the ReplicaFetcherThreads that replicate from different leader brokers may concurrently update the samples (when the list is empty, a new sample is added to it) and iterate over them, which causes a ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
>   at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
>   at java.util.ArrayList$Itr.next(ArrayList.java:859)
>   at org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
>   at org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
>   at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
>   at org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
>   at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
>   at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
>   at kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
>   at kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
>   at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
>   at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
>   at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
>   at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35], the ConcurrentModificationException did not occur, since every change to samples is currently an "add", and iterating with get(index) avoids the ConcurrentModificationException.
> In short, we can either make samples thread-safe or, if a thread-safe list raises performance concerns, replace the foreach loop with get(index).

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
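A minimal, self-contained Java sketch of the failure mode and of the two remedies discussed above. `SampledStatSketch` and its methods are hypothetical stand-ins, not Kafka's `SampledStat` or `KafkaMetric`. The concurrent add is simulated on a single thread so the exception is reproducible, and the `ReentrantLock` stands in for the lock already passed to `KafkaMetric`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified stand-in for SampledStat; not Kafka's class. */
public class SampledStatSketch {
    private final List<Double> samples = new ArrayList<>();
    // Hypothetical lock standing in for the one passed to KafkaMetric.
    private final ReentrantLock lock = new ReentrantLock();

    /** Adds a sample under the lock (a structural change that bumps ArrayList's modCount). */
    public void record(double value) {
        lock.lock();
        try {
            samples.add(value);
        } finally {
            lock.unlock();
        }
    }

    /**
     * For-each combine, like Rate$SampledTotal.combine in the stack trace.
     * If simulateConcurrentAdd is true, one sample is appended mid-loop (what a second
     * fetcher thread can do); the iterator's next modCount check then throws.
     */
    public double combineForEach(boolean simulateConcurrentAdd) {
        double total = 0.0;
        boolean pendingAdd = simulateConcurrentAdd;
        for (double s : samples) {
            total += s;
            if (pendingAdd) {
                samples.add(0.0);
                pendingAdd = false;
            }
        }
        return total;
    }

    /** Index-based combine: an append does not make get(i) throw ConcurrentModificationException. */
    public double combineByIndex() {
        double total = 0.0;
        for (int i = 0; i < samples.size(); i++) {
            total += samples.get(i);
        }
        return total;
    }

    /** Reads under the same lock record() uses, so no add can interleave with the iteration. */
    public double measure() {
        lock.lock();
        try {
            return combineForEach(false);
        } finally {
            lock.unlock();
        }
    }
}
```

The index-based loop only sidesteps the fail-fast check; it does not make `ArrayList` safe for concurrent use, which is one argument for taking the lock in `measurableValue` instead.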
[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468244#comment-16468244 ] ASF GitHub Bot commented on KAFKA-6761:
---
bbejeck opened a new pull request #4983: KAFKA-6761 [WIP]- reduce streams footprint part II
URL: https://github.com/apache/kafka/pull/4983

This version is a WIP and intentionally leaves out some additional required changes to keep the review effort manageable. This version includes:
1. Cleaning up the graph objects to reduce the number of parameters and make the naming conventions clearer.
2. Intercepting all calls to the `InternalTopologyBuilder` and capturing all details required for possible optimizations and for building the final topology.

This PR does not include writing out the current physical plan, so no tests are included. The next PR will include additional changes to building the graph and will write the topology out without optimizations, using the current Streams tests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Reduce Kafka Streams Footprint
> --
>
> Key: KAFKA-6761
> URL: https://issues.apache.org/jira/browse/KAFKA-6761
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Bill Bejeck
> Assignee: Bill Bejeck
> Priority: Major
> Fix For: 2.0.0
>
> The persistent storage footprint of a Kafka Streams application contains the following aspects:
> # The internal topics created on the Kafka cluster side.
> # The materialized state stores on the Kafka Streams application instances side.
> There have been some questions about reducing these footprints, especially since many of them are not necessary. For example, there are redundant internal topics, as well as unnecessary state stores that take up space and also affect performance. When people push Streams to production with high traffic, this issue becomes more common and severe. Reducing the footprint of Streams has clear benefits: it reduces the resource utilization of Kafka Streams applications and avoids putting pressure on broker capacity.
[jira] [Commented] (KAFKA-6786) Remove additional configs in StreamsBrokerDownResilienceTest
[ https://issues.apache.org/jira/browse/KAFKA-6786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468243#comment-16468243 ] ASF GitHub Bot commented on KAFKA-6786:
---
abhishek7 opened a new pull request #4982: KAFKA-6786
URL: https://github.com/apache/kafka/pull/4982

First iteration of KAFKA-6786. This is currently a WIP because it is my first contribution; I want to make sure it is implemented correctly.

> Remove additional configs in StreamsBrokerDownResilienceTest
>
> Key: KAFKA-6786
> URL: https://issues.apache.org/jira/browse/KAFKA-6786
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Abhishek Sharma
> Priority: Minor
> Labels: newbie
>
> Since we now pass a property file into the streams service initialization code, we no longer need to pass those configs as additional properties in StreamsBrokerDownResilienceTest. We can refactor this test to get rid of the additional properties.
[jira] [Commented] (KAFKA-6877) Remove completedFetch upon a failed parse if it contains no records.
[ https://issues.apache.org/jira/browse/KAFKA-6877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468221#comment-16468221 ] ASF GitHub Bot commented on KAFKA-6877:
---
becketqin closed pull request #4974: KAFKA-6877; Remove completedFetch upon a failed parse if it contains no records.
URL: https://github.com/apache/kafka/pull/4974

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6d8fb6c2466..b3791ffb514 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -472,6 +472,7 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
 * @return The fetched records per partition
 * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
 * the defaultResetPolicy is NONE
+ * @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
 */
 public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
 Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
@@ -483,7 +484,20 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
 CompletedFetch completedFetch = completedFetches.peek();
 if (completedFetch == null) break;
-nextInLineRecords = parseCompletedFetch(completedFetch);
+try {
+nextInLineRecords = parseCompletedFetch(completedFetch);
+} catch (Exception e) {
+// Remove a completedFetch upon a parse with exception if (1) it contains no records, and
+// (2) there are no fetched records with actual content preceding this exception.
+// The first condition ensures that the completedFetches is not stuck with the same completedFetch
+// in cases such as the TopicAuthorizationException, and the second condition ensures that no
+// potential data loss due to an exception in a following record.
+FetchResponse.PartitionData partition = completedFetch.partitionData;
+if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
+completedFetches.poll();
+}
+throw e;
+}
 completedFetches.poll();
 } else {
 List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
@@ -945,7 +959,7 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
 this.metadata.requestUpdate();
 } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
 if (fetchOffset != subscriptions.position(tp)) {
-log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
+log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
 "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
 } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
 log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6de11866ec4..7157470b760 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -59,6 +59,7 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -117,6 +118,8 @@ private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
 private TopicPartition tp0 = new TopicPartition(topicName, 0);
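To make the two removal conditions in the new `catch` block concrete, here is a hypothetical Java model of the drain loop. `CompletedFetchSketch` uses strings for records and a flag to force a parse failure. The outer `catch`, which returns already-collected data instead of throwing, is an assumption about the surrounding `fetchedRecords()` code that the truncated diff does not show.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the loop in Fetcher#fetchedRecords after KAFKA-6877; names are illustrative. */
public class CompletedFetchSketch {
    /** Stand-in for CompletedFetch: a partition, its raw records, and whether parsing throws. */
    public static final class CompletedFetch {
        public final String partition;
        public final List<String> records;
        public final boolean failsToParse; // e.g. a TopicAuthorizationException

        public CompletedFetch(String partition, List<String> records, boolean failsToParse) {
            this.partition = partition;
            this.records = records;
            this.failsToParse = failsToParse;
        }
    }

    public final Deque<CompletedFetch> completedFetches = new ArrayDeque<>();

    private static List<String> parse(CompletedFetch fetch) {
        if (fetch.failsToParse)
            throw new IllegalStateException("parse failed for " + fetch.partition);
        return fetch.records;
    }

    public Map<String, List<String>> fetchedRecords() {
        Map<String, List<String>> fetched = new HashMap<>();
        try {
            while (true) {
                CompletedFetch completedFetch = completedFetches.peek();
                if (completedFetch == null) break;
                List<String> parsed;
                try {
                    parsed = parse(completedFetch);
                } catch (RuntimeException e) {
                    // (1) no records: dropping it loses no data and stops it from blocking the queue forever.
                    // (2) nothing collected yet: otherwise the error is swallowed below, so the fetch
                    //     must stay queued to raise the error on the next call.
                    if (fetched.isEmpty() && completedFetch.records.isEmpty())
                        completedFetches.poll();
                    throw e;
                }
                completedFetches.poll();
                if (!parsed.isEmpty())
                    fetched.put(completedFetch.partition, parsed);
            }
        } catch (RuntimeException e) {
            // Assumed outer handling: surface the error only when there is no data to return.
            if (fetched.isEmpty())
                throw e;
        }
        return fetched;
    }
}
```

With only the failing empty fetch queued, the error surfaces and the fetch is removed; with data collected first, the data is returned and the failing fetch stays queued.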
[jira] [Resolved] (KAFKA-6292) KafkaConsumer ran into Unknown error fetching data for topic-partition caused by integer overflow in FileLogInputStream
[ https://issues.apache.org/jira/browse/KAFKA-6292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-6292.
Resolution: Fixed

> KafkaConsumer ran into Unknown error fetching data for topic-partition caused by integer overflow in FileLogInputStream
>
> Key: KAFKA-6292
> URL: https://issues.apache.org/jira/browse/KAFKA-6292
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2
> Environment: OS: Red Hat Enterprise Linux Server release 7.3 (Maipo)
> Kafka: kafka_2.12-0.11.0.0
> JDK: jdk1.8.0_121
> Reporter: Terence Yi
> Priority: Major
> Labels: easyfix, reliability
> Fix For: 2.0.0
>
> Steps to reproduce:
> * Broker config to reproduce this bug:
> {code:java}
> # The maximum size of a log segment file. When this size is reached a new log segment will be created.
> #2G
> log.segment.bytes=2147483647
> {code}
> * Setup:
> producer sends messages constantly.
> consumer polls.
> topic has 1 partition and replication factor 1.
> min.insync.replicas=1
> producer has "acks=all"
> consumer has "enable.auto.commit=false"
> consumer manually calls commitSync after handling messages.
> Kafka runs standalone.
> * Observe the consumer-side log (for me, after running 12 hours):
> {code:java}
> 2017-12-18 07:11:01.013 WARN sep105v1 [app-consumer-subscription-pool-4-thread-20] org.apache.kafka.clients.consumer.internals.Fetcher {} Unknown error fetching data for topic-partition DDI.DISPATCHER.P_TVIN.W_SL.P_appx.P_ul.P_pos-0
> {code}
> * Observe server.log in Kafka/logs:
> {code:java}
> [2017-12-14 04:52:21,144] ERROR [Replica Manager on Broker 3]: Error processing fetch operation on partition DDI.DISPATCHER.P_TVIN.W_SL.P_appx.P_ul.P_pos-0, offset 4043314339 (kafka.server.ReplicaManager)
> org.apache.kafka.common.KafkaException: java.io.EOFException: Failed to read `log header` from file channel `sun.nio.ch.FileChannelImpl@5604ea91`. Expected to read 17 bytes, but reached end of file after reading 0 bytes. Started read from position 2147483643.
>   at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:40)
>   at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
>   at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
>   at org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
>   at org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:279)
>   at kafka.log.LogSegment.translateOffset(LogSegment.scala:176)
>   at kafka.log.LogSegment.read(LogSegment.scala:228)
>   at kafka.log.Log.read(Log.scala:938)
>   at kafka.server.ReplicaManager.read$1(ReplicaManager.scala:719)
>   at kafka.server.ReplicaManager.$anonfun$readFromLocalLog$6(ReplicaManager.scala:780)
>   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:779)
>   at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:617)
>   at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:615)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException: Failed to read `log header` from file channel `sun.nio.ch.FileChannelImpl@5604ea91`. Expected to read 17 bytes, but reached end of file after reading 0 bytes. Started read from position 2147483643.
>   at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:751)
>   at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:66)
>   at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:40)
>   at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
>   ... 18 more
> {code}
> * Impact:
> # After the EOF exception occurs, the consumer fails to consume the remaining messages.
> # After the log segment files that caused the EOF exception were deleted by the log cleaner thread, the consumer recovered and resumed consuming messages.
> # There is no impact from the producer's point of view.
> * Analysis:
> # Kafka log file list:
>
[jira] [Commented] (KAFKA-6292) KafkaConsumer ran into Unknown error fetching data for topic-partition caused by integer overflow in FileLogInputStream
[ https://issues.apache.org/jira/browse/KAFKA-6292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468133#comment-16468133 ] ASF GitHub Bot commented on KAFKA-6292:
---
hachikuji closed pull request #4928: KAFKA-6292: Improved FileLogInputStream batch position checks in order to avoid type overflow related errors
URL: https://github.com/apache/kafka/pull/4928

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index a1e3a2f6541..92e8864a183 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -60,7 +60,7 @@
 @Override
 public FileChannelRecordBatch nextBatch() throws IOException {
 FileChannel channel = fileRecords.channel();
-if (position + HEADER_SIZE_UP_TO_MAGIC >= end)
+if (position >= end - HEADER_SIZE_UP_TO_MAGIC)
 return null;
 logHeaderBuffer.rewind();
@@ -75,7 +75,7 @@ public FileChannelRecordBatch nextBatch() throws IOException {
 throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
 "overhead (%d) in file %s.", size, LegacyRecord.RECORD_OVERHEAD_V0, fileRecords.file()));
-if (position + LOG_OVERHEAD + size > end)
+if (position > end - LOG_OVERHEAD - size)
 return null;
 byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index 95b2a0c89c6..77aaae86f5f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -112,8 +112,8 @@ public void testBatchIterationWithMultipleRecordsPerBatch() throws IOException {
 SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
 new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
 new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
- };
+
 SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
 new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
 new SimpleRecord(897839L, null, "4".getBytes()),
@@ -152,8 +152,8 @@ public void testBatchIterationV2() throws IOException {
 SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
 new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
 new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
- };
+
 SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
 new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
 new SimpleRecord(897839L, null, "4".getBytes()),
@@ -204,6 +204,22 @@ public void testBatchIterationIncompleteBatch() throws IOException {
 }
 }
+@Test
+public void testNextBatchSelectionWithMaxedParams() throws IOException {
+try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, Integer.MAX_VALUE, Integer.MAX_VALUE);
+assertNull(logInputStream.nextBatch());
+}
+}
+
+@Test
+public void testNextBatchSelectionWithZeroedParams() throws IOException {
+try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, 0);
+assertNull(logInputStream.nextBatch());
+}
+}
+
 private void assertProducerData(RecordBatch batch, long producerId, short producerEpoch, int baseSequence,
 boolean isTransactional, SimpleRecord ... records) {
 assertEquals(producerId, batch.producerId());

> KafkaConsumer ran into Unknown error fetching data for topic-partition caused by integer overflow in FileLogInputStream
>
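The overflow is easy to reproduce with plain `int` arithmetic. This sketch copies the two checks from the diff into standalone methods. `BatchBoundsSketch` is a hypothetical name; the 17-byte header size comes from the EOFException in the report, while `LOG_OVERHEAD = 12` (an 8-byte offset plus a 4-byte size) and an `end` of `Integer.MAX_VALUE` are assumptions made for the illustration.

```java
/** Pre- and post-fix forms of the nextBatch bounds checks from KAFKA-6292 (illustrative only). */
public class BatchBoundsSketch {
    static final int HEADER_SIZE_UP_TO_MAGIC = 17; // "Expected to read 17 bytes" in the report
    static final int LOG_OVERHEAD = 12;            // assumed: 8-byte offset + 4-byte size

    /** Old check: true means "keep reading"; position + 17 can wrap to a negative int. */
    public static boolean canReadHeaderOld(int position, int end) {
        return !(position + HEADER_SIZE_UP_TO_MAGIC >= end);
    }

    /** New check from PR #4928: the constant moves to the other side, so end - 17 cannot wrap for end >= 0. */
    public static boolean canReadHeaderNew(int position, int end) {
        return !(position >= end - HEADER_SIZE_UP_TO_MAGIC);
    }

    /** Old "whole batch fits" check. */
    public static boolean batchFitsOld(int position, int size, int end) {
        return !(position + LOG_OVERHEAD + size > end);
    }

    /** New "whole batch fits" check. */
    public static boolean batchFitsNew(int position, int size, int end) {
        return !(position > end - LOG_OVERHEAD - size);
    }

    public static void main(String[] args) {
        int position = 2147483643;   // "Started read from position 2147483643"
        int end = Integer.MAX_VALUE; // assumed end for a segment with log.segment.bytes=2147483647
        System.out.println(position + HEADER_SIZE_UP_TO_MAGIC); // -2147483636: the sum wrapped
        System.out.println(canReadHeaderOld(position, end));    // true: tries to read and hits EOF
        System.out.println(canReadHeaderNew(position, end));    // false: nextBatch would return null
    }
}
```

Rearranging the comparison keeps the header check in range for any non-negative `end`, which matches the new tests expecting `nextBatch()` to return null for both maxed and zeroed parameters.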
[jira] [Commented] (KAFKA-6879) Controller deadlock following session expiration
[ https://issues.apache.org/jira/browse/KAFKA-6879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468075#comment-16468075 ] ASF GitHub Bot commented on KAFKA-6879:
---
ijuma closed pull request #4977: KAFKA-6879; Invoke session init callbacks outside lock to avoid deadlock
URL: https://github.com/apache/kafka/pull/4977

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index bc721e39f96..d3d1a81c41f 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -160,7 +160,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 override def beforeInitializingSession(): Unit = {
 val expireEvent = new Expire
 eventManager.clearAndPut(expireEvent)
-expireEvent.waitUntilProcessed()
+
+// Block initialization of the new session until the expiration event is being handled,
+// which ensures that all pending events have been processed before creating the new session
+expireEvent.waitUntilProcessingStarted()
 }
 })
 eventManager.put(Startup)
@@ -1518,17 +1521,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 // We can't make this a case object due to the countDownLatch field
 class Expire extends ControllerEvent {
-private val countDownLatch = new CountDownLatch(1)
+private val processingStarted = new CountDownLatch(1)
 override def state = ControllerState.ControllerChange
 override def process(): Unit = {
- countDownLatch.countDown()
+ processingStarted.countDown()
 activeControllerId = -1
 onControllerResignation()
 }
-def waitUntilProcessed(): Unit = {
- countDownLatch.await()
+def waitUntilProcessingStarted(): Unit = {
+ processingStarted.await()
 }
 }
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 5407934e987..24eb17770ec 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -99,11 +99,15 @@ class KafkaScheduler(val threads: Int,
 }
 }
- def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) {
+ def scheduleOnce(name: String, fun: () => Unit): Unit = {
+schedule(name, fun, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+ }
+
+ def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit) {
 debug("Scheduling task %s with initial delay %d ms and period %d ms."
 .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
 this synchronized {
- ensureRunning
+ ensureRunning()
 val runnable = CoreUtils.runnable {
 try {
 trace("Beginning execution of scheduled task '%s'.".format(name))
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 5c4cd685565..5cb127c3c7e 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -59,7 +59,7 @@ class ZooKeeperClient(connectString: String,
 private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
 private val inFlightRequests = new Semaphore(maxInFlightRequests)
 private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala
- private[zookeeper] val expiryScheduler = new KafkaScheduler(0, "zk-session-expiry-handler")
+ private[zookeeper] val expiryScheduler = new KafkaScheduler(threads = 1, "zk-session-expiry-handler")
 private val metricNames = Set[String]()
@@ -325,43 +325,65 @@ class ZooKeeperClient(connectString: String,
 zooKeeper
 }
- private def initialize(): Unit = {
-if (!connectionState.isAlive) {
- zooKeeper.close()
- info(s"Initializing a new session to $connectString.")
- // retry forever until ZooKeeper can be instantiated
- var connected = false
- while (!connected) {
-try {
- zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
- connected = true
-} catch {
- case e: Exception =>
-info("Error when recreating ZooKeeper, retrying after a short sleep", e)
-Thread.sleep(1000)
+ private def reinitialize(): Unit = {
+//
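The central change in `KafkaController` is that session initialization now waits for the `Expire` event to start rather than to finish. Below is a simplified Java model of that latch pattern, not Kafka's code: `ExpireLatchSketch` is hypothetical, and the shared `ReentrantLock` is an assumed stand-in for whatever the resignation work needs that the initializing thread already holds. A timeout turns the would-be deadlock into a `false` return.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ExpireLatchSketch {
    /** Stand-in for the controller's Expire event. */
    static final class ExpireEvent {
        final CountDownLatch processingStarted = new CountDownLatch(1);
        final CountDownLatch processed = new CountDownLatch(1);
        private final ReentrantLock sharedLock;

        ExpireEvent(ReentrantLock sharedLock) {
            this.sharedLock = sharedLock;
        }

        void process() {
            processingStarted.countDown(); // signal first, as the patched Expire.process() does
            sharedLock.lock();             // assumed: resignation work needs the waiter's lock
            try {
                // onControllerResignation() would run here
            } finally {
                sharedLock.unlock();
            }
            processed.countDown();
        }
    }

    /**
     * Returns true if the initializing thread got past its wait within 500 ms.
     * waitForCompletion=true mimics the old waitUntilProcessed(); false mimics waitUntilProcessingStarted().
     */
    public static boolean simulate(boolean waitForCompletion) {
        ReentrantLock sharedLock = new ReentrantLock();
        ExpireEvent event = new ExpireEvent(sharedLock);
        ExecutorService eventThread = Executors.newSingleThreadExecutor();
        boolean proceeded = false;
        sharedLock.lock(); // held while "initializing the new session"
        try {
            eventThread.execute(event::process);
            CountDownLatch latch = waitForCompletion ? event.processed : event.processingStarted;
            proceeded = latch.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sharedLock.unlock(); // lets the event thread finish either way
        }
        eventThread.shutdown();
        try {
            eventThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return proceeded;
    }
}
```

Waiting for completion while holding the lock can never succeed, because completion itself needs the lock; waiting only for the start signal returns as soon as the event thread picks the event up.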
[jira] [Created] (KAFKA-6886) Externalize Secrets for Kafka Connect Configurations
Robert Yokota created KAFKA-6886:

Summary: Externalize Secrets for Kafka Connect Configurations
Key: KAFKA-6886
URL: https://issues.apache.org/jira/browse/KAFKA-6886
Project: Kafka
Issue Type: New Feature
Components: KafkaConnect
Reporter: Robert Yokota
Assignee: Robert Yokota
Fix For: 2.0.0

Kafka Connect's connector configurations can contain plaintext passwords, and Connect stores these in cleartext either on the filesystem (in standalone mode) or in internal topics (in distributed mode).

Connect should not store or transmit cleartext passwords in connector configurations. First, it should be possible to replace secrets in stored connector configurations with references to values stored in external secret management systems. Connect should provide an extension point for adding customized integrations, as well as a file-based extension as an example. Second, it should be possible to configure a Connect runtime to use one or more of these extensions, and connector configurations should be able to use placeholders that the runtime resolves before passing the complete connector configurations to connectors. This way, existing connectors see no difference in the configurations that Connect provides to them at startup. Third, Connect's API should be changed to allow a connector to obtain the latest connector configuration at any time.
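A minimal sketch of runtime-side placeholder resolution, assuming a hypothetical `${provider:path:key}` syntax and a hypothetical `SecretProvider` interface; the issue specifies neither. The runtime resolves a copy of the configuration, so the stored configuration keeps only the reference while the connector receives plain values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical placeholder resolver; the ${provider:path:key} syntax is an assumption. */
public class PlaceholderResolverSketch {
    /** Hypothetical extension point for an external secret store. */
    public interface SecretProvider {
        String get(String path, String key);
    }

    // Matches ${provider:path:key}; provider and path may not contain ':' or '}'.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+):([^:}]+):([^}]+)\\}");

    private final Map<String, SecretProvider> providers = new HashMap<>();

    public void register(String name, SecretProvider provider) {
        providers.put(name, provider);
    }

    /** Returns a resolved copy; the input map (the stored config) is left untouched. */
    public Map<String, String> resolve(Map<String, String> rawConfig) {
        Map<String, String> resolved = new HashMap<>();
        for (Map.Entry<String, String> entry : rawConfig.entrySet()) {
            resolved.put(entry.getKey(), resolveValue(entry.getValue()));
        }
        return resolved;
    }

    private String resolveValue(String value) {
        if (value == null) return null;
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            SecretProvider provider = providers.get(m.group(1));
            String secret = (provider == null) ? null : provider.get(m.group(2), m.group(3));
            // Unknown provider or missing secret: leave the placeholder as-is.
            String replacement = (secret == null) ? m.group() : secret;
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Keeping resolution in the runtime is what lets existing connectors stay unchanged: they only ever see the resolved map.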
[jira] [Commented] (KAFKA-6885) DescribeConfigs synonyms are identical to parent entry for BROKER resources
[ https://issues.apache.org/jira/browse/KAFKA-6885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467896#comment-16467896 ] Manikumar commented on KAFKA-6885:
---
For topics, the names of topic override configs differ from the names of the server default configs (e.g. "max.message.bytes" is the topic override config name for the "message.max.bytes" server property), so the synonym names are different. For brokers, however, dynamic broker configs and server default configs share the same name (both use "message.max.bytes"). The synonyms can be told apart by their source property (DEFAULT_CONFIG, STATIC_BROKER_CONFIG, etc.).

> DescribeConfigs synonyms are identical to parent entry for BROKER resources
> ---
>
> Key: KAFKA-6885
> URL: https://issues.apache.org/jira/browse/KAFKA-6885
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 1.1.0
> Reporter: Magnus Edenhill
> Priority: Major
>
> The DescribeConfigs protocol response for BROKER resources returns synonyms for various configuration entries, such as "log.dir".
> The list of synonyms returned is identical to the parent configuration entry, rather than containing the actual synonyms.
> For example, for the broker "log.dir" config entry it returns one synonym, also named "log.dir", rather than "log.dirs" or whatever the synonym is supposed to be.
>
> This does not seem to happen for TOPIC resources.
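To illustrate how same-named synonyms can be told apart by source, here is a small Java model. The enum constants mirror `ConfigEntry.ConfigSource` names, but treating their declaration order (most to least specific) as the precedence order is an assumption; `SynonymSourceSketch` and the values in the usage below are illustrative, not data returned by a broker.

```java
import java.util.List;

/** Illustrative model of broker config synonyms that share a name but differ by source. */
public class SynonymSourceSketch {
    /** Declared from most to least specific; this precedence order is an assumption. */
    public enum Source { DYNAMIC_BROKER_CONFIG, DYNAMIC_DEFAULT_BROKER_CONFIG, STATIC_BROKER_CONFIG, DEFAULT_CONFIG }

    public static final class Synonym {
        public final String name;
        public final String value;
        public final Source source;

        public Synonym(String name, String value, Source source) {
            this.name = name;
            this.value = value;
            this.source = source;
        }
    }

    /** Picks the synonym whose source comes first in Source's declaration order; null if the list is empty. */
    public static Synonym effective(List<Synonym> synonyms) {
        Synonym best = null;
        for (Synonym s : synonyms) {
            if (best == null || s.source.compareTo(best.source) < 0) {
                best = s;
            }
        }
        return best;
    }

    /** Renders a synonym with its source, which is what distinguishes identical names. */
    public static String describe(Synonym s) {
        return s.name + "=" + s.value + " (" + s.source + ")";
    }
}
```

For three `message.max.bytes` synonyms sourced from `DEFAULT_CONFIG`, `STATIC_BROKER_CONFIG` and `DYNAMIC_BROKER_CONFIG`, the names alone are useless, but the source field identifies which entry is in effect.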
[jira] [Commented] (KAFKA-6264) Log cleaner thread may die on legacy segment containing messages whose offsets are too large
[ https://issues.apache.org/jira/browse/KAFKA-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467795#comment-16467795 ] ASF GitHub Bot commented on KAFKA-6264: --- dhruvilshah3 closed pull request #4692: KAFKA-6264: Log cleaner thread may die on legacy segment containing messages whose offsets are too large URL: https://github.com/apache/kafka/pull/4692 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index 44083c186c8..4377c6f32a6 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -49,7 +49,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon protected val lock = new ReentrantLock @volatile - protected var mmap: MappedByteBuffer = { + protected[log] var mmap: MappedByteBuffer = { val newlyCreated = file.createNewFile() val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r") try { @@ -88,7 +88,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon /** The number of entries in this index */ @volatile - protected var _entries = mmap.position() / entrySize + protected[log] var _entries = mmap.position() / entrySize /** * True iff there are no more slots available in this index diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index cc693375079..39871872a63 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -351,7 +351,10 @@ class Log(@volatile var dir: File, time = time, fileAlreadyExists = true) -try segment.sanityCheck(timeIndexFileNewlyCreated) 
+try { + segment.sanityCheck(timeIndexFileNewlyCreated) + segment.initializeTimestampMetadata() +} catch { case _: NoSuchFileException => error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " + @@ -1120,15 +1123,15 @@ class Log(@volatile var dir: File, s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " + s"required version $KAFKA_0_10_0_IV0") - // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides - // constant time access while being safe to use with concurrent collections unlike `toArray`. - val segmentsCopy = logSegments.toBuffer // For the earliest and latest, we do not need to return the timestamp. if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset)) else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset)) + // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides + // constant time access while being safe to use with concurrent collections unlike `toArray`. 
+ val segmentsCopy = logSegments.toBuffer val targetSeg = { // Get all the segments whose largest timestamp is smaller than target timestamp val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 0ee994252a1..c552d82710f 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -502,7 +502,9 @@ private[log] class Cleaner(val id: Int, try { // clean segments into the new destination segment val iter = segments.iterator + val numSegments = segments.length var currentSegmentOpt: Option[LogSegment] = Some(iter.next()) + while (currentSegmentOpt.isDefined) { val currentSegment = currentSegmentOpt.get val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None @@ -516,7 +518,7 @@ private[log] class Cleaner(val id: Int, info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new Date(currentSegment.largestTimestamp)}) " + s"into ${cleaned.baseOffset}, ${if(retainDeletes) "retaining" else "discarding"} deletes.") cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletes, log.config.maxMessageSize, - transactionMetadata, log.activeProducersWithLastSequence, stats) + transactionMetadata,
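The PR title refers to legacy segments containing messages whose offsets are "too large". The constraint behind this is that each offset index entry stores an offset as a 4-byte value relative to the segment's base offset. An offset more than Integer.MAX_VALUE past that base therefore cannot be represented. This can happen, for example, when the cleaner merges several old segments under one base offset. Below is a minimal Java sketch of that range check. `RelativeOffsetCheck` and its methods are hypothetical names, not Kafka's own code.

```java
// Hypothetical helper (not Kafka's own code) modelling the 4-byte
// relative-offset limit of an offset index entry.
final class RelativeOffsetCheck {
    private RelativeOffsetCheck() {}

    /** True if offset - baseOffset fits in a non-negative 4-byte int. */
    static boolean fitsInSegment(long baseOffset, long offset) {
        long delta = offset - baseOffset;
        return delta >= 0 && delta <= Integer.MAX_VALUE;
    }

    /** Converts to the 4-byte relative form, failing loudly instead of silently wrapping around. */
    static int toRelative(long baseOffset, long offset) {
        if (!fitsInSegment(baseOffset, offset)) {
            throw new IllegalArgumentException("Offset " + offset
                    + " cannot be stored relative to base offset " + baseOffset);
        }
        return (int) (offset - baseOffset);
    }
}
```

A plain cast such as `(int) (Integer.MAX_VALUE + 1L)` silently wraps to `Integer.MIN_VALUE`. That is why the sketch checks the range before converting.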
[jira] [Created] (KAFKA-6885) DescribeConfigs synonyms are identical to parent entry for BROKER resources
Magnus Edenhill created KAFKA-6885: -- Summary: DescribeConfigs synonyms are identical to parent entry for BROKER resources Key: KAFKA-6885 URL: https://issues.apache.org/jira/browse/KAFKA-6885 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 1.1.0 Reporter: Magnus Edenhill The DescribeConfigs protocol response for BROKER resources returns synonyms for various configuration entries, such as "log.dir". The synonyms returned are identical to their parent configuration entry, rather than being the actual synonyms. For example, for the broker "log.dir" config entry it returns one synonym, also named "log.dir", rather than "log.dirs" or whatever the synonym is supposed to be. This does not seem to happen for TOPIC resources. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6878) NPE when querying global state store not in READY state
[ https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467708#comment-16467708 ] ASF GitHub Bot commented on KAFKA-6878: --- tedyu opened a new pull request #4978: KAFKA-6878 NPE when querying global state store not in READY state URL: https://github.com/apache/kafka/pull/4978 Check whether cache is null before retrieving from cache. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > NPE when querying global state store not in READY state > --- > > Key: KAFKA-6878 > URL: https://issues.apache.org/jira/browse/KAFKA-6878 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Salazar >Priority: Major > > Info: using kafka 1.1.0 - confluent 4.1.0 > We're trying to query a global state store, but if we query too quickly after > the application is started we get a > NullPointerException(CachingKeyValueStore.java:166). > We have verified the key is nonNull. > If we wait long enough (with our current amount of data 5 minutes after > start) we get an answer but before then we get NPEs. 
> > Looks like there isn't a check to ensure `initInternal` actually sets the > cache before you're allowed to use it in the get method > > stacktrace: > {code:java} > java.lang.NullPointerException: null > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38) > at > org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112) > at > org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55) > at > io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
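The guard described in the PR is to check whether the cache is null before reading from it. It can be sketched as follows. This is a simplified, hypothetical stand-in, not the real `CachingKeyValueStore`. Like the diff proposed in the ticket, it returns null while the cache has not been initialized.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified stand-in for a caching key-value store;
// it is not the real CachingKeyValueStore class.
final class SketchCachingStore {
    private final Map<String, String> underlying = new ConcurrentHashMap<>();
    // Set by init(); remains null until initialization has run.
    private volatile Map<String, String> cache;

    void init() {
        cache = new ConcurrentHashMap<>();
    }

    void putUnderlying(String key, String value) {
        underlying.put(key, value);
    }

    String get(String key) {
        Map<String, String> current = cache; // read the volatile field once
        if (current == null) {
            // Mirrors the proposed guard: no cache yet, so return null instead of throwing an NPE.
            return null;
        }
        String cached = current.get(key);
        if (cached != null) {
            return cached;
        }
        String raw = underlying.get(key);
        if (raw != null) {
            current.put(key, raw); // populate the cache on a miss
        }
        return raw;
    }
}
```

Before `init()` runs, `get` returns null even if the underlying map holds the key. Callers therefore cannot tell "not ready yet" apart from "key absent". Kafka Streams has an `InvalidStateStoreException` for stores that are not currently queryable. Signalling through an exception like that, instead of returning null, would keep the two cases distinct.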
[jira] [Commented] (KAFKA-6878) NPE when querying global state store not in READY state
[ https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467700#comment-16467700 ] Matthias J. Sax commented on KAFKA-6878: Feel free to assign the Jira to yourself and open a PR [~yuzhih...@gmail.com] > NPE when querying global state store not in READY state > --- > > Key: KAFKA-6878 > URL: https://issues.apache.org/jira/browse/KAFKA-6878 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Salazar >Priority: Major > > Info: using kafka 1.1.0 - confluent 4.1.0 > We're trying to query a global state store, but if we query too quickly after > the application is started we get a > NullPointerException(CachingKeyValueStore.java:166). > We have verified the key is nonNull. > If we wait long enough (with our current amount of data 5 minutes after > start) we get an answer but before then we get NPEs. > > Looks like there isn't a check to ensure `initInternal` actually sets the > cache before you're allowed to use it in the get method > > stacktrace: > {code:java} > java.lang.NullPointerException: null > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38) > at > org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112) > at > org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55) > at > io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6878) NPE when querying global state store not in READY state
[ https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467689#comment-16467689 ] Ted Yu commented on KAFKA-6878: --- How about this: {code} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueSto index 45f606f..d1080f8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -163,6 +163,9 @@ class CachingKeyValueStoreextends WrappedStateStore.AbstractStateStore im } private byte[] getInternal(final Bytes key) { +if (cache == null) { +return null; +} final LRUCacheEntry entry = cache.get(cacheName, key); if (entry == null) { final byte[] rawValue = underlying.get(key); {code} > NPE when querying global state store not in READY state > --- > > Key: KAFKA-6878 > URL: https://issues.apache.org/jira/browse/KAFKA-6878 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Salazar >Priority: Major > > Info: using kafka 1.1.0 - confluent 4.1.0 > We're trying to query a global state store, but if we query too quickly after > the application is started we get a > NullPointerException(CachingKeyValueStore.java:166). > We have verified the key is nonNull. > If we wait long enough (with our current amount of data 5 minutes after > start) we get an answer but before then we get NPEs. 
> > Looks like there isn't a check to ensure `initInternal` actually sets the > cache before you're allowed to use it in the get method > > stacktrace: > {code:java} > java.lang.NullPointerException: null > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38) > at > org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112) > at > org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55) > at > io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (KAFKA-6878) NPE when querying global state store not in READY state
[ https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6878: -- Comment: was deleted (was: How about the following change ? {code} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKe index 45f606f..1ceb13b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -163,6 +163,9 @@ class CachingKeyValueStoreextends WrappedStateStore.AbstractStateStore im } private byte[] getInternal(final Bytes key) { +if (key == null) { +return null; +} final LRUCacheEntry entry = cache.get(cacheName, key); if (entry == null) { final byte[] rawValue = underlying.get(key); {code} Client can check against null and re-poll if null is encountered.) > NPE when querying global state store not in READY state > --- > > Key: KAFKA-6878 > URL: https://issues.apache.org/jira/browse/KAFKA-6878 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Salazar >Priority: Major > > Info: using kafka 1.1.0 - confluent 4.1.0 > We're trying to query a global state store, but if we query too quickly after > the application is started we get a > NullPointerException(CachingKeyValueStore.java:166). > We have verified the key is nonNull. > If we wait long enough (with our current amount of data 5 minutes after > start) we get an answer but before then we get NPEs. 
> > Looks like there isn't a check to ensure `initInternal` actually sets the > cache before you're allowed to use it in the get method > > stacktrace: > {code:java} > java.lang.NullPointerException: null > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38) > at > org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112) > at > org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55) > at > io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
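The deleted comment proposes that a client "can check against null and re-poll if null is encountered." Below is a hedged sketch of such a client-side loop. It uses a hypothetical `RetryingLookup.pollUntilPresent` helper, which is not a Kafka Streams API.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical client-side helper; not part of the Kafka Streams API.
final class RetryingLookup {
    private RetryingLookup() {}

    /**
     * Calls lookup up to maxAttempts times, sleeping backoffMs between attempts,
     * and returns the first non-null result (or empty if every attempt returned null).
     */
    static <T> Optional<T> pollUntilPresent(Supplier<T> lookup, int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T value = lookup.get();
            if (value != null) {
                return Optional.of(value);
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag and give up
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }
}
```

A caller would wrap its own lookup, for example `RetryingLookup.pollUntilPresent(() -> store.get(key), 10, 500L)`, where `store` and `key` belong to the caller. The trade-off is that a key which genuinely does not exist still costs `maxAttempts` lookups and `(maxAttempts - 1) * backoffMs` of sleeping before the caller gets an empty result.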
[jira] [Commented] (KAFKA-6878) NPE when querying global state store not in READY state
[ https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467676#comment-16467676 ] Salazar commented on KAFKA-6878: Hi Ted, We did verify that the key is not null, and the top-layer class CompositeReadOnlyKeyValueStore already has an Objects.requireNonNull check that throws an NPE for a null key (as documented). In this scenario the cache itself is null, because initInternal had not completed (or had not been called) before getInternal was called. > NPE when querying global state store not in READY state > --- > > Key: KAFKA-6878 > URL: https://issues.apache.org/jira/browse/KAFKA-6878 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Salazar >Priority: Major > > Info: using kafka 1.1.0 - confluent 4.1.0 > We're trying to query a global state store, but if we query too quickly after > the application is started we get a > NullPointerException(CachingKeyValueStore.java:166). > We have verified the key is nonNull. > If we wait long enough (with our current amount of data 5 minutes after > start) we get an answer but before then we get NPEs.
> > Looks like there isn't a check to ensure `initInternal` actually sets the > cache before you're allowed to use it in the get method > > stacktrace: > {code:java} > java.lang.NullPointerException: null > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38) > at > org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112) > at > org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55) > at > io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6884) ConsumerGroupCommand should use new AdminClient
Jason Gustafson created KAFKA-6884: -- Summary: ConsumerGroupCommand should use new AdminClient Key: KAFKA-6884 URL: https://issues.apache.org/jira/browse/KAFKA-6884 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Now that we have KIP-222, we should update consumer-groups.sh to use the new AdminClient. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6883) KafkaShortnamer should allow to convert Kerberos principal name to upper case user name
[ https://issues.apache.org/jira/browse/KAFKA-6883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467207#comment-16467207 ] Attila Sasvari commented on KAFKA-6883: --- HADOOP-13984 is a similar issue. > KafkaShortnamer should allow to convert Kerberos principal name to upper case > user name > --- > > Key: KAFKA-6883 > URL: https://issues.apache.org/jira/browse/KAFKA-6883 > Project: Kafka > Issue Type: Improvement >Reporter: Attila Sasvari >Priority: Major > > KAFKA-5764 implemented support to convert Kerberos principal name to lower > case Linux user name via auth_to_local rules. > As a follow-up, KafkaShortnamer could be further extended to allow converting > principal names to uppercase by appending /U to the rule. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6883) KafkaShortnamer should allow to convert Kerberos principal name to upper case user name
Attila Sasvari created KAFKA-6883: - Summary: KafkaShortnamer should allow to convert Kerberos principal name to upper case user name Key: KAFKA-6883 URL: https://issues.apache.org/jira/browse/KAFKA-6883 Project: Kafka Issue Type: Improvement Reporter: Attila Sasvari KAFKA-5764 implemented support to convert Kerberos principal name to lower case Linux user name via auth_to_local rules. As a follow-up, KafkaShortnamer could be further extended to allow converting principal names to uppercase by appending /U to the rule. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
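The existing /L suffix and the proposed /U suffix both post-process the short name produced by an auth_to_local rule. Below is a minimal sketch of that final step. `ShortNameCase` is a hypothetical helper. Reading the flag with `endsWith` is a simplification of real rule parsing.

```java
import java.util.Locale;

// Hypothetical helper; real auth_to_local parsing handles the whole rule, not just its suffix.
final class ShortNameCase {
    private ShortNameCase() {}

    enum CaseConversion { NONE, LOWER, UPPER }

    /** Reads the trailing case flag: "/L" (existing) or "/U" (as proposed in this ticket). */
    static CaseConversion caseFlag(String rule) {
        if (rule.endsWith("/L")) {
            return CaseConversion.LOWER;
        }
        if (rule.endsWith("/U")) {
            return CaseConversion.UPPER;
        }
        return CaseConversion.NONE;
    }

    /** Applies the conversion with a fixed locale so the result does not depend on the JVM default. */
    static String apply(String shortName, CaseConversion conversion) {
        switch (conversion) {
            case LOWER:
                return shortName.toLowerCase(Locale.ENGLISH);
            case UPPER:
                return shortName.toUpperCase(Locale.ENGLISH);
            default:
                return shortName;
        }
    }
}
```

Take the rule `RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//U` written in the proposed syntax. For the principal `alice@EXAMPLE.COM`, it would first yield the short name `alice` and then, with the upper-case flag, `ALICE`.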
[jira] [Commented] (KAFKA-1614) Partition log directory name and segments information exposed via JMX
[ https://issues.apache.org/jira/browse/KAFKA-1614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467164#comment-16467164 ] Matthias Meerhof commented on KAFKA-1614: - I second [~cmpsoares]'s request. Is it possible to include this patch and make this information available out of the box? > Partition log directory name and segments information exposed via JMX > - > > Key: KAFKA-1614 > URL: https://issues.apache.org/jira/browse/KAFKA-1614 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.1.1 >Reporter: Alexander Demidko >Assignee: Alexander Demidko >Priority: Major > Attachments: log_segments_dir_jmx_info.patch > > > Makes partition log directory and single segments information exposed via > JMX. This is useful to: > - monitor disks usage in a cluster and on single broker > - calculate disk space taken by different topics > - estimate space to be freed when segments are expired > Patch attached. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
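To illustrate what exposing this information over JMX involves, here is a minimal standard-MBean sketch that uses only the JDK's `javax.management` API. The interface, the attribute set, and the `example.log` ObjectName domain are hypothetical. None of them are taken from the attached patch.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

final class PartitionLogJmx {
    private PartitionLogJmx() {}

    // Hypothetical management interface; JMX derives the attribute names
    // "LogDirectory", "SegmentCount" and "TotalSegmentBytes" from the getters.
    public interface PartitionLogInfoMBean {
        String getLogDirectory();
        int getSegmentCount();
        long getTotalSegmentBytes();
    }

    // Hypothetical implementation backed by a snapshot of segment sizes in bytes.
    public static final class PartitionLogInfo implements PartitionLogInfoMBean {
        private final String logDirectory;
        private final long[] segmentSizes;

        public PartitionLogInfo(String logDirectory, long[] segmentSizes) {
            this.logDirectory = logDirectory;
            this.segmentSizes = segmentSizes.clone();
        }

        @Override public String getLogDirectory() { return logDirectory; }
        @Override public int getSegmentCount() { return segmentSizes.length; }
        @Override public long getTotalSegmentBytes() {
            long total = 0;
            for (long size : segmentSizes) {
                total += size;
            }
            return total;
        }
    }

    /** Registers the bean on the platform MBean server under a hypothetical ObjectName. */
    static ObjectName register(String topic, int partition, PartitionLogInfo info) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("example.log:type=PartitionLogInfo,topic="
                    + ObjectName.quote(topic) + ",partition=" + partition);
            server.registerMBean(new StandardMBean(info, PartitionLogInfoMBean.class), name);
            return name;
        } catch (JMException e) {
            throw new IllegalStateException("Could not register MBean", e);
        }
    }

    /** Reads an attribute back through the MBean server, as a JMX client would. */
    static Object read(ObjectName name, String attribute) {
        try {
            return ManagementFactory.getPlatformMBeanServer().getAttribute(name, attribute);
        } catch (JMException e) {
            throw new IllegalStateException("Could not read " + attribute, e);
        }
    }
}
```

Once registered, the attributes are visible in any JMX client such as jconsole under the chosen ObjectName. Per-partition values like these are what the listed use cases (per-topic disk usage, space freed by expiring segments) would aggregate.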
[jira] [Commented] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()
[ https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467121#comment-16467121 ] Anurag Jain commented on KAFKA-2200: Hi [~becket_qin], please assign this Jira story to me; I will start working on it and keep you updated. Thanks, > kafkaProducer.send() should not call callback.onCompletion() > > > Key: KAFKA-2200 > URL: https://issues.apache.org/jira/browse/KAFKA-2200 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.0 >Reporter: Jiangjie Qin >Priority: Major > Labels: newbie > Fix For: 2.0.0 > > > KafkaProducer.send() should not call callback.onCompletion() because this > might break the callback firing order. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()
[ https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467022#comment-16467022 ] Jiangjie Qin commented on KAFKA-2200: - [~anurag_iit2k] Sure, please feel free to take this ticket. > kafkaProducer.send() should not call callback.onCompletion() > > > Key: KAFKA-2200 > URL: https://issues.apache.org/jira/browse/KAFKA-2200 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.0 >Reporter: Jiangjie Qin >Priority: Major > Labels: newbie > Fix For: 2.0.0 > > > KafkaProducer.send() should not call callback.onCompletion() because this > might break the callback firing order. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()
[ https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467020#comment-16467020 ] Jiangjie Qin commented on KAFKA-2200: - [~PnPie] Sorry about the belated response. Yes, you are right: I don't think the TimeoutException while waiting for metadata is an ApiException. That is purely client-side behavior. RecordTooLargeException is a little trickier: conceptually it is an ApiException, but the producer may throw it without talking to the server. If we want to make sure all the ApiExceptions are handled in the callback, I wonder if it would be simpler to just throw an IllegalArgumentException from producer.send() when the records are too large. > kafkaProducer.send() should not call callback.onCompletion() > > > Key: KAFKA-2200 > URL: https://issues.apache.org/jira/browse/KAFKA-2200 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.0 >Reporter: Jiangjie Qin >Priority: Major > Labels: newbie > Fix For: 2.0.0 > > > KafkaProducer.send() should not call callback.onCompletion() because this > might break the callback firing order. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
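Jiangjie's suggestion separates two kinds of errors. Errors that `send()` can detect locally are thrown synchronously. All other errors are reported asynchronously through the callback, so `send()` itself never fires a callback. A single-threaded, hypothetical model of that split could look like this. `SketchProducer` is not the real `KafkaProducer`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BiConsumer;

// Hypothetical, single-threaded model of the proposed split between synchronous
// argument errors and asynchronous callback completions; not the real KafkaProducer.
final class SketchProducer {
    private final int maxRecordBytes;
    private final Deque<BiConsumer<Long, Exception>> pending = new ArrayDeque<>();
    private long nextOffset = 0;

    SketchProducer(int maxRecordBytes) {
        this.maxRecordBytes = maxRecordBytes;
    }

    /** Never invokes the callback itself; an oversized record fails fast with an exception instead. */
    void send(byte[] record, BiConsumer<Long, Exception> callback) {
        if (record.length > maxRecordBytes) {
            throw new IllegalArgumentException("Record of " + record.length
                    + " bytes exceeds the " + maxRecordBytes + "-byte limit");
        }
        pending.addLast(callback);
    }

    /** Completes queued sends in the order they were accepted, assigning increasing offsets. */
    void flush() {
        while (!pending.isEmpty()) {
            pending.pollFirst().accept(nextOffset++, null);
        }
    }
}
```

The oversized record is rejected with an exception instead of a callback invocation. As a result, the remaining callbacks still fire in send order when `flush()` completes them.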
[jira] [Created] (KAFKA-6882) Wrong producer settings may lead to DoS on Kafka Server
Seweryn Habdank-Wojewodzki created KAFKA-6882: - Summary: Wrong producer settings may lead to DoS on Kafka Server Key: KAFKA-6882 URL: https://issues.apache.org/jira/browse/KAFKA-6882 Project: Kafka Issue Type: Bug Components: core, producer Affects Versions: 1.0.1, 1.1.0 Reporter: Seweryn Habdank-Wojewodzki The documentation of the parameters “linger.ms” and “batch.size” is a bit confusing. In fact, setting those parameters wrongly on the producer side might completely destroy BROKER throughput. I see smart developers reading the documentation of those parameters. They want both super performance and super safety, so they set something like this: {code} kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, 1); kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 0); {code} Then we have a situation where each and every message is sent separately. The TCP/IP stack is really busy in that case, and when they needed high throughput they got much less, because every message goes out separately, making the network communication and TCP/IP overhead significant. Those settings are good only if someone sends critical messages once in a while (e.g. one message per minute), not when throughput is important, e.g. when sending thousands of messages per second. The situation is even worse when smart developers read that for safety they need acknowledgements from all cluster nodes. So they add: {code} kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all"); {code} And this is the end of Kafka performance! Even worse, it is not a problem for the Kafka producer. The problem remains on the server (cluster, broker) side. The server is so busy acknowledging *each and every* message from all nodes that other work is NOT performed, so the end-to-end performance is almost none. I would like to ask you to improve the documentation of these parameters.
Please also consider the corner cases and provide detailed information on how extreme values of these parameters, namely the lowest and highest, may influence the work of the cluster. That is the documentation issue. On the other hand, it is a security/safety matter. Technically the problem is that the __commit_offsets topic is loaded with an enormous number of messages. This leaves the Kafka broker exposed to *DoS* through the producer settings. Three lines of code, a bit of load, and the Kafka cluster is dead. I suppose there are ways to prevent such a situation on the cluster side, but it requires some logic to be implemented to detect such a simple but efficient DoS. BTW, do the Kafka admin tools provide any kind of "kill" for a connection when one producer or another causes problems? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
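For contrast with the settings quoted in the report, below is a sketch of batching-friendly producer properties that uses plain `java.util.Properties`. The string keys are the names behind the `ProducerConfig` constants used in the report. The numeric values are placeholders to be tuned per workload. They are not recommendations from the ticket.

```java
import java.util.Properties;

final class ProducerTuning {
    private ProducerTuning() {}

    /** Illustrative throughput-oriented settings; the numbers are placeholders, not recommendations. */
    static Properties batchingFriendlyProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Let records wait a few milliseconds so that several can share one batch.
        props.put("linger.ms", "20");
        // Per-partition batch buffer in bytes; a value of 0 would disable batching entirely.
        props.put("batch.size", "65536");
        // Durability is unchanged: the leader still waits for the full set of in-sync replicas,
        // but with batching each acknowledged request carries many records instead of one.
        props.put("acks", "all");
        return props;
    }
}
```

Per the producer documentation, a `batch.size` of 0 disables batching entirely. That is what makes the configuration quoted in the report send every record in its own batch.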
[jira] [Comment Edited] (KAFKA-6881) Kafka 1.1 Broker version crashes when deleting log
[ https://issues.apache.org/jira/browse/KAFKA-6881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466939#comment-16466939 ] K B Parthasarathy edited comment on KAFKA-6881 at 5/8/18 6:24 AM: -- The server was never restarted after Kafka was installed. Still we got this exception and Kafka broker crashed. was (Author: kbpartha): The server was never restarted after Kafka was installed. Still we got thsi exception and Kafka broker crashed. > Kafka 1.1 Broker version crashes when deleting log > -- > > Key: KAFKA-6881 > URL: https://issues.apache.org/jira/browse/KAFKA-6881 > Project: Kafka > Issue Type: Bug > Environment: Linux >Reporter: K B Parthasarathy >Priority: Critical > > Hello > We are running Kafka 1.1 version in Linux from past 3 weeks. Today Kafka > crashed. When we checked server.log file the following log was found > [2018-05-07 16:53:06,721] ERROR Failed to clean up log for > __consumer_offsets-24 in dir /tmp/kafka-logs due to IOException > (kafka.server.LogDirFailureChannel) > java.nio.file.NoSuchFileException: > /tmp/kafka-logs/__consumer_offsets-24/.log > at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) > at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409) > at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262) > at java.nio.file.Files.move(Files.java:1395) > at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697) > at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212) > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415) > at kafka.log.Log.asyncDeleteSegment(Log.scala:1601) > at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653) > at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648) > at scala.collection.immutable.List.foreach(List.scala:389) > at 
kafka.log.Log.replaceSegments(Log.scala:1648) > at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535) > at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462) > at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461) > at scala.collection.immutable.List.foreach(List.scala:389) > at kafka.log.Cleaner.doClean(LogCleaner.scala:461) > at kafka.log.Cleaner.clean(LogCleaner.scala:438) > at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Suppressed: java.nio.file.NoSuchFileException: > /tmp/kafka-logs/__consumer_offsets-24/.log -> > /tmp/kafka-logs/__consumer_offsets-24/.log.deleted > at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396) > at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262) > at java.nio.file.Files.move(Files.java:1395) > at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694) > ... 16 more > [2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving > replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager) > [2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs > (kafka.log.LogManager) > [2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in > /tmp/kafka-logs have failed (kafka.log.LogManager) > > Please let me know what may be the issue > > Partha -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6881) Kafka 1.1 Broker version crashes when deleting log
[ https://issues.apache.org/jira/browse/KAFKA-6881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466939#comment-16466939 ] K B Parthasarathy commented on KAFKA-6881: -- The server was never restarted after Kafka was installed. Still, we got this exception and the Kafka broker crashed. > Kafka 1.1 Broker version crashes when deleting log > -- > > Key: KAFKA-6881 > URL: https://issues.apache.org/jira/browse/KAFKA-6881 > Project: Kafka > Issue Type: Bug > Environment: Linux >Reporter: K B Parthasarathy >Priority: Critical > > Hello > We are running Kafka 1.1 version in Linux from past 3 weeks. Today Kafka > crashed. When we checked server.log file the following log was found > [2018-05-07 16:53:06,721] ERROR Failed to clean up log for > __consumer_offsets-24 in dir /tmp/kafka-logs due to IOException > (kafka.server.LogDirFailureChannel) > java.nio.file.NoSuchFileException: > /tmp/kafka-logs/__consumer_offsets-24/.log > at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) > at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409) > at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262) > at java.nio.file.Files.move(Files.java:1395) > at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697) > at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212) > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415) > at kafka.log.Log.asyncDeleteSegment(Log.scala:1601) > at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653) > at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648) > at scala.collection.immutable.List.foreach(List.scala:389) > at kafka.log.Log.replaceSegments(Log.scala:1648) > at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535) > at
kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462) > at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461) > at scala.collection.immutable.List.foreach(List.scala:389) > at kafka.log.Cleaner.doClean(LogCleaner.scala:461) > at kafka.log.Cleaner.clean(LogCleaner.scala:438) > at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Suppressed: java.nio.file.NoSuchFileException: > /tmp/kafka-logs/__consumer_offsets-24/.log -> > /tmp/kafka-logs/__consumer_offsets-24/.log.deleted > at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396) > at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262) > at java.nio.file.Files.move(Files.java:1395) > at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694) > ... 16 more > [2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving > replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager) > [2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs > (kafka.log.LogManager) > [2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in > /tmp/kafka-logs have failed (kafka.log.LogManager) > > Please let me know what may be the issue > > Partha -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466929#comment-16466929 ] Liju commented on KAFKA-6788: - Is this still open to be worked on? If no one else is working on it, I can take it up. > Grouping consumer requests per consumer coordinator in admin client > --- > > Key: KAFKA-6788 > URL: https://issues.apache.org/jira/browse/KAFKA-6788 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we > will first try to get the coordinator for each requested group id, and then > send the corresponding request for that group id. However, different group > ids could be hosted on the same coordinator, and these requests do support > multi group ids be sent within the same request. So we can consider optimize > it by grouping the requests per coordinator destination. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
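The optimization described in the ticket amounts to bucketing group ids by their coordinator before building requests. Below is a minimal sketch. It assumes the coordinator for each group has already been looked up and is identified by broker id. `coordinatorOf` is a hypothetical lookup function.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class CoordinatorBatching {
    private CoordinatorBatching() {}

    /**
     * Groups consumer group ids by the broker id of their coordinator, preserving the
     * order in which coordinators and group ids first appear.
     */
    static Map<Integer, List<String>> groupByCoordinator(List<String> groupIds,
                                                         Function<String, Integer> coordinatorOf) {
        Map<Integer, List<String>> byCoordinator = new LinkedHashMap<>();
        for (String groupId : groupIds) {
            byCoordinator.computeIfAbsent(coordinatorOf.apply(groupId), id -> new ArrayList<>())
                    .add(groupId);
        }
        return byCoordinator;
    }
}
```

Each map entry would then become one describe or delete request carrying all of that coordinator's group ids, instead of one request per group id.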