[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-27 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700100#comment-16700100
 ] 

Patrik Kleindl commented on KAFKA-7660:
---

[~vvcephei] Thanks for confirming, glad I was able to help. It all pointed in 
that direction.

I can try out any change locally once you submit it.

Maybe I'll go for the constants; are patch files OK too? I don't have committer 
rights etc. yet.

Should I create a separate Jira issue for that?
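
For illustration, here is a minimal sketch of the "constants" idea, assuming a 
dedicated holder class; the class and constant names below are hypothetical (the 
KAFKA-7223 change later in this digest references similar constants on 
StreamsMetricsImpl instead):

{code:java}
// Hypothetical sketch only: define each metric group/tag name once and reference
// the constant wherever a sensor is registered, instead of repeating the literal.
public final class StreamsMetricsGroups {
    public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
    public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";

    private StreamsMetricsGroups() {}
}
{code}

A call site would then pass StreamsMetricsGroups.PROCESSOR_NODE_METRICS_GROUP 
rather than the string literal.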

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Priority: Minor
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deeply into the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement the equals or hashCode 
> methods. Not sure whether this is a problem, though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.
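
As an aside on the parentSensors point above: a key class that does not override 
equals/hashCode falls back to identity semantics in a HashMap. A minimal, 
self-contained illustration (SensorLikeKey is a hypothetical stand-in, not Kafka code):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustration only: without equals()/hashCode(), two logically identical keys
// hash and compare by object identity, so they occupy two separate map entries
// and an entry can only be removed via the exact same key instance.
public class IdentityKeyDemo {
    static final class SensorLikeKey {
        final String name;
        SensorLikeKey(final String name) { this.name = name; }
    }

    public static void main(final String[] args) {
        final Map<SensorLikeKey, String> parentSensors = new HashMap<>();
        parentSensors.put(new SensorLikeKey("punctuate-latency-avg"), "parent-a");
        parentSensors.put(new SensorLikeKey("punctuate-latency-avg"), "parent-a");
        System.out.println(parentSensors.size()); // prints 2, not 1
    }
}
{code}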



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7508) Kafka broker anonymous disconnected from Zookeeper

2018-11-27 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700187#comment-16700187
 ] 

Jonathan Santilli commented on KAFKA-7508:
--

Kafka brokers expose good metrics via JMX; you can use your favorite JMX client 
to connect to the broker's JMX port.

LinkedIn also has good monitoring tools: [https://github.com/linkedin/streaming]

A Kafka broker is a process running on your server, so you can monitor it like any 
normal process (just to check whether the broker is up or not).

But of course that is not enough; you also need to monitor the broker from the 
application point of view as well, and the JMX metrics will help you with that.
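
As a small illustration, here is a self-contained sketch of reading one broker 
metric over JMX with the standard javax.management API; the host, port and the 
chosen MBean are assumptions (the broker must have been started with JMX enabled, 
e.g. via JMX_PORT):

{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerJmxCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port; point this at the broker's JMX endpoint.
        JMXServiceURL url =
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            // Read one commonly available broker gauge; other Kafka MBeans work the same way.
            Object value = connection.getAttribute(
                new ObjectName("kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions"),
                "Value");
            System.out.println("UnderReplicatedPartitions = " + value);
        } finally {
            connector.close();
        }
    }
}
{code}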

 

BTW, if you consider this issue solved, please let us know so we can close it.

Cheers!

--

Jonathan

> Kafka broker anonymous disconnected from Zookeeper
> --
>
> Key: KAFKA-7508
> URL: https://issues.apache.org/jira/browse/KAFKA-7508
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config
>Reporter: Sathish Yanamala
>Priority: Blocker
>
> Hello Team,
>  
> We are facing the error below; the Kafka broker is unable to connect to Zookeeper. 
> Please check and suggest whether any configuration changes are required on the Kafka broker.
>  
>  ERROR:
> 2018-10-15 12:24:07,502 WARN kafka.network.Processor: Attempting to send 
> response via channel for which there is no open connection, connection id 
> - -:9093-- -:47542-25929
> 2018-10-15 12:24:09,428 INFO kafka.coordinator.group.GroupCoordinator: 
> [GroupCoordinator 3]: Group KMOffsetCache-xxx  with generation 9 is now 
> empty (__consumer_offsets-22)
> 2018-10-15 12:24:09,428 INFO kafka.server.epoch.LeaderEpochFileCache: Updated 
> PartitionLeaderEpoch. New: \{epoch:1262, offset:151}, Current: \{epoch:1261, 
> offset144} for Partition: __consumer_offsets-22. Cache now contains 15 
> entries.
> {color:#d04437}*2018-10-15 12:24:10,905 ERROR kafka.utils.KafkaScheduler: 
> Uncaught exception in scheduled task 'highwatermark-checkpoint'*{color}
> {color:#d04437}*java.lang.OutOfMemoryError: Java heap space*{color}
> {color:#d04437}    at{color} 
> scala.collection.convert.DecorateAsScala$$Lambda$214/x.get$Lambda(Unknown 
> Source)
>     at 
> java.lang.invoke.LambdaForm$DMH/xxx.invokeStatic_LL_L(LambdaForm$DMH)
>     at 
> java.lang.invoke.LambdaForm$MH/xx.linkToTargetMethod(LambdaForm$MH)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter(DecorateAsScala.scala:45)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter$(DecorateAsScala.scala:44)
>     at 
> scala.collection.JavaConverters$.collectionAsScalaIterableConverter(JavaConverters.scala:73)
>     at kafka.utils.Pool.values(Pool.scala:85)
>     at 
> kafka.server.ReplicaManager.nonOfflinePartitionsIterator(ReplicaManager.scala:397)
>     at 
> kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1340)
>     at 
> kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:253)
>     at 
> kafka.server.ReplicaManager$$Lambda$608/xx.apply$mcV$sp(Unknown Source)
>     at 
> kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:110)
>     at 
> kafka.utils.KafkaScheduler$$Lambda$269/.apply$mcV$sp(Unknown Source)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
> Thank you,
> Sathish Yanamala
> M:832-382-4487



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7677) Client login with already existing JVM subject

2018-11-27 Thread Gabor Somogyi (JIRA)
Gabor Somogyi created KAFKA-7677:


 Summary: Client login with already existing JVM subject
 Key: KAFKA-7677
 URL: https://issues.apache.org/jira/browse/KAFKA-7677
 Project: Kafka
  Issue Type: New Feature
Reporter: Gabor Somogyi


If the JVM is already logged in to the KDC and has a Subject + TGT in its security 
context, it could be used by clients without logging in again. Example code:

{code:java}
org.apache.hadoop.security.UserGroupInformation.getCurrentUser().doAs(
  new java.security.PrivilegedExceptionAction[Unit] {
    override def run(): Unit = {
      val subject =
        javax.security.auth.Subject.getSubject(java.security.AccessController.getContext())
      val adminClient = AdminClient.create...
    }
  }
)
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7677) Client login with already existing JVM subject

2018-11-27 Thread Gabor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated KAFKA-7677:
-
Affects Version/s: 2.2.0

> Client login with already existing JVM subject
> --
>
> Key: KAFKA-7677
> URL: https://issues.apache.org/jira/browse/KAFKA-7677
> Project: Kafka
>  Issue Type: New Feature
>Affects Versions: 2.2.0
>Reporter: Gabor Somogyi
>Priority: Major
>
> If the JVM is already logged in to the KDC and has a Subject + TGT in its security 
> context, it could be used by clients without logging in again. Example code:
> {code:java}
> org.apache.hadoop.security.UserGroupInformation.getCurrentUser().doAs(
>   new java.security.PrivilegedExceptionAction[Unit] {
>     override def run(): Unit = {
>       val subject =
>         javax.security.auth.Subject.getSubject(java.security.AccessController.getContext())
>       val adminClient = AdminClient.create...
>     }
>   }
> )
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-27 Thread Jonathan Santilli (JIRA)
Jonathan Santilli created KAFKA-7678:


 Summary: Failed to close producer due to 
java.lang.NullPointerException
 Key: KAFKA-7678
 URL: https://issues.apache.org/jira/browse/KAFKA-7678
 Project: Kafka
  Issue Type: Bug
Reporter: Jonathan Santilli


This occurs when the group is rebalancing in a Kafka Streams application and the 
process (the Kafka Streams application) receives a SIGTERM to stop it gracefully.

 

 
{noformat}
ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
Failed to close producer due to the following error:
java.lang.NullPointerException
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
 at 
org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
 at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
 

 

I have checked the code, though, and the method 
`maybeAbortTransactionAndCloseRecordCollector` in the `StreamTask.java` class 
is prepared for any kind of error, since it is catching `Throwable`.

 

 

 
{noformat}
try {
 recordCollector.close();
} catch (final Throwable e) {
 log.error("Failed to close producer due to the following error:", e);
} finally {
 producer = null;
}{noformat}
 

Should we consider this a bug?

In my opinion, we could check for the `null` possibility in the 
`RecordCollectorImpl.java` class:
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 producer.close();
 producer = null;
 checkForException();
}{noformat}
 

Change it to:

 
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 if ( Objects.nonNull(producer) ) {
producer.close();
producer = null;
 }
 checkForException();
}{noformat}
 

How does that sound?

 

Kafka Brokers running 2.0.0

Kafka Streams and client 2.1.0

OpenJDK 8

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-27 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli updated KAFKA-7678:
-
Description: 
This occurs when the group is rebalancing in a Kafka Streams application and the 
process (the Kafka Streams application) receives a *SIGTERM* to stop it 
gracefully.

 

 
{noformat}
ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
Failed to close producer due to the following error:
java.lang.NullPointerException
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
 at 
org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
 at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
 

 

I have checked the code, though, and the method 
`*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
class is prepared for any kind of error, since it is catching `*Throwable*`.

 

 

 
{noformat}
try {
 recordCollector.close();
} catch (final Throwable e) {
 log.error("Failed to close producer due to the following error:", e);
} finally {
 producer = null;
}{noformat}
 

Should we consider this a bug?

In my opinion, we could check for the `*null*` possibility in the 
`*RecordCollectorImpl*.*java*` class:
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 producer.close();
 producer = null;
 checkForException();
}{noformat}
 

Change it to:

 
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 if ( Objects.nonNull(producer) ) {
producer.close();
producer = null;
 }
 checkForException();
}{noformat}
 

How does that sound?

 

Kafka Brokers running 2.0.0

Kafka Streams and client 2.1.0

OpenJDK 8

 

  was: (previous revision of the description; identical text apart from the bold 
markup added above)

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue

[jira] [Commented] (KAFKA-6635) Producer close does not await pending transaction

2018-11-27 Thread Viktor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700392#comment-16700392
 ] 

Viktor Somogyi commented on KAFKA-6635:
---

[~hachikuji] is this still relevant? Do you mind if I pick this up?

> Producer close does not await pending transaction
> -
>
> Key: KAFKA-6635
> URL: https://issues.apache.org/jira/browse/KAFKA-6635
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Priority: Major
>
> Currently close() only awaits completion of pending produce requests. If 
> there is a transaction ongoing, it may be dropped. For example, if one thread 
> is calling {{commitTransaction()}} and another calls {{close()}}, then the 
> commit may never happen even if the caller is willing to wait for it (by 
> using a long timeout). What's more, the thread blocking in 
> {{commitTransaction()}} will be stuck, since the result will not be completed 
> once the producer has shut down.
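
To make the described race concrete, here is a hedged, self-contained Java sketch 
of the scenario (broker address, topic and transactional.id are placeholders; this 
is not the ticket's reproduction code): one thread blocks in commitTransaction() 
while another closes the producer.

{code:java}
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CloseDuringCommitDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder address
        props.put("transactional.id", "demo-txn");          // placeholder transactional id
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("demo-topic", "k", "v"));

        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Thread 1: blocks in commitTransaction() waiting for the transaction to complete.
        pool.submit(producer::commitTransaction);
        // Thread 2: closes the producer concurrently; per the report above, the pending
        // commit may be dropped and the committing thread may never be unblocked,
        // even with a generous timeout.
        producer.close(Duration.ofSeconds(30));
        pool.shutdown();
    }
}
{code}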



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7508) Kafka broker anonymous disconnected from Zookeeper

2018-11-27 Thread Sathish Yanamala (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700514#comment-16700514
 ] 

Sathish Yanamala commented on KAFKA-7508:
-

Jonathan,

Thanks for the support. Please close the story!

 

Thank you,

Sathish Yanamala

> Kafka broker anonymous disconnected from Zookeeper
> --
>
> Key: KAFKA-7508
> URL: https://issues.apache.org/jira/browse/KAFKA-7508
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config
>Reporter: Sathish Yanamala
>Priority: Blocker
>
> Hello Team,
>  
> We are facing the error below; the Kafka broker is unable to connect to Zookeeper. 
> Please check and suggest whether any configuration changes are required on the Kafka broker.
>  
>  ERROR:
> 2018-10-15 12:24:07,502 WARN kafka.network.Processor: Attempting to send 
> response via channel for which there is no open connection, connection id 
> - -:9093-- -:47542-25929
> 2018-10-15 12:24:09,428 INFO kafka.coordinator.group.GroupCoordinator: 
> [GroupCoordinator 3]: Group KMOffsetCache-xxx  with generation 9 is now 
> empty (__consumer_offsets-22)
> 2018-10-15 12:24:09,428 INFO kafka.server.epoch.LeaderEpochFileCache: Updated 
> PartitionLeaderEpoch. New: \{epoch:1262, offset:151}, Current: \{epoch:1261, 
> offset144} for Partition: __consumer_offsets-22. Cache now contains 15 
> entries.
> {color:#d04437}*2018-10-15 12:24:10,905 ERROR kafka.utils.KafkaScheduler: 
> Uncaught exception in scheduled task 'highwatermark-checkpoint'*{color}
> {color:#d04437}*java.lang.OutOfMemoryError: Java heap space*{color}
> {color:#d04437}    at{color} 
> scala.collection.convert.DecorateAsScala$$Lambda$214/x.get$Lambda(Unknown 
> Source)
>     at 
> java.lang.invoke.LambdaForm$DMH/xxx.invokeStatic_LL_L(LambdaForm$DMH)
>     at 
> java.lang.invoke.LambdaForm$MH/xx.linkToTargetMethod(LambdaForm$MH)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter(DecorateAsScala.scala:45)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter$(DecorateAsScala.scala:44)
>     at 
> scala.collection.JavaConverters$.collectionAsScalaIterableConverter(JavaConverters.scala:73)
>     at kafka.utils.Pool.values(Pool.scala:85)
>     at 
> kafka.server.ReplicaManager.nonOfflinePartitionsIterator(ReplicaManager.scala:397)
>     at 
> kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1340)
>     at 
> kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:253)
>     at 
> kafka.server.ReplicaManager$$Lambda$608/xx.apply$mcV$sp(Unknown Source)
>     at 
> kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:110)
>     at 
> kafka.utils.KafkaScheduler$$Lambda$269/.apply$mcV$sp(Unknown Source)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
> Thank you,
> Sathish Yanamala
> M:832-382-4487



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-27 Thread lkgen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700533#comment-16700533
 ] 

lkgen commented on KAFKA-1194:
--

Hi, if you are interested in a more complete approach to the problem, you may 
inspect this commit:

[https://github.com/lkgendev/kafka/commit/a9f49f51cc782b59bd2c20f9275dbf43ed8cc90e]

 

 

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, 
> image-2018-11-26-10-18-59-381.png, kafka-1194-v1.patch, kafka-1194-v2.patch, 
> kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open. So we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-11-27 Thread LiorK (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LiorK updated KAFKA-1194:
-
Comment: was deleted

(was: I have some code that appears to solve the problem on Windows, including 
the compaction, but it does not pass the Linux unit tests and may break some 
functionality.

Can you try using this code and help fix the unit tests? Is there a way to 
share code that does not pass unit testing?
)

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, 
> image-2018-11-26-10-18-59-381.png, kafka-1194-v1.patch, kafka-1194-v2.patch, 
> kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open. So we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7679) With acks=all a single "stuck" non-leader replica can cause a timeout

2018-11-27 Thread Corentin Chary (JIRA)
Corentin Chary created KAFKA-7679:
-

 Summary: With acks=all a single "stuck" non-leader replica can 
cause a timeout
 Key: KAFKA-7679
 URL: https://issues.apache.org/jira/browse/KAFKA-7679
 Project: Kafka
  Issue Type: Bug
Reporter: Corentin Chary


From the documentation:
{code:java}
acks=all

This means the leader will wait for the full set of in-sync replicas to 
acknowledge the record. This guarantees that the record will not be lost as 
long as at least one in-sync replica remains alive. This is the strongest 
available guarantee. This is equivalent to the acks=-1 setting.{code}
{code:java}
min.insync.replicas

When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the 
minimum number of replicas that must acknowledge a write for the write to be 
considered successful. If this minimum cannot be met, then the producer will 
raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks allow you to enforce greater 
durability guarantees. A typical scenario would be to create a topic with a 
replication factor of 3, set min.insync.replicas to 2, and produce with acks of 
"all". This will ensure that the producer raises an exception if a majority of 
replicas do not receive a write.{code}
Given a replication factor of 3 and min.insync.replicas set to 2, I would 
expect the client to get an acknowledgment as soon as the write reaches the leader 
and at least one replica. This is what happens on a 3-node cluster when one of 
the brokers is down, for example.

However, it looks like this is not the case when a broker is "stuck" (which 
happens when you have network "blips").
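
For reference, a minimal Java producer sketch of that expectation, under the 
assumptions of the setup below (the bootstrap address, topic name and 2-second 
timeout are placeholders; the ticket's actual test script is test.py):

{code:java}
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksAllProbe {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:32784"); // placeholder, matches the docker setup below
        props.put("acks", "all");                          // wait for in-sync replicas to acknowledge
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // With replication.factor=3 and min.insync.replicas=2, the expectation stated
            // above is that this completes once the leader and one follower have the
            // record; a timeout here is the behaviour the ticket reports when one
            // replica is paused.
            RecordMetadata metadata = producer
                .send(new ProducerRecord<>("tests-irs2", "key", "value"))
                .get(2, TimeUnit.SECONDS);
            System.out.println("acked at offset " + metadata.offset());
        }
    }
}
{code}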

Here is how I reproduced this, but you can probably do the same with iptables 
on your own cluster:
{code:java}
# Start a cluster with 3 nodes
$ docker-compose up -d
$ docker-compose scale kafka=3
Starting kafka-docker_kafka_1_dbf4109a3095 ... done
Creating kafka-docker_kafka_2_973a373fa5b5 ... done
Creating kafka-docker_kafka_3_3d8fab2ac44a ... done

# Create topics with various settings
$ docker-compose exec kafka bash
$ kafka-topics.sh --create --topic tests-irs2 --config min.insync.replicas=2 
--zookeeper=${KAFKA_ZOOKEEPER_CONNECT} --partitions=1 --replication-factor=3
$ kafka-topics.sh --describe --zookeeper ${KAFKA_ZOOKEEPER_CONNECT}
Topic:tests-irs2 PartitionCount:1 ReplicationFactor:3 
Configs:min.insync.replicas=2
Topic: tests-irs2 Partition: 0 Leader: 1003 Replicas: 1003,1002,1001 Isr: 
1003,1002,1001{code}
 

Then start a small script that produces messages periodically

 
{code:java}
# Start the latency to get an idea of the normal latency
$ KAFKA_BOOTSTRAP_SERVERS=localhost:32784 KAFKA_TOPIC=tests-irs2 KAFKA_ACKS=-1 
../test.py
localhost:32784 tests-irs2 0.068457s
localhost:32784 tests-irs2 0.016032s
localhost:32784 tests-irs2 0.015884s
localhost:32784 tests-irs2 0.018244s
localhost:32784 tests-irs2 0.008625s{code}
 

Run `docker pause` on 1002

 
{code:java}
 
2018-11-27 14:07:47,608 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout 
waiting for future
Traceback (most recent call last):
 File "../test.py", line 27, in send_message
 producer.flush(timeout=MESSAGE_INTERVAL_SECS)
 File 
"/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/kafka.py",
 line 577, in flush
 self._accumulator.await_flush_completion(timeout=timeout)
 File 
"/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/record_accumulator.py",
 line 530, in await_flush_completion
 raise Errors.KafkaTimeoutError('Timeout waiting for future')
KafkaTimeoutError: KafkaTimeoutError: Timeout waiting for future
2018-11-27 14:07:49,618 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout 
waiting for future
Traceback (most recent call last):
 File "../test.py", line 27, in send_message
 producer.flush(timeout=MESSAGE_INTERVAL_SECS)
 File 
"/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/kafka.py",
 line 577, in flush
 self._accumulator.await_flush_completion(timeout=timeout)
 File 
"/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/record_accumulator.py",
 line 530, in await_flush_completion
 raise Errors.KafkaTimeoutError('Timeout waiting for future')
KafkaTimeoutError: KafkaTimeoutError: Timeout waiting for future
2018-11-27 14:07:51,628 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout 
waiting for future
Traceback (most recent call last):
 File "../test.py", line 27, in send_message
 producer.flush(timeout=MESSAGE_INTERVAL_SECS)
 File 
"/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/kafka.py",
 line 577, in flush
 self._accumulator.await_flush_completion(timeout=timeout)
 File 
"/Users/corentin.chary/dev/kafka-

[jira] [Resolved] (KAFKA-7508) Kafka broker anonymous disconnected from Zookeeper

2018-11-27 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli resolved KAFKA-7508.
--
Resolution: Not A Problem

Closed since it was requested by the reporter.

Sathish agreed to try the recommended JVM parameters.

> Kafka broker anonymous disconnected from Zookeeper
> --
>
> Key: KAFKA-7508
> URL: https://issues.apache.org/jira/browse/KAFKA-7508
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config
>Reporter: Sathish Yanamala
>Priority: Blocker
>
> Hello Team,
>  
> We are facing the error below; the Kafka broker is unable to connect to Zookeeper. 
> Please check and suggest whether any configuration changes are required on the Kafka broker.
>  
>  ERROR:
> 2018-10-15 12:24:07,502 WARN kafka.network.Processor: Attempting to send 
> response via channel for which there is no open connection, connection id 
> - -:9093-- -:47542-25929
> 2018-10-15 12:24:09,428 INFO kafka.coordinator.group.GroupCoordinator: 
> [GroupCoordinator 3]: Group KMOffsetCache-xxx  with generation 9 is now 
> empty (__consumer_offsets-22)
> 2018-10-15 12:24:09,428 INFO kafka.server.epoch.LeaderEpochFileCache: Updated 
> PartitionLeaderEpoch. New: \{epoch:1262, offset:151}, Current: \{epoch:1261, 
> offset144} for Partition: __consumer_offsets-22. Cache now contains 15 
> entries.
> {color:#d04437}*2018-10-15 12:24:10,905 ERROR kafka.utils.KafkaScheduler: 
> Uncaught exception in scheduled task 'highwatermark-checkpoint'*{color}
> {color:#d04437}*java.lang.OutOfMemoryError: Java heap space*{color}
> {color:#d04437}    at{color} 
> scala.collection.convert.DecorateAsScala$$Lambda$214/x.get$Lambda(Unknown 
> Source)
>     at 
> java.lang.invoke.LambdaForm$DMH/xxx.invokeStatic_LL_L(LambdaForm$DMH)
>     at 
> java.lang.invoke.LambdaForm$MH/xx.linkToTargetMethod(LambdaForm$MH)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter(DecorateAsScala.scala:45)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter$(DecorateAsScala.scala:44)
>     at 
> scala.collection.JavaConverters$.collectionAsScalaIterableConverter(JavaConverters.scala:73)
>     at kafka.utils.Pool.values(Pool.scala:85)
>     at 
> kafka.server.ReplicaManager.nonOfflinePartitionsIterator(ReplicaManager.scala:397)
>     at 
> kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1340)
>     at 
> kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:253)
>     at 
> kafka.server.ReplicaManager$$Lambda$608/xx.apply$mcV$sp(Unknown Source)
>     at 
> kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:110)
>     at 
> kafka.utils.KafkaScheduler$$Lambda$269/.apply$mcV$sp(Unknown Source)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
> Thank you,
> Sathish Yanamala
> M:832-382-4487



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-11-27 Thread Hans Schuell (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700692#comment-16700692
 ] 

Hans Schuell commented on KAFKA-6188:
-

I also have a Confluent 5.0.0 (Kafka 2.0.0) test system on Windows 10, and the 
broker stopped working after I deleted a topic and the broker was restarted.

The stack trace from server.log is below. The German system message from the 
Windows operating system says that the process cannot access the file because 
it is still in use.

The English Windows system message would be "*the process cannot access the 
file because it is being used by another process*".

This is one major difference between *nix and Windows. Under Windows a file 
cannot be moved (as in the case below) or deleted while there is still an open 
file handle in the currently running process or in another process. I know that 
Windows is not supported, but it is very likely that it is also a bit weird under 
Linux that the file xxx.log is renamed to xxx.log.deleted by the "scheduler" 
while the file is not yet closed. 

{{[2018-11-27 17:11:04,811] INFO [Log partition=test-events-1-4, 
dir=C:\tmp\confluent\kafka-logs] Scheduling log segment [baseOffset 0, size 
219591] for deletion. (kafka.log.Log)}}
{{[2018-11-27 17:11:04,814] ERROR Error while deleting segments for 
test-events-1-4 in dir C:\tmp\confluent\kafka-logs 
(kafka.server.LogDirFailureChannel)}}
{{java.nio.file.FileSystemException: 
C:\tmp\confluent\kafka-logs\test-events-1-4\.log -> 
C:\tmp\confluent\kafka-logs\test-events-1-4\.log.deleted: 
Der Prozess kann nicht auf die Datei zugreifen, da sie von einem anderen 
Prozess verwendet wird.}}{{ at 
sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)}}
{{ at 
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)}}
{{ at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)}}
{{ at 
sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)}}
{{ at java.nio.file.Files.move(Files.java:1395)}}
{{ at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:786)}}
{{ at 
org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:211)}}
{{ at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:488)}}
{{ at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1751)}}
{{ at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:1738)}}
{{ at 
kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1309)}}
{{ at 
kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1309)}}
{{ at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
{{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
{{ at kafka.log.Log$$anonfun$deleteSegments$1.apply$mcI$sp(Log.scala:1309)}}
{{ at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1300)}}
{{ at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1300)}}
{{ at kafka.log.Log.maybeHandleIOException(Log.scala:1837)}}
{{ at kafka.log.Log.deleteSegments(Log.scala:1300)}}
{{ at kafka.log.Log.deleteOldSegments(Log.scala:1295)}}
{{ at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1367)}}
{{ at kafka.log.Log.deleteOldSegments(Log.scala:1361)}}
{{ at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:874)}}
{{ at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:872)}}
{{ at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)}}
{{ at scala.collection.immutable.List.foreach(List.scala:392)}}
{{ at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)}}
{{ at kafka.log.LogManager.cleanupLogs(LogManager.scala:872)}}
{{ at 
kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:395)}}
{{ at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)}}
{{ at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)}}
{{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
{{ at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)}}
{{ at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)}}
{{ at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)}}
{{ at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)}}
{{ at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)}}
{{ at java.lang.Thread.run(Thread.java:748)}}
{{ Suppressed: java.nio.file.FileSystemException: 
C:\tmp\confluent\kafka-logs\test-events-1-4\.log -> 
C:\tmp\confluent\kafka-logs\test-events-1-4\.log.deleted: 
Der Prozess kann nicht auf die Datei zugreifen, da sie

[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2018-11-27 Thread Hugh O'Brien (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700785#comment-16700785
 ] 

Hugh O'Brien commented on KAFKA-7504:
-

[~guozhang] that's an interesting suggestion; as long as the data is indeed 
known to be 'once only' it may be appropriate. However, I'm biased in thinking 
that a better caching algorithm should be able to solve this without 
application-level changes. I've spoken about this here if you're curious: 
[https://www.confluent.io/kafka-summit-sf18/kafka-on-zfs]

Some benchmarks for these different approaches should be fairly straightforward 
to produce though. Any volunteers?

> Broker performance degradation caused by call of sendfile reading disk in 
> network thread
> 
>
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>Priority: Major
>  Labels: latency, performance
> Attachments: image-2018-10-14-14-18-38-149.png, 
> image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, 
> image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, 
> image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, 
> image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, 
> image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
>  Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with the latest build from 
> trunk (2.2.0-SNAPSHOT))
> h2. Phenomenon
> Response time of Produce request (99th ~ 99.9th %ile) degrading to 50x ~ 100x 
> more than usual.
>  Normally 99th %ile is lower than 20ms, but when this issue occurs it marks 
> 50ms to 200ms.
> At the same time we could see two more things in metrics:
> 1. Disk read coincidence from the volume assigned to log.dirs.
>  2. Raise in network threads utilization (by 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see an increase of requests in the metrics, we suspected blocking in 
> the event loop run by the network thread as the cause of the rising network thread 
> utilization.
>  Reading through Kafka broker source code, we understand that the only disk 
> IO performed in network thread is reading log data through calling 
> sendfile(2) (via FileChannel#transferTo).
>  To probe that the calls of sendfile(2) are blocking network thread for some 
> moments, I ran following SystemTap script to inspect duration of sendfile 
> syscalls.
> {code:java}
> # Systemtap script to measure syscall duration
> global s
> global records
> probe syscall.$1 {
> s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
> elapsed = gettimeofday_us() - s[tid()]
> delete s[tid()]
> records <<< elapsed
> }
> probe end {
> print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
> 0 | 0
> 1 |71
> 2 |@@@   6171
>16 |@@@  29472
>32 |@@@   3418
>  2048 | 0
> ...
>  8192 | 3{code}
> As you can see there were some cases taking more than few milliseconds, 
> implies that it blocks network thread for that long and applying the same 
> latency for all other request/response processing.
> h2. Hypothesis
> Gathering the above observations, I made the following hypothesis.
> Let's say network-thread-1 multiplexing 3 connections.
>  - producer-A
>  - follower-B (broker replica fetch)
>  - consumer-C
> Broker receives requests from each of those clients, [Produce, FetchFollower, 
> FetchConsumer].
> They are processed well by request handler threads, and now the response 
> queue of the network-thread contains 3 responses in following order: 
> [FetchConsumer, Produce, FetchFollower].
> network-thread-1 takes 3 responses and processes them sequentially 
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
>  Ideally processing of these 3 responses completes in microseconds as in it 
> just copies ready responses into client socket's buffer with non-blocking 
> manner.
>  However, Kafka uses sendfile(2) for transferring log data to client sockets. 
> The target data might be in page cache, but old data which has written a bit 
> far ago and never read since then, are likely not

[jira] [Comment Edited] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2018-11-27 Thread Hugh O'Brien (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700785#comment-16700785
 ] 

Hugh O'Brien edited comment on KAFKA-7504 at 11/27/18 5:46 PM:
---

[~guozhang] that's an interesting suggestion; as long as the data is indeed 
known to be 'once only' it may be appropriate. However, I'm biased in thinking 
that a better caching algorithm should be able to solve this without 
application-level changes. I've spoken about this here if you're curious: 
[https://www.confluent.io/kafka-summit-sf18/kafka-on-zfs]

Yuto spoke also: https://www.confluent.io/kafka-summit-sf18/kafka-multi-tenancy

Some benchmarks for these different approaches should be fairly straightforward 
to produce though. Any volunteers?


was (Author: hughobrien):
[~guozhang] that's an interesting suggestion, as long as the data is indeed 
known to be 'once only' it may be appropriate. However I'm biased in thinking 
that a better caching algorithm should be able to solve this without 
application level changes. I've spoken about this here if you're curious: 
[https://www.confluent.io/kafka-summit-sf18/kafka-on-zfs]

Some benchmarks for these different approaches should be fairly straightforward 
to produce though. Any volunteers?

> Broker performance degradation caused by call of sendfile reading disk in 
> network thread
> 
>
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>Priority: Major
>  Labels: latency, performance
> Attachments: image-2018-10-14-14-18-38-149.png, 
> image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, 
> image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, 
> image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, 
> image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, 
> image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
>  Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with the latest build from 
> trunk (2.2.0-SNAPSHOT))
> h2. Phenomenon
> Response time of Produce request (99th ~ 99.9th %ile) degrading to 50x ~ 100x 
> more than usual.
>  Normally 99th %ile is lower than 20ms, but when this issue occurs it marks 
> 50ms to 200ms.
> At the same time we could see two more things in metrics:
> 1. Disk read coincidence from the volume assigned to log.dirs.
>  2. Raise in network threads utilization (by 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see an increase of requests in the metrics, we suspected blocking in 
> the event loop run by the network thread as the cause of the rising network thread 
> utilization.
>  Reading through Kafka broker source code, we understand that the only disk 
> IO performed in network thread is reading log data through calling 
> sendfile(2) (via FileChannel#transferTo).
>  To probe that the calls of sendfile(2) are blocking network thread for some 
> moments, I ran following SystemTap script to inspect duration of sendfile 
> syscalls.
> {code:java}
> # Systemtap script to measure syscall duration
> global s
> global records
> probe syscall.$1 {
> s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
> elapsed = gettimeofday_us() - s[tid()]
> delete s[tid()]
> records <<< elapsed
> }
> probe end {
> print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
> 0 | 0
> 1 |71
> 2 |@@@   6171
>16 |@@@  29472
>32 |@@@   3418
>  2048 | 0
> ...
>  8192 | 3{code}
> As you can see there were some cases taking more than few milliseconds, 
> implies that it blocks network thread for that long and applying the same 
> latency for all other request/response processing.
> h2. Hypothesis
> Gathering the above observations, I made the following hypothesis.
> Let's say network-thread-1 multiplexing 3 connections.
>  - producer-A
>  - follower-B (broker replica fetch)
>  - consumer-C
> Broker receives requests from each of those clients, [Produce, FetchFollower, 
> FetchConsumer].
> They are processed well by request handler threads, and now the response 
> queue of the network-thread conta

[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701020#comment-16701020
 ] 

ASF GitHub Bot commented on KAFKA-7223:
---

guozhangwang closed pull request #5795: KAFKA-7223: Suppression Buffer Metrics
URL: https://github.com/apache/kafka/pull/5795
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 5b0d8b59233..12c481307f7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -20,10 +20,17 @@
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
 
 public class Sensors {
 private Sensors() {}
@@ -38,8 +45,8 @@ public static Sensor lateRecordDropSensor(final 
InternalProcessorContext context
 );
 StreamsMetricsImpl.addInvocationRateAndCount(
 sensor,
-"stream-processor-node-metrics",
-metrics.tagMap("task-id", context.taskId().toString(), 
"processor-node-id", context.currentNode().name()),
+PROCESSOR_NODE_METRICS_GROUP,
+metrics.tagMap("task-id", context.taskId().toString(), 
PROCESSOR_NODE_ID_TAG, context.currentNode().name()),
 "late-record-drop"
 );
 return sensor;
@@ -75,4 +82,40 @@ public static Sensor recordLatenessSensor(final 
InternalProcessorContext context
 );
 return sensor;
 }
+
+public static Sensor suppressionEmitSensor(final InternalProcessorContext 
context) {
+final StreamsMetricsImpl metrics = context.metrics();
+
+final Sensor sensor = metrics.nodeLevelSensor(
+context.taskId().toString(),
+context.currentNode().name(),
+"suppression-emit",
+Sensor.RecordingLevel.DEBUG
+);
+
+final Map<String, String> tags = metrics.tagMap(
+"task-id", context.taskId().toString(),
+PROCESSOR_NODE_ID_TAG, context.currentNode().name()
+);
+
+sensor.add(
+new MetricName(
+"suppression-emit-rate",
+PROCESSOR_NODE_METRICS_GROUP,
+"The average number of occurrence of suppression-emit 
operation per second.",
+tags
+),
+new Rate(TimeUnit.SECONDS, new Sum())
+);
+sensor.add(
+new MetricName(
+"suppression-emit-total",
+PROCESSOR_NODE_METRICS_GROUP,
+"The total number of occurrence of suppression-emit 
operations.",
+tags
+),
+new Total()
+);
+return sensor;
+}
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 50e74a38fd6..06d5004f65f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.kstream.internals.suppress;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
 import 
org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
 import org.apache.kafk

[jira] [Commented] (KAFKA-7597) Trogdor - Support transactions in ProduceBenchWorker

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701009#comment-16701009
 ] 

ASF GitHub Bot commented on KAFKA-7597:
---

cmccabe closed pull request #5885: KAFKA-7597: Make Trogdor ProduceBenchWorker 
support transactions
URL: https://github.com/apache/kafka/pull/5885
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 4d514dfcf97..5ce648adf77 100644
--- a/build.gradle
+++ b/build.gradle
@@ -565,6 +565,7 @@ project(':core') {
   dependencies {
 compile project(':clients')
 compile libs.jacksonDatabind
+compile libs.jacksonJDK8Datatypes
 compile libs.joptSimple
 compile libs.metrics
 compile libs.scalaLibrary
@@ -830,6 +831,7 @@ project(':clients') {
 compile libs.snappy
 compile libs.slf4jApi
 compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token 
parsing
+compileOnly libs.jacksonJDK8Datatypes
 
 jacksonDatabindConfig libs.jacksonDatabind // to publish as provided scope 
dependency.
 
@@ -839,6 +841,7 @@ project(':clients') {
 
 testRuntime libs.slf4jlog4j
 testRuntime libs.jacksonDatabind
+testRuntime libs.jacksonJDK8Datatypes
   }
 
   task determineCommitId {
@@ -918,6 +921,7 @@ project(':tools') {
 compile project(':log4j-appender')
 compile libs.argparse4j
 compile libs.jacksonDatabind
+compile libs.jacksonJDK8Datatypes
 compile libs.slf4jApi
 
 compile libs.jacksonJaxrsJsonProvider
@@ -1347,6 +1351,7 @@ project(':connect:json') {
   dependencies {
 compile project(':connect:api')
 compile libs.jacksonDatabind
+compile libs.jacksonJDK8Datatypes
 compile libs.slf4jApi
 
 testCompile libs.easymock
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 7dd3604db08..59f56fcd4ab 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -103,6 +103,7 @@ libs += [
   bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
   easymock: "org.easymock:easymock:$versions.easymock",
   jacksonDatabind: 
"com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
+  jacksonJDK8Datatypes: 
"com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson",
   jacksonJaxrsJsonProvider: 
"com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
   jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb",
   jaxrsApi: "javax.ws.rs:javax.ws.rs-api:$versions.jaxrs",
diff --git a/tests/bin/trogdor-run-transactional-produce-bench.sh 
b/tests/bin/trogdor-run-transactional-produce-bench.sh
new file mode 100755
index 000..fd5ff0a01f2
--- /dev/null
+++ b/tests/bin/trogdor-run-transactional-produce-bench.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+COORDINATOR_ENDPOINT="localhost:8889"
+TASK_ID="produce_bench_$RANDOM"
+TASK_SPEC=$(
+cat < transactionGenerator;
 private final Map producerConf;
 private final Map adminClientConf;
 private final Map commonClientConf;
@@ -53,6 +83,7 @@ public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
  @JsonProperty("maxMessages") int maxMessages,
  @JsonProperty("keyGenerator") PayloadGenerator 
keyGenerator,
  @JsonProperty("valueGenerator") PayloadGenerator 
valueGenerator,
+ @JsonProperty("transactionGenerator") 
Optional txGenerator,
  @JsonProperty("producerConf") Map 
producerConf,
  @JsonProperty("commonClientConf") Map 
commonClientConf,
  @JsonProperty("adminClientConf") Map 
adminClientConf,
@@ -67,6 +98,7 @@ public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
 new SequentialPayloadGenerator(4, 0) : keyGenerator;
 this.valueGenerator = valueGenerator == null ?
 new ConstantPayloadGenerator(5

[jira] [Commented] (KAFKA-7367) Streams should not create state store directories unless they are needed

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701041#comment-16701041
 ] 

ASF GitHub Bot commented on KAFKA-7367:
---

mjsax closed pull request #5696: KAFKA-7367: Streams should not create state 
store directories unless they are needed
URL: https://github.com/apache/kafka/pull/5696
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bbda11de150..c29b7bc4244 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -644,12 +644,6 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 final LogContext logContext = new 
LogContext(String.format("stream-client [%s] ", clientId));
 this.log = logContext.logger(getClass());
 
-try {
-stateDirectory = new StateDirectory(config, time);
-} catch (final ProcessorStateException fatal) {
-throw new StreamsException(fatal);
-}
-
 final MetricConfig metricConfig = new MetricConfig()
 .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
 
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -664,7 +658,7 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 internalTopologyBuilder.rewriteTopology(config);
 
 // sanity check to fail-fast in case we cannot build a 
ProcessorTopology due to an exception
-internalTopologyBuilder.build();
+final ProcessorTopology taskTopology = internalTopologyBuilder.build();
 
 streamsMetadataState = new StreamsMetadataState(
 internalTopologyBuilder,
@@ -680,6 +674,14 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 }
 final ProcessorTopology globalTaskTopology = 
internalTopologyBuilder.buildGlobalStateTopology();
 final long cacheSizePerThread = totalCacheSize / (threads.length + 
(globalTaskTopology == null ? 0 : 1));
+final boolean createStateDirectory = 
taskTopology.hasPersistentLocalStore() ||
+(globalTaskTopology != null && 
globalTaskTopology.hasPersistentGlobalStore());
+
+try {
+stateDirectory = new StateDirectory(config, time, 
createStateDirectory);
+} catch (final ProcessorStateException fatal) {
+throw new StreamsException(fatal);
+}
 
 final StateRestoreListener delegatingStateRestoreListener = new 
DelegatingStateRestoreListener();
 GlobalStreamThread.State globalThreadState = null;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 8fcbbb44302..57af1f3f340 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -152,6 +152,24 @@ boolean isRepartitionTopic(final String topic) {
 return repartitionTopics.contains(topic);
 }
 
+public boolean hasPersistentLocalStore() {
+for (final StateStore store : stateStores) {
+if (store.persistent()) {
+return true;
+}
+}
+return false;
+}
+
+public boolean hasPersistentGlobalStore() {
+for (final StateStore store : globalStateStores) {
+if (store.persistent()) {
+return true;
+}
+}
+return false;
+}
+
 private String childrenToString(final String indent, final 
List> children) {
 if (children == null || children.isEmpty()) {
 return "";
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index a3227ca0c7d..f5c4c31d083 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -26,7 +26,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
@@ -50,6 +49,7 @@
 private static final Logger lo

[jira] [Resolved] (KAFKA-7367) Streams should not create state store directories unless they are needed

2018-11-27 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7367.

   Resolution: Fixed
Fix Version/s: 2.2.0

> Streams should not create state store directories unless they are needed
> 
>
> Key: KAFKA-7367
> URL: https://issues.apache.org/jira/browse/KAFKA-7367
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: newbie
> Fix For: 2.2.0
>
>
> Streams currently unconditionally creates state store directories, even if 
> the topology is stateless.
> This can be a problem running Streams in an environment without access to 
> disk.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-27 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701043#comment-16701043
 ] 

John Roesler commented on KAFKA-7660:
-

Hey [~pkleindl],

I'll get you a PR to test asap.

No, we don't do patch files, but you should be able to fork the repo in github 
and send a PR from your fork. Let me know if you're not sure how to do this.

Personally, I think it's easier and better to just reference this Jira. It has 
all the rationale and our discussion for context.

Thanks!

-John

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Priority: Minor
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deep in the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement equals or hashCode 
> method. Not sure this is a problem though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2018-11-27 Thread Randall Hauch (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673834#comment-16673834
 ] 

Randall Hauch edited comment on KAFKA-7509 at 11/27/18 9:12 PM:


Since changing the unused property log messages to debug was not the correct 
change, I closed that PR and took yet another approach that just filters out 
worker properties before passing them to the producer, consumer, and admin 
clients. Previously, Connect would pass many of its worker configuration 
properties (including the `{{producer.}}` and `{{consumer.}}` properties for 
connectors) into the producer, consumer, and admin client used for *internal* 
topics, resulting in lots of log warnings about unused config properties.

With [this new approach|https://github.com/apache/kafka/pull/5876], Connect now 
attempts to filter out the worker’s configuration properties that are not 
applicable to the producer, consumer, or admin client used for internal topics. 
(Connect is already including only producer and consumer properties when 
creating clients for connectors, since those properties are prefixed in the 
worker config.)

For the most part, this is relatively straightforward, since there are some 
top-level worker-specific properties that can be easily removed, and most 
extension-specific properties carry Connect-specific prefixes. Unfortunately, 
the recently added REST extension is the only type of connect extension that 
uses unprefixed properties from the worker config, so it is not possible to 
remove those from the properties passed to the producer, consumer, and admin 
clients since we can't distinguish between a REST extension property and a 
property for a client interceptor, metric reporter, etc. Hopefully, REST 
extensions are not prevalent yet and most users may not see any warnings about 
unused properties in the producer, consumer, and admin client; but if a user 
does use a REST extension with extension-specific properties, the user may also 
see WARN log messages about how they're not known to the clients.

We also want to ensure that we don't pass any top-level properties for the 
consumer or admin client to the producer used for internal topics; producer and 
admin client properties to the consumer for internal topics; and producer and 
consumer properties to the admin client. This is a bit trickier, since there 
are a lot of common configs between these different types of clients, and we 
don't want to remove a property if it's actually used by that client. Note 
that any property that is unknown (e.g., properties for REST extension, 
interceptors, metric reporters, serdes, partitioners, etc.) must be passed to 
the producer, consumer, and admin client. All of these — except for the REST 
extension properties — should indeed be used by the producer and consumer. But, 
since the admin client only supports metric reporters, any properties for 
interceptors, serdes, partitioners and REST extension will also be logged as 
unused. This is about the best we can do at this point.

All of this filtering logic was added to the {{ConnectUtils}} class, allowing 
the logic to be encapsulated and easily unit tested. Also, all changes are 
limited to Kafka Connect, and will work with all client and Connect extensions 
(passing them to the clients if they are unknown).

This still does not go as far as I'd like. It'd be ideal if Connect could use a 
RecordingMap to know which properties were used across _all_ of the clients and 
the worker itself, and to then log _only those_ properties that were never used 
at all. This is beyond the scope of this issue, however, and can be addressed 
in a follow-on issue that will require changes to AbstractConfig, Producer, 
Consumer, and AdminClient so that these components don't record unused 
properties themselves and instead we allow Connect to record all unused properties once.
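
For illustration only, a sketch of the kind of filtering described above (the class, method, and prefix list are assumptions, not the actual ConnectUtils code):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative only: drop worker-level keys that clearly do not apply to the
// producer/consumer/admin clients used for Connect's internal topics, while
// keeping unknown keys (interceptors, metric reporters, REST extension, ...).
final class WorkerConfigFilter {
    private static final String[] NON_CLIENT_PREFIXES = {
        "producer.", "consumer.", "admin.", "rest.",
        "offset.storage.", "config.storage.", "status.storage."
    };

    static Map<String, Object> clientProps(final Map<String, Object> workerProps) {
        final Map<String, Object> filtered = new HashMap<>();
        for (final Map.Entry<String, Object> e : workerProps.entrySet()) {
            if (!hasNonClientPrefix(e.getKey())) {
                filtered.put(e.getKey(), e.getValue());
            }
        }
        return filtered;
    }

    private static boolean hasNonClientPrefix(final String key) {
        for (final String prefix : NON_CLIENT_PREFIXES) {
            if (key.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
{code}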


was (Author: rhauch):
Since changing the unused property log messages to debug was not the correct 
change, I closed that PR and took yet another approach that just filters out 
worker properties before passing them to the producer, consumer, and admin 
clients. Previously, Connect would pass many of its worker configuration 
properties into the producer, consumer, and admin client used for internal 
topics, resulting in lots of log warnings about unused config properties.

With [this change|https://github.com/apache/kafka/pull/5876], Connect now 
attempts to filter out the worker’s configuration properties that are not 
applicable to the producer, consumer, or admin client used for internal topics. 
(Connect is already including only producer and consumer properties when 
creating those clients for connectors, since those properties are prefixed in 
the worker config.)

For the most part, this is relatively straightforward, since there are some 
top-level worker-specific properties t

[jira] [Created] (KAFKA-7680) fetching a refilled chunk of log can cause log divergence

2018-11-27 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7680:
--

 Summary: fetching a refilled chunk of log can cause log divergence
 Key: KAFKA-7680
 URL: https://issues.apache.org/jira/browse/KAFKA-7680
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao


We use FileRecords.writeTo to send a fetch response for a follower. A log could 
be truncated and refilled in the middle of the send process (due to leader 
change). Then it's possible for the follower to append some uncommitted 
messages followed by committed messages. Those uncommitted messages may never 
be removed, causing log divergence.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7680) fetching a refilled chunk of log can cause log divergence

2018-11-27 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701084#comment-16701084
 ] 

Jun Rao commented on KAFKA-7680:


One way to fix this is the following. We already include the expected 
leaderEpoch for each partition in the fetch response. So, the follower 
shouldn't see message sets with a leaderEpoch higher than the expected one. If 
the follower does see such message sets, that is a sign the log was 
refilled in the middle of a transfer, and the follower should just ignore this 
response and retry.

cc [~hachikuji]
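
For illustration only, a minimal follower-side sketch of that guard (the method and its wiring are hypothetical, not the actual ReplicaFetcherThread code):

{code:java}
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

// Hypothetical guard: refuse to append a fetched chunk whose batches carry a
// higher partition leader epoch than the epoch advertised in the fetch response.
final class FetchedChunkGuard {
    static boolean safeToAppend(final MemoryRecords records, final int expectedLeaderEpoch) {
        for (final RecordBatch batch : records.batches()) {
            final int batchEpoch = batch.partitionLeaderEpoch();
            if (batchEpoch != RecordBatch.NO_PARTITION_LEADER_EPOCH
                    && batchEpoch > expectedLeaderEpoch) {
                // The log was likely truncated and refilled while the leader was
                // serving the response; drop the chunk and let the next fetch retry.
                return false;
            }
        }
        return true;
    }
}
{code}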

> fetching a refilled chunk of log can cause log divergence
> -
>
> Key: KAFKA-7680
> URL: https://issues.apache.org/jira/browse/KAFKA-7680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Major
>
> We use FileRecords.writeTo to send a fetch response for a follower. A log 
> could be truncated and refilled in the middle of the send process (due to 
> leader change). Then it's possible for the follower to append some 
> uncommitted messages followed by committed messages. Those uncommitted 
> messages may never be removed, causing log divergence.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7681) new metric for request thread utilization by request type

2018-11-27 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7681:
--

 Summary: new metric for request thread utilization by request type
 Key: KAFKA-7681
 URL: https://issues.apache.org/jira/browse/KAFKA-7681
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


When the request thread pool is saturated, it's often useful to know which 
request type is using the thread pool the most. It would be useful to add a metric 
that tracks the fraction of request thread pool usage by request type. This 
would be equivalent to (request rate) * (request local time ms) / 1000, but 
will be more direct. This would require a new KIP.
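
As an illustration with made-up numbers: if Produce requests arrive at 200 req/s with an average local time of 10 ms, that request type alone keeps 200 * 10 / 1000 = 2 request handler threads busy; with num.io.threads=8 the proposed metric would report a fraction of 0.25 for Produce.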



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-27 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701100#comment-16701100
 ] 

John Roesler commented on KAFKA-7660:
-

ok [~pkleindl],

I think that [https://github.com/apache/kafka/pull/5953] should fix the memory 
leak. I still need to write a test for it, but I'm "pretty sure it's right"(tm)

In case you wanted to check it out and re-run your experiment.

Thanks,

-John

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Priority: Minor
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deep in the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement equals or hashCode 
> method. Not sure this is a problem though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701096#comment-16701096
 ] 

ASF GitHub Bot commented on KAFKA-7660:
---

vvcephei opened a new pull request #5953: KAFKA-7660: fix parentSensors memory 
leak
URL: https://github.com/apache/kafka/pull/5953
 
 
   In StreamsMetricsImpl, the parentSensors map was keeping references to 
Sensors after the sensors themselves had been removed.
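
   A minimal sketch of the idea behind the fix (illustrative only, not the actual StreamsMetricsImpl code): whenever a sensor is removed, its entry in parentSensors has to be removed as well, so the map can no longer pin the Sensor objects.

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

// Illustrative sketch only -- the real class tracks more state than this.
class ParentSensorTracking {
    private final Metrics metrics = new Metrics();
    private final Map<Sensor, Sensor> parentSensors = new HashMap<>();

    void removeSensor(final Sensor sensor) {
        metrics.removeSensor(sensor.name());
        // Without this removal the map keeps strong references to both the
        // child and the parent sensor forever, which is the reported leak.
        final Sensor parent = parentSensors.remove(sensor);
        if (parent != null) {
            metrics.removeSensor(parent.name());
        }
    }
}
{code}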
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Priority: Minor
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deep in the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement equals or hashCode 
> method. Not sure this is a problem though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7682) turning on request logging for a subset of request types

2018-11-27 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7682:
--

 Summary: turning on request logging for a subset of request types
 Key: KAFKA-7682
 URL: https://issues.apache.org/jira/browse/KAFKA-7682
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


Turning on request level logging can be useful for debugging. However, the 
request logging can be quite verbose. It would be useful to turn it on for only a 
subset of the request types. We already have a jmx bean to turn on/off the 
request logging dynamically. We could add a new jmx bean to control the request 
types to be logged. This requires a KIP.
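
A rough sketch of the shape this could take (purely illustrative; the broker's request logger is Scala and the MBean shown here is hypothetical):

{code:java}
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.protocol.ApiKeys;

// Hypothetical dynamically controllable filter: a JMX setter updates the
// enabled set, and the request logger consults it before emitting a line.
public class RequestLogFilter {
    private final Set<ApiKeys> enabled = Collections.newSetFromMap(new ConcurrentHashMap<>());

    // Would be exposed as a writable JMX attribute, e.g. a comma-separated list.
    public void setEnabledRequestTypes(final Set<ApiKeys> types) {
        enabled.clear();
        enabled.addAll(types);
    }

    public boolean shouldLog(final ApiKeys apiKey) {
        // An empty set means "log everything", matching the existing on/off switch.
        return enabled.isEmpty() || enabled.contains(apiKey);
    }
}
{code}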



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7389) Upgrade spotBugs for Java 11 support

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701130#comment-16701130
 ] 

ASF GitHub Bot commented on KAFKA-7389:
---

ijuma closed pull request #5943: KAFKA-7389: Enable spotBugs with Java 11 and 
disable false positive warnings
URL: https://github.com/apache/kafka/pull/5943
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 4d514dfcf97..526f3646abb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -149,8 +149,7 @@ subprojects {
   apply plugin: 'maven'
   apply plugin: 'signing'
   apply plugin: 'checkstyle'
-  if (!JavaVersion.current().isJava11Compatible())
-apply plugin: "com.github.spotbugs"
+  apply plugin: "com.github.spotbugs"
 
   sourceCompatibility = minJavaVersion
   targetCompatibility = minJavaVersion
@@ -372,20 +371,18 @@ subprojects {
   }
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 
-  if (!JavaVersion.current().isJava11Compatible()) {
-spotbugs {
-  toolVersion = '3.1.8'
-  excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
-  ignoreFailures = false
-}
-test.dependsOn('spotbugsMain')
+  spotbugs {
+toolVersion = '3.1.8'
+excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
+ignoreFailures = false
+  }
+  test.dependsOn('spotbugsMain')
 
-tasks.withType(com.github.spotbugs.SpotBugsTask) {
-  reports {
-// Continue supporting `xmlFindBugsReport` for compatibility
-xml.enabled(project.hasProperty('xmlSpotBugsReport') || 
project.hasProperty('xmlFindBugsReport'))
-html.enabled(!project.hasProperty('xmlSpotBugsReport') && 
!project.hasProperty('xmlFindBugsReport'))
-  }
+  tasks.withType(com.github.spotbugs.SpotBugsTask) {
+reports {
+  // Continue supporting `xmlFindBugsReport` for compatibility
+  xml.enabled(project.hasProperty('xmlSpotBugsReport') || 
project.hasProperty('xmlFindBugsReport'))
+  html.enabled(!project.hasProperty('xmlSpotBugsReport') && 
!project.hasProperty('xmlFindBugsReport'))
 }
   }
 
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index d83c4c4375f..a954baf68d4 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -23,6 +23,17 @@ This file dictates which categories of bugs and individual 
false positives that
 For a detailed description of spotbugs bug categories, see 
https://spotbugs.readthedocs.io/en/latest/bugDescriptions.html
 -->
 
+
+
+
+
+
+
+
+
+
+
+
 
 
> Key: KAFKA-7389
> URL: https://issues.apache.org/jira/browse/KAFKA-7389
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.2.0
>
>
> KAFKA-5887 replaces findBugs with spotBugs adding support for Java 9 and 10. 
> However, Java 11 is not supported in spotbugs 3.1.5.
> Once this is fixed, we also need to update the build to enable spotBugs when 
> executed with Java 11 and we need to update the relevant Jenkins jobs to 
> execute spotBugs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7671) A KStream/GlobalKTable join shouldn't reset the repartition flag

2018-11-27 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701203#comment-16701203
 ] 

Andy Bryant commented on KAFKA-7671:


Hi [~mjsax],

I just checked the old versions (1.1, 1.0, 0.11, 0.10.2). Looks like the 
{{repartitionRequired}} was set to {{false}} for globalKTable joins all the way 
back to when the GlobalKTable was introduced in 0.10.2 with KAFKA-4490.

See 
[https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L596]

Cheers
Andy

> A KStream/GlobalKTable join shouldn't reset the repartition flag
> 
>
> Key: KAFKA-7671
> URL: https://issues.apache.org/jira/browse/KAFKA-7671
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Andy Bryant
>Priority: Major
>
> Currently a KStream/GlobalKTable join resets the repartition required flag to 
> false.
> I have a topology where I map a stream, join with a GlobalKTable, then 
> groupByKey then aggregate.
> The aggregate wasn't behaving correctly because it didn't force a repartition 
> as I expected. The KStream/GlobalKTable join had reset the flag and hence I 
> was getting the same keys in different partitions.
> Since a KStream/GlobalKTable join does not itself force a repartition, it 
> should simply propagate the flag down to the resultant KStream the same way 
> most other operators work.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7683) Support ConfigDef.Type.MAP

2018-11-27 Thread Paul Czajka (JIRA)
Paul Czajka created KAFKA-7683:
--

 Summary: Support ConfigDef.Type.MAP
 Key: KAFKA-7683
 URL: https://issues.apache.org/jira/browse/KAFKA-7683
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Paul Czajka


Support ConfigDef.Type.MAP which will parse a string value (e.g. "a=1;b=2;c=3") 
into a HashMap.
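
For illustration, a minimal sketch of the parsing this would imply (the separator characters and any ConfigDef integration are assumptions):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative parser for a hypothetical ConfigDef.Type.MAP value like "a=1;b=2;c=3".
final class MapConfigParser {
    static Map<String, String> parse(final String value) {
        final Map<String, String> result = new HashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return result;
        }
        for (final String entry : value.split(";")) {
            final String[] parts = entry.split("=", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed map entry: " + entry);
            }
            result.put(parts[0].trim(), parts[1].trim());
        }
        return result;
    }
}
{code}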



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7683) Support ConfigDef.Type.MAP

2018-11-27 Thread Paul Czajka (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Czajka updated KAFKA-7683:
---
Issue Type: Improvement  (was: Bug)

> Support ConfigDef.Type.MAP
> --
>
> Key: KAFKA-7683
> URL: https://issues.apache.org/jira/browse/KAFKA-7683
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Paul Czajka
>Priority: Minor
>
> Support ConfigDef.Type.MAP which will parse a string value (e.g. 
> "a=1;b=2;c=3") into a HashMap.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7683) Support ConfigDef.Type.MAP

2018-11-27 Thread Ryanne Dolan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701291#comment-16701291
 ] 

Ryanne Dolan commented on KAFKA-7683:
-

You can use originalsWithPrefix() to get back a map from properties like:

 
{code:java}
foo.a=1
foo.b=2
foo.c=3
{code}
which IMO is much nicer than foo=a=1;b=2;c=3
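
For illustration, a small sketch of that approach (the config class and property names here are made up):

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class PrefixMapExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("foo.a", "1");
        props.put("foo.b", "2");
        props.put("foo.c", "3");

        // An empty ConfigDef is enough here because originals are kept verbatim.
        final AbstractConfig config = new AbstractConfig(new ConfigDef(), props);

        // Strips the "foo." prefix and yields {a=1, b=2, c=3}.
        final Map<String, Object> fooMap = config.originalsWithPrefix("foo.");
        System.out.println(fooMap);
    }
}
{code}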

> Support ConfigDef.Type.MAP
> --
>
> Key: KAFKA-7683
> URL: https://issues.apache.org/jira/browse/KAFKA-7683
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Paul Czajka
>Priority: Minor
>
> Support ConfigDef.Type.MAP which will parse a string value (e.g. 
> "a=1;b=2;c=3") into a HashMap.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.

2018-11-27 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701345#comment-16701345
 ] 

Guozhang Wang commented on KAFKA-7653:
--

I think [~mark.tranter] is trying to provide a work-around in case we cannot 
auto-configure instances of the same serde class with the key flag turned on / off. 
If that is possible, then I feel the ideal would be to provide two instances of a 
single serde class, configured differently for key and value; if it cannot be 
achieved, then we need to consider using two classes and passing in an instance 
of each, one for the key and one for the value.

Note that https://issues.apache.org/jira/browse/KAFKA-3729 was filed before we 
had KIP-182; now, with the key / value serdes wrapped in Produced / Consumed / 
Grouped etc., we know exactly which serde is used for the key and which for the 
value, and hence should be able to auto-configure them accordingly inside the 
constructors of those control objects. With that, this ticket should be easy to 
resolve; does that make sense?
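
A rough sketch of what that auto-configuration inside the control objects could look like (hypothetical helper; the real Consumed/Produced/Grouped constructors may differ):

{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;

// Hypothetical helper: configure key and value serde instances (possibly of
// the same serde class) with the correct isKey flag before they are used.
final class SerdeConfigurer {
    static <K, V> void configure(final Serde<K> keySerde,
                                 final Serde<V> valueSerde,
                                 final Map<String, ?> configs) {
        keySerde.configure(configs, true);    // isKey = true
        valueSerde.configure(configs, false); // isKey = false
    }
}
{code}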

> Streams-Scala: Add type level differentiation for Key and Value serdes.
> ---
>
> Key: KAFKA-7653
> URL: https://issues.apache.org/jira/browse/KAFKA-7653
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mark Tranter
>Assignee: Mark Tranter
>Priority: Minor
>  Labels: scala
>
> Implicit resolution/conversion of Serdes/Consumed etc is a big improvement 
> for the Scala Streams API. However in cases where a user needs to 
> differentiate between Key and Value serializer functionality (i.e. using the 
> Schema Registry), implicit resolution doesn't help and could cause issues. 
> e.g.
> {code:java}
> case class MouseClickEvent(pageId: Long, userId: String)
> builder
>   // Long serde taken from implicit scope configured with
>   // `isKey` = true
>   .stream[Long, MouseClickEvent]("mouse-clicks")
>   .selectKey((_,v) => v.userId)
>   .groupByKey
>   .aggregate(() => 0L, (_: String, mce: MouseClickEvent, count: Long) => 
> count + 1)
>   .toStream
>   // Same Long serde taken from implicit scope configured with
>   // `isKey` = true, even thought the `Long` value in this case
>   // will be the Value
>   .to("mouse-clicks-by-user")
> {code}
> It would be ideal if Key and Value Serde/SerdeWrapper types/type classes 
> could be introduced to overcome this limitation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-27 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7678:

Component/s: streams

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Priority: Major
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class is expecting any kind of error to happen since it is catching 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility at 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it for:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type

2018-11-27 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701387#comment-16701387
 ] 

Mayuresh Gharat commented on KAFKA-7681:


Hi [~junrao], 
This seems more like the Broker Topic Metrics that we have today but at the 
RequestHandler level and not for a specific topic. Is my understanding correct?

> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which 
> request type is using the thread pool the most. It would be useful to add a 
> metric that tracks the fraction of request thread pool usage by request type. 
> This would be equivalent to (request rate) * (request local time ms) / 1000, 
> but will be more direct. This would require a new KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7671) A KStream/GlobalKTable join shouldn't reset the repartition flag

2018-11-27 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701398#comment-16701398
 ] 

Matthias J. Sax commented on KAFKA-7671:


Thanks for looking into this [~kiwiandy]

> A KStream/GlobalKTable join shouldn't reset the repartition flag
> 
>
> Key: KAFKA-7671
> URL: https://issues.apache.org/jira/browse/KAFKA-7671
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Andy Bryant
>Priority: Major
>
> Currently a KStream/GlobalKTable join resets the repartition required flag to 
> false.
> I have a topology where I map a stream, join with a GlobalKTable, then 
> groupByKey then aggregate.
> The aggregate wasn't behaving correctly because it didn't force a repartition 
> as I expected. The KStream/GlobalKTable join had reset the flag and hence I 
> was getting the same keys in different partitions.
> Since a KStream/GlobalKTable join does not itself force a repartition, it 
> should simply propagate the flag down to the resultant KStream the same way 
> most other operators work.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-27 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701400#comment-16701400
 ] 

Matthias J. Sax commented on KAFKA-7678:


Are you saying that the shutdown still finished successfully and gracefully and 
the only issue is that we get the NPE logged? If yes, I would consider this a 
minor bug and agree that a `null` check sounds like an appropriate fix.

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Priority: Major
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class is expecting any kind of error to happen since it is catching 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility at 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it for:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7671) A KStream/GlobalKTable join shouldn't reset the repartition flag

2018-11-27 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7671:
---
Fix Version/s: 2.1.1
   2.2.0

> A KStream/GlobalKTable join shouldn't reset the repartition flag
> 
>
> Key: KAFKA-7671
> URL: https://issues.apache.org/jira/browse/KAFKA-7671
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Andy Bryant
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> Currently a KStream/GlobalKTable join resets the repartition required flag to 
> false.
> I have a topology where I map a stream, join with a GlobalKTable, then 
> groupByKey then aggregate.
> The aggregate wasn't behaving correctly because it didn't force a repartition 
> as I expected. The KStream/GlobalKTable join had reset the flag and hence I 
> was getting the same keys in different partitions.
> Since a KStream/GlobalKTable join does not itself force a repartition, it 
> should simply propagate the flag down to the resultant KStream the same way 
> most other operators work.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss

2018-11-27 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7672:
---
Fix Version/s: (was: 2.1.0)

> The local state not fully restored after KafkaStream rebalanced, resulting in 
> data loss
> ---
>
> Key: KAFKA-7672
> URL: https://issues.apache.org/jira/browse/KAFKA-7672
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: linyue li
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> Normally, when a task is migrated to a new thread and no checkpoint file was 
> found under its task folder, Kafka Streams needs to fully restore the local state 
> from the remote changelog topic and then resume running. However, in 
> some scenarios, we found that Kafka Streams did *NOT* restore this state even though no 
> checkpoint was found, but just cleaned the state folder and transitioned to the 
> running state directly, resulting in historic data loss. 
> To be specific, I will give the detailed logs for Kafka Streams in our project 
> to show this scenario: 
> {quote}2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Revoking previously assigned partitions [AuditTrailBatch-0-5]
> 2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to 
> PARTITIONS_REVOKED
> 2018-10-23 08:27:10,856 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> (Re-)joining group
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Successfully joined group with generation 323
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to 
> PARTITIONS_ASSIGNED
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1*
> 2018-10-23 08:27:53,622 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
> stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task 
> 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
> *Reinitializing the task and restore its state from the beginning.*
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher  - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting 
> offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
> 2018-10-23 08:27:54,653 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to 
> RUNNING*
> {quote}
> From the logs above, we can get the procedure for thread 
> AuditTrailBatch-StreamThread-1:
>  # the previous running task assigned to thread 1 is task 0_5 (the 
> corresponding partition is AuditTrailBatch-0-5)
>  # group begins to rebalance, the new task 1_1 is assigned to thread 1.
>  # no checkpoint was found under 1_1 state folder, so reset the offset to 0 
> and clean the local state folder.
>  # thread 1 transitions to RUNNING state directly without the restoration for 
> task 1_1, so the historic data for state 1_1 is lost for thread 1. 
> *Troubleshooting*
> To investigate the cause of this issue, we analyzed the source code in 
> Kafka Streams and found that the key is the variable named "completedRestorers".
> This is the definition of the variable:
> {code:java}
> private final Set<TopicPartition> completedRestorers = new HashSet<>();{code}
> Each thread object has its own completedRestorers, which is created in the 
> thread initialization, and is not accessed by other threads. The 
> completedRestorers is used to record the partitions that has bee

[jira] [Commented] (KAFKA-7671) A KStream/GlobalKTable join shouldn't reset the repartition flag

2018-11-27 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701416#comment-16701416
 ] 

Matthias J. Sax commented on KAFKA-7671:


I marked the fix version as 2.2.0 and 2.1.1 for now, to make sure we fix it for 
those releases. We should try to back-port the fix to older versions, too, 
though.

> A KStream/GlobalKTable join shouldn't reset the repartition flag
> 
>
> Key: KAFKA-7671
> URL: https://issues.apache.org/jira/browse/KAFKA-7671
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Andy Bryant
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> Currently a KStream/GlobalKTable join resets the repartition required flag to 
> false.
> I have a topology where I map a stream, join with a GlobalKTable, then 
> groupByKey then aggregate.
> The aggregate wasn't behaving correctly because it didn't force a repartition 
> as I expected. The KStream/GlobalKTable join had reset the flag and hence I 
> was getting the same keys in different partitions.
> Since a KStream/GlobalKTable join does not itself force a repartition, it 
> should simply propagate the flag down to the resultant KStream the same way 
> most other operators work.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss

2018-11-27 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7672:
---
Fix Version/s: 2.1.1
   2.2.0

> The local state not fully restored after KafkaStream rebalanced, resulting in 
> data loss
> ---
>
> Key: KAFKA-7672
> URL: https://issues.apache.org/jira/browse/KAFKA-7672
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: linyue li
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> Normally, when a task is migrated to a new thread and no checkpoint file was 
> found under its task folder, Kafka Streams needs to fully restore the local state 
> from the remote changelog topic and then resume running. However, in 
> some scenarios, we found that Kafka Streams did *NOT* restore this state even though no 
> checkpoint was found, but just cleaned the state folder and transitioned to the 
> running state directly, resulting in historic data loss. 
> To be specific, I will give the detailed logs for Kafka Streams in our project 
> to show this scenario: 
> {quote}2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Revoking previously assigned partitions [AuditTrailBatch-0-5]
> 2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to 
> PARTITIONS_REVOKED
> 2018-10-23 08:27:10,856 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> (Re-)joining group
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Successfully joined group with generation 323
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to 
> PARTITIONS_ASSIGNED
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1*
> 2018-10-23 08:27:53,622 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
> stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task 
> 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
> *Reinitializing the task and restore its state from the beginning.*
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher  - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting 
> offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
> 2018-10-23 08:27:54,653 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to 
> RUNNING*
> {quote}
> From the logs above, we can get the procedure for thread 
> AuditTrailBatch-StreamThread-1:
>  # the previous running task assigned to thread 1 is task 0_5 (the 
> corresponding partition is AuditTrailBatch-0-5)
>  # group begins to rebalance, the new task 1_1 is assigned to thread 1.
>  # no checkpoint was found under 1_1 state folder, so reset the offset to 0 
> and clean the local state folder.
>  # thread 1 transitions to RUNNING state directly without the restoration for 
> task 1_1, so the historic data for state 1_1 is lost for thread 1. 
> *Troubleshooting*
> To investigate the cause of this issue, we analyzed the source code in 
> Kafka Streams and found that the key is the variable named "completedRestorers".
> This is the definition of the variable:
> {code:java}
> private final Set<TopicPartition> completedRestorers = new HashSet<>();{code}
> Each thread object has its own completedRestorers, which is created during 
> thread initialization and is not accessed by other threads. The 
> completedRestorers set is used to record the partition

[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2018-11-27 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701422#comment-16701422
 ] 

Matthias J. Sax commented on KAFKA-3184:


Just a side comment: RocksDB allows one to "pin/freeze" the current state to 
protect it from compaction. This is a lightweight metadata operation. After a 
version is pinned (not sure atm what the correct RocksDB term for this 
operation is), the immutable SST files can be copied in the background to 
complete a checkpoint.
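
For reference, a minimal sketch of what such a checkpoint looks like via the 
RocksDB JNI API (this is not Kafka code, and the paths are illustrative). 
{{Checkpoint}} hard-links the immutable SST files into a new directory, which 
is roughly the lightweight "pin" described above; the linked files can then be 
copied elsewhere in the background:
{code:java}
import org.rocksdb.Checkpoint;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbCheckpointSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/state-store")) {
            db.put("key".getBytes(), "value".getBytes());
            // Creates the checkpoint by hard-linking the current SST files;
            // the target directory must not exist yet.
            try (Checkpoint checkpoint = Checkpoint.create(db)) {
                checkpoint.createCheckpoint("/tmp/state-store-checkpoint");
            }
        }
    }
}
{code}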

> Add Checkpoint for In-memory State Store
> 
>
> Key: KAFKA-3184
> URL: https://issues.apache.org/jira/browse/KAFKA-3184
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: user-experience
>
> Currently Kafka Streams does not make a checkpoint of the persistent state 
> store upon committing, which would be expensive since it is "stopping the 
> world" and writes to disk: for example, RocksDB would naively require copying 
> the file directory to make a copy. 
> However, for in-memory stores checkpointing may be doable in an asynchronous 
> manner and hence can be done quickly. The benefit of having an intermediate 
> checkpoint is to avoid restoring from scratch if standby tasks are not 
> present.
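> For illustration, a minimal, hypothetical sketch of what such an asynchronous 
> checkpoint could look like (class name and serialization format are mine, not 
> from Streams): take a cheap in-memory snapshot on commit and write it to disk 
> on a background thread, so processing is not stopped:
> {code:java}
> import java.io.IOException;
> import java.io.ObjectOutputStream;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> 
> public class AsyncCheckpointingStore {
>     private final Map<String, String> store = new ConcurrentHashMap<>();
>     private final ExecutorService checkpointer = Executors.newSingleThreadExecutor();
> 
>     public void put(String key, String value) {
>         store.put(key, value);
>     }
> 
>     // Called on commit: the snapshot is taken synchronously (cheap for an
>     // in-memory map); the disk write happens asynchronously.
>     public void checkpoint(Path file) {
>         Map<String, String> snapshot = new HashMap<>(store);
>         checkpointer.submit(() -> {
>             try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
>                 out.writeObject(snapshot);
>             } catch (IOException e) {
>                 // A failed checkpoint would simply be discarded.
>             }
>         });
>     }
> }
> {code}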



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7620) ConfigProvider is broken for KafkaConnect when TTL is not null

2018-11-27 Thread Ewen Cheslack-Postava (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-7620.
--
   Resolution: Fixed
Fix Version/s: 2.0.2
   2.1.1
   2.2.0

Issue resolved by pull request 5914
[https://github.com/apache/kafka/pull/5914]

> ConfigProvider is broken for KafkaConnect when TTL is not null
> --
>
> Key: KAFKA-7620
> URL: https://issues.apache.org/jira/browse/KAFKA-7620
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Ye Ji
>Assignee: Robert Yokota
>Priority: Major
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> If the ConfigData returned by ConfigProvider.get implementations has a 
> non-null and non-negative TTL, it will trigger infinite recursion; here is an 
> excerpt of the stack trace:
> {code:java}
> at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
>   at 
> org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
>   at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
> {code}
> Basically, 
> 1) if a non-null TTL is returned from the config provider, the Connect runtime 
> will try to schedule a reload in the future, 
> 2) the scheduleReload function reads the config again to see whether it is a 
> restart or not, by calling 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to 
> transform the config,
> 3) the transform function calls the config provider and gets a non-null TTL, 
> causing scheduleReload to be called again, and we are back to step 1.
> To reproduce, simply fork the provided 
> [FileConfigProvider|https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java],
>  and add a non-negative ttl to the ConfigData returned by the get functions.
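> A minimal sketch of such a fork (class name and TTL value are mine, purely for 
> illustration):
> {code:java}
> import java.util.Set;
> 
> import org.apache.kafka.common.config.ConfigData;
> import org.apache.kafka.common.config.provider.FileConfigProvider;
> 
> // Hypothetical reproducer: a FileConfigProvider that always returns a
> // non-null TTL, which triggers the scheduleReload recursion described above.
> public class TtlFileConfigProvider extends FileConfigProvider {
>     private static final long TTL_MS = 30_000L; // arbitrary non-negative TTL
> 
>     @Override
>     public ConfigData get(String path) {
>         return new ConfigData(super.get(path).data(), TTL_MS);
>     }
> 
>     @Override
>     public ConfigData get(String path, Set<String> keys) {
>         return new ConfigData(super.get(path, keys).data(), TTL_MS);
>     }
> }
> {code}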



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5581) Avoid creating changelog topics for state stores that are materialized from a source topic

2018-11-27 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701424#comment-16701424
 ] 

Matthias J. Sax commented on KAFKA-5581:


[~guozhang] Wasn't the optimization for (1) there since the 0.10.0 release? We 
introduced a regression at some point, but that is already fixed, too.

However, for (1) we still might want to revisit the design. From user 
feedback/questions, I get the impression that there are people who read 
retention-based topics as tables. For this case, we cannot safely perform this 
optimization. We might need to improve our docs, and/or reconsider the default 
behavior?

For (2), we would also need to make sure that the input topic retention time is 
large enough to obey the retention time configured via `JoinWindows#until()`.

> Avoid creating changelog topics for state stores that are materialized from a 
> source topic
> --
>
> Key: KAFKA-5581
> URL: https://issues.apache.org/jira/browse/KAFKA-5581
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>Priority: Major
>  Labels: architecture, performance
>
> Today Streams makes all state stores backed by a changelog topic by default, 
> unless users override it via {{disableLogging}} when creating the state store 
> / materializing the KTable (see the sketch at the end of this description). 
> However, there are a few cases where a separate changelog topic is not 
> required, as we can re-use an existing topic for it. This issue summarizes two 
> such cases:
> 1) If a KTable is read directly from a source topic and is materialized, i.e. 
> {code}
> table1 = builder.table("topic1", "store1");
> {code}
> In this case {{table1}}'s changelog topic can just be {{topic1}}, and we do 
> not need to create a separate {{table1-changelog}} topic.
> 2) if a KStream is materialized for joins where the streams are directly from 
> a topic, e.g.:
> {code}
> stream1 = builder.stream("topic1");
> stream2 = builder.stream("topic2");
> stream3 = stream1.join(stream2, windows);  // stream1 and stream2 are 
> materialized with a changelog topic
> {code}
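> For reference, a minimal sketch of the {{disableLogging}} override mentioned 
> above, using the newer {{Materialized}} API (serdes and store/topic names are 
> illustrative only):
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.utils.Bytes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.streams.kstream.Materialized;
> import org.apache.kafka.streams.state.KeyValueStore;
> 
> StreamsBuilder builder = new StreamsBuilder();
> // Materialize the table from the source topic but skip the changelog topic.
> KTable<String, String> table1 = builder.table(
>     "topic1",
>     Consumed.with(Serdes.String(), Serdes.String()),
>     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store1")
>         .withLoggingDisabled());
> {code}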



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6036) Enable logical materialization to physical materialization

2018-11-27 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6036:
--

Assignee: Guozhang Wang

> Enable logical materialization to physical materialization
> --
>
> Key: KAFKA-6036
> URL: https://issues.apache.org/jira/browse/KAFKA-6036
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Today, whenever users specify a queryable store name for a KTable, we always 
> add a physical state store in the translated processor topology.
> For some scenarios, we should consider not physically materializing the KTable 
> but only "logically" materializing it: when simple transformation operations 
> or even join operations generate new KTables that need to be materialized with 
> a state store, we can use the changelog topic of the previous KTable and apply 
> the transformation logic upon restoration instead of creating a new changelog 
> topic. For example:
> {code}
> table1 = builder.table("topic1");
> table2 = table1.filter(..).join(table3); // table2 needs to be materialized 
> for joining
> {code}
> We can actually set the {{getter}} function of table2's materialized store, 
> say {{state2}}, to read from {{topic1}} and then apply the filter operator, 
> instead of creating a new {{state2-changelog}} topic in this case.
> We can come up with a general internal implementation optimization to 
> determine when to logically materialize, and decide whether we should actually 
> allow users of the DSL to "hint" whether to materialize or not (that would 
> then need a KIP).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3705) Support non-key joining in KTable

2018-11-27 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3705:
---
Issue Type: New Feature  (was: Bug)

> Support non-key joining in KTable
> -
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: api
>
> Today in the Kafka Streams DSL, KTable joins are only based on keys. If users 
> want to join a KTable A keyed on {{a}} with another KTable B keyed on {{b}} 
> but carrying a "foreign key" {{a}}, and assuming they are read from two topics 
> which are partitioned on {{a}} and {{b}} respectively, they need to use the 
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' 
> is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already 
> partitioned on {{a}}, users still need to do the pre-aggregation in order to 
> put the two joining streams on the same key. This is a drawback in 
> programmability and we should fix it (see the fuller sketch below).
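> For completeness, a fuller (hypothetical) sketch of this work-around with the 
> current DSL; the foreign-key extraction, serdes (default String serdes are 
> assumed for the groupBy) and topic names are simplified for illustration:
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KTable;
> 
> StreamsBuilder builder = new StreamsBuilder();
> // tableA is keyed on "a"; tableB is keyed on "b" and its value carries the foreign key "a".
> KTable<String, String> tableA = builder.table("topicA", Consumed.with(Serdes.String(), Serdes.String()));
> KTable<String, String> tableB = builder.table("topicB", Consumed.with(Serdes.String(), Serdes.String()));
> 
> // Re-key tableB on the foreign key "a" (here the value is assumed to be "a"
> // itself) and collapse it to one value per "a" -- the pre-aggregation step.
> KTable<String, String> tableBByA = tableB
>     .groupBy((bKey, value) -> KeyValue.pair(value, value))
>     .reduce((aggValue, newValue) -> newValue,   // adder
>             (aggValue, oldValue) -> aggValue);  // subtractor
> 
> // Now both tables are partitioned on "a" and can be joined by key.
> KTable<String, String> joined = tableA.join(tableBByA, (aValue, bValue) -> aValue + "|" + bValue);
> {code}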



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7620) ConfigProvider is broken for KafkaConnect when TTL is not null

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701426#comment-16701426
 ] 

ASF GitHub Bot commented on KAFKA-7620:
---

ewencp closed pull request #5914: KAFKA-7620:  Fix restart logic for TTLs in 
WorkerConfigTransformer
URL: https://github.com/apache/kafka/pull/5914
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index efcc01d2c45..8889aadbae1 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -35,6 +35,7 @@
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import static 
org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
@@ -105,8 +106,8 @@
 "indicates that a configuration value will expire in the future.";
 
 private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action";
-public static final String CONFIG_RELOAD_ACTION_NONE = 
Herder.ConfigReloadAction.NONE.toString();
-public static final String CONFIG_RELOAD_ACTION_RESTART = 
Herder.ConfigReloadAction.RESTART.toString();
+public static final String CONFIG_RELOAD_ACTION_NONE = 
Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT);
+public static final String CONFIG_RELOAD_ACTION_RESTART = 
Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT);
 
 public static final String ERRORS_RETRY_TIMEOUT_CONFIG = 
"errors.retry.timeout";
 public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout 
for Errors";
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5c7cc1429aa..c572e20b52f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -148,12 +148,6 @@
  */
 void restartTask(ConnectorTaskId id, Callback cb);
 
-/**
- * Get the configuration reload action.
- * @param connName name of the connector
- */
-ConfigReloadAction connectorConfigReloadAction(final String connName);
-
 /**
  * Restart the connector.
  * @param connName name of the connector
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1b715c70c76..3373d5ce328 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,6 +34,8 @@
  * retrieved TTL values.
  */
 public class WorkerConfigTransformer {
+private static final Logger log = 
LoggerFactory.getLogger(WorkerConfigTransformer.class);
+
 private final Worker worker;
 private final ConfigTransformer configTransformer;
 private final ConcurrentMap> requests = 
new ConcurrentHashMap<>();
@@ -46,7 +53,16 @@ public WorkerConfigTransformer(Worker worker, Map config
 if (configs == null) return null;
 ConfigTransformerResult result = configTransformer.transform(configs);
 if (connectorName != null) {
-scheduleReload(connectorName, result.ttls());
+String key = ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG;
+String action = (String) ConfigDef.parseType(key, 
configs.get(key), ConfigDef.Type.STRING);
+if (action == null) {
+// The default action is "restart".
+action = ConnectorConfig.CONFIG_RELOAD_ACTION_RESTART;
+}
+ConfigReloadAction reloadAction = 
ConfigRel

[jira] [Created] (KAFKA-7684) kafka consumer SchemaException occurred: Error reading field 'brokers':

2018-11-27 Thread leibo (JIRA)
leibo created KAFKA-7684:


 Summary: kafka consumer SchemaException occurred: Error reading 
field 'brokers': 
 Key: KAFKA-7684
 URL: https://issues.apache.org/jira/browse/KAFKA-7684
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.11.0.1
Reporter: leibo


A Kafka consumer hit org.apache.kafka.common.protocol.types.SchemaException: 
Error reading field 'brokers': Error reading array of size 65535, only 22 bytes 
available, when the broker was not working due to a long full GC.

Exception Detail:

org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'brokers': Error reading array of size 65535, only 22 bytes available
 at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
 at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:153)
 at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:560)
 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:217)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:203)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
 at 
com.zte.ums.zenap.cometd.ReceiveKafkaMessage.run(ReceiveKafkaMessage.java:40)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7684) kafka consumer SchemaException occurred: Error reading field 'brokers':

2018-11-27 Thread leibo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leibo updated KAFKA-7684:
-
Description: 
kafka consumer occurred org.apache.kafka.common.protocol.types.SchemaException: 
Error reading field 'brokers': Error reading array of size 65535, only 22 bytes 
available when the broker is not work due to a l*ong time full GC.*

*Exception Detail:*

org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'brokers': Error reading array of size 65535, only 22 bytes available
 at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
 at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:153)
 at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:560)
 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:217)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:203)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
 at 
com.zte.ums.zenap.cometd.ReceiveKafkaMessage.run(ReceiveKafkaMessage.java:40)

*After this, the following exception occurred:*

2018-11-21 19:43:52 097 ERROR 
[org.apache.kafka.clients.consumer.KafkaConsumer][Thread-22] - Failed to close 
coordinator
java.lang.IllegalStateException: Correlation id for response (3097236) does not 
match request (3097235), request header: 
\{api_key=8,api_version=3,correlation_id=3097235,client_id=pm-task_100.100.0.43_8}
 at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
 at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:284)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:706)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:503)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1605)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1573)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1549)
 at 
com.zte.ums.zenap.pm.common.kafka.BasicKafkaConsumerWorker$1.run(BasicKafkaConsumerWorker.java:77)
 at java.lang.Thread.run(Thread.java:811)

18-11-21 19:43:52 099 ERROR 
[com.zte.ums.zenap.dw.ext.FrameWorkThreadGroup][Thread-24] - Thread-24 throw 
this exception 
org.apache.kafka.common.KafkaException: Failed to close kafka consumer
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1623)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1573)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1549)
 at 
com.zte.ums.zenap.pm.common.kafka.BasicKafkaConsumerWorker$1.run(BasicKafkaConsumerWorker.java:77)
 at java.lang.Thread.run(Thread.java:811)

 

Kafka broker version: *0.11.0.1*

Kafka consumer version: *0.11.0.1*

*More Info:*

At the same time, the Kafka broker had a long (about 16 seconds) full GC, and 
many consumer rebalances and ZooKeeper session timeouts occurred. 

 

  was:
kafka consumer occurred org.apache.kafka.common.protocol.types.SchemaException: 
Error reading field 'brokers': Error reading array of size 65535, only 22 bytes 
available when the broker is not work due to a long time full GC.

Exception Detail:

org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'brokers': Error reading array of size 65535, only 22 bytes available
 at org.apache.kafka.commo