[jira] [Resolved] (KAFKA-7140) Remove deprecated poll usages

2018-08-10 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7140.

Resolution: Fixed

> Remove deprecated poll usages
> -
>
> Key: KAFKA-7140
> URL: https://issues.apache.org/jira/browse/KAFKA-7140
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>Priority: Minor
>
> There are a couple of poll(long) usages of the consumer in test and non-test 
> code. This JIRA aims to remove the non-test usages of the method.
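For reference, a minimal sketch of the poll(long) -> poll(Duration) migration
(KIP-266) that this issue tracks; the broker address, group id, and topic below
are placeholders:

{noformat}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollMigrationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            // Deprecated poll(long): the timeout does not bound metadata fetches.
            // ConsumerRecords<String, String> records = consumer.poll(100L);
            // Replacement poll(Duration): the timeout bounds the entire call.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            System.out.println("fetched " + records.count() + " records");
        }
    }
}{noformat}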



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7140) Remove deprecated poll usages

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577062#comment-16577062
 ] 

ASF GitHub Bot commented on KAFKA-7140:
---

hachikuji closed pull request #5319: KAFKA-7140: Remove deprecated poll usages
URL: https://github.com/apache/kafka/pull/5319
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 47f8529e2d1..692331ed13f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -53,6 +53,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -441,7 +442,7 @@ public String toString() {
 }
 
     private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {
-        ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
+        ConsumerRecords<byte[], byte[]> msgs = consumer.poll(Duration.ofMillis(timeoutMs));

         // Exceptions raised from the task during a rebalance should be rethrown to stop the worker
         if (rebalanceException != null) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index de1ceb3be10..ea9b4c621f9 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -35,6 +35,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -253,7 +254,7 @@ public void send(K key, V value, org.apache.kafka.clients.producer.Callback call

     private void poll(long timeoutMs) {
         try {
-            ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
+            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
             for (ConsumerRecord<K, V> record : records)
                 consumedCallback.onCompletion(null, record);
         } catch (WakeupException e) {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 1bf9c717068..6d92c34adef 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -65,6 +65,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -180,8 +181,8 @@ public void testErrorHandlingInSinkTasks() throws Exception {
         // bad json
         ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());

-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record1));
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record2));
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1));
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2));
 
 sinkTask.put(EasyMock.anyObject());
 EasyMock.expectLastCall().times(2);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 4a7c760fc74..33ab2ef06e0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -58,6 +58,7 @@
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -458,7 +459,7 @@ public void testWakeupInCommitSyncCausesRetry() throws Exception {
         sinkTask.open(partitions);
         EasyMock.expectLastCall();

-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(

[jira] [Commented] (KAFKA-7225) Kafka Connect ConfigProvider not invoked before validation

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576961#comment-16576961
 ] 

ASF GitHub Bot commented on KAFKA-7225:
---

rhauch opened a new pull request #5489: KAFKA-7225: Corrected system tests by 
generating external properties file
URL: https://github.com/apache/kafka/pull/5489
 
 
   Fix system tests from earlier #5445 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect ConfigProvider not invoked before validation
> --
>
> Key: KAFKA-7225
> URL: https://issues.apache.org/jira/browse/KAFKA-7225
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Nacho Munoz
>Assignee: Robert Yokota
>Priority: Minor
> Fix For: 2.0.1, 2.1.0
>
>
> When trying to register a JDBC connector with externalised secrets (e.g. 
> connection.password), the validation fails and the endpoint returns a 500. I 
> think the problem is that the config transformer is not invoked before 
> validation, so exercising the credentials against the database fails. I have 
> verified that publishing the connector configuration directly to the 
> connect-config topic (skipping validation) and restarting the server is 
> enough to get the connector working, which confirms that the config 
> transformer is simply not called before the connector is validated. Please 
> let me know if you need further information.
> I'm happy to open a PR to address this issue, given that I think it is easy 
> enough for a new contributor to fix. So please feel free to assign the bug 
> to me.
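For context, a sketch of the externalised-secret mechanism involved (KIP-297);
the provider class is real, but the file paths and property keys below are
placeholders:

{noformat}
# worker.properties: register a ConfigProvider (KIP-297)
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# connector config: the placeholder is resolved by the provider at runtime;
# per this issue, it must also be resolved before validation exercises the
# credentials against the database
connection.password=${file:/etc/kafka/secrets.properties:jdbc.password}
{noformat}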



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`

2018-08-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6966.

Resolution: Fixed

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
> 
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Assignee: Nishanth Pradeep
>Priority: Major
>  Labels: beginner, kip, newbie
> Fix For: 2.1.0
>
>
> With KIP-303, a dynamic routing feature was added and 
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the actually used `TopicNameExtractor` class from 
> the `TopologyDescription`.
> We suggest adding `Class<? extends TopicNameExtractor> 
> TopologyDescription.Sink#topicNameExtractor()` and having it return `null` 
> if the dynamic routing feature is not used.
> KIP-321: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes]
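A minimal sketch of how the proposed accessor can be used when walking a
described topology; "topology" is assumed to be an already-built Topology:

{noformat}
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;

static void printSinks(final Topology topology) {
    for (final TopologyDescription.Subtopology sub : topology.describe().subtopologies()) {
        for (final TopologyDescription.Node node : sub.nodes()) {
            if (node instanceof TopologyDescription.Sink) {
                final TopologyDescription.Sink sink = (TopologyDescription.Sink) node;
                // topic() is null when a TopicNameExtractor routes dynamically;
                // in that case topicNameExtractor() is non-null, and vice versa.
                System.out.println(sink.name() + " -> "
                        + (sink.topic() != null ? sink.topic() : sink.topicNameExtractor()));
            }
        }
    }
}{noformat}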



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`

2018-08-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6966:
---
Fix Version/s: 2.1.0

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
> 
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Assignee: Nishanth Pradeep
>Priority: Major
>  Labels: beginner, kip, newbie
> Fix For: 2.1.0
>
>
> With KIP-303, a dynamic routing feature was added and 
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the actually used `TopicNameExtractor` class from 
> the `TopologyDescription`.
> We suggest adding `Class<? extends TopicNameExtractor> 
> TopologyDescription.Sink#topicNameExtractor()` and having it return `null` 
> if the dynamic routing feature is not used.
> KIP-321: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`

2018-08-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6966:
---
Description: 
With KIP-303, a dynamic routing feature was added and 
`TopologyDescription.Sink#topic()` returns `null` if this feature is used.

It would be useful to get the actually used `TopicNameExtractor` class from the 
`TopologyDescription`.

We suggest to add `Class 
TopologyDescription.Sink#topicNameExtractor()` and let it return `null` if 
dynamic routing feature is not used.

KIP-321: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes]

  was:
With KIP-303, a dynamic routing feature was added and 
`TopologyDescription.Sink#topic()` returns `null` if this feature is used.

It would be useful to get the actually used `TopicNameExtractor` class from the 
`TopologyDescription`.

We suggest adding `Class<? extends TopicNameExtractor> 
TopologyDescription.Sink#topicNameExtractor()` and having it return `null` if 
the dynamic routing feature is not used.

This is a public API change and requires a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals


> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
> 
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Assignee: Nishanth Pradeep
>Priority: Major
>  Labels: beginner, kip, newbie
>
> With KIP-303, a dynamic routing feature was added and 
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the actually used `TopicNameExtractor` class from 
> the `TopologyDescription`.
> We suggest adding `Class<? extends TopicNameExtractor> 
> TopologyDescription.Sink#topicNameExtractor()` and having it return `null` 
> if the dynamic routing feature is not used.
> KIP-321: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`

2018-08-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6966:
---
Labels: beginner kip newbie  (was: beginner needs-kip newbie)

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
> 
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Assignee: Nishanth Pradeep
>Priority: Major
>  Labels: beginner, kip, newbie
>
> With KIP-303, a dynamic routing feature was added and 
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the actually used `TopicNameExtractor` class from 
> the `TopologyDescription`.
> We suggest adding `Class<? extends TopicNameExtractor> 
> TopologyDescription.Sink#topicNameExtractor()` and having it return `null` 
> if the dynamic routing feature is not used.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576949#comment-16576949
 ] 

ASF GitHub Bot commented on KAFKA-6966:
---

mjsax closed pull request #5284: KAFKA-6966: Extend TopologyDescription.Sink to 
return TopicNameExtractor
URL: https://github.com/apache/kafka/pull/5284
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 34f66ce53fe..35e1f77fd4c 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -90,6 +90,14 @@ Upgrade Guide and API Changes
 We have also removed some public APIs that are deprecated prior to 
1.0.x in 2.0.0.
 See below for a detailed list of removed APIs.
 
+    <h3>Streams API changes in 2.1.0</h3>
+
+    We updated <code>TopologyDescription</code> API to allow for better runtime checking.
+    Users are encouraged to use <code>#topicSet()</code> and <code>#topicPattern()</code> accordingly on <code>TopologyDescription.Source</code> nodes,
+    instead of using <code>#topics()</code>, which has since been deprecated. Similarly, use <code>#topic()</code> and <code>#topicNameExtractor()</code>
+    to get descriptions of <code>TopologyDescription.Sink</code> nodes. For more details, see
+    <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes">KIP-321</a>.
+
 
 Streams API changes in 2.0.0
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 04a292f9a97..870052d7399 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 
 import java.util.Set;
+import java.util.regex.Pattern;
 
 /**
  * A meta representation of a {@link Topology topology}.
@@ -113,8 +115,22 @@
         /**
          * The topic names this source node is reading from.
          * @return comma separated list of topic names or pattern (as String)
+         * @deprecated use {@link #topicSet()} or {@link #topicPattern()} instead
          */
+        @Deprecated
         String topics();
+
+        /**
+         * The topic names this source node is reading from.
+         * @return a set of topic names
+         */
+        Set<String> topicSet();
+
+        /**
+         * The pattern used to match topic names that this source node is reading from.
+         * @return the pattern used to match topic names
+         */
+        Pattern topicPattern();
     }
 
 /**
@@ -134,10 +150,17 @@
     interface Sink extends Node {
         /**
          * The topic name this sink node is writing to.
-         * Could be null if the topic name can only be dynamically determined based on {@code TopicNameExtractor}
+         * Could be {@code null} if the topic name can only be dynamically determined based on {@link TopicNameExtractor}
          * @return a topic name
          */
         String topic();
+
+        /**
+         * The {@link TopicNameExtractor} class that this sink node uses to dynamically extract the topic name to write to.
+         * Could be {@code null} if the topic name is not dynamically determined.
+         * @return the {@link TopicNameExtractor} class used to get the topic name
+         */
+        TopicNameExtractor topicNameExtractor();
     }
 
 /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 250105ad2a3..2944f6ba29b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -282,15 +282,7 @@ private boolean isMatch(final String topic) {

         @Override
         Source describe() {
-            final String sourceTopics;
-
-            if (pattern == null) {
-                sourceTopics = topics.toString();
-            } else {
-                sourceTopics = pattern.toString();
-            }
-
-            return new Source(name, sourceTopics);
+            return new Source(name, new HashSet<>(topics), pattern);
         }
     }
 
@@ -1337,7 +1329,7 @@ public GlobalStore(final String sourceName,
final String storeName,
final String topicName,
   

[jira] [Comment Edited] (KAFKA-6571) KafkaProducer.close(0) should be non-blocking

2018-08-10 Thread Ahmed Al-Mehdi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576931#comment-16576931
 ] 

Ahmed Al-Mehdi edited comment on KAFKA-6571 at 8/10/18 10:35 PM:
-

After discussion with [~lindong], assigning bug to self.


was (Author: ahmeda):
After discussion with Dong, assigning bug to self.

> KafkaProducer.close(0) should be non-blocking
> -
>
> Key: KAFKA-6571
> URL: https://issues.apache.org/jira/browse/KAFKA-6571
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Ahmed Al-Mehdi
>Priority: Major
>
> According to the Java doc of producer.close(long timeout, TimeUnit timeUnit), 
> "Specifying a timeout of zero means do not wait for pending send requests to 
> complete". However, producer.close(0) can currently block waiting for the 
> sender thread to exit, which in turn can block on the user's callback.
> We probably should not let producer.close(0) join the sender thread if the 
> user has specified a zero timeout.
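A sketch of the hazard, not Kafka's own code; `slowExternalCall` is a
hypothetical blocking helper:

{noformat}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CloseZeroSketch {
    static void slowExternalCall() { /* hypothetical blocking I/O */ }

    static void shutdown(KafkaProducer<String, String> producer) {
        producer.send(new ProducerRecord<>("demo-topic", "k", "v"), (metadata, exception) -> {
            // If this callback blocks, the sender thread blocks with it...
            slowExternalCall();
        });
        // ...and close(0), despite "do not wait", may then block on joining
        // the sender thread. The fix: skip the join when the timeout is zero.
        producer.close(0, TimeUnit.MILLISECONDS);
    }
}{noformat}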



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6571) KafkaProducer.close(0) should be non-blocking

2018-08-10 Thread Ahmed Al-Mehdi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576931#comment-16576931
 ] 

Ahmed Al-Mehdi commented on KAFKA-6571:
---

After discussion with Dong, assigning bug to self.

> KafkaProducer.close(0) should be non-blocking
> -
>
> Key: KAFKA-6571
> URL: https://issues.apache.org/jira/browse/KAFKA-6571
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> According to the Java doc of producer.close(long timeout, TimeUnit timeUnit), 
> "Specifying a timeout of zero means do not wait for pending send requests to 
> complete". However, producer.close(0) can currently block waiting for the 
> sender thread to exit, which in turn can block on the user's callback.
> We probably should not let producer.close(0) join the sender thread if the 
> user has specified a zero timeout.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6571) KafkaProducer.close(0) should be non-blocking

2018-08-10 Thread Ahmed Al-Mehdi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Al-Mehdi reassigned KAFKA-6571:
-

Assignee: Ahmed Al-Mehdi  (was: Dong Lin)

> KafkaProducer.close(0) should be non-blocking
> -
>
> Key: KAFKA-6571
> URL: https://issues.apache.org/jira/browse/KAFKA-6571
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Ahmed Al-Mehdi
>Priority: Major
>
> According to the Java doc of producer.close(long timeout, TimeUnit timeUnit), 
> "Specifying a timeout of zero means do not wait for pending send requests to 
> complete". However, producer.close(0) can currently block waiting for the 
> sender thread to exit, which in turn can block on the user's callback.
> We probably should not let producer.close(0) join the sender thread if the 
> user has specified a zero timeout.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6998) Remove caching wrapper stores if cache-size is configured to zero bytes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576926#comment-16576926
 ] 

ASF GitHub Bot commented on KAFKA-6998:
---

guozhangwang opened a new pull request #5488: KAFKA-6998: Disable Caching when 
max.cache.bytes are zero.
URL: https://github.com/apache/kafka/pull/5488
 
 
   1. As titled, add a `rewriteTopology` that 1) sets the application id, 2) maybe 
disables caching, and 3) adjusts for source KTables. This optimization can hence be 
applied to both DSL- and PAPI-generated topologies.
   
   2. Defer the building of globalStateStores to `rewriteTopology` so that we 
can also disable caching there. But we still need to build the state stores before 
`InternalTopologyBuilder.build()`, since we should only build global stores once 
for all threads.
   
   3. Added withCachingDisabled to StoreBuilder; it is a public API change.
   
   4. [Optional] Fixed unit test config setting functionality, and set the 
necessary configs to shorten unit test latency (it now drops from 5 min to 
3.5 min on my laptop).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove caching wrapper stores if cache-size is configured to zero bytes
> ---
>
> Key: KAFKA-6998
> URL: https://issues.apache.org/jira/browse/KAFKA-6998
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Users can disable caching globally by setting the cache size to zero in their 
> config. However, this only effectively disables the caching layer; the code 
> is still in place.
> We should consider removing the caching wrappers completely for this case. 
> The tricky part is that we insert the caching layer at compile time, i.e., 
> when calling `StreamsBuilder#build()` – at this point, we don't know the 
> configuration yet. Thus, we need to find a way to rewrite the topology after 
> it is passed to `KafkaStreams` in case the cache size is set to zero.
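For reference, the zero-byte cache configuration in question looks like this
(a minimal sketch; application id and servers are placeholders):

{noformat}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Disables caching globally -- but, as described above, the caching wrapper
// stores are still built into the topology today.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
{noformat}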



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7147) Allow kafka-reassign-partitions.sh and kafka-log-dirs.sh to take admin client property file

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576871#comment-16576871
 ] 

ASF GitHub Bot commented on KAFKA-7147:
---

lindong28 closed pull request #5355: KAFKA-7147; ReassignPartitionsCommand 
should be able to connect to broker over SSL
URL: https://github.com/apache/kafka/pull/5355
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala 
b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index 9257942d1c0..b51f25d6481 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
 import scala.collection.Map
 import kafka.utils.{CommandLineUtils, Json}
 import joptsimple._
+import org.apache.kafka.common.utils.Utils
 
 /**
   * A command for querying log directory usage on the specified brokers
@@ -83,9 +84,12 @@ object LogDirsCommand {
 }
 
  private def createAdminClient(opts: LogDirsCommandOptions): JAdminClient = {
-    val props = new Properties()
+    val props = if (opts.options.has(opts.commandConfigOpt))
+      Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+    else
+      new Properties()
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
-    props.put(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
+    props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
     JAdminClient.create(props)
   }
 
@@ -95,6 +99,10 @@ object LogDirsCommand {
       .withRequiredArg
       .describedAs("The server(s) to use for bootstrapping")
       .ofType(classOf[String])
+    val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+      .withRequiredArg
+      .describedAs("Admin client property file")
+      .ofType(classOf[String])
     val describeOpt = parser.accepts("describe", "Describe the specified log directories on the specified brokers.")
     val topicListOpt = parser.accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\". " +
       "All topics will be queried if no topic list is specified")
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 4d9da90bc69..dab34a69267 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -72,9 +72,12 @@ object ReassignPartitionsCommand extends Logging {
 
  private def createAdminClient(opts: ReassignPartitionsCommandOptions): Option[JAdminClient] = {
    if (opts.options.has(opts.bootstrapServerOpt)) {
-      val props = new Properties()
+      val props = if (opts.options.has(opts.commandConfigOpt))
+        Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+      else
+        new Properties()
       props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
-      props.put(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool")
+      props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool")
       Some(JAdminClient.create(props))
     } else {
       None
@@ -456,6 +459,10 @@ object ReassignPartitionsCommand extends Logging {
       .withRequiredArg
       .describedAs("Server(s) to use for bootstrapping")
       .ofType(classOf[String])
+    val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
+      .withRequiredArg
+      .describedAs("Admin client property file")
+      .ofType(classOf[String])
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
       "form host:port. Multiple URLS can be given to allow fail-over.")
       .withRequiredArg


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow kafka-reassign-partitions.sh and kafka-log-dirs.sh to take admin client 
> property file
> 

[jira] [Resolved] (KAFKA-7147) Allow kafka-reassign-partitions.sh and kafka-log-dirs.sh to take admin client property file

2018-08-10 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7147.
-
Resolution: Fixed

> Allow kafka-reassign-partitions.sh and kafka-log-dirs.sh to take admin client 
> property file
> ---
>
> Key: KAFKA-7147
> URL: https://issues.apache.org/jira/browse/KAFKA-7147
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently both ReassignPartitionsCommand and LogDirsCommand instantiate 
> AdminClient using the bootstrap.servers and client.id provided by the user. 
> Since they do not accept other SSL-related properties, these tools are not 
> able to talk to brokers over SSL.
> In order to solve this problem, these tools should allow users to provide a 
> property file containing configs to be passed to AdminClient.
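A hypothetical invocation of the new option; the SSL property values are
placeholders:

{noformat}
$ cat admin.properties
security.protocol=SSL
ssl.truststore.location=/etc/kafka/client.truststore.jks
ssl.truststore.password=changeit

$ bin/kafka-log-dirs.sh --bootstrap-server broker1:9093 \
    --command-config admin.properties --describe
{noformat}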



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times

2018-08-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7277:
---
Labels: beginner needs-kip newbie  (was: needs-kip)

> Migrate Streams API to Duration instead of longMs times
> ---
>
> Key: KAFKA-7277
> URL: https://issues.apache.org/jira/browse/KAFKA-7277
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> Right now the Streams API universally represents time as ms-since-unix-epoch.
> There's nothing wrong, per se, with this, but Duration is more ergonomic for 
> an API.
> What we don't want is to present a heterogeneous API, so we need to make sure 
> the whole Streams API is in terms of Duration.
>  
> Implementation note: Durations potentially worsen memory pressure and gc 
> performance, so internally, we will still use longMs as the representation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times

2018-08-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7277:
---
Description: 
Right now the Streams API universally represents time as ms-since-unix-epoch.

There's nothing wrong, per se, with this, but Duration is more ergonomic for an 
API.

What we don't want is to present a heterogeneous API, so we need to make sure 
the whole Streams API is in terms of Duration.

 

Implementation note: Durations potentially worsen memory pressure and gc 
performance, so internally, we will still use longMs as the representation. 

KIP instructions: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

  was:
Right now the Streams API universally represents time as ms-since-unix-epoch.

There's nothing wrong, per se, with this, but Duration is more ergonomic for an 
API.

What we don't want is to present a heterogeneous API, so we need to make sure 
the whole Streams API is in terms of Duration.

 

Implementation note: Durations potentially worsen memory pressure and gc 
performance, so internally, we will still use longMs as the representation. 


> Migrate Streams API to Duration instead of longMs times
> ---
>
> Key: KAFKA-7277
> URL: https://issues.apache.org/jira/browse/KAFKA-7277
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> Right now the Streams API universally represents time as ms-since-unix-epoch.
> There's nothing wrong, per se, with this, but Duration is more ergonomic for 
> an API.
> What we don't want is to present a heterogeneous API, so we need to make sure 
> the whole Streams API is in terms of Duration.
>  
> Implementation note: Durations potentially worsen memory pressure and gc 
> performance, so internally, we will still use longMs as the representation. 
> KIP instructions: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times

2018-08-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7277:
---
Labels: needs-kip  (was: )

> Migrate Streams API to Duration instead of longMs times
> ---
>
> Key: KAFKA-7277
> URL: https://issues.apache.org/jira/browse/KAFKA-7277
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Right now the Streams API universally represents time as ms-since-unix-epoch.
> There's nothing wrong, per se, with this, but Duration is more ergonomic for 
> an API.
> What we don't want is to present a heterogeneous API, so we need to make sure 
> the whole Streams API is in terms of Duration.
>  
> Implementation note: Durations potentially worsen memory pressure and gc 
> performance, so internally, we will still use longMs as the representation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times

2018-08-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7277:
---
Component/s: streams

> Migrate Streams API to Duration instead of longMs times
> ---
>
> Key: KAFKA-7277
> URL: https://issues.apache.org/jira/browse/KAFKA-7277
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Right now the Streams API universally represents time as ms-since-unix-epoch.
> There's nothing wrong, per se, with this, but Duration is more ergonomic for 
> an API.
> What we don't want is to present a heterogeneous API, so we need to make sure 
> the whole Streams API is in terms of Duration.
>  
> Implementation note: Durations potentially worsen memory pressure and gc 
> performance, so internally, we will still use longMs as the representation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times

2018-08-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576854#comment-16576854
 ] 

John Roesler commented on KAFKA-7277:
-

I recommend that we validate each duration argument using a class like this:
{noformat}
package org.apache.kafka.streams.kstream;

import java.time.Duration;

final class ApiUtils {
    private ApiUtils() {}

    static Duration validateMillisecondDuration(final Duration duration, final String valueName) {
        try {
            //noinspection ResultOfMethodCallIgnored
            duration.toMillis();
            return duration;
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(
                valueName + " must be expressible in milliseconds (" + duration + " is too big)",
                e
            );
        }
    }
}{noformat}
Otherwise, we will wind up throwing ArithmeticException randomly with little 
explanation.
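
A hypothetical call site, to show the intended pattern; `Windows`, `ofSize`,
and `ofSizeMs` are illustrative names, not real API:

{noformat}
// Hypothetical call site: validate at the Duration-facing API boundary, then
// fall through to the internal long-ms representation.
public Windows ofSize(final Duration windowSize) {
    final Duration validated = ApiUtils.validateMillisecondDuration(windowSize, "windowSize");
    return ofSizeMs(validated.toMillis());
}{noformat}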

> Migrate Streams API to Duration instead of longMs times
> ---
>
> Key: KAFKA-7277
> URL: https://issues.apache.org/jira/browse/KAFKA-7277
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Priority: Major
>
> Right now the Streams API universally represents time as ms-since-unix-epoch.
> There's nothing wrong, per se, with this, but Duration is more ergonomic for 
> an API.
> What we don't want is to present a heterogeneous API, so we need to make sure 
> the whole Streams API is in terms of Duration.
>  
> Implementation note: Durations potentially worsen memory pressure and gc 
> performance, so internally, we will still use longMs as the representation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times

2018-08-10 Thread John Roesler (JIRA)
John Roesler created KAFKA-7277:
---

 Summary: Migrate Streams API to Duration instead of longMs times
 Key: KAFKA-7277
 URL: https://issues.apache.org/jira/browse/KAFKA-7277
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Right now the Streams API universally represents time as ms-since-unix-epoch.

There's nothing wrong, per se, with this, but Duration is more ergonomic for an 
API.

What we don't want is to present a heterogeneous API, so we need to make sure 
the whole Streams API is in terms of Duration.

 

Implementation note: Durations potentially worsen memory pressure and gc 
performance, so internally, we will still use longMs as the representation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6998) Remove caching wrapper stores if cache-size is configured to zero bytes

2018-08-10 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576701#comment-16576701
 ] 

Guozhang Wang commented on KAFKA-6998:
--

Actually when I was looking into the code, I realized it is not really blocked 
on `internalTopologyBuilder.build()`. I'll provide a quick draft PR to 
illustrate my idea.

> Remove caching wrapper stores if cache-size is configured to zero bytes
> ---
>
> Key: KAFKA-6998
> URL: https://issues.apache.org/jira/browse/KAFKA-6998
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Users can disable caching globally by setting the cache size to zero in their 
> config. However, this only effectively disables the caching layer; the code 
> is still in place.
> We should consider removing the caching wrappers completely for this case. 
> The tricky part is that we insert the caching layer at compile time, i.e., 
> when calling `StreamsBuilder#build()` – at this point, we don't know the 
> configuration yet. Thus, we need to find a way to rewrite the topology after 
> it is passed to `KafkaStreams` in case the cache size is set to zero.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7276) Consider using re2j to speed up regex operations

2018-08-10 Thread Ted Yu (JIRA)
Ted Yu created KAFKA-7276:
-

 Summary: Consider using re2j to speed up regex operations
 Key: KAFKA-7276
 URL: https://issues.apache.org/jira/browse/KAFKA-7276
 Project: Kafka
  Issue Type: Task
Reporter: Ted Yu


https://github.com/google/re2j

re2j claims to do linear time regular expression matching in Java.

Its benefit is most obvious for deeply nested regex (such as a | b | c | d).

We should consider using re2j to speed up regex operations.
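
A minimal sketch of what adoption would look like; re2j mirrors the
java.util.regex API, so it is largely an import swap:

{noformat}
import com.google.re2j.Matcher;
import com.google.re2j.Pattern;

public class Re2jSketch {
    public static void main(String[] args) {
        // Linear-time matching even for alternation-heavy patterns.
        Pattern p = Pattern.compile("a|b|c|d");
        Matcher m = p.matcher("topic-c");
        System.out.println(m.find()); // true
    }
}{noformat}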



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7275) Prototype lock-free metrics

2018-08-10 Thread John Roesler (JIRA)
John Roesler created KAFKA-7275:
---

 Summary: Prototype lock-free metrics
 Key: KAFKA-7275
 URL: https://issues.apache.org/jira/browse/KAFKA-7275
 Project: Kafka
  Issue Type: Improvement
  Components: metrics, streams
Reporter: John Roesler
Assignee: John Roesler


Currently, we have to be a little conservative in how granularly we measure 
things to avoid heavy synchronization costs in the metrics.

It should be possible to refactor the thread-safe implementation to use 
volatile and java.util.concurrent.atomic instead and realize a pretty large 
performance improvement.

However, before investing too much time in it, we should run some benchmarks to 
gauge how much improvement we can expect.

I'd propose to run the benchmarks on trunk with debug turned on, and then to 
just remove all synchronization and run again to get an upper-bound performance 
improvement.

If the results are promising, we can start prototyping a lock-free 
implementation.
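
To make the idea concrete, a toy before/after sketch (not Kafka's metrics
code) of replacing a synchronized recorder with a lock-free one:

{noformat}
import java.util.concurrent.atomic.LongAdder;

class SynchronizedCounter {
    private long count;
    synchronized void record() { count++; }          // contended lock per event
    synchronized long value() { return count; }
}

class LockFreeCounter {
    private final LongAdder count = new LongAdder();
    void record() { count.increment(); }             // no lock; striped CAS
    long value() { return count.sum(); }
}{noformat}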



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6998) Remove caching wrapper stores if cache-size is configured to zero bytes

2018-08-10 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576462#comment-16576462
 ] 

Guozhang Wang commented on KAFKA-6998:
--

Yes, that's exactly my plan :)

> Remove caching wrapper stores if cache-size is configured to zero bytes
> ---
>
> Key: KAFKA-6998
> URL: https://issues.apache.org/jira/browse/KAFKA-6998
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Users can disable caching globally by setting the cache size to zero in their 
> config. However, this only effectively disables the caching layer; the code 
> is still in place.
> We should consider removing the caching wrappers completely for this case. 
> The tricky part is that we insert the caching layer at compile time, i.e., 
> when calling `StreamsBuilder#build()` – at this point, we don't know the 
> configuration yet. Thus, we need to find a way to rewrite the topology after 
> it is passed to `KafkaStreams` in case the cache size is set to zero.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6998) Remove caching wrapper stores if cache-size is configured to zero bytes

2018-08-10 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576459#comment-16576459
 ] 

Matthias J. Sax commented on KAFKA-6998:


Just realized that, with the new optimization layer, we have access to the 
config and can check if the value is set to zero: in this case, we can call 
`disableCaching()` on all stores before we build the topology.
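
A minimal sketch of the per-store hook involved, assuming the
`withCachingDisabled()` addition from the PR above; the store name and serdes
are placeholders:

{noformat}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StoreBuilder<KeyValueStore<String, Long>> builder =
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("demo-store"), Serdes.String(), Serdes.Long())
          // what a zero-byte cache config could trigger on all stores
          .withCachingDisabled();
{noformat}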

> Remove caching wrapper stores if cache-size is configured to zero bytes
> ---
>
> Key: KAFKA-6998
> URL: https://issues.apache.org/jira/browse/KAFKA-6998
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Users can disable caching globally by setting the cache size to zero in their 
> config. However, this only effectively disables the caching layer; the code 
> is still in place.
> We should consider removing the caching wrappers completely for this case. 
> The tricky part is that we insert the caching layer at compile time, i.e., 
> when calling `StreamsBuilder#build()` – at this point, we don't know the 
> configuration yet. Thus, we need to find a way to rewrite the topology after 
> it is passed to `KafkaStreams` in case the cache size is set to zero.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7269) KStream.merge is not documented

2018-08-10 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576449#comment-16576449
 ] 

Matthias J. Sax commented on KAFKA-7269:


I am not sure about "union" – union is a set operation, so it does not quite 
make sense mathematically, because "union" removes duplicates. (Even if most 
systems don't enforce this mathematical property by default, they require a 
"distinct" keyword to get it.)

> KStream.merge is not documented
> ---
>
> Key: KAFKA-7269
> URL: https://issues.apache.org/jira/browse/KAFKA-7269
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0
>Reporter: John Roesler
>Priority: Major
>  Labels: beginner, newbie
>
> If I understand the operator correctly, it should be documented as a 
> stateless transformation at 
> https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations
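For the eventual docs, a minimal sketch of the operator; topic names are
placeholders:

{noformat}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> left = builder.stream("orders-eu");
KStream<String, String> right = builder.stream("orders-us");
// Stateless: interleaves the two streams with no ordering guarantee and no
// deduplication (hence "merge" rather than a set-style "union").
KStream<String, String> all = left.merge(right);
all.to("orders-all");
{noformat}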



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7269) KStream.merge is not documented

2018-08-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576361#comment-16576361
 ] 

John Roesler commented on KAFKA-7269:
-

Hi [~virgilp], 

Thanks for the tip!

It's not in scope to rename the operator right now, but whoever picks this up 
can bear that in mind when they write the docs.

-John

> KStream.merge is not documented
> ---
>
> Key: KAFKA-7269
> URL: https://issues.apache.org/jira/browse/KAFKA-7269
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0
>Reporter: John Roesler
>Priority: Major
>  Labels: beginner, newbie
>
> If I understand the operator correctly, it should be documented as a 
> stateless transformation at 
> https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7119) Intermittent test failure with GSSAPI authentication failure

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576023#comment-16576023
 ] 

ASF GitHub Bot commented on KAFKA-7119:
---

rajinisivaram opened a new pull request #5487: KAFKA-7119: Handle transient 
Kerberos errors as non-fatal exceptions
URL: https://github.com/apache/kafka/pull/5487
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Intermittent test failure with GSSAPI authentication failure
> 
>
> Key: KAFKA-7119
> URL: https://issues.apache.org/jira/browse/KAFKA-7119
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> I have seen this failure a couple of times in builds (e.g. 
> [https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/2412/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/)]
> {quote}
> org.apache.kafka.common.errors.SaslAuthenticationException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Request is a replay (34) - Request is a replay)]) occurred 
> when evaluating SASL token received from the Kafka Broker. Kafka Client will 
> go to AUTHENTICATION_FAILED state. Caused by: 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Request is a 
> replay (34) - Request is a replay)] at 
> jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356)
>  at java.base/java.security.AccessController.doPrivileged(Native Method) at 
> java.base/javax.security.auth.Subject.doAs(Subject.java:423) at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205)
>  at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) 
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) 
> at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) 
> at 
> kafka.api.AdminClientIntegrationTest.$anonfun$subscribeAndWaitForAssignment$2(AdminClientIntegrationTest.scala:980)
>  at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:781) at 
> kafka.api.AdminClientIntegrationTest.subscribeAndWaitForAssignment(AdminClientIntegrationTest.scala:979)
>  at 
> kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpoint(AdminClientIntegrationTest.scala:755)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> 

[jira] [Assigned] (KAFKA-7119) Intermittent test failure with GSSAPI authentication failure

2018-08-10 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reassigned KAFKA-7119:
-

Assignee: Rajini Sivaram

> Intermittent test failure with GSSAPI authentication failure
> 
>
> Key: KAFKA-7119
> URL: https://issues.apache.org/jira/browse/KAFKA-7119
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> I have seen this failure a couple of times in builds (e.g. 
> [https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/2412/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/)]
> {quote}
> org.apache.kafka.common.errors.SaslAuthenticationException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Request is a replay (34) - Request is a replay)]) occurred 
> when evaluating SASL token received from the Kafka Broker. Kafka Client will 
> go to AUTHENTICATION_FAILED state. Caused by: 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Request is a 
> replay (34) - Request is a replay)] at 
> jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356)
>  at java.base/java.security.AccessController.doPrivileged(Native Method) at 
> java.base/javax.security.auth.Subject.doAs(Subject.java:423) at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268)
>  at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205)
>  at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) 
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) 
> at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) 
> at 
> kafka.api.AdminClientIntegrationTest.$anonfun$subscribeAndWaitForAssignment$2(AdminClientIntegrationTest.scala:980)
>  at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:781) at 
> kafka.api.AdminClientIntegrationTest.subscribeAndWaitForAssignment(AdminClientIntegrationTest.scala:979)
>  at 
> kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpoint(AdminClientIntegrationTest.scala:755)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at 
> 

[jira] [Comment Edited] (KAFKA-6701) synchronize Log modification between delete cleanup and async delete

2018-08-10 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575873#comment-16575873
 ] 

M. Manna edited comment on KAFKA-6701 at 8/10/18 7:30 AM:
--

[~lindong] I believe this might also be a fix for 
https://issues.apache.org/jira/browse/KAFKA-6188 . But the issue still occurs 
on Windows as of the 2.11-1.1.0 release.


was (Author: manme...@gmail.com):
[~lindong] I believe this might also be a fix for 
https://issues.apache.org/jira/browse/KAFKA-6188 .

> synchronize Log modification between delete cleanup and async delete
> 
>
> Key: KAFKA-6701
> URL: https://issues.apache.org/jira/browse/KAFKA-6701
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
>Priority: Major
>
> Kafka broker crashes without any evident disk failures. 
> From [~becket_qin]: This looks like a bug in Kafka. When topic deletion and 
> log retention cleanup happen at the same time, the log retention cleanup may 
> see a ClosedChannelException after the log has been renamed for async 
> deletion.
> The root cause is that the topic deletion should have set the isClosed flag 
> of the partition log to true, and the retention should not bother deleting 
> old log segments when the log is closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6701) synchronize Log modification between delete cleanup and async delete

2018-08-10 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575873#comment-16575873
 ] 

M. Manna commented on KAFKA-6701:
-

[~lindong] I believe this might also be a fix for 
https://issues.apache.org/jira/browse/KAFKA-6188 .

> synchronize Log modification between delete cleanup and async delete
> 
>
> Key: KAFKA-6701
> URL: https://issues.apache.org/jira/browse/KAFKA-6701
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
>Priority: Major
>
> Kafka broker crashes without any evident disk failures. 
> From [~becket_qin]: This looks like a bug in Kafka. When topic deletion and 
> log retention cleanup happen at the same time, the log retention cleanup may 
> see a ClosedChannelException after the log has been renamed for async 
> deletion.
> The root cause is that the topic deletion should have set the isClosed flag 
> of the partition log to true, and the retention should not bother deleting 
> old log segments when the log is closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6701) synchronize Log modification between delete cleanup and async delete

2018-08-10 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6701.
-
Resolution: Fixed

The issue appears to have been fixed in 
https://issues.apache.org/jira/browse/KAFKA-5163. More specifically, 
https://issues.apache.org/jira/browse/KAFKA-5163 added the method 
`Log.renameDir()`, and this method grabs the per-log lock before making 
modifications to the log's directory, etc.
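
A simplified illustration of the locking pattern described above, in Java
rather than the actual kafka.log.Log Scala code:

{noformat}
// Toy model: async-delete rename and retention cleanup contend on one
// per-log lock, so cleanup can no longer observe a half-renamed log.
class LogSketch {
    private final Object lock = new Object();

    void renameDir(String newName) {
        synchronized (lock) {
            // close segments and rename the directory for async deletion
        }
    }

    void deleteOldSegments() {
        synchronized (lock) {
            // safe: cannot interleave with renameDir() above
        }
    }
}{noformat}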

> synchronize Log modification between delete cleanup and async delete
> 
>
> Key: KAFKA-6701
> URL: https://issues.apache.org/jira/browse/KAFKA-6701
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
>Priority: Major
>
> Kafka broker crashes without any evident disk failures. 
> From [~becket_qin]: This looks like a bug in Kafka. When topic deletion and 
> log retention cleanup happen at the same time, the log retention cleanup may 
> see a ClosedChannelException after the log has been renamed for async 
> deletion.
> The root cause is that the topic deletion should have set the isClosed flag 
> of the partition log to true, and the retention should not bother deleting 
> old log segments when the log is closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7269) KStream.merge is not documented

2018-08-10 Thread Virgil Palanciuc (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575845#comment-16575845
 ] 

Virgil Palanciuc commented on KAFKA-7269:
-

Might I suggest adding "union" as an alternative to "merge"? "union" is the 
Spark operator, and it's also the mathematical operator that makes the most 
sense, IMO.

> KStream.merge is not documented
> ---
>
> Key: KAFKA-7269
> URL: https://issues.apache.org/jira/browse/KAFKA-7269
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0
>Reporter: John Roesler
>Priority: Major
>  Labels: beginner, newbie
>
> If I understand the operator correctly, it should be documented as a 
> stateless transformation at 
> https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)