[jira] [Commented] (KAFKA-7315) Streams serialization docs contain a broken link for Avro

2018-09-12 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613037#comment-16613037
 ] 

Matthias J. Sax commented on KAFKA-7315:


Feel free to do a PR [~cricket007] :)

> Streams serialization docs contain a broken link for Avro
> -
>
> Key: KAFKA-7315
> URL: https://issues.apache.org/jira/browse/KAFKA-7315
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: docuentation, newbie
>
> https://kafka.apache.org/documentation/streams/developer-guide/datatypes.html#avro



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7315) Streams serialization docs contain a broken link for Avro

2018-09-12 Thread Jordan Moore (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613030#comment-16613030
 ] 

Jordan Moore commented on KAFKA-7315:
-

So, I've been noticing the reorganization of the Confluent documentation, and 
honestly, I feel like a lot more effort is going into that rather than the 
Kafka documentation :( 

For example, everything that I would want to add to the Kafka docs is summed up 
in the top section of https://docs.confluent.io/current/avro.html

If we ignore all the Confluent references and Avro details below, those two 
bullets hit the nail on the head for me, plus the references for 
"cross-language serialization libraries". 

All I could really suggest to be added to this is some DIY reference to a 
custom serializer / deserializer for some obscure object type like the 
[PriorityQueueSerializer|https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/main/java/io/confluent/examples/streams/utils/PriorityQueueSerializer.java]
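
A minimal sketch of what such a DIY serializer / deserializer pair could look like, using only the JDK. The class name PriorityQueueBytes and the wire layout (an element count followed by length-prefixed strings) are assumptions made for illustration and are not taken from the linked example. In a Streams application the two methods would presumably sit behind Kafka's Serializer and Deserializer interfaces and be combined into a Serde.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.PriorityQueue;

// Hypothetical helper: converts a PriorityQueue<String> to bytes and back.
final class PriorityQueueBytes {

    // Writes the element count, then each element via writeUTF (length-prefixed).
    static byte[] serialize(PriorityQueue<String> queue) {
        if (queue == null) {
            return null; // assumption: null maps to null, as for tombstones
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(queue.size());
            for (String element : queue) {
                out.writeUTF(element);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the count and re-adds every element; the queue rebuilds its heap,
    // so natural ordering is restored. A custom comparator is NOT serialized.
    static PriorityQueue<String> deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        PriorityQueue<String> queue = new PriorityQueue<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                queue.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return queue;
    }
}
```

The sketch only handles String elements with natural ordering; a generic version would need an element serializer and a way to recreate the comparator.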






[jira] [Commented] (KAFKA-6926) Reduce NPath exceptions in Connect

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612993#comment-16612993
 ] 

ASF GitHub Bot commented on KAFKA-6926:
---

ijuma closed pull request #5051:  KAFKA-6926: Simplified some logic to 
eliminate some suppressions of NPath complexity checks
URL: https://github.com/apache/kafka/pull/5051
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ba48c38cb28..cba38f2b172 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -114,7 +114,7 @@
   files="Values.java"/>
 
 
+  
files="(DistributedHerder|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask).java"/>
 
 
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
index 2ad8a046d76..2c5f514836d 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
@@ -155,24 +155,14 @@ public boolean equals(Object o) {
 
         ConnectRecord that = (ConnectRecord) o;
 
-        if (kafkaPartition != null ? !kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null)
-            return false;
-        if (topic != null ? !topic.equals(that.topic) : that.topic != null)
-            return false;
-        if (keySchema != null ? !keySchema.equals(that.keySchema) : that.keySchema != null)
-            return false;
-        if (key != null ? !key.equals(that.key) : that.key != null)
-            return false;
-        if (valueSchema != null ? !valueSchema.equals(that.valueSchema) : that.valueSchema != null)
-            return false;
-        if (value != null ? !value.equals(that.value) : that.value != null)
-            return false;
-        if (timestamp != null ? !timestamp.equals(that.timestamp) : that.timestamp != null)
-            return false;
-        if (!Objects.equals(headers, that.headers))
-            return false;
-
-        return true;
+        return Objects.equals(kafkaPartition, that.kafkaPartition)
+               && Objects.equals(topic, that.topic)
+               && Objects.equals(keySchema, that.keySchema)
+               && Objects.equals(key, that.key)
+               && Objects.equals(valueSchema, that.valueSchema)
+               && Objects.equals(value, that.value)
+               && Objects.equals(timestamp, that.timestamp)
+               && Objects.equals(headers, that.headers);
     }
 
     @Override
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
index ff8271635f3..a42a33e8af7 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
@@ -218,14 +218,10 @@ public static void validateValue(String name, Schema schema, Object value) {
             if (!schema.isOptional())
                 throw new DataException("Invalid value: null used for required field: \"" + name
                         + "\", schema type: " + schema.type());
-            else
-                return;
+            return;
         }
 
-        List expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
-
-        if (expectedClasses == null)
-            expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+        List expectedClasses = expectedClassesFor(schema);
 
         if (expectedClasses == null)
             throw new DataException("Invalid Java object for schema type " + schema.type()
@@ -266,6 +262,13 @@ public static void validateValue(String name, Schema schema, Object value) {
         }
     }
 
+    private static List expectedClassesFor(Schema schema) {
+        List expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
+        if (expectedClasses == null)
+            expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+        return expectedClasses;
+    }
+
     /**
      * Validate that the value can be used for this schema, i.e. that its type matches the schema type and optional
      * requirements. Throws a DataException if the value is invalid.
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
index 00a050a310f..dd1b94a854e 100644
--- 
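
The equals() rewrite in the ConnectRecord hunk of this diff relies on java.util.Objects.equals being null-safe. A small sketch, using a hypothetical NullSafeEquals class that is not part of the pull request, that compares the removed ternary pattern with the new call over null and non-null combinations:

```java
import java.util.Objects;

// Hypothetical demo class; it only mirrors the two comparison styles in the diff.
final class NullSafeEquals {

    // The removed pattern: the condition under which equals() returned false.
    static boolean differsOld(Object a, Object b) {
        return a != null ? !a.equals(b) : b != null;
    }

    // The replacement: Objects.equals handles null on either side.
    static boolean differsNew(Object a, Object b) {
        return !Objects.equals(a, b);
    }

    public static void main(String[] args) {
        Object[] samples = {null, "topic", "other", 1};
        for (Object a : samples) {
            for (Object b : samples) {
                if (differsOld(a, b) != differsNew(a, b)) {
                    throw new IllegalStateException("mismatch for " + a + " and " + b);
                }
            }
        }
        System.out.println("both forms agree");
    }
}
```

Because each field comparison becomes a single expression, the chain of if statements collapses into one return, which is what reduces the NPath count.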

[jira] [Commented] (KAFKA-7397) Ability to apply DSL stateless transformation on a global table

2018-09-12 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612785#comment-16612785
 ] 

John Roesler commented on KAFKA-7397:
-

Hi [~frederic.tardif],

Thanks for the feature request, and also for the workaround code.

Just to put this out there, are you interested in writing a KIP and 
implementing the feature? If so, I can help guide you through the process.

Thanks,

-John

> Ability to apply DSL stateless transformation on a global table
> ---
>
> Key: KAFKA-7397
> URL: https://issues.apache.org/jira/browse/KAFKA-7397
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Frederic Tardif
>Priority: Major
>  Labels: needs-kip
> Attachments: kafka.zip
>
>
> When consuming a GlobalKTable (with the expectation of caching all the data 
> of a topic in a consumer store), we can't apply any stateless transformation 
> (filter, map) prior to materializing. To achieve this while still consuming 
> the records of all partitions, we must first run a streams app that 
> preprocesses the ingress topic into an egress topic holding exactly the K1,V1 
> records we want to store in our GlobalKTable. This looks unnecessarily complex 
> and doubles the storage of the topic, while the only goal is to statelessly 
> adapt the data prior to storing it (RocksDB) at the receiving end.
> See the discussion at 
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic]
> As a workaround, I have used `new Builder().addGlobalStore()` with a 
> custom Processor able to filter and map prior to storing (see attached). 
> Although this seems to work, I believe this functionality should be part of 
> the basic DSL API when working with a global table (`new 
> StreamsBuilder().globalTable().filter(...).map()... `).
>  
>  
>  
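
The "filter and map prior to store" idea from the description can be sketched without any Kafka dependency. FilteringStoreWriter below is a hypothetical stand-in for the custom Processor mentioned in the workaround, with a plain HashMap playing the role of the global store; it is not the attached code and does not use the Streams API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical stand-in for a custom processor that feeds a global store.
final class FilteringStoreWriter<K, V, V1> {
    private final Map<K, V1> store = new HashMap<>();
    private final BiPredicate<K, V> filter;
    private final Function<V, V1> mapper;

    FilteringStoreWriter(BiPredicate<K, V> filter, Function<V, V1> mapper) {
        this.filter = filter;
        this.mapper = mapper;
    }

    // Called once per consumed record; only matching records reach the store.
    void process(K key, V value) {
        if (filter.test(key, value)) {
            store.put(key, mapper.apply(value));
        } else {
            // Design choice of this sketch: a non-matching update evicts any
            // earlier value for the same key, so the store never goes stale.
            store.remove(key);
        }
    }

    Map<K, V1> store() {
        return store;
    }
}
```

Only the transformed records are kept, so no intermediate topic is needed in this model; whether the real DSL could offer the same is what the feature request asks.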





[jira] [Commented] (KAFKA-7405) Support for graceful handling of corrupted records in Kafka consumer

2018-09-12 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612713#comment-16612713
 ] 

Matthias J. Sax commented on KAFKA-7405:


[~hachikuji] [~cmccabe] Any thoughts about this?

> Support for graceful handling of corrupted records in Kafka consumer
> 
>
> Key: KAFKA-7405
> URL: https://issues.apache.org/jira/browse/KAFKA-7405
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Affects Versions: 0.10.2.2, 0.11.0.3, 1.1.1
>Reporter: Eugen Feller
>Priority: Major
>
> We have run into issues several times where corrupted records cause the Kafka 
> consumer to throw an error code 2 exception (CRC checksum failure) in the 
> fetch layer. Specifically, when using Kafka streams we run into KAFKA-6977 
> that throws an IllegalStateException and crashes the service. It would be 
> great if the Kafka consumer could be extended with a setting similar to 
> [KIP-161|https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers],
>  that would allow one to gracefully ignore corrupted records.
>  
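
To make the request concrete, here is a rough sketch of "skip or fail on checksum mismatch" using only the JDK. The class CorruptRecordSkipper, the StoredRecord type and the use of java.util.zip.CRC32 are assumptions for illustration; the CONTINUE / FAIL names merely echo the responses described in KIP-161. None of this is an existing consumer API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical model of a fetch layer that can tolerate corrupted records.
final class CorruptRecordSkipper {

    enum Response { CONTINUE, FAIL }

    // Hypothetical record: a payload plus the checksum stored alongside it.
    static final class StoredRecord {
        final byte[] payload;
        final long storedCrc;

        StoredRecord(byte[] payload, long storedCrc) {
            this.payload = payload;
            this.storedCrc = storedCrc;
        }
    }

    static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // Returns the payloads whose checksum matches. On a mismatch the chosen
    // response decides between dropping the record and aborting the read.
    static List<byte[]> readAll(List<StoredRecord> records, Response onCorruption) {
        List<byte[]> good = new ArrayList<>();
        for (StoredRecord record : records) {
            if (crcOf(record.payload) == record.storedCrc) {
                good.add(record.payload);
            } else if (onCorruption == Response.FAIL) {
                throw new IllegalStateException("CRC checksum failure");
            }
            // Response.CONTINUE: skip the corrupted record and keep going.
        }
        return good;
    }
}
```

With Response.FAIL the sketch behaves like the crash described above; with Response.CONTINUE the corrupted record is dropped and processing continues.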





[jira] [Commented] (KAFKA-7214) Mystic FATAL error

2018-09-12 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612689#comment-16612689
 ] 

John Roesler commented on KAFKA-7214:
-

There should be a log message preceding this at "ERROR" level that explains 
the unrecoverable condition and actually logs the exception, and then says 
"Shutting down". This happens before the transition to "PENDING_SHUTDOWN".

Is this not the case?

If not, can you go ahead and upload the whole log (supposing you don't have 
sensitive information in it)?

> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>
> Dears,
> Very often at startup of the streaming application I get this exception:
> {code}
> Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=my_instance_medium_topic, partition=1, offset=198900203; 
> [org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:212),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:347),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:339),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:648),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)]
>  in thread 
> my_application-my_instance-my_instance_medium-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62
> {code}
> and then (without shutdown request from my side):
> {code}
> 2018-07-30 07:45:02 [ar313] [INFO ] StreamThread:912 - stream-thread 
> [my_application-my_instance-my_instance-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62]
>  State transition from PENDING_SHUTDOWN to DEAD.
> {code}
> What is this?
> How do I handle it correctly?
> Thanks in advance for your help.





[jira] [Created] (KAFKA-7406) Naming Join and Grouping Operations

2018-09-12 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-7406:
--

 Summary: Naming Join and Grouping Operations
 Key: KAFKA-7406
 URL: https://issues.apache.org/jira/browse/KAFKA-7406
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bill Bejeck
Assignee: Bill Bejeck
 Fix For: 2.1.0


To help make Streams compatible with topology changes, we will need to give 
users the ability to name some operators so that a rolling upgrade is possible 
after adjusting the topology.

This Jira is the first step in the effort to let users give operators 
deterministic names.
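
A toy model of why deterministic names matter. It assumes operator names are generated as a prefix plus a zero-padded, topology-wide counter (in the style of KSTREAM-SOURCE-0000000000); the OperatorNames class and its methods are hypothetical and are not the Streams implementation.

```java
// Hypothetical model of operator naming; not the Kafka Streams implementation.
final class OperatorNames {
    private int counter = 0;

    // Every operator consumes one counter value. An explicit name, when given,
    // replaces the generated "<PREFIX>-<zero-padded counter>" name.
    String nameFor(String prefix, String explicitName) {
        String generated = String.format("%s-%010d", prefix, counter++);
        return explicitName != null ? explicitName : generated;
    }

    // Builds source -> [optional filter] -> aggregate and returns the
    // name that the aggregate operator ends up with.
    static String aggregateName(boolean withFilter, String explicitName) {
        OperatorNames names = new OperatorNames();
        names.nameFor("KSTREAM-SOURCE", null);
        if (withFilter) {
            names.nameFor("KSTREAM-FILTER", null);
        }
        return names.nameFor("KSTREAM-AGGREGATE", explicitName);
    }
}
```

In this model, aggregateName(false, null) yields KSTREAM-AGGREGATE-0000000001, while adding the filter shifts it to KSTREAM-AGGREGATE-0000000002. Passing an explicit name such as "count-by-user" gives the same result for both topologies, which is the property a rolling upgrade would need for anything keyed by the operator name.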





[jira] [Commented] (KAFKA-7405) Support for graceful handling of corrupted records in Kafka consumer

2018-09-12 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612623#comment-16612623
 ] 

Eugen Feller commented on KAFKA-7405:
-

[~guozhang] [~mjsax]






[jira] [Updated] (KAFKA-7405) Support for graceful handling of corrupted records in Kafka consumer

2018-09-12 Thread Eugen Feller (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugen Feller updated KAFKA-7405:

Description: 
We have run into issues several times where corrupted records cause the Kafka 
consumer to throw an error code 2 exception (CRC checksum failure) in the fetch 
layer. Specifically, when using Kafka streams we run into KAFKA-6977 that 
throws an IllegalStateException and crashes the service. It would be great if 
the Kafka consumer could be extended with a setting similar to 
[KIP-161|https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers],
 that would allow one to gracefully ignore corrupted records.

 

  was:
We have run into issues several times where corrupted records cause the Kafka 
consumer to throw an error code 2 exception in the fetch layer that cannot be 
handled gracefully. Specifically, when using Kafka streams we run into 
KAFKA-6977 that throws an IllegalStateException. It would be great if the Kafka 
consumer could be extended with a setting similar to 
[KIP-161|https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers],
 that would allow one to cleanly ignore corrupted records.

 







[jira] [Created] (KAFKA-7405) Support for graceful handling of corrupted records in Kafka consumer

2018-09-12 Thread Eugen Feller (JIRA)
Eugen Feller created KAFKA-7405:
---

 Summary: Support for graceful handling of corrupted records in 
Kafka consumer
 Key: KAFKA-7405
 URL: https://issues.apache.org/jira/browse/KAFKA-7405
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, streams
Affects Versions: 1.1.1, 0.11.0.3, 0.10.2.2
Reporter: Eugen Feller


We have run into issues several times where corrupted records cause the Kafka 
consumer to throw an error code 2 exception in the fetch layer that cannot be 
handled gracefully. Specifically, when using Kafka streams we run into 
KAFKA-6977 that throws an IllegalStateException. It would be great if the Kafka 
consumer could be extended with a setting similar to 
[KIP-161|https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers],
 that would allow one to cleanly ignore corrupted records.

 





[jira] [Resolved] (KAFKA-6699) When one of two Kafka nodes are dead, streaming API cannot handle messaging

2018-09-12 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6699.

Resolution: Not A Bug

This ticket does not seem to be a bug but a configuration question. I am 
closing this for now. If you have further questions, please consult the user 
mailing list (https://kafka.apache.org/contact).

> When one of two Kafka nodes are dead, streaming API cannot handle messaging
> ---
>
> Key: KAFKA-6699
> URL: https://issues.apache.org/jira/browse/KAFKA-6699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> Dears,
> I quite often observe that when a Kafka broker is partly dead(*), 
> applications that use the streaming API do nothing.
> (*) Partly dead in my case means that one of two Kafka nodes is out of 
> order. 
> Especially when the disk is full on one machine, the broker goes into some 
> strange state in which the streaming API stops working. It seems that the 
> regular producer/consumer API has no problem in such a case.
> Could you have a look at this matter?





[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-12 Thread Jon Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612461#comment-16612461
 ] 

Jon Lee commented on KAFKA-7403:


[~hachikuji] Thanks for the suggestion. Updated the title and also affected 
version.

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Jon Lee
>Priority: Major
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
> offsetAndMetadata.expireTimestamp.get){code}
>  
> Here is the stack trace for the broker side error
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
> ~[scala-library-2.11.12.jar:?]
> at 
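
The failure mode in the analysis above (unconditionally unwrapping an empty optional) has a direct Java analogue. The sketch below uses java.util.Optional in place of Scala's Option; ExpireTimestampDemo, its method names and the fallback rule (commit time plus a retention period) are assumptions for illustration, not the broker code and not the eventual fix.

```java
import java.util.Optional;

// Hypothetical demo of unwrapping an optional expire timestamp.
final class ExpireTimestampDemo {

    // Mirrors the failing pattern: get() on an empty Optional throws
    // java.util.NoSuchElementException, like None.get in Scala.
    static long unsafe(Optional<Long> expireTimestamp) {
        return expireTimestamp.get();
    }

    // Hypothetical guard: fall back to commit time plus a retention period
    // when no explicit expire timestamp was set.
    static long guarded(Optional<Long> expireTimestamp, long commitTimestamp, long retentionMs) {
        return expireTimestamp.orElse(commitTimestamp + retentionMs);
    }
}
```

Calling unsafe(Optional.empty()) reproduces the shape of the broker-side exception, while guarded(...) always produces a value that an older message format could store.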

[jira] [Updated] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-12 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee updated KAFKA-7403:
---
Summary: Offset commit failure after upgrading brokers past 
KIP-211/KAFKA-4682  (was: Offset commit failure after broker upgrade to 2.0)


[jira] [Updated] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-12 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee updated KAFKA-7403:
---
Affects Version/s: 2.1.0

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Jon Lee
>Priority: Major
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
> offsetAndMetadata.expireTimestamp.get){code}
>  
> Here is the stack trace for the broker side error
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?]
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
> at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.12.jar:?]
> at 
> 
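
The failure mode in steps 2 to 4 of the quoted analysis (an optional expire timestamp that is absent but read unconditionally) can be illustrated with a minimal Java sketch. This is not the broker code: the class and method names are hypothetical, java.util.Optional stands in for the Scala Option, and the -1 fallback is an assumed sentinel rather than the fix adopted by the project.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

/** Hypothetical illustration of reading an absent optional timestamp. */
public class ExpireTimestampSketch {

    /** Mirrors the unguarded access: throws if the timestamp is absent. */
    public static long unguarded(Optional<Long> expireTimestamp) {
        return expireTimestamp.get();
    }

    /** Defensive variant: returns a caller-supplied fallback (an assumption). */
    public static long guarded(Optional<Long> expireTimestamp, long fallback) {
        return expireTimestamp.orElse(fallback);
    }

    public static void main(String[] args) {
        Optional<Long> absent = Optional.empty();
        try {
            unguarded(absent);
        } catch (NoSuchElementException e) {
            // Same exception type as the broker-side stack trace reports.
            System.out.println("unguarded access failed: " + e.getClass().getName());
        }
        System.out.println("guarded access returned: " + guarded(absent, -1L));
    }
}
```

The guarded variant only shows that the read itself can be made safe; which value the older message format should carry is a separate design decision.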

[jira] [Commented] (KAFKA-6699) When one of two Kafka nodes are dead, streaming API cannot handle messaging

2018-09-12 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612441#comment-16612441
 ] 

Matthias J. Sax commented on KAFKA-6699:


If you want to ensure consistency and still be able to write when one broker 
goes down, you need at least 3 brokers. If you have only 2 brokers and one goes 
down, you either lose the ability to write, or you trade off correctness 
guarantees and might lose data. This is independent of ZooKeeper, because 
those guarantees depend on the replication factor of your data, and the data 
is stored on the brokers.

Let's phrase it differently: to guarantee consistent writes and not lose any 
data, you need to ensure that your write goes to at least 2 brokers before you 
ack the write to the producer. Thus, if you have only 2 brokers and one goes 
down, you can no longer get 2 replicas of your data, and hence you either 
sacrifice replication (resulting in potential data loss) or you disallow 
writing until the second broker comes back.
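
The arithmetic behind this can be written down as a minimal sketch. It is not Kafka code: the class and method names are hypothetical, and it assumes that every live broker holds an in-sync replica and that a write is acknowledged only once it has reached the required number of replicas.

```java
/** Hypothetical helper illustrating the replica arithmetic; not part of Kafka. */
public class ReplicaMath {

    /**
     * Returns true if a write that must reach at least requiredReplicas brokers
     * can still be acknowledged with the given number of live brokers.
     */
    public static boolean canAcknowledgeWrite(int liveBrokers, int replicationFactor, int requiredReplicas) {
        // A partition cannot have more in-sync replicas than live brokers.
        int inSyncReplicas = Math.min(liveBrokers, replicationFactor);
        return inSyncReplicas >= requiredReplicas;
    }

    public static void main(String[] args) {
        // 2 brokers, one down, 2 replicas required.
        System.out.println(canAcknowledgeWrite(1, 2, 2));
        // 3 brokers, one down, 2 replicas required.
        System.out.println(canAcknowledgeWrite(2, 3, 2));
        // 2 brokers, one down, requirement relaxed to 1 replica.
        System.out.println(canAcknowledgeWrite(1, 2, 1));
    }
}
```

The third case is the "sacrifice replication" branch: the write is accepted, but only one copy of the data exists until the second broker returns.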

> When one of two Kafka nodes are dead, streaming API cannot handle messaging
> ---
>
> Key: KAFKA-6699
> URL: https://issues.apache.org/jira/browse/KAFKA-6699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> Dears,
> I quite often observe that when a Kafka broker is partly dead(*), 
> applications that use the Streams API do nothing.
> (*) Partly dead in my case means that one of two Kafka nodes is out of 
> order. 
> Especially when the disk is full on one machine, the broker goes into some 
> strange state in which the Streams API goes on vacation. It seems the regular 
> producer/consumer API has no problem in such a case.
> Can you have a look at this matter?





[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612028#comment-16612028
 ] 

Kobi Hikri edited comment on KAFKA-1194 at 9/12/18 4:25 PM:


Thanks for your time [~stephane.maa...@gmail.com].
  
 I am adding some logs to the method and reproducing again.


was (Author: kobi hikri):
Thanks for your time [~stephane.maa...@gmail.com].
  
I am adding some logs to the method and reproducing again.

public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
    try {
        log.debug("Attempting atomic move of {} to {}", source, target);
        Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        log.debug("Atomic move of {} to {} succeeded", source, target);
    } catch (IOException outer) {
        try {
            log.debug("Attempting non-atomic move of {} to {} after atomic move failed due to {}",
                source, target, outer.getMessage());
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}",
                source, target, outer.getMessage());
        } catch (IOException inner) {
            log.error("Non-atomic move of {} to {} failed due to {}", source, target, inner.getMessage());
            inner.addSuppressed(outer);
            throw inner;
        }
    }
}

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> 

[jira] [Comment Edited] (KAFKA-7214) Mystic FATAL error

2018-09-12 Thread Seweryn Habdank-Wojewodzki (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612374#comment-16612374
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-7214 at 9/12/18 3:45 PM:


[~vvcephei]
Back to the roots. What shall I say to Maintenance and Operations staff, when 
they need to handle the case below?

{code}
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [FATAL] SingleTopicstreamer:102 - Caught unhandled 
exception: Exception caught in process. taskId=0_2, 
processor=KSTREAM-SOURCE-00, topic=ar313_medium_topic, partition=2, 
offset=1892533025; 
[org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:240),
 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94),
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411),
 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:922),
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802),
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749),
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)]
 in thread 
streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [INFO ] KafkaStreams:261 - stream-client 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e] State 
transition from RUNNING to PENDING_SHUTDOWN
{code}

* What is wrong there? 
* What are the steps to avoid this in the future? 
* How to repair the situation?
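
One observation on the excerpt above: the FATAL line carries only the outermost message and its stack frames, so the underlying reason is not visible. The following is a minimal sketch of how the application-side handler could log every nested cause. It assumes that the Streams runtime attaches the underlying error as the cause of the logged exception; CauseChain and describe are hypothetical names, and only standard JDK calls are used.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that renders an exception together with its causes. */
public class CauseChain {

    /** Collects "ClassName: message" for the throwable and each nested cause. */
    public static List<String> describe(Throwable error) {
        List<String> lines = new ArrayList<>();
        Throwable current = error;
        // Bound the walk so that a cyclic cause chain cannot loop forever.
        for (int depth = 0; current != null && depth < 32; depth++) {
            lines.add(current.getClass().getName() + ": " + current.getMessage());
            current = current.getCause();
        }
        return lines;
    }

    public static void main(String[] args) {
        // Install a handler that logs every level of the chain.
        Thread.setDefaultUncaughtExceptionHandler((thread, error) ->
            describe(error).forEach(line ->
                System.err.println("[" + thread.getName() + "] " + line)));

        // Simulated wrapped failure; the inner exception is made up for the demo.
        Throwable wrapped = new RuntimeException("Exception caught in process.",
            new IllegalStateException("hypothetical root cause"));
        describe(wrapped).forEach(System.out::println);
    }
}
```

With the full chain in the log, operations staff would at least see which underlying error triggered the shutdown.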



was (Author: habdank):
[~vvcephei]
Back to the roots. What shall I say to Maintenance and Operations staff, when 
they need to handle the case below?

{code}
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [FATAL] SingleTopicstreamer:102 - Caught unhandled 
exception: Exception caught in process. taskId=0_2, 
processor=KSTREAM-SOURCE-00, topic=ar313_medium_topic, partition=2, 
offset=1892533025; 
[org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:240),
 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94),
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411),
 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:922),
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802),
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749),
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)]
 in thread 
streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [INFO ] KafkaStreams:261 - stream-client 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e] State 
transition from RUNNING to PENDING_SHUTDOWN
{code}

What is wrong there? What are the steps to avoid this in the future? How to 
repair the situation?


> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1
>Reporter: Seweryn 

[jira] [Comment Edited] (KAFKA-7214) Mystic FATAL error

2018-09-12 Thread Seweryn Habdank-Wojewodzki (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612374#comment-16612374
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-7214 at 9/12/18 3:45 PM:


[~vvcephei]
Back to the roots. What shall I say to Maintenance and Operations staff, when 
they need to handle the case below?

{code}
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [FATAL] SingleTopicstreamer:102 - Caught unhandled 
exception: Exception caught in process. taskId=0_2, 
processor=KSTREAM-SOURCE-00, topic=ar313_medium_topic, partition=2, 
offset=1892533025; 
[org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:240),
 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94),
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411),
 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:922),
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802),
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749),
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)]
 in thread 
streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [INFO ] KafkaStreams:261 - stream-client 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e] State 
transition from RUNNING to PENDING_SHUTDOWN
{code}

What is wrong there? What are the steps to avoid this in the future? How to 
repair the situation?



was (Author: habdank):
[~vvcephei]
Back to the roots. What shall I say to Maintenance and Operations staff, when 
they need to handle the case below?

{code}
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [FATAL] SingleTopicstreamer:102 - Caught unhandled 
exception: Exception caught in process. taskId=0_2, 
processor=KSTREAM-SOURCE-00, topic=ar313_medium_topic, partition=2, 
offset=1892533025; 
[org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:240),
 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94),
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411),
 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:922),
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802),
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749),
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)]
 in thread 
streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [INFO ] KafkaStreams:261 - stream-client 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e] State 
transition from RUNNING to PENDING_SHUTDOWN
{code}

What is wrong there? What are the steps to avoid this in the future?


> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>
> 

[jira] [Comment Edited] (KAFKA-7214) Mystic FATAL error

2018-09-12 Thread Seweryn Habdank-Wojewodzki (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612374#comment-16612374
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-7214 at 9/12/18 3:44 PM:


[~vvcephei]
Back to the roots. What shall I say to Maintenance and Operations staff, when 
they need to handle the case below?

{code}
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [FATAL] SingleTopicstreamer:102 - Caught unhandled 
exception: Exception caught in process. taskId=0_2, 
processor=KSTREAM-SOURCE-00, topic=ar313_medium_topic, partition=2, 
offset=1892533025; 
[org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:240),
 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94),
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411),
 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:922),
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802),
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749),
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)]
 in thread 
streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [INFO ] KafkaStreams:261 - stream-client 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e] State 
transition from RUNNING to PENDING_SHUTDOWN
{code}

What is wrong there? What are the steps to avoid this in the future?



was (Author: habdank):
[~vvcephei]
Back to the roots. What shall I say to Maintenance and Operations staff, when 
they need to handle the case below?

{code}
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [FATAL] SingleTopicstreamer:102 - Caught unhandled 
exception: Exception caught in process. taskId=0_2, 
processor=KSTREAM-SOURCE-00, topic=ar313_medium_topic, partition=2, 
offset=1892533025; 
[org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:240),
 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94),
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411),
 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:922),
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802),
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749),
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)]
 in thread 
streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [INFO ] KafkaStreams:261 - stream-client 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e] State 
transition from RUNNING to PENDING_SHUTDOWN
{code}

What is wrong there?


> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>
> Dears,
> Very often at startup of the streaming application I got exception:

[jira] [Commented] (KAFKA-7214) Mystic FATAL error

2018-09-12 Thread Seweryn Habdank-Wojewodzki (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612374#comment-16612374
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-7214:
---

[~vvcephei]
Back to the roots. What shall I say to Maintenance and Operations staff, when 
they need to handle the case below?

{code}
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [FATAL] SingleTopicstreamer:102 - Caught unhandled 
exception: Exception caught in process. taskId=0_2, 
processor=KSTREAM-SOURCE-00, topic=ar313_medium_topic, partition=2, 
offset=1892533025; 
[org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:240),
 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94),
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411),
 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:922),
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802),
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749),
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)]
 in thread 
streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-51
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:200 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 State transition from PENDING_SHUTDOWN to DEAD
2018-09-06 10:05:21 [ar313] [INFO ] StreamThread:1128 - stream-thread 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e-StreamThread-58]
 Shutdown complete
2018-09-06 10:05:21 [ar313] [INFO ] KafkaStreams:261 - stream-client 
[streamer-ar313-ar313_medium-15864802-2c1b-47e6-90f4-80b8b4fe4c3e] State 
transition from RUNNING to PENDING_SHUTDOWN
{code}

What is wrong there?


> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>
> Dears,
> Very often at startup of the streaming application I got exception:
> {code}
> Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=my_instance_medium_topic, partition=1, offset=198900203; 
> [org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:212),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:347),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:339),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:648),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)]
>  in thread 
> my_application-my_instance-my_instance_medium-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62
> {code}
> and then (without shutdown request from my side):
> {code}
> 2018-07-30 07:45:02 [ar313] [INFO ] StreamThread:912 - stream-thread 
> [my_application-my_instance-my_instance-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62]
>  State transition from PENDING_SHUTDOWN to DEAD.
> {code}
> What is this?
> How to correctly handle it?
> Thanks in advance for help.





[jira] [Commented] (KAFKA-6699) When one of two Kafka nodes are dead, streaming API cannot handle messaging

2018-09-12 Thread Seweryn Habdank-Wojewodzki (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612356#comment-16612356
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-6699:
---

[~mjsax] I understand your point, but I still do not see why we need more than 
2 Kafka brokers, especially since we have 5 separate ZooKeeper nodes.

> When one of two Kafka nodes are dead, streaming API cannot handle messaging
> ---
>
> Key: KAFKA-6699
> URL: https://issues.apache.org/jira/browse/KAFKA-6699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> Dears,
> I quite often observe that when a Kafka broker is partly dead(*), 
> applications that use the Streams API do nothing.
> (*) Partly dead in my case means that one of two Kafka nodes is out of 
> order. 
> Especially when the disk is full on one machine, the broker goes into some 
> strange state in which the Streams API goes on vacation. It seems the regular 
> producer/consumer API has no problem in such a case.
> Can you have a look at this matter?





[jira] [Commented] (KAFKA-7379) send.buffer.bytes should be allowed to set -1 in KafkaStreams

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612167#comment-16612167
 ] 

ASF GitHub Bot commented on KAFKA-7379:
---

aai95 opened a new pull request #5643: [KAFKA-7379] [streams] send.buffer.bytes 
should be allowed to set -1 in KafkaStreams
URL: https://github.com/apache/kafka/pull/5643
 
 
   *atLeast(0) was replaced by atLeast(-1) in 
org.apache.kafka.streams.StreamsConfig*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)




> send.buffer.bytes should be allowed to set -1 in KafkaStreams
> -
>
> Key: KAFKA-7379
> URL: https://issues.apache.org/jira/browse/KAFKA-7379
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Badai Aqrandista
>Assignee: Aleksei Izmalkin
>Priority: Minor
>  Labels: beginner, easyfix, newbie
>
> send.buffer.bytes and receive.buffer.bytes are declared with an atLeast(0) 
> constraint in StreamsConfig, whereas -1 should also be allowed. This 
> is like KAFKA-6891.
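
The effect of moving the lower bound from 0 to -1 can be shown with a minimal sketch. It does not use the Kafka configuration classes: RangeSketch and its atLeast method are hypothetical stand-ins for a lower-bound validator, written with the JDK only.

```java
import java.util.function.IntPredicate;

/** Hypothetical stand-in for a lower-bound config validator. */
public class RangeSketch {

    /** Returns a check that accepts any value greater than or equal to min. */
    public static IntPredicate atLeast(int min) {
        return value -> value >= min;
    }

    public static void main(String[] args) {
        IntPredicate oldBound = atLeast(0);
        IntPredicate newBound = atLeast(-1);

        // -1 is rejected by the old bound and accepted by the new one.
        System.out.println("old bound accepts -1: " + oldBound.test(-1));
        System.out.println("new bound accepts -1: " + newBound.test(-1));
        // Values below -1 stay invalid under both bounds.
        System.out.println("new bound accepts -2: " + newBound.test(-2));
    }
}
```

Only the single value -1 becomes newly valid; everything below it is still rejected.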





[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612028#comment-16612028
 ] 

Kobi Hikri edited comment on KAFKA-1194 at 9/12/18 12:17 PM:
-

Thanks for your time [~stephane.maa...@gmail.com].
  
I am adding some logs to the method and reproducing again.

public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
    try {
        log.debug("Attempting atomic move of {} to {}", source, target);
        Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        log.debug("Atomic move of {} to {} succeeded", source, target);
    } catch (IOException outer) {
        try {
            // Fall back to a plain (non-atomic) move that replaces any existing target
            log.debug("Attempting non-atomic move of {} to {} after atomic move failed due to {}",
                    source, target, outer.getMessage());
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}",
                    source, target, outer.getMessage());
        } catch (IOException inner) {
            log.error("Non-atomic move of {} to {} failed due to {}", source, target, inner.getMessage());
            // Keep the original failure attached so both causes are visible
            inner.addSuppressed(outer);
            throw inner;
        }
    }
}
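
For anyone who wants to exercise this move-then-fallback pattern outside a broker, the following is a self-contained sketch using only `java.nio.file`. The class name, method name, and file names are assumptions made for illustration, and the logging calls are omitted. It is not the Kafka `Utils` class itself.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class MoveWithFallbackDemo {

    // Try an atomic move first; if it fails, retry as a replacing, non-atomic move.
    static void moveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            try {
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException inner) {
                // Preserve the first failure alongside the second one
                inner.addSuppressed(outer);
                throw inner;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("move-demo");
        // File names are illustrative; they mimic a segment index being marked deleted.
        Path source = Files.write(dir.resolve("00000000000000000000.index"), new byte[] {1, 2, 3});
        Path target = dir.resolve("00000000000000000000.index.deleted");

        moveWithFallback(source, target);

        System.out.println("source exists: " + Files.exists(source));
        System.out.println("target exists: " + Files.exists(target));
    }
}
```

On a file that nothing else holds open, the first `Files.move` normally succeeds; the fallback branch matters in cases like the one discussed in this ticket, where the file is still mapped or open.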


was (Author: kobi hikri):
Thanks for your time [~stephane.maa...@gmail.com].
 

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
> Reporter: Tao Qin
> Priority: Critical
> Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>

[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612028#comment-16612028
 ] 

Kobi Hikri commented on KAFKA-1194:
---

Thanks for your time [~stephane.maa...@gmail.com].
 

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
> Reporter: Tao Qin
> Priority: Critical
> Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>
> We tested it in a Windows environment and set log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the Kafka broker still cannot delete the old log files, 
> and we get the following exception:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The Javadoc 
> describes it as follows:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.
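
The Javadoc behaviour quoted above can be observed with a small sketch: a mapping created from a channel stays readable after that channel is closed, which is why closing the file alone does not release the mapping. The class name, method name, and temp-file names below are assumptions for illustration; only standard `java.nio` calls are used.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

public class MappingOutlivesChannel {

    // Map the whole file, close the file and its channel, then read through the mapping.
    static byte readAfterClose(Path file) throws IOException {
        MappedByteBuffer buffer;
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw")) {
            buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
        }
        // The channel is closed at this point, but the mapping is still valid.
        return buffer.get(0);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("segment", ".index");
        Files.write(file, new byte[] {42, 0, 0, 0});

        System.out.println("first byte read after close: " + readAfterClose(file));
    }
}
```

Because the mapping is only released when the buffer is garbage-collected (or explicitly unmapped by non-public means), a rename or delete on Windows can still fail after `close()` has returned.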





[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Stephane Maarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612019#comment-16612019
 ] 

Stephane Maarek commented on KAFKA-1194:


Thanks [~Kobi Hikri]! That's what I thought: it was a quick fix for the log 
cleaner, but it breaks the delete feature. At least the brokers don't crash 
anymore.
To be honest, I won't pursue further fixes for lack of time, but I really 
hope someone on the core team can pick this up.

To me, it seems we have pinpointed through the PR all the potential points of 
failure on Windows.
As for the quality of the fix, I would say it is very low; I honestly don't 
understand much about the Windows file system or the order of Kafka's internal 
operations for log cleaning and deletion.

I'm really hoping this is enough research for [~ijuma] or [~lindong] to 
determine a better fix. 
In a super ideal world, the function to fix is 
{code}Utils.atomicMoveWithFallback(src, dst);{code}

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
> Reporter: Tao Qin
> Priority: Critical
> Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>

[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612012#comment-16612012
 ] 

ASF GitHub Bot commented on KAFKA-1194:
---

simplesteph closed pull request #5604: DO-NOT-MERGE KAFKA-1194: Alternative fix
URL: https://github.com/apache/kafka/pull/5604
 
 
   

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index ec9d55f89ac..9f1be4ac41a 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -202,8 +202,26 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
    * @throws IOException if rename fails
    */
   def renameTo(f: File) {
-    try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
-    finally file = f
+    if (!OperatingSystem.IS_WINDOWS) {
+      try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
+      finally file = f
+    } else {
+      // we get the file's position
+      val position = if (this.mmap == null) 0 else this.mmap.position()
+      if (OperatingSystem.IS_WINDOWS && this.mmap != null)
+        // this sets mmap = null
+        safeForceUnmap()
+      try {
+        Utils.atomicMoveWithFallback(file.toPath, f.toPath)
+        // we re-initialize mmap
+        val raf = new RandomAccessFile(f, "rw")
+        val len = raf.length()
+        this.mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+        this.mmap.position(position)
+        CoreUtils.swallow(raf.close(), this)
+      }
+      finally file = f
+    }
   }
 
   /**
@@ -228,7 +246,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
       // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
       // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
       // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
-      safeForceUnmap()
+      if (mmap != null) safeForceUnmap()
     }
     Files.deleteIfExists(file.toPath)
   }
@@ -251,6 +269,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   /** Close the index */
   def close() {
     trimToValidSize()
+    closeHandler()
   }
 
   def closeHandler(): Unit = {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 32203acde9a..45cacd637f3 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -830,6 +830,7 @@ class LogManager(logDirs: Seq[File],
         cleaner.abortCleaning(topicPartition)
         cleaner.updateCheckpoints(removedLog.dir.getParentFile)
       }
+      removedLog.close()
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
       checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
       checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
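
The Windows branch of `renameTo` in the diff above follows a "remember position, release the mapping, move, re-map, restore position" sequence. A rough Java sketch of the re-map half of that sequence is below. The class, method, and file names are assumptions for illustration. The forced unmap is deliberately not reproduced, since `MappedByteBuffer` exposes no public unmap method and the PR relies on Kafka's own `safeForceUnmap()`; the sketch merely drops its reference, which is not equivalent on Windows.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class RemapAfterRename {

    // Move the file, then map the renamed file again and restore the saved position.
    static MappedByteBuffer renameAndRemap(Path source, Path target, int position) throws IOException {
        Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        try (RandomAccessFile raf = new RandomAccessFile(target.toFile(), "rw")) {
            MappedByteBuffer mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
            mmap.position(position);
            return mmap;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("remap-demo");
        Path source = Files.write(dir.resolve("segment.index"), new byte[16]);
        Path target = dir.resolve("segment.index.deleted");

        // Initial mapping with a write position part-way through the file.
        MappedByteBuffer mmap;
        try (RandomAccessFile raf = new RandomAccessFile(source.toFile(), "rw")) {
            mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
        }
        mmap.position(8);

        int savedPosition = mmap.position();
        mmap = null; // assumption: stands in for a real forced unmap, which this sketch omits

        mmap = renameAndRemap(source, target, savedPosition);
        System.out.println("position after re-map: " + mmap.position());
    }
}
```

Restoring the position matters because the index appends at the buffer's current position; a fresh mapping would otherwise start again at 0.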


 




> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
> Reporter: Tao Qin
> Priority: Critical
> Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>

[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611916#comment-16611916
 ] 

Kobi Hikri edited comment on KAFKA-1194 at 9/12/18 11:27 AM:
-

Hi [~stephane.maa...@gmail.com],

Apologies for the late response.

I built the trunk branch merged with your PR.

Log attached here: [^RetentionExpiredWindows.txt]

A snapshot of the log file directory is also attached. Please note that the 
latest log files were *correctly marked as deleted*.

TL;DR:
 # The broker *no longer shuts down*.
 # Files marked for deletion are still not deleted. A few are deleted, but not 
all.
 # Plenty of "ERROR" log lines (as expected :)).



!image-2018-09-12-14-25-52-632.png!


was (Author: kobi hikri):
Hi [~stephane.maa...@gmail.com],

Apologies for the late response.

I built the trunk branch merged with your PR.

Log attached here: [^RetentionExpiredWindows.txt]

TL;DR:

1. The broker *no longer shuts-down*
 2. Files marked for deletion are still not deleted. Few are deleted, but not 
all.
 3. Plenty of "ERROR" log lines (as expected :)).

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
> Reporter: Tao Qin
> Priority: Critical
> Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>

[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611916#comment-16611916
 ] 

Kobi Hikri edited comment on KAFKA-1194 at 9/12/18 11:05 AM:
-

Hi [~stephane.maa...@gmail.com],

Apologies for the late response.

I built the trunk branch merged with your PR.

Log attached here: [^RetentionExpiredWindows.txt]

TL;DR:

1. The broker *no longer shuts down*.
2. Files marked for deletion are still not deleted. A few are deleted, but not 
all.
3. Plenty of "ERROR" log lines (as expected :)).


was (Author: kobi hikri):
Hi [~stephane.maa...@gmail.com],

Apologies for the late response.

I built the trunk branch merged with your PR.

Log attached below.

TL;DR:

1. The broker *no longer shuts-down*
2. Files marked for deletion are still not deleted. Few are deleted, but not 
all.
3. Plenty of "ERROR" log lines (as expected :)).

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
> Reporter: Tao Qin
> Priority: Critical
> Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, kafka-1194-v1.patch, kafka-1194-v2.patch, kafka-bombarder.7z, 
> screenshot-1.png
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>

[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-12 Thread Kobi Hikri (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611916#comment-16611916
 ] 

Kobi Hikri commented on KAFKA-1194:
---

Hi [~stephane.maa...@gmail.com],

Apologies for the late response.

I built the trunk branch merged with your PR.

Log attached below.

TL;DR:

1. The broker *no longer shuts down*.
2. Files marked for deletion are still not deleted. A few are deleted, but not 
all.
3. Plenty of "ERROR" log lines (as expected :)).

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
> Reporter: Tao Qin
> Priority: Critical
> Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>





[jira] [Resolved] (KAFKA-4030) Update older quickstart documents to clarify which version they relate to

2018-09-12 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4030.
--
Resolution: Not A Problem

Closing as per PR comment.

> Update older quickstart documents to clarify which version they relate to
> -
>
> Key: KAFKA-4030
> URL: https://issues.apache.org/jira/browse/KAFKA-4030
> Project: Kafka
> Issue Type: Improvement
> Components: website
> Reporter: Todd Snyder
> Priority: Major
> Labels: documentation, website
>
> If you search for 'kafka quickstart', it takes you to 
> kafka.apache.org/07/quickstart.html, which documents release 0.7 rather than 
> the current release, without making that clear.
> [~gwenshap] suggested filing a ticket and adding a note to the 0.7 (and likely 
> 0.8 and 0.9) quickstart guides directing people to ~current for the latest 
> release documentation.
> I'll submit a PR shortly.





[jira] [Resolved] (KAFKA-4910) kafka consumer not receiving messages

2018-09-12 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4910.
--
Resolution: Cannot Reproduce

Closing inactive issue.  Please reopen if the issue still exists on newer 
versions.

> kafka consumer not receiving messages
> -
>
> Key: KAFKA-4910
> URL: https://issues.apache.org/jira/browse/KAFKA-4910
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: zhiwei
>Priority: Major
> Attachments: 128314.jstack
>
>
> kafka consumer not receiving messages
> consumer log:
> "2017-03-10 14:35:34,448" | INFO  | [Thread-5-KafkaSpout] | Revoking 
> previously assigned partitions [MAFS_BPIS_ICSWIPE_IC-0] for group 
> data_storm_hw_tianlu | 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 
> (ConsumerCoordinator.java:291) 
> "2017-03-10 14:35:34,451" | INFO  | [Thread-5-KafkaSpout] | (Re-)joining 
> group data_storm_hw_tianlu | 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> (AbstractCoordinator.java:326) 
> "2017-03-10 14:35:34,456" | INFO  | [Thread-5-KafkaSpout] | Successfully 
> joined group data_storm_hw_tianlu with generation 7 | 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> (AbstractCoordinator.java:434) 
> "2017-03-10 14:35:34,456" | INFO  | [Thread-5-KafkaSpout] | Setting newly 
> assigned partitions [MAFS_BPIS_ICSWIPE_IC-0] for group data_storm_hw_tianlu | 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 
> (ConsumerCoordinator.java:230) 
> "2017-03-10 14:36:10,297" | INFO  | [Thread-7-__system] | Getting metrics for 
> server on port 29104 | backtype.storm.messaging.netty.Server 
> (Server.java:321) 
> "2017-03-10 14:36:10,298" | INFO  | [Thread-7-__system] | Getting metrics for 
> client connection to Netty-Client-streaming96/10.55.45.96:29106 | 
> backtype.storm.messaging.netty.Client (Client.java:405) 
> "2017-03-10 14:37:10,297" | INFO  | [Thread-7-__system] | Getting metrics for 
> server on port 29104 | backtype.storm.messaging.netty.Server 
> (Server.java:321) 
> "2017-03-10 14:37:10,298" | INFO  | [Thread-7-__system] | Getting metrics for 
> client connection to Netty-Client-streaming96/10.55.45.96:29106 | 
> backtype.storm.messaging.netty.Client (Client.java:405) 
> "2017-03-10 14:38:10,297" | INFO  | [Thread-7-__system] | Getting metrics for 
> server on port 29104 | backtype.storm.messaging.netty.Server 
> (Server.java:321) 
> "2017-03-10 14:38:10,297" | INFO  | [Thread-7-__system] | Getting metrics for 
> client connection to Netty-Client-streaming96/10.55.45.96:29106 | 
> backtype.storm.messaging.netty.Client (Client.java:405) 
> "2017-03-10 14:39:10,297" | INFO  | [Thread-7-__system] | Getting metrics for 
> server on port 29104 | backtype.storm.messaging.netty.Server 
> (Server.java:321) 
> "2017-03-10 14:39:10,298" | INFO  | [Thread-7-__system] | Getting metrics for 
> client connection to Netty-Client-streaming96/10.55.45.96:29106 | 
> backtype.storm.messaging.netty.Client (Client.java:405) 
> "2017-03-10 14:40:10,298" | INFO  | [Thread-7-__system] | Getting metrics for 
> server on port 29104 | backtype.storm.messaging.netty.Server 
> (Server.java:321) 
> "2017-03-10 14:40:10,298" | INFO  | [Thread-7-__system] | Getting metrics for 
> client connection to Netty-Client-streaming96/10.55.45.96:29106 | 
> backtype.storm.messaging.netty.Client (Client.java:405) 
> "2017-03-10 14:40:34,454" | INFO  | [Thread-5-KafkaSpout] | Revoking 
> previously assigned partitions [MAFS_BPIS_ICSWIPE_IC-0] for group 
> data_storm_hw_tianlu | 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 
> (ConsumerCoordinator.java:291) 
> "2017-03-10 14:40:34,457" | INFO  | [Thread-5-KafkaSpout] | (Re-)joining 
> group data_storm_hw_tianlu | 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> (AbstractCoordinator.java:326) 
> "2017-03-10 14:40:36,458" | INFO  | [Thread-5-KafkaSpout] | Successfully 
> joined group data_storm_hw_tianlu with generation 8 | 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> (AbstractCoordinator.java:434) 
> "2017-03-10 14:40:36,459" | INFO  | [Thread-5-KafkaSpout] | Setting newly 
> assigned partitions [MAFS_BPIS_ICSWIPE_IC-0] for group data_storm_hw_tianlu | 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 
> (ConsumerCoordinator.java:230) 
> "2017-03-10 14:41:10,297" | INFO  | [Thread-7-__system] | Getting metrics for 
> server on port 29104 | backtype.storm.messaging.netty.Server 
> (Server.java:321) 
> "2017-03-10 14:41:10,298" | INFO  | [Thread-7-__system] | Getting metrics for 
> client connection to Netty-Client-streaming96/10.55.45.96:29106 | 
> backtype.storm.messaging.netty.Client (Client.java:405) 
> "2017-03-10 14:42:10,297" | 

[jira] [Resolved] (KAFKA-3657) NewProducer NullPointerException on ProduceRequest

2018-09-12 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3657.
--
Resolution: Cannot Reproduce

Closing inactive issue.  Please reopen if the issue still exists on newer 
versions.

> NewProducer NullPointerException on ProduceRequest
> --
>
> Key: KAFKA-3657
> URL: https://issues.apache.org/jira/browse/KAFKA-3657
> Project: Kafka
>  Issue Type: Bug
>  Components: network, producer 
>Affects Versions: 0.8.2.1, 0.9.0.1
> Environment: linux 3.2.0 debian7
>Reporter: Vamsi Subhash Achanta
>Assignee: Jun Rao
>Priority: Major
>  Labels: reliability
>
> When the client calls send().get() on the future, the producer appends the 
> record batches to the accumulator and Sender.java (a separate thread) flushes 
> them to the server. The produce request waits on the CountDownLatch in 
> FutureRecordMetadata:
> public RecordMetadata get() throws InterruptedException, ExecutionException {
>     this.result.await();
> In this case, the client thread is blocked forever waiting for the response 
> (as it calls get() without a timeout), and the response, when polled by the 
> Sender, returns an attachment whose batch value is null. The batch is 
> processed and the request is errored out. The Sender catches the exception in 
> its top-level handler and then carries on. As the accumulator is drained, the 
> response will never be returned and the producer client thread calling get() 
> is blocked forever on the latch await call.
> I checked at the server end but still haven't found the reason for the null 
> batch. Any pointers on this?
> ERROR [2016-05-01 21:00:09,256] [kafka-producer-network-thread |producer-app] 
> [Sender] message_id: group_id: : Uncaught error in kafka producer I/O thread:
> ! java.lang.NullPointerException: null
> ! at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:266)
> ! at 
> org.apache.kafka.clients.producer.internals.Sender.handleResponse(Sender.java:236)
> ! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:196)
> ! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> ! at java.lang.Thread.run(Thread.java:745)
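
The blocking behaviour described above comes down to awaiting a latch that is never counted down. The following is a minimal sketch using only java.util.concurrent, not Kafka's FutureRecordMetadata; the class and method names are hypothetical. It shows how a bounded await returns instead of hanging, which is the same idea as calling the standard Future.get(long, TimeUnit) rather than the untimed get().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Kafka.
final class LatchTimeoutDemo {

    // Returns true if the latch was released within the timeout, false otherwise.
    static boolean awaitWithTimeout(CountDownLatch latch, long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Stands in for a response that never arrives; an untimed await() would hang here.
        CountDownLatch neverCompleted = new CountDownLatch(1);
        System.out.println(awaitWithTimeout(neverCompleted, 100));

        // Stands in for a response that did arrive.
        CountDownLatch completed = new CountDownLatch(1);
        completed.countDown();
        System.out.println(awaitWithTimeout(completed, 100));
    }
}
```

A timeout only lets the caller notice the stall and react; it does not explain why the batch was null in the first place.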





[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-12 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611610#comment-16611610
 ] 

Navinder Brar edited comment on KAFKA-7149 at 9/12/18 7:00 AM:
---

[~guozhang] I am working with [~asurana] on raising this PR and will send it in a 
couple of days. Currently, the change I have made is to use taskIds instead of 
topicPartitions in AssignmentInfo. Another thing I observed is that we are 
sending the same assignmentInfo to all consumers, so we are replicating the 
complete assignment (of all hosts and partitions) to every consumer. Maybe we 
can take the partitionsByHost (newly TaskIdsByHost) map out of the consumers 
array so that it is not replicated for all the hosts and is sent just once. With 
the current changes (changing TopicPartitions to TaskIDs and using GZIP 
compression) I have reduced the assignment size (on 400 hosts with 3 threads 
each, having 512 partitions) from 196 MB to 8 MB. If we can stop the replication 
of partitionsByHost on each consumer, the assignment size can be reduced to a 
few hundred KB. Please share your thoughts.
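
To illustrate why replacing topic partitions with task ids shrinks each entry, here is a minimal sketch. The two encodings are assumptions made up for illustration (a UTF string plus an int versus two ints) and are not the actual AssignmentInfo wire format; the class, method and topic names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo class, not part of Kafka Streams.
final class AssignmentEncodingDemo {

    // Assumed encoding: topic name as a length-prefixed UTF string plus a 4-byte partition.
    static int topicPartitionBytes(String topic, int partition) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(topic);
            out.writeInt(partition);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Assumed encoding: two 4-byte integers (topic group id and partition).
    static int taskIdBytes(int topicGroupId, int partition) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(topicGroupId);
            out.writeInt(partition);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        // The topic name below is a made-up example.
        System.out.println("topic partition entry: "
                + topicPartitionBytes("some-long-input-topic-name", 7) + " bytes");
        System.out.println("task id entry: " + taskIdBytes(0, 7) + " bytes");
    }
}
```

Under these assumptions the task id entry has a fixed size, while the topic partition entry grows with the length of the topic name.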



> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have a high number of partitions, instances or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> a RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around 100 MB of 
> assignment data for each rebalance also affects performance and reliability 
> (timeout exceptions start appearing). It also limits Kafka Streams scale 
> even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the assignment-data 
> compressed. We saw the assignment-data size reduced by 8X-10X. This improved 
> Kafka Streams scalability drastically for us, and we could now run it with 
> more than 8,000 partitions.
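
The compression approach described above can be sketched with the JDK's GZIP streams. This is only an illustration under stated assumptions: the payload is a made-up, repetitive stand-in for assignment data, the class and method names are hypothetical, and the compression ratio it prints says nothing about the 8X-10X figure reported in the issue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class, not part of Kafka Streams.
final class GzipAssignmentDemo {

    // Compresses the given bytes with GZIP.
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The GZIP stream is closed here, so the trailer has been written.
        return bytes.toByteArray();
    }

    // Restores the original bytes from GZIP-compressed input.
    static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream gzip =
                     new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = gzip.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Made-up, highly repetitive payload standing in for assignment data.
        StringBuilder payload = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            payload.append("host-").append(i % 400)
                   .append(":topic-").append(i % 512).append(';');
        }
        byte[] raw = payload.toString().getBytes(StandardCharsets.UTF_8);
        byte[] compressed = compress(raw);
        System.out.println("raw bytes: " + raw.length);
        System.out.println("compressed bytes: " + compressed.length);
    }
}
```

The receiving side would call decompress before decoding, so both ends need to agree on whether the payload is compressed.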


