[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651091#comment-16651091
 ] 

ASF GitHub Bot commented on KAFKA-7223:
---

mjsax closed pull request #5787: KAFKA-7223: Suppression documentation
URL: https://github.com/apache/kafka/pull/5787
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/ops.html b/docs/ops.html
index d57f1cf20ff..158602b96a1 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -257,7 +257,7 @@ Managing C
 
   
 
-  Please note, that out of range offsets will be adjusted to available offset 
end. For example, if offset end is at 10 and offset shift request is 
+  Please note, that out of range offsets will be adjusted to available offset 
end. For example, if offset end is at 10 and offset shift request is
   of 15, then, offset at 10 will actually be selected.
 
   
@@ -1546,6 +1546,16 @@ 
 The total number of commit calls. 
 
kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)
   
+  
+record-lateness-avg
+The average observed lateness of records.
+
kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)
+  
+  
+record-lateness-max
+The max observed lateness of records.
+
kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)
+  
  
 
 
diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index 1622702c540..0ff28a8db69 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -58,6 +58,7 @@
 Hopping time windows
 Sliding time windows
 Session Windows
+Window Final Results
 
 
 
@@ -65,6 +66,7 @@
 Applying processors and transformers (Processor API 
integration)
 
 
+Controlling KTable update rate
 Writing streams back to 
Kafka
 Testing a Streams application
 Kafka Streams DSL for Scala
@@ -2969,6 +2971,73 @@ 
 t=5 (blue), which lead to a merge of sessions and an extension of a session, 
respectively.
 
 
+   
+   Window Final Results
+   In Kafka Streams, windowed computations update 
their results continuously.
+  As new data arrives for a window, freshly 
computed results are emitted downstream.
+  For many applications, this is ideal, since 
fresh results are always available,
+  and Kafka Streams is designed to make 
programming continuous computations seamless.
+  However, some applications need to take action 
only on the final result of a windowed computation.
+  Common examples of this are sending alerts or 
delivering results to a system that doesn't support updates.
+   
+   Suppose that you have an hourly windowed count 
of events per user.
+  If you want to send an alert when a user has 
less than three events in an hour, you have a real challenge.
+  All users would match this condition at first, 
until they accrue enough events, so you cannot simply
+  send an alert when someone matches the 
condition; you have to wait until you know you won't see any more events for a 
particular window
+  and then send the alert.
+   
+Kafka Streams offers a clean way to define this 
logic: after defining your windowed computation, you can
+  suppress the 
intermediate results, emitting the final count for each user when the window is 
closed.
+   
+   For example:
+   
+
+KGroupedStream<UserId, Event> grouped = ...;
+grouped
+    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
+    .count()
+    .suppress(Suppressed.untilWindowCloses(unbounded()))
+    .filter((windowedUserId, count) -> count < 3)
+    .toStream()
+    .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), 
windowedUserId.key(), count));
+
+   
+   The key parts of this program are:
+

[jira] [Commented] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651087#comment-16651087
 ] 

ASF GitHub Bot commented on KAFKA-7080:
---

mjsax closed pull request #5804: KAFKA-7080 and KAFKA-7222: Cleanup overlapping 
KIP changes
URL: https://github.com/apache/kafka/pull/5804
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index b26f3c339cf..c0f28efe98b 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -134,6 +134,18 @@ Streams API
 see https://cwiki.apache.org/confluence/display/KAFKA/KIP-324%3A+Add+method+to+get+metrics%28%29+in+AdminClient;>KIP-324
 
 
+
+We deprecated the notion of segments in window stores as those are 
intended to be an implementation detail.
+Thus, the method Windows#segments() and the variable 
Windows#segments were deprecated.
+If you implement custom windows, you should update your code 
accordingly.
+Similarly, WindowBytesStoreSupplier#segments() was 
deprecated and replaced with 
WindowBytesStoreSupplier#segmentInterval().
+If you implement a custom window store, you need to update your code 
accordingly.
+   Finally, Stores#persistentWindowStore(...) was deprecated 
and replaced with a new overload that no longer allows specifying the number 
of segments.
+For more details, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-319%3A+Replace+segments+with+segmentInterval+in+WindowBytesStoreSupplier;>KIP-319
+(note: https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables;>KIP-328
 and 
+   https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times;>KIP-358
 'overlap' with KIP-319).
+
+
 Streams API changes in 2.0.0
 
 In 2.0.0 we have added a few new APIs on the 
ReadOnlyWindowStore interface (for details please read Streams API changes below).
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 4dfba2306cb..feaee1e1336 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -83,21 +83,6 @@ public long maintainMs() {
 return maintainDurationMs;
 }
 
-/**
- * Return the segment interval in milliseconds.
- *
- * @return the segment interval
- * @deprecated since 2.1. Instead, directly configure the segment interval 
in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
- */
-@Deprecated
-public long segmentInterval() {
-// Pinned arbitrarily to a minimum of 60 seconds. Profiling may 
indicate a different value is more efficient.
-final long minimumSegmentInterval = 60_000L;
-// Scaled to the (possibly overridden) retention period
-return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
-}
-
-
 /**
  * Set the number of segments to be used for rolling the window store.
  * This function is not exposed to users but can be called by developers 
that extend this class.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 30e51403714..f7a182472be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state;
 
-import java.time.Duration;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -32,6 +31,7 @@
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 
+import java.time.Duration;
 import java.util.Objects;
 
 /**
@@ -155,7 +155,7 @@ public String metricsScope() {
  *  careful to set it the same as the windowed 
keys you're actually storing.
  * @param retainDuplicates  whether or not to retain duplicates.
  * @return an instance of {@link WindowBytesStoreSupplier}
- * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, 
long, long, boolean, long)} instead
+ * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, 
Duration, Duration, boolean)} instead
  */
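
For reference, a minimal sketch of migrating to the new Duration-based overload of Stores#persistentWindowStore (a sketch assuming the 2.1 Streams API; the store name and durations here are illustrative, not taken from the patch):

{code:java}
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

public class WindowStoreMigrationSketch {
    public static void main(String[] args) {
        // The deprecated long-based overload is replaced by this Duration-based
        // one; the number of segments can no longer be specified here.
        StoreBuilder<WindowStore<String, Long>> builder = Stores.windowStoreBuilder(
                Stores.persistentWindowStore(
                        "counts-store",        // illustrative store name
                        Duration.ofDays(1),    // retention period
                        Duration.ofHours(1),   // window size
                        false),                // retainDuplicates
                Serdes.String(),
                Serdes.Long());
        System.out.println(builder.name());
    }
}
{code}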
 

[jira] [Commented] (KAFKA-6764) ConsoleConsumer behavior inconsistent when specifying --partition with --from-beginning

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651075#comment-16651075
 ] 

ASF GitHub Bot commented on KAFKA-6764:
---

cmccabe closed pull request #5637: MINOR : Fixed KAFKA-6764; Update usage for 
console-consumer whitelist option
URL: https://github.com/apache/kafka/pull/5637
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 365652a75b5..06705d59219 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -65,6 +65,7 @@ object ConsoleConsumer extends Logging {
   def run(conf: ConsumerConfig) {
 val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
 val consumer = new KafkaConsumer(consumerProps(conf), new 
ByteArrayDeserializer, new ByteArrayDeserializer)
+
 val consumerWrapper =
   if (conf.partitionArg.isDefined)
 new ConsumerWrapper(Option(conf.topicArg), conf.partitionArg, 
Option(conf.offsetArg), None, consumer, timeoutMs)
@@ -194,7 +195,7 @@ object ConsoleConsumer extends Logging {
   .withRequiredArg
   .describedAs("topic")
   .ofType(classOf[String])
-val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to 
include for consumption.")
+val whitelistOpt = parser.accepts("whitelist", "Regular expression 
specifying whitelist of topics to include for consumption.")
   .withRequiredArg
   .describedAs("whitelist")
   .ofType(classOf[String])
@@ -355,7 +356,7 @@ object ConsoleConsumer extends Logging {
 val groupIdsProvided = Set(
   Option(options.valueOf(groupIdOpt)), // via --group
   Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)), // via 
--consumer-property
-  Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via 
--cosumer.config
+  Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via 
--consumer.config
 ).flatten
 
 if (groupIdsProvided.size > 1) {
@@ -376,6 +377,9 @@ object ConsoleConsumer extends Logging {
 groupIdPassed = false
 }
 
+if (groupIdPassed && partitionArg.isDefined)
+  CommandLineUtils.printUsageAndDie(parser, "Options group and partition 
cannot be specified together.")
+
 def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
   try
 parser.parse(args: _*)
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala 
b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 47b7fae3d9b..cdc146f3666 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -432,4 +432,67 @@ class ConsoleConsumerTest {
 
assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
   }
 
+  @Test
+  def shouldParseGroupIdFromBeginningGivenTogether() {
+    // Start from earliest
+    var args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "test-group",
+      "--from-beginning")
+
+    var config = new ConsoleConsumer.ConsumerConfig(args)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(-2, config.offsetArg)
+    assertEquals(true, config.fromBeginning)
+
+    // Start from latest
+    args = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "test-group"
+    )
+
+    config = new ConsoleConsumer.ConsumerConfig(args)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(-1, config.offsetArg)
+    assertEquals(false, config.fromBeginning)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldExitOnGroupIdAndPartitionGivenTogether() {
+    Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
+    // Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "test-group",
+      "--partition", "0")
+
+    // When
+    try {
+      new ConsoleConsumer.ConsumerConfig(args)
+    } finally {
+      Exit.resetExitProcedure()
+    }
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldExitOnOffsetWithoutPartition() {
+    Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
+    // Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+

[jira] [Commented] (KAFKA-7396) KIP-365: Materialized, Serialized, Joined, Consumed and Produced with implicit Serde

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651023#comment-16651023
 ] 

ASF GitHub Bot commented on KAFKA-7396:
---

vvcephei closed pull request #5803: KAFKA-7396: document implicit Grouped
URL: https://github.com/apache/kafka/pull/5803
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index 1622702c540..37b6947a4dc 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3355,18 +3355,18 @@ 
   }
 }
   
-  In the above code snippet, we don't have to provide any 
SerDes, Serialized, Produced, Consumed or Joined explicitly. They will also not be dependent on 
any SerDes specified in the config.  In fact all SerDes specified in 
the config will be ignored by the Scala APIs. All SerDes and Serialized, Produced, Consumed or Joined will be handled 
through implicit SerDes as discussed later in the Implicit SerDes section. The complete 
independence from configuration based SerDes is what makes this library 
completely typesafe.  Any missing instances of SerDes, Serialized, Produced, Consumed or Joined will be flagged as a compile 
time error.
+  In the above code snippet, we don't have to provide any 
SerDes, Grouped, 
Produced, Consumed or Joined explicitly. 
They will also not be dependent on any SerDes specified in the config.  
In fact all SerDes specified in the config will be ignored by the Scala 
APIs. All SerDes and Grouped, Produced, Consumed or Joined will be handled through implicit SerDes as 
discussed later in the Implicit SerDes 
section. The complete independence from configuration based SerDes is what 
makes this library completely typesafe.  Any missing instances of SerDes, Grouped, Produced, Consumed or Joined will be flagged 
as a compile time error.
 
 
   Implicit SerDes
-  One of the common complaints of Scala users with the Java API 
has been the repetitive usage of the SerDes in API invocations. Many of the 
APIs need to take the SerDes through abstractions like Serialized, Produced, Consumed or Joined. And the user has to supply 
them every time through the with function of these classes.
-  The library uses the power of https://docs.scala-lang.org/tour/implicit-parameters.html;>Scala implicit 
parameters to alleviate this concern. As a user you can provide implicit 
SerDes or implicit values of Serialized, Produced, Consumed or Joined once and make your code less verbose. In fact 
you can just have the implicit SerDes in scope and the library will make the 
instances of Serialized, Produced, Consumed or Joined available in scope.
+  One of the common complaints of Scala users with the Java API 
has been the repetitive usage of the SerDes in API invocations. Many of the 
APIs need to take the SerDes through abstractions like Grouped, Produced, Consumed or Joined. And the user has to supply 
them every time through the with function of these classes.
+  The library uses the power of https://docs.scala-lang.org/tour/implicit-parameters.html;>Scala implicit 
parameters to alleviate this concern. As a user you can provide implicit 
SerDes or implicit values of Grouped, Produced, Consumed or Joined once and make your code less verbose. In fact 
you can just have the implicit SerDes in scope and the library will make the 
instances of Grouped, Produced, Consumed or Joined available in scope.
   The library also bundles all implicit SerDes of the commonly 
used primitive types in a Scala module - so just import the module vals and 
have all SerDes in scope.  A similar strategy of modular implicits can be 
adopted for any user-defined SerDes as well (User-defined SerDes are discussed 
in the next section).
   Here's an example:
   
 // DefaultSerdes brings into scope implicit SerDes (mostly for primitives)
-// that will set up all Serialized, Produced, Consumed and Joined instances.
-// So all APIs below that accept Serialized, Produced, Consumed or Joined will
+// that will set up all Grouped, Produced, Consumed and Joined instances.
+// So all APIs below that accept Grouped, Produced, Consumed or Joined will
 // get these instances automatically
 import Serdes._
 
@@ -3376,7 +3376,7 @@ 
 
 val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
 
-// The following code fragment does not have a single instance of Serialized,
+// The following code fragment does not have a single instance of Grouped,
 // Produced, Consumed or 

[jira] [Commented] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651008#comment-16651008
 ] 

ASF GitHub Bot commented on KAFKA-7080:
---

mjsax opened a new pull request #5804: KAFKA-7080 and KAFKA-7222: Cleanup 
overlapping KIP changes
URL: https://github.com/apache/kafka/pull/5804
 
 
   - KIP-319 and KIP-328 overlap and we can remove non-released deprecated 
methods
   - add upgrade docs for KIP-319
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> WindowStoreBuilder incorrectly initializes CachingWindowStore
> -
>
> Key: KAFKA-7080
> URL: https://issues.apache.org/jira/browse/KAFKA-7080
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0, 1.0.1, 1.1.0, 2.0.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> When caching is enabled on the WindowStoreBuilder, it creates a 
> CachingWindowStore. However, it incorrectly passes storeSupplier.segments() 
> (the number of segments) to the segmentInterval argument.
>  
> The impact is low, since any valid number of segments is also a valid segment 
> size, but it likely results in much smaller segments than intended. For 
> example, the segments may be sized 3ms instead of 60,000ms.
>  
> Ideally the WindowBytesStoreSupplier interface would allow suppliers to 
> advertise their segment size instead of segment count. I plan to create a KIP 
> to propose this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-15 Thread Mr Kafka (JIRA)
Mr Kafka created KAFKA-7510:
---

 Summary: KStreams RecordCollectorImpl leaks data to logs on error
 Key: KAFKA-7510
 URL: https://issues.apache.org/jira/browse/KAFKA-7510
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Mr Kafka


org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data on 
error as it dumps the *value* / message payload to the logs.

This is problematic as it may write personally identifiable information (PII) 
or other secret information to plain-text log files, which can then be 
propagated to other log systems, e.g. Splunk.

I suggest the *key* and *value* fields be moved to debug level, as they are 
useful for some people, while error level contains the *errorMessage*, 
*timestamp*, *topic* and *stackTrace*.
{code:java}
private <K, V> void recordSendError(
    final K key,
    final V value,
    final Long timestamp,
    final String topic,
    final Exception exception
) {
    String errorLogMessage = LOG_MESSAGE;
    String errorMessage = EXCEPTION_MESSAGE;
    if (exception instanceof RetriableException) {
        errorLogMessage += PARAMETER_HINT;
        errorMessage += PARAMETER_HINT;
    }
    log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
    sendException = new StreamsException(
        String.format(
            errorMessage,
            logPrefix,
            "an error caught",
            key,
            value,
            timestamp,
            topic,
            exception.toString()
        ),
        exception);
}{code}
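
A minimal sketch of the suggested split (a hypothetical restructuring, not an actual patch; the LOG_MESSAGE-style constants are elided), keeping topic, timestamp and exception at error level while the sensitive fields move to debug:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecordSendErrorLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(RecordSendErrorLoggingSketch.class);

    // Hypothetical split: the error line no longer carries the payload;
    // key and value are only emitted when DEBUG logging is enabled.
    static <K, V> void logSendError(final K key, final V value, final Long timestamp,
                                    final String topic, final Exception exception) {
        log.error("Error sending record to topic {} (timestamp {}): {}. "
                + "Enable DEBUG logging to see the failed key and value.",
                topic, timestamp, exception.toString());
        log.debug("Failed record: key {} value {}", key, value);
    }
}
{code}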



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7396) KIP-365: Materialized, Serialized, Joined, Consumed and Produced with implicit Serde

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650897#comment-16650897
 ] 

ASF GitHub Bot commented on KAFKA-7396:
---

vvcephei opened a new pull request #5803: KAFKA-7396: document implicit Grouped
URL: https://github.com/apache/kafka/pull/5803
 
 
   Updates the KIP-365 documentation in light of the replacement of Serialized 
with Grouped.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-365: Materialized, Serialized, Joined, Consumed and Produced with 
> implicit Serde
> 
>
> Key: KAFKA-7396
> URL: https://issues.apache.org/jira/browse/KAFKA-7396
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Joan Goyeau
>Assignee: Joan Goyeau
>Priority: Major
> Fix For: 2.1.0
>
>
> See KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-365%3A+Materialized%2C+Serialized%2C+Joined%2C+Consumed+and+Produced+with+implicit+Serde]
> Github PR: https://github.com/apache/kafka/pull/5551



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2018-10-15 Thread Randall Hauch (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650896#comment-16650896
 ] 

Randall Hauch commented on KAFKA-7509:
--

Created a patch with a proposed approach to prevent these warning messages: 
https://github.com/apache/kafka/pull/5802
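
A minimal sketch of that approach (a sketch, not the actual patch; it assumes the public ProducerConfig.configNames() accessor, and the helper name is hypothetical) is to filter the worker properties down to the keys each client's ConfigDef declares before constructing the client:

{code:java}
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class ClientConfigFilterSketch {

    // Hypothetical helper: drop any property that ProducerConfig does not declare,
    // so AbstractConfig has no "unused" keys left to warn about. The analogous
    // filters would use ConsumerConfig.configNames() and AdminClientConfig.configNames().
    public static Properties producerProps(final Map<String, String> workerProps) {
        final Properties filtered = new Properties();
        for (final Map.Entry<String, String> entry : workerProps.entrySet()) {
            if (ProducerConfig.configNames().contains(entry.getKey())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
{code}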

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2018-10-15 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-7509:


 Summary: Kafka Connect logs unnecessary warnings about unused 
configurations
 Key: KAFKA-7509
 URL: https://issues.apache.org/jira/browse/KAFKA-7509
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 0.10.2.0
Reporter: Randall Hauch
Assignee: Randall Hauch


When running Connect, the logs contain quite a few warnings about "The 
configuration '{}' was supplied but isn't a known config." This occurs when 
Connect creates producers, consumers, and admin clients, because the 
AbstractConfig is logging unused configuration properties upon construction. 
It's complicated by the fact that the Producer, Consumer, and AdminClient all 
create their own AbstractConfig instances within the constructor, so we can't 
even call its {{ignore(String key)}} method.

See also KAFKA-6793 for a similar issue with Streams.

There are no arguments in the Producer, Consumer, or AdminClient constructors 
to control  whether the configs log these warnings, so a simpler workaround is 
to only pass those configuration properties to the Producer, Consumer, and 
AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
configdefs know about.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2018-10-15 Thread Allen Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650874#comment-16650874
 ] 

Allen Wang commented on KAFKA-7504:
---

[~junrao] Does that mean the patch does not immediately address our issue and 
further work is needed? I was about to give it a try.

 

> Broker performance degradation caused by call of sendfile reading disk in 
> network thread
> 
>
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>Priority: Major
>  Labels: latency, performance
> Attachments: image-2018-10-14-14-18-38-149.png, 
> image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, 
> image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, 
> image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, 
> image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, 
> image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
>  Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with latest build from 
> trunk (2.2.0-SNAPSHOT)
> h2. Phenomenon
> Response time of Produce request (99th ~ 99.9th %ile) degrading to 50x ~ 100x 
> more than usual.
>  Normally 99th %ile is lower than 20ms, but when this issue occurs it marks 
> 50ms to 200ms.
> At the same time we could see two more things in metrics:
> 1. Disk read coincidence from the volume assigned to log.dirs.
>  2. Raise in network threads utilization (by 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see increase of requests in metrics, we suspected blocking in 
> event loop ran by network thread as the cause of raising network thread 
> utilization.
>  Reading through Kafka broker source code, we understand that the only disk 
> IO performed in network thread is reading log data through calling 
> sendfile(2) (via FileChannel#transferTo).
>  To probe that the calls of sendfile(2) are blocking network thread for some 
> moments, I ran following SystemTap script to inspect duration of sendfile 
> syscalls.
> {code:java}
> # Systemtap script to measure syscall duration
> global s
> global records
> probe syscall.$1 {
> s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
> elapsed = gettimeofday_us() - s[tid()]
> delete s[tid()]
> records <<< elapsed
> }
> probe end {
> print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
> 0 | 0
> 1 |71
> 2 |@@@   6171
>16 |@@@  29472
>32 |@@@   3418
>  2048 | 0
> ...
>  8192 | 3{code}
> As you can see there were some cases taking more than few milliseconds, 
> implies that it blocks network thread for that long and applying the same 
> latency for all other request/response processing.
> h2. Hypothesis
> Gathering the above observations, I made the following hypothesis.
> Let's say network-thread-1 multiplexing 3 connections.
>  - producer-A
>  - follower-B (broker replica fetch)
>  - consumer-C
> Broker receives requests from each of those clients, [Produce, FetchFollower, 
> FetchConsumer].
> They are processed well by request handler threads, and now the response 
> queue of the network-thread contains 3 responses in following order: 
> [FetchConsumer, Produce, FetchFollower].
> network-thread-1 takes 3 responses and processes them sequentially 
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
>  Ideally processing of these 3 responses completes in microseconds as in it 
> just copies ready responses into client socket's buffer with non-blocking 
> manner.
>  However, Kafka uses sendfile(2) for transferring log data to client sockets. 
> The target data might be in page cache, but old data which has written a bit 
> far ago and never read since then, are likely not.
>  If the target data isn't in page cache, kernel first needs to load the 
> target page into cache. This takes more than few milliseconds to tens of 
> milliseconds depending on disk hardware and current load being applied to it.
>  Linux kernel doesn't considers the moment loading data from disk into page 
> cache as "blocked", hence it awaits completion 

[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2018-10-15 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650798#comment-16650798
 ] 

Jun Rao commented on KAFKA-7504:


The case that Allen described is when a follower is fetching both in-sync and 
out-of-sync partitions. If fetching the out-of-sync partition's data takes 
long, it will delay the propagation of the in-sync partition's data, which can 
increase producer latency. This can happen even when the replication quota is 
enabled. To improve this case, we can potentially extend the idea in Yuto's 
patch to handle replication fetch requests. Basically, we initiate the 
prefetching of a partition in a background thread. The fetch response can be 
sent when the prefetching of all partitions' data is complete or the timeout 
is reached. In the case of a timeout, we only include those 
partitions whose prefetching has completed.
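
A generic sketch of that prefetch-with-timeout pattern (illustrative only, not Kafka broker internals; the partition names and timeout are made up):

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class PrefetchSketch {
    public static void main(String[] args) {
        // Kick off a background prefetch per partition.
        List<String> partitions = Arrays.asList("p0", "p1", "p2");
        List<CompletableFuture<String>> prefetches = partitions.stream()
                .map(p -> CompletableFuture.supplyAsync(() -> readFromDisk(p)))
                .collect(Collectors.toList());
        try {
            // Wait for all prefetches, but no longer than the fetch timeout.
            CompletableFuture.allOf(prefetches.toArray(new CompletableFuture[0]))
                    .get(500, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Timed out: fall through and respond with whatever completed.
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        // Only partitions whose prefetch completed are included in the response.
        prefetches.stream()
                .filter(f -> f.isDone() && !f.isCompletedExceptionally())
                .forEach(f -> System.out.println("include in response: " + f.join()));
    }

    private static String readFromDisk(String partition) {
        return partition + "-data"; // stand-in for a blocking disk read
    }
}
{code}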

> Broker performance degradation caused by call of sendfile reading disk in 
> network thread
> 
>
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>Priority: Major
>  Labels: latency, performance
> Attachments: image-2018-10-14-14-18-38-149.png, 
> image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, 
> image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, 
> image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, 
> image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, 
> image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
>  Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with latest build from 
> trunk (2.2.0-SNAPSHOT)
> h2. Phenomenon
> Response time of Produce request (99th ~ 99.9th %ile) degrading to 50x ~ 100x 
> more than usual.
>  Normally 99th %ile is lower than 20ms, but when this issue occurs it marks 
> 50ms to 200ms.
> At the same time we could see two more things in metrics:
> 1. Disk read coincidence from the volume assigned to log.dirs.
>  2. Raise in network threads utilization (by 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see increase of requests in metrics, we suspected blocking in 
> event loop ran by network thread as the cause of raising network thread 
> utilization.
>  Reading through Kafka broker source code, we understand that the only disk 
> IO performed in network thread is reading log data through calling 
> sendfile(2) (via FileChannel#transferTo).
>  To probe that the calls of sendfile(2) are blocking network thread for some 
> moments, I ran following SystemTap script to inspect duration of sendfile 
> syscalls.
> {code:java}
> # Systemtap script to measure syscall duration
> global s
> global records
> probe syscall.$1 {
> s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
> elapsed = gettimeofday_us() - s[tid()]
> delete s[tid()]
> records <<< elapsed
> }
> probe end {
> print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
> 0 | 0
> 1 |71
> 2 |@@@   6171
>16 |@@@  29472
>32 |@@@   3418
>  2048 | 0
> ...
>  8192 | 3{code}
> As you can see there were some cases taking more than few milliseconds, 
> implies that it blocks network thread for that long and applying the same 
> latency for all other request/response processing.
> h2. Hypothesis
> Gathering the above observations, I made the following hypothesis.
> Let's say network-thread-1 multiplexing 3 connections.
>  - producer-A
>  - follower-B (broker replica fetch)
>  - consumer-C
> Broker receives requests from each of those clients, [Produce, FetchFollower, 
> FetchConsumer].
> They are processed well by request handler threads, and now the response 
> queue of the network-thread contains 3 responses in following order: 
> [FetchConsumer, Produce, FetchFollower].
> network-thread-1 takes 3 responses and processes them sequentially 
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
>  Ideally processing of these 3 responses completes in microseconds as in it 
> just copies ready responses into client socket's buffer 

[jira] [Created] (KAFKA-7508) Kafka broker anonymous disconnected from Zookeeper

2018-10-15 Thread Sathish Yanamala (JIRA)
Sathish Yanamala created KAFKA-7508:
---

 Summary: Kafka broker anonymous disconnected from Zookeeper
 Key: KAFKA-7508
 URL: https://issues.apache.org/jira/browse/KAFKA-7508
 Project: Kafka
  Issue Type: Task
  Components: config
Reporter: Sathish Yanamala


Hello Team,

 

We are facing the error below: the Kafka broker is unable to connect to 
Zookeeper. Please check and suggest whether any configuration changes are 
required on the Kafka broker.

 

 ERROR:

2018-10-15 12:24:07,502 WARN kafka.network.Processor: Attempting to send 
response via channel for which there is no open connection, connection id 
- -:9093-- -:47542-25929

2018-10-15 12:24:09,428 INFO kafka.coordinator.group.GroupCoordinator: 
[GroupCoordinator 3]: Group KMOffsetCache-xxx  with generation 9 is now 
empty (__consumer_offsets-22)

2018-10-15 12:24:09,428 INFO kafka.server.epoch.LeaderEpochFileCache: Updated 
PartitionLeaderEpoch. New: \{epoch:1262, offset:151}, Current: \{epoch:1261, 
offset144} for Partition: __consumer_offsets-22. Cache now contains 15 entries.

{color:#d04437}*2018-10-15 12:24:10,905 ERROR kafka.utils.KafkaScheduler: 
Uncaught exception in scheduled task 'highwatermark-checkpoint'*{color}

{color:#d04437}*java.lang.OutOfMemoryError: Java heap space*{color}

{color:#d04437}    at{color} 
scala.collection.convert.DecorateAsScala$$Lambda$214/x.get$Lambda(Unknown 
Source)

    at 
java.lang.invoke.LambdaForm$DMH/xxx.invokeStatic_LL_L(LambdaForm$DMH)

    at 
java.lang.invoke.LambdaForm$MH/xx.linkToTargetMethod(LambdaForm$MH)

    at 
scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter(DecorateAsScala.scala:45)

    at 
scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter$(DecorateAsScala.scala:44)

    at 
scala.collection.JavaConverters$.collectionAsScalaIterableConverter(JavaConverters.scala:73)

    at kafka.utils.Pool.values(Pool.scala:85)

    at 
kafka.server.ReplicaManager.nonOfflinePartitionsIterator(ReplicaManager.scala:397)

    at 
kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1340)

    at 
kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:253)

    at kafka.server.ReplicaManager$$Lambda$608/xx.apply$mcV$sp(Unknown 
Source)

    at 
kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:110)

    at kafka.utils.KafkaScheduler$$Lambda$269/.apply$mcV$sp(Unknown 
Source)

    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)

    at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)

    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)

    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)

    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

 

Thank you,

Sathish Yanamala

M:832-382-4487



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7506) KafkaStreams repartition topic settings not suitable for processing old records

2018-10-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650532#comment-16650532
 ] 

Matthias J. Sax commented on KAFKA-7506:


[~niklas.lonn] If you want a back-port, please comment on the other ticket. 
However, it's not a bug fix (the other ticket is marked as "improvement"), but 
a default config change, and thus should not be part of a bug fix release IMHO. 

Note that you don't really need the bug fix though. You can reconfigure the 
repartition topics manually after Streams creates them, via the `bin/kafka-topics.sh` 
command. As an alternative, you can also pass in topic-level configs that 
overwrite the default config via StreamsConfig (note that those configs apply to 
all internal topics; thus, you need to double check whether you can use this or not): 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+topic+configs
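
A minimal sketch of that second option (a sketch assuming the KIP-173 StreamsConfig.topicPrefix() helper; remember the override applies to all internal topics):

{code:java}
import java.util.Properties;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class InternalTopicConfigSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app"); // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // KIP-173 prefix: a topic-level default for ALL internal topics created
        // by Streams; infinite retention so the cleaner cannot race the app.
        props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), "-1");
        System.out.println(props);
    }
}
{code}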

> KafkaStreams repartition topic settings not suitable for processing old 
> records
> ---
>
> Key: KAFKA-7506
> URL: https://issues.apache.org/jira/browse/KAFKA-7506
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: Niklas Lönn
>Priority: Major
> Attachments: kafka-7506.patch
>
>
> Hi, We are using Kafka Streams to process a compacted store, when resetting 
> the application/processing from scratch the default topic configuration for 
> repartition topics is 50MB and 10min segment sizes.
>  
> As the retention.ms is undefined, this leads to default retention.ms and log 
> cleaner starts competing with the application, effectively causing the 
> streams app to skip records.
> {{Application logs the following:}}
> {{Fetch offset 213792 is out of range for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting 
> offset}}
> {{Fetch offset 110227 is out of range for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting 
> offset}}
> {{Resetting offset for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset 
> 233302.}}
> {{Resetting offset for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset 
> 119914.}}
> By adding the following configuration to RepartitionTopicConfig.java the 
> issue is solved
> {{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // 
> Infinite}}
>  
>  My understanding is that this should be safe as KafkaStreams uses the admin 
> API to delete segments.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2018-10-15 Thread Allen Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650486#comment-16650486
 ] 

Allen Wang commented on KAFKA-7504:
---

Great work. We have observed significant produce response time increase when a 
follower is catching up with the leader after the follower is down for a while. 
The leader has a lot of disk read at the time. It seems to be related to this 
issue. I am looking forward to the patch in 1.1 and 2.x branches.

> Broker performance degradation caused by call of sendfile reading disk in 
> network thread
> 
>
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>Priority: Major
>  Labels: latency, performance
> Attachments: image-2018-10-14-14-18-38-149.png, 
> image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, 
> image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, 
> image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, 
> image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, 
> image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
>  Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with latest build from 
> trunk (2.2.0-SNAPSHOT)
> h2. Phenomenon
> Response time of Produce request (99th ~ 99.9th %ile) degrading to 50x ~ 100x 
> more than usual.
>  Normally 99th %ile is lower than 20ms, but when this issue occurs it marks 
> 50ms to 200ms.
> At the same time we could see two more things in metrics:
> 1. Disk read coincidence from the volume assigned to log.dirs.
>  2. Raise in network threads utilization (by 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see increase of requests in metrics, we suspected blocking in 
> event loop ran by network thread as the cause of raising network thread 
> utilization.
>  Reading through Kafka broker source code, we understand that the only disk 
> IO performed in network thread is reading log data through calling 
> sendfile(2) (via FileChannel#transferTo).
>  To probe that the calls of sendfile(2) are blocking network thread for some 
> moments, I ran following SystemTap script to inspect duration of sendfile 
> syscalls.
> {code:java}
> # Systemtap script to measure syscall duration
> global s
> global records
> probe syscall.$1 {
> s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
> elapsed = gettimeofday_us() - s[tid()]
> delete s[tid()]
> records <<< elapsed
> }
> probe end {
> print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
> 0 | 0
> 1 |71
> 2 |@@@   6171
>16 |@@@  29472
>32 |@@@   3418
>  2048 | 0
> ...
>  8192 | 3{code}
> As you can see there were some cases taking more than few milliseconds, 
> implies that it blocks network thread for that long and applying the same 
> latency for all other request/response processing.
> h2. Hypothesis
> Gathering the above observations, I made the following hypothesis.
> Let's say network-thread-1 multiplexing 3 connections.
>  - producer-A
>  - follower-B (broker replica fetch)
>  - consumer-C
> Broker receives requests from each of those clients, [Produce, FetchFollower, 
> FetchConsumer].
> They are processed well by request handler threads, and now the response 
> queue of the network-thread contains 3 responses in following order: 
> [FetchConsumer, Produce, FetchFollower].
> network-thread-1 takes 3 responses and processes them sequentially 
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
>  Ideally processing of these 3 responses completes in microseconds as in it 
> just copies ready responses into client socket's buffer with non-blocking 
> manner.
>  However, Kafka uses sendfile(2) for transferring log data to client sockets. 
> The target data might be in page cache, but old data which has written a bit 
> far ago and never read since then, are likely not.
>  If the target data isn't in page cache, kernel first needs to load the 
> target page into cache. This takes more than few milliseconds to tens of 
> milliseconds depending on disk 

[jira] [Commented] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650322#comment-16650322
 ] 

ASF GitHub Bot commented on KAFKA-7505:
---

rajinisivaram opened a new pull request #5800: KAFKA-7505: Process incoming 
bytes on write error to report SSL failures
URL: https://github.com/apache/kafka/pull/5800
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky test: SslTransportLayerTest.testListenerConfigOverride
> 
>
> Key: KAFKA-7505
> URL: https://issues.apache.org/jira/browse/KAFKA-7505
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Rajini Sivaram
>Priority: Major
>
> This test seems to fail quite a bit recently. I've seen it happen with Java 
> 11 quite a bit so it could be more likely to fail there.
> {code:java}
> java.lang.AssertionError: expected: but 
> was: at org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.failNotEquals(Assert.java:834) at 
> org.junit.Assert.assertEquals(Assert.java:118) at 
> org.junit.Assert.assertEquals(Assert.java:144) at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:104)
>  at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:109)
>  at 
> org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7507) Kafka Topics Error : "NotLeaderForPartitionException: This server is not the leader for that topic-partition"

2018-10-15 Thread Sathish Yanamala (JIRA)
Sathish Yanamala created KAFKA-7507:
---

 Summary: Kafka Topics Error : "NotLeaderForPartitionException: 
This server is not the leader for that topic-partition"
 Key: KAFKA-7507
 URL: https://issues.apache.org/jira/browse/KAFKA-7507
 Project: Kafka
  Issue Type: Bug
Reporter: Sathish Yanamala


Hello Team,

We are facing the error below on an existing application; this is the first 
time we have seen this error.

Please suggest whether any configuration changes are required for the issue 
below; I have reviewed some related JIRA stories.

We run our environment with 5 brokers; each topic has a replication factor of 
3 and 6 partitions, and we have a Zookeeper environment.

 

 +*Error Log :*+

2018-10-15 03:42:16,596 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition 
__consumer_offsets-41 to broker 
1:{color:#d04437}org.apache.kafka.common.errors.NotLeaderForPartitionException: 
This server is not the leader for that topic-partition{color}.

2018-10-15 03:42:16,606 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=4, fetcherId=0] Error for partition 
CSL_SUR_DL_LXNX-3 to broker 
4:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,608 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition 
CSL_SUR_DL_SOC_EAS-4 to broker 
1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,609 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error for partition 
__consumer_offsets-26 to broker 
3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,613 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=5, fetcherId=0] Error for partition 
EDL_Datashare_Genesys_Request-3 to broker 
5:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,613 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=5, fetcherId=0] Error for partition 
CSL_TRANSMIT_Data-3 to broker 
5:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,613 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=4, fetcherId=0] Error for partition 
CHR_ResOrch_03-0 to broker 
4:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,616 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition 
CSL_SUR_LXNX_REQUEST-4 to broker 
1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,616 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=4, fetcherId=0] Error for partition 
EDL_Datashare_JCP_GPAs-4 to broker 
4:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,617 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=5, fetcherId=0] Error for partition 
CHR_ResOrch_03-5 to broker 
5:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,619 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error for partition 
CHR_ResOrch-0 to broker 
3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,621 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition 
__consumer_offsets-21 to broker 
1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,623 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error for partition 
CHR_ResOrch_05-3 to broker 
3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

2018-10-15 03:42:16,624 ERROR kafka.server.ReplicaFetcherThread: 
[ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition 
CHR_DataCompOrch-5 to broker 
1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that 

[jira] [Commented] (KAFKA-7498) common.requests.CreatePartitionsRequest uses clients.admin.NewPartitions

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650131#comment-16650131
 ] 

ASF GitHub Bot commented on KAFKA-7498:
---

rajinisivaram closed pull request #5784: KAFKA-7498: Remove references from 
`common.requests` to `clients`
URL: https://github.com/apache/kafka/pull/5784
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7810a3e8673..91d23f6424b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -131,7 +131,6 @@
 
 
 
-  
   
   
   
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 86e14476625..b4132840e4d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -81,6 +81,7 @@
 import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
 import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import 
org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
@@ -142,6 +143,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 
@@ -2083,7 +2085,8 @@ public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPar
 for (String topic : newPartitions.keySet()) {
 futures.put(topic, new KafkaFutureImpl<>());
 }
-        final Map<String, NewPartitions> requestMap = new HashMap<>(newPartitions);
+        final Map<String, PartitionDetails> requestMap = newPartitions.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> partitionDetails(e.getValue())));
 
 final long now = time.milliseconds();
 runnable.call(new Call("createPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
@@ -2482,6 +2485,10 @@ private boolean 
handleFindCoordinatorError(FindCoordinatorResponse response, Kaf
 return false;
 }
 
+    private PartitionDetails partitionDetails(NewPartitions newPartitions) {
+        return new PartitionDetails(newPartitions.totalCount(), newPartitions.assignments());
+    }
+
 private final static class ListConsumerGroupsResults {
 private final List errors;
 private final HashMap listings;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
index 795a66a9ea6..7872cf9f8d0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
@@ -72,17 +71,47 @@
     // It is an error for duplicate topics to be present in the request,
     // so track duplicates here to allow KafkaApis to report per-topic errors.
     private final Set<String> duplicates;
-    private final Map<String, NewPartitions> newPartitions;
+    private final Map<String, PartitionDetails> newPartitions;
     private final int timeout;
     private final boolean validateOnly;
 
+    public static class PartitionDetails {
+
+        private final int totalCount;
+
+        private final List<List<Integer>> newAssignments;
+
+        public PartitionDetails(int totalCount) {
+            this(totalCount, null);
+        }
+
+        public PartitionDetails(int totalCount, List<List<Integer>> newAssignments) {
+            this.totalCount = totalCount;
+            this.newAssignments = newAssignments;
+        }
+
+        public int totalCount() {
+            return totalCount;
+        }
+
+        public List<List<Integer>> newAssignments() {
+            return newAssignments;
+        }
+
+        @Override
+        public String toString() {
+            return "(totalCount=" + totalCount() + ", newAssignments=" + newAssignments() + ")";
+        }
+
+    }
+
     public static class Builder extends AbstractRequest.Builder<CreatePartitionsRequest> {
 
-        private final Map
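For context, this refactor does not change the public admin API; only the internal conversion point moves. A minimal, hedged usage sketch (topic name and broker address are hypothetical) of the createPartitions call whose internals the diff above rewires:

{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;

import java.util.Collections;
import java.util.Properties;

public class CreatePartitionsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        try (AdminClient admin = AdminClient.create(props)) {
            // Grow "my-topic" to 6 partitions. Internally the client now converts
            // NewPartitions into the protocol-level PartitionDetails (see diff above),
            // so common.requests no longer needs to import clients.admin types.
            admin.createPartitions(
                    Collections.singletonMap("my-topic", NewPartitions.increaseTo(6)))
                 .all().get();
        }
    }
}
{code}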

[jira] [Assigned] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride

2018-10-15 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reassigned KAFKA-7505:
-

Assignee: Rajini Sivaram

> Flaky test: SslTransportLayerTest.testListenerConfigOverride
> 
>
> Key: KAFKA-7505
> URL: https://issues.apache.org/jira/browse/KAFKA-7505
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Rajini Sivaram
>Priority: Major
>
> This test seems to fail quite a bit recently. I've seen it happen with Java 
> 11 quite a bit so it could be more likely to fail there.
> {code:java}
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:104)
>   at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:109)
>   at org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride

2018-10-15 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650102#comment-16650102
 ] 

Rajini Sivaram commented on KAFKA-7505:
---

The stack trace shows that the client didn't see the SSL handshake failure notification 
from the broker when the broker failed the handshake and then closed the connection.

The sequence with older versions, where this succeeds:
# Client writes SSL handshake data
# Broker processes the handshake data from the client, fails the handshake
# Broker flushes the handshake failure notification, waiting if necessary for the flush 
to complete
# Client reads data from the broker, processes the SSL handshake failure notification
# Client processes the failure as an authentication exception

We see the same sequence with Java 11 most of the time, but sometimes it fails 
because the client is attempting to write more data to the broker (which is 
possible with the TLS protocol). The sequence then is:
# Client writes some SSL handshake data
# Broker processes the handshake data from the client, fails the handshake
# Broker flushes the failure notification, waiting if necessary for the flush to 
complete
# Client attempts to write more data, fails with an I/O exception since the broker 
has closed the connection
# Client processes the failure as an I/O exception

We guarantee that we never process an ordinary I/O exception as an 
authentication exception, but we don't actually guarantee the reverse. The 
tests, however, are strict because we want to try to handle all known 
authentication failure scenarios. I will see if we can fix this scenario in our 
implementation.
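To illustrate why the tests insist on this distinction, here is a hedged sketch (not part of the ticket's patch; `scheduleRetry` is a hypothetical application hook): authentication failures are fatal and must not be retried, while ordinary I/O errors usually are.

{code:java}
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;

public class FailureHandlingSketch {
    void handleFailure(Exception e) {
        if (e instanceof AuthenticationException) {
            // The SSL/SASL handshake definitively failed; retrying cannot succeed.
            throw new KafkaException("fatal authentication error", e);
        }
        // Anything else is treated as a transient I/O problem and retried.
        scheduleRetry();
    }

    void scheduleRetry() { /* hypothetical retry/backoff hook */ }
}
{code}

If the broker's handshake-failure notification is lost (the Java 11 race described above), the client surfaces an I/O exception, and a handler like this would keep retrying a hopeless connection; that is what the strict tests guard against.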


> Flaky test: SslTransportLayerTest.testListenerConfigOverride
> 
>
> Key: KAFKA-7505
> URL: https://issues.apache.org/jira/browse/KAFKA-7505
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Priority: Major
>
> This test seems to fail quite a bit recently. I've seen it happen with Java 
> 11 quite a bit so it could be more likely to fail there.
> {code:java}
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:104)
>   at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:109)
>   at org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7506) KafkaStreams repartition topic settings not suitable for processing old records

2018-10-15 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650014#comment-16650014
 ] 

Niklas Lönn commented on KAFKA-7506:


Found the duplicate ticket myself now, sorry for the noise. However, why has 
this not been backported to 1.1.x? Jumping to 2.0.0 might not always be 
feasible.
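For applications stuck on 1.1.x, a hedged user-side workaround (assuming `StreamsConfig.topicPrefix` overrides apply to the application's internal repartition topics, which is that prefix's documented purpose) is to set the same retention override from the application instead of patching RepartitionTopicConfig:

{code:java}
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class RepartitionRetentionWorkaround {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");            // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        // Apply infinite retention to this app's internal topics so the broker
        // cannot delete repartition segments while old records are still
        // being processed.
        props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), "-1");
        return props;
    }
}
{code}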

> KafkaStreams repartition topic settings not suitable for processing old 
> records
> ---
>
> Key: KAFKA-7506
> URL: https://issues.apache.org/jira/browse/KAFKA-7506
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: Niklas Lönn
>Priority: Major
> Attachments: kafka-7506.patch
>
>
> Hi, we are using Kafka Streams to process a compacted store. When resetting 
> the application/processing from scratch, the default topic configuration for 
> repartition topics is 50MB and 10min segment sizes.
>  
> As retention.ms is undefined, the default retention.ms applies and the log 
> cleaner starts competing with the application, effectively causing the 
> streams app to skip records.
> {{Application logs the following:}}
> {{Fetch offset 213792 is out of range for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting 
> offset}}
> {{Fetch offset 110227 is out of range for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting 
> offset}}
> {{Resetting offset for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset 
> 233302.}}
> {{Resetting offset for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset 
> 119914.}}
> Adding the following configuration to RepartitionTopicConfig.java solves the 
> issue:
> {{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // 
> Infinite}}
>  
>  My understanding is that this should be safe as KafkaStreams uses the admin 
> API to delete segments.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7506) KafkaStreams repartition topic settings not suitable for processing old records

2018-10-15 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/KAFKA-7506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niklas Lönn updated KAFKA-7506:
---
Description: 
Hi, we are using Kafka Streams to process a compacted store. When resetting the 
application/processing from scratch, the default topic configuration for 
repartition topics is 50MB and 10min segment sizes.

 

As retention.ms is undefined, the default retention.ms applies and the log 
cleaner starts competing with the application, effectively causing the streams 
app to skip records.

{{Application logs the following:}}

{{Fetch offset 213792 is out of range for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting offset}}
{{Fetch offset 110227 is out of range for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting offset}}
{{Resetting offset for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset 233302.}}
{{Resetting offset for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset 119914.}}

Adding the following configuration to RepartitionTopicConfig.java solves the 
issue:

{{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // 
Infinite}}

 
 My understanding is that this should be safe as KafkaStreams uses the admin 
API to delete segments.
  

  was:
Hi, We are using Kafka Streams to process a compacted store, when resetting the 
application/processing from scratch the default topic configuration for 
repartition topics is 50MB and 10min segment sizes.

 

As the retention.ms is undefined, this leads to default retention.ms and log 
cleaner starts competing with the application, effectively causing the streams 
app to skip records.

{{Application logs the following:}}

{\{ Fetch offset 213792 is out of range for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting offset}}
 \{{ Fetch offset 110227 is out of range for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting offset}}
 \{{ Resetting offset for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset 233302.}}
 \{{ Resetting offset for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset 119914.}}

By adding the following configuration to RepartitionTopicConfig.java the issue 
is solved

{{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // 
Infinite}}

 
 My understanding is that this should be safe as KafkaStreams uses the admin 
API to delete segments.
  


> KafkaStreams repartition topic settings not suitable for processing old 
> records
> ---
>
> Key: KAFKA-7506
> URL: https://issues.apache.org/jira/browse/KAFKA-7506
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Niklas Lönn
>Priority: Major
>
> Hi, we are using Kafka Streams to process a compacted store. When resetting 
> the application/processing from scratch, the default topic configuration for 
> repartition topics is 50MB and 10min segment sizes.
>  
> As retention.ms is undefined, the default retention.ms applies and the log 
> cleaner starts competing with the application, effectively causing the 
> streams app to skip records.
> {{Application logs the following:}}
> {{Fetch offset 213792 is out of range for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting 
> offset}}
> {{Fetch offset 110227 is out of range for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting 
> offset}}
> {{Resetting offset for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset 
> 233302.}}
> {{Resetting offset for partition 
> app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset 
> 119914.}}
> Adding the following configuration to RepartitionTopicConfig.java solves the 
> issue:
> {{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // 
> Infinite}}
>  
>  My understanding is that this should be safe as KafkaStreams uses the admin 
> API to delete segments.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7479) Call to "producer.initTransaction" hangs when using IP for "bootstrap.servers"

2018-10-15 Thread huxihx (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16649996#comment-16649996
 ] 

huxihx commented on KAFKA-7479:
---

This seems to have already been fixed by KAFKA-6446.

> Call to "producer.initTransaction" hangs when using IP for "bootstrap.servers"
> --
>
> Key: KAFKA-7479
> URL: https://issues.apache.org/jira/browse/KAFKA-7479
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: Gene B.
>Priority: Major
> Attachments: KAFKA-7479.log, KafkaProducerSample.java
>
>
> When using an IP address for "bootstrap.servers",
> and the Kafka server is installed in a VM (VirtualBox),
> then the transactional Kafka client hangs on the call to
> "producer.initTransaction", and the call never completes.
> The current workaround is to add the Kafka host's name to the "hosts" file, 
> but this approach will not scale.
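A minimal, hedged reproduction sketch (the IP and transactional id are hypothetical). With the KAFKA-6446 fix, `max.block.ms` bounds how long the init call can block instead of hanging forever:

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class InitTransactionsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // hypothetical VM IP
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx");              // hypothetical id
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000"); // bound the wait
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions(); // the call reported to hang in this ticket
        }
    }
}
{code}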



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7506) KafkaStreams repartition topic settings not suitable for processing old records

2018-10-15 Thread JIRA
Niklas Lönn created KAFKA-7506:
--

 Summary: KafkaStreams repartition topic settings not suitable for 
processing old records
 Key: KAFKA-7506
 URL: https://issues.apache.org/jira/browse/KAFKA-7506
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: Niklas Lönn


Hi, we are using Kafka Streams to process a compacted store. When resetting the 
application/processing from scratch, the default topic configuration for 
repartition topics is 50MB and 10min segment sizes.

 

As retention.ms is undefined, the default retention.ms applies and the log 
cleaner starts competing with the application, effectively causing the streams 
app to skip records.

{{Application logs the following:}}

{\{ Fetch offset 213792 is out of range for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting offset}}
 \{{ Fetch offset 110227 is out of range for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting offset}}
 \{{ Resetting offset for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset 233302.}}
 \{{ Resetting offset for partition 
app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset 119914.}}

Adding the following configuration to RepartitionTopicConfig.java solves the 
issue:

{{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // 
Infinite}}

 
 My understanding is that this should be safe as KafkaStreams uses the admin 
API to delete segments.
  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7412) Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running

2018-10-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16649959#comment-16649959
 ] 

ASF GitHub Bot commented on KAFKA-7412:
---

huxihx opened a new pull request #5798: KAFKA-7412: onComplete should not 
reassign `metadata` variable
URL: https://github.com/apache/kafka/pull/5798
 
 
   The Javadoc for `InterceptorCallback#onComplete` says that exactly one of 
the arguments (metadata and exception) will be non-null. However, that contract 
is broken when a TimeoutException is encountered, since the code reassigns a 
newly created RecordMetadata object to the variable `metadata`.
   
   The solution is to leave `metadata1` unchanged and pass a new RecordMetadata 
instance to `onAcknowledgement`.
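A hedged sketch of the contract being restored, written against the public Callback interface (the PR itself touches the internal InterceptorCallback): exactly one of the two arguments should be non-null when the callback fires.

{code:java}
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ContractCheckingCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Per the Javadoc contract, exactly one of (metadata, exception) is non-null.
        // The TimeoutException path fixed here used to deliver both non-null.
        if ((metadata == null) == (exception == null)) {
            throw new IllegalStateException("callback contract violated");
        }
    }
}
{code}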
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Bug prone response from producer.send(ProducerRecord, Callback) if Kafka 
> broker is not running
> --
>
> Key: KAFKA-7412
> URL: https://issues.apache.org/jira/browse/KAFKA-7412
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.0.0
>Reporter: Michal Turek
>Priority: Major
> Attachments: both_metadata_and_exception.png, 
> metadata_when_kafka_is_stopped.png
>
>
> Hi there, I have probably found a bug in Java Kafka producer client.
> Scenario & current behavior:
> - Start Kafka broker, single instance.
> - Start application that produces messages to Kafka.
> - Let the application load partitions for a topic to warm up the producer, 
> e.g. send a message to Kafka. I'm not sure if this is a necessary step, but our 
> code does it.
> - Gracefully stop the Kafka broker.
> - The application log now contains "org.apache.kafka.clients.NetworkClient: 
> [Producer clientId=...] Connection to node 0 could not be established. Broker 
> may not be available." so the client is aware of the Kafka unavailability.
> - Trigger the producer to send a message using 
> KafkaProducer.send(ProducerRecord, Callback) method.
> - The callback that notifies business code receives a non-null RecordMetadata 
> and a null Exception after request.timeout.ms. The metadata contains offset -1, 
> which is the value of ProduceResponse.INVALID_OFFSET.
> Expected behavior:
> - If Kafka is not running and the message is not appended to the log, the 
> callback should contain a null RecordMetadata and a non-null Exception. At 
> least that is how I subjectively understand the Javadoc: "exception on 
> production error" in simple words.
> - A developer who is not aware of this behavior and doesn't test for 
> offset -1 may consider the message successfully sent and properly acked 
> by the broker.
> Known workaround
> - Together with checking for non-null exception in the callback, add another 
> condition for ProduceResponse.INVALID_OFFSET.
> {noformat}
> try {
>     producer.send(record, (metadata, exception) -> {
>         if (metadata != null) {
>             if (metadata.offset() != ProduceResponse.INVALID_OFFSET) {
>                 // Success
>             } else {
>                 // Failure
>             }
>         } else {
>             // Failure
>         }
>     });
> } catch (Exception e) {
>     // Failure
> }
> {noformat}
> Used setup
> - Latest Kafka 2.0.0 for both broker and Java client.
> - Originally found with broker 0.11.0.1 and client 2.0.0.
> - Code is analogy of the one in Javadoc of KafkaProducer.send().
> - Used producer configuration (others use defaults).
> {noformat}
> bootstrap.servers = "localhost:9092"
> client.id = "..."
> acks = "all"
> retries = 1
> linger.ms = "20"
> compression.type = "lz4"
> 

[jira] [Assigned] (KAFKA-7412) Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running

2018-10-15 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-7412:
-

Assignee: huxihx

> Bug prone response from producer.send(ProducerRecord, Callback) if Kafka 
> broker is not running
> --
>
> Key: KAFKA-7412
> URL: https://issues.apache.org/jira/browse/KAFKA-7412
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.0.0
>Reporter: Michal Turek
>Assignee: huxihx
>Priority: Major
> Attachments: both_metadata_and_exception.png, 
> metadata_when_kafka_is_stopped.png
>
>
> Hi there, I have probably found a bug in Java Kafka producer client.
> Scenario & current behavior:
> - Start Kafka broker, single instance.
> - Start application that produces messages to Kafka.
> - Let the application load partitions for a topic to warm up the producer, 
> e.g. send a message to Kafka. I'm not sure if this is a necessary step, but our 
> code does it.
> - Gracefully stop the Kafka broker.
> - The application log now contains "org.apache.kafka.clients.NetworkClient: 
> [Producer clientId=...] Connection to node 0 could not be established. Broker 
> may not be available." so the client is aware of the Kafka unavailability.
> - Trigger the producer to send a message using 
> KafkaProducer.send(ProducerRecord, Callback) method.
> - The callback that notifies business code receives a non-null RecordMetadata 
> and a null Exception after request.timeout.ms. The metadata contains offset -1, 
> which is the value of ProduceResponse.INVALID_OFFSET.
> Expected behavior:
> - If Kafka is not running and the message is not appended to the log, the 
> callback should contain a null RecordMetadata and a non-null Exception. At 
> least that is how I subjectively understand the Javadoc: "exception on 
> production error" in simple words.
> - A developer who is not aware of this behavior and doesn't test for 
> offset -1 may consider the message successfully sent and properly acked 
> by the broker.
> Known workaround
> - Together with checking for non-null exception in the callback, add another 
> condition for ProduceResponse.INVALID_OFFSET.
> {noformat}
> try {
>     producer.send(record, (metadata, exception) -> {
>         if (metadata != null) {
>             if (metadata.offset() != ProduceResponse.INVALID_OFFSET) {
>                 // Success
>             } else {
>                 // Failure
>             }
>         } else {
>             // Failure
>         }
>     });
> } catch (Exception e) {
>     // Failure
> }
> {noformat}
> Used setup
> - Latest Kafka 2.0.0 for both broker and Java client.
> - Originally found with broker 0.11.0.1 and client 2.0.0.
> - Code is analogy of the one in Javadoc of KafkaProducer.send().
> - Used producer configuration (others use defaults).
> {noformat}
> bootstrap.servers = "localhost:9092"
> client.id = "..."
> acks = "all"
> retries = 1
> linger.ms = "20"
> compression.type = "lz4"
> request.timeout.ms = 5000 # The same behavior is with default, this is to 
> speed up the tests
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2018-10-15 Thread Jiaxin YE (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16649839#comment-16649839
 ] 

Jiaxin YE commented on KAFKA-4084:
--

[~wushujames] : Wow, thanks! Let me try that out. 

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to rebalance at once, *stopping* once that number of leader 
> rebalances is in flight until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.
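One manual, hedged mitigation along these lines (the file name and partition list are hypothetical) is to batch preferred-replica elections with the existing CLI tool rather than letting the controller move every leader at once:

{noformat}
# Elect preferred leaders for a small batch of partitions, then repeat
# with the next batch once replication has settled.
bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 \
    --path-to-json-file batch-1.json

# batch-1.json:
{"partitions": [
  {"topic": "my-topic", "partition": 0},
  {"topic": "my-topic", "partition": 1}
]}
{noformat}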



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)