[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics

2019-05-23 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847286#comment-16847286
 ] 

Boyang Chen commented on KAFKA-8342:


[~clearpal7] Yeah, feel free to take it. I will probably be away for a couple of 
days; you could take an initial look and see how that tool should be built. 
Thank you for the help!

> Admin tool to setup Kafka Stream topology (internal) topics
> ---
>
> Key: KAFKA-8342
> URL: https://issues.apache.org/jira/browse/KAFKA-8342
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> We have seen customers who need to deploy their applications to a production 
> environment but don't have access to create changelog and repartition topics. 
> They need to ask the admin team to manually create those topics before 
> proceeding to start the actual stream job. We could add an admin tool to help 
> them go through the process more quickly by providing a command that could 
> (see the sketch below):
>  # Read through the current stream topology
>  # Create the corresponding topics as needed, including output topics.
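
As a rough illustration of step 2, here is a minimal sketch built only on the 
public AdminClient API; the topic names, partition count, and replication 
factor below are hypothetical stand-ins for values a real tool would derive 
from the topology description:

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class StreamsTopicSetupTool {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Internal topics follow the <application.id>-<name>-changelog /
        // -repartition naming convention; these names are made up.
        List<String> internalTopics = Arrays.asList(
                "my-app-count-store-changelog",
                "my-app-KSTREAM-MAP-0000000001-repartition");

        List<NewTopic> newTopics = internalTopics.stream()
                .map(name -> new NewTopic(name, 16, (short) 3))
                .collect(Collectors.toList());

        try (AdminClient admin = AdminClient.create(props)) {
            // Throws if a topic already exists; a real tool would skip those.
            admin.createTopics(newTopics).all().get();
        }
    }
}
{code}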



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics

2019-05-23 Thread WooYoung (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847245#comment-16847245
 ] 

WooYoung commented on KAFKA-8342:
-

[~bchen225242] I'm interested in this ticket: helping users go through the 
process more quickly by providing a command-line tool.

Could I take this ticket?

 

> Admin tool to setup Kafka Stream topology (internal) topics
> ---
>
> Key: KAFKA-8342
> URL: https://issues.apache.org/jira/browse/KAFKA-8342
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> We have seen customers who need to deploy their applications to a production 
> environment but don't have access to create changelog and repartition topics. 
> They need to ask the admin team to manually create those topics before 
> proceeding to start the actual stream job. We could add an admin tool to help 
> them go through the process more quickly by providing a command that could:
>  # Read through the current stream topology
>  # Create the corresponding topics as needed, including output topics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8424) Replace ListGroups request/response with automated protocol

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847241#comment-16847241
 ] 

ASF GitHub Bot commented on KAFKA-8424:
---

abbccdda commented on pull request #6805: KAFKA-8424: replace ListGroups 
request/response with automated protocol
URL: https://github.com/apache/kafka/pull/6805
 
 
   As the title suggests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Replace ListGroups request/response with automated protocol
> ---
>
> Key: KAFKA-8424
> URL: https://issues.apache.org/jira/browse/KAFKA-8424
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers

2019-05-23 Thread Sebastiaan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847211#comment-16847211
 ] 

Sebastiaan commented on KAFKA-8412:
---

[~mjsax] yes we are using EOS.

And yeah, you're right: I was surprised the other ticket was solved with a null 
check when it also seemed to me the situation should be avoided in the first 
place. But I'll leave that to the actual developers.

> Still a nullpointer exception thrown on shutdown while flushing before 
> closing producers
> 
>
> Key: KAFKA-8412
> URL: https://issues.apache.org/jira/browse/KAFKA-8412
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Sebastiaan
>Priority: Minor
>
> I found a closed issue and replied there, but decided to open one myself 
> because, although they're related, they're slightly different. The original 
> issue is at https://issues.apache.org/jira/browse/KAFKA-7678
> The fix there was to implement a null check around closing a producer, 
> because in some cases the producer is already null at that point (it has 
> already been closed).
> In version 2.1.1 we are getting a very similar exception, but in the 'flush' 
> method that is called pre-close. This is in the log:
> {code:java}
> message: stream-thread 
> [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
> while closing StreamTask 1_26 due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> Followed by:
>  
> {code:java}
> message: task [1_26] Could not close task due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.StreamTask
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> If I look at the source code at this point, I see a nice null check in the 
> close method, but not in the flush method that is called just before that:
> {code:java}
> public void flush() {
>     this.log.debug("Flushing producer");
>     this.producer.flush();
>     this.checkForException();
> }
> public void close() {
>     this.log.debug("Closing producer");
>     if (this.producer != null) {
>     this.producer.close();
>     this.producer = null;
>     }
>     this.checkForException();
> }{code}
> It seems to my (ignorant) eye that the flush method should also be wrapped in 
> a null check, in the same way as has been done for close.
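
For reference, a minimal sketch of the suggested change, mirroring the null 
check that close() already has:

{code:java}
// Sketch only: guard flush() the same way close() is guarded, so a producer
// that was already closed (and set to null) no longer causes an NPE.
public void flush() {
    this.log.debug("Flushing producer");
    if (this.producer != null) {
        this.producer.flush();
    }
    this.checkForException();
}
{code}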



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics

2019-05-23 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847174#comment-16847174
 ] 

Boyang Chen commented on KAFKA-8342:


[~clearpal7] Let me know if this sounds interesting to you.

> Admin tool to setup Kafka Stream topology (internal) topics
> ---
>
> Key: KAFKA-8342
> URL: https://issues.apache.org/jira/browse/KAFKA-8342
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> We have seen customers who need to deploy their applications to a production 
> environment but don't have access to create changelog and repartition topics. 
> They need to ask the admin team to manually create those topics before 
> proceeding to start the actual stream job. We could add an admin tool to help 
> them go through the process more quickly by providing a command that could:
>  # Read through the current stream topology
>  # Create the corresponding topics as needed, including output topics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling

2019-05-23 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847170#comment-16847170
 ] 

Boyang Chen commented on KAFKA-8311:


[~clearpal7] No worries, let me take a look at your fix!

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: WooYoung
>Priority: Major
>  Labels: newbie
>
> When a stream application crashes due to an underlying consumer commit 
> timeout, we have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to let the user change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers in KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap 
> it with a more meaningful message on either the consumer or stream level to 
> let the user know which consumer is having trouble.
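
For point 1, the knob the improved message would point users at is the 
consumer's `default.api.timeout.ms`; a minimal illustration (broker address 
and timeout value are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Raise the consumer's default API timeout to tolerate a longer reaction time.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 120000); // default: 60000
{code}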



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8424) Replace ListGroups request/response with automated protocol

2019-05-23 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8424:
--

 Summary: Replace ListGroups request/response with automated 
protocol
 Key: KAFKA-8424
 URL: https://issues.apache.org/jira/browse/KAFKA-8424
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8423) Update ducktape to not use deprecated APIs

2019-05-23 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-8423:
--

 Summary: Update ducktape to not use deprecated APIs
 Key: KAFKA-8423
 URL: https://issues.apache.org/jira/browse/KAFKA-8423
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Affects Versions: 2.3.0
Reporter: Matthias J. Sax


Running system tests locally, I see the following warnings:


{code:java}
/usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:39: 
CryptographyDeprecationWarning: encode_point has been deprecated on 
EllipticCurvePublicNumbers and will be removed in a future version. Please use 
EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed 
point encoding.
m.add_string(self.Q_C.public_numbers().encode_point())
/usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:94: 
CryptographyDeprecationWarning: Support for unsafe construction of public 
numbers from encoded data will be removed in a future version. Please use 
EllipticCurvePublicKey.from_encoded_point
self.curve, Q_S_bytes
/usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:109: 
CryptographyDeprecationWarning: encode_point has been deprecated on 
EllipticCurvePublicNumbers and will be removed in a future version. Please use 
EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed 
point encoding.
hm.add_string(self.Q_C.public_numbers().encode_point())
{code}
We should update the code to not use deprecated APIs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8404) Authorization header is not passed in Connect when forwarding REST requests

2019-05-23 Thread Robert Yokota (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Yokota updated KAFKA-8404:
-
Affects Version/s: 2.0.0

> Authorization header is not passed in Connect when forwarding REST requests
> ---
>
> Key: KAFKA-8404
> URL: https://issues.apache.org/jira/browse/KAFKA-8404
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Robert Yokota
>Priority: Blocker
> Fix For: 2.3.0
>
>
> When Connect forwards a REST request from one worker to another, the 
> Authorization header is not forwarded.
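
The idea of the fix, as a hedged sketch: copy the caller's Authorization 
header onto the forwarded request. This assumes a Jetty client request and a 
hypothetical helper method; it is not Connect's actual forwarding code:

{code:java}
import org.eclipse.jetty.client.api.Request;

// Hypothetical helper: propagate the incoming Authorization header when a
// worker forwards a REST request to another worker.
public static void propagateAuthorization(String incomingAuthHeader, Request forwarded) {
    if (incomingAuthHeader != null) {
        forwarded.header("Authorization", incomingAuthHeader);
    }
}
{code}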



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8422) Client should not use old versions of OffsetsForLeaderEpoch

2019-05-23 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-8422:
--

 Summary: Client should not use old versions of 
OffsetsForLeaderEpoch
 Key: KAFKA-8422
 URL: https://issues.apache.org/jira/browse/KAFKA-8422
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 2.3.0


For KIP-320, we changed the permissions of the OffsetsForLeaderEpoch API to be 
topic-level so that consumers do not require Cluster permission. However, 
there is no way for a consumer to know whether the broker is new enough to 
support this permission scheme. The only way to be sure is to use the version 
of this API that was bumped in 2.3. For older versions, we should revert to 
the old behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8093) Fix JavaDoc markup

2019-05-23 Thread Patrik Kleindl (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrik Kleindl reassigned KAFKA-8093:
-

Assignee: Matthias J. Sax  (was: Patrik Kleindl)

> Fix JavaDoc markup
> --
>
> Key: KAFKA-8093
> URL: https://issues.apache.org/jira/browse/KAFKA-8093
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Trivial
>
> Running `./gradlew install` gives the following warnings:
> {code:java}
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87:
>  warning - Tag @link: reference not found: java.nio.channels.Selector
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87:
>  warning - Tag @link: reference not found: 
> java.nio.channels.Selector#wakeup() wakeup()
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java:34:
>  warning - Tag @link: reference not found: 
> org.apache.kafka.clients.producer.ProducerRecord ProducerRecord
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> {code}
> Those should be fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8093) Fix JavaDoc markup

2019-05-23 Thread Patrik Kleindl (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrik Kleindl resolved KAFKA-8093.
---
Resolution: Fixed

Fixed by other commits, see PR for discussion.

> Fix JavaDoc markup
> --
>
> Key: KAFKA-8093
> URL: https://issues.apache.org/jira/browse/KAFKA-8093
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Matthias J. Sax
>Assignee: Patrik Kleindl
>Priority: Trivial
>
> Running `./gradlew install` gives the following warnings:
> {code:java}
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87:
>  warning - Tag @link: reference not found: java.nio.channels.Selector
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87:
>  warning - Tag @link: reference not found: 
> java.nio.channels.Selector#wakeup() wakeup()
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java:34:
>  warning - Tag @link: reference not found: 
> org.apache.kafka.clients.producer.ProducerRecord ProducerRecord
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> {code}
> Those should be fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8093) Fix JavaDoc markup

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847073#comment-16847073
 ] 

ASF GitHub Bot commented on KAFKA-8093:
---

pkleindl commented on pull request #6439: KAFKA-8093: fixed some javadoc errors
URL: https://github.com/apache/kafka/pull/6439
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix JavaDoc markup
> --
>
> Key: KAFKA-8093
> URL: https://issues.apache.org/jira/browse/KAFKA-8093
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Matthias J. Sax
>Assignee: Patrik Kleindl
>Priority: Trivial
>
> Running `./gradlew install` gives the following warnings:
> {code:java}
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87:
>  warning - Tag @link: reference not found: java.nio.channels.Selector
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87:
>  warning - Tag @link: reference not found: 
> java.nio.channels.Selector#wakeup() wakeup()
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java:34:
>  warning - Tag @link: reference not found: 
> org.apache.kafka.clients.producer.ProducerRecord ProducerRecord
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> {code}
> Those should be fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8333) Load high watermark checkpoint only once when handling LeaderAndIsr requests

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847071#comment-16847071
 ] 

ASF GitHub Bot commented on KAFKA-8333:
---

hachikuji commented on pull request #6800: KAFKA-8333; Load high watermark 
checkpoint lazily when initializing replicas
URL: https://github.com/apache/kafka/pull/6800
 
 
   Currently we load the high watermark checkpoint separately for every replica 
that we load. This patch makes this loading logic lazy and caches the loaded 
map while a LeaderAndIsr request is being handled.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Load high watermark checkpoint only once when handling LeaderAndIsr requests
> 
>
> Key: KAFKA-8333
> URL: https://issues.apache.org/jira/browse/KAFKA-8333
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Currently we reload the checkpoint file separately for every partition that 
> is first initialized on the broker. It would be more efficient to do this one 
> time only when we receive the LeaderAndIsr request and to reuse the state.
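
The patch amounts to memoizing the checkpoint read. A generic sketch of that 
pattern (the names are hypothetical, and the real broker code is Scala):

{code:java}
import java.util.Map;
import java.util.function.Supplier;

// Read the checkpoint file at most once per LeaderAndIsr request and share
// the resulting map across all partitions being initialized.
class LazyOffsetCheckpoint {
    private final Supplier<Map<String, Long>> reader; // parses the checkpoint file
    private Map<String, Long> cached;

    LazyOffsetCheckpoint(Supplier<Map<String, Long>> reader) {
        this.reader = reader;
    }

    synchronized Map<String, Long> fetch() {
        if (cached == null) {
            cached = reader.get(); // single disk read for the whole request
        }
        return cached;
    }
}
{code}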



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-05-23 Thread Srikala (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847056#comment-16847056
 ] 

Srikala commented on KAFKA-7500:


[~ryannedolan],

I configured Connect MirrorMaker for replicating topics between two clusters 
on Kafka 1.1.1, from source to target. Can you please provide your input on 
the following?
 # The topics are created with a replication factor of 1 in the target cluster 
even though the source cluster has a replication factor of 4 and the default 
replication factor of the target cluster is 4. Unsurprisingly, the topic data 
fails to replicate with the error: 
org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync 
replicas for partition target.TEST_TOPIC is [1], below required minimum [2]
Replication works fine if I manually create the topics in the target cluster 
with replication factor 4 before starting the replication.
 # I see the following lines in the log:
[2019-05-23 20:50:39,033] WARN [Producer clientId=producer-9] Error while 
fetching metadata with correlation id 2079 : 
\{source.checkpoints.internal=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient:1023)
[2019-05-23 20:50:39,039] WARN [Producer clientId=producer-8] Error while 
fetching metadata with correlation id 2175 : 
\{heartbeats=UNKNOWN_TOPIC_OR_PARTITION} 
(org.apache.kafka.clients.NetworkClient:1023)
How are the source.checkpoints.internal and heartbeats topics created?
 # Is this the right forum to ask questions during the evaluation?

Thanks!
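
Regarding question 1: KIP-382 proposes replication-factor settings for the 
topics MM2 creates. Assuming those land as described in the KIP (they are not 
released at this point), the target-side properties would look roughly like 
this, with illustrative values:

{code}
replication.factor = 4                    # remote topics created on the target
checkpoints.topic.replication.factor = 4  # internal checkpoints topic
heartbeats.topic.replication.factor = 4   # internal heartbeats topic
{code}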

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ryanne Dolan
>Priority: Minor
> Fix For: 2.4
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8333) Load high watermark checkpoint only once when handling LeaderAndIsr requests

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847047#comment-16847047
 ] 

ASF GitHub Bot commented on KAFKA-8333:
---

hachikuji commented on pull request #6696: KAFKA-8333; Cache checkpointed high 
watermarks for reuse on LeaderAndIsr request
URL: https://github.com/apache/kafka/pull/6696
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Load high watermark checkpoint only once when handling LeaderAndIsr requests
> 
>
> Key: KAFKA-8333
> URL: https://issues.apache.org/jira/browse/KAFKA-8333
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Currently we reload the checkpoint file separately for every partition that 
> is first initialized on the broker. It would be more efficient to do this one 
> time only when we receive the LeaderAndIsr request and to reuse the state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8419) Enable KafkaLog4JAppender to use SASL Authentication Callback Handlers

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847043#comment-16847043
 ] 

ASF GitHub Bot commented on KAFKA-8419:
---

rnpridgeon commented on pull request #6799: KAFKA-8419 Add SASL callback 
handler support to KafkaLog4jAppender
URL: https://github.com/apache/kafka/pull/6799
 
 
   KIP-470
   
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+Enable+KafkaLog4JAppender+to+use+SASL+Authentication+Callback+Handlers
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable KafkaLog4JAppender to use SASL Authentication Callback Handlers
> --
>
> Key: KAFKA-8419
> URL: https://issues.apache.org/jira/browse/KAFKA-8419
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ryan P
>Assignee: Ryan P
>Priority: Major
>
> The log4j Kafka appender supports SASL but lacks support for the callback 
> handlers added with KIP-86. This Jira was created to request that the SASL 
> callback handler client configuration be exposed in the KafkaLog4jAppender 
> class for use by the underlying producer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-05-23 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847033#comment-16847033
 ] 

Sophie Blee-Goldman commented on KAFKA-8367:


How many state stores are present in your topology? How many partitions? 

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>
> We have been observing a non-heap memory leak after upgrading to Kafka 
> Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the 
> leak only happens when we enable stateful stream operations (utilizing 
> stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 
> and ported the fix scheduled for release in 2.2.1 to our fork. It did not 
> stop the leak, however.
> We are having this memory leak in our production environment where the 
> consumer group is auto-scaled in and out in response to changes in traffic 
> volume, and in our test environment where we have two consumers, no 
> autoscaling and relatively constant traffic.
> Below is some information I'm hoping will be of help:
>  * RocksDB Config (see the sketch after this list):
> Block cache size: 4 MiB
> Write buffer size: 2 MiB
> Block size: 16 KiB
> Cache index and filter blocks: true
> Manifest preallocation size: 64 KiB
> Max write buffer number: 3
> Max open files: 6144
>  
>  * Memory usage in production
> The attached graph (memory-prod.png) shows memory consumption for each 
> instance as a separate line. The horizontal red line at 6 GiB is the memory 
> limit.
> As illustrated on the attached graph from production, memory consumption in 
> running instances goes up around autoscaling events (scaling the consumer 
> group either in or out) and associated rebalancing. It stabilizes until the 
> next autoscaling event but it never goes back down.
> An example of scaling out can be seen from around 21:00 hrs where three new 
> instances are started in response to a traffic spike.
> Just after midnight traffic drops and some instances are shut down. Memory 
> consumption in the remaining running instances goes up.
> Memory consumption climbs again from around 6:00AM due to increased traffic 
> and new instances are being started until around 10:30AM. Memory consumption 
> never drops until the cluster is restarted around 12:30.
>  
>  * Memory usage in test
> As illustrated by the attached graph (memory-test.png) we have a fixed number 
> of two instances in our test environment and no autoscaling. Memory 
> consumption rises linearly until it reaches the limit (around 2:00 AM on 
> 5/13) and Mesos restarts the offending instances, or we restart the cluster 
> manually.
>  
>  * No heap leaks observed
>  * Window retention: 2 or 11 minutes (depending on operation type)
>  * Issue not present in Kafka Streams 2.0.1
>  * No memory leak for stateless stream operations (when no RocksDB stores are 
> used)
>  
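
For reference, a sketch of how the listed RocksDB settings would typically be 
applied through a RocksDBConfigSetter in the 2.2-era API, assuming a 
block-based table format; the values mirror the description above:

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class BoundedRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig table = new BlockBasedTableConfig();
        table.setBlockCacheSize(4 * 1024 * 1024L);        // 4 MiB block cache
        table.setBlockSize(16 * 1024L);                   // 16 KiB blocks
        table.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(table);
        options.setWriteBufferSize(2 * 1024 * 1024L);     // 2 MiB write buffer
        options.setMaxWriteBufferNumber(3);
        options.setMaxOpenFiles(6144);
        options.setManifestPreallocationSize(64 * 1024L); // 64 KiB
    }
}
{code}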



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance

2019-05-23 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8421:


 Summary: Allow consumer.poll() to return data in the middle of 
rebalance
 Key: KAFKA-8421
 URL: https://issues.apache.org/jira/browse/KAFKA-8421
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Guozhang Wang


With KIP-429 in place, today when a consumer is about to send a join-group 
request, its owned partitions may not be empty, meaning that some of its 
fetched data can still be returned. Nevertheless, today the logic is strict:

{code}
if (!updateAssignmentMetadataIfNeeded(timer)) {
    return ConsumerRecords.empty();
}
{code}

I.e. if the consumer enters a rebalance it always returns no data. 

As an optimization, we can consider letting consumers still return messages 
that belong to their owned partitions even while a rebalance is in progress, 
because we know it is safe: no one else can claim those partitions during this 
rebalance yet, and we can still commit offsets if the partitions need to be 
revoked after the rebalance.
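
A hedged sketch of what the relaxed check inside poll() might look like; 
fetchedRecordsFor is a hypothetical helper, not an existing consumer method:

{code:java}
if (!updateAssignmentMetadataIfNeeded(timer)) {
    // Return whatever was already fetched for partitions this consumer still
    // owns, instead of unconditionally returning empty during a rebalance.
    Map<TopicPartition, List<ConsumerRecord<K, V>>> stillOwned =
            fetchedRecordsFor(subscriptions.assignedPartitions());
    return stillOwned.isEmpty()
            ? ConsumerRecords.<K, V>empty()
            : new ConsumerRecords<>(stillOwned);
}
{code}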

One thing we need to take care of, though, is the rebalance timeout: while the 
consumer is processing those records it may not call the next poll() in time 
(think: Kafka Streams' num.iterations mechanism), which may lead to the 
consumer dropping out of the group during the rebalance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8371) Remove ReplicaManager dependence from Partition

2019-05-23 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8371.

Resolution: Fixed

> Remove ReplicaManager dependence from Partition
> ---
>
> Key: KAFKA-8371
> URL: https://issues.apache.org/jira/browse/KAFKA-8371
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: tech-debt
>
> The dependence on ReplicaManager from the Partition logic makes testing this 
> class very cumbersome. Often we are just using ReplicaManager as a way to get 
> access to an additional dependency. We should make the actual dependencies 
> explicit and we should introduce smaller traits which encapsulate the state 
> we actually need.
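
An illustrative Java analogue of the "smaller traits" idea; the actual code 
is Scala, and these names are hypothetical:

{code:java}
// Instead of handing Partition the whole ReplicaManager, pass narrow
// interfaces that expose only the state Partition actually needs.
interface HighWatermarkListener {
    void onHighWatermarkIncrease(long newHighWatermark);
}

class Partition {
    private final HighWatermarkListener hwListener; // trivial to stub in tests

    Partition(HighWatermarkListener hwListener) {
        this.hwListener = hwListener;
    }
}
{code}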



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8371) Remove ReplicaManager dependence from Partition

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847026#comment-16847026
 ] 

ASF GitHub Bot commented on KAFKA-8371:
---

hachikuji commented on pull request #6705: KAFKA-8371: Remove dependence on 
ReplicaManager from Partition
URL: https://github.com/apache/kafka/pull/6705
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove ReplicaManager dependence from Partition
> ---
>
> Key: KAFKA-8371
> URL: https://issues.apache.org/jira/browse/KAFKA-8371
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: tech-debt
>
> The dependence on ReplicaManager from the Partition logic makes testing this 
> class very cumbersome. Often we are just using ReplicaManager as a way to get 
> access to an additional dependency. We should make the actual dependencies 
> explicit and we should introduce smaller traits which encapsulate the state 
> we actually need.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8420) Graceful handling when consumer switches from subscribe to manual assign

2019-05-23 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8420:


 Summary: Graceful handling when consumer switches from subscribe 
to manual assign
 Key: KAFKA-8420
 URL: https://issues.apache.org/jira/browse/KAFKA-8420
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Guozhang Wang


Today if a consumer switches between subscribe (and hence relies on group 
rebalance to get assignment) and manual assign, it may cause unnecessary 
rebalances. For example:

1. consumer.subscribe();
2. consumer.poll(); // join-group request sent; returns empty because the poll 
timed out
3. consumer.unsubscribe();
4. consumer.assign(..);
5. consumer.poll(); // sync-group response received, and the assigned 
partitions do not match the current subscription state. In this case it will 
try to re-join, which is not necessary.

In the worst case (i.e. the leader keeps sending an incompatible assignment), 
this would cause the consumer to fall into endless re-joins.

Although it is not a very common usage scenario, it is still worth handling 
better than the status quo; see the repro sketch below.
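
A minimal repro of the sequence above using only public consumer APIs (the 
topic name and props are placeholders):

{code:java}
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // props assumed
consumer.subscribe(Collections.singletonList("my-topic"));
consumer.poll(Duration.ofMillis(100));  // join-group sent; returns empty on poll timeout
consumer.unsubscribe();
consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
consumer.poll(Duration.ofMillis(100));  // should not trigger an unnecessary re-join
{code}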



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8419) Enable KafkaLog4JAppender to use SASL Authentication Callback Handlers

2019-05-23 Thread Ryan P (JIRA)
Ryan P created KAFKA-8419:
-

 Summary: Enable KafkaLog4JAppender to use SASL Authentication 
Callback Handlers
 Key: KAFKA-8419
 URL: https://issues.apache.org/jira/browse/KAFKA-8419
 Project: Kafka
  Issue Type: Improvement
Reporter: Ryan P
Assignee: Ryan P


The log4j Kafka appender supports SASL but lacks support for the callback 
handlers added with KIP-86. This Jira was created to request that the SASL 
callback handler client configuration be exposed in the KafkaLog4jAppender 
class for use by the underlying producer.
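
For context, this is the producer-level configuration that KIP-86 added and 
that this ticket asks the appender to expose; the handler class name is a 
placeholder:

{code:java}
import java.util.Properties;
import org.apache.kafka.common.config.SaslConfigs;

// What a plain producer can already configure; the appender should allow the same.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,   // "sasl.login.callback.handler.class"
        "com.example.MyLoginCallbackHandler");             // placeholder class
{code}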



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8407) Connector client overrides broken on client configs with type 'Class' or 'List'

2019-05-23 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8407.
--
   Resolution: Fixed
 Reviewer: Randall Hauch
Fix Version/s: 2.3.0

Merged onto the `trunk` and `2.3` branches.

> Connector client overrides broken on client configs with type 'Class' or 
> 'List'
> ---
>
> Key: KAFKA-8407
> URL: https://issues.apache.org/jira/browse/KAFKA-8407
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
>  Labels: connect
> Fix For: 2.3.0
>
>
> When a connector request is submitted that overrides a client configuration 
> that is meant to contain the name of a class (such as 
> {{sasl.login.callback.handler.class}}), a 500 response is generated and the 
> following stack trace can be found in the logs for Connect:
>  
> {quote}[2019-05-22 14:51:36,123] ERROR Uncaught exception in REST call to 
> /connectors 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Class
> at 
> org.apache.kafka.common.config.ConfigDef.convertToString(ConfigDef.java:774)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.convertConfigValue(AbstractHerder.java:491)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:426)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:565)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:562)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:292)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:241)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {quote}
> This appears to be limited only to client configs that are meant to be 
> classes or lists due to the fact that {{ConfigDef.convertToString(...)}} 
> assumes its first argument is an instance of {{Class}} when its second 
> argument is {{ConfigDef.Type.CLASS}} and then casts accordingly, and acts 
> similarly for lists. If the second argument is anything else, {{toString()}} 
> is invoked on it without any casting, avoiding any problems.
>  
> The cause is that the newly-introduced 
> {{ConnectorClientConfigOverridePolicy}} interface returns a list of 
> {{ConfigValue}} instances for its validation. The {{value()}} for each of 
> these can be any type, although with the default implementations available 
> ({{All}}, {{None}}, {{Principal}}) if one is returned at all it's just the 
> same type of what was passed in for that particular config. In the case of 
> the {{AbstractHerder.validateClientOverrides(...)}} method, the raw strings 
> for the client configs are used. However, the 
> {{AbstractHerder.convertConfigValue(...)}} is then called for those raw 
> strings but with the {{ConfigDef.Type}} of the config based on the relevant 
> client {{ConfigDef}} (i.e., {{ProducerConfig.configDef()}}, 
> {{ConsumerConfig.configDef()}}, or {{AdminClientConfig.configDef()}}). This 
> in turn can and will result in 
> {{ConfigDef.convertToString(someClassNameAsAString, ConfigDef.Type.CLASS)}} 
> being invoked.
>  
> Although this isn't technically a comprehensive fix, a quick option would be 
> to invoke {{ConfigDef.parse(...)}} using the relevant client {{ConfigDef}} 
> before passing overrides to the policy. Technically, this would still lead to 
> problems if the policy decided to return just the name of a class for a 
> config that is of type class instead, so we may want to investigate other 
> options as well.
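
A sketch of that quick option, assuming the raw overrides arrive as a 
Map<String, String> (the variable names are hypothetical):

{code:java}
// Parse the raw string overrides against the relevant client ConfigDef first,
// so CLASS- and LIST-typed values become real Class/List instances before the
// policy's ConfigValues are converted back to strings.
Map<String, Object> parsedOverrides =
        ProducerConfig.configDef().parse(rawClientOverrides);
{code}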



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8407) Connector client overrides broken on client configs with type 'Class' or 'List'

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846991#comment-16846991
 ] 

ASF GitHub Bot commented on KAFKA-8407:
---

rhauch commented on pull request #6789: KAFKA-8407: Fix validation of class and 
list configs in connector client overrides
URL: https://github.com/apache/kafka/pull/6789
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connector client overrides broken on client configs with type 'Class' or 
> 'List'
> ---
>
> Key: KAFKA-8407
> URL: https://issues.apache.org/jira/browse/KAFKA-8407
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
>  Labels: connect
>
> When a connector request is submitted that overrides a client configuration 
> that is meant to contain the name of a class (such as 
> {{sasl.login.callback.handler.class}}), a 500 response is generated and the 
> following stack trace can be found in the logs for Connect:
>  
> {quote}[2019-05-22 14:51:36,123] ERROR Uncaught exception in REST call to 
> /connectors 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Class
> at 
> org.apache.kafka.common.config.ConfigDef.convertToString(ConfigDef.java:774)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.convertConfigValue(AbstractHerder.java:491)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:426)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:565)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:562)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:292)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:241)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {quote}
> This appears to be limited only to client configs that are meant to be 
> classes or lists due to the fact that {{ConfigDef.convertToString(...)}} 
> assumes its first argument is an instance of {{Class}} when its second 
> argument is {{ConfigDef.Type.CLASS}} and then casts accordingly, and acts 
> similarly for lists. If the second argument is anything else, {{toString()}} 
> is invoked on it without any casting, avoiding any problems.
>  
> The cause is that the newly-introduced 
> {{ConnectorClientConfigOverridePolicy}} interface returns a list of 
> {{ConfigValue}} instances for its validation. The {{value()}} for each of 
> these can be any type, although with the default implementations available 
> ({{All}}, {{None}}, {{Principal}}) if one is returned at all it's just the 
> same type of what was passed in for that particular config. In the case of 
> the {{AbstractHerder.validateClientOverrides(...)}} method, the raw strings 
> for the client configs are used. However, the 
> {{AbstractHerder.convertConfigValue(...)}} is then called for those raw 
> strings but with the {{ConfigDef.Type}} of the config based on the relevant 
> client {{ConfigDef}} (i.e., {{ProducerConfig.configDef()}}, 
> {{ConsumerConfig.configDef()}}, or {{AdminClientConfig.configDef()}}). This 
> in turn can and will result in 
> {{ConfigDef.convertToString(someClassNameAsAString, ConfigDef.Type.CLASS)}} 
> being invoked.
>  
> Although this isn't technically a comprehensive fix, a quick option would be 
> to invoke {{ConfigDef.parse(...)}} using the relevant client {{ConfigDef}} 
> before passing overrides to the policy. Technically, this would still lead to 
> problems if the policy decided to return just the name of a class for a 
> config that is of type class instead, so we may want to investigate other 
> options as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8415) Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation

2019-05-23 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8415.
--
Resolution: Fixed
  Reviewer: Randall Hauch

Merged onto the `trunk` and `2.3` branches.

> Interface ConnectorClientConfigOverridePolicy needs to be excluded from class 
> loading isolation
> ---
>
> Key: KAFKA-8415
> URL: https://issues.apache.org/jira/browse/KAFKA-8415
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 2.3.0
>
>
> Classes or interfaces that implement {{ConnectorClientConfigOverridePolicy}} 
> were recently added in Connect as plugins that can be loaded in class loading 
> isolation. 
> However, the interface itself was not excluded from isolation, which results 
> in definition conflicts. Any interface that is considered a base Connect 
> plugin interface needs to be excluded from isolation (it's considered a 
> "system" type). 
> Here's the exception: 
> {code:java}
> [2019-05-23 15:16:57,802] ERROR Stopping due to error 
> (org.apache.kafka.connect.cli.ConnectDistributed:84)
> java.util.ServiceConfigurationError: 
> org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy:
>  Provider 
> org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
>  not a subtype
> at java.util.ServiceLoader.fail(ServiceLoader.java:239)
> at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getServiceLoaderPluginDesc(DelegatingClassLoader.java:343)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:317)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182)
> at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61)
> at 
> org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
> at 
> org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered

2019-05-23 Thread Oleksandr Diachenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Diachenko updated KAFKA-8418:
---
Description: 
I am getting an error while running Kafka tests:

{code}
Traceback (most recent call last):
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run
    data = self.run_test()
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 189, in run_test
    return self.test_context.function(self.test)
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 89, in test_rest_api
    assert set([connector_plugin['class'] for connector_plugin in self.cc.list_connector_plugins()]) == {self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR}
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 218, in list_connector_plugins
    return self._rest('/connector-plugins/', node=node)
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 234, in _rest
    raise ConnectRestError(resp.status_code, resp.text, resp.url)
ConnectRestError
{code}

From the logs, I see two messages:
{code}
[2019-05-23 16:09:59,373] INFO REST server listening at 
http://172.31.39.205:8083/, advertising URL http://worker1:8083/ 
(org.apache.kafka.connect.runtime.rest.RestServer)
{code}
and:
{code}
[2019-05-23 16:10:00,738] INFO REST resources initialized; server is started 
and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer)
{code}

It takes 1365 ms to actually load the REST resources, but the test only waits 
for the port to be listening.

  was:
I am getting an error while running Kafka tests:

{code}

Traceback (most recent call last):
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run
    data = self.run_test()
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 189, in run_test
    return self.test_context.function(self.test)
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 89, in test_rest_api
    assert set([connector_plugin['class'] for connector_plugin in self.cc.list_connector_plugins()]) == {self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR}
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 218, in list_connector_plugins
    return self._rest('/connector-plugins/', node=node)
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 234, in _rest
    raise ConnectRestError(resp.status_code, resp.text, resp.url)
ConnectRestError

{code}


> Connect System tests are not waiting for REST resources to be registered
> 
>
> Key: KAFKA-8418
> URL: https://issues.apache.org/jira/browse/KAFKA-8418
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.0
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
> Fix For: 2.3.0
>
>
> I am getting an error while running Kafka tests:
> {code}
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run
>     data = self.run_test()
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 189, in run_test
>     return self.test_context.function(self.test)
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 89, in test_rest_api
>     assert set([connector_plugin['class'] for connector_plugin in self.cc.list_connector_plugins()]) == {self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR}
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 218, in list_connector_plugins
>     return self._rest('/connector-plugins/', node=node)
>   File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py", line 234, in _rest
>     raise ConnectRestError(resp.status_code, resp.text, resp.url)
> ConnectRestError
> {code}
> From the logs, I see two messages:
> {code}
> [2

[jira] [Commented] (KAFKA-8415) Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846985#comment-16846985
 ] 

ASF GitHub Bot commented on KAFKA-8415:
---

rhauch commented on pull request #6796: KAFKA-8415: Interface 
ConnectorClientConfigOverridePolicy needs to be excluded from class loading 
isolation
URL: https://github.com/apache/kafka/pull/6796
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Interface ConnectorClientConfigOverridePolicy needs to be excluded from class 
> loading isolation
> ---
>
> Key: KAFKA-8415
> URL: https://issues.apache.org/jira/browse/KAFKA-8415
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 2.3.0
>
>
> Classes or interfaces that implement {{ConnectorClientConfigOverridePolicy}} 
> were recently added in Connect as plugins that can be loaded in class loading 
> isolation. 
> However, the interface itself was not excluded from isolation, which results 
> in definition conflicts. Any interface that is considered a base Connect 
> plugin interface needs to be excluded from isolation (it's considered a 
> "system" type). 
> Here's the exception: 
> {code:java}
> [2019-05-23 15:16:57,802] ERROR Stopping due to error 
> (org.apache.kafka.connect.cli.ConnectDistributed:84)
> java.util.ServiceConfigurationError: 
> org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy:
>  Provider 
> org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
>  not a subtype
> at java.util.ServiceLoader.fail(ServiceLoader.java:239)
> at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getServiceLoaderPluginDesc(DelegatingClassLoader.java:343)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:317)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182)
> at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61)
> at 
> org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
> at 
> org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
> {code}
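
For context, a minimal standalone sketch (not Kafka code; the jar path and class 
name are hypothetical) of why loading the same interface through two class 
loaders produces this "not a subtype" failure:
{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class NotASubtypeDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical plugin jar that also bundles the base interface com.example.Policy.
        URL pluginJar = new URL("file:///path/to/plugin.jar");
        // An isolated loader with no parent re-defines com.example.Policy itself
        // instead of delegating -- analogous to a plugin loader that does not
        // exclude base ("system") plugin interfaces from isolation.
        ClassLoader isolated = new URLClassLoader(new URL[]{pluginJar}, null);
        Class<?> isolatedIface = isolated.loadClass("com.example.Policy");
        Class<?> appIface = NotASubtypeDemo.class.getClassLoader().loadClass("com.example.Policy");
        // Same fully-qualified name, but two distinct Class objects:
        System.out.println(isolatedIface == appIface);                // false
        System.out.println(appIface.isAssignableFrom(isolatedIface)); // false => "not a subtype"
    }
}
{code}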



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered

2019-05-23 Thread Oleksandr Diachenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Diachenko updated KAFKA-8418:
---
Description: 
I am getting an error while running Kafka tests:

{code}

Traceback (most recent call last): File 
"/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py",
 line 132, in run data = self.run_test() File 
"/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py",
 line 189, in run_test return self.test_context.function(self.test) File 
"/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py",
 line 89, in test_rest_api assert set([connector_plugin['class'] for 
connector_plugin in self.cc.list_connector_plugins()]) == 
\{self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR} File 
"/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py",
 line 218, in list_connector_plugins return self._rest('/connector-plugins/', 
node=node) File 
"/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py",
 line 234, in _rest raise ConnectRestError(resp.status_code, resp.text, 
resp.url) ConnectRestError

{code}

> Connect System tests are not waiting for REST resources to be registered
> 
>
> Key: KAFKA-8418
> URL: https://issues.apache.org/jira/browse/KAFKA-8418
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.0
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
> Fix For: 2.3.0
>
>
> I am getting an error while running Kafka tests:
> {code}
> Traceback (most recent call last): File 
> "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py",
>  line 132, in run data = self.run_test() File 
> "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py",
>  line 189, in run_test return self.test_context.function(self.test) File 
> "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/connect/connect_rest_test.py",
>  line 89, in test_rest_api assert set([connector_plugin['class'] for 
> connector_plugin in self.cc.list_connector_plugins()]) == 
> \{self.FILE_SOURCE_CONNECTOR, self.FILE_SINK_CONNECTOR} File 
> "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py",
>  line 218, in list_connector_plugins return self._rest('/connector-plugins/', 
> node=node) File 
> "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/services/connect.py",
>  line 234, in _rest raise ConnectRestError(resp.status_code, resp.text, 
> resp.url) ConnectRestError
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered

2019-05-23 Thread Oleksandr Diachenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Diachenko updated KAFKA-8418:
---
Fix Version/s: 2.3.0

> Connect System tests are not waiting for REST resources to be registered
> 
>
> Key: KAFKA-8418
> URL: https://issues.apache.org/jira/browse/KAFKA-8418
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.0
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
> Fix For: 2.3.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered

2019-05-23 Thread Oleksandr Diachenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Diachenko updated KAFKA-8418:
---
Affects Version/s: 2.2.0

> Connect System tests are not waiting for REST resources to be registered
> 
>
> Key: KAFKA-8418
> URL: https://issues.apache.org/jira/browse/KAFKA-8418
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.0
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered

2019-05-23 Thread Oleksandr Diachenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Diachenko updated KAFKA-8418:
---
Component/s: KafkaConnect

> Connect System tests are not waiting for REST resources to be registered
> 
>
> Key: KAFKA-8418
> URL: https://issues.apache.org/jira/browse/KAFKA-8418
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8418) Connect System tests are not waiting for REST resources to be registered

2019-05-23 Thread Oleksandr Diachenko (JIRA)
Oleksandr Diachenko created KAFKA-8418:
--

 Summary: Connect System tests are not waiting for REST resources 
to be registered
 Key: KAFKA-8418
 URL: https://issues.apache.org/jira/browse/KAFKA-8418
 Project: Kafka
  Issue Type: Bug
Reporter: Oleksandr Diachenko
Assignee: Oleksandr Diachenko






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8309) KIP-465: Add Consolidated Connector Endpoint to Connect REST API

2019-05-23 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8309.
--
   Resolution: Fixed
 Assignee: dan norwood
 Reviewer: Randall Hauch
Fix Version/s: 2.3.0

Not sure why this was not closed when the PR was merged, but it's resolved now.

> KIP-465: Add Consolidated Connector Endpoint to Connect REST API
> 
>
> Key: KAFKA-8309
> URL: https://issues.apache.org/jira/browse/KAFKA-8309
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dan norwood
>Assignee: dan norwood
>Priority: Major
> Fix For: 2.3.0
>
>
> {color:#33}https://cwiki.apache.org/confluence/display/KAFKA/KIP-465%3A+Add+Consolidated+Connector+Endpoint+to+Connect+REST+API{color}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8417) Remove redundant network definition --net=host when starting testing docker containers

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846953#comment-16846953
 ] 

ASF GitHub Bot commented on KAFKA-8417:
---

cmccabe commented on pull request #6797: KAFKA-8417: Remove redundant network 
definition --net=host when starting testing docker containers
URL: https://github.com/apache/kafka/pull/6797
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove redundant network definition --net=host when starting testing docker 
> containers
> --
>
> Key: KAFKA-8417
> URL: https://issues.apache.org/jira/browse/KAFKA-8417
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
>
> The switches {{--net}} and {{--network}} are equivalent in docker, with the 
> latter being preferred. 
> (see 
> [https://github.com/docker/cli/blob/master/cli/command/container/opts.go] 
> where currently there's the comment: _// We allow for both "--net" and 
> "--network", although the latter is the recommended way._)
> However, in recent Docker versions, defining both as follows: 
> {{--net=host --network ducknet}}
> fails with error: 
> {{docker: conflicting options: cannot attach both user-defined and 
> non-user-defined network-modes.}}
> Removing {{--net=host}} and keeping only the user-defined network should fix 
> the issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8417) Remove redundant network definition --net=host when starting testing docker containers

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846948#comment-16846948
 ] 

ASF GitHub Bot commented on KAFKA-8417:
---

kkonstantine commented on pull request #6797: KAFKA-8417: Remove redundant 
network definition --net=host when starting testing docker containers
URL: https://github.com/apache/kafka/pull/6797
 
 
   * Remove non-user-defined network --net=host which is redundant when 
starting system test docker containers
   
   * Tested by running a round of system tests locally and on jenkins
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove redundant network definition --net=host when starting testing docker 
> containers
> --
>
> Key: KAFKA-8417
> URL: https://issues.apache.org/jira/browse/KAFKA-8417
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
>
> The switches {{--net}} and {{--network}} are equivalent in docker, with the 
> latter being preferred. 
> (see 
> [https://github.com/docker/cli/blob/master/cli/command/container/opts.go] 
> where currently there's the comment: _// We allow for both "--net" and 
> "--network", although the latter is the recommended way._)
> However, in recent Docker versions, defining both as follows: 
> {{--net=host --network ducknet}}
> fails with error: 
> {{docker: conflicting options: cannot attach both user-defined and 
> non-user-defined network-modes.}}
> Removing {{--net=host}} and keeping only the user-defined network should fix 
> the issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8417) Remove redundant network definition --net=host when starting testing docker containers

2019-05-23 Thread Konstantine Karantasis (JIRA)
Konstantine Karantasis created KAFKA-8417:
-

 Summary: Remove redundant network definition --net=host when 
starting testing docker containers
 Key: KAFKA-8417
 URL: https://issues.apache.org/jira/browse/KAFKA-8417
 Project: Kafka
  Issue Type: Bug
  Components: system tests
Reporter: Konstantine Karantasis
Assignee: Konstantine Karantasis


The switches {{--net}} and {{--network}} are equivalent in docker, with the 
latter being preferred. 

(see [https://github.com/docker/cli/blob/master/cli/command/container/opts.go] 
where currently there's the comment: _// We allow for both "--net" and 
"--network", although the latter is the recommended way._)

However, in recent Docker versions, defining both as follows: 
{{--net=host --network ducknet}}

fails with error: 

{{docker: conflicting options: cannot attach both user-defined and 
non-user-defined network-modes.}}

Removing {{--net=host}} and keeping only the user-defined network should fix 
the issue. 
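
In other words (illustrative commands; {{ducknet}} is the user-defined network 
from the description above, and the image name is a placeholder):
{code}
# Recent Docker versions reject mixing a network mode with a user-defined network:
docker run --net=host --network ducknet ubuntu:latest
# docker: conflicting options: cannot attach both user-defined and non-user-defined network-modes.

# Keeping only the user-defined network works:
docker run --network ducknet ubuntu:latest
{code}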



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-8413.

Resolution: Not A Problem

> Add possibility to do repartitioning on KStream
> ---
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>.selectKey((key, value) -> value);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
> This code will generate the following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>  --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-FILTER-04 (stores: [])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-FILTER-08 (stores: [])
>  --> KSTREAM-SINK-07
>  <-- KSTREAM-KEY-SELECT-01
>  Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-04
>  Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-08
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-02
>  Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-05
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-06
>  Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-09
>  
> {code}
> Kafka Streams creates a repartition topic for each `groupByKey` operation. 
> In this example, the two repartition topics are not really necessary, and the 
> processing can be done with one sub-topology.
>  
> In the DSL, a Kafka Streams user may specify the repartition topic manually 
> using the *KStream#through* method:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic")
>.selectKey((key, value) -> value)
>.through("repartition-topic");
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
>  
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
> --> KSTREAM-SINK-02
> <-- KSTREAM-SOURCE-00
> Sink: KSTREAM-SINK-02 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-01
> Sub-topology: 1
> Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
> Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-03
> Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-03
> {code}
>   
> While this makes it possible to optimize a Kafka Streams application, the 
> user still has to manually create the repartition topic with the correct 
> number of partitions based on the input topic. It would be great if the DSL 
> had something like a *repartition()* operation on *KStream* that could 
> generate the repartition topic on demand.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846927#comment-16846927
 ] 

Bill Bejeck commented on KAFKA-8413:


Hi [~lkokhreidze],

No problem at all!  It's a subtle point, and we could probably do a better job 
of making sure that step isn't overlooked. I've created 
https://issues.apache.org/jira/browse/KAFKA-8416 to help improve the 
documentation for enabling optimizations.

I'll go ahead and close this ticket then.

 

> Add possibility to do repartitioning on KStream
> ---
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>.selectKey((key, value) -> value);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
> This code will generate the following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>  --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-FILTER-04 (stores: [])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-FILTER-08 (stores: [])
>  --> KSTREAM-SINK-07
>  <-- KSTREAM-KEY-SELECT-01
>  Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-04
>  Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-08
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-02
>  Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-05
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-06
>  Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-09
>  
> {code}
> Kafka Streams creates a repartition topic for each `groupByKey` operation. 
> In this example, the two repartition topics are not really necessary, and the 
> processing can be done with one sub-topology.
>  
> In the DSL, a Kafka Streams user may specify the repartition topic manually 
> using the *KStream#through* method:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic")
>.selectKey((key, value) -> value)
>.through("repartition-topic");
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
>  
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
> --> KSTREAM-SINK-02
> <-- KSTREAM-SOURCE-00
> Sink: KSTREAM-SINK-02 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-01
> Sub-topology: 1
> Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
> Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-03
> Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-03
> {code}
>   
> While this makes it possible to optimize a Kafka Streams application, the 
> user still has to manually create the repartition topic with the correct 
> number of partitions based on the input topic. It would be great if the DSL 
> had something like a *repartition()* operation on *KStream* that could 
> generate the repartition topic on demand.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8416) Improve Documentation for Enabling Optimizations

2019-05-23 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-8416:
---
Labels: newbie,  (was: )

> Improve Documentation for Enabling Optimizations
> 
>
> Key: KAFKA-8416
> URL: https://issues.apache.org/jira/browse/KAFKA-8416
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: newbie,
>
> To enable optimizations, users need to set the 
> {{StreamsConfig.TOPOLOGY_OPTIMIZATION}} setting to "all".  But in addition to 
> setting the config, users need to pass the {{Properties}} object to the 
> {{StreamsBuilder#build()}} method as well.
>  
> We should make a pass over the existing documentation and Javadoc to make 
> sure this required step is clear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8416) Improve Documentation for Enabling Optimizations

2019-05-23 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-8416:
--

 Summary: Improve Documentation for Enabling Optimizations
 Key: KAFKA-8416
 URL: https://issues.apache.org/jira/browse/KAFKA-8416
 Project: Kafka
  Issue Type: Improvement
Reporter: Bill Bejeck


To enable optimizations, users need to set the 
{{StreamsConfig.TOPOLOGY_OPTIMIZATION}} setting to "all".  But in addition to 
setting the config, users need to pass the {{Properties}} object to the 
{{StreamsBuilder#build()}} method as well.

 

We should make a pass over the existing documentation and Javadoc to make sure 
this required step is clear.
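
For reference, a minimal sketch of the two required steps (the application id 
and bootstrap servers are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Step 1: set the optimization config to "all".
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

StreamsBuilder builder = new StreamsBuilder();
// ... define the topology here ...

// Step 2: pass the same Properties to build(); without this, no optimization happens.
Topology topology = builder.build(props);
KafkaStreams streams = new KafkaStreams(topology, props);
{code}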



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8416) Improve Documentation for Enabling Optimizations

2019-05-23 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-8416:
---
Component/s: streams

> Improve Documentation for Enabling Optimizations
> 
>
> Key: KAFKA-8416
> URL: https://issues.apache.org/jira/browse/KAFKA-8416
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Priority: Major
>
> To enable optimizations, users need to set the 
> {{StreamsConfig.TOPOLOGY_OPTIMIZATION}} setting to "all".  But in addition to 
> setting the config, users need to pass the {{Properties}} object to the 
> {{StreamsBuilder#build()}} method as well.
>  
> We should make a pass over the existing documentation and Javadoc to make 
> sure this required step is clear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Levani Kokhreidze (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846916#comment-16846916
 ] 

Levani Kokhreidze commented on KAFKA-8413:
--

Hi [~bbejeck],

I wasn't passing the properties to StreamsBuilder before; I missed that part. I 
can confirm that after applying your suggestion there is only one repartition 
topic. Sorry about the confusion.

 

> Add possibility to do repartitioning on KStream
> ---
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>.selectKey((key, value) -> value);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
> This code will generate the following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>  --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-FILTER-04 (stores: [])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-FILTER-08 (stores: [])
>  --> KSTREAM-SINK-07
>  <-- KSTREAM-KEY-SELECT-01
>  Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-04
>  Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-08
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-02
>  Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-05
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-06
>  Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-09
>  
> {code}
> Kafka Streams creates a repartition topic for each `groupByKey` operation. 
> In this example, the two repartition topics are not really necessary, and the 
> processing can be done with one sub-topology.
>  
> In the DSL, a Kafka Streams user may specify the repartition topic manually 
> using the *KStream#through* method:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic")
>.selectKey((key, value) -> value)
>.through("repartition-topic");
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
>  
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
> --> KSTREAM-SINK-02
> <-- KSTREAM-SOURCE-00
> Sink: KSTREAM-SINK-02 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-01
> Sub-topology: 1
> Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
> Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-03
> Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-03
> {code}
>   
> While this makes it possible to optimize a Kafka Streams application, the 
> user still has to manually create the repartition topic with the correct 
> number of partitions based on the input topic. It would be great if the DSL 
> had something like a *repartition()* operation on *KStream* that could 
> generate the repartition topic on demand.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846905#comment-16846905
 ] 

Bill Bejeck commented on KAFKA-8413:


Hi [~lkokhreidze], thanks for reporting this.

 

When you build the topology, can you confirm for me that you are calling 
{{StreamsBuilder#build(properties)}}? To optimize the topology you need to pass 
the properties to {{StreamsBuilder#build()}} as well as set the optimization 
config in the configuration.

 

Thanks,

Bill

> Add possibility to do repartitioning on KStream
> ---
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>.selectKey((key, value) -> value);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
> This code will generate the following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>  --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-FILTER-04 (stores: [])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-FILTER-08 (stores: [])
>  --> KSTREAM-SINK-07
>  <-- KSTREAM-KEY-SELECT-01
>  Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-04
>  Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-08
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-02
>  Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-05
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-06
>  Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-09
>  
> {code}
> Kafka Streams creates a repartition topic for each `groupByKey` operation. 
> In this example, the two repartition topics are not really necessary, and the 
> processing can be done with one sub-topology.
>  
> In the DSL, a Kafka Streams user may specify the repartition topic manually 
> using the *KStream#through* method:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic")
>.selectKey((key, value) -> value)
>.through("repartition-topic");
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
>  
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
> --> KSTREAM-SINK-02
> <-- KSTREAM-SOURCE-00
> Sink: KSTREAM-SINK-02 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-01
> Sub-topology: 1
> Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
> Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-03
> Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-03
> {code}
>   
> While this makes it possible to optimize a Kafka Streams application, the 
> user still has to manually create the repartition topic with the correct 
> number of partitions based on the input topic. It would be great if the DSL 
> had something like a *repartition()* operation on *KStream* that could 
> generate the repartition topic on demand.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8341) AdminClient should retry coordinator lookup after NOT_COORDINATOR error

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846904#comment-16846904
 ] 

ASF GitHub Bot commented on KAFKA-8341:
---

soondenana commented on pull request #6723: KAFKA-8341. Retry Consumer group 
operation for NOT_COORDINATOR error
URL: https://github.com/apache/kafka/pull/6723
 
 
   An api call for consumer groups is made up of two calls:
   1. Find the consumer group coordinator
   2. Send the request to the node found in step 1
   
   But the coordinator can get moved between step 1 and 2. In that case we
   currently fail. This change fixes that by detecting this error and then
   retrying.
   
   Following APIs are impacted by this behavior:
   1. listConsumerGroupOffsets
   2. deleteConsumerGroups
   3. describeConsumerGroups
   
   Each of these calls results in AdminClient making multiple calls to the
   backend. As the AdminClient code invokes each backend API in a separate
   event loop, the code that detects the error (step 2) needs to restart the
   whole operation, including step 1. This needed a change to capture the
   "Call" object for step 1 in step 2.
   
   This change thus refactors the code to make it easy to retry the whole
   operation. It creates a Context object to capture the API arguments that
   can then be referred to by each "Call" object. This is just for convenience
   and makes method signatures simpler, as we only need to pass one object
   instead of multiple API arguments.
   
   The creation of each "Call" object is done in a new method, so we can
   easily resubmit step 1 in step 2.
   
   This change also modifies corresponding unit test to test this scenario.
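   
   Conceptually (a sketch, not the actual AdminClient internals; 
   findCoordinator() and sendDescribeGroup() are hypothetical stand-ins for 
   the two "Call" objects described above):
   
{code:java}
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.NotCoordinatorException;

ConsumerGroupDescription describeWithRetry(String groupId) {
    Node coordinator = findCoordinator(groupId);             // step 1: FindCoordinator
    while (true) {
        try {
            return sendDescribeGroup(coordinator, groupId);  // step 2: send to that node
        } catch (NotCoordinatorException e) {
            coordinator = findCoordinator(groupId);          // coordinator moved: redo step 1
        }
    }
}
{code}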
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> AdminClient should retry coordinator lookup after NOT_COORDINATOR error
> ---
>
> Key: KAFKA-8341
> URL: https://issues.apache.org/jira/browse/KAFKA-8341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Vikas Singh
>Priority: Major
>
> If a group operation (e.g. DescribeGroup) fails because the coordinator has 
> moved, the AdminClient should lookup the coordinator before retrying the 
> operation. Currently we will either fail or just retry anyway. This is 
> similar in some ways to controller rediscovery after getting NOT_CONTROLLER 
> errors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8341) AdminClient should retry coordinator lookup after NOT_COORDINATOR error

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846900#comment-16846900
 ] 

ASF GitHub Bot commented on KAFKA-8341:
---

soondenana commented on pull request #6723: KAFKA-8341. Retry Consumer group 
operation for NOT_COORDINATOR error
URL: https://github.com/apache/kafka/pull/6723
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> AdminClient should retry coordinator lookup after NOT_COORDINATOR error
> ---
>
> Key: KAFKA-8341
> URL: https://issues.apache.org/jira/browse/KAFKA-8341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Vikas Singh
>Priority: Major
>
> If a group operation (e.g. DescribeGroup) fails because the coordinator has 
> moved, the AdminClient should lookup the coordinator before retrying the 
> operation. Currently we will either fail or just retry anyway. This is 
> similar in some ways to controller rediscovery after getting NOT_CONTROLLER 
> errors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-23 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846857#comment-16846857
 ] 

Matthias J. Sax commented on KAFKA-7994:


I think it might actually be worth splitting the two issues into two tickets: 
one ticket to just preserve partition time across restarts (the original issue 
of this ticket), and a new ticket for global stream time. There are still many 
open questions about what global stream time actually means, and it will be a 
difficult and long design phase before we can merge any code. Hence, I would 
like to unblock the original issue of this ticket.

Thoughts?

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or dropping them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> the current stream-time in this case, which may lead to non-deterministic 
> behavior if we stop processing right before a late record that would be 
> dropped if we continued processing, but is not dropped after the 
> rebalance/restart. Let's look at an example with a grace period of 5ms for a 
> tumbling window of 5ms, and the following records (timestamps in parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we stopped processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. That way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies to all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct timestamp of 11. This requires us to 
> also propagate the latest observed partition time downstream. That means the 
> sources located at the head of the topology must forward the partition time 
> to their subtopologies whenever records are sent.
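
A rough sketch of the direction of that fix, using the consumer's commit 
metadata (the encoding and helper names are illustrative, not the actual 
Streams internals):
{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// On commit: persist the latest observed partition-time next to the offset.
void commitWithTime(KafkaConsumer<?, ?> consumer, TopicPartition tp, long offset, long partitionTime) {
    String metadata = Long.toString(partitionTime); // illustrative encoding
    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(offset, metadata)));
}

// On restart/rebalance: re-initialize partition-time from the metadata instead of -1 (UNKNOWN).
long restorePartitionTime(OffsetAndMetadata committed) {
    return (committed == null || committed.metadata().isEmpty())
            ? -1L
            : Long.parseLong(committed.metadata());
}
{code}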



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers

2019-05-23 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846851#comment-16846851
 ] 

Matthias J. Sax commented on KAFKA-8412:


Do you run with EOS enabled?

Also, closing() and flushing() are a little different... I am not saying there 
is no issue; I am just trying to figure out what the correct fix is. Just 
adding a `null` check could actually mask the root cause of the bug rather 
than fix the bug itself.

> Still a nullpointer exception thrown on shutdown while flushing before 
> closing producers
> 
>
> Key: KAFKA-8412
> URL: https://issues.apache.org/jira/browse/KAFKA-8412
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Sebastiaan
>Priority: Minor
>
> I found a closed issue and replied there, but decided to open one myself 
> because, although they're related, they're slightly different. The original 
> issue is at https://issues.apache.org/jira/browse/KAFKA-7678
> The fix there was to implement a null check around closing a producer, 
> because in some cases the producer is already null there (it has already been 
> closed).
> In version 2.1.1 we are getting a very similar exception, but in the 'flush' 
> method that is called pre-close. This is in the log:
> {code:java}
> message: stream-thread 
> [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
> while closing StreamTask 1_26 due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> Followed by:
>  
> {code:java}
> message: task [1_26] Could not close task due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.StreamTask
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> If I look at the source code at this point, I see a nice null check in the 
> close method, but not in the flush method that is called just before that:
> {code:java}
> public void flush() {
>     this.log.debug("Flushing producer");
>     this.producer.flush();
>     this.checkForException();
> }
> public void close() {
>     this.log.debug("Closing producer");
>     if (this.producer != null) {
>     this.producer.close();
>     this.producer = null;
>     }
>     this.checkForException();
> }{code}
> It seems to my (ignorant) eye that the flush method should also be wrapped in 
> a null check, in the same way as has been done for close.
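
For illustration, the reporter's suggested guard would look roughly like this 
(a sketch mirroring the close() snippet above; as the earlier comment notes, 
such a check might only mask the root cause rather than fix it):
{code:java}
public void flush() {
    this.log.debug("Flushing producer");
    if (this.producer != null) {   // same guard as in close()
        this.producer.flush();
    }
    this.checkForException();
}
{code}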



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Levani Kokhreidze (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846847#comment-16846847
 ] 

Levani Kokhreidze commented on KAFKA-8413:
--

Thanks. I'll look into it.

> Add possibility to do repartitioning on KStream
> ---
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>.selectKey((key, value) -> value);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
> This code will generate the following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>  --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-FILTER-04 (stores: [])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-FILTER-08 (stores: [])
>  --> KSTREAM-SINK-07
>  <-- KSTREAM-KEY-SELECT-01
>  Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-04
>  Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-08
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-02
>  Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-05
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-06
>  Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-09
>  
> {code}
> Kafka Streams creates a repartition topic for each `groupByKey` operation. 
> In this example, the two repartition topics are not really necessary, and the 
> processing can be done with one sub-topology.
>  
> In the DSL, a Kafka Streams user may specify the repartition topic manually 
> using the *KStream#through* method:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic")
>.selectKey((key, value) -> value)
>.through("repartition-topic");
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
>  
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
> --> KSTREAM-SINK-02
> <-- KSTREAM-SOURCE-00
> Sink: KSTREAM-SINK-02 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-01
> Sub-topology: 1
> Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
> Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-03
> Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-03
> {code}
>   
> While this makes it possible to optimize a Kafka Streams application, the 
> user still has to manually create the repartition topic with the correct 
> number of partitions based on the input topic. It would be great if the DSL 
> had something like a *repartition()* operation on *KStream* that could 
> generate the repartition topic on demand.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846844#comment-16846844
 ] 

Matthias J. Sax commented on KAFKA-8413:


{quote}I've enabled topology optimization, but in this particular example, 
there are still 2 repartition topics created (Kafka Streams version 2.2.0).
{quote}
\cc [~bbejeck] – Can you look into this? This would be a bug.

About the other request: I agree that this might be helpful, and in fact there 
is a similar ticket, including a KIP draft for this:
 * https://issues.apache.org/jira/browse/KAFKA-6037
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint]

The KIP is inactive, so feel free to pick it up.

I would not add a `repartition()` operation though, but would stick with 
`through()` and make the topic name optional to let Kafka Streams manage the 
topic.

 

> Add possibility to do repartitioning on KStream
> ---
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>.selectKey((key, value) -> value);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
> This code will generate the following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>  --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-FILTER-04 (stores: [])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-FILTER-08 (stores: [])
>  --> KSTREAM-SINK-07
>  <-- KSTREAM-KEY-SELECT-01
>  Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-04
>  Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-08
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-02
>  Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-05
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-06
>  Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-09
>  
> {code}
> Kafka Streams creates a repartition topic for each `groupByKey` operation. 
> In this example, the two repartition topics are not really necessary, and the 
> processing can be done with one sub-topology.
>  
> In the DSL, a Kafka Streams user may specify the repartition topic manually 
> using the *KStream#through* method:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>.stream("input-topic")
>.selectKey((key, value) -> value)
>.through("repartition-topic");
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
>  
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
> --> KSTREAM-SINK-02
> <-- KSTREAM-SOURCE-00
> Sink: KSTREAM-SINK-02 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-01
> Sub-topology: 1
> Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
> Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-03
> Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-03
> {code}
>   
> While this makes it possible to optimize a Kafka Streams application, the 
> user still has to manually create the repartition topic with the correct 
> number of partitions based on the input topic. It would be great if the DSL 
> had something like a *repartition()* operation on *KStream* that could 
> generate the repartition topic on demand.

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-05-23 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846843#comment-16846843
 ] 

Ted Yu commented on KAFKA-5998:
---

Can you move the state directory outside of /tmp, which is subject to cleaning 
by the OS?
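
For example (the path is a placeholder), the state directory can be pointed at 
a persistent location via {{StreamsConfig}}:
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Any persistent, non-/tmp location avoids the OS tmp-cleaner deleting checkpoint files.
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
{code}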

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... It has been 
> running for two days under a load of around 2500 msgs per second. On the 
> third day I am getting the below exception for some of the partitions. I have 
> 16 partitions; only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.S

[jira] [Comment Edited] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Levani Kokhreidze (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846838#comment-16846838
 ] 

Levani Kokhreidze edited comment on KAFKA-8413 at 5/23/19 4:28 PM:
---

Hi Matthias,

I've enabled topology optimization, but in this particular example there are
still two repartition topics created (Kafka Streams version 2.2.0).
{code:java}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");{code}
Topics created:
{code:java}
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-repartition
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-repartition {code}
 

Actually, one more reason why introducing an additional manual repartition
operation may be valuable: correct me if I'm wrong, but Kafka Streams will try
to optimize when a key-changing operation is followed by a stateful operation
like *groupByKey().aggregate(...)*, but there may be cases where, in the DSL,
the user does aggregation with a stateful *transform(...)* operation instead.
Consider the following example:
{code:java}
final KStream streamByProfileId = streamsBuilder
 .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
 .selectKey((key, value) -> value);

streamByProfileId.transform(...) // stateful transformer with aggregation
streamByProfileId.transform(...) // stateful transformer with aggregation
{code}
In this example no repartition topic is created; on the other hand, if we had
something like a `repartition()` operation on KStream, we could write something
like this, which would be pretty cool imho:
{code:java}
final KStream streamByProfileId = streamsBuilder
 .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
 .repartitionBy((key, value) -> new_key);

streamByProfileId.transform(...) // stateful transformer with aggregation
streamByProfileId.transform(...) // stateful transformer with aggregation{code}
 


was (Author: lkokhreidze):
Hi Matthias,

I've enabled topology optimization, but in this particular example there are
still two repartition topics created.

 
{code:java}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");{code}
Kafka Streams version 2.2.0

 

Topics created:

 
{code:java}
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-repartition
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-repartition{code}
 

 

 

Actually, one more reason why introducing an additional manual repartition
operation may be valuable: correct me if I'm wrong, but Kafka Streams will try
to optimize when a key-changing operation is followed by a stateful operation
like *groupByKey().aggregate(...)*, but there may be cases where, in the DSL,
the user does aggregation with a stateful *transform(...)* operation instead.
Consider the following example:

 

 
{code:java}
final KStream streamByProfileId = streamsBuilder
 .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
 .selectKey((key, value) -> value);

streamByProfileId.transform(...) // stateful transformer with aggregation
streamByProfileId.transform(...) // stateful transformer with aggregation
{code}
 

In this example no repartition topic is created; on the other hand, if we had
something like a `repartition()` operation on KStream, we could write something
like this, which would be pretty cool imho:
{code:java}
final KStream streamByProfileId = streamsBuilder
 .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
 .repartitionBy((key, value) -> new_key);

streamByProfileId.transform(...) // stateful transformer with aggregation
streamByProfileId.transform(...) // stateful transformer with aggregation{code}
 

> Add possibility to do repartitioning on KStream
> ---
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream streamByProfileId = streamsBuilder
>.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>.selectKey((

[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Levani Kokhreidze (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846838#comment-16846838
 ] 

Levani Kokhreidze commented on KAFKA-8413:
--

Hi Matthias,

I've enabled topology optimization, but in this particular example there are
still two repartition topics created.

 
{code:java}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");{code}
Kafka Streams version 2.2.0

 

Topics created:

 
{code:java}
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-1-repartition
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-changelog
867bef21-1e56-4ef4-918d-f701fe6000bc-store-2-repartition{code}
 

 

 

Actually, one more reason why introducing an additional manual repartition
operation may be valuable: correct me if I'm wrong, but Kafka Streams will try
to optimize when a key-changing operation is followed by a stateful operation
like *groupByKey().aggregate(...)*, but there may be cases where, in the DSL,
the user does aggregation with a stateful *transform(...)* operation instead.
Consider the following example:

 

 
{code:java}
final KStream streamByProfileId = streamsBuilder
 .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
 .selectKey((key, value) -> value);

streamByProfileId.transform(...) // stateful transformer with aggregation
streamByProfileId.transform(...) // stateful transformer with aggregation
{code}
 

In this example no repartition topic is created; on the other hand, if we had
something like a `repartition()` operation on KStream, we could write something
like this, which would be pretty cool imho:
{code:java}
final KStream streamByProfileId = streamsBuilder
 .stream("input.topic", Consumed.with(Serdes.String(), Serdes.String()))
 .repartitionBy((key, value) -> new_key);

streamByProfileId.transform(...) // stateful transformer with aggregation
streamByProfileId.transform(...) // stateful transformer with aggregation{code}
 

> Add possibility to do repartitioning on KStream
> ---
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream streamByProfileId = streamsBuilder
>.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>.selectKey((key, value) -> value);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
> This code will generate the following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>  --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-FILTER-04 (stores: [])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-FILTER-08 (stores: [])
>  --> KSTREAM-SINK-07
>  <-- KSTREAM-KEY-SELECT-01
>  Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-04
>  Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-08
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-02
>  Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-05
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-06
>  Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-09
>  
> {code}
> Kafka Streams creates two repartition topics for each `groupByKey` operation. 
> In this example, two repartition topics are not really necessary and 
> processing can be done with one sub-topology.
>  
> A Kafka Streams user may, in the DSL, specify the repartition topic manually
> using the *KStream#through* method:
> {code:java}
> final KStream streamByProfileId = streamsBuilder
>.stream("input-topic")
>.selectKey((key, value) -> value)
>.through("repartition-topic");
>

[jira] [Assigned] (KAFKA-7760) Add broker configuration to set minimum value for segment.bytes and segment.ms

2019-05-23 Thread Brandt Newton (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandt Newton reassigned KAFKA-7760:


Assignee: (was: Brandt Newton)

> Add broker configuration to set minimum value for segment.bytes and segment.ms
> --
>
> Key: KAFKA-7760
> URL: https://issues.apache.org/jira/browse/KAFKA-7760
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Priority: Major
>  Labels: kip, newbie
>
> If someone set segment.bytes or segment.ms at topic level to a very small 
> value (e.g. segment.bytes=1000 or segment.ms=1000), Kafka will generate a 
> very high number of segment files. This can bring down the whole broker due 
> to hitting the maximum open file (for log) or maximum number of mmap-ed file 
> (for index).
> To prevent that from happening, I would like to suggest adding two new items 
> to the broker configuration:
>  * min.topic.segment.bytes, defaults to 1048576: The minimum value for 
> segment.bytes. When someone sets topic configuration segment.bytes to a value 
> lower than this, Kafka throws an error INVALID VALUE.
>  * min.topic.segment.ms, defaults to 360: The minimum value for 
> segment.ms. When someone sets topic configuration segment.ms to a value lower 
> than this, Kafka throws an error INVALID VALUE.
> Thanks
> Badai
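For illustration, the proposal amounts to two new broker settings, e.g. in
server.properties; these keys are hypothetical (they exist in no released
Kafka) and the values below are the defaults as stated in the ticket:
{noformat}
# proposed lower bounds for topic-level overrides (values as stated above)
min.topic.segment.bytes=1048576
min.topic.segment.ms=360
{noformat}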



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8415) Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation

2019-05-23 Thread Konstantine Karantasis (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-8415:
--
Description: 
Classes or interfaces that implement {{ConnectorClientConfigOverridePolicy}} 
were recently added in Connect as plugins that can be loaded in class loading 
isolation. 

However, the interface itself was not excluded from isolation, which results in
definition conflicts. Any interface that is considered a base Connect plugin
interface needs to be excluded from isolation itself (it is treated as a
"system" type). 

Here's the exception: 
{code:java}
[2019-05-23 15:16:57,802] ERROR Stopping due to error 
(org.apache.kafka.connect.cli.ConnectDistributed:84)
java.util.ServiceConfigurationError: 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy: 
Provider 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
 not a subtype
at java.util.ServiceLoader.fail(ServiceLoader.java:239)
at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getServiceLoaderPluginDesc(DelegatingClassLoader.java:343)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:317)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at 
org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
at 
org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)

{code}

  was:
Classes or interfaces that implement {{ConnectorClientConfigOverridePolicy}} 
were recently added in Connect as plugins that can be loaded in class loading 
isolation. 

However, the interface itself was not excluded from isolation, which results in
definition conflicts. Any interface that is considered a base Connect plugin
interface needs to be excluded from isolation itself (it is treated as a
"system" type). 

Here's the exception: 
{code:java}
[2019-05-23 15:16:57,802] ERROR Stopping due to error 
(org.apache.kafka.connect.cli.ConnectDistributed:84) 
java.util.ServiceConfigurationError: 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy: 
Provider 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
 not a subtype         at java.util.ServiceLoader.fail(ServiceLoader.java:239)  
       at java.util.ServiceLoader.access$300(ServiceLoader.java:185)         at 
java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376)        
 at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)         
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getServiceLoaderPluginDesc(DelegatingClassLoader.java:343)
         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:317)
         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244)
         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236)
         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205)
         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182)
         at 
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)      
   at 
org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
         at 
org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
{code}


> Interface ConnectorClientConfigOverridePolicy needs to be excluded from class 
> loading isolation
> ---
>
> Key: KAFKA-8415
> URL: https://issues.apache.org/jira/browse/KAFKA-8415
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Konstant

[jira] [Commented] (KAFKA-8415) Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation

2019-05-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846816#comment-16846816
 ] 

ASF GitHub Bot commented on KAFKA-8415:
---

kkonstantine commented on pull request #6796: KAFKA-8415: Interface 
ConnectorClientConfigOverridePolicy needs to be excluded from class loading 
isolation
URL: https://github.com/apache/kafka/pull/6796
 
 
   * New Connect plugin interface ConnectorClientConfigOverridePolicy needs to 
be excluded from the class loading isolation
   
   * Added missing unit tests similar to the ones existing for previous plugins
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Interface ConnectorClientConfigOverridePolicy needs to be excluded from class 
> loading isolation
> ---
>
> Key: KAFKA-8415
> URL: https://issues.apache.org/jira/browse/KAFKA-8415
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 2.3.0
>
>
> Classes or interfaces that implement {{ConnectorClientConfigOverridePolicy}} 
> were recently added in Connect as plugins that can be loaded in class loading 
> isolation. 
> However, the interface itself was not excluded from isolation, which results 
> in definition conflicts. Any interface that is considered a base Connect 
> plugin interface needs to be excluded from isolation itself (it is treated as 
> a "system" type). 
> Here's the exception: 
> {code:java}
> [2019-05-23 15:16:57,802] ERROR Stopping due to error 
> (org.apache.kafka.connect.cli.ConnectDistributed:84) 
> java.util.ServiceConfigurationError: 
> org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy:
>  Provider 
> org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
>  not a subtype         at 
> java.util.ServiceLoader.fail(ServiceLoader.java:239)         at 
> java.util.ServiceLoader.access$300(ServiceLoader.java:185)         at 
> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376)      
>    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)       
>   at java.util.ServiceLoader$1.next(ServiceLoader.java:480)         at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getServiceLoaderPluginDesc(DelegatingClassLoader.java:343)
>          at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:317)
>          at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244)
>          at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236)
>          at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205)
>          at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182)
>          at 
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)    
>      at 
> org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
>          at 
> org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers

2019-05-23 Thread Sebastiaan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846808#comment-16846808
 ] 

Sebastiaan commented on KAFKA-8412:
---

[~mjsax] I can try to reproduce it in development some more but so far we've 
only seen it in production.

But my theory is that it is similar to the other ticket, a comment 
https://issues.apache.org/jira/browse/KAFKA-7678?focusedCommentId=16715220&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16715220
 says:

_"There are one or two edge cases which can cause record collector to be closed 
multiple times, we have noticed them recently and are thinking about cleanup 
the classes along the calling hierarchy (i.e. from Task Manager -> Task -> 
RecordCollector) for it. One example is:_

_1) a task is *suspended*, with EOS turned on (like your case), the record 
collector is closed()._
 _2) then the instance got killed (SIGTERM) , which causes all threads to be 
closed, which will then cause all their owned tasks to be *closed*. The same 
record collector close() call will be triggered again"_

 

 

So this could be the same issue, but now for flush instead of close: the
producer has already been flushed and closed, yet the same call is attempted
again. Of course I don't know anything about the internals of the client, so
take this with a grain of salt.

> Still a nullpointer exception thrown on shutdown while flushing before 
> closing producers
> 
>
> Key: KAFKA-8412
> URL: https://issues.apache.org/jira/browse/KAFKA-8412
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Sebastiaan
>Priority: Minor
>
> I found a closed issue and replied there but decided to open one myself 
> because although they're related they're slightly different. The original 
> issue is at https://issues.apache.org/jira/browse/KAFKA-7678
> The fix there has been to implement a null check around closing a producer 
> because in some cases the producer is already null there (has been closed 
> already)
> In version 2.1.1 we are getting a very similar exception, but in the 'flush' 
> method that is called pre-close. This is in the log:
> {code:java}
> message: stream-thread 
> [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
> while closing StreamTask 1_26 due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> Followed by:
>  
> {code:java}
> message: task [1_26] Could not close task due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.StreamTask
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> If I look at the source code at this point, I see a nice null check in the 
> close method, but not in the flush method that is called just before that:
> {code:java}
> public void flush() {
>     this.log.debug("Flushing producer");
>     this.producer.flush();
>     this.checkForException();
> }
> public void close() {
>     this.log.debug("Closing producer");
>     if (this.producer != null) {
>     this.producer.close();
>     this.producer = null;
>     }
>     this.checkForException();
> }{code}
> Seems to my (ignorant) eye that the flush method should also be wrapped in a
> null check in the same way as has been done for close.

[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-23 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846800#comment-16846800
 ] 

Matthias J. Sax commented on KAFKA-7245:


Yes. The classes you listed _implement_ the interface, hence they need to use
`put()` with two parameters – those usages would all be removed when the method
is removed from the interface.
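For illustration, the migration in the affected tests amounts to passing the
window start timestamp explicitly; a minimal sketch (the store, key, value, and
timestamp variables are assumptions):
{code:java}
import org.apache.kafka.streams.state.WindowStore;

class WindowStorePutMigration {
    void update(WindowStore<String, Long> store, String key, Long value, long windowStartTimestamp) {
        // before (to be deprecated): store.put(key, value);
        // after: the window start timestamp is passed explicitly
        store.put(key, value, windowStartTimestamp);
    }
}
{code}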

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base use the explicit call to set the timestamp 
> in production code already. The simplified `put(key, value)` is only used in 
> tests, and thus, we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7201) Optimize repartition operations

2019-05-23 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-7201:
---
Affects Version/s: 2.1.0

> Optimize repartition operations
> ---
>
> Key: KAFKA-7201
> URL: https://issues.apache.org/jira/browse/KAFKA-7201
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> When the topology has a key changing operation, any downstream processors 
> using the key will automatically create a repartition topic.  In most cases 
> these multiple repartition topics can be collapsed into one repartition 
> operation occurring immediately after the key changing operation, thus 
> reducing streams overall footprint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8414) org.apache.kafka.common.metrics.MetricsTest.testConcurrentReadUpdateReport hang

2019-05-23 Thread dan norwood (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dan norwood updated KAFKA-8414:
---
Description: 
Caveat: this only happens on AMD EPYC machines with >= 48 CPUs. Machine info
from the various `*a.*` AWS instance sizes I ran against is included below. All
tests used `OpenJDK Runtime Environment (build 1.8.0_201-b09)` and `Amazon
Linux 2 AMI 2.0.20190508 x86_64 HVM gp2`.

I noticed what looks like a deadlock when running
`org.apache.kafka.common.metrics.MetricsTest` on an AWS instance with 96 vCPUs
(specifically an m5a.24xlarge). After some debugging, the offending code seems
to be
[https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java#L776-L778]

 
{code:java}
public void run() {
  try {
while (alive.get()) {
  op.run();
}
  } catch (Throwable t) {
log.error("Metric {} failed with exception", opName, t);
  }
}
{code}
Since the `op.run()` methods are all synchronized, we end up hammering the
monitor nonstop. After adding some logging I saw steadily increasing wait times
for entry into each synchronized block, so this is not *really* a deadlock or
hang, but a progressive slowdown that makes the test unrunnable.

The offending op seems to be
[https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java#L747]

 
{code:java}
Future reportFuture = executorService.submit(new 
ConcurrentMetricOperation(alive, "report", () -> reporter.processMetrics()));
{code}
 

 

Possible fix: adding a `Thread.sleep(0, 1)` inside the run loop of
`ConcurrentMetricOperation` seems to allow the test to pass, but I'm not sure
it wouldn't mask an issue that the test is meant to detect.
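A sketch of that tweak applied to the run loop quoted above (the enclosing
class and its fields are as in the test):
{code:java}
public void run() {
  try {
    while (alive.get()) {
      op.run();
      // yield briefly so other threads contending for the same
      // monitor can enter the synchronized metric operation
      Thread.sleep(0, 1);
    }
  } catch (Throwable t) {
    log.error("Metric {} failed with exception", opName, t);
  }
}
{code}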

Good:

t3a.large

 
{noformat}
Architecture:    x86_64
CPU op-mode(s):  32-bit, 64-bit
Byte Order:  Little Endian
CPU(s):  2
On-line CPU(s) list: 0,1
Thread(s) per core:  2
Core(s) per socket:  1
Socket(s):   1
NUMA node(s):    1
Vendor ID:   AuthenticAMD
CPU family:  23
Model:   1
Model name:  AMD EPYC 7571
Stepping:    2
CPU MHz: 2200.116
BogoMIPS:    4400.23
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:   32K
L1i cache:   64K
L2 cache:    512K
L3 cache:    8192K
NUMA node0 CPU(s):   0,1
Flags:   fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca 
cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb 
rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid 
tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes 
xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a 
misalignsse 3dnowprefetch topoext cpb vmmcall fsgsbase bmi1 avx2 smep bmi2 
rdseed adx smap clflushopt sha_ni xsaveopt xsavec xgetbv1 clzero xsaveerptr 
arat npt nrip_save{noformat}
 

 

t3a.2xlarge

 
{noformat}
Architecture:    x86_64
CPU op-mode(s):  32-bit, 64-bit
Byte Order:  Little Endian
CPU(s):  8
On-line CPU(s) list: 0-7
Thread(s) per core:  2
Core(s) per socket:  4
Socket(s):   1
NUMA node(s):    1
Vendor ID:   AuthenticAMD
CPU family:  23
Model:   1
Model name:  AMD EPYC 7571
Stepping:    2
CPU MHz: 2199.916
BogoMIPS:    4399.83
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:   32K
L1i cache:   64K
L2 cache:    512K
L3 cache:    8192K
NUMA node0 CPU(s):   0-7
Flags:   fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca 
cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb 
rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid 
tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes 
xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a 
misalignsse 3dnowprefetch topoext cpb vmmcall fsgsbase bmi1 avx2 smep bmi2 
rdseed adx smap clflushopt sha_ni xsaveopt xsavec xgetbv1 clzero xsaveerptr 
arat npt nrip_save{noformat}
 

 

m5a.4xlarge
{noformat}
Architecture:    x86_64
CPU op-mode(s):  32-bit, 64-bit
Byte Order:  Little Endian
CPU(s):  16
On-line CPU(s) list: 0-15
Thread(s) per core:  2
Core(s) per socket:  8
Socket(s):   1
NUMA node(s):    1
Vendor ID:   AuthenticAMD
CPU family:  23
Model:   1
Model name:  AMD EPYC 7571
Stepping:    2
CPU MHz: 2585.550
BogoMIPS:    4399.98
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:   32K
L1i cache:   64K
L2 cache:    512K
L3 cache:    8192K
NUMA node0 CPU(s):   0-15
Flags:   fpu vme de pse tsc msr pae mce cx8 apic sep mtr

[jira] [Resolved] (KAFKA-7201) Optimize repartition operations

2019-05-23 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-7201.

Resolution: Fixed

Resolved via https://issues.apache.org/jira/browse/KAFKA-6761 

> Optimize repartition operations
> ---
>
> Key: KAFKA-7201
> URL: https://issues.apache.org/jira/browse/KAFKA-7201
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> When the topology has a key changing operation, any downstream processors 
> using the key will automatically create a repartition topic.  In most cases 
> these multiple repartition topics can be collapsed into one repartition 
> operation occurring immediately after the key changing operation, thus 
> reducing streams overall footprint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7201) Optimize repartition operations

2019-05-23 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-7201:
---
Fix Version/s: 2.1.0

> Optimize repartition operations
> ---
>
> Key: KAFKA-7201
> URL: https://issues.apache.org/jira/browse/KAFKA-7201
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.1.0
>
>
> When the topology has a key changing operation, any downstream processors 
> using the key will automatically create a repartition topic.  In most cases 
> these multiple repartition topics can be collapsed into one repartition 
> operation occurring immediately after the key changing operation, thus 
> reducing streams overall footprint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7201) Optimize repartition operations

2019-05-23 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846795#comment-16846795
 ] 

Bill Bejeck commented on KAFKA-7201:


[~mjsax]

Apologies for the delayed response.  Yes this is resolved [link 
KAFKA-6761|https://issues.apache.org/jira/browse/KAFKA-6761]

> Optimize repartition operations
> ---
>
> Key: KAFKA-7201
> URL: https://issues.apache.org/jira/browse/KAFKA-7201
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> When the topology has a key changing operation, any downstream processors 
> using the key will automatically create a repartition topic.  In most cases 
> these multiple repartition topics can be collapsed into one repartition 
> operation occurring immediately after the key changing operation, thus 
> reducing streams overall footprint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7201) Optimize repartition operations

2019-05-23 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846795#comment-16846795
 ] 

Bill Bejeck edited comment on KAFKA-7201 at 5/23/19 3:37 PM:
-

[~mjsax]

Apologies for the delayed response.  Yes, this is resolved KAFKA-6761


was (Author: bbejeck):
[~mjsax]

Apologies for the delayed response.  Yes this is resolved [link 
KAFKA-6761|https://issues.apache.org/jira/browse/KAFKA-6761]

> Optimize repartition operations
> ---
>
> Key: KAFKA-7201
> URL: https://issues.apache.org/jira/browse/KAFKA-7201
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> When the topology has a key changing operation, any downstream processors 
> using the key will automatically create a repartition topic.  In most cases 
> these multiple repartition topics can be collapsed into one repartition 
> operation occurring immediately after the key changing operation, thus 
> reducing streams overall footprint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846791#comment-16846791
 ] 

Matthias J. Sax commented on KAFKA-8413:


This should already be fixed; you need to turn on topology optimization,
though. Compare https://issues.apache.org/jira/browse/KAFKA-6761

Seems we can close this ticket as "invalid"?
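For reference, the optimization mentioned above is enabled with a single
Streams config, the same setting shown in the comments on this ticket:
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
{code}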

> Add possibility to do repartitioning on KStream
> ---
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream streamByProfileId = streamsBuilder
>.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>.selectKey((key, value) -> value);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
> This code will generate the following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>  --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-FILTER-04 (stores: [])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-FILTER-08 (stores: [])
>  --> KSTREAM-SINK-07
>  <-- KSTREAM-KEY-SELECT-01
>  Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-04
>  Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-08
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-02
>  Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-05
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-06
>  Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-09
>  
> {code}
> Kafka Streams creates two repartition topics for each `groupByKey` operation. 
> In this example, two repartition topics are not really necessary and 
> processing can be done with one sub-topology.
>  
> A Kafka Streams user may, in the DSL, specify the repartition topic manually
> using the *KStream#through* method:
> {code:java}
> final KStream streamByProfileId = streamsBuilder
>.stream("input-topic")
>.selectKey((key, value) -> value)
>.through("repartition-topic");
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
>  
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
> --> KSTREAM-SINK-02
> <-- KSTREAM-SOURCE-00
> Sink: KSTREAM-SINK-02 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-01
> Sub-topology: 1
> Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
> Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-03
> Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-03
> {code}
>   
> While this gives the possibility to optimize a Kafka Streams application, the
> user still has to manually create the repartition topic with the correct
> number of partitions based on the input topic. It would be great if the DSL
> had something like a *repartition()* operation on *KStream* which could
> generate the repartition topic based on a user command.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers

2019-05-23 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846787#comment-16846787
 ] 

Matthias J. Sax commented on KAFKA-8412:


Thanks for reporting this. What I don't understand is why we would flush after
we have already closed a task. Hence, I am not sure a null guard is the correct
fix; it may be better to make sure we don't call flush() in the first place.

Can you maybe provide debug level logs? This might help to understand the 
scenario better.

> Still a nullpointer exception thrown on shutdown while flushing before 
> closing producers
> 
>
> Key: KAFKA-8412
> URL: https://issues.apache.org/jira/browse/KAFKA-8412
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Sebastiaan
>Priority: Minor
>
> I found a closed issue and replied there but decided to open one myself 
> because although they're related they're slightly different. The original 
> issue is at https://issues.apache.org/jira/browse/KAFKA-7678
> The fix there has been to implement a null check around closing a producer 
> because in some cases the producer is already null there (has been closed 
> already)
> In version 2.1.1 we are getting a very similar exception, but in the 'flush' 
> method that is called pre-close. This is in the log:
> {code:java}
> message: stream-thread 
> [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
> while closing StreamTask 1_26 due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> Followed by:
>  
> {code:java}
> message: task [1_26] Could not close task due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.StreamTask
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> If I look at the source code at this point, I see a nice null check in the 
> close method, but not in the flush method that is called just before that:
> {code:java}
> public void flush() {
>     this.log.debug("Flushing producer");
>     this.producer.flush();
>     this.checkForException();
> }
> public void close() {
>     this.log.debug("Closing producer");
>     if (this.producer != null) {
>     this.producer.close();
>     this.producer = null;
>     }
>     this.checkForException();
> }{code}
> Seems to my (ignorant) eye that the flush method should also be wrapped in a 
> null check in the same way as has been done for close.
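For illustration, the suggested guard would mirror the close() pattern quoted
above; a minimal sketch (whether a null guard is the right fix, rather than
avoiding the flush() call altogether, is discussed in the comments):
{code:java}
public void flush() {
    this.log.debug("Flushing producer");
    if (this.producer != null) {
        this.producer.flush();
    }
    this.checkForException();
}
{code}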



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8415) Interface ConnectorClientConfigOverridePolicy needs to be excluded from class loading isolation

2019-05-23 Thread Konstantine Karantasis (JIRA)
Konstantine Karantasis created KAFKA-8415:
-

 Summary: Interface ConnectorClientConfigOverridePolicy needs to be 
excluded from class loading isolation
 Key: KAFKA-8415
 URL: https://issues.apache.org/jira/browse/KAFKA-8415
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.3.0
Reporter: Konstantine Karantasis
Assignee: Konstantine Karantasis
 Fix For: 2.3.0


Classes or interfaces that implement {{ConnectorClientConfigOverridePolicy}} 
were recently added in Connect as plugins that can be loaded in class loading 
isolation. 

However, the interface itself was not excluded from isolation, which results in
definition conflicts. Any interface that is considered a base Connect plugin
interface needs to be excluded from isolation itself (it is treated as a
"system" type). 

Here's the exception: 
{code:java}
[2019-05-23 15:16:57,802] ERROR Stopping due to error 
(org.apache.kafka.connect.cli.ConnectDistributed:84) 
java.util.ServiceConfigurationError: 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy: 
Provider 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy
 not a subtype         at java.util.ServiceLoader.fail(ServiceLoader.java:239)  
       at java.util.ServiceLoader.access$300(ServiceLoader.java:185)         at 
java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376)        
 at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)         
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getServiceLoaderPluginDesc(DelegatingClassLoader.java:343)
         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:317)
         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:244)
         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:236)
         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:205)
         at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:182)
         at 
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)      
   at 
org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
         at 
org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
{code}
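To illustrate the failure mode, here is a hedged, conceptual sketch of
child-first plugin loading with a "system type" exclusion list; the class and
field names are hypothetical and this is not Connect's actual implementation:
{code:java}
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Set;

// Hypothetical sketch, not Connect's real code: if a plugin classloader
// defines the base plugin interface itself instead of delegating it to the
// parent loader, ServiceLoader sees two distinct Class objects for the same
// name and fails with "Provider ... not a subtype".
public class PluginLoaderSketch extends URLClassLoader {
    private static final Set<String> EXCLUDED_FROM_ISOLATION = Set.of(
        "org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy");

    public PluginLoaderSketch(URL[] pluginUrls, ClassLoader parent) {
        super(pluginUrls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (EXCLUDED_FROM_ISOLATION.contains(name)) {
            return getParent().loadClass(name); // "system" type: never isolate
        }
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name); // child-first: plugin jars win
                } catch (ClassNotFoundException e) {
                    c = getParent().loadClass(name);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
{code}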



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8414) org.apache.kafka.common.metrics.MetricsTest.testConcurrentReadUpdateReport hang

2019-05-23 Thread dan norwood (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dan norwood updated KAFKA-8414:
---
Description: 
Caveat: this only happens on AMD EPYC machines with >= 48 CPUs. Machine info
from the various `*a.*` AWS instance sizes I ran against is included below. All
tests used `OpenJDK Runtime Environment (build 1.8.0_201-b09)` and `Amazon
Linux 2 AMI 2.0.20190508 x86_64 HVM gp2`.

I noticed what looks like a deadlock when running
`org.apache.kafka.common.metrics.MetricsTest` on an AWS instance with 96 vCPUs
(specifically an m5a.24xlarge). After some debugging, the offending code seems
to be
[https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java#L776-L778]

 
{code:java}
public void run() {
  try {
while (alive.get()) {
  op.run();
}
  } catch (Throwable t) {
log.error("Metric {} failed with exception", opName, t);
  }
}
{code}
Since the `op.run()` methods are all synchronized, we end up hammering the
monitor nonstop. After adding some logging I saw steadily increasing wait times
for entry into each synchronized block, so this is not *really* a deadlock or
hang, but a progressive slowdown that makes the test unrunnable.

The offending op seems to be
[https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java#L747]

 
{code:java}
Future reportFuture = executorService.submit(new 
ConcurrentMetricOperation(alive, "report", () -> reporter.processMetrics()));
{code}
 

 

Possible fix: adding a `Thread.sleep(0, 1)` inside the run loop of
`ConcurrentMetricOperation` seems to allow the test to pass, but I'm not sure
it wouldn't mask an issue that the test is meant to detect.

Good:

t3a.large

 
{noformat}
Architecture:    x86_64
CPU op-mode(s):  32-bit, 64-bit
Byte Order:  Little Endian
CPU(s):  2
On-line CPU(s) list: 0,1
Thread(s) per core:  2
Core(s) per socket:  1
Socket(s):   1
NUMA node(s):    1
Vendor ID:   AuthenticAMD
CPU family:  23
Model:   1
Model name:  AMD EPYC 7571
Stepping:    2
CPU MHz: 2200.116
BogoMIPS:    4400.23
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:   32K
L1i cache:   64K
L2 cache:    512K
L3 cache:    8192K
NUMA node0 CPU(s):   0,1
Flags:   fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca 
cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb 
rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid 
tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes 
xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a 
misalignsse 3dnowprefetch topoext cpb vmmcall fsgsbase bmi1 avx2 smep bmi2 
rdseed adx smap clflushopt sha_ni xsaveopt xsavec xgetbv1 clzero xsaveerptr 
arat npt nrip_save{noformat}
 

 

t3a.2xlarge

 
{noformat}
Architecture:    x86_64
CPU op-mode(s):  32-bit, 64-bit
Byte Order:  Little Endian
CPU(s):  8
On-line CPU(s) list: 0-7
Thread(s) per core:  2
Core(s) per socket:  4
Socket(s):   1
NUMA node(s):    1
Vendor ID:   AuthenticAMD
CPU family:  23
Model:   1
Model name:  AMD EPYC 7571
Stepping:    2
CPU MHz: 2199.916
BogoMIPS:    4399.83
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:   32K
L1i cache:   64K
L2 cache:    512K
L3 cache:    8192K
NUMA node0 CPU(s):   0-7
Flags:   fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca 
cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb 
rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid 
tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes 
xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a 
misalignsse 3dnowprefetch topoext cpb vmmcall fsgsbase bmi1 avx2 smep bmi2 
rdseed adx smap clflushopt sha_ni xsaveopt xsavec xgetbv1 clzero xsaveerptr 
arat npt nrip_save{noformat}
 

 

m5a.4xlarge

 
{noformat}
Architecture:    x86_64
CPU op-mode(s):  32-bit, 64-bit
Byte Order:  Little Endian
CPU(s):  16
On-line CPU(s) list: 0-15
Thread(s) per core:  2
Core(s) per socket:  8
Socket(s):   1
NUMA node(s):    1
Vendor ID:   AuthenticAMD
CPU family:  23
Model:   1
Model name:  AMD EPYC 7571
Stepping:    2
CPU MHz: 2585.550
BogoMIPS:    4399.98
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:   32K
L1i cache:   64K
L2 cache:    512K
L3 cache:    8192K
NUMA node0 CPU(s):   0-15
Flags:   fpu vme de pse tsc msr pae mce cx8 apic sep 

[jira] [Created] (KAFKA-8414) org.apache.kafka.common.metrics.MetricsTest.testConcurrentReadUpdateReport hang

2019-05-23 Thread dan norwood (JIRA)
dan norwood created KAFKA-8414:
--

 Summary: 
org.apache.kafka.common.metrics.MetricsTest.testConcurrentReadUpdateReport hang 
 Key: KAFKA-8414
 URL: https://issues.apache.org/jira/browse/KAFKA-8414
 Project: Kafka
  Issue Type: Bug
Reporter: dan norwood


Caveat: this only happens on AMD EPYC machines with >= 48 CPUs. Machine info
from the various `*a.*` AWS instance sizes I ran against is included below.

I noticed what looks like a deadlock when running
`org.apache.kafka.common.metrics.MetricsTest` on an AWS instance with 96 vCPUs
(specifically an m5a.24xlarge). After some debugging, the offending code seems
to be
[https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java#L776-L778]

 
{code:java}
public void run() {
  try {
while (alive.get()) {
  op.run();
}
  } catch (Throwable t) {
log.error("Metric {} failed with exception", opName, t);
  }
}
{code}
Since the `op.run()` methods are all synchronized, we end up hammering the
monitor nonstop. After adding some logging I saw steadily increasing wait times
for entry into each synchronized block, so this is not *really* a deadlock or
hang, but a progressive slowdown that makes the test unrunnable.

The offending op seems to be
[https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java#L747]

 
{code:java}
Future reportFuture = executorService.submit(new 
ConcurrentMetricOperation(alive, "report", () -> reporter.processMetrics()));
{code}
 

 

Possible fix:

Adding a `Thread.sleep(0, 1)` inside the run loop of 
`ConcurrentMetricOperation` seems to allow the test to pass, but I'm not sure 
that it wouldn't mask an issue that the test is meant to detect.
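
For illustration, a minimal sketch of what that workaround might look like in the 
run loop quoted above (hypothetical and untested, not an actual patch):

{code:java}
public void run() {
  try {
    while (alive.get()) {
      op.run();
      // Hypothetical workaround: back off briefly after each iteration so the
      // other threads competing for the synchronized metric ops can get the lock.
      Thread.sleep(0, 1);
    }
  } catch (Throwable t) {
    log.error("Metric {} failed with exception", opName, t);
  }
}
{code}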

 

 

 

 

Good: 

t3a.large
```
Architecture:    x86_64
CPU op-mode(s):  32-bit, 64-bit
Byte Order:  Little Endian
CPU(s):  2
On-line CPU(s) list: 0,1
Thread(s) per core:  2
Core(s) per socket:  1
Socket(s):   1
NUMA node(s):    1
Vendor ID:   AuthenticAMD
CPU family:  23
Model:   1
Model name:  AMD EPYC 7571
Stepping:    2
CPU MHz: 2200.116
BogoMIPS:    4400.23
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:   32K
L1i cache:   64K
L2 cache:    512K
L3 cache:    8192K
NUMA node0 CPU(s):   0,1
Flags:   fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca 
cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb 
rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid 
tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes 
xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a 
misalignsse 3dnowprefetch topoext cpb vmmcall fsgsbase bmi1 avx2 smep bmi2 
rdseed adx smap clflushopt sha_ni xsaveopt xsavec xgetbv1 clzero xsaveerptr 
arat npt nrip_save
```

t3a.2xlarge
```
Architecture:    x86_64
CPU op-mode(s):  32-bit, 64-bit
Byte Order:  Little Endian
CPU(s):  8
On-line CPU(s) list: 0-7
Thread(s) per core:  2
Core(s) per socket:  4
Socket(s):   1
NUMA node(s):    1
Vendor ID:   AuthenticAMD
CPU family:  23
Model:   1
Model name:  AMD EPYC 7571
Stepping:    2
CPU MHz: 2199.916
BogoMIPS:    4399.83
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:   32K
L1i cache:   64K
L2 cache:    512K
L3 cache:    8192K
NUMA node0 CPU(s):   0-7
Flags:   fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca 
cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb 
rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid 
tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes 
xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a 
misalignsse 3dnowprefetch topoext cpb vmmcall fsgsbase bmi1 avx2 smep bmi2 
rdseed adx smap clflushopt sha_ni xsaveopt xsavec xgetbv1 clzero xsaveerptr 
arat npt nrip_save
```

m5a.4xlarge
```
Architecture:    x86_64
CPU op-mode(s):  32-bit, 64-bit
Byte Order:  Little Endian
CPU(s):  16
On-line CPU(s) list: 0-15
Thread(s) per core:  2
Core(s) per socket:  8
Socket(s):   1
NUMA node(s):    1
Vendor ID:   AuthenticAMD
CPU family:  23
Model:   1
Model name:  AMD EPYC 7571
Stepping:    2
CPU MHz: 2585.550
BogoMIPS:    4399.98
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:   32K
L1i cache:   64K
L2 cache:    512K
L3 cache:    8192K
NUMA node0 CPU(s):   0-15
Flags:   fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca 
cmov pat p

[jira] [Updated] (KAFKA-8411) Option to keep tombstones forever in compacted logs

2019-05-23 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8411:
---
Priority: Minor  (was: Major)

> Option to keep tombstones forever in compacted logs 
> 
>
> Key: KAFKA-8411
> URL: https://issues.apache.org/jira/browse/KAFKA-8411
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Lukas Welte
>Priority: Minor
>
> Log compaction is great for balancing between keeping all data in a topic and 
> reducing the number of messages in it.
> However, delete tombstones are cleaned up after a certain period. For some 
> use cases we would really like to keep the latest message for each key, which 
> means the tombstone for that ID should also be kept forever.
> An option to configure _delete.retention.ms_ to e.g. _-1_ to enable this 
> functionality would be great.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8411) Option to keep tombstones forever in compacted logs

2019-05-23 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846775#comment-16846775
 ] 

Matthias J. Sax commented on KAFKA-8411:


Well, the config is of type long. If you set it to Long.MAX_VALUE, you can 
configure many millions of years of retention, which is effectively forever. This 
should already let you do what you want, i.e., enable your use case even without 
this ticket. We could still allow -1, but the net effect would not change 
anything.
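
For example, an effectively-forever topic override can be set today (a sketch, 
assuming a topic named my-topic and a ZooKeeper-based setup; 9223372036854775807 
is Long.MAX_VALUE):

{noformat}
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config delete.retention.ms=9223372036854775807
{noformat}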

> Option to keep tombstones forever in compacted logs 
> 
>
> Key: KAFKA-8411
> URL: https://issues.apache.org/jira/browse/KAFKA-8411
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Lukas Welte
>Priority: Major
>
> Log compaction is great for balancing between keeping all data in a topic and 
> reducing the number of messages in it.
> However, delete tombstones are cleaned up after a certain period. For some 
> use cases we would really like to keep the latest message for each key, which 
> means the tombstone for that ID should also be kept forever.
> An option to configure _delete.retention.ms_ to e.g. _-1_ to enable this 
> functionality would be great.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Levani Kokhreidze (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze updated KAFKA-8413:
-
Description: 
Consider the following code:
{code:java}
final KStream streamByProfileId = streamsBuilder
   .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
   .selectKey((key, value) -> value);

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-1")
   );

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-2")
   );
{code}
 

This code will generate the following topology:
{code:java}
Topologies:
 Sub-topology: 0
 Source: KSTREAM-SOURCE-00 (topics: [input-topic])
 --> KSTREAM-KEY-SELECT-01
 Processor: KSTREAM-KEY-SELECT-01 (stores: [])
 --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
 <-- KSTREAM-SOURCE-00
 Processor: KSTREAM-FILTER-04 (stores: [])
 --> KSTREAM-SINK-03
 <-- KSTREAM-KEY-SELECT-01
 Processor: KSTREAM-FILTER-08 (stores: [])
 --> KSTREAM-SINK-07
 <-- KSTREAM-KEY-SELECT-01
 Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
 <-- KSTREAM-FILTER-04
 Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
 <-- KSTREAM-FILTER-08
Sub-topology: 1
 Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
 --> KSTREAM-AGGREGATE-02
 Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
 --> none
 <-- KSTREAM-SOURCE-05
Sub-topology: 2
 Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
 --> KSTREAM-AGGREGATE-06
 Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
 --> none
 <-- KSTREAM-SOURCE-09
 
{code}
Kafka Streams creates a repartition topic for each `groupByKey` operation, so two 
in this example. The two repartition topics are not really necessary here, and 
processing could be done with a single sub-topology.

A Kafka Streams user may, in the DSL, specify the repartition topic manually 
using the *KStream#through* method:
{code:java}
final KStream streamByProfileId = streamsBuilder
   .stream("input-topic")
   .selectKey((key, value) -> value)
   .through("repartition-topic");

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-1")
   );

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-2")
   );
{code}
 

 
{code:java}
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-00 (topics: [input-topic])
--> KSTREAM-KEY-SELECT-01
Processor: KSTREAM-KEY-SELECT-01 (stores: [])
--> KSTREAM-SINK-02
<-- KSTREAM-SOURCE-00
Sink: KSTREAM-SINK-02 (topic: repartition-topic)
<-- KSTREAM-KEY-SELECT-01

Sub-topology: 1
Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
--> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
--> none
<-- KSTREAM-SOURCE-03
Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
--> none
<-- KSTREAM-SOURCE-03
{code}
  

While this makes it possible to optimize a Kafka Streams application, the user 
still has to manually create the repartition topic with the correct number of 
partitions based on the input topic. It would be great if the DSL had something 
like a *repartition()* operation on *KStream* that could generate the 
repartition topic on the user's behalf.

  was:
Consider the following code:

 
{code:java}
final KStream streamByProfileId = streamsBuilder
   .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
   .selectKey((key, value) -> value);

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-1")
   );

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-2")
   );
{code}
 

This code will generate the following topology:

 
{code:java}
Topologies:
 Sub-topology: 0
 Source: KSTREAM-SOURCE-00 (topics: [input-topic])
 --> KSTREAM-KEY-SELECT-01
 Processor: KSTREAM-KEY-SELECT-01 (stores: [])
 --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
 <-- KSTREAM-SOURCE-00
 Processor: KSTREAM-FILTER-04 (stores: [])
 --> KSTREAM-SINK-03
 <-- KSTREAM-KEY-SELECT-01
 Processor: KSTREAM-FILTER-08 (stores: [])
 --> KSTREAM-SINK-07
 <-- KSTREAM-KEY-SELECT-01
 Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
 <-- KSTREAM-FILTER-04
 Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
 <-- KSTREAM-FILTER-08
Sub-topology: 1
 Source: KSTREAM-

[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Levani Kokhreidze (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846740#comment-16846740
 ] 

Levani Kokhreidze commented on KAFKA-8413:
--

Happy to work on KIP if this feature makes sense.

> Add possibility to do repartitioning on KStream
> ---
>
> Key: KAFKA-8413
> URL: https://issues.apache.org/jira/browse/KAFKA-8413
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
> Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
>  
> {code:java}
> final KStream streamByProfileId = streamsBuilder
>.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>.selectKey((key, value) -> value);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
> This code will generate the following topology:
>  
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>  --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-FILTER-04 (stores: [])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-KEY-SELECT-01
>  Processor: KSTREAM-FILTER-08 (stores: [])
>  --> KSTREAM-SINK-07
>  <-- KSTREAM-KEY-SELECT-01
>  Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-04
>  Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-08
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-02
>  Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-05
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-06
>  Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-09
>  
> {code}
> Kafka Streams creates a repartition topic for each `groupByKey` operation, so 
> two in this example. The two repartition topics are not really necessary here, 
> and processing could be done with a single sub-topology.
> A Kafka Streams user may, in the DSL, specify the repartition topic manually 
> using the *KStream#through* method:
>  
> {code:java}
> final KStream streamByProfileId = streamsBuilder
>.stream("input-topic")
>.selectKey((key, value) -> value)
>.through("repartition-topic");
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-1")
>);
> streamByProfileId
>.groupByKey()
>.aggregate(
>   () -> 0d,
>   (key, value, aggregate) -> aggregate,
>   Materialized.as("store-2")
>);
> {code}
>  
>  
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
> --> KSTREAM-SINK-02
> <-- KSTREAM-SOURCE-00
> Sink: KSTREAM-SINK-02 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-01
> Sub-topology: 1
> Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
> Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-03
> Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-03
> {code}
>  
>  
> While this makes it possible to optimize a Kafka Streams application, the user 
> still has to manually create the repartition topic with the correct number of 
> partitions based on the input topic. It would be great if the DSL had something 
> like a *repartition()* operation on *KStream* that could generate the 
> repartition topic on the user's behalf.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8413) Add possibility to do repartitioning on KStream

2019-05-23 Thread Levani Kokhreidze (JIRA)
Levani Kokhreidze created KAFKA-8413:


 Summary: Add possibility to do repartitioning on KStream
 Key: KAFKA-8413
 URL: https://issues.apache.org/jira/browse/KAFKA-8413
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Levani Kokhreidze
 Attachments: topology-1.png, topology-2.png

Consider the following code:

 
{code:java}
final KStream streamByProfileId = streamsBuilder
   .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
   .selectKey((key, value) -> value);

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-1")
   );

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-2")
   );
{code}
 

This code will generate the following topology:

 
{code:java}
Topologies:
 Sub-topology: 0
 Source: KSTREAM-SOURCE-00 (topics: [input-topic])
 --> KSTREAM-KEY-SELECT-01
 Processor: KSTREAM-KEY-SELECT-01 (stores: [])
 --> KSTREAM-FILTER-04, KSTREAM-FILTER-08
 <-- KSTREAM-SOURCE-00
 Processor: KSTREAM-FILTER-04 (stores: [])
 --> KSTREAM-SINK-03
 <-- KSTREAM-KEY-SELECT-01
 Processor: KSTREAM-FILTER-08 (stores: [])
 --> KSTREAM-SINK-07
 <-- KSTREAM-KEY-SELECT-01
 Sink: KSTREAM-SINK-03 (topic: store-1-repartition)
 <-- KSTREAM-FILTER-04
 Sink: KSTREAM-SINK-07 (topic: store-2-repartition)
 <-- KSTREAM-FILTER-08
Sub-topology: 1
 Source: KSTREAM-SOURCE-05 (topics: [store-1-repartition])
 --> KSTREAM-AGGREGATE-02
 Processor: KSTREAM-AGGREGATE-02 (stores: [store-1])
 --> none
 <-- KSTREAM-SOURCE-05
Sub-topology: 2
 Source: KSTREAM-SOURCE-09 (topics: [store-2-repartition])
 --> KSTREAM-AGGREGATE-06
 Processor: KSTREAM-AGGREGATE-06 (stores: [store-2])
 --> none
 <-- KSTREAM-SOURCE-09
 
{code}

Kafka Streams creates a repartition topic for each `groupByKey` operation, so two 
in this example. The two repartition topics are not really necessary here, and 
processing could be done with a single sub-topology.

A Kafka Streams user may, in the DSL, specify the repartition topic manually 
using the *KStream#through* method:

 
{code:java}
final KStream streamByProfileId = streamsBuilder
   .stream("input-topic")
   .selectKey((key, value) -> value)
   .through("repartition-topic");

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-1")
   );

streamByProfileId
   .groupByKey()
   .aggregate(
  () -> 0d,
  (key, value, aggregate) -> aggregate,
  Materialized.as("store-2")
   );
{code}
 

 
{code:java}
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-00 (topics: [input-topic])
--> KSTREAM-KEY-SELECT-01
Processor: KSTREAM-KEY-SELECT-01 (stores: [])
--> KSTREAM-SINK-02
<-- KSTREAM-SOURCE-00
Sink: KSTREAM-SINK-02 (topic: repartition-topic)
<-- KSTREAM-KEY-SELECT-01

Sub-topology: 1
Source: KSTREAM-SOURCE-03 (topics: [repartition-topic])
--> KSTREAM-AGGREGATE-04, KSTREAM-AGGREGATE-05
Processor: KSTREAM-AGGREGATE-04 (stores: [store-1])
--> none
<-- KSTREAM-SOURCE-03
Processor: KSTREAM-AGGREGATE-05 (stores: [store-2])
--> none
<-- KSTREAM-SOURCE-03
{code}
 

 

While this makes it possible to optimize a Kafka Streams application, the user 
still has to manually create the repartition topic with the correct number of 
partitions based on the input topic. It would be great if the DSL had something 
like a *repartition()* operation on *KStream* that could generate the 
repartition topic on the user's behalf.
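
A hypothetical sketch of how such an operator might look in the example above 
(illustrative only; *repartition()* does not exist in the current DSL, so the 
name and behavior here are assumptions):

{code:java}
final KStream<String, String> streamByProfileId = streamsBuilder
   .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
   .selectKey((key, value) -> value)
   .repartition(); // hypothetical operator: creates one internal repartition
                   // topic, sized from the input topic's partition count

streamByProfileId
   .groupByKey()
   .aggregate(() -> 0d, (key, value, aggregate) -> aggregate, Materialized.as("store-1"));

streamByProfileId
   .groupByKey()
   .aggregate(() -> 0d, (key, value, aggregate) -> aggregate, Materialized.as("store-2"));
{code}

Both aggregations would then consume the single repartition topic, collapsing the 
topology to two sub-topologies, just as in the *through()* example.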



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-05-23 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846566#comment-16846566
 ] 

Andrew edited comment on KAFKA-5998 at 5/23/19 11:32 AM:
-

I have just seen this error with Kafka 2.2.0 and Confluent 5.2.1. We are 
running in Docker on Kubernetes, and it seems the same container works when run 
against some topics but not others.
{code:java}
java.io.FileNotFoundException: 
/tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather/0_11/.checkpoint.tmp
 (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.(FileOutputStream.java:213)
    at java.io.FileOutputStream.(FileOutputStream.java:162)
    at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79)
    at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:474)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
{code}
When I `exec` into the running container I see this:
{code:java}
/tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather # ls
2_0   2_10  2_12  2_14  2_16  2_18  2_2   2_4   2_6   2_8   3_0   3_10  3_12  
3_14  3_16  3_18  3_2   3_4   3_6   3_8
{code}
Also I see:
{code}
09:55:49,898 INFO org.apache.kafka.streams.processor.internals.StateDirectory - 
stream-thread 
[kafka-streams-join-to-nearest-spm-draft-position-weather-404a240b-1e95-4587-80bf-686047658254-CleanupThread]
 Deleting obsolete state directory 0_1 for task 0_1 as 600898ms has elapsed 
(cleanup delay is 60ms).
{code}

And I see the following in the StreamsConfig log output:
{code}
state.cleanup.delay.ms = 60
state.dir = /tmp/kafka-streams
{code}
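
(For reference, both of those values are ordinary StreamsConfig settings; a 
sketch of overriding them, offered as a mitigation idea rather than a confirmed 
fix, assuming an application that builds its own Properties:)

{code:java}
Properties props = new Properties();
// Delay state-directory cleanup well beyond the commit interval, so the
// CleanupThread is less likely to delete a directory an active task still uses.
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 30 * 60 * 1000L);
// Move state off /tmp, which some environments clean up independently.
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
{code}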

 


was (Author: the4thamigo_uk):
I have just seen this error with Kafka 2.2.0 and Confluent 5.2.1. We are 
running in Docker on Kubernetes, and it seems the same container works when run 
against some topics but not others.
{code:java}
java.io.FileNotFoundException: 
/tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather/0_11/.checkpoint.tmp
 (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.(FileOutputStream.java:213)
    at java.io.FileOutputStream.(FileOutputStream.java:162)
    at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79)
    at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:474)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
{code}

When I `exec` into the running container I see this:

{code}
/tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather # ls
2_0   2_10  2_12  2_14  2_16  2_18  2_2   2_4   2_6   2_8   3_0   3_10  3_12  
3_14  3_16  3_18  3_2   3_4   3_6   3_8
{code}

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props

[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-23 Thread Omkar Mestry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846598#comment-16846598
 ] 

Omkar Mestry commented on KAFKA-7245:
-

[~mjsax] The core streams module also has some classes that use the put method 
with two parameters:

[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java]


[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java]


[https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java]


[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java]
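
For reference, migrating such a call site is a one-line change; a sketch, 
assuming a WindowStore named windowStore and a ProcessorContext named context 
are in scope:

{code:java}
// Before (to be deprecated): the timestamp is taken implicitly.
windowStore.put(key, value);

// After: pass the timestamp explicitly, e.g. the current record's
// timestamp obtained from the processor context.
windowStore.put(key, value, context.timestamp());
{code}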

 

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used in 
> tests, and thus, we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-05-23 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846566#comment-16846566
 ] 

Andrew edited comment on KAFKA-5998 at 5/23/19 9:29 AM:


I have just seen this error with Kafka 2.2.0 and Confluent 5.2.1. We are 
running in Docker on Kubernetes, and it seems the same container works when run 
against some topics but not others.
{code:java}
java.io.FileNotFoundException: 
/tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather/0_11/.checkpoint.tmp
 (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.(FileOutputStream.java:213)
    at java.io.FileOutputStream.(FileOutputStream.java:162)
    at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79)
    at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:474)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
{code}

When I `exec` into the running container I see this:

{code}
/tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather # ls
2_0   2_10  2_12  2_14  2_16  2_18  2_2   2_4   2_6   2_8   3_0   3_10  3_12  
3_14  3_16  3_18  3_2   3_4   3_6   3_8
{code}


was (Author: the4thamigo_uk):
I have just seen this error with Kafka 2.2.0 and Confluent 5.2.1. We are 
running in Docker on Kubernetes, and it seems the same container works when run 
against some topics but not others.

{code}
java.io.FileNotFoundException: 
/tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather/0_11/.checkpoint.tmp
 (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.(FileOutputStream.java:213)
    at java.io.FileOutputStream.(FileOutputStream.java:162)
    at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79)
    at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:474)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
{code}

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. They have been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I started getting the below exception for some of the partitions; I have 
> 16 partitions and only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.Fi

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-05-23 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846566#comment-16846566
 ] 

Andrew commented on KAFKA-5998:
---

I have just seen this error with Kafka 2.2.0 and Confluent 5.2.1. We are 
running in Docker on Kubernetes, and it seems the same container works when run 
against some topics but not others.

{code}
java.io.FileNotFoundException: 
/tmp/kafka-streams/kafka-streams-join-to-nearest-spm-draft-position-weather/0_11/.checkpoint.tmp
 (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.(FileOutputStream.java:213)
    at java.io.FileOutputStream.(FileOutputStream.java:162)
    at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79)
    at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:474)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
{code}

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. They have been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I started getting the below exception for some of the partitions; I have 
> 16 partitions and only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSH

[jira] [Created] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers

2019-05-23 Thread Sebastiaan (JIRA)
Sebastiaan created KAFKA-8412:
-

 Summary: Still a nullpointer exception thrown on shutdown while 
flushing before closing producers
 Key: KAFKA-8412
 URL: https://issues.apache.org/jira/browse/KAFKA-8412
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.1
Reporter: Sebastiaan


I found a closed issue and replied there, but decided to open a new one because, 
although they're related, they're slightly different. The original issue is at 
https://issues.apache.org/jira/browse/KAFKA-7678

The fix there was to add a null check around closing the producer, because in 
some cases the producer is already null at that point (it has already been 
closed).

In version 2.1.1 we are getting a very similar exception, but in the 'flush' 
method that is called pre-close. This is in the log:
{code:java}
message: stream-thread 
[webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
while closing StreamTask 1_26 due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks

java.lang.NullPointerException: null
    at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}

Followed by:
 
{code:java}
message: task [1_26] Could not close task due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.StreamTask

java.lang.NullPointerException: null
    at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}

If I look at the source code at this point, I see a nice null check in the 
close method, but not in the flush method that is called just before that:
{code:java}
public void flush() {
    this.log.debug("Flushing producer");
    this.producer.flush();
    this.checkForException();
}

public void close() {
    this.log.debug("Closing producer");
    if (this.producer != null) {
    this.producer.close();
    this.producer = null;
    }

    this.checkForException();
}{code}

It seems to my (ignorant) eye that the flush method should also be wrapped in a 
null check, in the same way as has been done for close.
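
A minimal sketch of that suggestion (illustrative only, not an actual patch):

{code:java}
public void flush() {
    this.log.debug("Flushing producer");
    // Mirror the null check from close(): the producer may already have been
    // closed and nulled out during shutdown.
    if (this.producer != null) {
        this.producer.flush();
    }
    this.checkForException();
}
{code}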



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2019-05-23 Thread Vivek Yadav (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846512#comment-16846512
 ] 

Vivek Yadav commented on KAFKA-4084:


[~jiaxinye] Did it work for you?

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts it causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8411) Option to keep tombstones forever in compacted logs

2019-05-23 Thread Lukas Welte (JIRA)
Lukas Welte created KAFKA-8411:
--

 Summary: Option to keep tombstones forever in compacted logs 
 Key: KAFKA-8411
 URL: https://issues.apache.org/jira/browse/KAFKA-8411
 Project: Kafka
  Issue Type: Improvement
  Components: log cleaner
Reporter: Lukas Welte


Log compaction is great for balancing between keeping all data in a topic and 
reducing the number of messages in it.

However, delete tombstones are cleaned up after a certain period. For some use 
cases we would really like to keep the latest message for each key, which means 
the tombstone for that ID should also be kept forever.

An option to configure _delete.retention.ms_ to e.g. _-1_ to enable this 
functionality would be great.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)