[jira] [Commented] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"
[ https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410894#comment-16410894 ] Jason Gustafson commented on KAFKA-6683: [~chema.sanchez] Thanks a lot for the logs. It really helped. We've merged a fix which will be included in 1.1. If you have time to try out the same scenario with the new RC when it is available, we'd appreciate it. > ReplicaFetcher crashes with "Attempted to complete a transaction which was > not started" > > > Key: KAFKA-6683 > URL: https://issues.apache.org/jira/browse/KAFKA-6683 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0 > Environment: os: GNU/Linux > arch: x86_64 > Kernel: 4.9.77 > jvm: OpenJDK 1.8.0 >Reporter: Chema Sanchez >Assignee: Jason Gustafson >Priority: Critical > Fix For: 1.1.0 > > Attachments: server.properties > > > We have been experiencing this issue lately when restarting or replacing > brokers of our Kafka clusters during maintenance operations. 
> Having restarted or replaced a broker, after some minutes performing normally > it may suddenly throw the following exception and stop replicating some > partitions: > {code:none} > 2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, > fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) > java.lang.IllegalArgumentException: Attempted to complete a transaction which > was not started > at > kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720) > at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540) > at > kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540) > at scala.collection.immutable.List.foreach(List.scala:389) > at > scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35) > at > scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35) > at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44) > at kafka.log.Log.loadProducersFromLog(Log.scala:540) > at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521) > at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514) > at scala.collection.Iterator.foreach(Iterator.scala:929) > at scala.collection.Iterator.foreach$(Iterator.scala:929) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1417) > at scala.collection.IterableLike.foreach(IterableLike.scala:71) > at scala.collection.IterableLike.foreach$(IterableLike.scala:70) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.log.Log.loadProducerState(Log.scala:514) > at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487) > at > scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.truncateTo(Log.scala:1467) > at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454) > at > kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445) > at > 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:120) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) > at kafka.log.LogManager.truncateTo(LogManager.scala:445) > at > kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281) > at scala.collection.Iterator.foreach(Iterator.scala:929) > at scala.collection.Iterator.foreach$(Iterator.scala:929) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1417) > at scala.collection.IterableLike.foreach(IterableLike.scala:71) > at scala.collection.IterableLike.foreach$(IterableLike.scala:70) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217) > at > kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > [2018-03-15 17:23:01
[jira] [Commented] (KAFKA-6705) producer.send() should be non-blocking
[ https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410876#comment-16410876 ]

Dong Lin commented on KAFKA-6705:
---------------------------------

[~ijuma] Thanks for the information. I will search for the previous discussion.

Here are my thoughts before reading that discussion. I find it useful to block the request on a full buffer queue for up to max.block.ms. The original description misstated my proposal on this point, and it has now been updated. But I am not sure we should block producer.send() when metadata is unavailable.

For example, say a user wants producer.send() to return immediately and is OK with dropping messages if the network is slower than the message generation rate. Currently the user has to set max.block.ms to achieve this, but that means the first few messages will always be lost because metadata is unavailable. There may be ways to get around this depending on the use case. For example, we may argue that if the user is OK with losing messages, then it is OK to drop the first few messages as well. But it still seems nicer not to block producer.send() while the buffer queue has space for new messages.

> producer.send() should be non-blocking
> --------------------------------------
>
>                 Key: KAFKA-6705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6705
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>
> Currently producer.send() may block on metadata for up to max.block.ms. This
> behavior is well documented but it is a bit sub-optimal. Since we return a
> future we should be able to make producer.send() completely non-blocking. One
> idea is to simply insert the record into a global queue shared across all
> partitions, and let the sender thread fetch record from this queue and send
> to broker.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
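The difference between waiting on a full buffer and returning immediately can be sketched without the Kafka client at all. The following is only an illustration built on a plain `ArrayBlockingQueue`. `BoundedBuffer`, `tryAppend` and the `maxBlockMs` parameter are hypothetical names chosen to mirror `max.block.ms`; they are not the producer's actual internals, and the metadata wait discussed in the comment is not modelled.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the producer's buffer; not Kafka's implementation.
final class BoundedBuffer {
    private final BlockingQueue<String> queue;

    BoundedBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Waits up to maxBlockMs for free space and returns false if the record
    // was dropped. With maxBlockMs == 0 the call never waits.
    boolean tryAppend(String record, long maxBlockMs) {
        try {
            return queue.offer(record, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
    }

    // Removes and returns the oldest record, or null if the buffer is empty.
    String poll() {
        return queue.poll();
    }

    public static void main(String[] args) {
        BoundedBuffer buffer = new BoundedBuffer(1);
        System.out.println(buffer.tryAppend("a", 0));  // space is available
        System.out.println(buffer.tryAppend("b", 0));  // buffer is full, no waiting
        System.out.println(buffer.tryAppend("c", 50)); // buffer is full, waits first
    }
}
```

A bounded wait keeps some back-pressure on the caller, while a zero wait turns a full buffer into an immediate drop. Which of the two is preferable depends on whether losing records is acceptable for the application.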
[jira] [Updated] (KAFKA-6705) producer.send() should be non-blocking
[ https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated KAFKA-6705:
----------------------------
    Description: Currently producer.send() may block on metadata for up to max.block.ms. This behavior is well documented but it is a bit sub-optimal. Since we return a future we should be able to make producer.send() completely non-blocking. One idea is to simply insert the record into a global queue shared across all partitions, and let the sender thread fetch record from this queue and send to broker.  (was: Currently producer.send() may block on metadata or full buffer for up to max.block.ms. This behavior is well documented but it is a bit sub-optimal. Since we return a future we should be able to make producer.send() completely non-blocking. One idea is to simply insert the record into a global queue shared across all partitions, and let the sender thread fetch record from this queue and send to broker.)

> producer.send() should be non-blocking
> --------------------------------------
>
>                 Key: KAFKA-6705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6705
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>
> Currently producer.send() may block on metadata for up to max.block.ms. This
> behavior is well documented but it is a bit sub-optimal. Since we return a
> future we should be able to make producer.send() completely non-blocking. One
> idea is to simply insert the record into a global queue shared across all
> partitions, and let the sender thread fetch record from this queue and send
> to broker.
[jira] [Resolved] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"
[ https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-6683. Resolution: Fixed Fix Version/s: 1.1.0 > ReplicaFetcher crashes with "Attempted to complete a transaction which was > not started" > > > Key: KAFKA-6683 > URL: https://issues.apache.org/jira/browse/KAFKA-6683 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0 > Environment: os: GNU/Linux > arch: x86_64 > Kernel: 4.9.77 > jvm: OpenJDK 1.8.0 >Reporter: Chema Sanchez >Assignee: Jason Gustafson >Priority: Critical > Fix For: 1.1.0 > > Attachments: server.properties > > > We have been experiencing this issue lately when restarting or replacing > brokers of our Kafka clusters during maintenance operations. > Having restarted or replaced a broker, after some minutes performing normally > it may suddenly throw the following exception and stop replicating some > partitions: > {code:none} > 2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, > fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) > java.lang.IllegalArgumentException: Attempted to complete a transaction which > was not started > at > kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720) > at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540) > at > kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540) > at scala.collection.immutable.List.foreach(List.scala:389) > at > scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35) > at > scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35) > at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44) > at kafka.log.Log.loadProducersFromLog(Log.scala:540) > at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521) > at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514) > at 
scala.collection.Iterator.foreach(Iterator.scala:929) > at scala.collection.Iterator.foreach$(Iterator.scala:929) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1417) > at scala.collection.IterableLike.foreach(IterableLike.scala:71) > at scala.collection.IterableLike.foreach$(IterableLike.scala:70) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.log.Log.loadProducerState(Log.scala:514) > at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487) > at > scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.truncateTo(Log.scala:1467) > at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454) > at > kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445) > at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:120) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788) > at kafka.log.LogManager.truncateTo(LogManager.scala:445) > at > kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281) > at scala.collection.Iterator.foreach(Iterator.scala:929) > at scala.collection.Iterator.foreach$(Iterator.scala:929) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1417) > at scala.collection.IterableLike.foreach(IterableLike.scala:71) > at scala.collection.IterableLike.foreach$(IterableLike.scala:70) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217) > at > 
kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > [2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, > fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread) > {code} > As during system updates all brokers in a cluster are restarted, it happened > some times the issue to man
[jira] [Commented] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"
[ https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410800#comment-16410800 ] ASF GitHub Bot commented on KAFKA-6683: --- ijuma closed pull request #4755: KAFKA-6683; Ensure producer state not mutated prior to append URL: https://github.com/apache/kafka/pull/4755 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index cc693375079..de4bb2912e9 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -166,6 +166,8 @@ class Log(@volatile var dir: File, import kafka.log.Log._ + this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] " + /* A lock that guards all modifications to the log */ private val lock = new Object // The memory mapped buffer for index files of this log will be closed for index files of this log will be closed with either delete() or closeHandlers() @@ -242,8 +244,8 @@ class Log(@volatile var dir: File, loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile) -info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms" - .format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs)) +info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " + + s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms") } private val tags = { @@ -457,21 +459,20 @@ class Log(@volatile var dir: File, val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator while (unflushed.hasNext) { val segment = unflushed.next -info("Recovering unflushed segment %d in 
log %s.".format(segment.baseOffset, name)) +info(s"Recovering unflushed segment ${segment.baseOffset}") val truncatedBytes = try { recoverSegment(segment, Some(_leaderEpochCache)) } catch { case _: InvalidOffsetException => val startOffset = segment.baseOffset - warn("Found invalid offset during recovery for log " + dir.getName + ". Deleting the corrupt segment and " + -"creating an empty one with starting offset " + startOffset) + warn("Found invalid offset during recovery. Deleting the corrupt segment and " + +s"creating an empty one with starting offset $startOffset") segment.truncateTo(startOffset) } if (truncatedBytes > 0) { // we had an invalid message, delete all remaining log - warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name, -segment.readNextOffset)) + warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}") unflushed.foreach(deleteSegment) } } @@ -483,8 +484,7 @@ class Log(@volatile var dir: File, private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized { checkIfMemoryMappedBufferClosed() val messageFormatVersion = config.messageFormatVersion.messageFormatVersion.value -info(s"Loading producer state from offset $lastOffset for partition $topicPartition with message " + - s"format version $messageFormatVersion") +info(s"Loading producer state from offset $lastOffset with message format version $messageFormatVersion") // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case, @@ -571,7 +571,7 @@ class Log(@volatile var dir: File, * The memory mapped buffer for index files of this log will be left open until the log is deleted. 
*/ def close() { -debug(s"Closing log $name") +debug("Closing log") lock synchronized { checkIfMemoryMappedBufferClosed() maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") { @@ -610,7 +610,7 @@ class Log(@volatile var dir: File, * Close file handlers used by log but don't write to disk. This is called if the log directory is offline */ def closeHandlers() { -debug(s"Closing handlers of log $name") +debug("Closing handlers") lock synchronized { logSegments.foreach(_.closeHandlers()) isMemoryMappedBufferClosed = true @@ -775,8 +775,10 @@ class Log(@volatile var d
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410797#comment-16410797 ]

ASF GitHub Bot commented on KAFKA-6054:
---------------------------------------

mjsax opened a new pull request #4761: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4761

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when
> upgrading from 0.10.0.0 to 0.10.2.1
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6054
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6054
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: James Cheng
>            Assignee: Matthias J. Sax
>            Priority: Major
>              Labels: kip
>             Fix For: 1.2.0
>
>
> KIP:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling
> upgrade of the app, so that at one point there were both 0.10.0.0-based
> instances and 0.10.2.1-based instances running.
> We observed the following stack trace: > {code:java} > 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo > - > unable to decode subscription data: version=2 > org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode > subscription data: version=2 > at > org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > > {code} > I spoke with [~mjsax] and he said this is a known issue that happens when you > have both 0.10.0.0 i
[jira] [Updated] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-6127:
-----------------------------------
    Description:
Streams uses three consumer APIs that can block indefinitely: {{commitSync()}}, {{committed()}}, and {{position()}}. Also, {{KafkaProducer#send()}} can block.

If one of these operations blocks, the whole {{StreamThread}} blocks: the instance makes no progress, becomes unresponsive (for example, {{KafkaStreams#close()}} suffers), and might drop out of the consumer group.

We might consider using {{wakeup()}} calls to unblock those operations and keep {{StreamThread}} responsive.

Note: there are discussions about adding timeouts to those calls, in which case we could get a {{TimeoutException}}. That would be easier to handle than using {{wakeup()}}, so we should keep an eye on those discussions.

  was:
Streams uses three consumer APIs that can block indefinitely: {{commitSync()}}, {{committed()}}, and {{position()}}.

If one of these operations blocks, the whole {{StreamThread}} blocks: the instance makes no progress, becomes unresponsive (for example, {{KafkaStreams#close()}} suffers), and might drop out of the consumer group.

We might consider using {{wakeup()}} calls to unblock those operations and keep {{StreamThread}} responsive.

Note: there are discussions about adding timeouts to those calls, in which case we could get a {{TimeoutException}}. That would be easier to handle than using {{wakeup()}}, so we should keep an eye on those discussions.

> Streams should never block infinitely
> -------------------------------------
>
>                 Key: KAFKA-6127
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6127
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Priority: Major
>
> Streams uses three consumer APIs that can block indefinitely: {{commitSync()}},
> {{committed()}}, and {{position()}}.
> Also, {{KafkaProducer#send()}} can block.
> If one of these operations blocks, the whole {{StreamThread}} blocks: the
> instance makes no progress, becomes unresponsive (for example,
> {{KafkaStreams#close()}} suffers), and might drop out of the consumer
> group.
> We might consider using {{wakeup()}} calls to unblock those operations and
> keep {{StreamThread}} responsive.
> Note: there are discussions about adding timeouts to those calls, in which
> case we could get a {{TimeoutException}}. That would be easier to handle than
> using {{wakeup()}}, so we should keep an eye on those discussions.
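The preference for timeouts over {{wakeup()}} can be illustrated with a generic sketch in plain Java. It runs a potentially blocking call on a helper thread and gives up after a deadline, so the caller always regains control. `BoundedCall` and `call` are hypothetical names, not part of Kafka Streams. The sketch is also not directly applicable to `KafkaConsumer`, which is documented as not safe for multi-threaded access apart from `wakeup()`; it only shows why a bounded wait is simpler to reason about for the calling thread.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; the names are illustrative and not part of Kafka Streams.
final class BoundedCall {
    // Daemon threads, so that a stuck call cannot keep the JVM alive.
    private static final ExecutorService POOL = Executors.newCachedThreadPool(runnable -> {
        Thread t = new Thread(runnable, "bounded-call");
        t.setDaemon(true);
        return t;
    });

    // Runs the call and returns empty if it does not finish within timeoutMs.
    static <T> Optional<T> call(Callable<T> blockingCall, long timeoutMs) {
        Future<T> future = POOL.submit(blockingCall);
        try {
            return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker, loosely comparable to wakeup()
            return Optional.empty();
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException("call failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // A fast call completes; a slow call is abandoned after 100 ms.
        System.out.println(call(() -> 42L, 100).isPresent());
        System.out.println(call(() -> { Thread.sleep(5_000); return 42L; }, 100).isPresent());
    }
}
```

With a timeout the caller sees one well-defined outcome (a result or nothing) instead of having to coordinate an asynchronous interruption from a second thread.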
[jira] [Commented] (KAFKA-6705) producer.send() should be non-blocking
[ https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410765#comment-16410765 ]

Ismael Juma commented on KAFKA-6705:
------------------------------------

[~lindong], there's a long-standing Jira about this; it may be worth searching for it. One reason the current interface blocks is to provide back-pressure when you have exhausted the buffer pool. If `send` is completely non-blocking, then this would no longer be possible. In any case, feel free to add your thoughts on the original JIRA.

> producer.send() should be non-blocking
> --------------------------------------
>
>                 Key: KAFKA-6705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6705
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>
> Currently producer.send() may block on metadata or full buffer for up to
> max.block.ms. This behavior is well documented but it is a bit sub-optimal.
> Since we return a future we should be able to make producer.send() completely
> non-blocking. One idea is to simply insert the record into a global queue
> shared across all partitions, and let the sender thread fetch record from
> this queue and send to broker.
[jira] [Assigned] (KAFKA-6685) Connect deserialization log message should distinguish key from value
[ https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jagadesh Adireddi reassigned KAFKA-6685:
----------------------------------------

    Assignee: Jagadesh Adireddi

> Connect deserialization log message should distinguish key from value
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-6685
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6685
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Yeva Byzek
>            Assignee: Jagadesh Adireddi
>            Priority: Minor
>              Labels: newbie
>
> Connect was configured for Avro key and value but data had String key and
> Avro value. The resulting error message was misleading because it didn't
> distinguish key from value, and so I was chasing problems with the value
> instead of the key.
> tl;dr Connect should at least tell you whether the problem is with
> deserializing the key or value of a record
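The request amounts to labelling a deserialization failure with the part of the record that caused it. A minimal sketch of that idea follows, using only the JDK. `RecordDecoder`, `decode` and the toy integer format are hypothetical and stand in for the real converters; this is not the Kafka Connect API and not necessarily how the issue was eventually fixed.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch; not the Kafka Connect converter API.
final class RecordDecoder {
    // Applies a deserializer and labels any failure with the part being decoded.
    static <T> T decode(String part, byte[] data, Function<byte[], T> deserializer) {
        try {
            return deserializer.apply(data);
        } catch (RuntimeException e) {
            throw new IllegalArgumentException(
                "Failed to deserialize record " + part + ": " + e.getMessage(), e);
        }
    }

    // Toy format (a UTF-8 decimal integer) standing in for Avro.
    static Integer parseInt(byte[] data) {
        return Integer.parseInt(new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] key = "user-1".getBytes(StandardCharsets.UTF_8); // a plain string, not an integer
        byte[] value = "42".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode("value", value, RecordDecoder::parseInt));
        try {
            decode("key", key, RecordDecoder::parseInt);
        } catch (IllegalArgumentException e) {
            // The message names the key, so nobody goes chasing the value.
            System.out.println(e.getMessage());
        }
    }
}
```

Passing the label in at the call site keeps the deserializers themselves unchanged, which is why the sketch wraps them rather than modifying them.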
[jira] [Commented] (KAFKA-6705) producer.send() should be non-blocking
[ https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410672#comment-16410672 ]

Dong Lin commented on KAFKA-6705:
---------------------------------

Ping [~ijuma] and [~jasong35] for comments.

> producer.send() should be non-blocking
> --------------------------------------
>
>                 Key: KAFKA-6705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6705
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>
> Currently producer.send() may block on metadata or full buffer for up to
> max.block.ms. This behavior is well documented but it is a bit sub-optimal.
> Since we return a future we should be able to make producer.send() completely
> non-blocking. One idea is to simply insert the record into a global queue
> shared across all partitions, and let the sender thread fetch record from
> this queue and send to broker.
[jira] [Created] (KAFKA-6705) producer.send() should be non-blocking
Dong Lin created KAFKA-6705:
-------------------------------

             Summary: producer.send() should be non-blocking
                 Key: KAFKA-6705
                 URL: https://issues.apache.org/jira/browse/KAFKA-6705
             Project: Kafka
          Issue Type: Improvement
            Reporter: Dong Lin
            Assignee: Dong Lin


Currently producer.send() may block on metadata or full buffer for up to max.block.ms. This behavior is well documented but it is a bit sub-optimal. Since we return a future we should be able to make producer.send() completely non-blocking. One idea is to simply insert the record into a global queue shared across all partitions, and let the sender thread fetch record from this queue and send to broker.
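The "global queue plus sender thread" idea can be sketched in a few lines of plain Java. This is an illustration under stated assumptions, not the Kafka producer: `QueueingSender`, `Pending` and the fake offset counter are all hypothetical, and the "send" step simply completes the future where a real sender would fetch metadata and write to a broker.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the "global queue + sender thread" idea; not Kafka code.
final class QueueingSender implements AutoCloseable {
    // A record paired with the future handed back to the caller.
    private static final class Pending {
        final String record;
        final CompletableFuture<Long> result = new CompletableFuture<>();
        Pending(String record) { this.record = record; }
    }

    private final BlockingQueue<Pending> queue;
    private final AtomicLong nextOffset = new AtomicLong();
    private final Thread sender;
    private volatile boolean running = true;

    QueueingSender(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.sender = new Thread(this::drain, "sketch-sender");
        this.sender.setDaemon(true);
        this.sender.start();
    }

    // Never blocks: the record is either queued or the future fails immediately.
    CompletableFuture<Long> send(String record) {
        Pending pending = new Pending(record);
        if (!queue.offer(pending)) {
            pending.result.completeExceptionally(new IllegalStateException("queue full"));
        }
        return pending.result;
    }

    // Sender loop: takes records off the shared queue and "sends" them.
    private void drain() {
        try {
            while (running) {
                Pending pending = queue.take();
                // A real sender would look up metadata and write to a broker here.
                pending.result.complete(nextOffset.getAndIncrement());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stops the sender thread; records still queued are never completed in this sketch.
    @Override
    public void close() {
        running = false;
        sender.interrupt();
    }

    public static void main(String[] args) {
        try (QueueingSender producer = new QueueingSender(16)) {
            System.out.println(producer.send("hello").join());
        }
    }
}
```

Note that a full queue still has to be handled somehow. The sketch fails the future, which gives up the back-pressure that a blocking `send` provides, so the trade-off does not disappear; it only moves.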
[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410621#comment-16410621 ]

Guozhang Wang commented on KAFKA-6474:
--------------------------------------

Filipe would like you to take a look at https://github.com/apache/kafka/pull/4760.

> Rewrite test to use new public TopologyTestDriver
> -------------------------------------------------
>
>                 Key: KAFKA-6474
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6474
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams, unit tests
>    Affects Versions: 1.1.0
>            Reporter: Matthias J. Sax
>            Assignee: Filipe Agapito
>            Priority: Major
>              Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our own
> tests to use this new test driver and remove the two classes
> ProcessorTopologyTestDriver and KStreamTestDriver.
[jira] [Commented] (KAFKA-3591) JmxTool should exit out if a provided query matches no values
[ https://issues.apache.org/jira/browse/KAFKA-3591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410575#comment-16410575 ] Guozhang Wang commented on KAFKA-3591: -- This is fixed as part of KAFKA-6611. > JmxTool should exit out if a provided query matches no values > - > > Key: KAFKA-3591 > URL: https://issues.apache.org/jira/browse/KAFKA-3591 > Project: Kafka > Issue Type: Improvement > Components: tools >Affects Versions: 0.9.0.0 >Reporter: Harsh J >Priority: Trivial > Fix For: 1.2.0 > > > Running {{kafka.tools.JmxTool}} with an invalid query, such as > {{--object-name "foobar"}} would produce just "time" field outputs given no > such object. If there are no matched objects when a query has been explicitly > provided, it should exit out instead of just printing the time data. > (We should not exit if no filter is provided though, i.e. if the KAFKA-2278 > feature is used) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
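The requested behaviour (exit when an explicit query matches nothing, but keep going when no filter was given) can be sketched with the standard javax.management API. This is not the JmxTool code; `JmxQueryCheck` and its methods are hypothetical names, and a `null` query is assumed to mean "no filter provided". Note that a bare name such as "foobar" is not a syntactically valid ObjectName, so the sketch treats a malformed query as a usage error.

```java
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper illustrating the "exit on empty match" rule; not part of JmxTool. */
final class JmxQueryCheck {
    /** Returns the MBean names matching the query, or all names when query is null. */
    static Set<ObjectName> resolve(MBeanServerConnection server, String query) {
        try {
            ObjectName pattern = (query == null) ? null : new ObjectName(query);
            return server.queryNames(pattern, null);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid object name: " + query, e);
        } catch (java.io.IOException e) {
            throw new IllegalStateException("JMX connection failed", e);
        }
    }

    /** True only when a query was explicitly provided and it matched no objects. */
    static boolean shouldExit(MBeanServerConnection server, String query) {
        return query != null && resolve(server, query).isEmpty();
    }
}
```

A caller could pass `ManagementFactory.getPlatformMBeanServer()` for a local check, or the connection obtained from a `JMXConnector` for a remote broker, and terminate with a usage message when `shouldExit` returns true.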
[jira] [Commented] (KAFKA-6611) Re-write simple benchmark in system tests with JMXTool
[ https://issues.apache.org/jira/browse/KAFKA-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410573#comment-16410573 ] ASF GitHub Bot commented on KAFKA-6611: --- guozhangwang closed pull request #4650: KAFKA-6611: PART I, Use JMXTool in SimpleBenchmark URL: https://github.com/apache/kafka/pull/4650 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index 4a6a348d9a6..27e46319e49 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -55,12 +55,17 @@ object JmxTool extends Logging { .withRequiredArg .describedAs("name") .ofType(classOf[String]) -val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats.") +val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " + + "Value of -1 equivalent to setting one-time to true") .withRequiredArg .describedAs("ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(2000) -val helpOpt = parser.accepts("help", "Print usage information.") +val oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.") + .withRequiredArg + .describedAs("one-time") + .ofType(classOf[java.lang.Boolean]) + .defaultsTo(false) val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. 
" + "See java.text.SimpleDateFormat for options.") .withRequiredArg @@ -72,8 +77,15 @@ object JmxTool extends Logging { .describedAs("service-url") .ofType(classOf[String]) .defaultsTo("service:jmx:rmi:///jndi/rmi://:/jmxrmi") +val reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ") + .withRequiredArg + .describedAs("report-format") + .ofType(classOf[java.lang.String]) + .defaultsTo("original") val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " + "Only supported when the list of objects is non-empty and contains no object name patterns.") +val helpOpt = parser.accepts("help", "Print usage information.") + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.") @@ -87,12 +99,16 @@ object JmxTool extends Logging { val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt)) val interval = options.valueOf(reportingIntervalOpt).intValue +var oneTime = interval < 0 || options.has(oneTimeOpt) val attributesWhitelistExists = options.has(attributesOpt) -val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",")) else None +val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",").filterNot(_.equals(""))) else None val dateFormatExists = options.has(dateFormatOpt) val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None val wait = options.has(waitOpt) +val reportFormat = parseFormat(options.valueOf(reportFormatOpt).toLowerCase) +val reportFormatOriginal = reportFormat.equals("original") + var jmxc: JMXConnector = null var mbsc: MBeanServerConnection = null var connected = false @@ -150,33 +166,57 @@ object JmxTool extends Logging { val numExpectedAttributes: Map[ObjectName, Int] = if (attributesWhitelistExists) -queries.map((_, 
attributesWhitelist.get.size)).toMap +queries.map((_, attributesWhitelist.get.length)).toMap else { names.map{(name: ObjectName) => val mbean = mbsc.getMBeanInfo(name) (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap } +if(numExpectedAttributes.isEmpty) { + CommandLineUtils.printUsageAndDie(parser, s"No matched attributes for the queried objects $queries.") +} + // print csv header val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted -if(keys.size == numExpectedAttributes.values.sum + 1) +if(reportFormatOriginal && keys.size == numExpectedAttributes.values.sum + 1) { println(keys.map("\"" + _ + "\"").mkString(",")) +} -while(true) { +var keepGoing = true +while (keepGoing) { val start = System.currentTimeMillis val att
[jira] [Commented] (KAFKA-6685) Connect deserialization log message should distinguish key from value
[ https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410544#comment-16410544 ] Matthias J. Sax commented on KAFKA-6685: Done. [~adireddijagad...@gmail.com] You can assign Jiras to yourself now. > Connect deserialization log message should distinguish key from value > - > > Key: KAFKA-6685 > URL: https://issues.apache.org/jira/browse/KAFKA-6685 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yeva Byzek >Priority: Minor > Labels: newbie > > Connect was configured for Avro key and value but data had String key and > Avro value. The resulting error message was misleading because it didn't > distinguish key from value, and so I was chasing problems with the value > instead of the key. > tl;dr Connect should at least tell you whether the problem is with > deserializing the key or value of a record > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410534#comment-16410534 ] Guozhang Wang commented on KAFKA-6535: -- I'd vote to still start a KIP though it is a very small one, as it is still a public change and for people who're relying on the default retention it would be less of a surprise. > Set default retention ms for Streams repartition topics to Long.MAX_VALUE > - > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Khaireddine Rezgui >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set their default retention to infinity to allow any records to be > pushed to them with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keep their storage small. > More specifically, in {{RepartitionTopicConfig}} we have a few default > overrides for repartition topic configs; we should just add the override for > {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still > allows users to override it themselves if they want via > {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this > update takes effect. > In addition to the code change, we also need to have doc changes in > streams/upgrade_guide.html specifying this default value change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
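The override order described in the issue (a built-in default of Long.MAX_VALUE that a user can still replace through the topic prefix) can be sketched as below. This is a stand-alone illustration, not the `RepartitionTopicConfig` code: the class `RepartitionTopicDefaults` is hypothetical, and the two string literals are assumed to mirror the values of `TopicConfig.RETENTION_MS_CONFIG` and `StreamsConfig.TOPIC_PREFIX`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of default-plus-user-override resolution for a repartition topic. */
final class RepartitionTopicDefaults {
    // Assumed to mirror TopicConfig.RETENTION_MS_CONFIG and StreamsConfig.TOPIC_PREFIX.
    static final String RETENTION_MS = "retention.ms";
    static final String TOPIC_PREFIX = "topic.";

    /** Starts from the proposed default, then applies any "topic."-prefixed user settings. */
    static Map<String, String> effectiveConfig(Map<String, String> userProps) {
        Map<String, String> config = new HashMap<>();
        config.put(RETENTION_MS, String.valueOf(Long.MAX_VALUE));
        for (Map.Entry<String, String> entry : userProps.entrySet()) {
            if (entry.getKey().startsWith(TOPIC_PREFIX)) {
                // Strip the prefix so the value lands on the plain topic-level key.
                config.put(entry.getKey().substring(TOPIC_PREFIX.length()), entry.getValue());
            }
        }
        return config;
    }
}
```

A unit test for the real change would presumably assert the same two cases: the default applies when the user sets nothing, and a prefixed user value wins when present.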
[jira] [Commented] (KAFKA-6685) Connect deserialization log message should distinguish key from value
[ https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410527#comment-16410527 ] Randall Hauch commented on KAFKA-6685: -- [~adireddijagad...@gmail.com], please ask the Kafka committers to add you to the contributor list. See http://kafka.apache.org/contributing.html for details. Then you will be able to assign this to yourself. Feel free to submit a PR, per those guidelines. For example, the PR title and commits should begin with "KAFKA-6685: " followed by a meaningful one-sentence summary. > Connect deserialization log message should distinguish key from value > - > > Key: KAFKA-6685 > URL: https://issues.apache.org/jira/browse/KAFKA-6685 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yeva Byzek >Priority: Minor > Labels: newbie > > Connect was configured for Avro key and value but data had String key and > Avro value. The resulting error message was misleading because it didn't > distinguish key from value, and so I was chasing problems with the value > instead of the key. > tl;dr Connect should at least tell you whether the problem is with > deserializing the key or value of a record > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
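One way to make the log message say which part of the record failed is to wrap each conversion with a label. The sketch below is only an illustration of that idea in plain Java: `LabeledDeserialization` and `convert` are hypothetical names, the converter is modelled as a simple `Function`, and none of this is the Connect worker code.

```java
import java.util.function.Function;

/** Hypothetical wrapper that tags a deserialization failure with "key" or "value". */
final class LabeledDeserialization {
    /**
     * Runs the converter and, on failure, rethrows with the record role and topic in the
     * message so the reader knows whether the key or the value was the problem.
     */
    static <T> T convert(String role, String topic, byte[] data, Function<byte[], T> converter) {
        try {
            return converter.apply(data);
        } catch (RuntimeException e) {
            throw new IllegalStateException(
                "Failed to deserialize record " + role + " for topic " + topic, e);
        }
    }
}
```

A caller would invoke it once with the role "key" and once with "value", so a String key fed to an Avro converter would surface as a key failure rather than an ambiguous one.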
[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410522#comment-16410522 ] John Roesler commented on KAFKA-6437: - I sort of feel like the existing behavior is better than failing, and [~k1th] was mostly complaining of a visibility problem, so it would be best to just address the visibility of this situation. Logging a warning seems appropriate. It's certainly not an error. It could also be info. I guess someone with more context could advise as to whether it seems like a warn vs. info. A log message in conjunction with a lag metric (I'm planning to submit a KIP for that...) would really have prevented this from biting them, so maybe we just focus on exposing better information rather than failing fast in this case. > Streams does not warn about missing input topics, but hangs > --- > > Key: KAFKA-6437 > URL: https://issues.apache.org/jira/browse/KAFKA-6437 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 > Environment: Single client on single node broker >Reporter: Chris Schwarzfischer >Priority: Minor > Labels: newbie > > *Case* > Streams application with two input topics being used for a left join. > When the left side topic is missing upon starting the streams application, it > hangs "in the middle" of the topology (at …9, see below). Only parts of > the intermediate topics are created (up to …9) > When the missing input topic is created, the streams application resumes > processing. 
> {noformat} > Topology: > StreamsTask taskId: 2_0 > ProcessorTopology: > KSTREAM-SOURCE-11: > topics: > [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] > children: [KTABLE-AGGREGATE-12] > KTABLE-AGGREGATE-12: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KTABLE-TOSTREAM-20] > KTABLE-TOSTREAM-20: > children: [KSTREAM-SINK-21] > KSTREAM-SINK-21: > topic: data_udr_month_customer_aggregration > KSTREAM-SOURCE-17: > topics: > [mystreams_app-KSTREAM-MAP-14-repartition] > children: [KSTREAM-LEFTJOIN-18] > KSTREAM-LEFTJOIN-18: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KSTREAM-SINK-19] > KSTREAM-SINK-19: > topic: data_UDR_joined > Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, > mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] > {noformat} > *Why this matters* > The application does quite a lot of preprocessing before joining with the > missing input topic. This preprocessing won't happen without the topic, > creating a huge backlog of data. > *Fix* > Issue a `warn` or `error` level message at start to inform about the missing > topic and its consequences. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
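The startup check being asked for amounts to comparing the topology's source topics against the topics that exist and reporting the difference. A minimal sketch in plain Java follows; `MissingTopicCheck` is a hypothetical helper, the message wording is invented for illustration, and how Streams would actually obtain the two sets is left out.

```java
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical startup check for missing source topics; not Kafka Streams code. */
final class MissingTopicCheck {
    /** Source topics that are not among the existing topics, in sorted order. */
    static Set<String> missing(Set<String> sourceTopics, Set<String> existingTopics) {
        Set<String> result = new TreeSet<>(sourceTopics);
        result.removeAll(existingTopics);
        return result;
    }

    /** A warning message to log at startup, or empty when every source topic exists. */
    static Optional<String> warning(Set<String> sourceTopics, Set<String> existingTopics) {
        Set<String> absent = missing(sourceTopics, existingTopics);
        if (absent.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of("Source topics " + absent
            + " do not exist yet; processing that depends on them will stall until they are created");
    }
}
```

Whether the result is logged at warn or info level is the open question in the comment above; the check itself is the same either way.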
[jira] [Commented] (KAFKA-6659) Improve error message if state store is not found
[ https://issues.apache.org/jira/browse/KAFKA-6659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410517#comment-16410517 ] ASF GitHub Bot commented on KAFKA-6659: --- mjsax closed pull request #4732: KAFKA-6659: Improve error message if state store is not found URL: https://github.com/apache/kafka/pull/4732 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 3761bfb0ee6..44a25c1213f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -72,7 +72,13 @@ public StateStore getStateStore(final String name) { } if (!currentNode().stateStores.contains(name)) { -throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + name); +throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + name + +" as the store is not connected to the processor. If you add stores manually via '.addStateStore()' " + +"make sure to connect the added store to the processor by providing the processor name to " + +"'.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. " + +"DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' " + +"to connect the store to the corresponding operator. 
If you do not add stores manually, " + +"please file a bug report at https://issues.apache.org/jira/projects/KAFKA."); } return stateManager.getStore(name); This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve error message if state store is not found > - > > Key: KAFKA-6659 > URL: https://issues.apache.org/jira/browse/KAFKA-6659 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Stuart Perks >Priority: Trivial > Labels: beginner, easy-fix, newbie > > If a processor tries to access a store but the store is not connected to the > processor, Streams fails with > {quote}Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: > Invalid topology building: Processor KSTREAM-TRANSFORM-36 has no > access to StateStore questions-awaiting-answers-store > {quote} > We should improve this error message and give a hint to the user how to fix > the issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410509#comment-16410509 ] John Roesler commented on KAFKA-6535: - It seems unlikely that there are folks who just use the default and would be adversely affected by the new default, so I'd "vote" not to bother with a KIP. But I'm also new in town... > Set default retention ms for Streams repartition topics to Long.MAX_VALUE > - > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Khaireddine Rezgui >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set their default retention to infinity to allow any records to be > pushed to them with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keep their storage small. > More specifically, in {{RepartitionTopicConfig}} we have a few default > overrides for repartition topic configs; we should just add the override for > {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still > allows users to override it themselves if they want via > {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this > update takes effect. > In addition to the code change, we also need to have doc changes in > streams/upgrade_guide.html specifying this default value change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410487#comment-16410487 ] Gunnar Morling commented on KAFKA-3821: --- Hey [~ewencp], thanks for your extensive reply! I agree this discussion shouldn't be centered around saving or not saving that one allocation. I just added this to point out that the receiver approach leaves Kafka Connect more freedom in terms of implementing this. I.e. that way, in theory, you could even do crazy things such as storing incoming records off-heap. But yeah, most likely you'd just use a list yourselves. But that's not why I was suggesting this. The more substantial advantage I see is that it allows to add further methods down the road without breaking existing implementors of this contract. [~rhauch] e.g. mentioned methods for TX handling. Assuming these calls must be done in the correct ordering when submitting records, such methods couldn't really be added to {{ConnectorSourceTask}} itself. It's the same for offset handling which is the original use case we're after here. On the offset handling itself, there's two aspects to this. One is that we'd like to submit an offset once a snapshot is completed. Currently, we're doing what you described with {quote} would another option be having Debezium read 1 record forward to determine before returning the record and/or constructing its offset whether this is the final record of the snapshot {quote} But this leads back to complexity of this contract for implementors. Having a dedicated way for submitting the offset once the snapshot is done is arguably simpler to implement (and reason about when reading the code) than doing this "read forward" and delayed processing of it. In terms of uniqueness of offsets, they _are_ unique also during snapshotting (binlog position), but we need a way to say that a snapshot has been completed. I see though how that could be considered as a "style thing" mostly. 
There's another case though where we'd benefit - substantially - from being able to submit offsets explicitly. This is where a connector supports whitelisting of captured tables and no changes are made to the whitelisted tables for a while (a [comment above|https://issues.apache.org/jira/browse/KAFKA-3821?focusedCommentId=15973506&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15973506] touches on this). In this case the offset in the event source progresses (e.g. MySQL's binlog position), but as no record is of interest as per the whitelist config, we have no way to ever submit these offsets. Then, after a connector restart, we'd be forced to re-read much larger portions of the binlog than actually required. We currently work around this in Debezium by regularly emitting records to a heartbeat topic. This allows us to submit these offsets even if no changes to the whitelisted tables are applied. Now the original idea above was to emit specific subclasses of {{SourceRecord}} for such "offset-only" records, but I wanted to bring up the receiver parameter idea because a) it feels less hackish to me and b) it opens the door for further API additions as described above. I hope this makes sense; what's proposed is the best that comes to my mind right now for the issues we are trying to resolve (while keeping the API reasonably abstract so as to cater for other use cases, too). Happy about any alternative proposals of course. Thanks for this great discussion! > Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. 
> Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
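The "receiver parameter" idea from the comment can be sketched as follows. Everything here is hypothetical and exists only to illustrate the shape of such a contract: `SourceRecordReceiver`, `RecordingReceiver`, and `SnapshotTask` are invented names, records are reduced to a topic and a value, and no such interface is part of the Kafka Connect API as far as this thread shows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical callback the framework would hand to a source task instead of taking a list. */
interface SourceRecordReceiver {
    /** Submits a record together with the offset it was read at. */
    void record(Map<String, ?> partition, Map<String, ?> offset, String topic, Object value);

    /** Submits an offset only, without writing anything to a topic. */
    void offset(Map<String, ?> partition, Map<String, ?> offset);
}

/** Test double that remembers the order in which the task called the receiver. */
final class RecordingReceiver implements SourceRecordReceiver {
    final List<String> events = new ArrayList<>();

    @Override
    public void record(Map<String, ?> partition, Map<String, ?> offset, String topic, Object value) {
        events.add("record:" + value);
    }

    @Override
    public void offset(Map<String, ?> partition, Map<String, ?> offset) {
        events.add("offset:" + offset);
    }
}

/** Hypothetical task: emits snapshot rows, then marks the snapshot done with an offset-only call. */
final class SnapshotTask {
    static void poll(SourceRecordReceiver receiver, List<String> rows) {
        Map<String, String> partition = Collections.singletonMap("db", "inventory");
        for (String row : rows) {
            receiver.record(partition, Collections.singletonMap("snapshot", "running"),
                "inventory.rows", row);
        }
        // No look-ahead needed: completion is signalled after the last row has been handed over.
        receiver.offset(partition, Collections.singletonMap("snapshot", "done"));
    }
}
```

Because calls arrive in order on one object, further methods (for example the transaction-handling calls mentioned in the comment) could later be added to the receiver without changing the task-side method signature.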
[jira] [Commented] (KAFKA-6696) Trogdor should support destroying tasks
[ https://issues.apache.org/jira/browse/KAFKA-6696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410366#comment-16410366 ] ASF GitHub Bot commented on KAFKA-6696: --- cmccabe opened a new pull request #4759: KAFKA-6696 Trogdor should support destroying tasks URL: https://github.com/apache/kafka/pull/4759 KAFKA-6696 Trogdor should support destroying tasks Implement destroying tasks and workers. This means erasing all record of them on the Coordinator and the Agent. Workers should be identified by unique 64-bit worker IDs, rather than by the names of the tasks they are implementing. This ensures that when a task is destroyed and re-created with the same task ID, the old workers will not be treated as part of the new task instance. Fix some return results from RPCs. In some cases RPCs were returning values that were never used. Attempting to re-create the same task ID with different arguments should fail. Add RequestConflictException to represent HTTP error code 409 (CONFLICT) for this scenario. If only one worker in a task stops, don't stop all the other workers for that task, unless the worker that stopped had an error. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Trogdor should support destroying tasks > --- > > Key: KAFKA-6696 > URL: https://issues.apache.org/jira/browse/KAFKA-6696 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > > Trogdor should support destroying tasks. This will make it more practical to > have very long running Trogdor instances. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
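The task-creation rules in the pull request description (re-creating a task with identical arguments is harmless, differing arguments conflict, and workers get their own unique 64-bit IDs) can be sketched as below. This is not the Trogdor code: `TaskRegistry` is a hypothetical class, the task spec is reduced to a string, and the nested `RequestConflictException` is a local stand-in for the exception the PR says it adds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory registry illustrating the create/destroy rules; not Trogdor itself. */
final class TaskRegistry {
    /** Local stand-in for the exception that would map to HTTP 409 (CONFLICT). */
    static final class RequestConflictException extends RuntimeException {
        RequestConflictException(String message) {
            super(message);
        }
    }

    private final Map<String, String> specsByTaskId = new HashMap<>();
    private final AtomicLong nextWorkerId = new AtomicLong(1L);

    /** Idempotent for an identical spec; fails when the same ID is reused with a different spec. */
    synchronized void createTask(String taskId, String spec) {
        String existing = specsByTaskId.get(taskId);
        if (existing == null) {
            specsByTaskId.put(taskId, spec);
        } else if (!existing.equals(spec)) {
            throw new RequestConflictException(
                "Task " + taskId + " already exists with a different spec");
        }
    }

    /** Erases all record of the task, so the ID can be reused afterwards. */
    synchronized void destroyTask(String taskId) {
        specsByTaskId.remove(taskId);
    }

    /** Workers are identified by a fresh 64-bit ID, never by the task name. */
    long newWorkerId() {
        return nextWorkerId.getAndIncrement();
    }
}
```

Because worker IDs never repeat, a worker left over from a destroyed task cannot be mistaken for a worker of a new task that happens to reuse the same task ID.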
[jira] [Created] (KAFKA-6704) Checking hasNext from SegmentIterator could throw InvalidStateStoreException
Bill Bejeck created KAFKA-6704: -- Summary: Checking hasNext from SegmentIterator could throw InvalidStateStoreException Key: KAFKA-6704 URL: https://issues.apache.org/jira/browse/KAFKA-6704 Project: Kafka Issue Type: Bug Components: streams Reporter: Bill Bejeck Assignee: Bill Bejeck Fix For: 1.2.0 When using Interactive Queries to do a range query on a Windowed store, {{SegmentIterator.hasNext}} could throw an {{InvalidStateStoreException}} if the {{StreamThread}} has closed the store. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410162#comment-16410162 ] John Roesler commented on KAFKA-6474: - Yeah, thanks [~h314to]! I had tried it with just half of that change, and it obviously didn't work. I'm really glad you figured that out! FYI, there are other sub-projects in Kafka that are going to follow this pattern, so I'm going to start a dev mailing list discussion to share your pattern. Also, for completeness, I had to augment your build.gradle patch slightly to get './gradlew streams:test-utils:test' to pass: {noformat} diff --git a/build.gradle b/build.gradle index 5e4c35643..2f38bf28b 100644 --- a/build.gradle +++ b/build.gradle @@ -921,6 +921,7 @@ project(':streams') { testCompile project(':clients').sourceSets.test.output testCompile project(':core') testCompile project(':core').sourceSets.test.output + testCompile project(':streams:test-utils').sourceSets.main.output testCompile libs.junit testCompile libs.easymock testCompile libs.bcpkix @@ -965,11 +966,12 @@ project(':streams:test-utils') { archivesBaseName = "kafka-streams-test-utils" dependencies { - compile project(':streams') + compile project(':streams').sourceSets.main.output compile project(':clients') testCompile project(':clients').sourceSets.test.output testCompile libs.junit + testCompile libs.rocksDBJni testRuntime libs.slf4jlog4j }{noformat} The reason is that we are skipping :streams:copyDependantLibs during test-utils:compile now (to avoid the circular dependency), so we have to explicitly depend in testCompile on any libs that would have been transitively pulled in from :streams (and are in the test's run-time code path). 
In case you run into a similar error when you run the full test suite, the error I saw was: {noformat} java.lang.ClassNotFoundException: org.rocksdb.RocksDBException{noformat} > Rewrite test to use new public TopologyTestDriver > - > > Key: KAFKA-6474 > URL: https://issues.apache.org/jira/browse/KAFKA-6474 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: Filipe Agapito >Priority: Major > Labels: beginner, newbie > > With KIP-247 we added public TopologyTestDriver. We should rewrite our own > test to use this new test driver and remove the two classes > ProcessorTopologyTestDriver and KStreamTestDriver. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6703) MirrorMaker cannot make progress when any matched topic from a whitelist regexp has -1 leader
[ https://issues.apache.org/jira/browse/KAFKA-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Attila Sasvari updated KAFKA-6703: -- Description: Scenario: - MM whitelist regexp matches multiple topics - destination cluster has 5 brokers with multiple topics replication factor 3 - without partition reassign shut down 2 brokers - suppose a topic has no leader any more because it was off-sync and the leader and the rest of the replicas are hosted on the downed brokers. - so we have 1 topic with some partitions with leader -1 - the rest of the matching topics have 3 replicas with leaders MM will not produce into any of the matched topics until: - the "orphaned" topic is removed or - the partition reassign is carried out from the downed brokers (suppose you can turn these back on) In the MirrorMaker logs, there are a lot of messages like the following ones: {code} [2018-03-22 19:55:32,522] DEBUG [Consumer clientId=consumer-1, groupId=console-consumer-43054] Coordinator discovery failed, refreshing metadata (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 19:55:32,522] DEBUG [Consumer clientId=consumer-1, groupId=console-consumer-43054] Sending metadata request (type=MetadataRequest, topics=) to node 192.168.1.102:9092 (id: 0 rack: null) (org.apache.kafka.clients.NetworkClient) [2018-03-22 19:55:32,525] DEBUG Updated cluster metadata version 10 to Cluster(id = Y-qtoFP-RMq2uuVnkEKAAw, nodes = [192.168.1.102:9092 (id: 0 rack: null)], partitions = [Partition(topic = testR1P2, partition = 1, leader = none, replicas = [42], isr = [], offlineReplicas = [42]), Partition(topic = testR1P1, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = testAlive, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = testERRR, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = testR1P2, partition = 0, leader = 0, 
replicas = [0], isr = [0], offlineReplicas = [])]) (org.apache.kafka.clients.Metadata) [2018-03-22 19:55:32,525] DEBUG [Consumer clientId=consumer-1, groupId=console-consumer-43054] Sending FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 19:55:32,525] DEBUG [Consumer clientId=consumer-1, groupId=console-consumer-43054] Received FindCoordinator response ClientResponse(receivedTimeMs=1521744932525, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=19), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 19:55:32,526] DEBUG [Consumer clientId=consumer-1, groupId=console-consumer-43054] Group coordinator lookup failed: The coordinator is not available. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) {code} Interestingly, if MirrorMaker uses {{zookeeper.connect}} in its consumer properties file, then an OldConsumer is created, and it can make progress. was: Scenario: - MM whitelabel regexp matches multiple topics - destination cluster has 5 brokers with multiple topics replication factor 3 - without partition reassign shut down 2 brokers - suppose a topic has no leader any more because it was off-sync and the leader and the rest of the replicas are hosted on the downed brokers. 
- so we have 1 topic with some partitions with leader -1 - the rest of the matching topics has 3 replicas with leaders MM will not produce into any of the matched topics until: - the "orphaned" topic removed or - the partition reassign carried out from the downed brokers (suppose you can turn these back on) In the MirrorMaker logs, there are a lot of messages like the following ones: {code} [2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-1, groupId=1] Sending FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-0, groupId=1] Sending FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Received FindCoordinator response ClientResponse(receivedTimeMs=1521741547782, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=1-0, correlationId=71), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 18:59:
[jira] [Comment Edited] (KAFKA-6679) Random corruption (CRC validation issues)
[ https://issues.apache.org/jira/browse/KAFKA-6679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410079#comment-16410079 ] Ari Uka edited comment on KAFKA-6679 at 3/22/18 6:36 PM: - So I attempted to re-generate one of the topics we had and I've been able to actually find a corrupt message in the `.log` file. There exists 3 brokers: kafka-01, kafka-02, kafka-03 and the topic influxdb-telemetry which contains 161,760,969 (161.7M) messages. The topic description: {noformat} Topic:influxdb-telemetry PartitionCount:6 ReplicationFactor:3 Configs: Topic: influxdb-telemetry Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2 Topic: influxdb-telemetry Partition: 1 Leader: 3 Replicas: 3,2,1 Isr: 3 Topic: influxdb-telemetry Partition: 2 Leader: 1 Replicas: 1,3,2 Isr: 1 Topic: influxdb-telemetry Partition: 3 Leader: 2 Replicas: 2,3,1 Isr: 2 Topic: influxdb-telemetry Partition: 4 Leader: 3 Replicas: 3,1,2 Isr: 3 Topic: influxdb-telemetry Partition: 5 Leader: 1 Replicas: 1,2,3 Isr: 1{noformat} After inserting only 71,235 messages, influxdb-telemetry-0 becomes corrupt on kafka-02 and kafka-01 starts to complain: {noformat} [2018-03-22 17:00:00,690] ERROR [ReplicaManager broker=2] Error processing fetch operation on partition influxdb-telemetry-0, offset 71236 (kafka.server.ReplicaManager) {noformat} The last good written RecordSet looks like this: {noformat} baseOffset: 71233 lastOffset: 71235 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 10285560 CreateTime: -1 isvalid: true size: 471 magic: 2 compresscodec: NONE crc: 491186814{noformat} The header of this RecordSet looks like so: $ hd -s 10285560 -n 471 /var/db/kafka/influxdb-telemetry-0/.log {noformat} 009ce30d 00 00 00 00 00 01 16 27 00 00 01 4b 00 00 00 00 |...'...K| 009ce31d 02 ee 03 fd 04 00 00 00 00 00 01 ff ff ff ff ff || 009ce32d ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff || 009ce33d ff ff ff ff ff ff 
ff ff ff 00 00 00 02 a8 02 00 || 009ce34d 00 00 01 9a 02 00 8b 09 89 54 4f d9 dd 82 b2 14 |.TO.| {noformat} If we look at the log files on `kafka-01` (which is trying to replicate), it hasn't replicated past message 71235 and it's just sitting there complaining. If we go to the leader of this partition, `kafka-02`, and dump the next message after the good message, it is indeed corrupt. This is what it looks like: $ hd -s 10286031 -n 471 /var/db/kafka/influxdb-telemetry-0/.log {noformat} 009cf3cf 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 || * 009cf59f {noformat} The asterisk ( * ) here means that every line is the same, so basically message 71,236 contains 471 zeroes. I picked 471 arbitrarily, since I don't actually know how big the RecordSet is for the next Record. If we try to dump `influxdb-telemetry-0` on the leader (kafka-02), it barfs and ends early because of this exception: {noformat} $ ./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files /var/db/kafka/influxdb-telemetry-0/.log | tail -n 5 Exception in thread "main" org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller than minimum record overhead (14). {noformat} So the dump only gets as far as the messages below, when in reality the log is 1GB and message 71,236 is only about 10MB into the file. 
{noformat} offset: 71231 position: 10283251 CreateTime: -1 isvalid: true keysize: -1 valuesize: 130 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] offset: 71232 position: 10283251 CreateTime: -1 isvalid: true keysize: -1 valuesize: 125 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] offset: 71233 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 valuesize: 130 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] offset: 71234 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 valuesize: 126 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] offset: 71235 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 valuesize: 127 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] {noformat} What could cause kafka to dump 471 zeroes for a RecordSet? was (Author: ari6123): So I attempted to re-generate one of the topics we had and I've been able to actually find a corrupt message in the `.log` file. There exists 3 brokers: kafka-01, kafka-02, kafka-03 and the topic influxdb-telemetry which contains 161,760,969 (161.7M) messages. The topic description:
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410082#comment-16410082 ] Matthias J. Sax commented on KAFKA-6535: This ticket is labeled "needs-kip" -- not sure about this. Thoughts? > Set default retention ms for Streams repartition topics to Long.MAX_VALUE > - > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Khaireddine Rezgui >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set their default retention to infinity to allow any records to be > pushed to them with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keep their storage small. > More specifically, in {{RepartitionTopicConfig}} we have a few default > overrides for repartition topic configs; we should just add the override for > {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still > allows users to override it themselves if they want via > {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this > update takes effect. > In addition to the code change, we also need to have doc changes in > streams/upgrade_guide.html specifying this default value change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
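The override order described in the ticket above (a built-in default for retention, which a user can still replace through the topic prefix) can be sketched roughly as follows. This is a minimal illustration, not the actual {{RepartitionTopicConfig}} code: the class and method names are hypothetical, and the two string constants are assumed to mirror the values of {{TopicConfig.RETENTION_MS_CONFIG}} ("retention.ms") and {{StreamsConfig.TOPIC_PREFIX}} ("topic.").

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the real RepartitionTopicConfig.
class RepartitionDefaultsSketch {
    // Assumed to match TopicConfig.RETENTION_MS_CONFIG and StreamsConfig.TOPIC_PREFIX.
    static final String RETENTION_MS = "retention.ms";
    static final String TOPIC_PREFIX = "topic.";

    /** Builds the effective topic config: the default first, then any "topic."-prefixed user settings. */
    static Map<String, String> effectiveConfig(Map<String, String> streamsProps) {
        Map<String, String> config = new HashMap<>();
        // Proposed default: never expire records by time, rely on purging instead.
        config.put(RETENTION_MS, String.valueOf(Long.MAX_VALUE));
        for (Map.Entry<String, String> e : streamsProps.entrySet()) {
            if (e.getKey().startsWith(TOPIC_PREFIX)) {
                // Strip the prefix so "topic.retention.ms" replaces the default "retention.ms".
                config.put(e.getKey().substring(TOPIC_PREFIX.length()), e.getValue());
            }
        }
        return config;
    }

    public static void main(String[] args) {
        System.out.println(effectiveConfig(new HashMap<>()));
        Map<String, String> user = new HashMap<>();
        user.put("topic.retention.ms", "86400000");
        System.out.println(effectiveConfig(user));
    }
}
```

Applying the default before the user settings is what keeps the user override working; a unit test for the real change would presumably check both cases.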
[jira] [Commented] (KAFKA-6679) Random corruption (CRC validation issues)
[ https://issues.apache.org/jira/browse/KAFKA-6679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410079#comment-16410079 ] Ari Uka commented on KAFKA-6679: So I attempted to re-generate one of the topics we had and I've been able to actually find a corrupt message in the `.log` file. There exists 3 brokers: kafka-01, kafka-02, kafka-03 and the topic influxdb-telemetry which contains 161,760,969 (161.7M) messages. The topic description: {noformat} Topic:influxdb-telemetry PartitionCount:6 ReplicationFactor:3 Configs: Topic: influxdb-telemetry Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2 Topic: influxdb-telemetry Partition: 1 Leader: 3 Replicas: 3,2,1 Isr: 3 Topic: influxdb-telemetry Partition: 2 Leader: 1 Replicas: 1,3,2 Isr: 1 Topic: influxdb-telemetry Partition: 3 Leader: 2 Replicas: 2,3,1 Isr: 2 Topic: influxdb-telemetry Partition: 4 Leader: 3 Replicas: 3,1,2 Isr: 3 Topic: influxdb-telemetry Partition: 5 Leader: 1 Replicas: 1,2,3 Isr: 1{noformat} After inserting only 71,235 messages, influxdb-telemetry-0 becomes corrupt on kafka-02 and kafka-01 starts to complain: {noformat} [2018-03-22 17:00:00,690] ERROR [ReplicaManager broker=2] Error processing fetch operation on partition influxdb-telemetry-0, offset 71236 (kafka.server.ReplicaManager) {noformat} The last good written RecordSet looks like this: {noformat} baseOffset: 71233 lastOffset: 71235 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 10285560 CreateTime: -1 isvalid: true size: 471 magic: 2 compresscodec: NONE crc: 491186814{noformat} The header of this RecordSet looks like so: $ hd -s 10285560 -n 471 /var/db/kafka/influxdb-telemetry-0/.log {noformat} 009ce30d 00 00 00 00 00 01 16 27 00 00 01 4b 00 00 00 00 |...'...K| 009ce31d 02 ee 03 fd 04 00 00 00 00 00 01 ff ff ff ff ff || 009ce32d ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff || 009ce33d ff ff ff ff ff ff ff ff ff 00 00 00 02 a8 
02 00 || 009ce34d 00 00 01 9a 02 00 8b 09 89 54 4f d9 dd 82 b2 14 |.TO.| {noformat} If we look at the log files on `kafka-01` (which is trying to replicate), it hasn't replicated past message 71235 and it's just sitting there complaining. If we go to the leader of this partition and dump the next message after the good message, it is indeed corrupt. This is what it looks like: $ hd -s 10286031 -n 471 /var/db/kafka/influxdb-telemetry-0/.log {noformat} 009cf3cf 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 || * 009cf59f {noformat} The asterisk ( * ) here means that every line is the same, so basically message 71236 contains 471 zeroes. I picked 471 arbitrarily, since I don't actually know how big the RecordSet is for the next Record. If we try to dump `influxdb-telemetry-0` on the leader, it barfs and ends early because of this exception: {noformat} $ ./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files /var/db/kafka/influxdb-telemetry-0/.log | tail -n 5 Exception in thread "main" org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller than minimum record overhead (14). {noformat} So the dump only gets as far as the messages below, when in reality the log is 1GB and message 71,236 is only about 10MB into the file. 
{noformat} offset: 71231 position: 10283251 CreateTime: -1 isvalid: true keysize: -1 valuesize: 130 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] offset: 71232 position: 10283251 CreateTime: -1 isvalid: true keysize: -1 valuesize: 125 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] offset: 71233 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 valuesize: 130 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] offset: 71234 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 valuesize: 126 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] offset: 71235 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 valuesize: 127 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] {noformat} What could cause kafka to dump 471 zeroes for a RecordSet? > Random corruption (CRC validation issues) > -- > > Key: KAFKA-6679 > URL: https://issues.apache.org/jira/browse/KAFKA-6679 > Project: Kafka > Issue Type: Bug > Components: consumer, replication >Affects Versions: 0.10.2.0, 1.0.1 > Envir
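For readers following the hex dumps in the comment above, here is a rough sketch of how the first bytes of a record batch can be decoded by hand, and why a zero-filled region fails the size check. It assumes the magic 2 batch layout (8-byte base offset, 4-byte batch length, 4-byte partition leader epoch, 1-byte magic, 4-byte CRC, all big-endian) and takes the threshold of 14 from the exception message; the class and method names are hypothetical and this is not Kafka's own parsing code.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for inspecting a segment dump by hand; not part of Kafka.
class BatchHeaderSketch {
    static final int LOG_OVERHEAD = 12;        // 8-byte offset + 4-byte size
    static final int MIN_RECORD_OVERHEAD = 14; // threshold quoted in the exception message
    static final int PREFIX_THROUGH_CRC = 21;  // offset + size + leader epoch + magic + crc

    /** Describes the leading header fields, or reports the size check failing. */
    static String describe(byte[] bytes) {
        if (bytes.length < LOG_OVERHEAD) {
            throw new IllegalArgumentException("need at least " + LOG_OVERHEAD + " bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes); // ByteBuffer is big-endian by default
        long baseOffset = buf.getLong();
        int batchLength = buf.getInt();
        if (batchLength < MIN_RECORD_OVERHEAD) {
            // An all-zero region lands here: its size field reads as 0.
            return "corrupt: size " + batchLength + " is smaller than " + MIN_RECORD_OVERHEAD;
        }
        if (bytes.length < PREFIX_THROUGH_CRC) {
            return String.format("baseOffset=%d batchLength=%d (header truncated)", baseOffset, batchLength);
        }
        int partitionLeaderEpoch = buf.getInt();
        byte magic = buf.get();
        long crc = buf.getInt() & 0xFFFFFFFFL; // CRC is an unsigned 32-bit value
        return String.format("baseOffset=%d batchLength=%d partitionLeaderEpoch=%d magic=%d crc=%d",
                baseOffset, batchLength, partitionLeaderEpoch, magic, crc);
    }

    /** Parses space-separated hex bytes as printed by hd. */
    static byte[] fromHex(String hex) {
        String[] tokens = hex.trim().split("\\s+");
        byte[] out = new byte[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            out[i] = (byte) Integer.parseInt(tokens[i], 16);
        }
        return out;
    }

    public static void main(String[] args) {
        // A zero-filled region, like the 471 zero bytes reported above.
        System.out.println(describe(new byte[471]));
    }
}
```

The same `describe` call can be fed the bytes of a healthy batch via `fromHex` to compare the decoded fields against the DumpLogSegments output.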
[jira] [Commented] (KAFKA-6703) MirrorMaker cannot make progress when any matched topic from a whitelist regexp has -1 leader
[ https://issues.apache.org/jira/browse/KAFKA-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410078#comment-16410078 ] Attila Sasvari commented on KAFKA-6703: --- There is a call to [ensureCoordinatorReady()|https://github.com/apache/kafka/blob/f0a29a693548efe539cba04807e21862c8dfc1bf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L279] in {{ConsumerCoordinator}}. It tries to poll the coordinator in a [loop|https://github.com/apache/kafka/blob/f0a29a693548efe539cba04807e21862c8dfc1bf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L217-L219]. It does not succeed, so it will retry the connection, and other matched topics are ignored until the failing coordinator becomes healthy. > MirrorMaker cannot make progress when any matched topic from a whitelist > regexp has -1 leader > - > > Key: KAFKA-6703 > URL: https://issues.apache.org/jira/browse/KAFKA-6703 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0 >Reporter: Attila Sasvari >Priority: Major > > Scenario: > - MM whitelabel regexp matches multiple topics > - destination cluster has 5 brokers with multiple topics replication factor 3 > - without partition reassign shut down 2 brokers > - suppose a topic has no leader any more because it was off-sync and the > leader and the rest of the replicas are hosted on the downed brokers. 
> - so we have 1 topic with some partitions with leader -1 > - the rest of the matching topics has 3 replicas with leaders > MM will not produce into any of the matched topics until: > - the "orphaned" topic removed or > - the partition reassign carried out from the downed brokers (suppose you > can turn these back on) > In the MirrorMaker logs, there are a lot of messages like the following ones: > {code} > [2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-1, groupId=1] Sending > FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-0, groupId=1] Sending > FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Received > FindCoordinator response ClientResponse(receivedTimeMs=1521741547782, > latencyMs=1, disconnected=false, > requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, > clientId=1-0, correlationId=71), > responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', > error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] Received > FindCoordinator response ClientResponse(receivedTimeMs=1521741547782, > latencyMs=1, disconnected=false, > requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, > clientId=1-1, correlationId=71), > responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', > error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Group > coordinator lookup failed: The coordinator is not available. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] Group > coordinator lookup failed: The coordinator is not available. > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] > Coordinator discovery failed, refreshing metadata > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] > Coordinator discovery failed, refreshing metadata > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > {code} > Interestingly, if MirrorMaker uses {{zookeeper.connect}} in its consumer > properties file, then an OldConsumer is created, and it can make progress. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
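The behaviour described in the comment on this issue (the consumer keeps looking up the coordinator and nothing else proceeds until that succeeds) can be pictured with the simplified sketch below. It is an illustration under assumptions, not the {{AbstractCoordinator}} code: the names are hypothetical, the lookup is a stand-in for the FindCoordinator round trip, and the attempt limit exists only so the example terminates.

```java
import java.util.function.BooleanSupplier;

// Hypothetical, simplified model of a blocking coordinator lookup; not Kafka's implementation.
class CoordinatorLoopSketch {
    /**
     * Retries the lookup until it succeeds or the attempt limit is reached.
     * Returns the number of attempts used, or -1 if the coordinator never became available.
     */
    static int ensureCoordinatorReady(BooleanSupplier lookup, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (lookup.getAsBoolean()) {
                return attempt;
            }
            // A real client would refresh metadata and back off here.
        }
        return -1;
    }

    /** Fetching for any topic happens only after the coordinator lookup has succeeded. */
    static boolean pollOnce(BooleanSupplier lookup, int maxAttempts) {
        if (ensureCoordinatorReady(lookup, maxAttempts) < 0) {
            return false; // no topic is consumed, including the healthy ones
        }
        return true;
    }

    public static void main(String[] args) {
        // Models COORDINATOR_NOT_AVAILABLE on every attempt.
        System.out.println(pollOnce(() -> false, 5));
    }
}
```

In this model a lookup that never succeeds stalls consumption of every matched topic, which matches the symptom reported for MirrorMaker.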
[jira] [Created] (KAFKA-6703) MirrorMaker cannot make progress when any matched topic from a whitelist regexp has -1 leader
Attila Sasvari created KAFKA-6703: - Summary: MirrorMaker cannot make progress when any matched topic from a whitelist regexp has -1 leader Key: KAFKA-6703 URL: https://issues.apache.org/jira/browse/KAFKA-6703 Project: Kafka Issue Type: Bug Affects Versions: 1.1.0 Reporter: Attila Sasvari Scenario: - MM whitelabel regexp matches multiple topics - destination cluster has 5 brokers with multiple topics replication factor 3 - without partition reassign shut down 2 brokers - suppose a topic has no leader any more because it was off-sync and the leader and the rest of the replicas are hosted on the downed brokers. - so we have 1 topic with some partitions with leader -1 - the rest of the matching topics has 3 replicas with leaders MM will not produce into any of the matched topics until: - the "orphaned" topic removed or - the partition reassign carried out from the downed brokers (suppose you can turn these back on) In the MirrorMaker logs, there are a lot of messages like the following ones: {code} [2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-1, groupId=1] Sending FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-0, groupId=1] Sending FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Received FindCoordinator response ClientResponse(receivedTimeMs=1521741547782, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=1-0, correlationId=71), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] Received 
FindCoordinator response ClientResponse(receivedTimeMs=1521741547782, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=1-1, correlationId=71), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Group coordinator lookup failed: The coordinator is not available. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] Group coordinator lookup failed: The coordinator is not available. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Coordinator discovery failed, refreshing metadata (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] Coordinator discovery failed, refreshing metadata (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) {code} Interestingly, if MirrorMaker uses {{zookeeper.connect}} in its consumer properties file, then an OldConsumer is created, and it can make progress. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410019#comment-16410019 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax opened a new pull request #4758: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 URL: https://github.com/apache/kafka/pull/4758 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when > upgrading from 0.10.0.0 to 0.10.2.1 > - > > Key: KAFKA-6054 > URL: https://issues.apache.org/jira/browse/KAFKA-6054 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: James Cheng >Assignee: Matthias J. Sax >Priority: Major > Labels: kip > Fix For: 1.2.0 > > > KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade] > We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling > upgrade of the app, so that one point, there were both 0.10.0.0-based > instances and 0.10.2.1-based instances running. 
> We observed the following stack trace: > {code:java} > 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo > - > unable to decode subscription data: version=2 > org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode > subscription data: version=2 > at > org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > > {code} > I spoke with [~mjsax] and he said this is a known issue that happens when you > have both 0.10.0.0 in
[jira] [Commented] (KAFKA-6679) Random corruption (CRC validation issues)
[ https://issues.apache.org/jira/browse/KAFKA-6679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409993#comment-16409993 ] Ari Uka commented on KAFKA-6679: One thing I'd also like to say is that we are using FreeBSD with ZFS; I'm unsure if that's relevant here. The cluster has been pretty healthy for about a year straight, so I'm unsure if that's an issue. I did find someone else complaining about ZFS + FreeBSD: [https://mail-archives.apache.org/mod_mbox/kafka-dev/201602.mbox/%3cjira.12939477.1455623036000.60174.1455625758...@atlassian.jira%3E] > Random corruption (CRC validation issues) > -- > > Key: KAFKA-6679 > URL: https://issues.apache.org/jira/browse/KAFKA-6679 > Project: Kafka > Issue Type: Bug > Components: consumer, replication >Affects Versions: 0.10.2.0, 1.0.1 > Environment: FreeBSD 11.0-RELEASE-p8 >Reporter: Ari Uka >Priority: Major > > I'm running into a really strange issue on production. I have 3 brokers and > randomly consumers will start to fail with an error message saying the CRC > does not match. The brokers are all on 1.0.1, but the issue started on 0.10.2 > with the hope that upgrading would help fix the issue. > On the kafka side, I see errors related to this across all 3 brokers: > ``` > [2018-03-17 20:59:58,967] ERROR [ReplicaFetcher replicaId=3, leaderId=1, > fetcherId=0] Error for partition topic-a-0 to broker > 1:org.apache.kafka.common.errors.CorruptRecordException: This message has > failed its CRC checksum, exceeds the valid size, or is otherwise corrupt. > (kafka.server.ReplicaFetcherThread) > [2018-03-17 20:59:59,411] ERROR [ReplicaManager broker=3] Error processing > fetch operation on partition topic-b-0, offset 23848795 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller > than minimum record overhead (14). 
> [2018-03-17 20:59:59,411] ERROR [ReplicaManager broker=3] Error processing > fetch operation on partition topic-b-0, offset 23848795 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller > than minimum record overhead (14) > [2018-03-17 20:59:59,490] ERROR [ReplicaFetcher replicaId=3, leaderId=2, > fetcherId=0] Error for partition topic-c-2 to broker > 2:org.apache.kafka.common.errors.CorruptRecordException: This message has > failed its CRC checksum, exceeds the valid size, or is otherwise corrupt. > (kafka.server.ReplicaFetcherThread) > ``` > > To fix this, I have to use the kafka-consumer-groups.sh command line tool and > do a binary search until I can find a non corrupt message and push the > offsets forward. It's annoying because I can't actually push to a specific > date because kafka-consumer-groups.sh starts to emit the same error, > ErrInvalidMessage, CRC does not match. > The error popped up again the next day after fixing it tho, so I'm trying to > find the root cause. > I'm using the Go consumer [https://github.com/Shopify/sarama] and > [https://github.com/bsm/sarama-cluster]. > At first, I thought it could be the consumer libraries, but the error happens > with kafka-console-consumer.sh as well when a specific message is corrupted > in Kafka. I don't think it's possible for Kafka producers to actually push > corrupt messages to Kafka and then cause all consumers to break right? I > assume Kafka would reject corrupt messages, so I'm not sure what's going on > here. > Should I just re-create the cluster, I don't think it's hardware failure > across the 3 machines tho. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6699) When one of two Kafka nodes are dead, streaming API cannot handle messaging
[ https://issues.apache.org/jira/browse/KAFKA-6699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409957#comment-16409957 ] Antony Stubbs commented on KAFKA-6699: -- Also, what is the replication factor for your topics, both input and intermediate? The output of describing all topics would help. If the replication factor is one, then any broker failure would cause you problems. > When one of two Kafka nodes are dead, streaming API cannot handle messaging > --- > > Key: KAFKA-6699 > URL: https://issues.apache.org/jira/browse/KAFKA-6699 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.2 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Major > > Dears, > I quite often observe that when a Kafka broker is partly dead(*), > applications that use the streaming API do nothing. > (*) Partly dead in my case means that one of two Kafka nodes is out of > order. > Especially when the disk is full on one machine, the broker goes into some > strange state in which the streaming API stops working. It seems that the regular > producer/consumer API has no problem in such a case. > Can you have a look at this matter? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409886#comment-16409886 ] Matthias J. Sax commented on KAFKA-6437: Sounds reasonable to me. The question is, what we want to do when topics are missing? Should we stop the world and fail (ie, throw an exception) or just log a warning? Or maybe make it even configurable? WDYT? > Streams does not warn about missing input topics, but hangs > --- > > Key: KAFKA-6437 > URL: https://issues.apache.org/jira/browse/KAFKA-6437 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 > Environment: Single client on single node broker >Reporter: Chris Schwarzfischer >Priority: Minor > Labels: newbie > > *Case* > Streams application with two input topics being used for a left join. > When the left side topic is missing upon starting the streams application, it > hangs "in the middle" of the topology (at …9, see below). Only parts of > the intermediate topics are created (up to …9) > When the missing input topic is created, the streams application resumes > processing. 
> {noformat} > Topology: > StreamsTask taskId: 2_0 > ProcessorTopology: > KSTREAM-SOURCE-11: > topics: > [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] > children: [KTABLE-AGGREGATE-12] > KTABLE-AGGREGATE-12: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KTABLE-TOSTREAM-20] > KTABLE-TOSTREAM-20: > children: [KSTREAM-SINK-21] > KSTREAM-SINK-21: > topic: data_udr_month_customer_aggregration > KSTREAM-SOURCE-17: > topics: > [mystreams_app-KSTREAM-MAP-14-repartition] > children: [KSTREAM-LEFTJOIN-18] > KSTREAM-LEFTJOIN-18: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KSTREAM-SINK-19] > KSTREAM-SINK-19: > topic: data_UDR_joined > Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, > mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] > {noformat} > *Why this matters* > The application does quite a lot of preprocessing before joining with the > missing input topic. This preprocessing won't happen without the topic, > creating a huge backlog of data. > *Fix* > Issue a `warn` or `error` level message at start to inform about the missing > topic and its consequences. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409759#comment-16409759 ] Mariam John commented on KAFKA-6437: In KafkaStreams we can use the admin client to list all topics, go through the source and sink topics to see which ones are missing, and report them with an error such as a MissingTopicException. Is there something more we want to do for a fix, or is there more to this bug that I am missing? > Streams does not warn about missing input topics, but hangs > --- > > Key: KAFKA-6437 > URL: https://issues.apache.org/jira/browse/KAFKA-6437 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 > Environment: Single client on single node broker >Reporter: Chris Schwarzfischer >Priority: Minor > Labels: newbie > > *Case* > Streams application with two input topics being used for a left join. > When the left side topic is missing upon starting the streams application, it > hangs "in the middle" of the topology (at …9, see below). Only parts of > the intermediate topics are created (up to …9) > When the missing input topic is created, the streams application resumes > processing. 
> {noformat} > Topology: > StreamsTask taskId: 2_0 > ProcessorTopology: > KSTREAM-SOURCE-11: > topics: > [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] > children: [KTABLE-AGGREGATE-12] > KTABLE-AGGREGATE-12: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KTABLE-TOSTREAM-20] > KTABLE-TOSTREAM-20: > children: [KSTREAM-SINK-21] > KSTREAM-SINK-21: > topic: data_udr_month_customer_aggregration > KSTREAM-SOURCE-17: > topics: > [mystreams_app-KSTREAM-MAP-14-repartition] > children: [KSTREAM-LEFTJOIN-18] > KSTREAM-LEFTJOIN-18: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KSTREAM-SINK-19] > KSTREAM-SINK-19: > topic: data_UDR_joined > Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, > mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] > {noformat} > *Why this matters* > The application does quite a lot of preprocessing before joining with the > missing input topic. This preprocessing won't happen without the topic, > creating a huge backlog of data. > *Fix* > Issue a `warn` or `error` level message at startup to inform about the missing > topic and its consequences. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
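The check proposed in the KAFKA-6437 comment above (list the topics known to the broker, then compare them with the topics the topology reads from) might look roughly like the sketch below. It is only an illustration: `MissingTopicCheck` and its methods are hypothetical names, not part of Kafka Streams, and the set of existing topics is passed in as a plain collection. In a real application that set could presumably be obtained with `AdminClient.listTopics().names().get()`; internal repartition topics are created by Streams itself, so the sketch assumes only user-provided input topics are checked.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of the Kafka Streams API.
final class MissingTopicCheck {
    private MissingTopicCheck() {}

    // Returns the required topics that are absent from the broker's topic list, sorted by name.
    static Set<String> missingTopics(Collection<String> required, Collection<String> existing) {
        Set<String> missing = new TreeSet<>(required);
        missing.removeAll(existing);
        return missing;
    }

    // Fails fast at startup instead of letting the application hang silently.
    // A real fix might only log at warn/error level, as the issue suggests.
    static void verifySourceTopics(Collection<String> required, Collection<String> existing) {
        Set<String> missing = missingTopics(required, existing);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Missing input topics: " + missing);
        }
    }
}
```

Whether to throw or merely log is a design choice left open in the ticket; throwing makes the problem impossible to overlook, while logging keeps the current behaviour of resuming once the topic is created.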
[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409695#comment-16409695 ] UYAR Farid commented on KAFKA-4107: --- There is also another issue; I don't know if it's related. When I delete a connector and recreate it with the same name, the previously stored offset is retrieved. I also have to resort to incrementing the name of the connector. That's unintuitive and, I think, not intended. > Support offset reset capability in Kafka Connect > > > Key: KAFKA-4107 > URL: https://issues.apache.org/jira/browse/KAFKA-4107 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Jason Gustafson >Priority: Major > > It would be useful in some cases to be able to reset connector offsets. For > example, if a topic in Kafka corresponding to a source database is > accidentally deleted (or deleted because of corrupt data), an administrator > may want to reset offsets and reproduce the log from the beginning. It may > also be useful to have support for overriding offsets, but that seems like a > less likely use case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6670) Implement a Scala wrapper library for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409504#comment-16409504 ] ASF GitHub Bot commented on KAFKA-6670: --- debasishg opened a new pull request #4756: KAFKA-6670: Implement a Scala wrapper library for Kafka Streams URL: https://github.com/apache/kafka/pull/4756 This PR implements a Scala wrapper library for Kafka Streams. The library is implemented as a project under streams, namely `:streams:streams-scala`. The PR contains the following: * the library implementation of the wrapper abstractions * the test suite * the changes in `build.gradle` to build the library jar The library has been tested running the tests as follows: ``` $ ./gradlew -Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdes streams:streams-scala:test $ ./gradlew -Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro streams:streams-scala:test $ ./gradlew -Dtest.single=WordCountTest streams:streams-scala:test ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Implement a Scala wrapper library for Kafka Streams > --- > > Key: KAFKA-6670 > URL: https://issues.apache.org/jira/browse/KAFKA-6670 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Debasish Ghosh >Assignee: Debasish Ghosh >Priority: Major > Labels: api, kip > > Implement a Scala wrapper library for Kafka Streams. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6685) Connect deserialization log message should distinguish key from value
[ https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406081#comment-16406081 ] Jagadesh Adireddi edited comment on KAFKA-6685 at 3/22/18 12:28 PM: Hi [~rhauch], I would like to pick this Jira up. I wanted some advice on the error message. I am thinking that if the exception occurs during: a) keyAndSchema conversion, the error message is logged as: Failed to convert Record Key to Kafka Connect format. b) valueAndSchema conversion, the error message is logged as: Failed to convert Record Value to Kafka Connect format. And the main ConnectException is thrown as: Exiting WorkerSinkTask due to unconvertedmessage exception. Any help on the hint message to fix the issue would be great. was (Author: adireddijagad...@gmail.com): Hi [~rhauch], I would like to pick this Jira up. Wanted some advice on the error message .I am thinking , if the exception occurred during: a) keyAndSchema conversion, then error msg logged as : Failed to convert message Key And Schema to Kafka Connect format. b) valueAndSchema conversion, then error msg logged as : Failed to convert message Value And Schema to Kafka Connect format. And Main ConnectException thrown as : Exiting WorkerSinkTask due to unconvertedmessage exception. Any help on the hint message to fix the issue would be great? > Connect deserialization log message should distinguish key from value > - > > Key: KAFKA-6685 > URL: https://issues.apache.org/jira/browse/KAFKA-6685 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yeva Byzek >Priority: Minor > Labels: newbie > > Connect was configured for Avro key and value but data had String key and > Avro value. The resulting error message was misleading because it didn't > distinguish key from value, and so I was chasing problems with the value > instead of the key. 
> tl;dr Connect should at least tell you whether the problem is with > deserializing the key or value of a record > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
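The idea discussed in the KAFKA-6685 thread above, reporting separately whether the key or the value failed to convert, can be sketched as follows. This is a minimal illustration under stated assumptions: `ConversionErrors`, `ConversionException`, and `convert` are hypothetical names that stand in for Connect's converter calls and its `ConnectException`; they are not the actual `WorkerSinkTask` code.

```java
import java.util.function.Function;

// Hypothetical sketch; not the actual WorkerSinkTask implementation.
final class ConversionErrors {
    private ConversionErrors() {}

    // Stand-in for Connect's ConnectException, so the sketch has no external dependencies.
    static final class ConversionException extends RuntimeException {
        ConversionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Runs one conversion and, on failure, names the part of the record ("key" or "value")
    // that could not be converted, keeping the original exception as the cause.
    static <T> T convert(String part, byte[] raw, Function<byte[], T> converter) {
        try {
            return converter.apply(raw);
        } catch (RuntimeException e) {
            throw new ConversionException(
                "Failed to convert record " + part + " to Kafka Connect format", e);
        }
    }
}
```

A caller would invoke `convert("key", rawKey, keyConverter)` and `convert("value", rawValue, valueConverter)` as two separate steps, so a String key fed to an Avro key converter would be reported as a key problem rather than a generic record problem.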
[jira] [Comment Edited] (KAFKA-6673) Segment and Stamped implement Comparable, but don't override equals.
[ https://issues.apache.org/jira/browse/KAFKA-6673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409214#comment-16409214 ] Koen De Groote edited comment on KAFKA-6673 at 3/22/18 8:32 AM: That implementation would just do an identity comparison. Because Stamped and Segment don't have an explicit super class, just Object. And Object's equals is this: {code:java} public boolean equals(Object obj) { return (this == obj); } {code} The whole idea behind this ticket is that the equals method should act logically the same as the compareTo method, as to have only 1 kind of behavior. was (Author: koendg): That implementation would just do an identity comparison. Because Stamped and Segment don't have an explicit super class, just Object. And Object's equals is this: {code:java} public boolean equals(Object obj) { return (this == obj); } {code} The whole idea behind this is that the equals method should act logically the same as the compareTo method, as to have only 1 kind of behavior. > Segment and Stamped implement Comparable, but don't override equals. > > > Key: KAFKA-6673 > URL: https://issues.apache.org/jira/browse/KAFKA-6673 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Koen De Groote >Priority: Minor > Attachments: KAFKA_6673.patch > > > The classes in question: > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java > and > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java > This came up while doing static analysis on the codebase on the trunk branch. > As described by the analysis tool built into Intellij: > {quote} > Reports classes which implement java.lang.Comparable which do not override > equals(). If equals() is not overridden, the equals() implementation is not > consistent with the compareTo() implementation. 
If an object of such a class > is added to a collection such as java.util.SortedSet, this collection will > violate the contract of java.util.Set, which is defined in terms of equals(). > {quote} > > Implementing an equals for an object is generally a best practice, especially > considering this caveat, where it's not the compareTo that will be used but > the equals method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
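To illustrate what the KAFKA-6673 ticket above asks for, here is a minimal sketch of a class whose `equals()` and `hashCode()` agree with its `compareTo()`. `StampedSketch` is a hypothetical stand-in, not the real `Stamped` class, and it assumes that ordering is by timestamp only; the attached patch may well differ.

```java
// Hypothetical stand-in for a timestamped value; assumes ordering by timestamp only.
final class StampedSketch<V> implements Comparable<StampedSketch<V>> {
    final V value;
    final long timestamp;

    StampedSketch(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    @Override
    public int compareTo(StampedSketch<V> other) {
        return Long.compare(timestamp, other.timestamp);
    }

    // Consistent with compareTo: equal exactly when compareTo would return 0.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof StampedSketch)) {
            return false;
        }
        return timestamp == ((StampedSketch<?>) other).timestamp;
    }

    // Must agree with equals, so it is derived from the timestamp alone.
    @Override
    public int hashCode() {
        return Long.hashCode(timestamp);
    }
}
```

With this definition, two instances carrying different values but the same timestamp are equal, which mirrors `compareTo()` as the ticket requests. Whether that is desirable for a given class is a design decision; the alternative is to include the value in both `compareTo()` and `equals()`.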
[jira] [Comment Edited] (KAFKA-6673) Segment and Stamped implement Comparable, but don't override equals.
[ https://issues.apache.org/jira/browse/KAFKA-6673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409214#comment-16409214 ] Koen De Groote edited comment on KAFKA-6673 at 3/22/18 8:31 AM: That implementation would just do an identity comparison. Because Stamped and Segment don't have an explicit super class, just Object. And Object's equals is this: {code:java} public boolean equals(Object obj) { return (this == obj); } {code} The whole idea behind this is that the equals method should act logically the same as the compareTo method, as to have only 1 kind of behavior. was (Author: koendg): That implementation would just do an identity comparison. Because Stamped and Segment don't have an explicit super class, just Object. And Object's equals is this: public boolean equals(Object obj) { return (this == obj); } The whole idea behind this is that the equals method should act logically the same as the compareTo method, as to have only 1 kind of behavior. > Segment and Stamped implement Comparable, but don't override equals. > > > Key: KAFKA-6673 > URL: https://issues.apache.org/jira/browse/KAFKA-6673 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Koen De Groote >Priority: Minor > Attachments: KAFKA_6673.patch > > > The classes in question: > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java > and > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java > This came up while doing static analysis on the codebase on the trunk branch. > As described by the analysis tool built into Intellij: > {quote} > Reports classes which implement java.lang.Comparable which do not override > equals(). If equals() is not overridden, the equals() implementation is not > consistent with the compareTo() implementation. 
If an object of such a class > is added to a collection such as java.util.SortedSet, this collection will > violate the contract of java.util.Set, which is defined in terms of equals(). > {quote} > > Implementing an equals for an object is generally a best practice, especially > considering this caveat, where it's not the compareTo that will be used but > the equals method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6673) Segment and Stamped implement Comparable, but don't override equals.
[ https://issues.apache.org/jira/browse/KAFKA-6673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409214#comment-16409214 ] Koen De Groote commented on KAFKA-6673: --- That implementation would just do an identity comparison. Because Stamped and Segment don't have an explicit super class, just Object. And Object's equals is this: public boolean equals(Object obj) { return (this == obj); } The whole idea behind this is that the equals method should act logically the same as the compareTo method, as to have only 1 kind of behavior. > Segment and Stamped implement Comparable, but don't override equals. > > > Key: KAFKA-6673 > URL: https://issues.apache.org/jira/browse/KAFKA-6673 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Koen De Groote >Priority: Minor > Attachments: KAFKA_6673.patch > > > The classes in question: > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java > and > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java > This came up while doing static analysis on the codebase on the trunk branch. > As described by the analysis tool built into Intellij: > {quote} > Reports classes which implement java.lang.Comparable which do not override > equals(). If equals() is not overridden, the equals() implementation is not > consistent with the compareTo() implementation. If an object of such a class > is added to a collection such as java.util.SortedSet, this collection will > violate the contract of java.util.Set, which is defined in terms of equals(). > {quote} > > Implementing an equals for an object is generally a best practice, especially > considering this caveat, where it's not the compareTo that will be used but > the equals method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"
[ https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409185#comment-16409185 ] ASF GitHub Bot commented on KAFKA-6683: --- hachikuji opened a new pull request #4755: KAFKA-6683; Ensure producer state not mutated prior to append URL: https://github.com/apache/kafka/pull/4755 We were unintentionally mutating the cached queue of batches prior to appending to the log. This could have several bad consequences if the append ultimately failed. In the reporter's case, it caused the snapshot to be invalid after a segment roll. The snapshot contained producer state at offsets higher than the snapshot offset. If we ever had to load from that snapshot, the state was left inconsistent, which led to an error that ultimately crashed the replica fetcher. The fix required some refactoring to avoid sharing the same underlying queue inside `ProducerAppendInfo`. I have added test cases which reproduce the invalid snapshot state. I have also made an effort to clean up logging since it was not easy to track this problem down. One final note: I have removed the duplicate check inside `ProducerStateManager` since it was both redundant and incorrect. The redundancy was in the checking of the cached batches: we already check these in `Log.analyzeAndValidateProducerState`. The incorrectness was the handling of sequence number overflow: we were only handling one very specific case of overflow, but others would have resulted in an invalid assertion. Instead, we now throw `OutOfOrderSequenceException`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ReplicaFetcher crashes with "Attempted to complete a transaction which was > not started" > > > Key: KAFKA-6683 > URL: https://issues.apache.org/jira/browse/KAFKA-6683 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0 > Environment: os: GNU/Linux > arch: x86_64 > Kernel: 4.9.77 > jvm: OpenJDK 1.8.0 >Reporter: Chema Sanchez >Assignee: Jason Gustafson >Priority: Critical > Attachments: server.properties > > > We have been experiencing this issue lately when restarting or replacing > brokers of our Kafka clusters during maintenance operations. > Having restarted or replaced a broker, after some minutes performing normally > it may suddenly throw the following exception and stop replicating some > partitions: > {code:none} > 2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, > fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) > java.lang.IllegalArgumentException: Attempted to complete a transaction which > was not started > at > kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720) > at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540) > at > kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540) > at scala.collection.immutable.List.foreach(List.scala:389) > at > scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35) > at > scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35) > at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44) > at kafka.log.Log.loadProducersFromLog(Log.scala:540) > at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521) > at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514) > at scala.collection.Iterator.foreach(Iterator.scala:929) > at scala.collection.Iterator.foreach$(Iterator.scala:929) > at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1417) > at scala.collection.IterableLike.foreach(IterableLike.scala:71) > at scala.collection.IterableLike.foreach$(IterableLike.scala:70) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.log.Log.loadProducerState(Log.scala:514) > at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487) > at > scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.truncateTo(Log.scala:1467)