[jira] [Commented] (KAFKA-5431) LogCleaner stopped due to org.apache.kafka.common.errors.CorruptRecordException

2018-12-16 Thread Swathi Mocharla (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722728#comment-16722728
 ] 

Swathi Mocharla commented on KAFKA-5431:


hi [~huxi_2b], we are currently on 0.11.0.0 and are seeing this issue with the 
default value of log.preallocate, which is false. We have a large number of 
segment files in the __consumer_offsets topic that are not getting compacted. 

{code}
[2018-12-12 00:11:04,597] INFO Cleaner 0: Building offset map for log 
__consumer_offsets-45 for 124 segments in offset range [16446991, 85239736). 
(kafka.log.LogCleaner)
[2018-12-12 00:11:04,831] ERROR [kafka-log-cleaner-thread-0]: Error due to 
(kafka.log.LogCleaner)
org.apache.kafka.common.errors.CorruptRecordException: Record size is less 
than the minimum record overhead (14)
[2018-12-12 00:11:04,837] INFO [kafka-log-cleaner-thread-0]: Stopped 
(kafka.log.LogCleaner)
{code}

 

We previously deleted the segment files and restarted our consumers, but this 
didn't help and we are heading toward a disk-full situation. Can you please help?

> LogCleaner stopped due to 
> org.apache.kafka.common.errors.CorruptRecordException
> ---
>
> Key: KAFKA-5431
> URL: https://issues.apache.org/jira/browse/KAFKA-5431
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Carsten Rietz
>Assignee: huxihx
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
>
> Hey all,
> I have a strange problem with our UAT cluster of 3 Kafka brokers.
> The __consumer_offsets topic was replicated to two instances and our disks 
> ran full due to a wrong configuration of the log cleaner. We fixed the 
> configuration and upgraded from 0.10.1.1 to 0.10.2.1.
> Today I increased the replication of the __consumer_offsets topic to 3 and 
> triggered replication to the third broker via kafka-reassign-partitions.sh. 
> That went well, but I get many errors like
> {code}
> [2017-06-12 09:59:50,342] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,18] offset 0 error Record size is less than the 
> minimum record overhead (14) (kafka.server.ReplicaFetcherThread)
> [2017-06-12 09:59:50,342] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,24] offset 0 error Record size is less than the 
> minimum record overhead (14) (kafka.server.ReplicaFetcherThread)
> {code}
> which I think are due to the disk-full event.
> The log cleaner threads died on these corrupt messages:
> {code}
> [2017-06-12 09:59:50,722] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> org.apache.kafka.common.errors.CorruptRecordException: Record size is less 
> than the minimum record overhead (14)
> [2017-06-12 09:59:50,722] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> {code}
> Looking at the files I see that some are truncated and some are just empty:
> {code}
> $ ls -lsh 00594653.log
> 0 -rw-r--r-- 1 user user 100M Jun 12 11:00 00594653.log
> {code}
> Sadly I do not have the logs any more from the disk-full event itself.
> I have three questions:
> * What is the best way to clean this up? Deleting the old log files and 
> restarting the brokers?
> * Why did Kafka not handle the disk-full event well? Is this only affecting 
> the cleanup, or may we also lose data?
> * Is this maybe caused by the combination of the upgrade and the full disk?
> And last but not least: keep up the good work. Kafka performs really well 
> while being easy to administer, and it has good documentation!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7728) Add JoinReason to the join group request for better rebalance handling

2018-12-16 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722715#comment-16722715
 ] 

Boyang Chen commented on KAFKA-7728:


[~guozhang] [~mjsax] [~hachikuji] [~enether] Do you have time to add some 
discussion to this thread? Thank you!

> Add JoinReason to the join group request for better rebalance handling
> --
>
> Key: KAFKA-7728
> URL: https://issues.apache.org/jira/browse/KAFKA-7728
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Mayuresh Gharat
>Priority: Major
>  Labels: consumer, mirror-maker, needs-kip
>
> Recently [~mgharat] and I discussed the current rebalance logic for handling 
> the leader's join group request. So far we blindly trigger a rebalance when 
> the leader rejoins. The caveat is that KIP-345 does not cover this case, and 
> if a consumer group is not using sticky assignment but another strategy such 
> as round robin, the redundant rebalance can still shuffle topic partitions 
> around consumers (for example, in a mirror maker application).
> I checked on the broker side and here is what we currently do:
>  
> {code:java}
> // force a rebalance if a member has changed metadata or if the leader sends
> // JoinGroup. The latter allows the leader to trigger rebalances for changes
> // affecting assignment which do not affect the member metadata (such as
> // topic metadata changes for the consumer)
> if (group.isLeader(memberId) || !member.matches(protocols))
> {code}
> Based on the broker logic, we only need to trigger a rebalance on a leader 
> rejoin when a topic metadata change has happened. I also looked at the 
> ConsumerCoordinator code on the client side and found the metadata 
> monitoring logic here:
> {code:java}
> public boolean rejoinNeededOrPending() {
> ...
> // we need to rejoin if we performed the assignment and metadata has changed
> if (assignmentSnapshot != null && 
> !assignmentSnapshot.equals(metadataSnapshot))
>   return true;
> }{code}
>  Instead of just returning true, we could introduce a new enum field called 
> JoinReason that indicates the purpose of the rejoin; then we don't need to do 
> a full rebalance when the leader is just going through a rolling bounce. 
> Concretely: add a JoinReason enum field to the join group request so that the 
> broker knows whether the leader is rejoining due to a topic metadata change. 
> If yes, we trigger a rebalance; if not, we shouldn't.
>  
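For illustration, a minimal sketch of what such a field could look like (all 
names below are hypothetical; none of this is part of the current JoinGroup 
protocol):

{code:java}
// Hypothetical sketch only: a reason the member attaches to its JoinGroup
// request, so the broker can skip rebalances caused by a leader that is
// merely going through a rolling bounce.
public enum JoinReason {
    NEW_MEMBER,          // joining the group for the first time
    METADATA_CHANGED,    // leader observed a topic metadata change: rebalance
    ROLLING_RESTART      // e.g. rolling bounce: broker may skip the rebalance
}
{code}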



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-12-16 Thread Justin Jack (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722691#comment-16722691
 ] 

Justin Jack commented on KAFKA-7641:


We are already working on the KIP and it is currently being discussed.

 


> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason described an edge case of the current consumer protocol which could 
> cause a memory burst on the broker side:
> {quote}the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually became quite large because we could not detect the 
> fact that the new members were no longer there.{quote}
> Since many disorganized join group requests can spam the group metadata, 
> we should define a cap on the broker side to keep a single consumer group 
> from growing too large. So far it seems appropriate to introduce this as a 
> server config, since this value mostly matters in error scenarios and 
> client users shouldn't have to worry about it.
>  
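For illustration, a sketch of how the proposed cap might be set in 
server.properties (the config name comes from this ticket's title and the 
value is made up; neither is finalized, pending the KIP):

{code}
# hypothetical broker config from this proposal; name and default not yet final
consumer.group.max.size=250
{code}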



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7742) DelegationTokenCache#hmacIdCache entry is not cleared when a token is removed using removeToken(String tokenId) API.

2018-12-16 Thread Satish Duggana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-7742:
--
Description: 
DelegationTokenCache#hmacIdCache entry is not cleared when a token is removed 
using `removeToken(String tokenId)`[1] API.


1) 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCache.java#L84

  was:
DelegationTokenCache#hmacIdCache entry is not cleared when a token is removed 
using `removeToken(String tokenId)`[1] API.


[1] 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCache.java#L84


> DelegationTokenCache#hmacIdCache entry is not cleared when a token is removed 
> using removeToken(String tokenId) API.
> 
>
> Key: KAFKA-7742
> URL: https://issues.apache.org/jira/browse/KAFKA-7742
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>
> DelegationTokenCache#hmacIdCache entry is not cleared when a token is removed 
> using `removeToken(String tokenId)`[1] API.
> 1) 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCache.java#L84



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7742) DelegationTokenCache#hmacIdCache entry is not cleared when a token is removed using removeToken(String tokenId) API.

2018-12-16 Thread Satish Duggana (JIRA)
Satish Duggana created KAFKA-7742:
-

 Summary: DelegationTokenCache#hmacIdCache entry is not cleared 
when a token is removed using removeToken(String tokenId) API.
 Key: KAFKA-7742
 URL: https://issues.apache.org/jira/browse/KAFKA-7742
 Project: Kafka
  Issue Type: Bug
  Components: security
Reporter: Satish Duggana
Assignee: Satish Duggana


DelegationTokenCache#hmacIdCache entry is not cleared when a token is removed 
using `removeToken(String tokenId)`[1] API.


[1] 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCache.java#L84
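For illustration, a minimal sketch of the kind of fix this implies, assuming 
hmacIdCache maps the HMAC string to the token id (see the linked source [1] 
for the actual structure):

{code:java}
// Hypothetical sketch: when removing a token by id, also drop any
// hmacIdCache entry that points at that token id.
public TokenInformation removeToken(String tokenId) {
    TokenInformation tokenInfo = tokenCache.remove(tokenId);
    if (tokenInfo != null) {
        // assumes hmacIdCache is a Map<String /* hmac */, String /* tokenId */>
        hmacIdCache.values().removeIf(id -> id.equals(tokenId));
    }
    return tokenInfo;
}
{code}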



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-12-16 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722635#comment-16722635
 ] 

Richard Yu commented on KAFKA-7432:
---

Hi, just want to point out something here.

What Kafka currently supports is continuous processing, which Spark Streaming 
only recently implemented. In contrast, what this ticket suggests implementing 
is microbatch processing, in which data is sent in batches. In some data 
streaming circles, continuous processing is considered the best option for 
sending data, while microbatching is the older technique.

I don't know if we need to implement this particular option, especially since 
overall latency for microbatching is higher than for continuous processing.

> API Method on Kafka Streams for processing chunks/batches of data
> -
>
> Key: KAFKA-7432
> URL: https://issues.apache.org/jira/browse/KAFKA-7432
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: sam
>Priority: Major
>
> For many situations in Big Data it is preferable to work with a small buffer 
> of records at a go, rather than one record at a time.
> The natural example is calling some external API that supports batching for 
> efficiency.
> How can we do this in Kafka Streams? I cannot find anything in the API that 
> looks like what I want.
> So far I have:
> {{builder.stream[String, String]("my-input-topic") 
> .mapValues(externalApiCall).to("my-output-topic")}}
> What I want is:
> {{builder.stream[String, String]("my-input-topic") .batched(chunkSize = 
> 2000).map(externalBatchedApiCall).to("my-output-topic")}}
> In Scala and Akka Streams the function is called {{grouped}} or {{batch}}. In 
> Spark Structured Streaming we can do 
> {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.
>  
>  
> https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams
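For reference, something close to the requested behavior can be emulated today 
with the Processor API; a rough sketch (illustrative only, not a proposed API):

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical sketch: buffer values per task and forward them in chunks of
// chunkSize. The buffer is in-memory only, so a pending batch is lost on failure.
class BatchingTransformer implements Transformer<String, String, KeyValue<String, List<String>>> {
    private final int chunkSize = 2000;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void init(final ProcessorContext context) { }

    @Override
    public KeyValue<String, List<String>> transform(final String key, final String value) {
        buffer.add(value);
        if (buffer.size() >= chunkSize) {
            final List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            return KeyValue.pair(key, batch); // emit one batched record
        }
        return null; // not enough records yet, emit nothing
    }

    @Override
    public void close() { }
}
{code}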



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-12-16 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722635#comment-16722635
 ] 

Richard Yu edited comment on KAFKA-7432 at 12/17/18 1:59 AM:
-

Hi, just want to point out something here.

What Kafka currently supports is continuous processing, which Spark Streaming 
only recently implemented. In contrast, what this ticket suggests implementing 
is microbatch processing, in which data is sent in batches. In some data 
streaming circles, continuous processing is considered the best option for 
sending data, while microbatching is the older technique.

I don't know if we need to implement this particular option, especially since 
overall latency for microbatching is higher than for continuous processing.

Spark is moving from microbatch processing to continuous largely because of 
latency improvements. So given what Kafka has right now, this ticket probably 
wouldn't be necessary.


was (Author: yohan123):
Hi, just want to point out something here.

What Kafka currently supports is continuous processing, which Spark Streaming 
most recently implemented. In contrast, what this ticket is suggesting to 
implement is microbatch processing in which data is sent in batches.  In some 
data streaming circles, continuous processing is considered the best option for 
sending data. Microbatching was an older technique. 

I don't know if we need to implement this particular option, especially since 
latency overall for microbatching is higher than continuous processing.

Spark is largely moving from microbatch processing to continuous largely 
because of latency improvements. So with what Kafka has right now, this ticket 
probably wouldn't be necessary.

> API Method on Kafka Streams for processing chunks/batches of data
> -
>
> Key: KAFKA-7432
> URL: https://issues.apache.org/jira/browse/KAFKA-7432
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: sam
>Priority: Major
>
> For many situations in Big Data it is preferable to work with a small buffer 
> of records at a go, rather than one record at a time.
> The natural example is calling some external API that supports batching for 
> efficiency.
> How can we do this in Kafka Streams? I cannot find anything in the API that 
> looks like what I want.
> So far I have:
> {{builder.stream[String, String]("my-input-topic") 
> .mapValues(externalApiCall).to("my-output-topic")}}
> What I want is:
> {{builder.stream[String, String]("my-input-topic") .batched(chunkSize = 
> 2000).map(externalBatchedApiCall).to("my-output-topic")}}
> In Scala and Akka Streams the function is called {{grouped}} or {{batch}}. In 
> Spark Structured Streaming we can do 
> {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.
>  
>  
> https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7243) Add unit integration tests to validate metrics in Kafka Streams

2018-12-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722624#comment-16722624
 ] 

Guozhang Wang commented on KAFKA-7243:
--

[~johnma] are you still working on this ticket? If not, could you un-assign 
yourself so that other people can pick it up?

> Add unit integration tests to validate metrics in Kafka Streams
> ---
>
> Key: KAFKA-7243
> URL: https://issues.apache.org/jira/browse/KAFKA-7243
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Mariam John
>Priority: Major
>  Labels: newbie++
>
> We should add an integration test for Kafka Streams that validates:
> 1. After a streams application is started, all metrics from the different 
> levels (thread, task, processor, store, cache) are correctly created and 
> display recorded values.
> 2. When a streams application is shut down, all metrics are correctly 
> de-registered and removed. 
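For illustration, a rough sketch of what such a check could look like via the 
public KafkaStreams#metrics() map (not a complete integration test; the group 
name prefix is illustrative and needs verifying against the real built-in 
metrics):

{code:java}
// Hypothetical sketch of the validation; builder and props come from test setup.
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// 1. after start: metrics from each level should be registered
final boolean threadMetricsPresent = streams.metrics().keySet().stream()
    .anyMatch(name -> name.group().startsWith("stream-"));
assertTrue(threadMetricsPresent);

// 2. after shutdown: the same metrics should be de-registered again
streams.close();
{code}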



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6820) Improve on StreamsMetrics Public APIs

2018-12-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722623#comment-16722623
 ] 

Guozhang Wang commented on KAFKA-6820:
--

[~NIzhikov]

Yes, this ticket is still on, and it is actually very good timing to ask :)

But I'd like to clearly define the scope of this ticket to separate it from 
https://issues.apache.org/jira/browse/KAFKA-6819; this is based on discussions 
with [~vvcephei] on those two tickets so far.

For KAFKA-6819: it should cover refactoring Streams' provided built-in 
metrics. As John suggested, we should consider providing only a single 
granularity for each metric and let users do the roll-ups themselves rather 
than having Streams provide them. That would be a quite major built-in metrics 
refactoring that requires a KIP, hence a big scope for discussion details.

For KAFKA-6820 (this ticket), let's scope it to 2) in the description:

{code}
We could enforce the scopeName possible values, and properly document the 
sensor hierarchies that would be incurred from the function calls. In this way 
the library can help close users' sensors automatically when the corresponding 
scope (store, task, thread, etc.) is being de-constructed.
{code}

I think KAFKA-6820 has a smaller scope and hence would be a good one to pick 
up for now.

> Improve on StreamsMetrics Public APIs
> -
>
> Key: KAFKA-6820
> URL: https://issues.apache.org/jira/browse/KAFKA-6820
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Our current `addLatencyAndThroughputSensor` and `addThroughputSensor` are not 
> very well designed, and hence not very friendly for users who want to add 
> their own customized sensors. We could consider improving this feature. Some 
> related things to consider:
> 1. Our internal built-in metrics should be independent of these public APIs, 
> which are for user-customized sensors only. See KAFKA-6819 for a related 
> description.
> 2. We could enforce the scopeName possible values, and properly document the 
> sensor hierarchies that would be incurred from the function calls. In this 
> way the library can help close users' sensors automatically when the 
> corresponding scope (store, task, thread, etc.) is being de-constructed.
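For context, a sketch of how the current public API is used from a processor 
(signature as found in recent releases; double-check against the StreamsMetrics 
interface in your version):

{code:java}
// Current-style usage under discussion: the scopeName argument is the value
// set that point 2 proposes to constrain and document.
final StreamsMetrics metrics = context.metrics();
final Sensor sensor = metrics.addLatencyAndThroughputSensor(
    "my-scope",        // scopeName: proposal is to enforce its possible values
    "my-entity",
    "my-operation",
    Sensor.RecordingLevel.DEBUG);
sensor.record(42.0);
{code}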



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6819) Refactor built-in StreamsMetrics internal implementations

2018-12-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722619#comment-16722619
 ] 

Guozhang Wang commented on KAFKA-6819:
--

Please see my other comment on KAFKA-6820.

> Refactor built-in StreamsMetrics internal implementations
> -
>
> Key: KAFKA-6819
> URL: https://issues.apache.org/jira/browse/KAFKA-6819
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Our current internal implementations of StreamsMetrics and the different 
> layered metrics like StreamMetricsThreadImpl, TaskMetrics, NodeMetrics etc. 
> are a bit messy nowadays. We could improve on the current situation by doing 
> the following:
> 0. For thread-level metrics, refactor the {{StreamsMetricsThreadImpl}} class 
> to {{ThreadMetrics}} such that a) it does not extend from 
> {{StreamsMetricsImpl}} but just takes the {{StreamsMetricsImpl}} as a 
> constructor parameter, and b) its constructor is replaced with a static 
> {{addAllSensors(threadName)}} that tries to register all the thread-level 
> sensors for the given thread name.
> 1. Add a static function for each of the built-in sensors of the thread-level 
> metrics in {{ThreadMetrics}} that relies on the internal 
> {{StreamsMetricsConventions}} to get thread-level sensor names. If the sensor 
> cannot be found in the internal {{Metrics}} registry, create the sensor 
> on the fly.
> 2.a Add a static {{removeAllSensors(threadName)}} function in 
> {{ThreadMetrics}} that tries to de-register all the thread-level metrics for 
> this thread; if there are no sensors it is a no-op. We will trigger this 
> function in {{StreamThread#close()}}, and similarly in {{TopologyTestDriver}} 
> when we close the driver. As a result, the {{ThreadMetrics}} class itself 
> would contain only static functions with no member fields at all.
> 2.b We can consider doing the same for {{TaskMetrics}}, {{NodeMetrics}} and 
> {{NamedCacheMetrics}} as well, and add a {{StoreMetrics}} following the 
> same pattern: although these metrics are not currently accessed outside 
> their enclosing classes, that may change in the future.
> 3. Then we only pass {{StreamsMetricsImpl}} around between the internal 
> classes, to access the specific sensor whenever we want to record it.
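For illustration, a minimal sketch of the proposed {{ThreadMetrics}} shape 
(names are taken from the ticket; the signatures are guesses, not a finalized 
design):

{code:java}
// Hypothetical sketch: a static-only holder with no member fields.
public final class ThreadMetrics {
    private ThreadMetrics() { }

    public static void addAllSensors(final StreamsMetricsImpl metrics, final String threadName) {
        // register all thread-level sensors for this thread name, creating
        // each one on the fly if it is absent from the Metrics registry
    }

    public static void removeAllSensors(final StreamsMetricsImpl metrics, final String threadName) {
        // de-register all thread-level sensors for this thread; a no-op when
        // no sensors exist. Called from StreamThread#close() and from
        // TopologyTestDriver when the driver is closed.
    }
}
{code}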



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7176) State store metrics for migrated tasks are not removed

2018-12-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722610#comment-16722610
 ] 

Guozhang Wang commented on KAFKA-7176:
--

I've looked into trunk again and I believe this has now been fixed by 
[~vvcephei]'s fix. John, could you confirm? If yes, please feel free to 
resolve it with the appropriate fix versions.

> State store metrics for migrated tasks are not removed
> --
>
> Key: KAFKA-7176
> URL: https://issues.apache.org/jira/browse/KAFKA-7176
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Sam Lendle
>Priority: Major
>
> I observed that state store metrics for tasks that have been migrated to 
> other instances are not removed and are still being updated with phantom 
> values (when viewed, for example, via JMX MBeans).
> For all tasks/threads on the same instance (including migrated tasks), the 
> values of the state store metrics are all (nearly) the same. For the rate 
> metrics at least, the value reported for each task is the rate I expect for 
> all active tasks on that instance, so things are apparently being counted 
> multiple times. Presumably this is how the migrated task metrics are being 
> updated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7293) Merge followed by groupByKey/join might violate co-partitioning

2018-12-16 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722600#comment-16722600
 ] 

Richard Yu commented on KAFKA-7293:
---

It probably also would be a good idea to define the behavior when the runtime 
exception mentioned above is thrown. For example, do we attempt to repartition 
afterwards?

> Merge followed by groupByKey/join might violate co-partitioning
> -
>
> Key: KAFKA-7293
> URL: https://issues.apache.org/jira/browse/KAFKA-7293
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> The merge() operation can be applied to input KStreams that have a different 
> number of tasks (i.e., input topic partitions). In this case, the input topics 
> are not co-partitioned, and thus the resulting KStream is not partitioned, 
> even if each input KStream is partitioned on its own.
> Because no "repartitionRequired" flag is set on the input KStreams, the flag 
> is also not set on the output KStream. Hence, if a groupByKey() or join() 
> operation is applied to the output KStream, we don't insert a repartition 
> topic. However, repartitioning would be required because the KStream is not 
> partitioned.
> We cannot detect this at compile time, because the number of partitions is 
> unknown, and thus we cannot decide if repartitioning is required or not. 
> However, we can add a runtime check, similar to join(), that checks if data 
> is correctly (co-)partitioned and, if not, raises a runtime exception.
> Note that for merge(), in contrast to join(), we should only check for 
> co-partitioning if the merge() is followed by a groupByKey() or join() 
> operation.
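For illustration, a minimal topology that can hit this (topic names and 
partition counts are hypothetical):

{code:java}
// "topic-a" has 4 partitions, "topic-b" has 6: the inputs are not co-partitioned.
final KStream<String, String> a = builder.stream("topic-a");
final KStream<String, String> b = builder.stream("topic-b");

// merge() sets no "repartitionRequired" flag, so no repartition topic is
// inserted here and the count may be computed on mis-partitioned data.
a.merge(b).groupByKey().count();
{code}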



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2758) Improve Offset Commit Behavior

2018-12-16 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722593#comment-16722593
 ] 

Richard Yu commented on KAFKA-2758:
---

I just want to say that this issue might no longer be relevant. It appears to 
be quite old.

> Improve Offset Commit Behavior
> --
>
> Key: KAFKA-2758
> URL: https://issues.apache.org/jira/browse/KAFKA-2758
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie, reliability
>
> There are two scenarios of offset committing that we can improve:
> 1) We can filter out the partitions whose committed offset equals the 
> consumed offset, meaning there are no newly consumed messages from that 
> partition, and hence we do not need to include that partition in the commit 
> request.
> 2) We can make a commit request right after resetting to a fetch/consume 
> position, either according to the reset policy (e.g. on consumer startup, or 
> when handling an out-of-range offset) or through {{seek}}, so that if the 
> consumer fails right after these events, upon recovery it can restart from 
> the reset position instead of resetting again. Resetting again can lead to, 
> for example, data loss if we use "largest" as the reset policy while new 
> messages are arriving on the fetched partitions.
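For illustration, a sketch of what (2) amounts to on the consumer side 
(hypothetical application code, not a proposed API change):

{code:java}
// Commit right after a reset/seek, so that a crash before the next commit
// does not cause a second reset (and, with "largest", skipped messages).
consumer.seekToBeginning(consumer.assignment());
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (final TopicPartition tp : consumer.assignment())
    offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
consumer.commitSync(offsets);
{code}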



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7234) Allow auto leader rebalance during partition reassignment

2018-12-16 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722598#comment-16722598
 ] 

Richard Yu commented on KAFKA-7234:
---

This might be a corner case, but if several nodes are brought into the cluster 
in rapid succession, we probably wouldn't want to rebalance every single time 
a node is added. We could instead artificially trigger a rebalance once every 
set block of time if necessary.

> Allow auto leader rebalance during partition reassignment
> -
>
> Key: KAFKA-7234
> URL: https://issues.apache.org/jira/browse/KAFKA-7234
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> We currently skip auto leader balancing while a reassignment is in progress. 
> However, when bringing new nodes into the cluster, you actually want the 
> leaders to rebalance as soon as possible so that the new nodes can begin 
> doing work. In general, having better balance in the cluster seems like it 
> would be better for the reassignment, but perhaps there is a good reason to 
> skip it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-16 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722591#comment-16722591
 ] 

Dongjoon Hyun commented on KAFKA-7703:
--

Hi, all.
Is there any update on this issue?

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
>Reporter: Shixiong Zhu
>Assignee: Viktor Somogyi
>Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong 
> offset set by another reset request.
> Here is a reproducer: 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" reset request in the 
> background. However, after "seekToEnd" is called, due to a race condition in 
> "Fetcher.resetOffsetIfNeeded" (it's not atomic; "seekToEnd" could happen 
> between the check 
> between the check 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
>  and the seek 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
>  "KafkaConsumer.position" may return an "earliest" offset.
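A condensed sketch of the race described above (illustrative only; `consumer` 
and `partition` come from the surrounding test, and see the linked reproducer 
for the real code):

{code:java}
consumer.poll(0);                            // may start an async "earliest" reset
consumer.seekToEnd(Collections.emptySet());  // seek all assigned partitions to the end
// if the background reset completes after the check but before the seek is
// applied, position() can report the earliest offset instead of the end:
final long position = consumer.position(partition);
{code}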



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean (Windows OS)

2018-12-16 Thread sacha barber (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722566#comment-16722566
 ] 

sacha barber commented on KAFKA-6647:
-

I would also like to add that this seems to be caused by 
{{TopologyTestDriver.close}}.

If I add a method like this (Scala, sorry):

{code}
import java.io.File
import java.util.Properties

import org.apache.kafka.streams.TopologyTestDriver

def cleanup(props: Properties, testDriver: TopologyTestDriver): Unit = {
  try {
    testDriver.close()
  } catch {
    case e: Exception =>
      // fall back to deleting the state directory ourselves
      delete(new File("C:\\data\\kafka-streams"))
  }
}

// recursively delete a directory: children first, then the entry itself
def delete(file: File): Unit = {
  if (file.isDirectory)
    Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete)
  file.delete()
}
{code}

 

I see the exception others are talking about above getting caught on the 
TopologyTestDriver close() call, but then I just resort to using regular 
java.io to do the actual delete for my tests. This does get my tests to pass, 
but why can't the Kafka code do this on Windows, if my simple test code works? 
I read the part about how Windows will only delete a file once its last handle 
is released, but to my eyes my simple delete worked here, whilst Kafka's 
TopologyTestDriver close() did not.

 

I am using Windows 10.0 with Kafka 2.1.0,

 

and have changed my state directory to this one:

 

{code}
props.put(StreamsConfig.STATE_DIR_CONFIG, 
s"C:\\data\\kafka-streams".asInstanceOf[Object])
{code}

 

Any ideas when this will get fixed properly?

> KafkaStreams.cleanUp creates .lock file in directory its trying to clean 
> (Windows OS)
> -
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Priority: Minor
>  Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream, the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
>   if (lock(id, 0)) {
> long now = time.milliseconds();
> long lastModifiedMs = taskDir.lastModified();
> if (now > lastModifiedMs + cleanupDelayMs) {
> log.info("{} Deleting obsolete state directory {} 
> for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), 
> dirName, id, now - lastModifiedMs, cleanupDelayMs);
> Utils.delete(taskDir);
> }
> }
> {code}
> The check lock(id, 0) will create a .lock file in the very directory that is 
> subsequently going to be deleted. If the .lock file already exists from a 
> previous run, the attempt to delete it fails with AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path, calling Files.delete(path).
> The call to Files.delete(path) in postVisitDirectory will then fail with 
> java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
> .lock file left the directory non-empty. (o.a.k.s.p.internals.StateDirectory  
>  : stream-thread [restartedMain] Failed to lock the state directory due 
> to an unexpected exception)
> This then seems to cause issues when streaming from a topic to an in-memory 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7741) Bad dependency via SBT

2018-12-16 Thread sacha barber (JIRA)
sacha barber created KAFKA-7741:
---

 Summary: Bad dependency via SBT
 Key: KAFKA-7741
 URL: https://issues.apache.org/jira/browse/KAFKA-7741
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.1.0
 Environment: Windows 10 professional, IntelliJ IDEA 2017.1
Reporter: sacha barber


I am using the Kafka-Streams-Scala 2.1.0 JAR.

If I create a new Scala project using SBT with these dependencies:

{code}
name := "ScalaKafkaStreamsDemo"

version := "1.0"

scalaVersion := "2.12.1"

libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.0.0"
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"

//TEST
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test
libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % 
"2.0.0" % Test
{code}

I get this error:

 

{code}
SBT 'ScalaKafkaStreamsDemo' project refresh failed
Error:Error while importing SBT project:...[info] Resolving 
jline#jline;2.14.1 ...
[warn] [FAILED ] 
javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}: (0ms)
[warn]  local: tried
[warn] 
C:\Users\sacha\.ivy2\local\javax.ws.rs\javax.ws.rs-api\2.1.1\${packaging.type}s\javax.ws.rs-api.${packaging.type}
[warn]  public: tried
[warn] 
https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.${packaging.type}
[info] downloading 
https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-test-utils/2.1.0/kafka-streams-test-utils-2.1.0.jar
 ...
[info] [SUCCESSFUL ] 
org.apache.kafka#kafka-streams-test-utils;2.1.0!kafka-streams-test-utils.jar 
(344ms)
[warn] ::
[warn] :: FAILED DOWNLOADS ::
[warn] :: ^ see resolution messages for details ^ ::
[warn] ::
[warn] :: javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
[warn] ::
[trace] Stack trace suppressed: run 'last *:ssExtractDependencies' for the full 
output.
[trace] Stack trace suppressed: run 'last *:update' for the full output.
[error] (*:ssExtractDependencies) sbt.ResolveException: download failed: 
javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
[error] (*:update) sbt.ResolveException: download failed: 
javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
[error] Total time: 8 s, completed 16-Dec-2018 19:27:21
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=384M; 
support was removed in 8.0
See complete log in file:/C:/Users/sacha/.IdeaIC2017.1/system/log/sbt.last.log
{code}

This seems to be a common issue with a bad dependency from Kafka to 
javax.ws.rs-api.

If I drop the Kafka version down to 2.0.0 and add this line to my SBT file, 
the error goes away:

{code}
libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" 
artifacts(Artifact("javax.ws.rs-api", "jar", "jar"))
{code}
 

However, I would like to work with the 2.1.0 version.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)