[jira] [Commented] (KAFKA-7194) Error deserializing assignment after rebalance

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553954#comment-16553954
 ] 

ASF GitHub Bot commented on KAFKA-7194:
---

rajinisivaram closed pull request #5417: KAFKA-7194; Fix buffer underflow if 
onJoinComplete is retried after failure
URL: https://github.com/apache/kafka/pull/5417
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index b5c7a66e100..53834fb81df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -200,9 +200,8 @@ public AbstractCoordinator(LogContext logContext,
                                                            Map<String, ByteBuffer> allMemberMetadata);
 
     /**
-     * Invoked when a group member has successfully joined a group. If this call is woken up (i.e.
-     * if the invocation raises {@link org.apache.kafka.common.errors.WakeupException}), then it
-     * will be retried on the next call to {@link #ensureActiveGroup()}.
+     * Invoked when a group member has successfully joined a group. If this call fails with an exception,
+     * then it will be retried using the same assignment state on the next call to {@link #ensureActiveGroup()}.
      *
      * @param generation The generation that was joined
      * @param memberId The identifier for the local member in the group
@@ -418,7 +417,9 @@ boolean joinGroupIfNeeded(final long timeoutMs, final long startTimeMs) {
             }
 
             if (future.succeeded()) {
-                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
+                // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
+                ByteBuffer memberAssignment = future.value().duplicate();
+                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
 
                 // We reset the join group future only after the completion callback returns. This ensures
                 // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
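
As an editorial aside (not part of the PR), here is a minimal, self-contained sketch of the
failure mode the added duplicate() guards against: if onJoinComplete is retried against the
very same ByteBuffer, the second deserialization starts at the already-consumed position and
underflows. The readAssignmentVersion helper below is only a stand-in for the real
ConsumerProtocol parsing, not actual Kafka code.

{code:java}
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class BufferRetrySketch {
    // Stand-in for ConsumerProtocol.deserializeAssignment(), which starts by reading a 'version' field.
    static short readAssignmentVersion(ByteBuffer buffer) {
        return buffer.getShort();
    }

    public static void main(String[] args) {
        ByteBuffer assignment = (ByteBuffer) ByteBuffer.allocate(2).putShort((short) 1).flip();

        // Without duplicate(): the first attempt consumes the buffer, so a retried attempt underflows,
        // i.e. the "Error reading field 'version': java.nio.BufferUnderflowException" from KAFKA-7194.
        readAssignmentVersion(assignment);
        try {
            readAssignmentVersion(assignment);
        } catch (BufferUnderflowException e) {
            System.out.println("retry on the consumed buffer fails: " + e);
        }

        // With duplicate(): each attempt gets its own position, so a retry sees the full assignment again.
        assignment.rewind();
        System.out.println(readAssignmentVersion(assignment.duplicate()));
        System.out.println(readAssignmentVersion(assignment.duplicate()));
    }
}
{code}
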
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index ea6d47249eb..f9b77e921cd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -269,10 +269,10 @@ protected void onJoinComplete(int generation,
             this.joinedSubscription = newJoinedSubscription;
         }
 
-        // update the metadata and enforce a refresh to make sure the fetcher can start
-        // fetching data in the next iteration
+        // Update the metadata to include the full group subscription. The leader will trigger a rebalance
+        // if there are any metadata changes affecting any of the consumed partitions (whether or not this
+        // instance is subscribed to the topics).
         this.metadata.setTopics(subscriptions.groupSubscription());
-        if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new TimeoutException();
 
         // give the assignor a chance to update internal state based on the received assignment
         assignor.onAssignment(assignment);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7c2638cf012..ba392c6f4cb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -874,6 +874,57 @@ public boolean matches(AbstractRequest body) {
         assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
     }
 
+    @Test
+    public void testWakeupFromAssignmentCallback() {
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
+
+        final String topic = "topic1";
+        TopicPartition partition = new TopicPartition(topic, 0);
+        final String 

[jira] [Resolved] (KAFKA-2963) Replace server internal usage of TopicAndPartition with TopicPartition

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2963.
--
Resolution: Fixed

This has been fixed in various cleanups

> Replace server internal usage of TopicAndPartition with TopicPartition
> --
>
> Key: KAFKA-2963
> URL: https://issues.apache.org/jira/browse/KAFKA-2963
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Jakub Nowak
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7194) Error deserializing assignment after rebalance

2018-07-24 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7194.
---
   Resolution: Fixed
 Reviewer: Konstantine Karantasis
Fix Version/s: 2.0.0

> Error deserializing assignment after rebalance
> --
>
> Key: KAFKA-7194
> URL: https://issues.apache.org/jira/browse/KAFKA-7194
> Project: Kafka
>  Issue Type: Bug
>Reporter: Konstantine Karantasis
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.0
>
>
> A simple sink connector task is failing in a test with the following 
> exception: 
> {noformat}
> [2018-07-02 12:31:13,200] ERROR WorkerSinkTask{id=verifiable-sink-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': java.nio.BufferUnderflowException
>         at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:243)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:353)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:338)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748){noformat}
>  
> After dumping the consumer offsets partition that this consumer group 
> writes to, using: 
> {noformat}
> bin/kafka-dump-log.sh --offsets-decoder --files ./.log 
> {noformat}
> we get: 
> {noformat}
> Dumping ./.log
> Starting offset: 0
> offset: 0 position: 0 CreateTime: 1530534673177 isvalid: true keysize: 27 
> valuesize: 217 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
> sequence: -1 isTransactional: false headerKeys: [] key: 
> {"metadata":"connect-verifiable-sink"} payload: 
> {"protocolType":"consumer","protocol":"range","generationId":1,"assignment":"{consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4=[test-0]}"}
> offset: 1 position: 314 CreateTime: 1530534673206 isvalid: true keysize: 27 
> valuesize: 32 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 
> sequence: -1 isTransactional: false headerKeys: [] key: 
> {"metadata":"connect-verifiable-sink"} payload: 
> {"protocolType":"consumer","protocol":null,"generationId":2,"assignment":"{}"}{noformat}
>  
> Since the broker seems to send a non-empty response to the consumer, there's 
> a chance that the response buffer is consumed more than once at some point 
> when parsing the response in the client. 
> Here's what the kafka-request.log shows the broker sends to the client in the 
> `SYNC_GROUP` response whose parsing throws the error: 
> {noformat}
> [2018-07-02 12:31:13,185] DEBUG Completed 
> request:RequestHeader(apiKey=SYNC_GROUP, apiVersion=2, clientId=consumer-4, 
> correlationId=5) -- 
> {group_id=connect-verifiable-sink,generation_id=1,member_id=consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4,group_assignment=[{member_id=consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4,member_assignment=java.nio.HeapByteBuffer[pos=0
> 

[jira] [Assigned] (KAFKA-7195) StreamStreamJoinIntegrationTest fails in 2.0 Jenkins

2018-07-24 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reassigned KAFKA-7195:
-

Assignee: Manikumar

> StreamStreamJoinIntegrationTest fails in 2.0 Jenkins
> 
>
> Key: KAFKA-7195
> URL: https://issues.apache.org/jira/browse/KAFKA-7195
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
>
> From 
> https://builds.apache.org/job/kafka-2.0-jdk8/87/testReport/junit/org.apache.kafka.streams.integration/StreamStreamJoinIntegrationTest/testOuter_caching_enabled___false_/
>  :
> {code}
> java.lang.AssertionError: 
> Expected: is <[A-null]>
>  but: was <[A-a, A-b, A-c, A-d]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.checkResult(AbstractJoinIntegrationTest.java:171)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:212)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:184)
>   at 
> org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest.testOuter(StreamStreamJoinIntegrationTest.java:198)
> {code}
> However, some test output was missing:
> {code}
> [2018-07-23 20:51:36,363] INFO Socket c
> ...[truncated 1627692 chars]...
> 671)
> {code}
> I ran the test locally and it passed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7195) StreamStreamJoinIntegrationTest fails in 2.0 Jenkins

2018-07-24 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554006#comment-16554006
 ] 

Rajini Sivaram commented on KAFKA-7195:
---

[~omkreddy] I have merged the PR to the 2.0 branch, but am leaving the JIRA open 
for now to address review comments.

> StreamStreamJoinIntegrationTest fails in 2.0 Jenkins
> 
>
> Key: KAFKA-7195
> URL: https://issues.apache.org/jira/browse/KAFKA-7195
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
>
> From 
> https://builds.apache.org/job/kafka-2.0-jdk8/87/testReport/junit/org.apache.kafka.streams.integration/StreamStreamJoinIntegrationTest/testOuter_caching_enabled___false_/
>  :
> {code}
> java.lang.AssertionError: 
> Expected: is <[A-null]>
>  but: was <[A-a, A-b, A-c, A-d]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.checkResult(AbstractJoinIntegrationTest.java:171)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:212)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:184)
>   at 
> org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest.testOuter(StreamStreamJoinIntegrationTest.java:198)
> {code}
> However, some test output was missing:
> {code}
> [2018-07-23 20:51:36,363] INFO Socket c
> ...[truncated 1627692 chars]...
> 671)
> {code}
> I ran the test locally and it passed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7195) StreamStreamJoinIntegrationTest fails in 2.0 Jenkins

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554004#comment-16554004
 ] 

ASF GitHub Bot commented on KAFKA-7195:
---

rajinisivaram closed pull request #5418: KAFKA-7195: Fix 
StreamStreamJoinIntegrationTest test failures
URL: https://github.com/apache/kafka/pull/5418
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 80ab60647ad..3e29fc2a29b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -163,7 +163,7 @@ void prepareEnvironment() throws InterruptedException {
 
     @After
     public void cleanup() throws InterruptedException {
-        CLUSTER.deleteTopicsAndWait(12, INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+        CLUSTER.deleteAllTopicsAndWait(12);
     }
 
     private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index ce6324df617..ab52649dee4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -274,6 +274,24 @@ public void deleteTopicsAndWait(final long timeoutMs, final String... topics) th
         }
     }
 
+    /**
+     * Deletes all topics and blocks until all topics got deleted.
+     *
+     * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
+     */
+    public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
+        final Set<String> topics = new HashSet<>(JavaConverters.seqAsJavaListConverter(zkUtils.getAllTopics()).asJava());
+        for (final String topic : topics) {
+            try {
+                brokers[0].deleteTopic(topic);
+            } catch (final UnknownTopicOrPartitionException e) { }
+        }
+
+        if (timeoutMs > 0) {
+            TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
+        }
+    }
+
     public void deleteAndRecreateTopics(final String... topics) throws InterruptedException {
         deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
         createTopics(topics);
@@ -295,6 +313,10 @@ private TopicsDeletedCondition(final String... topics) {
             Collections.addAll(deletedTopics, topics);
         }
 
+        public TopicsDeletedCondition(final Set<String> topics) {
+            deletedTopics.addAll(topics);
+        }
+
         @Override
         public boolean conditionMet() {
             final Set<String> allTopics = new HashSet<>(


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamStreamJoinIntegrationTest fails in 2.0 Jenkins
> 
>
> Key: KAFKA-7195
> URL: https://issues.apache.org/jira/browse/KAFKA-7195
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
>
> From 
> https://builds.apache.org/job/kafka-2.0-jdk8/87/testReport/junit/org.apache.kafka.streams.integration/StreamStreamJoinIntegrationTest/testOuter_caching_enabled___false_/
>  :
> {code}
> java.lang.AssertionError: 
> Expected: is <[A-null]>
>  but: was <[A-a, A-b, A-c, A-d]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.checkResult(AbstractJoinIntegrationTest.java:171)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:212)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:184)
>   at 
> org.apache.kafka.stre

[jira] [Commented] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists

2018-07-24 Thread Adrian McCague (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554059#comment-16554059
 ] 

Adrian McCague commented on KAFKA-6767:
---

Hi [~guozhang], we are seeing this issue as well on a relatively frequent basis 
(Streams 1.1.0). Here is my case:
{code:java}
task [1_1] Failed to write offset checkpoint file to /data/my-topology/1_1/.checkpoint: {}
java.io.FileNotFoundException: /data/my-topology/1_1/.checkpoint.tmp (No such file or directory)
        at java.io.FileOutputStream.open0(FileOutputStream.java)
        at java.io.FileOutputStream.open(FileOutputStream.java:270)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
        at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
        at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
        at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
        at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
This is the sub-topology:
{code:java}
Sub-topology: 1
Source: KSTREAM-SOURCE-11 (topics: [x])
--> KSTREAM-PEEK-12
Source: KSTREAM-SOURCE-15 (topics: [y])
--> KSTREAM-PEEK-16
Processor: KSTREAM-PEEK-12 (stores: [])
--> KSTREAM-KEY-SELECT-13
<-- KSTREAM-SOURCE-11
Processor: KSTREAM-PEEK-16 (stores: [])
--> KSTREAM-KEY-SELECT-17
<-- KSTREAM-SOURCE-15
Processor: KSTREAM-KEY-SELECT-13 (stores: [])
--> KSTREAM-MAPVALUES-14
<-- KSTREAM-PEEK-12
Processor: KSTREAM-KEY-SELECT-17 (stores: [])
--> KSTREAM-MAPVALUES-18
<-- KSTREAM-PEEK-16
Processor: KSTREAM-MAPVALUES-14 (stores: [])
--> KSTREAM-MERGE-19
<-- KSTREAM-KEY-SELECT-13
Processor: KSTREAM-MAPVALUES-18 (stores: [])
--> KSTREAM-MERGE-19
<-- KSTREAM-KEY-SELECT-17
Processor: KSTREAM-MERGE-19 (stores: [])
--> KSTREAM-FILTER-22
<-- KSTREAM-MAPVALUES-14, KSTREAM-MAPVALUES-18
Processor: KSTREAM-FILTER-22 (stores: [])
--> KSTREAM-SINK-21
<-- KSTREAM-MERGE-19
Sink: KSTREAM-SINK-21 (topic: z-store-repartition)
<-- KSTREAM-FILTER-22{code}
So I believe this supports your theory that stateless tasks are attempting to 
checkpoint. In this case it appears the final sink is related to a repartition 
before a DSL Aggregate, which may hint towards the bug.

> OffsetCheckpoint write assumes parent directory exists
> --
>
> Key: KAFKA-6767
> URL: https://issues.apache.org/jira/browse/KAFKA-6767
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an 
> instance dies it is created from scratch, rather than reusing the existing 
> RocksDB.)
> We routinely see:
> {code:java}
> 2018-04-09T19:14:35.004Z WARN <> 
> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset 
> checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
> java.io.FileNotFoundException: 
> /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorSta

[jira] [Updated] (KAFKA-7195) StreamStreamJoinIntegrationTest fails in 2.0 Jenkins

2018-07-24 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7195:
--
Affects Version/s: 2.0.0
Fix Version/s: 2.0.0

> StreamStreamJoinIntegrationTest fails in 2.0 Jenkins
> 
>
> Key: KAFKA-7195
> URL: https://issues.apache.org/jira/browse/KAFKA-7195
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.0
>
>
> From 
> https://builds.apache.org/job/kafka-2.0-jdk8/87/testReport/junit/org.apache.kafka.streams.integration/StreamStreamJoinIntegrationTest/testOuter_caching_enabled___false_/
>  :
> {code}
> java.lang.AssertionError: 
> Expected: is <[A-null]>
>  but: was <[A-a, A-b, A-c, A-d]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.checkResult(AbstractJoinIntegrationTest.java:171)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:212)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:184)
>   at 
> org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest.testOuter(StreamStreamJoinIntegrationTest.java:198)
> {code}
> However, some test output was missing:
> {code}
> [2018-07-23 20:51:36,363] INFO Socket c
> ...[truncated 1627692 chars]...
> 671)
> {code}
> I ran the test locally and it passed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-07-24 Thread Laven Sukumaran (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Laven Sukumaran updated KAFKA-6388:
---
Comment: was deleted

(was: .)

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
> topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists.
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.roll(Log.scala:1297)
> at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.append(Log.scala:624)
> at kafka.log.Log.appendAsFollower(Log.scala:607)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:184)
> ... 13 more
> [2017-12-19 15:16:24,302] INFO [ReplicaFetcher replicaId=2, lead

[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-07-24 Thread Laven Sukumaran (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554236#comment-16554236
 ] 

Laven Sukumaran commented on KAFKA-6388:


.

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
> topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists.
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.roll(Log.scala:1297)
> at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.append(Log.scala:624)
> at kafka.log.Log.appendAsFollower(Log.scala:607)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:184)
> ... 13 more
> [2017-12-19 15:16:24,302] INFO [Replica

[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-07-24 Thread Laven Sukumaran (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554237#comment-16554237
 ] 

Laven Sukumaran commented on KAFKA-5882:


Thanks for the update [~mjsax], it helped fix the issue we were seeing. I'm 
posting our experience here in case it helps anyone else. 

We were experiencing the same exception upon a rolling update of our service, 
even when we did not explicitly change the topology. We used the describe 
method on our topology ("topology.describe()") to log the topology on every 
rolling update. What we found was that the topology printed in the logs was 
frequently inconsistent, which was likely causing our issue, as per your comment 
above. From this observation we concluded that the issue must lie in how the 
topology was being built. Our service leverages Spring Boot DI when setting up 
our Kafka Streams instances. We had 3 Kafka Streams beans/components being 
created with the Kafka StreamsBuilder, but there was no logic in place to 
control the order in which they were created, leading to an inconsistent 
topology on every launch of our service. We have not seen the issue any more 
since forcing the order of creation of our Kafka Streams instances.
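
A minimal sketch (an editorial illustration, not code from this thread) of the kind of 
startup logging described above; the class and method names are hypothetical, and the 
topology and configuration are assumed to be wired elsewhere:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class TopologyStartupLogger {
    // Hypothetical helper: build, log, and start the topology in one place.
    public static KafkaStreams startAndLog(final StreamsBuilder builder, final Properties config) {
        final Topology topology = builder.build();
        // Log the full description on every deploy; if it differs between restarts, task numbering
        // (and therefore partition/state assignment) differs too, which is the inconsistency above.
        System.out.println(topology.describe());
        final KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();
        return streams;
    }
}
{code}
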

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems the bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything except an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5882) NullPointerException in StreamTask

2018-07-24 Thread Laven Sukumaran (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554237#comment-16554237
 ] 

Laven Sukumaran edited comment on KAFKA-5882 at 7/24/18 1:22 PM:
-

Thanks for the update [~mjsax], it helped us pinpoint and fix the issue we were 
seeing. I'm posting our experience here in case it helps anyone else. 

We were experiencing the same exception upon a rolling update of our service, 
even when we did not explicitly change the topology. We used the describe 
method on our topology ("topology.describe()") to log the topology on every 
rolling update. What we found was that the topology printed in the logs was 
frequently inconsistent, which was likely causing our issue, as per your comment 
above. From this observation we concluded that the issue must lie in how the 
topology was being built. Our service leverages Spring Boot DI when setting up 
our Kafka Streams instances. We had 3 Kafka Streams beans/components being 
created with the Kafka StreamsBuilder, but there was no logic in place to 
control the order in which they were created, leading to an inconsistent 
topology on every launch of our service. We have not seen the issue any more 
since forcing the order of creation of our Kafka Streams instances.


was (Author: laven):
Thanks for the update [~mjsax], it helped fixed the issue we were seeing. I'm 
posting our experience here in case it helps anyone else. 

We were experiencing the same exception upon a rolling update of our service 
even when we did not explicitly change the topology. We used the describe 
method on my topology ("topology.describe()"), to log the topology on every 
rolling update. What we found was that the topology printed out in the logs was 
frequently inconsistent and likely causing our issue as per your comment above. 
From this observation we could conclude that the issue must lie in how the 
topology was being built. Our service leverages Spring Boot DI when setting up 
our kafka streams. We had 3 kafka streams beans/components that were being 
created with the kafka streamsbuilder however there wasn't any logic in place 
to control the order in which these kafka streams were being created, therefore 
leading to an inconsistent topology on every launch of our service. We have not 
seen the issue any more since forcing the order of creation of our kafka 
streams.

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems the bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything except an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(

[jira] [Created] (KAFKA-7197) Release a milestone build for Scala 2.13.0 M3

2018-07-24 Thread Martynas Mickevičius (JIRA)
Martynas Mickevičius created KAFKA-7197:
---

 Summary: Release a milestone build for Scala 2.13.0 M3
 Key: KAFKA-7197
 URL: https://issues.apache.org/jira/browse/KAFKA-7197
 Project: Kafka
  Issue Type: Improvement
Reporter: Martynas Mickevičius


Releasing a milestone version for Scala 2.13.0-M3 (and maybe even for 
2.13.0-M4, which has new collections) would be helpful to kickstart Kafka 
ecosystem adoption for 2.13.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7195) StreamStreamJoinIntegrationTest fails in 2.0 Jenkins

2018-07-24 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7195:
--
Fix Version/s: (was: 2.0.0)

> StreamStreamJoinIntegrationTest fails in 2.0 Jenkins
> 
>
> Key: KAFKA-7195
> URL: https://issues.apache.org/jira/browse/KAFKA-7195
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
>
> From 
> https://builds.apache.org/job/kafka-2.0-jdk8/87/testReport/junit/org.apache.kafka.streams.integration/StreamStreamJoinIntegrationTest/testOuter_caching_enabled___false_/
>  :
> {code}
> java.lang.AssertionError: 
> Expected: is <[A-null]>
>  but: was <[A-a, A-b, A-c, A-d]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.checkResult(AbstractJoinIntegrationTest.java:171)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:212)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:184)
>   at 
> org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest.testOuter(StreamStreamJoinIntegrationTest.java:198)
> {code}
> However, some test output was missing:
> {code}
> [2018-07-23 20:51:36,363] INFO Socket c
> ...[truncated 1627692 chars]...
> 671)
> {code}
> I ran the test locally and it passed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7198) Enhance KafkaStreams start method javadoc

2018-07-24 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-7198:
--

 Summary: Enhance KafkaStreams start method javadoc
 Key: KAFKA-7198
 URL: https://issues.apache.org/jira/browse/KAFKA-7198
 Project: Kafka
  Issue Type: Improvement
Reporter: Bill Bejeck
 Fix For: 2.1.0


The {{KafkaStreams.start}} method javadoc states that, once called, the stream 
threads are started in the background and hence the method does not block. However, 
if you have GlobalKTables in your topology, the stream threads aren't started until 
the GlobalKTables have bootstrapped fully, so the javadoc for the {{start}} method 
should be updated to reflect this.
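
A small sketch (editorial, with the topology and properties assumed to be built elsewhere) 
of observing when the instance actually reaches RUNNING via a state listener, rather than 
relying on start() returning immediately; the helper name is hypothetical:

{code:java}
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

public class StartAndAwaitRunning {
    // Hypothetical helper: start the instance and block until it reports RUNNING.
    public static KafkaStreams startAndAwaitRunning(final Topology topology, final Properties props)
            throws InterruptedException {
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch running = new CountDownLatch(1);
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING) {
                running.countDown();
            }
        });
        streams.start(); // per this ticket, may itself block while GlobalKTables bootstrap
        running.await();
        return streams;
    }
}
{code}
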



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7199) Support BigInteger data type

2018-07-24 Thread Jiri Pechanec (JIRA)
Jiri Pechanec created KAFKA-7199:


 Summary: Support BigInteger data type
 Key: KAFKA-7199
 URL: https://issues.apache.org/jira/browse/KAFKA-7199
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Jiri Pechanec


When data are sourced from a database into Connect, it is possible that 
`unsigned long` or larger integer values need to be represented. Currently 
{{BigDecimal}} and {{String}} are the only types that can be used. Unfortunately, 
in that case the semantic information that the value is an integer is lost. We 
therefore propose to introduce a {{BigInteger}} logical datatype as a counterpart 
to {{BigDecimal}}.

Would there be an interest in contribution from our side?
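
As a purely hypothetical illustration of the proposal (none of these names exist in Connect 
today), such a logical type could be modeled on the existing 
org.apache.kafka.connect.data.Decimal, storing the two's-complement bytes in a BYTES schema:

{code:java}
import java.math.BigInteger;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.DataException;

public class BigIntegerLogicalType {
    // Hypothetical logical type name, following the Decimal naming convention.
    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.BigInteger";

    public static SchemaBuilder builder() {
        // Store the two's-complement representation in a BYTES field, as Decimal does.
        return SchemaBuilder.bytes().name(LOGICAL_NAME).version(1);
    }

    public static byte[] fromLogical(final Schema schema, final BigInteger value) {
        if (!LOGICAL_NAME.equals(schema.name()))
            throw new DataException("Requested conversion of BigInteger object but the schema does not match.");
        return value.toByteArray();
    }

    public static BigInteger toLogical(final Schema schema, final byte[] value) {
        if (!LOGICAL_NAME.equals(schema.name()))
            throw new DataException("Requested conversion of BigInteger object but the schema does not match.");
        return new BigInteger(value);
    }
}
{code}
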



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists

2018-07-24 Thread Adrian McCague (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554541#comment-16554541
 ] 

Adrian McCague commented on KAFKA-6767:
---

Also, perhaps a coincidence, but the topology was running fine for a month before a 
rebalance occurred, at which point these errors started cropping up.

> OffsetCheckpoint write assumes parent directory exists
> --
>
> Key: KAFKA-6767
> URL: https://issues.apache.org/jira/browse/KAFKA-6767
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an 
> instance dies it is created from scratch, rather than reusing the existing 
> RocksDB.)
> We routinely see:
> {code:java}
> 2018-04-09T19:14:35.004Z WARN <> 
> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset 
> checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
> java.io.FileNotFoundException: 
> /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
> Inspecting the state store directory, I can indeed see that {{chat/0_11}} 
> does not exist (although many other partitions do).
>  
> Looking at the OffsetCheckpoint write method, it seems to try to open a new 
> checkpoint file without first ensuring that the parent directory exists.
>  
> {code:java}
>     public void write(final Map<TopicPartition, Long> offsets) throws IOException {
>         // if there is no offsets, skip writing the file to save disk IOs
>         if (offsets.isEmpty()) {
>             return;
>         }
>         synchronized (lock) {
>             // write to temp file and then swap with the existing file
>             final File temp = new File(file.getAbsolutePath() + ".tmp");{code}
>  
> Either the OffsetCheckpoint class should initialize the directories if 
> needed, or some precondition of it being called should ensure that is the 
> case.
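
A minimal sketch of the suggested precondition (an editorial illustration, not code from the 
ticket): create the task directory before OffsetCheckpoint opens its .tmp file. The wrapper 
class and method names are hypothetical.

{code:java}
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

public class CheckpointWriteHelper {
    // Hypothetical wrapper: ensure the parent directory exists before delegating to OffsetCheckpoint.write.
    public static void writeCheckpoint(final File checkpointFile, final Map<TopicPartition, Long> offsets)
            throws IOException {
        final File parent = checkpointFile.getParentFile();
        if (parent != null && !parent.exists() && !parent.mkdirs()) {
            throw new IOException("Could not create checkpoint directory " + parent);
        }
        new OffsetCheckpoint(checkpointFile).write(offsets);
    }
}
{code}
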



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7144) Kafka Streams doesn't properly balance partition assignment

2018-07-24 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-7144:
--

Assignee: Bill Bejeck

> Kafka Streams doesn't properly balance partition assignment
> ---
>
> Key: KAFKA-7144
> URL: https://issues.apache.org/jira/browse/KAFKA-7144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: James Cheng
>Assignee: Bill Bejeck
>Priority: Major
> Attachments: OneThenTwelve.java
>
>
> Kafka Streams doesn't always spread the tasks across all available 
> instances/threads.
> I have a topology which consumes a single-partition topic and goes .through() 
> a 12-partition topic. That makes 13 partitions.
>  
> I then started 2 instances of the application. I would have expected the 13 
> partitions to be split across the 2 instances roughly evenly (7 partitions on 
> one, 6 partitions on the other).
> Instead, one instance gets 12 partitions, and the other instance gets 1 
> partition.
>  
> Repro case attached. I ran it a couple times, and it was fairly repeatable.
> Setup for the repro:
> {code:java}
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic one 
> --partitions 1 --replication-factor 1 
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic twelve 
> --partitions 12 --replication-factor 1
> $ echo foo | kafkacat -P -b 127.0.0.1 -t one
> {code}
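
The attached OneThenTwelve.java is not included in this thread; as a rough, assumed 
illustration only, a topology matching the description (a single-partition input routed 
.through() a 12-partition topic) might look like the sketch below, with all topic names, 
ids, and settings made up:

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class OneThenTwelveSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // One task comes from the single-partition "one" topic; twelve more come from "twelve".
        builder.<String, String>stream("one")
               .through("twelve") // routes through the 12-partition topic
               .foreach((k, v) -> System.out.println(k + "=" + v));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "one-then-twelve");    // assumed application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");  // assumed broker address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}
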



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-2075) Validate that all kafka.api requests has been removed and clean up compatibility code

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2075.
--
Resolution: Fixed

Old Scala request/response classes were removed as part of KAFKA-2983.

> Validate that all kafka.api requests has been removed and clean up 
> compatibility code
> -
>
> Key: KAFKA-2075
> URL: https://issues.apache.org/jira/browse/KAFKA-2075
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Priority: Major
>
> Once we have finished all other subtasks, the old kafka.api requests/responses 
> shouldn't be used anywhere.
> We need to validate that the classes are indeed gone, remove the unittests 
> for serializing/deserializing them and clean up the compatibility code added 
> in KAFKA-2044.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-07-24 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554586#comment-16554586
 ] 

Matthias J. Sax commented on KAFKA-5882:


Thanks for sharing your findings [~laven]!

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems the bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything except an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7200) Preserve serdes used

2018-07-24 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-7200:
--

 Summary: Preserve serdes used 
 Key: KAFKA-7200
 URL: https://issues.apache.org/jira/browse/KAFKA-7200
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bill Bejeck
Assignee: Bill Bejeck
 Fix For: 2.1.0


We need to preserve the serdes used and pass them to either parent or child 
nodes in the topology graph; this is required for the optimization that removes 
repartition topics and adds a single new one upstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-2969) Refactor MetadataCache to be optimal for o.a.k.c requests

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2969.
--
Resolution: Fixed

Closing this, as the old Scala request/response classes have been removed and 
MetadataCache has been updated to use the new o.a.k.c requests.

> Refactor MetadataCache to be optimal for o.a.k.c requests
> -
>
> Key: KAFKA-2969
> URL: https://issues.apache.org/jira/browse/KAFKA-2969
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Major
>
> Once UpdateMetadataRequest and TopicMetadata are minimally migrated to the 
> new o.a.k.c requests, refactor MetadataCache to use the new classes more 
> cleanly and use o.a.k.c classes internally.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-2968) Refactor ReplicaManager to be optimal for o.a.k.c requests

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2968.
--
Resolution: Fixed

Closing this, as the old Scala request/response classes have been removed and 
ReplicaManager has been updated to use the new o.a.k.c requests.

> Refactor ReplicaManager to be optimal for o.a.k.c requests
> --
>
> Key: KAFKA-2968
> URL: https://issues.apache.org/jira/browse/KAFKA-2968
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Major
>
> Once LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest are 
> minimally migrated to the new o.a.k.c requests, refactor ReplicaManager to 
> use the new classes more cleanly and use o.a.k.c classes internally.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7201) Optimize repartition operations

2018-07-24 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-7201:
--

 Summary: Optimize repartition operations
 Key: KAFKA-7201
 URL: https://issues.apache.org/jira/browse/KAFKA-7201
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bill Bejeck
Assignee: Bill Bejeck
 Fix For: 2.1.0


When the topology has a key-changing operation, every downstream processor that 
uses the key will automatically create its own repartition topic.  In most cases 
these repartition topics can be collapsed into a single repartition operation 
occurring immediately after the key-changing operation, reducing the overall 
footprint of the Streams application.
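As a purely illustrative sketch of the pattern (topic names and the logic are 
hypothetical, not taken from the ticket): one key-changing operation followed by 
two downstream aggregations, each of which today triggers its own repartition 
topic and which could instead share a single one placed right after selectKey().

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class RepartitionExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        // Key-changing operation: everything downstream that groups by key
        // needs the data repartitioned on the new key.
        KStream<String, String> rekeyed = input.selectKey((key, value) -> value);

        // Without the optimization, each of these aggregations creates its own
        // repartition topic; both could share one created right after selectKey().
        rekeyed.groupByKey().count();
        rekeyed.groupByKey().reduce((v1, v2) -> v1 + v2);

        // Print the topology to see the repartition topics that get created.
        System.out.println(builder.build().describe());
    }
}
{code}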



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-2890) Strange behaviour during partitions reassignment.

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2890.
--
Resolution: Auto Closed

Closing inactive issue. Please reopen if the issue still exists in newer 
versions.

> Strange behaviour during partitions reassignment.
> -
>
> Key: KAFKA-2890
> URL: https://issues.apache.org/jira/browse/KAFKA-2890
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Alexander Kukushkin
>Assignee: Ismael Juma
>Priority: Major
> Attachments: reassign.json, reassign.out
>
>
> Hi.
> I am playing with the new version of kafka (0.9.0.0).
> Initially I created a cluster of 3 nodes and created some topics there. 
> Later I added one more node and triggered partition reassignment. It's 
> kind of working, but on the new node the log file contains strange 
> warnings:
> [2015-11-25 14:06:52,998] WARN [ReplicaFetcherThread-1-152], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@3f442c7b. Possible cause: 
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'responses': Error reading field 'topic': java.nio.BufferUnderflowException 
> (kafka.server.ReplicaFetcherThread)
> I've found similar log messages in the following ticket: 
> https://issues.apache.org/jira/browse/KAFKA-2756
> But there, such messages were related to replication between different 
> versions (0.8 and 0.9).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3040) Broker didn't report new data after change in leader

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3040.
--
Resolution: Auto Closed

Closing inactive issue. Please reopen if the issue still exists in newer 
versions.

> Broker didn't report new data after change in leader
> 
>
> Key: KAFKA-3040
> URL: https://issues.apache.org/jira/browse/KAFKA-3040
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: Debian 3.2.54-2 x86_64 GNU/Linux
>Reporter: Imran Patel
>Priority: Critical
>  Labels: reliability
>
> Recently we had an event that caused large Kafka backlogs to develop 
> suddenly. This happened across multiple partitions. We noticed that after a 
> brief connection loss to Zookeeper, Kafka brokers were not reporting new 
> data to our (SimpleConsumer) consumer although the producers were enqueueing 
> fine. This went on until another zk blip led to a reconfiguration which 
> suddenly caused the consumers to "see" the data. Our consumers and our 
> monitoring tools did not see the offsets move during the outage window. Here 
> is the sequence of events for a single partition (with logs attached below). 
> The brokers are running 0.9, the producer is using library version 
> kafka_2.10:0.8.2.1 and the consumer is using kafka_2.10:0.8.0 (both are Java 
> programs). Our monitoring tool uses kafka-python-9.0.
> Can you tell us if this could be due to a consumer bug (the libraries being 
> too "old" to operate with a 0.9 broker, e.g.)? Or does it look like a Kafka 
> core issue? Please note that we recently upgraded the brokers to 0.9 and 
> hadn't seen a similar issue prior to that.
> - after a brief connection loss to zookeeper, the partition leader (broker 9 
> for partition 29 in logs below) came back and shrunk the ISR to itself. 
> - producers kept on successfully sending data to Kafka and the remaining 
> replicas (brokers 3 and 4) recorded this data. AFAICT, 3 was the new leader. 
> Broker 9 did NOT replicate this data. It did repeatedly print the ISR 
> shrinking message over and over again.
> - consumer on the other hand reported no new data presumably because it was 
> talking to 9 and that broker was doing nothing.
> - 6 hours later, another zookeeper blip caused the brokers to reconfigure, and 
> the consumers started seeing new data. 
> Broker 9:
> [2015-12-16 19:46:01,523] INFO Partition [messages,29] on broker 9: Expanding 
> ISR for partition [messages,29] from 9,4 to 9,4,3 (kafka.cluster.Partition
> [2015-12-18 00:59:25,511] INFO New leader is 9 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2015-12-18 01:00:18,451] INFO Partition [messages,29] on broker 9: Shrinking 
> ISR for partition [messages,29] from 9,4,3 to 9 (kafka.cluster.Partition)
> [2015-12-18 01:00:18,458] INFO Partition [messages,29] on broker 9: Cached 
> zkVersion [472] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
> [2015-12-18 07:04:44,552] INFO Truncating log messages-29 to offset 
> 14169556269. (kafka.log.Log)
> [2015-12-18 07:04:44,649] INFO [ReplicaFetcherManager on broker 9] Added 
> fetcher for partitions List([[messages,61], initOffset 14178575900 to broker 
> BrokerEndPoint(6,kafka006-prod.c.foo.internal,9092)] , [[messages,13], 
> initOffset 14156091271 to broker 
> BrokerEndPoint(2,kafka002-prod.c.foo.internal,9092)] , [[messages,45], 
> initOffset 14135826155 to broker 
> BrokerEndPoint(4,kafka004-prod.c.foo.internal,9092)] , [[messages,41], 
> initOffset 14157926400 to broker 
> BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] , [[messages,29], 
> initOffset 14169556269 to broker 
> BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] , [[messages,57], 
> initOffset 14175218230 to broker 
> BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] ) 
> (kafka.server.ReplicaFetcherManager)
> Broker 3:
> [2015-12-18 01:00:01,763] INFO [ReplicaFetcherManager on broker 3] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> [2015-12-18 07:09:04,631] INFO Partition [messages,29] on broker 3: Expanding 
> ISR for partition [messages,29] from 4,3 to 4,3,9 (kafka.cluster.Partition)
> [2015-12-18 07:09:49,693] INFO [ReplicaFetcherManager on broker 3] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> Broker 4:
> [2015-12-18 01:00:01,783] INFO [ReplicaFetcherManager on broker 4] Removed 
> fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
> [2015-12-18 01:00:01,866] INFO [ReplicaFetcherManager on broker 4] Added 
> fetcher for partitions List([[messages,29], initOffset 14169556262 to broker 
> BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] ) 
> (kafka.server.ReplicaFetcherManager)
> [2015-12-1

[jira] [Resolved] (KAFKA-2318) replica manager repeatedly tries to fetch from partitions already moved during controlled shutdown

2018-07-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2318.
--
Resolution: Auto Closed

Closing inactive issue. Please reopen if the issue still exists in newer 
versions.

> replica manager repeatedly tries to fetch from partitions already moved 
> during controlled shutdown
> --
>
> Key: KAFKA-2318
> URL: https://issues.apache.org/jira/browse/KAFKA-2318
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: Jason Rosenberg
>Priority: Major
>
> Using version 0.8.2.1.
> During a controlled shutdown, it seems like the left-hand is often not 
> talking to the right :)
> In this case, we see the ReplicaManager remove a fetcher for a partition, 
> truncate its log, and then apparently try to fetch data from that partition 
> repeatedly, spamming the log with "failed due to Leader not local for 
> partition" warnings.
> Below is a snippet (in this case it happened for partition 
> '__consumer_offsets,7' and '__consumer_offsets,47').  It went on for quite a 
> bit longer than included here.  The current broker is '99' here.
> {code}
> 2015-07-07 18:54:26,415  INFO [kafka-request-handler-0] 
> server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 99] Removed 
> fetcher for partitions [__consumer_offsets,7]
> 2015-07-07 18:54:26,415  INFO [kafka-request-handler-0] log.Log - Truncating 
> log __consumer_offsets-7 to offset 0.
> 2015-07-07 18:54:26,421  WARN [kafka-request-handler-3] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 6832556 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,429  WARN [kafka-request-handler-4] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345717 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,430  WARN [kafka-request-handler-2] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345718 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,431  WARN [kafka-request-handler-4] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345719 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,432  WARN [kafka-request-handler-5] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345720 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,433  WARN [kafka-request-handler-2] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345721 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,434  WARN [kafka-request-handler-3] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345722 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,436  WARN [kafka-request-handler-1] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345723 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,437  WARN [kafka-request-handler-2] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345724 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,438  WARN [kafka-request-handler-7] server.ReplicaManager 
> - [Replica Manager on Broker 99]: Fetch request with correlation id 4345725 
> from client ReplicaFetcherThread-0-99 on partition [__consumer_offsets,7] 
> failed due to Leader not local for partition [__consumer_offsets,7] on broker 
> 99
> 2015-07-07 18:54:26,438  INFO [kafka-request-handler-6] 
> server.ReplicaFetcherManage

[jira] [Resolved] (KAFKA-7195) StreamStreamJoinIntegrationTest fails in 2.0 Jenkins

2018-07-24 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7195.
--
   Resolution: Fixed
Fix Version/s: 2.0.0

> StreamStreamJoinIntegrationTest fails in 2.0 Jenkins
> 
>
> Key: KAFKA-7195
> URL: https://issues.apache.org/jira/browse/KAFKA-7195
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.0
>
>
> From 
> https://builds.apache.org/job/kafka-2.0-jdk8/87/testReport/junit/org.apache.kafka.streams.integration/StreamStreamJoinIntegrationTest/testOuter_caching_enabled___false_/
>  :
> {code}
> java.lang.AssertionError: 
> Expected: is <[A-null]>
>  but: was <[A-a, A-b, A-c, A-d]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.checkResult(AbstractJoinIntegrationTest.java:171)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:212)
>   at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:184)
>   at 
> org.apache.kafka.streams.integration.StreamStreamJoinIntegrationTest.testOuter(StreamStreamJoinIntegrationTest.java:198)
> {code}
> However, some test output was missing:
> {code}
> [2018-07-23 20:51:36,363] INFO Socket c
> ...[truncated 1627692 chars]...
> 671)
> {code}
> I ran the test locally which passed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-07-24 Thread Harjit Singh (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554726#comment-16554726
 ] 

Harjit Singh commented on KAFKA-6520:
-

Even if I don't run Kafka  the wordCountDemo program doesn't complain not being 
able to connect to the broker. Shouldn't we be seeing the error on startup ?

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Milind Jain
>Priority: Major
>  Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-07-24 Thread Harjit Singh (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554726#comment-16554726
 ] 

Harjit Singh edited comment on KAFKA-6520 at 7/24/18 7:50 PM:
--

Even if I don't run Kafka,  the wordCountDemo program doesn't complain about 
not being able to connect to the broker. Shouldn't we be seeing the error on 
startup?


was (Author: harjitdotsingh):
Even if I don't run Kafka  the wordCountDemo program doesn't complain not being 
able to connect to the broker. Shouldn't we be seeing the error on startup ?

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Milind Jain
>Priority: Major
>  Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-07-24 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554730#comment-16554730
 ] 

Matthias J. Sax commented on KAFKA-6520:


[~harjitdotsingh] Well. That is what this ticket is about, isn't it? It doesn't 
matter whether brokers are not reachable on startup or go down while the 
application is running. In both cases, the client just silently tries to 
reconnect to the brokers.

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Milind Jain
>Priority: Major
>  Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-07-24 Thread Harjit Singh (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554739#comment-16554739
 ] 

Harjit Singh commented on KAFKA-6520:
-

Should the stream stop running or log an error?

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Milind Jain
>Priority: Major
>  Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2018-07-24 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-4969:
---
Description: 
Currently, {{StreamPartitionsAssigner}} does not distinguish different "types" 
of tasks. For example, a task can be stateless or have one or multiple stores.

This can lead to a suboptimal task placement: assume there are 2 stateless and 
2 stateful tasks and the app is running with 2 instances. To share the "store 
load" it would be good to place one stateless and one stateful task per 
instance. Right now, there is no guarantee about this, and it can happen that 
one instance processes both stateless tasks while the other processes both 
stateful tasks.

We should improve {{StreamPartitionAssignor}} and introduce "task types" 
including a cost model for task placement. We should consider the following 
parameters:
 - number of stores
 - number of sources/sinks
 - number of processors
 - regular task vs standby task
 - in the case of standby tasks, which tasks have progressed the most with 
respect restoration

This improvement should be backed by a design document in the project wiki (no 
KIP required though) as it's a fairly complex change.

  was:
Currently, {{StreamPartitionsAssigner}} does not distinguish different "types" 
of tasks. For example, a task can be stateless or have one or multiple stores.

This can lead to a suboptimal task placement: assume there are 2 stateless and 
2 stateful tasks and the app is running with 2 instances. To share the "store 
load" it would be good to place one stateless and one stateful task per 
instance. Right now, there is no guarantee about this, and it can happen that 
one instance processes both stateless tasks while the other processes both 
stateful tasks.

We should improve {{StreamPartitionAssignor}} and introduce "task types" 
including a cost model for task placement. We should consider the following 
parameters:
 - number of stores
 - number of sources/sinks
 - number of processors
 - regular task vs standby task

This improvement should be backed by a design document in the project wiki (no 
KIP required though) as it's a fairly complex change.


> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.1.0
>
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish different 
> "types" of tasks. For example, a task can be stateless or have one or multiple 
> stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
>  - in the case of standby tasks, which tasks have progressed the most with 
> respect restoration
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2018-07-24 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-4969:
---
Description: 
Currently, {{StreamPartitionsAssigner}} does not distinguish different "types" 
of tasks. For example, a task can be stateless or have one or multiple stores.

This can lead to a suboptimal task placement: assume there are 2 stateless and 
2 stateful tasks and the app is running with 2 instances. To share the "store 
load" it would be good to place one stateless and one stateful task per 
instance. Right now, there is no guarantee about this, and it can happen that 
one instance processes both stateless tasks while the other processes both 
stateful tasks.

We should improve {{StreamPartitionAssignor}} and introduce "task types" 
including a cost model for task placement. We should consider the following 
parameters:
 - number of stores
 - number of sources/sinks
 - number of processors
 - regular task vs standby task
 - in the case of standby tasks, which tasks have progressed the most with 
respect to restoration

This improvement should be backed by a design document in the project wiki (no 
KIP required though) as it's a fairly complex change.

  was:
Currently, {{StreamPartitionsAssigner}} does not distinguish different "types" 
of tasks. For example, a task can be stateless or have one or multiple stores.

This can lead to a suboptimal task placement: assume there are 2 stateless and 
2 stateful tasks and the app is running with 2 instances. To share the "store 
load" it would be good to place one stateless and one stateful task per 
instance. Right now, there is no guarantee about this, and it can happen that 
one instance processes both stateless tasks while the other processes both 
stateful tasks.

We should improve {{StreamPartitionAssignor}} and introduce "task types" 
including a cost model for task placement. We should consider the following 
parameters:
 - number of stores
 - number of sources/sinks
 - number of processors
 - regular task vs standby task
 - in the case of standby tasks, which tasks have progressed the most with 
respect restoration

This improvement should be backed by a design document in the project wiki (no 
KIP required though) as it's a fairly complex change.


> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.1.0
>
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish different 
> "types" of tasks. For example, a task can be stateless or have one or multiple 
> stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
>  - in the case of standby tasks, which tasks have progressed the most with 
> respect to restoration
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2018-07-24 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-4969:
---
Description: 
Currently, {{StreamPartitionsAssigner}} does not distinguish different "types" 
of tasks. For example, a task can be stateless or have one or multiple stores.

This can lead to a suboptimal task placement: assume there are 2 stateless and 
2 stateful tasks and the app is running with 2 instances. To share the "store 
load" it would be good to place one stateless and one stateful task per 
instance. Right now, there is no guarantee about this, and it can happen that 
one instance processes both stateless tasks while the other processes both 
stateful tasks.

We should improve {{StreamPartitionAssignor}} and introduce "task types" 
including a cost model for task placement. We should consider the following 
parameters:
 - number of stores
 - number of sources/sinks
 - number of processors
 - regular task vs standby task
 - in the case of standby tasks, which tasks have progressed the most with 
respect to restoration

This improvement should be backed by a design document in the project wiki (no 
KIP required though) as it's a fairly complex change.

 

There have been some additional discussions around task assignment on a related 
PR https://github.com/apache/kafka/pull/5390

  was:
Currently, {{StreamPartitionsAssigner}} does not distinguish different "types" 
of tasks. For example, a task can be stateless or have one or multiple stores.

This can lead to a suboptimal task placement: assume there are 2 stateless and 
2 stateful tasks and the app is running with 2 instances. To share the "store 
load" it would be good to place one stateless and one stateful task per 
instance. Right now, there is no guarantee about this, and it can happen that 
one instance processes both stateless tasks while the other processes both 
stateful tasks.

We should improve {{StreamPartitionAssignor}} and introduce "task types" 
including a cost model for task placement. We should consider the following 
parameters:
 - number of stores
 - number of sources/sinks
 - number of processors
 - regular task vs standby task
 - in the case of standby tasks, which tasks have progressed the most with 
respect to restoration

This improvement should be backed by a design document in the project wiki (no 
KIP required though) as it's a fairly complex change.


> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.1.0
>
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish different 
> "types" of tasks. For example, a task can be stateless or have one or multiple 
> stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
>  - in the case of standby tasks, which tasks have progressed the most with 
> respect to restoration
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.
>  
> There have been some additional discussions around task assignment on a 
> related PR https://github.com/apache/kafka/pull/5390
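As a toy illustration only (this is not the actual StreamsPartitionAssignor 
logic, and all names are made up), a greedy placement that takes store counts 
into account spreads the two stateful tasks across the two instances instead of 
piling them onto one:

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WorkloadAwarePlacementSketch {
    public static void main(String[] args) {
        // Store count per task: two stateful tasks (one store each), two stateless tasks.
        Map<String, Integer> storeCount = new LinkedHashMap<>();
        storeCount.put("0_0", 1);
        storeCount.put("0_1", 1);
        storeCount.put("1_0", 0);
        storeCount.put("1_1", 0);

        List<String> instances = Arrays.asList("instance-A", "instance-B");
        Map<String, Integer> load = new HashMap<>();
        Map<String, List<String>> assignment = new HashMap<>();
        for (String instance : instances) {
            load.put(instance, 0);
            assignment.put(instance, new ArrayList<>());
        }

        // Place the "heaviest" (most stores) tasks first, always on the least-loaded instance.
        storeCount.entrySet().stream()
                .sorted((a, b) -> b.getValue() - a.getValue())
                .forEach(task -> {
                    String target = Collections.min(load.entrySet(), Map.Entry.comparingByValue()).getKey();
                    assignment.get(target).add(task.getKey());
                    load.put(target, load.get(target) + task.getValue() + 1); // +1 for the task itself
                });

        // Each instance ends up with one stateful and one stateless task.
        System.out.println(assignment);
    }
}
{code}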



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-07-24 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554771#comment-16554771
 ] 

Matthias J. Sax commented on KAFKA-6520:


As the ticket description above says:

> It would be useful if the Stream State had a DISCONNECTED status.

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Milind Jain
>Priority: Major
>  Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7202) Support multiple auto-generated docs formats

2018-07-24 Thread Joel Hamill (JIRA)
Joel Hamill created KAFKA-7202:
--

 Summary: Support multiple auto-generated docs formats
 Key: KAFKA-7202
 URL: https://issues.apache.org/jira/browse/KAFKA-7202
 Project: Kafka
  Issue Type: New Feature
Reporter: Joel Hamill


Currently the configuration parameters for Confluent/Kafka are auto-generated as 
HTML (and hosted at [https://kafka.apache.org/documentation/#configuration]). 
This request is to expand this to support other formats (e.g. RST) so that the 
generated docs can be easily consumed by other authoring toolchains.
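For context, a minimal sketch of where those HTML tables come from today (the 
option defined here is purely illustrative): each component's options live in a 
ConfigDef, and the published configuration section is rendered from it, so 
supporting RST would mean emitting the same ConfigDef content in another format.

{code:java}
import org.apache.kafka.common.config.ConfigDef;

public class ConfigDocsSketch {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
                .define("example.option", ConfigDef.Type.INT, 10,
                        ConfigDef.Importance.MEDIUM, "A purely illustrative option for this demo.");

        // Today's pipeline renders the HTML configuration tables from ConfigDef.
        System.out.println(def.toHtmlTable());
        // The request is for an equivalent emitter (e.g. RST) alongside this output.
    }
}
{code}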



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-07-24 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554817#comment-16554817
 ] 

Guozhang Wang commented on KAFKA-6520:
--

Just want to re-emphasize that this is not only a Streams feature request: if 
you start a Java consumer today and the broker it talks to is not available, 
that consumer client would not throw an exception either. This was by design of 
the consumer at the time:

{code}
When KafkaConsumer.poll is called, updateAssignmentMetadataIfNeeded is called, 
which in turn calls coordinator.poll(). If the broker(s) are not available then 
the last call will always return false after the timeout, and hence 
KafkaConsumer.poll will return an empty set without any indication that the 
brokers are disconnected. In fact, if the brokers are unavailable at the very 
beginning when the streams application starts, we will hit the same issue, as 
the instance will also be in the RUNNING state since onPartitionsRevoked would 
never be called.
...
On the other hand, it is by design of the consumer not to make users worry about 
the connectivity of the broker, or to handle any disconnect issues. Here are my 
proposed options:
{code}
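To make the behavior concrete, here is a minimal stand-alone sketch (broker 
address, topic, and group id are hypothetical; it assumes a 2.0-era Java 
client): when the broker is unreachable, poll() just keeps returning empty 
batches and no exception surfaces to the application.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SilentDisconnectDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical, possibly unreachable
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // With the broker down, this simply returns an empty batch after the
                // timeout; no exception is thrown and no rebalance callback fires.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                System.out.println("polled " + records.count() + " records");
            }
        }
    }
}
{code}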

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Milind Jain
>Priority: Major
>  Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7044) kafka-consumer-groups.sh NullPointerException describing round robin or sticky assignors

2018-07-24 Thread ErikPearson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554905#comment-16554905
 ] 

ErikPearson commented on KAFKA-7044:


I tried explicitly increasing the Duration in getConsumer.endOffsets(), but I 
get the same error.

I narrowed down where the request loses the partition.  It looks like 
Fetcher.groupListOffsetRequests() is where the topicPartition is being dropped. 
When the function is called the second time with a timestampsToSearch Map of 16 
entries, the sum of topicPartitions returned in the Map of Maps is 15.

It looks like Fetcher.groupListOffsetRequests() is called for each Client ID in 
the consumer group.  On the first invocation there is no metadata and an empty 
map is returned, which eventually causes a metadata refresh in 
Fetcher.sendListOffsetsRequests():
{code:java}
Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode =
        groupListOffsetRequests(timestampsToSearch);

if (timestampsToSearchByNode.isEmpty())
    return RequestFuture.failure(new StaleMetadataException());
{code}
On the second invocation of Fetcher.groupListOffsetRequests(), the dropped 
partition is the last partition in the for loop.  It's also the first reference 
to a specific topic, whose remaining partitions are being handled by a 
yet-to-be-processed Client ID.  The topic wasn't refreshed by the first 
invocation since the first Client ID didn't process any partitions for this 
topic.  Calling:
{code:java}
PartitionInfo info = metadata.fetch().partition(tp);
{code}
returns null.  There's a call to add the topic to the metadata and request a 
metadata refresh, but the topicPartition is never added to the result.  The 
loop ends and the result is returned with one fewer partition in the Map of Maps.

I'm not too familiar with the code, but I wanted to see if a metadata refresh 
would fix it, since the topic was added for a metadata refresh.  I added a 
second check after the .isEmpty() check to verify that the number of offsets to 
search is the same.  Please take this with many grains of salt; there are 
probably better ways to fix this issue:
{code:java}
// Hacking around to avoid the null exception. Assume that if timestampsToSearchByNode
// does NOT have the same number of offsets as timestampsToSearch, then the metadata is stale.
int sumTopicPartitionsByNode = 0;
for (Map<TopicPartition, Long> nodeMap : timestampsToSearchByNode.values()) {
    sumTopicPartitionsByNode += nodeMap.size();
}
if (sumTopicPartitionsByNode != timestampsToSearch.size()) {
    log.warn("Expected offsets: " + timestampsToSearch.size()
            + " Offsets across nodes: " + sumTopicPartitionsByNode);
    return RequestFuture.failure(new StaleMetadataException());
}
{code}
Now the kafka-consumer-groups command works.  It also shows that it hits that 
log.warn() for at least one other Client ID in my problematic group as well.

 

 

 

> kafka-consumer-groups.sh NullPointerException describing round robin or 
> sticky assignors
> 
>
> Key: KAFKA-7044
> URL: https://issues.apache.org/jira/browse/KAFKA-7044
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0
> Environment: CentOS 7.4, Oracle JDK 1.8
>Reporter: Jeff Field
>Assignee: Vahid Hashemian
>Priority: Minor
>
> We've recently moved to using the round robin assignor for one of our 
> consumer groups, and started testing the sticky assignor. In both cases, 
> using Kafka 1.1.0 we get a null pointer exception *unless* the group being 
> described is rebalancing:
> {code:java}
> kafka-consumer-groups --bootstrap-server fqdn:9092 --describe --group 
> groupname-for-consumer
> Error: Executing consumer group command failed due to null
> [2018-06-12 01:32:34,179] DEBUG Exception in consumer group command 
> (kafka.admin.ConsumerGroupCommand$)
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:612)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:610)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:610)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describePartitions(ConsumerGroupCommand.scala:328)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupServic

[jira] [Commented] (KAFKA-7044) kafka-consumer-groups.sh NullPointerException describing round robin or sticky assignors

2018-07-24 Thread ErikPearson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554909#comment-16554909
 ] 

ErikPearson commented on KAFKA-7044:


Seems I needed to escape those leading braces in the for/if.  Sorry.

> kafka-consumer-groups.sh NullPointerException describing round robin or 
> sticky assignors
> 
>
> Key: KAFKA-7044
> URL: https://issues.apache.org/jira/browse/KAFKA-7044
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0
> Environment: CentOS 7.4, Oracle JDK 1.8
>Reporter: Jeff Field
>Assignee: Vahid Hashemian
>Priority: Minor
>
> We've recently moved to using the round robin assignor for one of our 
> consumer groups, and started testing the sticky assignor. In both cases, 
> using Kafka 1.1.0 we get a null pointer exception *unless* the group being 
> described is rebalancing:
> {code:java}
> kafka-consumer-groups --bootstrap-server fqdn:9092 --describe --group 
> groupname-for-consumer
> Error: Executing consumer group command failed due to null
> [2018-06-12 01:32:34,179] DEBUG Exception in consumer group command 
> (kafka.admin.ConsumerGroupCommand$)
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:612)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:610)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:610)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describePartitions(ConsumerGroupCommand.scala:328)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.collectConsumerAssignment(ConsumerGroupCommand.scala:308)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectConsumerAssignment(ConsumerGroupCommand.scala:544)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:571)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:565)
> at scala.collection.immutable.List.flatMap(List.scala:338)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:565)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:558)
> at scala.Option.map(Option.scala:146)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:558)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeGroup(ConsumerGroupCommand.scala:271)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:544)
> at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:77)
> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> [2018-06-12 01:32:34,255] DEBUG Removed sensor with name connections-closed: 
> (org.apache.kafka.common.metrics.Metrics){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6793) Unnecessary warning log message

2018-07-24 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554980#comment-16554980
 ] 

Vahid Hashemian commented on KAFKA-6793:


This warning seems to have started showing up after [this 
commit|https://github.com/apache/kafka/commit/234ec8a8af76bfb7874dd99714a65089d6048953.].
 cc [~mjsax] 

> Unnecessary warning log message 
> 
>
> Key: KAFKA-6793
> URL: https://issues.apache.org/jira/browse/KAFKA-6793
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Anna O
>Priority: Minor
>
> After upgrading KafkaStreams from 0.11.0.2 to 1.1.0, the following warning log 
> started to appear:
> level: WARN
> logger: org.apache.kafka.clients.consumer.ConsumerConfig
> message: The configuration 'admin.retries' was supplied but isn't a known 
> config.
> The config is not explicitly supplied to the streams.
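For anyone reproducing this, a minimal stand-alone sketch of the mechanism 
(broker address and the stray key are illustrative only): any property that 
ConsumerConfig does not recognize and that nothing else consumes is reported 
with this kind of WARN line when the consumer is constructed.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class UnknownConfigWarning {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("admin.retries", "5"); // unknown to ConsumerConfig -> reported as unused

        // Constructing the consumer logs, at WARN level:
        //   The configuration 'admin.retries' was supplied but isn't a known config.
        try (KafkaConsumer<String, String> ignored = new KafkaConsumer<>(props)) {
            // nothing else needed; the warning is emitted during construction
        }
    }
}
{code}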



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6793) Unnecessary warning log message

2018-07-24 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554980#comment-16554980
 ] 

Vahid Hashemian edited comment on KAFKA-6793 at 7/25/18 1:09 AM:
-

This warning seems to have started showing up after [this 
commit|https://github.com/apache/kafka/commit/234ec8a8af76bfb7874dd99714a65089d6048953].
 cc [~mjsax] 


was (Author: vahid):
This warning seems to have started showing up after [this 
commit|https://github.com/apache/kafka/commit/234ec8a8af76bfb7874dd99714a65089d6048953.].
 cc [~mjsax] 

> Unnecessary warning log message 
> 
>
> Key: KAFKA-6793
> URL: https://issues.apache.org/jira/browse/KAFKA-6793
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Anna O
>Priority: Minor
>
> After upgrading KafkaStreams from 0.11.0.2 to 1.1.0, the following warning log 
> started to appear:
> level: WARN
> logger: org.apache.kafka.clients.consumer.ConsumerConfig
> message: The configuration 'admin.retries' was supplied but isn't a known 
> config.
> The config is not explicitly supplied to the streams.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6793) Unnecessary warning log message

2018-07-24 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555038#comment-16555038
 ] 

Matthias J. Sax commented on KAFKA-6793:


[~vahid] you are right, and it is known – it's by design (I missed that when 
this Jira was created).

I don't think we can fix it. Any ideas?

> Unnecessary warning log message 
> 
>
> Key: KAFKA-6793
> URL: https://issues.apache.org/jira/browse/KAFKA-6793
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Anna O
>Priority: Minor
>
> After upgrading KafkaStreams from 0.11.0.2 to 1.1.0, the following warning log 
> started to appear:
> level: WARN
> logger: org.apache.kafka.clients.consumer.ConsumerConfig
> message: The configuration 'admin.retries' was supplied but isn't a known 
> config.
> The config is not explicitly supplied to the streams.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555045#comment-16555045
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang opened a new pull request #5421: KAFKA-7192: Wipe out if EOS is 
turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5421
 
 
   1. As titled and as described in the comments.
   2. Modified the unit test slightly to insert new keys into the committed data 
to expose this issue.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Priority: Critical
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]
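For readers following along, a purely hypothetical sketch of the remediation 
direction implied by the PR title (none of these names come from the actual 
Streams code): with exactly-once enabled, a missing checkpoint file is treated 
as a signal that the local store may already contain uncommitted writes, so the 
task directory is wiped and the store is rebuilt from its changelog.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class EosStateWipeSketch {

    // Hypothetical helper: wipe the task's local state if EOS is on and no checkpoint exists.
    static void maybeWipeState(boolean eosEnabled, Path checkpointFile, Path taskStateDir) throws IOException {
        if (eosEnabled && !Files.exists(checkpointFile) && Files.exists(taskStateDir)) {
            try (Stream<Path> paths = Files.walk(taskStateDir)) {
                paths.sorted(Comparator.reverseOrder())      // delete children before their parent dirs
                     .forEach(p -> p.toFile().delete());
            }
            // The store is then restored from the beginning of its changelog topic,
            // so reprocessing record N cannot observe state that already includes N.
        }
    }
}
{code}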



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-24 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555049#comment-16555049
 ] 

Guozhang Wang commented on KAFKA-7192:
--

Hello [~jonmbates], thanks for reporting this issue. I've successfully 
reproduced it and found the root cause.

I've submitted the PR and added a unit test that exposes the issue, and 
confirmed that without the fix the unit test fails.

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Priority: Critical
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7024) Rocksdb state directory should be created before opening the DB

2018-07-24 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555132#comment-16555132
 ] 

Guozhang Wang commented on KAFKA-7024:
--

[~abhishek.agarwal] I've added you to the contributor list; please feel free 
to follow the guidelines to submit a PR: 

https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes


And sorry for the late reply to your comment.

> Rocksdb state directory should be created before opening the DB
> ---
>
> Key: KAFKA-7024
> URL: https://issues.apache.org/jira/browse/KAFKA-7024
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Abhishek Agarwal
>Priority: Minor
>  Labels: user-experience
>
> After enabling RocksDB logging, we continually see these errors in Kafka 
> Streams logs every time a new window segment is created:
> ```
> Error when reading  
> ```
> While this is not a problem in itself, since RocksDB will create the 
> directory internally, it does so only after logging the above error. The 
> unnecessary logging could be avoided if the segment directory were created in 
> advance. Right now, only the parent directories are created for a RocksDB 
> segment. The logging is more prominent when there are many partitions and the 
> segment size is small (a minute or two). 
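
A minimal sketch of the suggested improvement (an editor's illustration, not the actual Streams code; the class and method names are hypothetical): create the segment directory explicitly before handing the path to RocksDB, so RocksDB never has to log the missing-directory error.

{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import java.io.File;

public class SegmentDirExample {

    // Create the segment directory up front so RocksDB does not log an error
    // before creating it itself. The caller is responsible for closing the DB.
    public static RocksDB openSegment(final File segmentDir) throws RocksDBException {
        if (!segmentDir.exists() && !segmentDir.mkdirs()) {
            throw new RocksDBException("Failed to create directory: " + segmentDir);
        }
        final Options options = new Options().setCreateIfMissing(true);
        return RocksDB.open(options, segmentDir.getAbsolutePath());
    }
}
{code}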



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7144) Kafka Streams doesn't properly balance partition assignment

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555137#comment-16555137
 ] 

ASF GitHub Bot commented on KAFKA-7144:
---

guozhangwang closed pull request #5390: KAFKA-7144: Fix task assignment to be 
even
URL: https://github.com/apache/kafka/pull/5390
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 5b54d08c032..8767d0f6bea 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -270,7 +270,7 @@ boolean hasNewPair(final TaskId task1,
 if (!active && !pairs.contains(pair(task1, taskId))) {
 return true;
 }
-if (!pairs.contains(pair(task1, taskId)) && task1.topicGroupId 
!= taskId.topicGroupId) {
+if (!pairs.contains(pair(task1, taskId))) {
 return true;
 }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index ed22e3c30de..d431dbeae27 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -151,6 +151,25 @@ public void shouldAssignBasedOnCapacity() {
 assertThat(clients.get(p2).activeTasks().size(), equalTo(2));
 }
 
+@Test
+public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() {
+
+createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, 
task03,
+task04, task05, 
task10);
+
+createClient(p2, 1);
+
+final StickyTaskAssignor taskAssignor = createTaskAssignor(task10, 
task00, task01, task02, task03, task04, task05);
+
+final Set expectedClientITasks = new 
HashSet<>(Arrays.asList(task00, task01, task10, task05));
+final Set expectedClientIITasks = new 
HashSet<>(Arrays.asList(task02, task03, task04));
+
+taskAssignor.assign(0);
+
+assertThat(clients.get(p1).activeTasks(), 
equalTo(expectedClientITasks));
+assertThat(clients.get(p2).activeTasks(), 
equalTo(expectedClientIITasks));
+}
+
 @Test
 public void shouldKeepActiveTaskStickynessWhenMoreClientThanActiveTasks() {
 final int p5 = 5;


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Streams doesn't properly balance partition assignment
> ---
>
> Key: KAFKA-7144
> URL: https://issues.apache.org/jira/browse/KAFKA-7144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: James Cheng
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.1.0
>
> Attachments: OneThenTwelve.java
>
>
> Kafka Streams doesn't always spread the tasks across all available 
> instances/threads.
> I have a topology which consumes a single-partition topic and goes .through() 
> a 12-partition topic. That makes 13 partitions.
>  
> I then started 2 instances of the application. I would have expected the 13 
> partitions to be split across the 2 instances roughly evenly (7 partitions on 
> one, 6 partitions on the other).
> Instead, one instance gets 12 partitions, and the other instance gets 1 
> partition.
>  
> Repro case attached. I ran it a couple times, and it was fairly repeatable.
> Setup for the repro:
> {code:java}
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic one 
> --partitions 1 --replication-factor 1 
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic twelve 
> --partitions 12 --replication-factor 1
> $ echo foo | kafkacat -P -b 127.0.0.1 -t one
> {code}
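
For reference, a minimal sketch of the kind of topology described above (an editor's illustration; the attached OneThenTwelve.java is not reproduced here, so the names below are only indicative): a single-partition source topic routed .through() a 12-partition topic, giving the 13 partitions mentioned in the report.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class OneThenTwelveSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "one-then-twelve");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        // One source partition plus twelve partitions after .through() = 13 partitions.
        builder.stream("one")
               .through("twelve")
               .foreach((key, value) -> { });

        // Start two instances of this application to observe the skewed assignment.
        new KafkaStreams(builder.build(), props).start();
    }
}
{code}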



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7144) Kafka Streams doesn't properly balance partition assignment

2018-07-24 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7144.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

> Kafka Streams doesn't properly balance partition assignment
> ---
>
> Key: KAFKA-7144
> URL: https://issues.apache.org/jira/browse/KAFKA-7144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: James Cheng
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.1.0
>
> Attachments: OneThenTwelve.java
>
>
> Kafka Streams doesn't always spread the tasks across all available 
> instances/threads.
> I have a topology which consumes a single-partition topic and goes .through() 
> a 12-partition topic. That makes 13 partitions.
>  
> I then started 2 instances of the application. I would have expected the 13 
> partitions to be split across the 2 instances roughly evenly (7 partitions on 
> one, 6 partitions on the other).
> Instead, one instance gets 12 partitions, and the other instance gets 1 
> partition.
>  
> Repro case attached. I ran it a couple times, and it was fairly repeatable.
> Setup for the repro:
> {code:java}
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic one 
> --partitions 1 --replication-factor 1 
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic twelve 
> --partitions 12 --replication-factor 1
> $ echo foo | kafkacat -P -b 127.0.0.1 -t one
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7203) Improve Streams StickyTaskAssignor

2018-07-24 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7203:


 Summary: Improve Streams StickyTaskAssignor
 Key: KAFKA-7203
 URL: https://issues.apache.org/jira/browse/KAFKA-7203
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


This discussion was inspired while trying to fix KAFKA-7144.

Currently we are not striking a good trade-off between stickiness and workload 
balance: we honor the former more than the latter. One idea to improve on this 
is the following:

{code}
I'd like to propose a slightly different approach to fix KAFKA-7144 while making 
no worse trade-offs between stickiness and sub-topology balance. The key idea is 
to adjust the assignment so that the distribution gets as close as possible to 
the sub-topologies' num.tasks distribution.

Here is a detailed workflow:

1. at the beginning, we first calculate for each client C how many tasks it 
should ideally be assigned, as num.total_tasks / num.total_capacity * 
C_capacity rounded down; call it C_a. Note that since we round down this 
number, the sum of C_a across all clients may be <= num.total_tasks, but this 
does not matter.

2. and then for each client C, based on its number of previously assigned tasks 
C_p, we calculate how many tasks it should take over or give up as C_a - C_p 
(if it is positive, it should take over some; otherwise it should give up some).

Note that because of the rounding down, when we calculate C_a - C_p for each 
client we need to make sure that the total number of give-ups and the total 
number of take-overs are equal; some ad-hoc heuristics can be used.

3. then we calculate the task distribution across the sub-topologies as a 
whole. For example, if we have three sub-topologies st0, st1 and st2, where st0 
has 4 total tasks, st1 has 4 total tasks, and st2 has 8 total tasks, then the 
distribution between st0, st1 and st2 should be 1:1:2. Let's call this the 
global distribution, and note that since num.tasks per sub-topology never 
changes, this distribution should NEVER change.

4. then for each client that should give up some, we decide which tasks it 
should give up so that the remaining task distribution is proportional to the 
above global distribution.

For example, if a client previously owned 4 tasks of st0, no tasks of st1, and 
2 tasks of st2, and now it needs to give up 3 tasks, it should give up 2 of st0 
and 1 of st2, so that the remaining distribution is closer to 1:1:2.

5. now we've collected a list of given-up tasks plus the ones that do not have 
any previous active assignment (in normal operations this should not happen, 
since all tasks should have been created since day one); we now migrate them to 
the clients that need to take over some, again proportionally to the global 
distribution.

For example, if a client previously owned 1 task of st0, and nothing of st1 and 
st2, and now it needs to take over 3 tasks, we would try to give it 1 task of 
st1 and 2 tasks of st2, so that the resulting distribution becomes 1:1:2. And we 
ONLY consider prev-standby tasks when we decide which tasks of st1 / st2 to 
give that client.

Now, consider the following scenarios:

a) this is a clean start and there is no prev-assignment at all: step 4 would 
be a no-op, and the result should still be fine.

b) a client leaves the group: no client needs to give up and all clients may 
need to take over some, so step 4 is a no-op, and the accumulated list in step 
5 only contains the tasks of the departed client.

c) a new client joins the group: all clients need to give up some, and only the 
new client needs to take over the given-up ones. Hence step 5 is 
straightforward.
{code}
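
A rough sketch of steps 1 and 2 of this proposal (an editor's illustration only; the class, method and parameter names are hypothetical and not part of StickyTaskAssignor): compute each client's ideal share from its capacity, then derive how many tasks it should take over or give up.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class AssignmentQuotaSketch {

    // Returns, for each client, how many tasks it should take over (positive)
    // or give up (negative), i.e. C_a - C_p from step 2 of the proposal.
    public static Map<String, Integer> takeOverOrGiveUp(final Map<String, Integer> capacities,
                                                        final Map<String, Integer> prevAssigned,
                                                        final int totalTasks) {
        final int totalCapacity = capacities.values().stream().mapToInt(Integer::intValue).sum();

        final Map<String, Integer> deltas = new HashMap<>();
        for (final Map.Entry<String, Integer> entry : capacities.entrySet()) {
            // Step 1: ideal share C_a = num.total_tasks / num.total_capacity * C_capacity, rounded down.
            final int ideal = totalTasks * entry.getValue() / totalCapacity;
            final int previous = prevAssigned.getOrDefault(entry.getKey(), 0);
            // Step 2: positive delta means take over tasks, negative means give up tasks.
            deltas.put(entry.getKey(), ideal - previous);
        }
        // Because of the rounding down, positive and negative deltas may not cancel
        // out exactly; the proposal leaves balancing them to ad-hoc heuristics.
        return deltas;
    }
}
{code}

Steps 3 to 5 would then pick the concrete tasks to move so that each client's per-sub-topology counts stay proportional to the global distribution.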



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7131) Update release script to generate announcement email text

2018-07-24 Thread bibin sebastian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555168#comment-16555168
 ] 

bibin sebastian commented on KAFKA-7131:


Can I pick this up? I can work on this with some guidance/help on the required 
changes to the script.

> Update release script to generate announcement email text
> -
>
> Key: KAFKA-7131
> URL: https://issues.apache.org/jira/browse/KAFKA-7131
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: newbie
>
> When a release is finalized, we send out an email to announce the release. 
> At the moment, we have a template in the wiki 
> ([https://cwiki.apache.org/confluence/display/KAFKA/Release+Process|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]).
>  However, the template needs some manual changes to fill in the release 
> number, number of contributors, etc.
> Some parts could be automated; the corresponding commands are already 
> documented in the wiki.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)