[jira] [Created] (KAFKA-7686) Remove PowerMock from Connect Tests

2018-11-28 Thread Magesh kumar Nandakumar (JIRA)
Magesh kumar Nandakumar created KAFKA-7686:
--

 Summary: Remove PowerMock from Connect Tests
 Key: KAFKA-7686
 URL: https://issues.apache.org/jira/browse/KAFKA-7686
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Magesh kumar Nandakumar
Assignee: Magesh kumar Nandakumar


Remove PowerMock from Connect Tests



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2018-11-28 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702657#comment-16702657
 ] 

Guozhang Wang commented on KAFKA-4212:
--

cc [~mjsax]: I think you are already aware, but there seems to be a correlation 
with KIP-258.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs need to maintain, as state, a large set of key-values for some 
> period of time. That is, they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.
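The TTL semantics requested here are one value per key, dropped once it is older than a retention period. They can be sketched in plain Java. This is a minimal in-memory illustration under stated assumptions, not a Kafka Streams StateStore. The class TtlCache, its methods, and the caller-supplied nowMs timestamps are hypothetical. A real store would also need to overflow to disk (for example to RocksDB), which this sketch does not attempt.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical TTL key-value cache: one value per key, expired by age. */
final class TtlCache<K, V> {
    /** A stored value plus the time it was written. */
    private static final class Entry<T> {
        final T value;
        final long writtenAtMs;

        Entry(T value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final long ttlMs;
    private final Map<K, Entry<V>> entries = new HashMap<>();

    TtlCache(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    private boolean expired(Entry<V> e, long nowMs) {
        return nowMs - e.writtenAtMs >= ttlMs;
    }

    /** Keeps one value per key, overwriting any previous one (like a KeyValueStore). */
    void put(K key, V value, long nowMs) {
        entries.put(key, new Entry<>(value, nowMs));
    }

    /** Returns the value, or null if the key is absent or its entry has expired. */
    V get(K key, long nowMs) {
        Entry<V> e = entries.get(key);
        return (e == null || expired(e, nowMs)) ? null : e.value;
    }

    /** Drops all expired entries and returns how many were removed. */
    int purgeExpired(long nowMs) {
        int removed = 0;
        for (Iterator<Entry<V>> it = entries.values().iterator(); it.hasNext(); ) {
            if (expired(it.next(), nowMs)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return entries.size();
    }
}
```

This sketch expires entries by write time. Refreshing the timestamp on every get would turn it into access-based expiry instead. The issue does not say which of the two behaviours a store should offer.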





[jira] [Commented] (KAFKA-7566) Add sidecar job to leader (or a random single follower) only

2018-11-28 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702656#comment-16702656
 ] 

Guozhang Wang commented on KAFKA-7566:
--

Were you triggering this job via the punctuation function, or making it 
data-driven (i.e. checking in the process() call whether it is time to do GC)? 
In the latter case, you could rely on 
{{ProcessorContext.topic()/partition()}} and only do the job if the returned 
topic name is a specific value (in case you have multiple source topics) 
and the partition number is 0 (actually, any number is fine, but 0 is always 
safe without knowing the total number of partitions), so that only one task at 
a time will be doing this.

On punctuation though, there will be no record context, and the above function 
will return `-1` indicating "not known".
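The data-driven gating described in this comment reduces to a small predicate. It works on the values a processor would read from ProcessorContext.topic() and ProcessorContext.partition() inside process(). The sketch below is framework-free and hypothetical. SidecarGate and the ownerTopic parameter are illustrative names, and the caller is assumed to pass in the two context values.

```java
/**
 * Hypothetical helper: decides whether the current task should run the
 * single-owner side job, given the topic and partition of the record
 * currently being processed.
 */
final class SidecarGate {
    private final String ownerTopic;

    SidecarGate(String ownerTopic) {
        this.ownerTopic = ownerTopic;
    }

    /**
     * True only for the task handling partition 0 of the owner topic.
     * Without a record context (e.g. in a punctuation) the partition is -1,
     * so the job is skipped.
     */
    boolean shouldRunJob(String topic, int partition) {
        return partition == 0 && ownerTopic.equals(topic);
    }
}
```

Suppose this is called from process() with the current record's context. Only the task that owns partition 0 of the chosen topic gets true. During a punctuation, partition() reports -1, so the gate stays closed.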

> Add sidecar job to leader (or a random single follower) only
> 
>
> Key: KAFKA-7566
> URL: https://issues.apache.org/jira/browse/KAFKA-7566
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Minor
>
> Hey there,
> We recently needed to add an archive job to a streaming application. The caveat 
> is that we need to make sure only one instance is doing this task to avoid 
> potential race conditions, and we also don't want to schedule it as a regular 
> stream task, because that would block normal streaming operations. 
> Although we could do this with a ZooKeeper lease, I'm raising the case here since 
> this could be a useful feature for streaming jobs in general. For example, 
> there are some `leader specific` operations we could schedule in the DSL 
> instead of in an ad hoc manner.
> Let me know if you think this makes sense to you, thank you!





[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-28 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702652#comment-16702652
 ] 

Guozhang Wang commented on KAFKA-7678:
--

I looked at the source code, and I think it is indeed a bug: we may close a 
suspended task, causing recordCollector.close() to be called multiple 
times, so that function should be implemented in an idempotent way.

More generally, the internal call trace from the task manager to 
tasks.close / suspend calls etc. is quite messy; some code cleanup may 
be worthwhile in the future.

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Minor
>
> This occurs when the group is rebalancing in a Kafka Streams application and 
> the process (the Kafka Streams application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> I have checked the code: the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class anticipates any kind of error, since it catches 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility in the 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it to:
>  
> {noformat}
> @Override
> public void close() {
>     log.debug("Closing producer");
>     if (producer != null) {
>         producer.close();
>         producer = null;
>     }
>     checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Streams and client 2.1.0
> OpenJDK 8
>  
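Both the comment and the proposed change above amount to making close() idempotent. Below is a minimal sketch of that pattern for a simplified collector. ClosableProducer and IdempotentRecordCollector are hypothetical stand-ins, not the Kafka Streams classes. The real method's checkForException() call is left out.

```java
/** Hypothetical stand-in for the producer owned by the collector. */
interface ClosableProducer {
    void close();
}

/**
 * Hypothetical, simplified record collector whose close() is idempotent:
 * the first call closes and releases the producer; later calls are no-ops
 * instead of throwing NullPointerException.
 */
final class IdempotentRecordCollector {
    private ClosableProducer producer;

    IdempotentRecordCollector(ClosableProducer producer) {
        this.producer = producer;
    }

    void close() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
        // The real RecordCollectorImpl.close() also calls checkForException();
        // that step is omitted in this sketch.
    }

    boolean isClosed() {
        return producer == null;
    }
}
```

With this pattern, a second close() becomes a no-op rather than a NullPointerException. That is what happens when a suspended task is later closed.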





[jira] [Commented] (KAFKA-7449) Kafka console consumer is not sending topic to deserializer

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702633#comment-16702633
 ] 

ASF GitHub Bot commented on KAFKA-7449:
---

junrao closed pull request #5704: KAFKA-7449 Forward topic from console 
consumer to deserializer
URL: https://github.com/apache/kafka/pull/5704
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 42c5c5bae1b..9a8c64842aa 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -509,9 +509,9 @@ class DefaultMessageFormatter extends MessageFormatter {
 output.write(lineSeparator)
 }
 
-def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
+def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) {
   val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
-  val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
+  val convertedBytes = deserializer.map(_.deserialize(topic, nonNullBytes).toString.
 getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
   output.write(convertedBytes)
 }
@@ -527,12 +527,12 @@ class DefaultMessageFormatter extends MessageFormatter {
 }
 
 if (printKey) {
-  write(keyDeserializer, key)
+  write(keyDeserializer, key, topic)
   writeSeparator(printValue)
 }
 
 if (printValue) {
-  write(valueDeserializer, value)
+  write(valueDeserializer, value, topic)
   output.write(lineSeparator)
 }
   }
diff --git a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
new file mode 100644
index 000..37b5b79a868
--- /dev/null
+++ b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
@@ -0,0 +1,53 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.tools
+
+import java.io.PrintStream
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.serialization.Deserializer
+import org.hamcrest.CoreMatchers
+import org.junit.Test
+import org.junit.Assert.assertThat
+import org.scalatest.mockito.MockitoSugar
+
+class CustomDeserializer extends Deserializer[String] {
+  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
+  }
+
+  override def deserialize(topic: String, data: Array[Byte]): String = {
+assertThat("topic must not be null", topic, CoreMatchers.notNullValue())
+new String(data)
+  }
+
+  override def close(): Unit = {
+  }
+}
+
+class CustomDeserializerTest extends MockitoSugar {
+
+  @Test
+  def checkDeserializerTopicIsNotNull(): Unit = {
+val formatter = new DefaultMessageFormatter()
+formatter.keyDeserializer = Some(new CustomDeserializer)
+
+formatter.writeTo(new ConsumerRecord("topic_test", 1, 1l, "key".getBytes, "value".getBytes), mock[PrintStream])
+
+formatter.close()
+  }
+}


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka console consumer is not sending topic to deserializer
> ---
>
> Key: KAFKA-7449
> URL: https://issues.apache.org/jira/browse/KAFKA-7449
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mathieu Chataigner
>Priority: Major
>  Labels: easyfix, pull-request-available
>
> We tried to create a 

[jira] [Resolved] (KAFKA-7449) Kafka console consumer is not sending topic to deserializer

2018-11-28 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7449.

   Resolution: Fixed
 Assignee: Mathieu Chataigner
Fix Version/s: 2.2.0

Merged to trunk.

> Kafka console consumer is not sending topic to deserializer
> ---
>
> Key: KAFKA-7449
> URL: https://issues.apache.org/jira/browse/KAFKA-7449
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mathieu Chataigner
>Assignee: Mathieu Chataigner
>Priority: Major
>  Labels: easyfix, pull-request-available
> Fix For: 2.2.0
>
>
> We tried to create a custom Deserializer to consume some protobuf topics.
> We have a mechanism for getting the protobuf class from the topic name; however, 
> the console consumer does not forward the topic of the consumer 
> record to the deserializer.
> Topic information is available in the ConsumerRecord.
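The reporter's mechanism chooses a decoder from the topic name. It only works if the topic actually reaches the deserializer, which the merged patch ensures by passing topic instead of null. Below is a framework-free sketch of that dispatch using plain byte-array decoders. TopicDispatchingDeserializer and register are hypothetical names. A real implementation would implement org.apache.kafka.common.serialization.Deserializer and plug in protobuf parsers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch of a topic-aware deserializer. It mirrors the shape of
 * Deserializer#deserialize(String topic, byte[] data) without a Kafka
 * dependency: each topic maps to its own decoder function.
 */
final class TopicDispatchingDeserializer {
    private final Map<String, Function<byte[], Object>> decodersByTopic = new HashMap<>();

    void register(String topic, Function<byte[], Object> decoder) {
        decodersByTopic.put(topic, decoder);
    }

    Object deserialize(String topic, byte[] data) {
        if (topic == null) {
            // Before the fix, the console consumer passed null here,
            // leaving a topic-based lookup nothing to go on.
            throw new IllegalArgumentException("topic must not be null");
        }
        Function<byte[], Object> decoder = decodersByTopic.get(topic);
        if (decoder == null) {
            throw new IllegalArgumentException("no decoder registered for topic " + topic);
        }
        return decoder.apply(data);
    }
}
```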





[jira] [Commented] (KAFKA-7671) A KStream/GlobalKTable join shouldn't reset the repartition flag

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702630#comment-16702630
 ] 

ASF GitHub Bot commented on KAFKA-7671:
---

guozhangwang closed pull request #5959: KAFKA-7671: Stream-Global Table join 
should not reset repartition flag
URL: https://github.com/apache/kafka/pull/5959
 
 
   

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 3b691513004..ed5625e3a17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -753,7 +753,7 @@ public void process(final ProcessorSupplier processorSuppl
 builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
 
 // do not have serde for joined result
-return new KStreamImpl<>(name, keySerde, null, sourceNodes, false, streamTableJoinNode, builder);
+return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, streamTableJoinNode, builder);
 }
 
 @SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 772836f9961..e0c38c656c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -61,15 +61,18 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @SuppressWarnings("unchecked")
@@ -437,6 +440,27 @@ public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningT
 }
 }
 }
+
+@Test
+public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
+final StreamsBuilder builder = new StreamsBuilder();
+final GlobalKTable globalKTable = builder.globalTable("globalTopic");
+final KeyValueMapper kvMappper = (k, v) -> k + v;
+final ValueJoiner valueJoiner = (v1, v2) -> v1 + v2;
+builder.stream("topic").selectKey((k, v) -> v)
+.join(globalKTable, kvMappper, valueJoiner)
+.groupByKey()
+.count();
+
+final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
+final String topology = builder.build().describe().toString();
+final Matcher matcher = repartitionTopicPattern.matcher(topology);
+assertTrue(matcher.find());
+final String match = matcher.group();
+assertThat(match, notNullValue());
+assertTrue(match.endsWith("repartition"));
+
+}
 
 @Test
 public void testToWithNullValueSerdeDoesntNPE() {


 




> A KStream/GlobalKTable join shouldn't reset the repartition flag
> 
>
> Key: KAFKA-7671
> URL: https://issues.apache.org/jira/browse/KAFKA-7671
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Andy Bryant
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> Currently a KStream/GlobalKTable join resets the repartition-required flag to 
> false.
> I have a topology where I map a stream, join with a GlobalKTable, then 
> groupByKey, then aggregate.
> The aggregate wasn't behaving correctly because it didn't force a repartition 
> as I expected. The KStream/GlobalKTable join had reset the flag and hence I 
> was getting the 

[jira] [Updated] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-28 Thread Patrik Kleindl (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrik Kleindl updated KAFKA-7660:
--
Attachment: heapdump-1543441898901.hprof

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Priority: Minor
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> Analysis of JVM memory revealed two possible issues that I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case, the reported size was 40-50 MB each.
> I haven't looked too deeply into the code, but I noticed that the class Sensor.java, 
> which is used as a key in the HashMap, does not implement the equals or hashCode 
> methods. I'm not sure this is a problem, though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka 2.0-cp1 (brokers as well as clients)
>  
> Screenshots are attached.
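This touches on whether it matters that Sensor does not override equals or hashCode. Such a class inherits identity-based equality from Object, so a HashMap treats every instance as a distinct key. The sketch below shows that consequence with a hypothetical FakeSensor class, which is not Kafka's Sensor. Two sensors with the same name occupy two entries. An entry can only be removed through the exact instance used as its key.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a sensor; as the report says of Sensor, it does not override equals/hashCode. */
final class FakeSensor {
    final String name;

    FakeSensor(String name) {
        this.name = name;
    }
}

/** Tracks a parent for each child sensor in a HashMap keyed by object identity. */
final class IdentityKeyDemo {
    final Map<FakeSensor, FakeSensor> parentSensors = new HashMap<>();

    /** Creates a child/parent pair; each call adds a new entry, even for a repeated name. */
    FakeSensor register(String name) {
        FakeSensor child = new FakeSensor(name);
        parentSensors.put(child, new FakeSensor(name + "-parent"));
        return child;
    }
}
```

Identity keys alone do not leak memory. Entries pile up only if nothing removes them, so the removal path is what matters.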





[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-28 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702422#comment-16702422
 ] 

Patrik Kleindl commented on KAFKA-7660:
---

[~vvcephei] Well, at least that answers my next question, namely why I couldn't find 
any references :P

And no, my code does not call any of these methods.

I'm attaching a heapdump from the last test.

The effect can mostly be seen in the KafkaMetric and MetricName objects, whose counts start 
at around 1900 right after startup.

Code is:
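{code:java}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;

public class MemoryTest {
    public static void main(String[] args) {
        final String bootstrapServers = "broker0:9092";

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        StreamsBuilder builder = new StreamsBuilder();

        // With no serdes configured, keys and values default to byte arrays.
        KTable<byte[], byte[]> table = builder.table("test");

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

        streams.cleanUp();
        streams.start();
        // Close the Streams instance when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                streams.close();
            } catch (Exception e) {
                // ignored
            }
        }));
    }
}
{code}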
And to run it for some time, with delays for the rebalances:
{code:java}
while (true) {
    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

    try {
        Thread.sleep(2L);
        streams.start();
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        streams.close();
    }
}
{code}
Topic:
{code:java}
./kafka-topics --zookeeper broker0:2181 --create --topic test --partitions 10 --replication-factor 3
{code}



[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-28 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702387#comment-16702387
 ] 

John Roesler commented on KAFKA-7660:
-

[~pkleindl],

Ah, yes, now I'm remembering why I probably didn't wind up making this change 
before. I restructured the sensor tracking so that we don't use parentSensors 
for any of the Streams metrics. 

The parentSensors map should be non-empty only if your code calls one of:
 * org.apache.kafka.streams.processor.internals.metrics.StreamsMetrics#addThroughputSensor
 * org.apache.kafka.streams.processor.internals.metrics.StreamsMetrics#addLatencyAndThroughputSensor

Is this the case? If so, you would need to call 
org.apache.kafka.streams.StreamsMetrics#removeSensor in the relevant close() 
method of the object that created the metrics.

If this is not the case, then I'm curious what the references are. I don't 
suppose you can cause the apparent memory leak and then grab a heap dump? I'm 
not sure if you'd want to put the whole heap dump on this forum, but maybe you 
could take a look in it and see what objects are being referenced from the 
parentSensors map.

Of course, it's possible (likely?) that the apparent memory leak has a 
different cause, but it would be nice to rule this out, since it's on our radar.

 

I'll still keep my PR open, since, even if it's not affecting you, it would 
still be a memory leak for anyone creating sensors via those methods.
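The pairing suggested here is that whatever creates sensors also removes them in its close() method. It can be sketched as follows. SensorRegistry and MeteredComponent are hypothetical stand-ins, not Kafka classes. In real code, the add and remove calls would go through the StreamsMetrics methods named above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry standing in for the metrics object that tracks sensors. */
final class SensorRegistry {
    private final Map<String, Object> sensorsByName = new HashMap<>();

    void addSensor(String name) {
        sensorsByName.put(name, new Object());
    }

    void removeSensor(String name) {
        sensorsByName.remove(name);
    }

    int size() {
        return sensorsByName.size();
    }
}

/** Hypothetical component that creates its sensor on construction and removes it in close(). */
final class MeteredComponent implements AutoCloseable {
    private final SensorRegistry registry;
    private final String sensorName;

    MeteredComponent(SensorRegistry registry, String sensorName) {
        this.registry = registry;
        this.sensorName = sensorName;
        registry.addSensor(sensorName);
    }

    @Override
    public void close() {
        // Without this call, every component ever created would leave its sensor behind.
        registry.removeSensor(sensorName);
    }
}
```

Tying removal to AutoCloseable lets try-with-resources, or the owner's own close(), guarantee cleanup. Forgetting removeSensor is exactly the leak described above.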



[jira] [Issue Comment Deleted] (KAFKA-6988) Kafka windows classpath too long

2018-11-28 Thread lkgen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lkgen updated KAFKA-6988:
-
Comment: was deleted

(was: retest this please)

> Kafka windows classpath too long
> 
>
> Key: KAFKA-6988
> URL: https://issues.apache.org/jira/browse/KAFKA-6988
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 2.0.0
>Reporter: lkgen
>Priority: Major
>
> On Windows, the kafka-run-class.bat script builds a CLASSPATH with the 
> full path to each jar.
> If the installation is in a directory with a long path, the CLASSPATH becomes too long, 
> and running zookeeper-server-start.bat and other commands fails with the error:
> {{The input line is too long.}}
> A possible solution may be to add dir\* to the classpath instead of expanding all jars.
>  
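The length problem and the suggested wildcard fix can be compared with a quick calculation. The sketch is hypothetical, and the install directory and jar names in the check are made up. It relies on two documented behaviours. First, a cmd.exe command line can be at most 8191 characters long. Second, the java launcher expands a classpath entry ending in * to all .jar files in that directory.

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical comparison of two ways a launcher script could build CLASSPATH:
 * one full path per jar, versus a single "dir\*" wildcard entry.
 */
final class ClasspathLength {
    /** Maximum length of a cmd.exe command line, in characters. */
    static final int CMD_EXE_MAX_LINE = 8191;

    /** One entry per jar, each with the full directory prefix, joined with ';'. */
    static String perJar(String libDir, List<String> jarNames) {
        return jarNames.stream()
                .map(jar -> libDir + "\\" + jar)
                .collect(Collectors.joining(";"));
    }

    /** A single wildcard entry that the java launcher expands to every .jar in libDir. */
    static String wildcard(String libDir) {
        return libDir + "\\*";
    }
}
```

The wildcard form has one caveat. The order of the expanded jars on the classpath is not specified, and subdirectories are not searched.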





[jira] [Commented] (KAFKA-6988) Kafka windows classpath too long

2018-11-28 Thread lkgen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702333#comment-16702333
 ] 

lkgen commented on KAFKA-6988:
--

retest this please



[jira] [Commented] (KAFKA-7682) turning on request logging for a subset of request types

2018-11-28 Thread Gwen Shapira (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702304#comment-16702304
 ] 

Gwen Shapira commented on KAFKA-7682:
-

To add to the future KIP discussion:
In addition to extra logging for specific request types, it would be useful to be able 
to enable extra logging for requests arriving from specific clients (by 
client-id, principal, or even source IP?), since it is very common for issues to be 
triggered by one specific client.

> turning on request logging for a subset of request types
> 
>
> Key: KAFKA-7682
> URL: https://issues.apache.org/jira/browse/KAFKA-7682
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> Turning on request level logging can be useful for debugging. However, the 
> request logging can be quite verbose. It would be useful to turn it on for a 
> subset of the request types. We already have a jmx bean to turn on/off the 
> request logging dynamically. We could add a new jmx bean to control the 
> request types to be logged. This requires a KIP.
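Below is a sketch of the filtering logic a KIP along these lines might specify. It combines the request-type subset from the description with the per-client filter suggested in the comment above. RequestLogFilter is hypothetical, not an existing Kafka class or JMX bean. Request types and client ids are plain strings here. An empty set is assumed to mean "no restriction" for that dimension.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical filter deciding whether a request should be logged, by
 * request type and by client id. An empty set places no restriction
 * on that dimension.
 */
final class RequestLogFilter {
    private final Set<String> requestTypes;
    private final Set<String> clientIds;

    RequestLogFilter(Set<String> requestTypes, Set<String> clientIds) {
        // Defensive copies, so later changes by the caller have no effect.
        this.requestTypes = new HashSet<>(requestTypes);
        this.clientIds = new HashSet<>(clientIds);
    }

    boolean shouldLog(String requestType, String clientId) {
        boolean typeSelected = requestTypes.isEmpty() || requestTypes.contains(requestType);
        boolean clientSelected = clientIds.isEmpty() || clientIds.contains(clientId);
        return typeSelected && clientSelected;
    }
}
```

The comment also suggests filtering by principal or source IP. Each would add another set, checked the same way.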





[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes

2018-11-28 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702291#comment-16702291
 ] 

Guozhang Wang commented on KAFKA-7669:
--

[~nijo] Also note that there is another KIP (KIP-307) for allowing users to 
name all of their operators in the DSL if they want to, going beyond 
stateful operations, which can help resolve the ordering issue. But 
ultimately the DSL does depend on definition order to generate the same topology 
(same graph, same node names) independently on different instances.

> Stream topology definition is not robust to the ordering changes
> 
>
> Key: KAFKA-7669
> URL: https://issues.apache.org/jira/browse/KAFKA-7669
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> It seems that if the user does not guarantee the order of the stream topology 
> definition, they may end up with multiple stream branches having the same 
> internal changelog (and repartition, if created) topic. 
> Let's assume:
> {code:java}
> val initialStream = new StreamsBuilder().stream(sth);
> val someStrings = (1 to 10).map(_.toString)
> val notGuaranteedOrderOfStreams: Map[String, KStream[...]] = 
> someStrings.map(s => s -> initialStream.filter(...)).toMap{code}
> When the user now defines common aggregation logic for 
> notGuaranteedOrderOfStreams and runs multiple instances of the application, 
> the KSTREAM-AGGREGATE-STATE-STORE topic names will not be unique and will 
> contain results of different streams from the notGuaranteedOrderOfStreams map.
> All of this happens without a single warning that the topology (or just the order of 
> the topology definition) differs across instances of the Kafka Streams 
> application.
> Also, I am concerned that the ids in "KSTREAM-AGGREGATE-STATE-STORE-id-changelog" 
> match so well across the different application instances (and different 
> topologies).
>  
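The collision follows from how the DSL names internal nodes: names such as KSTREAM-AGGREGATE-STATE-STORE-0000000003 carry a counter that is incremented as operators are added to the builder, so the store name a branch receives depends on the order in which it was defined. The Java sketch below is a deliberately simplified model of that. It assumes one counter step per aggregation, whereas the real builder also numbers sources, filters and other nodes. `assignStoreNames` and `assignStoreNamesSorted` are hypothetical helpers, not Kafka APIs. The sketch shows that iterating an unordered collection can give the same branch different store names on different instances, while defining branches in sorted key order keeps the mapping stable. Naming operators explicitly, along the lines of KIP-307 mentioned above, would avoid relying on the counter at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StoreNamingOrderSketch {

    // Simplified model (hypothetical helper): every aggregation defined on the
    // builder consumes the next value of a shared counter.
    static Map<String, String> assignStoreNames(List<String> branchDefinitionOrder) {
        Map<String, String> storeByBranch = new LinkedHashMap<>();
        int index = 0;
        for (String branch : branchDefinitionOrder) {
            storeByBranch.put(branch,
                "KSTREAM-AGGREGATE-STATE-STORE-" + String.format("%010d", index++));
        }
        return storeByBranch;
    }

    // Defining branches in a deterministic (sorted) order makes every instance
    // derive the same branch -> store mapping, whatever order the keys arrive in.
    static Map<String, String> assignStoreNamesSorted(List<String> branches) {
        List<String> sorted = new ArrayList<>(branches);
        Collections.sort(sorted);
        return assignStoreNames(sorted);
    }

    public static void main(String[] args) {
        List<String> instanceA = Arrays.asList("1", "2", "3");
        List<String> instanceB = Arrays.asList("3", "1", "2");
        // Unordered definition: branch "1" gets a different store on each instance.
        System.out.println(assignStoreNames(instanceA).get("1")); // KSTREAM-AGGREGATE-STATE-STORE-0000000000
        System.out.println(assignStoreNames(instanceB).get("1")); // KSTREAM-AGGREGATE-STATE-STORE-0000000001
        // Sorted definition order: both instances agree.
        System.out.println(assignStoreNamesSorted(instanceA)
            .equals(assignStoreNamesSorted(instanceB))); // true
    }
}
```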





[jira] [Commented] (KAFKA-7654) Relax requirements on serializing-only methods.

2018-11-28 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702279#comment-16702279
 ] 

Guozhang Wang commented on KAFKA-7654:
--

For 3), the main caveat is that, because of Java's type erasure, we cannot 
always infer the types after some operations. For example, if you call 
{{stream.map()}}, which may change both keys and values, we cannot tell whether 
the mapped keys or values are still JSON or have been transformed into another 
type. There are some operators for which we are sure no types change, and hence 
we can still use the serde, but generally speaking we cannot always propagate 
the serde passed in from the source throughout the topology.
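Here is a small Java illustration of the type-erasure point. Generic type arguments are not available at runtime, so code that only receives the `Function` passed to a map-like operator cannot tell what type that function produces. This is plain JDK code, not Kafka Streams internals. `mapValue` is a hypothetical stand-in for an operator such as `stream.map()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ErasureSketch {

    // Hypothetical stand-in for a map-like operator: at runtime it only sees a
    // Function object, so it cannot know whether the output type changed.
    static Object mapValue(Object value, Function<Object, Object> mapper) {
        return mapper.apply(value);
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> integers = new ArrayList<>();
        // Both lists share one runtime class: the type arguments were erased.
        System.out.println(strings.getClass() == integers.getClass()); // true

        Function<Object, Object> toLength = v -> ((String) v).length();
        Object result = mapValue("{\"json\":true}", toLength);
        // Only by inspecting an actual value do we learn the type changed,
        // which is too late to pick a serde when the topology is built.
        System.out.println(result.getClass().getSimpleName()); // Integer
    }
}
```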

> Relax requirements on serializing-only methods.
> ---
>
> Key: KAFKA-7654
> URL: https://issues.apache.org/jira/browse/KAFKA-7654
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Bieth
>Priority: Major
>
> Methods such as KStream#to shouldn't require a Produced as only the 
> serializing part is ever used.





[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702277#comment-16702277
 ] 

ASF GitHub Bot commented on KAFKA-7610:
---

hachikuji opened a new pull request #5962: KAFKA-7610; Proactively timeout new 
group members if rebalance is delayed
URL: https://github.com/apache/kafka/pull/5962
 
 
   When a consumer first joins a group, it doesn't have an assigned memberId. 
If the rebalance is delayed for some reason, the client may disconnect after a 
request timeout and retry. Since the client had not received its memberId, we 
have no way to detect the retry and expire the previously generated member id. 
This can lead to unbounded growth in the size of the group until the rebalance 
has completed.
   
   This patch fixes the problem by proactively completing all JoinGroup 
requests for new members after a timeout of 5 minutes. If the client is still 
around, we expect it to retry.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Detect consumer failures in initial JoinGroup
> -
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Boyang Chen
>Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple of options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).
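PR #5962 proposes completing JoinGroup requests for new members after a fixed timeout so that a live client retries and a dead one stops occupying the group. That bookkeeping can be sketched as a small Java model. It is not the broker's coordinator code. `PendingNewMembers`, its methods, the generated id format, and the millisecond clock argument are all hypothetical. Only the 5-minute timeout comes from the PR description.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PendingNewMembers {
    // Timeout taken from the PR description (5 minutes).
    static final long NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000L;

    // memberId -> time (ms) at which its initial JoinGroup arrived.
    private final Map<String, Long> joinedAtMs = new LinkedHashMap<>();
    private int nextId = 0;

    // A new member (empty memberId) joins; the coordinator generates an id for it.
    String addNewMember(String clientId, long nowMs) {
        String memberId = clientId + "-" + (nextId++);
        joinedAtMs.put(memberId, nowMs);
        return memberId;
    }

    // Complete (and forget) every new member whose JoinGroup has waited too long.
    // A live client is expected to retry; a dead one simply never comes back.
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = joinedAtMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= NEW_MEMBER_JOIN_TIMEOUT_MS) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    int size() {
        return joinedAtMs.size();
    }

    public static void main(String[] args) {
        PendingNewMembers group = new PendingNewMembers();
        // A client retries three times, one minute apart, while the rebalance is stuck.
        group.addNewMember("consumer", 0);
        group.addNewMember("consumer", 60_000);
        group.addNewMember("consumer", 120_000);
        System.out.println(group.size());          // 3
        System.out.println(group.expire(360_000)); // [consumer-0, consumer-1]
        System.out.println(group.size());          // 1
    }
}
```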





[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-28 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702239#comment-16702239
 ] 

Matthias J. Sax commented on KAFKA-7678:


Thank you!

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Minor
>
> This occurs when the group is rebalancing in a Kafka Streams application and 
> the process (the Kafka Streams application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> I have checked the code, and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class expects any kind of error to happen, since it catches 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>     recordCollector.close();
> } catch (final Throwable e) {
>     log.error("Failed to close producer due to the following error:", e);
> } finally {
>     producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility in the 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>     log.debug("Closing producer");
>     producer.close();
>     producer = null;
>     checkForException();
> }{noformat}
>  
> Change it to:
>  
> {noformat}
> @Override
> public void close() {
>     log.debug("Closing producer");
>     if (Objects.nonNull(producer)) {
>         producer.close();
>         producer = null;
>     }
>     checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  
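The null check proposed above amounts to making `close()` idempotent. Below is a minimal, self-contained Java sketch of that pattern. `Collector` is a hypothetical stand-in for `RecordCollectorImpl`, and a plain `AutoCloseable` stands in for the Kafka producer. With the guard in place, a second call to `close()` is a no-op instead of a `NullPointerException`.

```java
public class IdempotentCloseSketch {

    // Hypothetical stand-in for RecordCollectorImpl holding a producer.
    static class Collector {
        private AutoCloseable producer;
        int closeCalls = 0; // how often the underlying producer was actually closed

        Collector(AutoCloseable producer) {
            this.producer = producer;
        }

        void close() throws Exception {
            // If an earlier close() already cleared the reference, do nothing
            // instead of dereferencing null.
            if (producer != null) {
                producer.close();
                closeCalls++;
                producer = null;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Collector collector = new Collector(() -> System.out.println("producer closed"));
        collector.close();
        collector.close(); // no NullPointerException on the second call
        System.out.println(collector.closeCalls); // 1
    }
}
```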





[jira] [Commented] (KAFKA-6951) Implement offset expiration semantics for unsubscribed topics

2018-11-28 Thread James Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702167#comment-16702167
 ] 

James Cheng commented on KAFKA-6951:


[~vultron81]: I think that the kafka-consumer-groups.sh tool allows you to 
delete offsets for a specific topic within a consumer group. Something like

$ kafka-consumer-groups.sh --group groupId --topic topicName --delete


> Implement offset expiration semantics for unsubscribed topics
> -
>
> Key: KAFKA-6951
> URL: https://issues.apache.org/jira/browse/KAFKA-6951
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.2.0
>
>
> [This 
> portion|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets#KIP-211:ReviseExpirationSemanticsofConsumerGroupOffsets-UnsubscribingfromaTopic]
>  of KIP-211 will be implemented separately from the main PR.





[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type

2018-11-28 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702176#comment-16702176
 ] 

Jun Rao commented on KAFKA-7681:


Hi, [~mgharat],

The Broker Topic metrics only collect the byte read/write rate. This is a bit 
limiting since it only covers produce/fetch requests. Sometimes, other types of 
requests (e.g. metadata, joinGroup) could be hogging the request handling 
threads. Also, the byte rate doesn't tell us how much the request handler 
thread is being used. For example, the serving of a fetch request is mostly 
done in the network thread, instead of the request handling thread.

What I am thinking is the following. Each request handler thread is being used 
from the time that it takes a request from the request queue, until the local 
processing of the request is done (KafkaApis.handle() returns), which is the 
request localTime. If we aggregate the localTime per request type (e.g. 
produce/fetch/metadata/joinGroup, etc.) and calculate a rate of that value, it 
gives us the fraction of request handler usage by request type. This will tell 
us which type of request is using the request handler threads the most, which 
can be useful for debugging.

> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which 
> type of request is using the thread pool the most. It would be useful to add a 
> metric that tracks the fraction of request thread pool usage by request type. 
> This would be equivalent to (request rate) * (request local time ms) / 1000, 
> but would be more direct. This would require a new KIP.
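Jun's proposal and the formula in the description can be checked with a small worked model. Summing request local time per request type over a window and dividing by the window length gives (request rate) * (average local time in ms) / 1000, which is the average number of handler threads busy with that type. Dividing that by the number of handler threads turns it into a fraction of the pool. That normalisation is an assumption of this sketch, and all names (`HandlerUsageSketch`, `record`, `busyThreads`, `poolFraction`) are hypothetical, not an existing Kafka metric.

```java
import java.util.HashMap;
import java.util.Map;

public class HandlerUsageSketch {
    // Total local time (ms) spent in request handler threads, per request type,
    // accumulated over the current measurement window.
    private final Map<String, Long> localTimeMsByType = new HashMap<>();

    void record(String requestType, long localTimeMs) {
        localTimeMsByType.merge(requestType, localTimeMs, Long::sum);
    }

    // Average number of handler threads busy with this type during the window:
    // sum(localTimeMs) / windowMs == rate(req/s) * avgLocalTimeMs / 1000.
    double busyThreads(String requestType, long windowMs) {
        return localTimeMsByType.getOrDefault(requestType, 0L) / (double) windowMs;
    }

    // Fraction of the whole handler pool used by this type (assumed normalisation).
    double poolFraction(String requestType, long windowMs, int numHandlerThreads) {
        return busyThreads(requestType, windowMs) / numHandlerThreads;
    }

    public static void main(String[] args) {
        HandlerUsageSketch usage = new HandlerUsageSketch();
        // 200 metadata requests at 10 ms of local time each, within a 1-second window.
        for (int i = 0; i < 200; i++) {
            usage.record("Metadata", 10);
        }
        usage.record("Produce", 500);
        System.out.println(usage.busyThreads("Metadata", 1000));     // 2.0
        System.out.println(usage.poolFraction("Metadata", 1000, 8)); // 0.25
    }
}
```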





[jira] [Updated] (KAFKA-6988) Kafka windows classpath too long

2018-11-28 Thread lkgen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lkgen updated KAFKA-6988:
-
Affects Version/s: 2.0.0

> Kafka windows classpath too long
> 
>
> Key: KAFKA-6988
> URL: https://issues.apache.org/jira/browse/KAFKA-6988
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 2.0.0
>Reporter: lkgen
>Priority: Major
>
> In Kafka on Windows, the kafka-run-class.bat script builds a CLASSPATH with 
> the full path to each jar.
> If the installation is in a directory with a long path, the CLASSPATH becomes 
> too long and the following error occurs:
> {{The input line is too long.}}
> when running zookeeper-server-start.bat and other commands.
> A possible solution may be to not expand all jars but to add dir\* to the classpath.
>  





[jira] [Commented] (KAFKA-6988) Kafka windows classpath too long

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702137#comment-16702137
 ] 

ASF GitHub Bot commented on KAFKA-6988:
---

lkgendev opened a new pull request #5960: KAFKA-6988: Improve Kafka classpath 
length by using all files in dir instead of adding one by one
URL: https://github.com/apache/kafka/pull/5960
 
 
   Fixes the Windows "command too long" error caused by the classpath when Kafka 
is installed in a directory with a long path. Also shortens the Unix classpath, 
as it can have a similar problem on Docker.
   
   the contribution is my original work and I license the work to the project 
under the project's open source license.




> Kafka windows classpath too long
> 
>
> Key: KAFKA-6988
> URL: https://issues.apache.org/jira/browse/KAFKA-6988
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: lkgen
>Priority: Major
>
> In Kafka on Windows, the kafka-run-class.bat script builds a CLASSPATH with 
> the full path to each jar.
> If the installation is in a directory with a long path, the CLASSPATH becomes 
> too long and the following error occurs:
> {{The input line is too long.}}
> when running zookeeper-server-start.bat and other commands.
> A possible solution may be to not expand all jars but to add dir\* to the classpath.
>  
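The dir\* idea relies on the JVM's classpath wildcard. Since Java 6, a classpath entry whose last component is `*` expands to every jar file in that directory, so the command line carries one short entry instead of one absolute path per jar. The Java sketch below only compares the lengths of the two classpath strings for a library directory. `expandedClasspath` and `wildcardClasspath` are hypothetical helpers, not part of Kafka's scripts.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ClasspathLengthSketch {

    // One absolute path per jar, joined with ';' on Windows or ':' elsewhere.
    static String expandedClasspath(Path libDir) throws IOException {
        List<String> entries = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : jars) {
                entries.add(jar.toAbsolutePath().toString());
            }
        }
        Collections.sort(entries);
        return String.join(File.pathSeparator, entries);
    }

    // A single wildcard entry; the JVM expands it to all jars in the directory.
    static String wildcardClasspath(Path libDir) {
        return libDir.toAbsolutePath() + File.separator + "*";
    }

    public static void main(String[] args) throws IOException {
        Path libDir = Files.createTempDirectory("kafka-libs");
        for (int i = 0; i < 100; i++) {
            Files.createFile(libDir.resolve("dependency-" + i + ".jar"));
        }
        System.out.println("expanded: " + expandedClasspath(libDir).length() + " chars");
        System.out.println("wildcard: " + wildcardClasspath(libDir).length() + " chars");
    }
}
```

For context, cmd.exe limits a command line to 8191 characters, which a per-jar classpath under a long installation directory can exceed. The wildcard only matches jar files directly inside the directory, not subdirectories or loose `.class` files.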





[jira] [Assigned] (KAFKA-7671) A KStream/GlobalKTable join shouldn't reset the repartition flag

2018-11-28 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-7671:
--

Assignee: Bill Bejeck

> A KStream/GlobalKTable join shouldn't reset the repartition flag
> 
>
> Key: KAFKA-7671
> URL: https://issues.apache.org/jira/browse/KAFKA-7671
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Andy Bryant
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> Currently a KStream/GlobalKTable join resets the repartition required flag to 
> false.
> I have a topology where I map a stream, join with a GlobalKTable, then 
> groupByKey then aggregate.
> The aggregate wasn't behaving correctly because it didn't force a repartition 
> as I expected. The KStream/GlobalKTable join had reset the flag and hence I 
> was getting the same keys in different partitions.
> Since a KStream/GlobalKTable join does not itself force a repartition, it 
> should simply propagate the flag down to the resultant KStream the same way 
> most other operators work.
>  





[jira] [Commented] (KAFKA-7671) A KStream/GlobalKTable join shouldn't reset the repartition flag

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702045#comment-16702045
 ] 

ASF GitHub Bot commented on KAFKA-7671:
---

bbejeck opened a new pull request #5959: KAFKA-7671: Stream-Global Table join 
should not reset repartition flag
URL: https://github.com/apache/kafka/pull/5959
 
 
   This PR fixes an issue reported by a user. When we join a `KStream` with 
a `GlobalKTable`, we should not reset the repartition flag, as the stream may 
have previously changed its key, and the resulting stream could be used in an 
aggregation or in a join with another stream, either of which may require a 
repartition for correct results.
   
   I've added a test which fails without the fix.  
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> A KStream/GlobalKTable join shouldn't reset the repartition flag
> 
>
> Key: KAFKA-7671
> URL: https://issues.apache.org/jira/browse/KAFKA-7671
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Andy Bryant
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> Currently a KStream/GlobalKTable join resets the repartition required flag to 
> false.
> I have a topology where I map a stream, join with a GlobalKTable, then 
> groupByKey then aggregate.
> The aggregate wasn't behaving correctly because it didn't force a repartition 
> as I expected. The KStream/GlobalKTable join had reset the flag and hence I 
> was getting the same keys in different partitions.
> Since a KStream/GlobalKTable join does not itself force a repartition, it 
> should simply propagate the flag down to the resultant KStream the same way 
> most other operators work.
>  
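The following is a toy Java model of the flag handling described in the PR. Key-changing operators set `repartitionRequired`. Key-preserving operators, including a stream/global-table join, pass the flag through unchanged. `groupByKey()` inserts a repartition step only when the flag is set. `StreamNode` and its methods are hypothetical and do not mirror Kafka Streams' internal classes.

```java
import java.util.ArrayList;
import java.util.List;

public class RepartitionFlagSketch {

    // Hypothetical, heavily simplified stand-in for a KStream node.
    static final class StreamNode {
        final boolean repartitionRequired;
        final List<String> plan; // operators applied so far, for inspection

        StreamNode(boolean repartitionRequired, List<String> plan) {
            this.repartitionRequired = repartitionRequired;
            this.plan = plan;
        }

        private StreamNode then(String op, boolean flag) {
            List<String> next = new ArrayList<>(plan);
            next.add(op);
            return new StreamNode(flag, next);
        }

        // map() may change the key, so downstream grouping must repartition.
        StreamNode map() {
            return then("map", true);
        }

        // A stream/global-table join does not repartition by itself, so it
        // propagates the incoming flag instead of resetting it to false.
        StreamNode joinGlobalTable() {
            return then("joinGlobalTable", repartitionRequired);
        }

        // groupByKey() adds a repartition step only if the key may have changed.
        StreamNode groupByKey() {
            StreamNode base = repartitionRequired ? then("repartition", false) : this;
            return base.then("groupByKey", false);
        }
    }

    public static void main(String[] args) {
        StreamNode source = new StreamNode(false, new ArrayList<>());
        StreamNode grouped = source.map().joinGlobalTable().groupByKey();
        System.out.println(grouped.plan); // [map, joinGlobalTable, repartition, groupByKey]
    }
}
```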





[jira] [Commented] (KAFKA-2607) Review `Time` interface and its usage

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701980#comment-16701980
 ] 

ASF GitHub Bot commented on KAFKA-2607:
---

Al12 opened a new pull request #5958: KAFKA-2607: Review Time interface and its 
usage
URL: https://github.com/apache/kafka/pull/5958
 
 
   This change renames the methods in the Time interface to make it apparent 
which underlying implementation is used in each case.




> Review `Time` interface and its usage
> -
>
> Key: KAFKA-2607
> URL: https://issues.apache.org/jira/browse/KAFKA-2607
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Ismael Juma
>Assignee: Aleksei Kogan
>Priority: Major
>  Labels: newbie
>
> Two of `Time` interface's methods are `milliseconds` and `nanoseconds` which 
> are implemented in `SystemTime` as follows:
> {code}
> @Override
> public long milliseconds() {
>     return System.currentTimeMillis();
> }
> @Override
> public long nanoseconds() {
>     return System.nanoTime();
> }
> {code}
> The issue with this interface is that it makes it seem that the difference is 
> about the unit (`ms` versus `ns`) whereas it's much more than that:
> https://blogs.oracle.com/dholmes/entry/inside_the_hotspot_vm_clocks
> We should probably change the names of the methods and review our usage to 
> see if we're using the right one in the various places.
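To illustrate the distinction the issue describes, here is a hedged Java sketch of a clock interface whose method names state which clock they read. It uses wall-clock time (`System.currentTimeMillis()`, which can jump when the system clock is adjusted) for timestamps, and the monotonic `System.nanoTime()` for measuring elapsed time. The interface and method names are illustrative only, not necessarily those used in PR #5958.

```java
import java.util.concurrent.TimeUnit;

public class ClockSketch {

    // Hypothetical interface: each method names the clock it reads.
    interface Clock {
        // Wall-clock time: comparable across processes, but may jump backwards
        // or forwards when the system clock is adjusted.
        long wallClockMs();

        // Monotonic time: only meaningful as a difference between two readings.
        long monotonicNanos();
    }

    static final Clock SYSTEM = new Clock() {
        @Override
        public long wallClockMs() {
            return System.currentTimeMillis();
        }

        @Override
        public long monotonicNanos() {
            return System.nanoTime();
        }
    };

    // Elapsed-time measurements use the monotonic clock.
    static long elapsedMs(Clock clock, Runnable work) {
        long start = clock.monotonicNanos();
        work.run();
        return TimeUnit.NANOSECONDS.toMillis(clock.monotonicNanos() - start);
    }

    public static void main(String[] args) {
        long elapsed = elapsedMs(SYSTEM, () -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("elapsed ~" + elapsed + " ms, wall clock " + SYSTEM.wallClockMs());
    }
}
```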





[jira] [Commented] (KAFKA-6951) Implement offset expiration semantics for unsubscribed topics

2018-11-28 Thread Ben Williams (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701872#comment-16701872
 ] 

Ben Williams commented on KAFKA-6951:
-

Is there no way for a topic to expire or be unsubscribed from a consumer group 
without the change described in this issue? I am looking for a workaround to 
remove topic membership in a consumer group when that topic is no longer 
consumed from.

> Implement offset expiration semantics for unsubscribed topics
> -
>
> Key: KAFKA-6951
> URL: https://issues.apache.org/jira/browse/KAFKA-6951
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.2.0
>
>
> [This 
> portion|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets#KIP-211:ReviseExpirationSemanticsofConsumerGroupOffsets-UnsubscribingfromaTopic]
>  of KIP-211 will be implemented separately from the main PR.





[jira] [Created] (KAFKA-7685) Support loading trust stores from classpath

2018-11-28 Thread Noa Resare (JIRA)
Noa Resare created KAFKA-7685:
-

 Summary: Support loading trust stores from classpath
 Key: KAFKA-7685
 URL: https://issues.apache.org/jira/browse/KAFKA-7685
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.1.0
Reporter: Noa Resare


Certificate pinning, as well as authenticating Kafka brokers using a non-public 
CA certificate maintained inside an organisation, is desirable to a lot of 
users. This can be accomplished today using the {{ssl.truststore.location}} 
configuration property. Unfortunately, this value is always interpreted as a 
filesystem path, which makes distributing such an alternative truststore a 
needlessly cumbersome process. If we had the ability to load a trust store from 
the classpath as well as from a file, the trust store could be shipped in a jar 
that could be declared as a regular Maven-style dependency.

If we did this by supporting a {{classpath:}} prefix on {{ssl.truststore.location}}, 
this could be a backwards-compatible change, one that builds on prior design 
patterns established by, for example, the Spring project.
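The proposed prefix handling can be sketched with plain JDK APIs. A location starting with `classpath:` is resolved through the class loader, and anything else is treated as a file path, as today. `TruststoreLocationSketch`, `open` and `loadTruststore` are hypothetical helpers and not Kafka's SSL code. The resource name `company-ca.jks` in the comment is likewise only an example.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

public class TruststoreLocationSketch {
    // Hypothetical prefix, following the Spring-style convention proposed above.
    static final String CLASSPATH_PREFIX = "classpath:";

    static InputStream open(String location) throws IOException {
        if (location.startsWith(CLASSPATH_PREFIX)) {
            String resource = location.substring(CLASSPATH_PREFIX.length());
            // ClassLoader resource names must not start with '/'.
            if (resource.startsWith("/")) {
                resource = resource.substring(1);
            }
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            if (loader == null) {
                loader = TruststoreLocationSketch.class.getClassLoader();
            }
            InputStream in = loader.getResourceAsStream(resource);
            if (in == null) {
                throw new FileNotFoundException("Truststore not found on classpath: " + resource);
            }
            return in;
        }
        // No prefix: keep the existing behaviour and treat the value as a file path.
        return Files.newInputStream(Paths.get(location));
    }

    static KeyStore loadTruststore(String location, String type, char[] password)
            throws IOException, GeneralSecurityException {
        try (InputStream in = open(location)) {
            KeyStore trustStore = KeyStore.getInstance(type);
            trustStore.load(in, password);
            return trustStore;
        }
    }

    public static void main(String[] args) throws Exception {
        // Create an empty JKS file so the example runs without external files.
        Path file = Files.createTempFile("truststore", ".jks");
        KeyStore empty = KeyStore.getInstance("JKS");
        empty.load(null, null);
        try (OutputStream out = Files.newOutputStream(file)) {
            empty.store(out, "changeit".toCharArray());
        }
        System.out.println(loadTruststore(file.toString(), "JKS", "changeit".toCharArray()).size()); // 0
        // With the proposed prefix, a truststore packaged in a jar would be loaded as:
        // loadTruststore("classpath:company-ca.jks", "JKS", password);
    }
}
```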





[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-28 Thread Justin Jack (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701746#comment-16701746
 ] 

Justin Jack commented on KAFKA-7641:


Okay, good, and sorry for disturbing you.

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason identified an edge case in the current consumer protocol which could 
> cause a memory burst on the broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually became quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized JoinGroup requests are spamming the group metadata, 
> we should define a cap on the broker side to prevent a single consumer group 
> from growing too large. So far I feel it's appropriate to introduce this as a 
> server config, since in most cases this value only deals with error 
> scenarios and client users shouldn't need to worry about it.
>  





[jira] [Commented] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-28 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701743#comment-16701743
 ] 

Patrik Kleindl commented on KAFKA-7660:
---

[~vvcephei] Hi John,

I tried applying the change to my local version, but the problem persists.

Unless I did something wrong, this block in StreamsMetrics is never even called 
during the rebalancing, which seems strange.

> Stream Metrics - Memory Analysis
> 
>
> Key: KAFKA-7660
> URL: https://issues.apache.org/jira/browse/KAFKA-7660
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.0.0
>Reporter: Patrik Kleindl
>Priority: Minor
> Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg
>
>
> During the analysis of JVM memory, two possible issues showed up that I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deeply into the code, but I noticed that the class 
> Sensor.java, which is used as a key in the HashMap, does not implement the 
> equals or hashCode methods. Not sure this is a problem, though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.
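A note on point 1: in Java, string literals (and compile-time constant expressions) are interned automatically, so a literal such as "stream-processor-node-metrics" in the source yields a single shared instance. Duplicates in a heap dump therefore usually come from strings constructed at runtime, for example by concatenation. The plain-JDK snippet below demonstrates the difference. It makes no claim about where Kafka Streams actually creates these strings. `buildGroupName` is a hypothetical helper.

```java
public class InterningSketch {

    // Builds the same text at runtime, producing a new String object on each call.
    static String buildGroupName(String prefix) {
        return prefix + "-node-metrics";
    }

    public static void main(String[] args) {
        String literalA = "stream-processor-node-metrics";
        String literalB = "stream-processor-node-metrics";
        // Literals are interned: both variables reference the same object.
        System.out.println(literalA == literalB); // true

        String runtimeBuilt = buildGroupName("stream-processor");
        // Equal content, but a distinct object on the heap.
        System.out.println(runtimeBuilt == literalA);      // false
        System.out.println(runtimeBuilt.equals(literalA)); // true

        // intern() returns the canonical instance, collapsing the duplicate.
        System.out.println(runtimeBuilt.intern() == literalA); // true
    }
}
```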





[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-28 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701741#comment-16701741
 ] 

Stanislav Kozlovski commented on KAFKA-7641:


Hi Jack,
Thanks for offering help on this feature. We are already working on the KIP and 
it is currently being discussed. See 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit]

I'm driving it after we synced offline with Boyang

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason identified an edge case in the current consumer protocol which could 
> cause a memory burst on the broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually became quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized JoinGroup requests are spamming the group metadata, 
> we should define a cap on the broker side to prevent a single consumer group 
> from growing too large. So far I feel it's appropriate to introduce this as a 
> server config, since in most cases this value only deals with error 
> scenarios and client users shouldn't need to worry about it.
>  
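The proposed cap can be pictured as an admission check on the broker whenever a JoinGroup would add a new member. The Java sketch below uses a hypothetical `GroupSizeLimit` class and returns a boolean in place of a real error code. The config name `consumer.group.max.size` comes from the issue title. The final name and behaviour are whatever KIP-389 settles on.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class GroupSizeLimit {
    private final int maxSize; // value of the proposed consumer.group.max.size
    private final Set<String> members = new LinkedHashSet<>();

    GroupSizeLimit(int maxSize) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be >= 1");
        }
        this.maxSize = maxSize;
    }

    // Returns true if the member is (or already was) in the group, false if
    // admitting it would exceed the cap. Rejoins of known members always pass.
    boolean tryJoin(String memberId) {
        if (members.contains(memberId)) {
            return true;
        }
        if (members.size() >= maxSize) {
            return false;
        }
        members.add(memberId);
        return true;
    }

    int size() {
        return members.size();
    }

    public static void main(String[] args) {
        GroupSizeLimit group = new GroupSizeLimit(2);
        System.out.println(group.tryJoin("a")); // true
        System.out.println(group.tryJoin("b")); // true
        System.out.println(group.tryJoin("c")); // false: cap reached
        System.out.println(group.tryJoin("a")); // true: existing member rejoining
    }
}
```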





[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-28 Thread Justin Jack (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701703#comment-16701703
 ] 

Justin Jack commented on KAFKA-7641:


This change will require a KIP as it is a public interface change. I'm willing 
to go ahead with this if you'd like, as I know you're already working on 
KIP-345.




> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason identified an edge case in the current consumer protocol which could 
> cause a memory burst on the broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually became quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized JoinGroup requests are spamming the group metadata, 
> we should define a cap on the broker side to prevent a single consumer group 
> from growing too large. So far I feel it's appropriate to introduce this as a 
> server config, since in most cases this value only deals with error 
> scenarios and client users shouldn't need to worry about it.
>  





[jira] [Commented] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701690#comment-16701690
 ] 

ASF GitHub Bot commented on KAFKA-7037:
---

omkreddy closed pull request #5193: KAFKA-7037: Improve the topic command 
description of `--topic` option  
URL: https://github.com/apache/kafka/pull/5193
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 9d284521a6e..37dd233913b 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -308,8 +308,9 @@ object TopicCommand extends Logging {
     val deleteOpt = parser.accepts("delete", "Delete a topic")
     val alterOpt = parser.accepts("alter", "Alter the number of partitions, replica assignment, and/or configuration for the topic.")
     val describeOpt = parser.accepts("describe", "List details for the given topics.")
-    val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
-                                           "expression except for --create option")
+    val topicOpt = parser.accepts("topic", "The topic to create, alter, describe or delete. It also accepts a regular " +
+                                           "expression, except for --create option. Put topic name in double quotes and use the '\\' prefix " +
+                                           "to escape regular expression symbols; e.g. \"test\\.topic\".")
                          .withRequiredArg
                          .describedAs("topic")
                          .ofType(classOf[String])
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index a469f8efe04..5d2d873425f 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -278,4 +278,37 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     assertTrue(output.contains(topic))
     assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
   }
+
+  @Test
+  def testTopicOperationsWithRegexSymbolInTopicName(): Unit = {
+    val topic1 = "test.topic"
+    val topic2 = "test-topic"
+    val escapedTopic = "\"test\\.topic\""
+    val unescapedTopic = "test.topic"
+    val numPartitionsOriginal = 1
+
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkClient, brokers)
+
+    // create the topics
+    val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
+      "--replication-factor", "1", "--topic", topic1))
+    TopicCommand.createTopic(zkClient, createOpts)
+    val createOpts2 = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
+      "--replication-factor", "1", "--topic", topic2))
+    TopicCommand.createTopic(zkClient, createOpts2)
+
+    val escapedCommandOpts = new TopicCommandOptions(Array("--topic", escapedTopic))
+    val unescapedCommandOpts = new TopicCommandOptions(Array("--topic", unescapedTopic))
+
+    // topic actions with escaped regex do not affect 'test-topic'
+    // topic actions with unescaped topic affect 'test-topic'
+
+    assertFalse(TestUtils.grabConsoleOutput(TopicCommand.describeTopic(zkClient, escapedCommandOpts)).contains(topic2))
+    assertTrue(TestUtils.grabConsoleOutput(TopicCommand.describeTopic(zkClient, unescapedCommandOpts)).contains(topic2))
+
+    assertFalse(TestUtils.grabConsoleOutput(TopicCommand.deleteTopic(zkClient, escapedCommandOpts)).contains(topic2))
+    assertTrue(TestUtils.grabConsoleOutput(TopicCommand.deleteTopic(zkClient, unescapedCommandOpts)).contains(topic2))
+  }
 }
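
To illustrate why the new help text recommends escaping, here is a minimal Java sketch (not the TopicCommand code itself) that filters topic names with java.util.regex. It assumes the --topic value is applied as a full-string regular expression match; the class TopicRegexDemo and its matchingTopics helper are hypothetical. An unescaped '.' matches any character, so test.topic also selects test-topic, while test\.topic selects only the literal name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicRegexDemo {
    // Hypothetical helper: returns the topics whose whole name matches the regex.
    // Assumption: full-string matching, as Matcher.matches() performs.
    static List<String> matchingTopics(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("test.topic", "test-topic");

        // Unescaped: '.' matches any character, so both topics are selected.
        System.out.println(matchingTopics("test.topic", topics));   // [test.topic, test-topic]

        // Escaped: "\\." in Java source is the regex \. which matches only a literal dot.
        System.out.println(matchingTopics("test\\.topic", topics)); // [test.topic]

        // Pattern.quote wraps the name in \Q...\E so every character is literal.
        System.out.println(matchingTopics(Pattern.quote("test.topic"), topics)); // [test.topic]
    }
}
```

Pattern.quote is shown only for comparison as a Java-side way to match a name literally; it is not something the --topic option does for you.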


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> delete topic command replaces '+' from the topic name which leads incorrect topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Assignee: Vahid Hashemian
>Priority: Major
> 

[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-28 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701647#comment-16701647
 ] 

Jonathan Santilli commented on KAFKA-7678:
--

Yes [~mjsax], it is like that; the application still finished successfully and gracefully.

Now that we can say it is a bug, I have changed the priority to *minor* and assigned it to myself.

I hope to send the PR soon.

 

Cheers!

--

Jonathan

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Priority: Minor
>
> This occurs when the group is rebalancing in a Kafka Streams application and 
> the process (the Kafka Streams application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> I have checked the code: the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class expects any kind of error to happen, since it catches 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility in the 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> and change it to:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if (Objects.nonNull(producer)) {
>   producer.close();
>   producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  
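
A minimal Java sketch of the proposed guard, assuming (as one plausible reading of the stack trace, not a confirmed diagnosis) that close() can be reached again after the producer field has already been set to null. FakeProducer and Collector are hypothetical stand-ins, not the Kafka Streams classes; only the shape of close() follows the proposal above.

```java
import java.util.Objects;

public class NullSafeCloseDemo {
    // Hypothetical stand-in for the producer; counts how many times it is closed.
    static class FakeProducer {
        int closeCount = 0;

        void close() {
            closeCount++;
        }
    }

    // Hypothetical simplification of RecordCollectorImpl: only the close() logic.
    static class Collector {
        private FakeProducer producer;

        Collector(FakeProducer producer) {
            this.producer = producer;
        }

        void close() {
            // Guard from the proposal: skip the producer if it was already released.
            if (Objects.nonNull(producer)) {
                producer.close();
                producer = null;
            }
            checkForException();
        }

        // Placeholder for the real exception check; intentionally a no-op here.
        private void checkForException() {
        }
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        Collector collector = new Collector(producer);
        collector.close();
        collector.close(); // without the guard, this second call would throw NullPointerException
        System.out.println(producer.closeCount); // prints 1
    }
}
```

With the guard, a repeated close() is a no-op for the producer, so the caller's catch of Throwable is no longer needed to swallow a NullPointerException on shutdown.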





[jira] [Assigned] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-28 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli reassigned KAFKA-7678:


Assignee: Jonathan Santilli

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Minor
>





[jira] [Updated] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-28 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli updated KAFKA-7678:
-
Priority: Minor  (was: Major)

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Priority: Minor
>





[jira] [Resolved] (KAFKA-6149) LogCleanerManager should include topic partition name when warning of invalid cleaner offset

2018-11-28 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6149.
--
   Resolution: Fixed
Fix Version/s: 0.11.0.1
   1.0.0

This was fixed in the 1.0 and 0.11.0.1+ releases.

> LogCleanerManager should include topic partition name when warning of invalid cleaner offset 
> -
>
> Key: KAFKA-6149
> URL: https://issues.apache.org/jira/browse/KAFKA-6149
> Project: Kafka
>  Issue Type: Improvement
>  Components: log, logging
>Reporter: Ryan P
>Priority: Major
> Fix For: 1.0.0, 0.11.0.1
>
>
> The following message would be a lot more helpful if the topic partition name 
> were included.
> if (!isCompactAndDelete(log))
>   warn(s"Resetting first dirty offset to log start offset $logStartOffset since the checkpointed offset $offset is invalid.")
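
A minimal Java sketch of a warning that names the partition, as the issue requests. The resetWarning helper, its parameters, the topic-partition rendering, and the sample values are hypothetical and chosen for illustration; this is not the wording of the actual fix.

```java
public class CleanerOffsetWarningDemo {
    // Hypothetical helper: the same warning text, with the topic partition added.
    static String resetWarning(String topic, int partition, long logStartOffset, long offset) {
        return String.format(
                "Resetting first dirty offset of %s-%d to log start offset %d since the checkpointed offset %d is invalid.",
                topic, partition, logStartOffset, offset);
    }

    public static void main(String[] args) {
        // Prints: Resetting first dirty offset of orders-3 to log start offset 100 since the checkpointed offset 42 is invalid.
        System.out.println(resetWarning("orders", 3, 100L, 42L));
    }
}
```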


