[jira] [Commented] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"

2018-03-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410894#comment-16410894
 ] 

Jason Gustafson commented on KAFKA-6683:


[~chema.sanchez] Thanks a lot for the logs, they really helped. We've merged a 
fix which will be included in 1.1. If you have time to try out the same 
scenario with the new RC when it is available, we'd appreciate it.

> ReplicaFetcher crashes with "Attempted to complete a transaction which was 
> not started" 
> 
>
> Key: KAFKA-6683
> URL: https://issues.apache.org/jira/browse/KAFKA-6683
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0
> Environment: os: GNU/Linux 
> arch: x86_64
> Kernel: 4.9.77
> jvm: OpenJDK 1.8.0
>Reporter: Chema Sanchez
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 1.1.0
>
> Attachments: server.properties
>
>
> We have been experiencing this issue lately when restarting or replacing 
> brokers of our Kafka clusters during maintenance operations.
> Having restarted or replaced a broker, after a few minutes of normal operation 
> it may suddenly throw the following exception and stop replicating some 
> partitions:
> {code:none}
> [2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalArgumentException: Attempted to complete a transaction which 
> was not started
>     at 
> kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
>     at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
>     at 
> kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
>     at scala.collection.immutable.List.foreach(List.scala:389)
>     at 
> scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
>     at 
> scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
>     at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
>     at kafka.log.Log.loadProducersFromLog(Log.scala:540)
>     at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
>     at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.log.Log.loadProducerState(Log.scala:514)
>     at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
>     at 
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.truncateTo(Log.scala:1467)
>     at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454)
>     at 
> kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445)
>     at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>     at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
>     at kafka.log.LogManager.truncateTo(LogManager.scala:445)
>     at 
> kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at 
> kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265)
>     at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135)
>     at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>     at 
> kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132)
>     at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, 
> fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
> {code}

[jira] [Commented] (KAFKA-6705) producer.send() should be non-blocking

2018-03-22 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410876#comment-16410876
 ] 

Dong Lin commented on KAFKA-6705:
-

[~ijuma] Thanks for the information. I will search for the previous discussion. 
Here are my thoughts before reading it: I find it useful to block the request on 
a full buffer queue for up to max.block.ms. The original description of my 
proposal was misleading and has been updated. But I am not sure we should block 
producer.send() if metadata is unavailable.

For example, say a user wants producer.send() to return immediately and is OK 
with dropping messages if the network is slower than the message generation 
rate. Currently the user has to set max.block.ms to achieve this, but that means 
the first few messages will always be lost due to unavailable metadata.

There may be ways to get around this depending on the use case. For example, we 
may argue that if the user is OK with losing messages then it is OK to drop the 
first few messages as well. But it still seems nicer not to block 
producer.send() while the buffer queue still has space to hold new messages.
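
To make the trade-off concrete, here is a minimal sketch (assuming the standard 
Java producer client; the topic name and timeout values are illustrative) of how 
max.block.ms bounds the time producer.send() may block, while the returned 
Future stays asynchronous:

{code:java}
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendBlockingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Bound how long send() may block on missing metadata or a full buffer.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                // The Future itself is asynchronous; only the enqueue step can block.
                Future<RecordMetadata> result = producer.send(
                    new ProducerRecord<>("demo-topic", "key", "value"),
                    (metadata, exception) -> {
                        if (exception != null)
                            System.err.println("async send failed: " + exception);
                    });
            } catch (TimeoutException e) {
                // Thrown synchronously when metadata is still unavailable (or the
                // buffer stays full) past max.block.ms -- the "first few messages
                // are lost" case described above.
                System.err.println("send blocked too long: " + e);
            }
        }
    }
}
{code}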

 

 

> producer.send() should be non-blocking
> --
>
> Key: KAFKA-6705
> URL: https://issues.apache.org/jira/browse/KAFKA-6705
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently producer.send() may block on metadata for up to max.block.ms. This 
> behavior is well documented but it is a bit sub-optimal. Since we return a 
> future we should be able to make producer.send() completely non-blocking. One 
> idea is to simply insert the record into a global queue shared across all 
> partitions, and let the sender thread fetch records from this queue and send 
> them to the broker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6705) producer.send() should be non-blocking

2018-03-22 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6705:

Description: Currently producer.send() may block on metadata for up to 
max.block.ms. This behavior is well documented but it is a bit sub-optimal. 
Since we return a future we should be able to make producer.send() completely 
non-blocking. One idea is to simply insert the record into a global queue 
shared across all partitions, and let the sender thread fetch records from this 
queue and send them to the broker.  (was: Currently producer.send() may block on 
metadata or full buffer for up to max.block.ms. This behavior is well 
documented but it is a bit sub-optimal. Since we return a future we should be 
able to make producer.send() completely non-blocking. One idea is to simply 
insert the record into a global queue shared across all partitions, and let the 
sender thread fetch records from this queue and send them to the broker.)

> producer.send() should be non-blocking
> --
>
> Key: KAFKA-6705
> URL: https://issues.apache.org/jira/browse/KAFKA-6705
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently producer.send() may block on metadata for up to max.block.ms. This 
> behavior is well documented but it is a bit sub-optimal. Since we return a 
> future we should be able to make producer.send() completely non-blocking. One 
> idea is to simply insert the record into a global queue shared across all 
> partitions, and let the sender thread fetch records from this queue and send 
> them to the broker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"

2018-03-22 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-6683.

   Resolution: Fixed
Fix Version/s: 1.1.0

> ReplicaFetcher crashes with "Attempted to complete a transaction which was 
> not started" 
> 
>
> Key: KAFKA-6683
> URL: https://issues.apache.org/jira/browse/KAFKA-6683
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0
> Environment: os: GNU/Linux 
> arch: x86_64
> Kernel: 4.9.77
> jvm: OpenJDK 1.8.0
>Reporter: Chema Sanchez
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 1.1.0
>
> Attachments: server.properties
>
>
> We have been experiencing this issue lately when restarting or replacing 
> brokers of our Kafka clusters during maintenance operations.
> Having restarted or replaced a broker, after a few minutes of normal operation 
> it may suddenly throw the following exception and stop replicating some 
> partitions:
> {code:none}
> [2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalArgumentException: Attempted to complete a transaction which 
> was not started
>     at 
> kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
>     at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
>     at 
> kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
>     at scala.collection.immutable.List.foreach(List.scala:389)
>     at 
> scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
>     at 
> scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
>     at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
>     at kafka.log.Log.loadProducersFromLog(Log.scala:540)
>     at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
>     at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.log.Log.loadProducerState(Log.scala:514)
>     at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
>     at 
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.truncateTo(Log.scala:1467)
>     at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454)
>     at 
> kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445)
>     at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>     at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
>     at kafka.log.LogManager.truncateTo(LogManager.scala:445)
>     at 
> kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at 
> kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265)
>     at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135)
>     at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>     at 
> kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132)
>     at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, 
> fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
> {code}
> As during system updates all brokers in a cluster are restarted, it happened 
> some times the issue to man

[jira] [Commented] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410800#comment-16410800
 ] 

ASF GitHub Bot commented on KAFKA-6683:
---

ijuma closed pull request #4755: KAFKA-6683; Ensure producer state not mutated 
prior to append
URL: https://github.com/apache/kafka/pull/4755
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index cc693375079..de4bb2912e9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -166,6 +166,8 @@ class Log(@volatile var dir: File,
 
   import kafka.log.Log._
 
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
   /* A lock that guards all modifications to the log */
   private val lock = new Object
  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
@@ -242,8 +244,8 @@ class Log(@volatile var dir: File,
 
 loadProducerState(logEndOffset, reloadFromCleanShutdown = 
hasCleanShutdownFile)
 
-info("Completed load of log %s with %d log segments, log start offset %d 
and log end offset %d in %d ms"
-  .format(name, segments.size(), logStartOffset, logEndOffset, 
time.milliseconds - startMs))
+info(s"Completed load of log with ${segments.size} segments, log start 
offset $logStartOffset and " +
+  s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
   }
 
   private val tags = {
@@ -457,21 +459,20 @@ class Log(@volatile var dir: File,
   val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
   while (unflushed.hasNext) {
 val segment = unflushed.next
-info("Recovering unflushed segment %d in log 
%s.".format(segment.baseOffset, name))
+info(s"Recovering unflushed segment ${segment.baseOffset}")
 val truncatedBytes =
   try {
 recoverSegment(segment, Some(_leaderEpochCache))
   } catch {
 case _: InvalidOffsetException =>
   val startOffset = segment.baseOffset
-  warn("Found invalid offset during recovery for log " + 
dir.getName + ". Deleting the corrupt segment and " +
-"creating an empty one with starting offset " + startOffset)
+  warn("Found invalid offset during recovery. Deleting the corrupt 
segment and " +
+s"creating an empty one with starting offset $startOffset")
   segment.truncateTo(startOffset)
   }
 if (truncatedBytes > 0) {
   // we had an invalid message, delete all remaining log
-  warn("Corruption found in segment %d of log %s, truncating to offset 
%d.".format(segment.baseOffset, name,
-segment.readNextOffset))
+  warn(s"Corruption found in segment ${segment.baseOffset}, truncating 
to offset ${segment.readNextOffset}")
   unflushed.foreach(deleteSegment)
 }
   }
@@ -483,8 +484,7 @@ class Log(@volatile var dir: File,
   private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: 
Boolean): Unit = lock synchronized {
 checkIfMemoryMappedBufferClosed()
 val messageFormatVersion = 
config.messageFormatVersion.messageFormatVersion.value
-info(s"Loading producer state from offset $lastOffset for partition 
$topicPartition with message " +
-  s"format version $messageFormatVersion")
+info(s"Loading producer state from offset $lastOffset with message format 
version $messageFormatVersion")
 
 // We want to avoid unnecessary scanning of the log to build the producer 
state when the broker is being
 // upgraded. The basic idea is to use the absence of producer snapshot 
files to detect the upgrade case,
@@ -571,7 +571,7 @@ class Log(@volatile var dir: File,
* The memory mapped buffer for index files of this log will be left open 
until the log is deleted.
*/
   def close() {
-debug(s"Closing log $name")
+debug("Closing log")
 lock synchronized {
   checkIfMemoryMappedBufferClosed()
   maybeHandleIOException(s"Error while renaming dir for $topicPartition in 
dir ${dir.getParent}") {
@@ -610,7 +610,7 @@ class Log(@volatile var dir: File,
* Close file handlers used by log but don't write to disk. This is called 
if the log directory is offline
*/
   def closeHandlers() {
-debug(s"Closing handlers of log $name")
+debug("Closing handlers")
 lock synchronized {
   logSegments.foreach(_.closeHandlers())
   isMemoryMappedBufferClosed = true
@@ -775,8 +775,10 @@ class Log(@volatile var d

[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410797#comment-16410797
 ] 

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax opened a new pull request #4761:  KAFKA-6054: Fix upgrade path from Kafka 
Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4761
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: James Cheng
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: kip
> Fix For: 1.2.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so that at one point there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> 
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 i

[jira] [Updated] (KAFKA-6127) Streams should never block infinitely

2018-03-22 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6127:
---
Description: 
Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
{{committed()}}, and {{position()}}.
Also {{KafkaProducer#send()}} can block.

If we block within one operation, the whole {{StreamThread}} would block, the 
instance would make no progress and become unresponsive (for example, 
{{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
group.

We might consider using {{wakeup()}} calls to unblock those operations to keep 
{{StreamThread}} in a responsive state.

Note: there are discussions about adding timeouts to those calls, in which case 
we could get {{TimeoutExceptions}}. This would be easier to handle than using 
{{wakeup()}}. Thus, we should keep an eye on those discussions. 

  was:
Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
{{committed()}}, and {{position()}}.

If we block within one operation, the whole {{StreamThread}} would block, the 
instance would make no progress and become unresponsive (for example, 
{{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
group.

We might consider using {{wakeup()}} calls to unblock those operations to keep 
{{StreamThread}} in a responsive state.

Note: there are discussions about adding timeouts to those calls, in which case 
we could get {{TimeoutExceptions}}. This would be easier to handle than using 
{{wakeup()}}. Thus, we should keep an eye on those discussions. 


> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
> {{committed()}}, and {{position()}}.
> Also {{KafkaProducer#send()}} can block.
> If we block within one operation, the whole {{StreamThread}} would block, the 
> instance would make no progress and become unresponsive (for example, 
> {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
> group.
> We might consider using {{wakeup()}} calls to unblock those operations to 
> keep {{StreamThread}} in a responsive state.
> Note: there are discussions about adding timeouts to those calls, in which 
> case we could get {{TimeoutExceptions}}. This would be easier to handle than 
> using {{wakeup()}}. Thus, we should keep an eye on those discussions. 
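
For reference, a minimal sketch of the {{wakeup()}} pattern mentioned above 
(standard Java consumer API; the watchdog thread and timeout are illustrative), 
where a second thread unblocks a potentially infinite {{poll()}} or 
{{commitSync()}}:

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "wakeup-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("input-topic"));

        // A watchdog thread can interrupt any blocking consumer call; this is
        // the mechanism a StreamThread could use to stay responsive.
        // wakeup() is the one consumer method that is safe to call from another thread.
        Thread watchdog = new Thread(() -> {
            try {
                Thread.sleep(5000);   // illustrative deadline
                consumer.wakeup();    // unblocks poll()/commitSync()/committed()/position()
            } catch (InterruptedException ignored) { }
        });
        watchdog.start();

        try {
            while (true) {
                consumer.poll(100);   // may block; aborted by wakeup()
                consumer.commitSync(); // likewise unblocked by wakeup()
            }
        } catch (WakeupException e) {
            // Expected after wakeup(); shut down cleanly instead of blocking forever.
        } finally {
            consumer.close();
        }
    }
}
{code}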



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6127) Streams should never block infinitely

2018-03-22 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6127:
---
Description: 
Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
{{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block.

If we block within one operation, the whole {{StreamThread}} would block, the 
instance would make no progress and become unresponsive (for example, 
{{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
group.

We might consider using {{wakeup()}} calls to unblock those operations to keep 
{{StreamThread}} in a responsive state.

Note: there are discussions about adding timeouts to those calls, in which case 
we could get {{TimeoutExceptions}}. This would be easier to handle than using 
{{wakeup()}}. Thus, we should keep an eye on those discussions. 

  was:
Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
{{committed()}}, and {{position()}}.
Also {{KafkaProducer#send()}} can block.

If we block within one operation, the whole {{StreamThread}} would block, the 
instance would make no progress and become unresponsive (for example, 
{{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
group.

We might consider using {{wakeup()}} calls to unblock those operations to keep 
{{StreamThread}} in a responsive state.

Note: there are discussions about adding timeouts to those calls, in which case 
we could get {{TimeoutExceptions}}. This would be easier to handle than using 
{{wakeup()}}. Thus, we should keep an eye on those discussions. 


> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block.
> If we block within one operation, the whole {{StreamThread}} would block, the 
> instance would make no progress and become unresponsive (for example, 
> {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
> group.
> We might consider using {{wakeup()}} calls to unblock those operations to 
> keep {{StreamThread}} in a responsive state.
> Note: there are discussions about adding timeouts to those calls, in which 
> case we could get {{TimeoutExceptions}}. This would be easier to handle than 
> using {{wakeup()}}. Thus, we should keep an eye on those discussions. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6705) producer.send() should be non-blocking

2018-03-22 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410765#comment-16410765
 ] 

Ismael Juma commented on KAFKA-6705:


[~lindong], there's a long-standing JIRA about this; it may be worth searching 
for it. One reason why the current interface blocks is to provide back-pressure 
when the buffer pool is exhausted. If `send` were completely non-blocking, that 
would no longer be possible. In any case, feel free to add your thoughts on the 
original JIRA.

> producer.send() should be non-blocking
> --
>
> Key: KAFKA-6705
> URL: https://issues.apache.org/jira/browse/KAFKA-6705
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently producer.send() may block on metadata or full buffer for up to 
> max.block.ms. This behavior is well documented but it is a bit sub-optimal. 
> Since we return a future we should be able to make producer.send() completely 
> non-blocking. One idea is to simply insert the record into a global queue 
> shared across all partitions, and let the sender thread fetch records from 
> this queue and send them to the broker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6685) Connect deserialization log message should distinguish key from value

2018-03-22 Thread Jagadesh Adireddi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jagadesh Adireddi reassigned KAFKA-6685:


Assignee: Jagadesh Adireddi

> Connect deserialization log message should distinguish key from value
> -
>
> Key: KAFKA-6685
> URL: https://issues.apache.org/jira/browse/KAFKA-6685
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yeva Byzek
>Assignee: Jagadesh Adireddi
>Priority: Minor
>  Labels: newbie
>
> Connect was configured for Avro key and value but data had String key and 
> Avro value. The resulting error message was misleading because it didn't 
> distinguish key from value, and so I was chasing problems with the value 
> instead of the key.
> tl;dr Connect should at least tell you whether the problem is with 
> deserializing the key or value of a record
>  
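
As a hedged illustration of the requested improvement (the {{convert}} helper 
and {{isKey}} flag below are hypothetical, not Connect's actual plumbing), key 
and value deserialization could be wrapped separately so the error names the 
failing side:

{code:java}
// Hypothetical sketch only; Connect's real converter call sites differ.
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

public class LabeledConversion {
    static SchemaAndValue convert(Converter converter, String topic,
                                  byte[] data, boolean isKey) {
        try {
            return converter.toConnectData(topic, data);
        } catch (DataException e) {
            // Name the failing side so users don't chase the wrong converter.
            throw new DataException(
                "Failed to deserialize the " + (isKey ? "key" : "value")
                + " of a record in topic '" + topic + "'", e);
        }
    }
}
{code}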



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6705) producer.send() should be non-blocking

2018-03-22 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410672#comment-16410672
 ] 

Dong Lin commented on KAFKA-6705:
-

Ping [~ijuma] and [~jasong35] for comments.

> producer.send() should be non-blocking
> --
>
> Key: KAFKA-6705
> URL: https://issues.apache.org/jira/browse/KAFKA-6705
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently producer.send() may block on metadata or full buffer for up to 
> max.block.ms. This behavior is well documented but it is a bit sub-optimal. 
> Since we return a future we should be able to make producer.send() completely 
> non-blocking. One idea is to simply insert the record into a global queue 
> shared across all partitions, and let the sender thread fetch records from 
> this queue and send them to the broker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6705) producer.send() should be non-blocking

2018-03-22 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-6705:
---

 Summary: producer.send() should be non-blocking
 Key: KAFKA-6705
 URL: https://issues.apache.org/jira/browse/KAFKA-6705
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin


Currently producer.send() may block on metadata or full buffer for up to 
max.block.ms. This behavior is well documented but it is a bit sub-optimal. 
Since we return a future we should be able to make producer.send() completely 
non-blocking. One idea is to simply insert the record into a global queue 
shared across all partitions, and let the sender thread fetch records from this 
queue and send them to the broker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-03-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410621#comment-16410621
 ] 

Guozhang Wang commented on KAFKA-6474:
--

Filipe would like you to take a look at 
https://github.com/apache/kafka/pull/4760.

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
>  Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our own 
> tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3591) JmxTool should exit out if a provided query matches no values

2018-03-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410575#comment-16410575
 ] 

Guozhang Wang commented on KAFKA-3591:
--

This is fixed as part of KAFKA-6611.

> JmxTool should exit out if a provided query matches no values
> -
>
> Key: KAFKA-3591
> URL: https://issues.apache.org/jira/browse/KAFKA-3591
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 0.9.0.0
>Reporter: Harsh J
>Priority: Trivial
> Fix For: 1.2.0
>
>
> Running {{kafka.tools.JmxTool}} with an invalid query, such as 
> {{--object-name "foobar"}}, would produce just "time" field outputs, given no 
> such object exists. If no objects match an explicitly provided query, the tool 
> should exit instead of just printing the time data.
> (We should not exit if no filter is provided though, i.e. if the KAFKA-2278 
> feature is used)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6611) Re-write simple benchmark in system tests with JMXTool

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410573#comment-16410573
 ] 

ASF GitHub Bot commented on KAFKA-6611:
---

guozhangwang closed pull request #4650: KAFKA-6611: PART I, Use JMXTool in 
SimpleBenchmark
URL: https://github.com/apache/kafka/pull/4650
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala 
b/core/src/main/scala/kafka/tools/JmxTool.scala
index 4a6a348d9a6..27e46319e49 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -55,12 +55,17 @@ object JmxTool extends Logging {
 .withRequiredArg
 .describedAs("name")
 .ofType(classOf[String])
-val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval 
in MS with which to poll jmx stats.")
+val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval 
in MS with which to poll jmx stats; default value is 2 seconds. " +
+  "Value of -1 equivalent to setting one-time to true")
   .withRequiredArg
   .describedAs("ms")
   .ofType(classOf[java.lang.Integer])
   .defaultsTo(2000)
-val helpOpt = parser.accepts("help", "Print usage information.")
+val oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once 
only.")
+  .withRequiredArg
+  .describedAs("one-time")
+  .ofType(classOf[java.lang.Boolean])
+  .defaultsTo(false)
 val dateFormatOpt = parser.accepts("date-format", "The date format to use 
for formatting the time field. " +
   "See java.text.SimpleDateFormat for options.")
   .withRequiredArg
@@ -72,8 +77,15 @@ object JmxTool extends Logging {
 .describedAs("service-url")
 .ofType(classOf[String])
 .defaultsTo("service:jmx:rmi:///jndi/rmi://:/jmxrmi")
+val reportFormatOpt = parser.accepts("report-format", "output format name: 
either 'original', 'properties', 'csv', 'tsv' ")
+  .withRequiredArg
+  .describedAs("report-format")
+  .ofType(classOf[java.lang.String])
+  .defaultsTo("original")
 val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to 
become available before starting output. " +
   "Only supported when the list of objects is non-empty and contains no 
object name patterns.")
+val helpOpt = parser.accepts("help", "Print usage information.")
+
 
 if(args.length == 0)
   CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard 
output.")
@@ -87,12 +99,16 @@ object JmxTool extends Logging {
 
 val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))
 val interval = options.valueOf(reportingIntervalOpt).intValue
+var oneTime = interval < 0 || options.has(oneTimeOpt)
 val attributesWhitelistExists = options.has(attributesOpt)
-val attributesWhitelist = if(attributesWhitelistExists) 
Some(options.valueOf(attributesOpt).split(",")) else None
+val attributesWhitelist = if(attributesWhitelistExists) 
Some(options.valueOf(attributesOpt).split(",").filterNot(_.equals(""))) else 
None
 val dateFormatExists = options.has(dateFormatOpt)
 val dateFormat = if(dateFormatExists) Some(new 
SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
 val wait = options.has(waitOpt)
 
+val reportFormat = 
parseFormat(options.valueOf(reportFormatOpt).toLowerCase)
+val reportFormatOriginal = reportFormat.equals("original")
+
 var jmxc: JMXConnector = null
 var mbsc: MBeanServerConnection = null
 var connected = false
@@ -150,33 +166,57 @@ object JmxTool extends Logging {
 
 val numExpectedAttributes: Map[ObjectName, Int] =
   if (attributesWhitelistExists)
-queries.map((_, attributesWhitelist.get.size)).toMap
+queries.map((_, attributesWhitelist.get.length)).toMap
   else {
 names.map{(name: ObjectName) =>
   val mbean = mbsc.getMBeanInfo(name)
   (name, mbsc.getAttributes(name, 
mbean.getAttributes.map(_.getName)).size)}.toMap
   }
 
+if(numExpectedAttributes.isEmpty) {
+  CommandLineUtils.printUsageAndDie(parser, s"No matched attributes for 
the queried objects $queries.")
+}
+
 // print csv header
 val keys = List("time") ++ queryAttributes(mbsc, names, 
attributesWhitelist).keys.toArray.sorted
-if(keys.size == numExpectedAttributes.values.sum + 1)
+if(reportFormatOriginal && keys.size == numExpectedAttributes.values.sum + 
1) {
   println(keys.map("\"" + _ + "\"").mkString(","))
+}
 
-while(true) {
+var keepGoing = true
+while (keepGoing) {
   val start = System.currentTimeMillis
   val att

[jira] [Commented] (KAFKA-6685) Connect deserialization log message should distinguish key from value

2018-03-22 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410544#comment-16410544
 ] 

Matthias J. Sax commented on KAFKA-6685:


Done. [~adireddijagad...@gmail.com] You can assign Jiras to yourself now.

> Connect deserialization log message should distinguish key from value
> -
>
> Key: KAFKA-6685
> URL: https://issues.apache.org/jira/browse/KAFKA-6685
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yeva Byzek
>Priority: Minor
>  Labels: newbie
>
> Connect was configured for Avro key and value but data had String key and 
> Avro value. The resulting error message was misleading because it didn't 
> distinguish key from value, and so I was chasing problems with the value 
> instead of the key.
> tl;dr Connect should at least tell you whether the problem is with 
> deserializing the key or value of a record
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE

2018-03-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410534#comment-16410534
 ] 

Guozhang Wang commented on KAFKA-6535:
--

I'd vote to still start a KIP even though it is a very small one: it is still a 
public change, and for people who rely on the default retention it would be 
less of a surprise.

> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Khaireddine Rezgui
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity, allowing records with 
> old timestamps to be pushed to them (think: bootstrapping, re-processing), 
> and to rely on the purging API to keep their storage small.
> More specifically, in {{RepartitionTopicConfig}} we have a few default 
> overrides for repartition topic configs; we should just add an override for 
> {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still 
> allows users to override it themselves if they want via 
> {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this 
> update takes effect.
> In addition to the code change, we also need doc changes in 
> streams/upgrade_guide.html specifying this default value change.
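
For users who want this behavior today, a minimal sketch (standard Streams Java 
config API; property values are illustrative) of overriding internal topic 
retention via the topic prefix; {{StreamsConfig.topicPrefix()}} simply prepends 
{{StreamsConfig.TOPIC_PREFIX}} to the topic property name:

{code:java}
import java.util.Properties;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RetentionOverrideSketch {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // "topic.retention.ms" -> applied to internal (repartition/changelog) topics.
        props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG),
                  Long.MAX_VALUE);
        return props;
    }
}
{code}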



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6685) Connect deserialization log message should distinguish key from value

2018-03-22 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410527#comment-16410527
 ] 

Randall Hauch commented on KAFKA-6685:
--

[~adireddijagad...@gmail.com], please ask the Kafka committers to add you to 
the contributor list. See http://kafka.apache.org/contributing.html for 
details. Then you will be able to assign this to yourself.

Feel free to submit a PR, per those guidelines. For example, the PR title and 
commits should begin with "KAFKA-6685: " followed by a meaningful one-sentence 
summary.

> Connect deserialization log message should distinguish key from value
> -
>
> Key: KAFKA-6685
> URL: https://issues.apache.org/jira/browse/KAFKA-6685
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yeva Byzek
>Priority: Minor
>  Labels: newbie
>
> Connect was configured for Avro key and value but data had String key and 
> Avro value. The resulting error message was misleading because it didn't 
> distinguish key from value, and so I was chasing problems with the value 
> instead of the key.
> tl;dr Connect should at least tell you whether the problem is with 
> deserializing the key or value of a record
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-03-22 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410522#comment-16410522
 ] 

John Roesler commented on KAFKA-6437:
-

I sort of feel like the existing behavior is better than failing, and [~k1th] 
was mostly complaining about a visibility problem, so it would be best to just 
address the visibility of this situation.

Logging a warning seems appropriate. It's certainly not an error. It could also 
be info; I guess someone with more context could advise on whether warn or info 
fits better.

A log message in conjunction with a lag metric (I'm planning to submit a KIP 
for that...) would really have prevented this from biting them, so maybe we 
should just focus on exposing better information rather than failing fast in 
this case.

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Priority: Minor
>  Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only some of 
> the intermediate topics are created (up to …9).
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue a `warn` or `error` level message at start to inform about the missing 
> topic and its consequences.
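
As a stopgap, an application can surface the condition itself; a minimal sketch 
(standard Java AdminClient; topic names are illustrative) that warns about 
missing input topics before starting the streams app:

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class InputTopicCheck {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        List<String> inputTopics = Arrays.asList("left-topic", "right-topic");

        try (AdminClient admin = AdminClient.create(props)) {
            Set<String> existing = admin.listTopics().names().get();
            for (String topic : inputTopics) {
                if (!existing.contains(topic)) {
                    // The streams app would otherwise hang silently mid-topology.
                    System.err.println("WARN: input topic missing: " + topic);
                }
            }
        }
    }
}
{code}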



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6659) Improve error message if state store is not found

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410517#comment-16410517
 ] 

ASF GitHub Bot commented on KAFKA-6659:
---

mjsax closed pull request #4732: KAFKA-6659: Improve error message if state 
store is not found
URL: https://github.com/apache/kafka/pull/4732
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 3761bfb0ee6..44a25c1213f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -72,7 +72,13 @@ public StateStore getStateStore(final String name) {
 }
 
 if (!currentNode().stateStores.contains(name)) {
-throw new StreamsException("Processor " + currentNode().name() + " 
has no access to StateStore " + name);
+throw new StreamsException("Processor " + currentNode().name() + " 
has no access to StateStore " + name +
+" as the store is not connected to the processor. If you 
add stores manually via '.addStateStore()' " +
+"make sure to connect the added store to the processor by 
providing the processor name to " +
+"'.addStateStore()' or connect them via 
'.connectProcessorAndStateStores()'. " +
+"DSL users need to provide the store name to '.process()', 
'.transform()', or '.transformValues()' " +
+"to connect the store to the corresponding operator. If 
you do not add stores manually, " +
+"please file a bug report at 
https://issues.apache.org/jira/projects/KAFKA.";);
 }
 
 return stateManager.getStore(name);


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve error message if state store is not found
> -
>
> Key: KAFKA-6659
> URL: https://issues.apache.org/jira/browse/KAFKA-6659
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Stuart Perks
>Priority: Trivial
>  Labels: beginner, easy-fix, newbie
>
> If a processor tries to access a store but the store is not connected to the 
> processor, Streams fails with
> {quote}Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: 
> Invalid topology building: Processor KSTREAM-TRANSFORM-36 has no 
> access to StateStore questions-awaiting-answers-store
> {quote}
> We should improve this error message and give a hint to the user how to fix 
> the issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE

2018-03-22 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410509#comment-16410509
 ] 

John Roesler commented on KAFKA-6535:
-

It seems unlikely that there are folks who just use the default and would be 
adversely affected by the new default, so I'd "vote" not to bother with a KIP. 
But I'm also new in town...

> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Khaireddine Rezgui
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity, allowing records with 
> old timestamps to be pushed to them (think: bootstrapping, re-processing), 
> and to rely on the purging API to keep their storage small.
> More specifically, in {{RepartitionTopicConfig}} we have a few default 
> overrides for repartition topic configs; we should just add an override for 
> {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still 
> allows users to override it themselves if they want via 
> {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this 
> update takes effect.
> In addition to the code change, we also need doc changes in 
> streams/upgrade_guide.html specifying this default value change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-03-22 Thread Gunnar Morling (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410487#comment-16410487
 ] 

Gunnar Morling commented on KAFKA-3821:
---

Hey [~ewencp], thanks for your extensive reply! I agree this discussion 
shouldn't be centered around saving or not saving that one allocation. I just 
added this to point out that the receiver approach leaves Kafka Connect more 
freedom in terms of implementation. That way, in theory, you could even do 
crazy things such as storing incoming records off-heap. But yeah, most likely 
you'd just use a list yourselves.

But that's not why I was suggesting this. The more substantial advantage I see 
is that it allows adding further methods down the road without breaking 
existing implementors of this contract. [~rhauch] e.g. mentioned methods for TX 
handling. Assuming those calls must be made in the correct order when 
submitting records, such methods couldn't really be added to 
{{ConnectorSourceTask}} itself. It's the same for offset handling, which is the 
original use case we're after here.

On the offset handling itself, there are two aspects. One is that we'd like to 
submit an offset once a snapshot is completed. Currently, we're doing what you 
described with

{quote}
would another option be having Debezium read 1 record forward to determine 
before returning the record and/or constructing its offset whether this is the 
final record of the snapshot
{quote}

But this leads back to the complexity of this contract for implementors. Having 
a dedicated way to submit the offset once the snapshot is done is arguably 
simpler to implement (and to reason about when reading the code) than this 
"read forward" and delayed processing.

In terms of uniqueness of offsets, they _are_ unique even during snapshotting 
(binlog position), but we need a way to say that a snapshot has been completed.

I see though how that could be considered mostly a "style thing". There's 
another case though where we'd benefit - substantially - from being able to 
submit offsets explicitly: when a connector supports whitelisting of captured 
tables and no changes are made to the whitelisted tables for a while (a 
[comment 
above|https://issues.apache.org/jira/browse/KAFKA-3821?focusedCommentId=15973506&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15973506]
 touches on this). In this case the offset in the event source progresses (e.g. 
MySQL's binlog position), but as no record is of interest per the whitelist 
config, we have no way to ever submit these offsets. Then, after a connector 
restart, we'd be forced to re-read much larger portions of the binlog than 
actually required.

We currently work around this in Debezium by regularly emitting records to a 
heartbeat topic. This allows us to submit these offsets even when no changes to 
the whitelisted tables are applied.

Now, the original idea above was to emit specific subclasses of 
{{SourceRecord}} for such "offset-only" records, but I wanted to bring up the 
receiver-parameter idea because a) it feels less hackish to me and b) it opens 
the door for further API additions as described above.

I hope this makes sense; what's proposed is the best option that comes to my 
mind right now for the issues we're trying to resolve (while keeping the API 
reasonably abstract so as to cater for other use cases, too). Happy to hear any 
alternative proposals, of course. Thanks for this great discussion!
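
To make the receiver idea concrete, a purely hypothetical API sketch of the 
kind of addition discussed above (none of these names exist in Kafka Connect; 
this is an illustration, not the actual proposal):

{code:java}
// Hypothetical API sketch only; no such interface exists in Kafka Connect today.
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;

public interface RecordReceiver {
    /** Hand a record to the framework, as returning it from poll() does today. */
    void submit(SourceRecord record);

    /**
     * Record a new offset for a source partition without emitting a record,
     * e.g. "snapshot completed", or binlog progress on filtered-out tables.
     */
    void submitOffset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset);
}
{code}

Further methods (e.g. for TX handling) could then be added to the receiver 
without breaking existing task implementations.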

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6696) Trogdor should support destroying tasks

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410366#comment-16410366
 ] 

ASF GitHub Bot commented on KAFKA-6696:
---

cmccabe opened a new pull request #4759: KAFKA-6696 Trogdor should support 
destroying tasks
URL: https://github.com/apache/kafka/pull/4759
 
 
   KAFKA-6696 Trogdor should support destroying tasks
   
   Implement destroying tasks and workers.  This means erasing all record of 
them on the Coordinator and the Agent.
   Workers should be identified by unique 64-bit worker IDs, rather than by the 
names of the tasks they are implementing.  This ensures that when a task is 
destroyed and re-created with the same task ID, the old workers will not be 
treated as part of the new task instance.
   
   Fix some return results from RPCs.  In some cases RPCs were returning values 
that were never used.  Attempting to re-create the same task ID with different 
arguments should fail.  Add RequestConflictException to represent HTTP error 
code 409 (CONFLICT) for this scenario.
   
   If only one worker in a task stops, don't stop all the other workers for 
that task, unless the worker that stopped had an error.
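
   For illustration, the re-create semantics could look roughly like this (a 
sketch only; class and field names are guesses, not the actual code in the PR):

{code:java}
// Sketch: same task ID + same spec is a no-op; same task ID + different spec
// maps to HTTP 409 (CONFLICT) via the new RequestConflictException
public synchronized void createTask(String taskId, TaskSpec spec) {
    TaskSpec existing = tasks.get(taskId);          // tasks: Map<String, TaskSpec>
    if (existing == null) {
        tasks.put(taskId, spec);
    } else if (!existing.equals(spec)) {
        throw new RequestConflictException("Task " + taskId +
            " already exists with a different specification.");
    }
}
{code}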


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Trogdor should support destroying tasks
> ---
>
> Key: KAFKA-6696
> URL: https://issues.apache.org/jira/browse/KAFKA-6696
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
>
> Trogdor should support destroying tasks.  This will make it more practical to 
> have very long running Trogdor instances.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6704) Checking hasNext from SegmentIterator could throw InvalidStateStoreException

2018-03-22 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-6704:
--

 Summary: Checking hasNext from SegmentIterator could throw 
InvalidStateStoreException 
 Key: KAFKA-6704
 URL: https://issues.apache.org/jira/browse/KAFKA-6704
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Bill Bejeck
Assignee: Bill Bejeck
 Fix For: 1.2.0


When using Interactive Queries to do a range query on a windowed store, 
{{SegmentIterator.hasNext}} could throw an {{InvalidStateStoreException}} if 
the {{StreamThread}} has closed the store.
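
Until a fix lands, Interactive Queries callers can guard the iteration along 
these lines (a hedged sketch; the store name, key range, and timestamps are 
illustrative):

{code:java}
ReadOnlyWindowStore<String, Long> store =
    streams.store("my-window-store", QueryableStoreTypes.<String, Long>windowStore());
try (KeyValueIterator<Windowed<String>, Long> iter =
         store.fetch("a", "z", startMs, endMs)) {
    while (iter.hasNext()) {            // this is the call that may throw
        KeyValue<Windowed<String>, Long> next = iter.next();
        // ... process next ...
    }
} catch (InvalidStateStoreException e) {
    // the StreamThread closed or migrated the store mid-iteration;
    // re-acquire the store and retry the query
}
{code}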

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-03-22 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410162#comment-16410162
 ] 

John Roesler commented on KAFKA-6474:
-

Yeah, thanks [~h314to]! I had tried it with just half of that change, and it 
obviously didn't work. I'm really glad you figured that out!

FYI, there are other sub-projects in kafka that are going to follow this 
pattern, so I'm going to start a dev mailing list discussion to share your 
pattern.

Also, for completeness, I had to augment your build.gradle patch slightly to 
get './gradlew streams:test-utils:test' to pass:
{noformat}
diff --git a/build.gradle b/build.gradle
index 5e4c35643..2f38bf28b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -921,6 +921,7 @@ project(':streams') {
testCompile project(':clients').sourceSets.test.output
testCompile project(':core')
testCompile project(':core').sourceSets.test.output
+ testCompile project(':streams:test-utils').sourceSets.main.output
testCompile libs.junit
testCompile libs.easymock
testCompile libs.bcpkix
@@ -965,11 +966,12 @@ project(':streams:test-utils') {
archivesBaseName = "kafka-streams-test-utils"

dependencies {
- compile project(':streams')
+ compile project(':streams').sourceSets.main.output
compile project(':clients')

testCompile project(':clients').sourceSets.test.output
testCompile libs.junit
+ testCompile libs.rocksDBJni

testRuntime libs.slf4jlog4j
}{noformat}
The reason is that we are skipping :streams:copyDependantLibs during 
test-utils:compile now (to avoid the circular dependency), so we have to 
explicitly declare testCompile dependencies on any libs that would have been 
transitively pulled in from :streams (and are in the test's run-time code path).

In case you run into a similar error when you run the full test suite, the 
error I saw was:
{noformat}
java.lang.ClassNotFoundException: org.rocksdb.RocksDBException{noformat}

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
>  Labels: beginner, newbie
>
> With KIP-247 we added public TopologyTestDriver. We should rewrite our own 
> tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6703) MirrorMaker cannot make progress when any matched topic from a whitelist regexp has -1 leader

2018-03-22 Thread Attila Sasvari (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Attila Sasvari updated KAFKA-6703:
--
Description: 
Scenario:
 - MM whitelist regexp matches multiple topics
 - destination cluster has 5 brokers with multiple topics of replication factor 3
 - without partition reassignment, shut down 2 brokers
 - suppose a topic has no leader any more because it was out-of-sync and the 
leader and the rest of the replicas are hosted on the downed brokers.
 - so we have 1 topic with some partitions with leader -1
 - the rest of the matching topics have 3 replicas with leaders

MM will not produce into any of the matched topics until:
 - the "orphaned" topic is removed, or
 - the partition reassignment is carried out from the downed brokers (suppose you can 
turn these back on)

In the MirrorMaker logs, there are a lot of messages like the following ones:
{code}
[2018-03-22 19:55:32,522] DEBUG [Consumer clientId=consumer-1, 
groupId=console-consumer-43054] Coordinator discovery failed, refreshing 
metadata (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2018-03-22 19:55:32,522] DEBUG [Consumer clientId=consumer-1, 
groupId=console-consumer-43054] Sending metadata request (type=MetadataRequest, 
topics=) to node 192.168.1.102:9092 (id: 0 rack: null) 
(org.apache.kafka.clients.NetworkClient)

[2018-03-22 19:55:32,525] DEBUG Updated cluster metadata version 10 to 
Cluster(id = Y-qtoFP-RMq2uuVnkEKAAw, nodes = [192.168.1.102:9092 (id: 0 rack: 
null)], partitions = [Partition(topic = testR1P2, partition = 1, leader = none, 
replicas = [42], isr = [], offlineReplicas = [42]), Partition(topic = testR1P1, 
partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), 
Partition(topic = testAlive, partition = 0, leader = 0, replicas = [0], isr = 
[0], offlineReplicas = []), Partition(topic = testERRR, partition = 0, leader = 
0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = 
testR1P2, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas 
= [])]) (org.apache.kafka.clients.Metadata)

[2018-03-22 19:55:32,525] DEBUG [Consumer clientId=consumer-1, 
groupId=console-consumer-43054] Sending FindCoordinator request to broker 
192.168.1.102:9092 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2018-03-22 19:55:32,525] DEBUG [Consumer clientId=consumer-1, 
groupId=console-consumer-43054] Received FindCoordinator response 
ClientResponse(receivedTimeMs=1521744932525, latencyMs=0, disconnected=false, 
requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, 
clientId=consumer-1, correlationId=19), 
responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2018-03-22 19:55:32,526] DEBUG [Consumer clientId=consumer-1, 
groupId=console-consumer-43054] Group coordinator lookup failed: The 
coordinator is not available. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
{code}

Interestingly, if MirrorMaker uses {{zookeeper.connect}} in its consumer 
properties file, then an OldConsumer is created, and it can make progress.



  was:
Scenario:
 - MM whitelabel regexp matches multiple topics
 - destination cluster has 5 brokers with multiple topics replication factor 3
 - without partition reassign shut down 2 brokers
 - suppose a topic has no leader any more because it was off-sync and the 
leader and the rest of the replicas are hosted on the downed brokers.
 - so we have 1 topic with some partitions with leader -1
 - the rest of the matching topics has 3 replicas with leaders

MM will not produce into any of the matched topics until:
 - the "orphaned" topic removed or
 - the partition reassign carried out from the downed brokers (suppose you can 
turn these back on)

In the MirrorMaker logs, there are a lot of messages like the following ones:
{code}
[2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-1, groupId=1] Sending 
FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-0, groupId=1] Sending 
FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Received 
FindCoordinator response ClientResponse(receivedTimeMs=1521741547782, 
latencyMs=1, disconnected=false, 
requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, 
clientId=1-0, correlationId=71), 
responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-22 18:59:

[jira] [Comment Edited] (KAFKA-6679) Random corruption (CRC validation issues)

2018-03-22 Thread Ari Uka (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410079#comment-16410079
 ] 

Ari Uka edited comment on KAFKA-6679 at 3/22/18 6:36 PM:
-

So I attempted to re-generate one of the topics we had, and I've been able to 
actually find a corrupt message in the `.log` file. 

There are 3 brokers: kafka-01, kafka-02, kafka-03, and the topic 
influxdb-telemetry, which contains 161,760,969 (161.7M) messages. 

The topic description:
{noformat}
Topic:influxdb-telemetry PartitionCount:6 ReplicationFactor:3 Configs:
Topic: influxdb-telemetry Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2
Topic: influxdb-telemetry Partition: 1 Leader: 3 Replicas: 3,2,1 Isr: 3
Topic: influxdb-telemetry Partition: 2 Leader: 1 Replicas: 1,3,2 Isr: 1
Topic: influxdb-telemetry Partition: 3 Leader: 2 Replicas: 2,3,1 Isr: 2
Topic: influxdb-telemetry Partition: 4 Leader: 3 Replicas: 3,1,2 Isr: 3
Topic: influxdb-telemetry Partition: 5 Leader: 1 Replicas: 1,2,3 Isr: 
1{noformat}
After inserting only 71,235 messages, influxdb-telemetry-0 becomes corrupt on 
kafka-02 and kafka-01 starts to complain:
{noformat}
[2018-03-22 17:00:00,690] ERROR [ReplicaManager broker=2] Error processing 
fetch operation on partition influxdb-telemetry-0, offset 71236 
(kafka.server.ReplicaManager) 
{noformat}
 

The last good written RecordSet looks like this:
{noformat}
baseOffset: 71233 lastOffset: 71235 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
position: 10285560 CreateTime: -1 isvalid: true size: 471 magic: 2 
compresscodec: NONE crc: 491186814{noformat}
The header of this RecordSet looks like so:

$ hd -s 10285560 -n 471 
/var/db/kafka/influxdb-telemetry-0/.log
{noformat}
009ce30d  00 00 00 00 00 01 16 27  00 00 01 4b 00 00 00 00  |...'...K|
009ce31d  02 ee 03 fd 04 00 00 00  00 00 01 ff ff ff ff ff  ||
009ce32d  ff ff ff ff ff ff ff ff  ff ff ff ff ff ff ff ff  ||
009ce33d  ff ff ff ff ff ff ff ff  ff 00 00 00 02 a8 02 00  ||
009ce34d  00 00 01 9a 02 00 8b 09  89 54 4f d9 dd 82 b2 14  |.TO.|
{noformat}
If we're looking at the log files on `kafka-01` (which is trying to replicate), 
it hasn't replicated past message 71235 and it's just sitting there complaining.

If we go on the leader of this partition `kafka-02` and dump the next message 
after the good message, it's indeed corrupt, this is what it looks like:

$ hd -s 10286031 -n 471 
/var/db/kafka/influxdb-telemetry-0/.log

 
{noformat}
009cf3cf  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  ||
*
009cf59f
{noformat}
The asterisk ( * ) here means that every line is the same, so basically message 
71,236 contains 471 zeroes. I picked 471 arbitrarily since I don't actually know 
how big the RecordSet is for the next Record.

If we try to dump `influxdb-telemetry-0` on the leader (kafka-02), it barfs and 
ends early because of this exception:

 
{noformat}
$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files 
/var/db/kafka/influxdb-telemetry-0/.log | tail -n 5
Exception in thread "main" 
org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller 
than minimum record overhead (14).
 
{noformat}
So the tail only shows these messages, when in reality the log is 1GB and 
message 71,236 is only about 10MB into the file.

 
{noformat}
offset: 71231 position: 10283251 CreateTime: -1 isvalid: true keysize: -1 
valuesize: 130 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
offset: 71232 position: 10283251 CreateTime: -1 isvalid: true keysize: -1 
valuesize: 125 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
offset: 71233 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 
valuesize: 130 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
offset: 71234 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 
valuesize: 126 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
offset: 71235 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 
valuesize: 127 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
{noformat}
What could cause Kafka to dump 471 zeroes for a RecordSet?

 


was (Author: ari6123):
So I attempted to re-generate one of the topics we had and I've been able to 
actually find a corrupt message in the `.log` file. 

There exists 3 brokers: kafka-01, kafka-02, kafka-03 and the topic 
influxdb-telemetry which contains 161,760,969 (161.7M) messages. 

The topic description:


[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE

2018-03-22 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410082#comment-16410082
 ] 

Matthias J. Sax commented on KAFKA-6535:


This ticket is labeled as "needs-kip" -- not sure about this. Thoughts?

> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Khaireddine Rezgui
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity to allow any records to 
> be pushed to them with old timestamps (think: bootstrapping, re-processing) 
> and to just rely on the purging API to keep their storage small.
> More specifically, in {{RepartitionTopicConfig}} we have a few default 
> overrides for repartition topic configs; we should just add the override for 
> {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still 
> allows users to override it themselves if they want via 
> {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this 
> update takes effect.
> In addition to the code change, we also need doc changes in 
> streams/upgrade_guide.html specifying this default value change.
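
For reference, the user-side override mentioned above would look roughly like 
this (a minimal sketch using the public {{StreamsConfig.topicPrefix}} helper; 
the property values are illustrative):

{code:java}
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// "topic." prefixed configs are applied to internal topics, incl. repartition topics
props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG),
          String.valueOf(Long.MAX_VALUE));
{code}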



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6679) Random corruption (CRC validation issues)

2018-03-22 Thread Ari Uka (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410079#comment-16410079
 ] 

Ari Uka commented on KAFKA-6679:


So I attempted to re-generate one of the topics we had, and I've been able to 
actually find a corrupt message in the `.log` file. 

There are 3 brokers: kafka-01, kafka-02, kafka-03, and the topic 
influxdb-telemetry, which contains 161,760,969 (161.7M) messages. 

The topic description:


{noformat}
Topic:influxdb-telemetry PartitionCount:6 ReplicationFactor:3 Configs:
Topic: influxdb-telemetry Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2
Topic: influxdb-telemetry Partition: 1 Leader: 3 Replicas: 3,2,1 Isr: 3
Topic: influxdb-telemetry Partition: 2 Leader: 1 Replicas: 1,3,2 Isr: 1
Topic: influxdb-telemetry Partition: 3 Leader: 2 Replicas: 2,3,1 Isr: 2
Topic: influxdb-telemetry Partition: 4 Leader: 3 Replicas: 3,1,2 Isr: 3
Topic: influxdb-telemetry Partition: 5 Leader: 1 Replicas: 1,2,3 Isr: 
1{noformat}

After inserting only 71,235 messages, influxdb-telemetry-0 becomes corrupt on 
kafka-02 and kafka-01 starts to complain:

{noformat}
[2018-03-22 17:00:00,690] ERROR [ReplicaManager broker=2] Error processing 
fetch operation on partition influxdb-telemetry-0, offset 71236 
(kafka.server.ReplicaManager) 
{noformat}
 

The last good written RecordSet looks like this:

{noformat}
baseOffset: 71233 lastOffset: 71235 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
position: 10285560 CreateTime: -1 isvalid: true size: 471 magic: 2 
compresscodec: NONE crc: 491186814{noformat}

The header of this RecordSet looks like so:

$ hd -s 10285560 -n 471 
/var/db/kafka/influxdb-telemetry-0/.log

{noformat}
009ce30d  00 00 00 00 00 01 16 27  00 00 01 4b 00 00 00 00  |...'...K|
009ce31d  02 ee 03 fd 04 00 00 00  00 00 01 ff ff ff ff ff  ||
009ce32d  ff ff ff ff ff ff ff ff  ff ff ff ff ff ff ff ff  ||
009ce33d  ff ff ff ff ff ff ff ff  ff 00 00 00 02 a8 02 00  ||
009ce34d  00 00 01 9a 02 00 8b 09  89 54 4f d9 dd 82 b2 14  |.TO.|
{noformat}

If we're looking at the log files on `kafka-01` (which is trying to replicate), 
it hasn't replicated past message 71235 and it's just sitting there complaining.

If we go on the leader of this partition and dump the next message after the 
good message, it's indeed corrupt, this is what it looks like:

$ hd -s 10286031 -n 471 
/var/db/kafka/influxdb-telemetry-0/.log

{noformat}
009cf3cf  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  ||
*
009cf59f
{noformat}

The asterisk ( * ) here means that every line is the same, so basically message 
71236 contains 471 zeroes. I picked 471 arbitrarily since I don't actually know 
how big the RecordSet is for the next Record.

If we try to dump `influxdb-telemetry-0` on the leader, it barfs and ends early 
because of this exception:

{noformat}
$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files 
/var/db/kafka/influxdb-telemetry-0/.log | tail -n 5
Exception in thread "main" 
org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller 
than minimum record overhead (14).
 
{noformat}

So the tail only shows these messages, when in reality the log is 1GB and 
message 71,236 is only about 10MB into the file.

{noformat}
offset: 71231 position: 10283251 CreateTime: -1 isvalid: true keysize: -1 
valuesize: 130 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
offset: 71232 position: 10283251 CreateTime: -1 isvalid: true keysize: -1 
valuesize: 125 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
offset: 71233 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 
valuesize: 130 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
offset: 71234 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 
valuesize: 126 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
offset: 71235 position: 10285560 CreateTime: -1 isvalid: true keysize: -1 
valuesize: 127 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
sequence: -1 isTransactional: false headerKeys: []
{noformat}

What could cause Kafka to dump 471 zeroes for a RecordSet?

 

> Random corruption (CRC validation issues) 
> --
>
> Key: KAFKA-6679
> URL: https://issues.apache.org/jira/browse/KAFKA-6679
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, replication
>Affects Versions: 0.10.2.0, 1.0.1
> Envir

[jira] [Commented] (KAFKA-6703) MirrorMaker cannot make progress when any matched topic from a whitelist regexp has -1 leader

2018-03-22 Thread Attila Sasvari (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410078#comment-16410078
 ] 

Attila Sasvari commented on KAFKA-6703:
---

There is a call to 
[ensureCoordinatorReady()|https://github.com/apache/kafka/blob/f0a29a693548efe539cba04807e21862c8dfc1bf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L279]
 in {{ConsumerCoordinator}}. It tries to poll the coordinator in a 
[loop|https://github.com/apache/kafka/blob/f0a29a693548efe539cba04807e21862c8dfc1bf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L217-L219].
 It does not succeed, so it keeps retrying the connection, and the other matched 
topics are ignored until the failing coordinator becomes healthy.
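
Paraphrased, the loop has this shape (a simplified sketch of the linked code, 
not the exact implementation):

{code:java}
// Simplified shape of AbstractCoordinator#ensureCoordinatorReady
while (coordinatorUnknown()) {
    RequestFuture<Void> future = lookupCoordinator();   // sends FindCoordinator
    client.poll(future);
    if (future.failed()) {
        if (future.isRetriable())
            client.awaitMetadataUpdate();               // refresh metadata, then retry
        else
            throw future.exception();
    }
    // COORDINATOR_NOT_AVAILABLE is retriable, so a coordinator that never becomes
    // available keeps the consumer spinning here and the other topics are starved
}
{code}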


> MirrorMaker cannot make progress when any matched topic from a whitelist 
> regexp has -1 leader
> -
>
> Key: KAFKA-6703
> URL: https://issues.apache.org/jira/browse/KAFKA-6703
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Attila Sasvari
>Priority: Major
>
> Scenario:
>  - MM whitelist regexp matches multiple topics
>  - destination cluster has 5 brokers with multiple topics of replication factor 3
>  - without partition reassignment, shut down 2 brokers
>  - suppose a topic has no leader any more because it was out-of-sync and the 
> leader and the rest of the replicas are hosted on the downed brokers.
>  - so we have 1 topic with some partitions with leader -1
>  - the rest of the matching topics have 3 replicas with leaders
> MM will not produce into any of the matched topics until:
>  - the "orphaned" topic is removed, or
>  - the partition reassignment is carried out from the downed brokers (suppose 
> you can turn these back on)
> In the MirrorMaker logs, there are a lot of messages like the following ones:
> {code}
> [2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-1, groupId=1] Sending 
> FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-0, groupId=1] Sending 
> FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Received 
> FindCoordinator response ClientResponse(receivedTimeMs=1521741547782, 
> latencyMs=1, disconnected=false, 
> requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, 
> clientId=1-0, correlationId=71), 
> responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
> error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] Received 
> FindCoordinator response ClientResponse(receivedTimeMs=1521741547782, 
> latencyMs=1, disconnected=false, 
> requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, 
> clientId=1-1, correlationId=71), 
> responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
> error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Group 
> coordinator lookup failed: The coordinator is not available. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] Group 
> coordinator lookup failed: The coordinator is not available. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] 
> Coordinator discovery failed, refreshing metadata 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] 
> Coordinator discovery failed, refreshing metadata 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> {code}
> Interestingly, if MirrorMaker uses {{zookeeper.connect}} in its consumer 
> properties file, then an OldConsumer is created, and it can make progress.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6703) MirrorMaker cannot make progress when any matched topic from a whitelist regexp has -1 leader

2018-03-22 Thread Attila Sasvari (JIRA)
Attila Sasvari created KAFKA-6703:
-

 Summary: MirrorMaker cannot make progress when any matched topic 
from a whitelist regexp has -1 leader
 Key: KAFKA-6703
 URL: https://issues.apache.org/jira/browse/KAFKA-6703
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Attila Sasvari


Scenario:
 - MM whitelist regexp matches multiple topics
 - destination cluster has 5 brokers with multiple topics of replication factor 3
 - without partition reassignment, shut down 2 brokers
 - suppose a topic has no leader any more because it was out-of-sync and the 
leader and the rest of the replicas are hosted on the downed brokers.
 - so we have 1 topic with some partitions with leader -1
 - the rest of the matching topics have 3 replicas with leaders

MM will not produce into any of the matched topics until:
 - the "orphaned" topic is removed, or
 - the partition reassignment is carried out from the downed brokers (suppose you can 
turn these back on)

In the MirrorMaker logs, there are a lot of messages like the following ones:
{code}
[2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-1, groupId=1] Sending 
FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-22 18:59:07,781] DEBUG [Consumer clientId=1-0, groupId=1] Sending 
FindCoordinator request to broker 192.168.1.102:9092 (id: 0 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Received 
FindCoordinator response ClientResponse(receivedTimeMs=1521741547782, 
latencyMs=1, disconnected=false, 
requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, 
clientId=1-0, correlationId=71), 
responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] Received 
FindCoordinator response ClientResponse(receivedTimeMs=1521741547782, 
latencyMs=1, disconnected=false, 
requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, 
clientId=1-1, correlationId=71), 
responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Group 
coordinator lookup failed: The coordinator is not available. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] Group 
coordinator lookup failed: The coordinator is not available. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-0, groupId=1] Coordinator 
discovery failed, refreshing metadata 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-22 18:59:07,783] DEBUG [Consumer clientId=1-1, groupId=1] Coordinator 
discovery failed, refreshing metadata 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
{code}

Interestingly, if MirrorMaker uses {{zookeeper.connect}} in its consumer 
properties file, then an OldConsumer is created, and it can make progress.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410019#comment-16410019
 ] 

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax opened a new pull request #4758: KAFKA-6054: Fix upgrade path from Kafka 
Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4758
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: James Cheng
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: kip
> Fix For: 1.2.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so that at one point, there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> 
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 in

[jira] [Commented] (KAFKA-6679) Random corruption (CRC validation issues)

2018-03-22 Thread Ari Uka (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409993#comment-16409993
 ] 

Ari Uka commented on KAFKA-6679:


One thing I'd also like to mention is that we are using FreeBSD with ZFS; I'm 
unsure if that's relevant here. The cluster has been pretty healthy for about a 
year straight, so I'm unsure whether that's the issue.

I did find someone else complaining about ZFS + FreeBSD 
[https://mail-archives.apache.org/mod_mbox/kafka-dev/201602.mbox/%3cjira.12939477.1455623036000.60174.1455625758...@atlassian.jira%3E]

> Random corruption (CRC validation issues) 
> --
>
> Key: KAFKA-6679
> URL: https://issues.apache.org/jira/browse/KAFKA-6679
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, replication
>Affects Versions: 0.10.2.0, 1.0.1
> Environment: FreeBSD 11.0-RELEASE-p8
>Reporter: Ari Uka
>Priority: Major
>
> I'm running into a really strange issue on production. I have 3 brokers and 
> randomly consumers will start to fail with an error message saying the CRC 
> does not match. The brokers are all on 1.0.1; the issue started on 0.10.2 and 
> we upgraded with the hope that it would fix the issue.
> On the kafka side, I see errors related to this across all 3 brokers:
> ```
> [2018-03-17 20:59:58,967] ERROR [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Error for partition topic-a-0 to broker 
> 1:org.apache.kafka.common.errors.CorruptRecordException: This message has 
> failed its CRC checksum, exceeds the valid size, or is otherwise corrupt. 
> (kafka.server.ReplicaFetcherThread)
> [2018-03-17 20:59:59,411] ERROR [ReplicaManager broker=3] Error processing 
> fetch operation on partition topic-b-0, offset 23848795 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller 
> than minimum record overhead (14).
> [2018-03-17 20:59:59,411] ERROR [ReplicaManager broker=3] Error processing 
> fetch operation on partition topic-b-0, offset 23848795 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller 
> than minimum record overhead (14)
> [2018-03-17 20:59:59,490] ERROR [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Error for partition topic-c-2 to broker 
> 2:org.apache.kafka.common.errors.CorruptRecordException: This message has 
> failed its CRC checksum, exceeds the valid size, or is otherwise corrupt. 
> (kafka.server.ReplicaFetcherThread)
> ```
>  
> To fix this, I have to use the kafka-consumer-groups.sh command line tool and 
> do a binary search until I can find a non corrupt message and push the 
> offsets forward. It's annoying because I can't actually push to a specific 
> date because kafka-consumer-groups.sh starts to emit the same error, 
> ErrInvalidMessage, CRC does not match.
> The error popped up again the next day after fixing it tho, so I'm trying to 
> find the root cause. 
> I'm using the Go consumer [https://github.com/Shopify/sarama] and 
> [https://github.com/bsm/sarama-cluster]. 
> At first, I thought it could be the consumer libraries, but the error happens 
> with kafka-console-consumer.sh as well when a specific message is corrupted 
> in Kafka. I don't think it's possible for Kafka producers to actually push 
> corrupt messages to Kafka and then cause all consumers to break, right? I 
> assume Kafka would reject corrupt messages, so I'm not sure what's going on 
> here.
> Should I just re-create the cluster, I don't think it's hardware failure 
> across the 3 machines tho.
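
For reference, the offset-skipping workaround described above can be scripted 
with the reset option of the same tool (a sketch; group, topic, partition, and 
target offset are illustrative, here skipping one past the corrupt offset from 
the log above):

{noformat}
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group my-group --topic topic-b:0 \
    --reset-offsets --to-offset 23848796 --execute
{noformat}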



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6699) When one of two Kafka nodes are dead, streaming API cannot handle messaging

2018-03-22 Thread Antony Stubbs (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409957#comment-16409957
 ] 

Antony Stubbs commented on KAFKA-6699:
--

Also, what is the replication factor for your topics? Input and intermediate 
topics? A describe of all topics will help. If the replication factor is one, 
then that would cause you problems if any broker fails.
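
For example (a ZooKeeper-based invocation; host and port are illustrative):

{noformat}
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe
{noformat}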

> When one of two Kafka nodes are dead, streaming API cannot handle messaging
> ---
>
> Key: KAFKA-6699
> URL: https://issues.apache.org/jira/browse/KAFKA-6699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> Dears,
> I am observing quite often that, when a Kafka broker is partly dead(*), 
> applications which use the streaming API do nothing.
> (*) Partly dead in my case means that one of the two Kafka nodes is out of 
> order. 
> Especially when the disk is full on one machine, the broker goes into some 
> strange state where the streaming API goes on vacation. It seems like the 
> regular producer/consumer API has no problem in such a case.
> Can you have a look at that matter?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-03-22 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409886#comment-16409886
 ] 

Matthias J. Sax commented on KAFKA-6437:


Sounds reasonable to me. The question is, what do we want to do when topics are 
missing? Should we stop the world and fail (i.e., throw an exception) or just log 
a warning? Or maybe even make it configurable? WDYT?

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Priority: Minor
>  Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue a `warn` or `error` level message at start to inform about the missing 
> topic and its consequences.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-03-22 Thread Mariam John (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409759#comment-16409759
 ] 

Mariam John commented on KAFKA-6437:


In KafkaStreams we can use the admin client to list all topics, go through the 
source and sink topics to see which topics are missing, and print an error 
message like a MissingTopicException. Is there something more we want to do for 
a fix, or is there more to this bug that I am missing? 
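
A minimal sketch of that check (using the public {{AdminClient}} API; 
{{MissingTopicException}} and the surrounding names are hypothetical):

{code:java}
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;

// Sketch: fail fast at startup if any required topic is absent
void checkTopicsExist(Properties adminProps, Set<String> requiredTopics) throws Exception {
    try (AdminClient admin = AdminClient.create(adminProps)) {
        Set<String> existing = admin.listTopics().names().get();
        for (String topic : requiredTopics) {   // source + sink topics of the topology
            if (!existing.contains(topic))
                throw new MissingTopicException("Topic " + topic + " is missing");
        }
    }
}
{code}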

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Priority: Minor
>  Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue a `warn` or `error` level message at start to inform about the missing 
> topic and its consequences.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect

2018-03-22 Thread UYAR Farid (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409695#comment-16409695
 ] 

UYAR Farid commented on KAFKA-4107:
---

There is also another issue; I don't know if it's related.

When I delete a connector and recreate it with the same name, the previously 
stored offset is retrieved. I have to resort to incrementing the name of the 
connector.

That's unintuitive and, I think, not intended.

> Support offset reset capability in Kafka Connect
> 
>
> Key: KAFKA-4107
> URL: https://issues.apache.org/jira/browse/KAFKA-4107
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>Priority: Major
>
> It would be useful in some cases to be able to reset connector offsets. For 
> example, if a topic in Kafka corresponding to a source database is 
> accidentally deleted (or deleted because of corrupt data), an administrator 
> may want to reset offsets and reproduce the log from the beginning. It may 
> also be useful to have support for overriding offsets, but that seems like a 
> less likely use case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6670) Implement a Scala wrapper library for Kafka Streams

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409504#comment-16409504
 ] 

ASF GitHub Bot commented on KAFKA-6670:
---

debasishg opened a new pull request #4756: KAFKA-6670: Implement a Scala 
wrapper library for Kafka Streams
URL: https://github.com/apache/kafka/pull/4756
 
 
   This PR implements a Scala wrapper library for Kafka Streams. The library is 
implemented as a project under streams, namely `:streams:streams-scala`. The PR 
contains the following:
   
   * the library implementation of the wrapper abstractions
   * the test suite 
   * the changes in `build.gradle` to build the library jar
   
   The library has been tested by running the tests as follows:
   
   ```
   $ ./gradlew 
-Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdes 
streams:streams-scala:test
   $ ./gradlew 
-Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro 
streams:streams-scala:test
   $ ./gradlew -Dtest.single=WordCountTest streams:streams-scala:test
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement a Scala wrapper library for Kafka Streams
> ---
>
> Key: KAFKA-6670
> URL: https://issues.apache.org/jira/browse/KAFKA-6670
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Debasish Ghosh
>Assignee: Debasish Ghosh
>Priority: Major
>  Labels: api, kip
>
> Implement a Scala wrapper library for Kafka Streams.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6685) Connect deserialization log message should distinguish key from value

2018-03-22 Thread Jagadesh Adireddi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406081#comment-16406081
 ] 

Jagadesh Adireddi edited comment on KAFKA-6685 at 3/22/18 12:28 PM:


Hi [~rhauch],

I would like to pick this Jira up. I wanted some advice on the error message. I 
am thinking, if the exception occurred during:
 a) keyAndSchema conversion, then the error msg is logged as: Failed to convert 
Record Key to Kafka Connect format.

b) valueAndSchema conversion, then the error msg is logged as: Failed to convert 
Record Value to Kafka Connect format.

And the main ConnectException is thrown as: Exiting WorkerSinkTask due to 
unconverted message exception.

Any help on the hint message to fix the issue would be great.
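
A sketch of that split (based on the {{Converter#toConnectData}} signature; the 
exact wording and surrounding code are up for discussion):

{code:java}
// msg is the consumed ConsumerRecord<byte[], byte[]> in WorkerSinkTask
SchemaAndValue keyAndSchema;
try {
    keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
} catch (Exception e) {
    throw new ConnectException("Failed to convert Record Key to Kafka Connect format", e);
}

SchemaAndValue valueAndSchema;
try {
    valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
} catch (Exception e) {
    throw new ConnectException("Failed to convert Record Value to Kafka Connect format", e);
}
{code}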


was (Author: adireddijagad...@gmail.com):
Hi [~rhauch],

I would like to pick this Jira up. Wanted some advice on the error message .I 
am thinking , if the exception occurred during:
 a) keyAndSchema conversion,  then error msg logged as :  Failed to convert 
message Key And Schema to Kafka Connect format.

b) valueAndSchema conversion, then error msg logged as : Failed to convert 
message Value And Schema to Kafka Connect format.

And Main ConnectException thrown as :  Exiting WorkerSinkTask due to 
unconvertedmessage exception.

Any help on the hint message to fix the issue would be great?

> Connect deserialization log message should distinguish key from value
> -
>
> Key: KAFKA-6685
> URL: https://issues.apache.org/jira/browse/KAFKA-6685
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yeva Byzek
>Priority: Minor
>  Labels: newbie
>
> Connect was configured for Avro key and value but data had String key and 
> Avro value. The resulting error message was misleading because it didn't 
> distinguish key from value, and so I was chasing problems with the value 
> instead of the key.
> tl;dr Connect should at least tell you whether the problem is with 
> deserializing the key or value of a record
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6673) Segment and Stamped implement Comparable, but don't override equals.

2018-03-22 Thread Koen De Groote (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409214#comment-16409214
 ] 

Koen De Groote edited comment on KAFKA-6673 at 3/22/18 8:32 AM:


That implementation would just do an identity comparison, because Stamped and 
Segment don't have an explicit superclass, just Object.

And Object's equals is this:


{code:java}
public boolean equals(Object obj) {
return (this == obj);
}
{code}


The whole idea behind this ticket is that the equals method should act 
logically the same as the compareTo method, so as to have only one kind of behavior.


was (Author: koendg):
That implementation would just do an identity comparison. Because Stamped and 
Segment don't have an explicit super class, just Object.

And Object's equals is this:


{code:java}
public boolean equals(Object obj) {
return (this == obj);
}
{code}


The whole idea behind this is that the equals method should act logically the 
same as the compareTo method, as to have only 1 kind of behavior.

> Segment and Stamped implement Comparable, but don't override equals.
> 
>
> Key: KAFKA-6673
> URL: https://issues.apache.org/jira/browse/KAFKA-6673
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Koen De Groote
>Priority: Minor
> Attachments: KAFKA_6673.patch
>
>
> The classes in question:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
> and
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
> This came up while doing static analysis on the codebase on the trunk branch.
> As described by the analysis tool built into Intellij:
> {quote}
> Reports classes which implement java.lang.Comparable which do not override 
> equals(). If equals() is not overridden, the equals() implementation is not 
> consistent with the compareTo() implementation. If an object of such a class 
> is added to a collection such as java.util.SortedSet, this collection will 
> violate the contract of java.util.Set, which is defined in terms of equals().
> {quote}
>  
> Implementing an equals for an object is generally a best practice, especially 
> considering this caveat, where it's not the compareTo that will be used but 
> the equals method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6673) Segment and Stamped implement Comparable, but don't override equals.

2018-03-22 Thread Koen De Groote (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409214#comment-16409214
 ] 

Koen De Groote edited comment on KAFKA-6673 at 3/22/18 8:31 AM:


That implementation would just do an identity comparison. Because Stamped and 
Segment don't have an explicit super class, just Object.

And Object's equals is this:


{code:java}
public boolean equals(Object obj) {
return (this == obj);
}
{code}


The whole idea behind this is that the equals method should act logically the 
same as the compareTo method, as to have only 1 kind of behavior.


was (Author: koendg):
That implementation would just do an identity comparison. Because Stamped and 
Segment don't have an explicit super class, just Object.

And Object's equals is this:

public boolean equals(Object obj) {
return (this == obj);
}

The whole idea behind this is that the equals method should act logically the 
same as the compareTo method, as to have only 1 kind of behavior.

> Segment and Stamped implement Comparable, but don't override equals.
> 
>
> Key: KAFKA-6673
> URL: https://issues.apache.org/jira/browse/KAFKA-6673
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Koen De Groote
>Priority: Minor
> Attachments: KAFKA_6673.patch
>
>
> The classes in question:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
> and
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
> This came up while doing static analysis on the codebase on the trunk branch.
> As described by the analysis tool built into Intellij:
> {quote}
> Reports classes which implement java.lang.Comparable which do not override 
> equals(). If equals() is not overridden, the equals() implementation is not 
> consistent with the compareTo() implementation. If an object of such a class 
> is added to a collection such as java.util.SortedSet, this collection will 
> violate the contract of java.util.Set, which is defined in terms of equals().
> {quote}
>  
> Implementing an equals for an object is generally a best practice, especially 
> considering this caveat, where it's not the compareTo that will be used but 
> the equals method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6673) Segment and Stamped implement Comparable, but don't override equals.

2018-03-22 Thread Koen De Groote (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409214#comment-16409214
 ] 

Koen De Groote commented on KAFKA-6673:
---

That implementation would just do an identity comparison, because Stamped and 
Segment don't have an explicit superclass, just Object.

And Object's equals is this:

public boolean equals(Object obj) {
return (this == obj);
}

The whole idea behind this is that the equals method should act logically the 
same as the compareTo method, so as to have only one kind of behavior.

> Segment and Stamped implement Comparable, but don't override equals.
> 
>
> Key: KAFKA-6673
> URL: https://issues.apache.org/jira/browse/KAFKA-6673
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Koen De Groote
>Priority: Minor
> Attachments: KAFKA_6673.patch
>
>
> The classes in question:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
> and
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
> This came up while doing static analysis on the codebase on the trunk branch.
> As described by the analysis tool built into Intellij:
> {quote}
> Reports classes which implement java.lang.Comparable which do not override 
> equals(). If equals() is not overridden, the equals() implementation is not 
> consistent with the compareTo() implementation. If an object of such a class 
> is added to a collection such as java.util.SortedSet, this collection will 
> violate the contract of java.util.Set, which is defined in terms of equals().
> {quote}
>  
> Implementing equals for an object is generally a best practice, especially 
> given this caveat, where it is the equals method that will be used, not 
> compareTo.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409185#comment-16409185
 ] 

ASF GitHub Bot commented on KAFKA-6683:
---

hachikuji opened a new pull request #4755: KAFKA-6683; Ensure producer state 
not mutated prior to append
URL: https://github.com/apache/kafka/pull/4755
 
 
   We were unintentionally mutating the cached queue of batches prior to 
appending to the log. This could have several bad consequences if the append 
ultimately failed. In the reporter's case, it caused the snapshot to be invalid 
after a segment roll. The snapshot contained producer state at offsets higher 
than the snapshot offset. If we ever had to load from that snapshot, the state 
was left inconsistent, which led to an error that ultimately crashed the 
replica fetcher.
   
   The fix required some refactoring to avoid sharing the same underlying queue 
inside `ProducerAppendInfo`. I have added test cases which reproduce the 
invalid snapshot state. I have also made an effort to clean up logging since it 
was not easy to track this problem down.
   
   One final note: I have removed the duplicate check inside 
`ProducerStateManager` since it was both redundant and incorrect. The 
redundancy was in the checking of the cached batches: we already check these in 
`Log.analyzeAndValidateProducerState`. The incorrectness was the handling of 
sequence number overflow: we were only handling one very specific case of 
overflow, but others would have resulted in an invalid assertion. Instead, we 
now throw `OutOfOrderSequenceException`.
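   
   As a rough illustration of the stage-then-commit pattern the fix describes 
(all names here are hypothetical, not Kafka's actual internals):
   
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: mutations are staged in a private queue and only
// merged into the shared cache after the log append succeeds, so a failed
// append cannot leave the cached producer state inconsistent.
class StagedAppendSketch {
    private final Deque<String> cachedBatches = new ArrayDeque<>();

    void append(Iterable<String> newBatches, LogAppender appender) {
        Deque<String> staged = new ArrayDeque<>();
        for (String batch : newBatches) {
            staged.add(batch); // stage instead of mutating cachedBatches
        }

        appender.appendToLog(staged); // may throw; the cache is still untouched

        cachedBatches.addAll(staged); // publish only after a successful append
    }

    interface LogAppender {
        void appendToLog(Deque<String> batches);
    }
}
{code}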
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ReplicaFetcher crashes with "Attempted to complete a transaction which was 
> not started" 
> 
>
> Key: KAFKA-6683
> URL: https://issues.apache.org/jira/browse/KAFKA-6683
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0
> Environment: os: GNU/Linux 
> arch: x86_64
> Kernel: 4.9.77
> jvm: OpenJDK 1.8.0
>Reporter: Chema Sanchez
>Assignee: Jason Gustafson
>Priority: Critical
> Attachments: server.properties
>
>
> We have been experiencing this issue lately when restarting or replacing 
> brokers of our Kafka clusters during maintenance operations.
> Having restarted or replaced a broker, after some minutes performing normally 
> it may suddenly throw the following exception and stop replicating some 
> partitions:
> {code:none}
> 2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalArgumentException: Attempted to complete a transaction which 
> was not started
>     at 
> kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
>     at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
>     at 
> kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
>     at scala.collection.immutable.List.foreach(List.scala:389)
>     at 
> scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
>     at 
> scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
>     at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
>     at kafka.log.Log.loadProducersFromLog(Log.scala:540)
>     at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
>     at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.log.Log.loadProducerState(Log.scala:514)
>     at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
>     at 
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.truncateTo(Log.scala:1467)