[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-13 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684871#comment-16684871
 ] 

Sebastian Puzoń commented on KAFKA-7531:


I kept running the new version of the stream application that uses 5-minute time 
windows; after 3 days I got a NullPointerException on a broker node:

 

 
{code:java}
[2018-11-13 05:41:41,927] INFO [TransactionCoordinator id=4] Aborting sending of transaction markers after appended COMMIT to transaction log and returning INVALID_TXN_STATE error to client for elog_server_inst_visits_agg-0_11's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2018-11-13 05:41:49,064] ERROR [KafkaApi-4] Error when handling request {transactional_id=elog_server_inst_visits_agg-0_2,producer_id=3000,producer_epoch=11,transaction_result=true} (kafka.server.KafkaApis)
java.lang.NullPointerException
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
    at scala.util.Either$RightProjection.flatMap(Either.scala:702)
    at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
    at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
    at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
    at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
    at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
    at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
    at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
    at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
    at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
    at java.lang.Thread.run(Thread.java:745)
{code}
 
{code:java}
[2018-11-13 05:42:21,739] ERROR [KafkaApi-4] Error when handling request {transactional_id=elog_server_inst_visits_agg-0_15,producer_id=3003,producer_epoch=13,transaction_result=true} (kafka.server.KafkaApis)
java.lang.NullPointerException
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:398)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
    at scala.util.Either$RightProjection.flatMap(Either.scala:702)
    at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
    at kafka.coordinator.transaction.Tra

[jira] [Comment Edited] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-13 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684871#comment-16684871
 ] 

Sebastian Puzoń edited comment on KAFKA-7531 at 11/13/18 8:32 AM:
--

I kept running the new version of the stream application that uses 5-minute time 
windows; after 3 days I got a NullPointerException on a broker node:

 

 
{code:java}
[2018-11-13 05:41:41,927] INFO [TransactionCoordinator id=4] Aborting sending of transaction markers after appended COMMIT to transaction log and returning INVALID_TXN_STATE error to client for elog_server_inst_visits_agg-0_11's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
[2018-11-13 05:41:49,064] ERROR [KafkaApi-4] Error when handling request {transactional_id=elog_server_inst_visits_agg-0_2,producer_id=3000,producer_epoch=11,transaction_result=true} (kafka.server.KafkaApis)
java.lang.NullPointerException
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
    at scala.util.Either$RightProjection.flatMap(Either.scala:702)
    at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
    at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
    at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
    at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
    at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
    at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
    at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
    at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
    at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
    at java.lang.Thread.run(Thread.java:745)
{code}
 
{code:java}
[2018-11-13 05:42:21,739] ERROR [KafkaApi-4] Error when handling request {transactional_id=elog_server_inst_visits_agg-0_15,producer_id=3003,producer_epoch=13,transaction_result=true} (kafka.server.KafkaApis)
java.lang.NullPointerException
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:398)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
    at scala.util.Either$RightProjection.flatMap(Either.scala:702)
    at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
    at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
    at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:61

[jira] [Commented] (KAFKA-7446) Better error message to explain the upper limit of TimeWindow

2018-11-13 Thread Srinivas Reddy (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685102#comment-16685102
 ] 

Srinivas Reddy commented on KAFKA-7446:
---

Hi [~jlaskowski], are you working on this? If not, may I pick it up?

> Better error message to explain the upper limit of TimeWindow
> -
>
> Key: KAFKA-7446
> URL: https://issues.apache.org/jira/browse/KAFKA-7446
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>  Labels: newbie++
>
> The following code throws an {{IllegalArgumentException}}.
> {code:java}
> import org.apache.kafka.streams.kstream.TimeWindows
> import scala.concurrent.duration._
> val timeWindow = TimeWindows
>   .of(1.minute.toMillis)
>   .advanceBy(2.minutes.toMillis)
> {code}
> The exception is as follows, and it's not clear why {{60000}} is the upper 
> limit (not to mention that {{AdvanceMs}} with the uppercase {{A}} also 
> confused me).
> {code:java}
> java.lang.IllegalArgumentException: AdvanceMs must lie within interval (0, 60000].
>   at org.apache.kafka.streams.kstream.TimeWindows.advanceBy(TimeWindows.java:100)
> ... 44 elided{code}
> I think that the message should be more developer-friendly and explain the 
> boundaries, perhaps with an example (and a link to docs)?
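
For illustration, the kind of check and message the ticket asks for can be sketched in plain Java. This is a minimal sketch, not the Kafka Streams source: the class `HoppingWindowSketch` and the wording of the message are hypothetical, and it assumes the rule is that the advance must be greater than zero and no larger than the window size (60000 ms for the 1-minute window in the snippet above).

```java
/** Hypothetical stand-in for a hopping window definition; not the Kafka Streams class. */
final class HoppingWindowSketch {
    final long sizeMs;
    final long advanceMs;

    private HoppingWindowSketch(long sizeMs, long advanceMs) {
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
    }

    /** Creates a window whose advance equals its size (a tumbling window). */
    static HoppingWindowSketch of(long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException(
                "Window size (sizeMs) must be larger than zero, got " + sizeMs + " ms.");
        }
        return new HoppingWindowSketch(sizeMs, sizeMs);
    }

    /** Returns a copy with the given advance, which must lie in (0, sizeMs]. */
    HoppingWindowSketch advanceBy(long newAdvanceMs) {
        if (newAdvanceMs <= 0 || newAdvanceMs > sizeMs) {
            // Assumed wording: name the parameter, the window size, and the rejected value.
            throw new IllegalArgumentException(String.format(
                "advanceMs must be greater than 0 and no larger than the window size (%d ms), got %d ms.",
                sizeMs, newAdvanceMs));
        }
        return new HoppingWindowSketch(sizeMs, newAdvanceMs);
    }
}
```

With a 60000 ms window, `HoppingWindowSketch.of(60_000L).advanceBy(120_000L)` is rejected with a message that names both values, while `advanceBy(30_000L)` is accepted.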



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7606) Remove deprecated --zookeeper option from StreamsResetter

2018-11-13 Thread Srinivas Reddy (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Srinivas Reddy reassigned KAFKA-7606:
-

Assignee: Srinivas Reddy

> Remove deprecated --zookeeper option from StreamsResetter
> -
>
> Key: KAFKA-7606
> URL: https://issues.apache.org/jira/browse/KAFKA-7606
> Project: Kafka
>  Issue Type: Task
>  Components: streams, tools
>Reporter: Matthias J. Sax
>Assignee: Srinivas Reddy
>Priority: Trivial
> Fix For: 3.0.0
>
>






[jira] [Assigned] (KAFKA-7446) Better error message to explain the upper limit of TimeWindow

2018-11-13 Thread Srinivas Reddy (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Srinivas Reddy reassigned KAFKA-7446:
-

Assignee: Srinivas Reddy

> Better error message to explain the upper limit of TimeWindow
> -
>
> Key: KAFKA-7446
> URL: https://issues.apache.org/jira/browse/KAFKA-7446
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Jacek Laskowski
>Assignee: Srinivas Reddy
>Priority: Trivial
>  Labels: newbie++
>
> The following code throws an {{IllegalArgumentException}}.
> {code:java}
> import org.apache.kafka.streams.kstream.TimeWindows
> import scala.concurrent.duration._
> val timeWindow = TimeWindows
>   .of(1.minute.toMillis)
>   .advanceBy(2.minutes.toMillis)
> {code}
> The exception is as follows, and it's not clear why {{60000}} is the upper 
> limit (not to mention that {{AdvanceMs}} with the uppercase {{A}} also 
> confused me).
> {code:java}
> java.lang.IllegalArgumentException: AdvanceMs must lie within interval (0, 60000].
>   at org.apache.kafka.streams.kstream.TimeWindows.advanceBy(TimeWindows.java:100)
> ... 44 elided{code}
> I think that the message should be more developer-friendly and explain the 
> boundaries, perhaps with an example (and a link to docs)?





[jira] [Created] (KAFKA-7621) Clients can't deal with server IP address change

2018-11-13 Thread Bastian Voigt (JIRA)
Bastian Voigt created KAFKA-7621:


 Summary: Clients can't deal with server IP address change
 Key: KAFKA-7621
 URL: https://issues.apache.org/jira/browse/KAFKA-7621
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.0.0
Reporter: Bastian Voigt


When the server IP address changes (and the corresponding DNS record changes, 
of course), the client cannot deal with this. It keeps saying:

{{Connection to node 0 could not be established. Broker may not be available}}





[jira] [Commented] (KAFKA-7618) Trogdor - Fix /tasks endpoint parameters

2018-11-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685453#comment-16685453
 ] 

ASF GitHub Bot commented on KAFKA-7618:
---

cmccabe closed pull request #5905: KAFKA-7618: Fix /coordinator/tasks 
parameters to accept long values
URL: https://github.com/apache/kafka/pull/5905
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index cbfbddd7eda..4ffee6d410c 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -94,10 +94,10 @@ public Empty destroyTask(@DefaultValue("") @QueryParam("taskId") String taskId)
 @GET
 @Path("/tasks")
 public TasksResponse tasks(@QueryParam("taskId") List<String> taskId,
-@DefaultValue("0") @QueryParam("firstStartMs") int firstStartMs,
-@DefaultValue("0") @QueryParam("lastStartMs") int lastStartMs,
-@DefaultValue("0") @QueryParam("firstEndMs") int firstEndMs,
-@DefaultValue("0") @QueryParam("lastEndMs") int lastEndMs) throws Throwable {
+@DefaultValue("0") @QueryParam("firstStartMs") long firstStartMs,
+@DefaultValue("0") @QueryParam("lastStartMs") long lastStartMs,
+@DefaultValue("0") @QueryParam("firstEndMs") long firstEndMs,
+@DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs) throws Throwable {
 return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs));
 }
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Trogdor - Fix /tasks endpoint parameters
> 
>
> Key: KAFKA-7618
> URL: https://issues.apache.org/jira/browse/KAFKA-7618
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> A Trogdor Coordinator's `/tasks` endpoint returns the status of all tasks. It 
> supports arguments to filter the tasks, like `firstStartMs`, `lastStartMs`, 
> `firstEndMs`, `lastEndMs`.
> These arguments denote milliseconds since the unix epoch.
> There is a bug currently where the endpoint parses the arguments as integers, 
> whereas they should be long (the current unix millisecond timestamp does not 
> fit into an integer).
> This results in API calls returning a 500:
> {code:java}
> curl -v -L -G -d "firstStartMs=1542028764787" localhost:8889/coordinator/tasks
> * Trying ::1...
> * TCP_NODELAY set
> * Connected to localhost (::1) port 8889 (#0)
> > GET /coordinator/tasks?firstStartMs=1542028764787 HTTP/1.1
> > Host: localhost:8889
> > User-Agent: curl/7.54.0
> > Accept: */*
> >
> < HTTP/1.1 500 Internal Server Error
> < Date: Mon, 12 Nov 2018 13:28:59 GMT
> < Content-Type: application/json
> < Content-Length: 43
> < Server: Jetty(9.4.12.v20180830)
> <
> * Connection #0 to host localhost left intact{code}
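
The overflow described in the ticket is easy to reproduce outside Trogdor. The following is a minimal sketch in plain Java (the class name `EpochMillisParsing` is made up for this example). It only shows that the timestamp from the curl call exceeds `Integer.MAX_VALUE` and therefore parses as a `long` but not as an `int`; how the REST framework turns the parse failure into an HTTP status is not modelled here.

```java
/** Hypothetical helper; shows why an epoch-millisecond value needs a long parameter. */
final class EpochMillisParsing {
    /** Returns true if the decimal string fits into a 32-bit int. */
    static boolean fitsInInt(String value) {
        try {
            Integer.parseInt(value);
            return true;
        } catch (NumberFormatException e) {
            // Thrown for values outside the int range (and for non-numeric input).
            return false;
        }
    }

    public static void main(String[] args) {
        String firstStartMs = "1542028764787"; // value taken from the curl call in the ticket
        System.out.println("fits in int: " + fitsInInt(firstStartMs));
        System.out.println("as long:     " + Long.parseLong(firstStartMs));
        System.out.println("int max:     " + Integer.MAX_VALUE);
    }
}
```

This is why the fix changes the four query parameters from `int` to `long`: any current Unix timestamp in milliseconds is several orders of magnitude above the `int` range.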





[jira] [Commented] (KAFKA-7514) Trogdor - Support Multiple Threads in ConsumeBenchWorker

2018-11-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685457#comment-16685457
 ] 

ASF GitHub Bot commented on KAFKA-7514:
---

cmccabe closed pull request #5864: KAFKA-7514: Add threads to ConsumeBenchWorker
URL: https://github.com/apache/kafka/pull/5864
 
 
   


diff --git a/TROGDOR.md b/TROGDOR.md
index d71455a6e49..168acfb78c1 100644
--- a/TROGDOR.md
+++ b/TROGDOR.md
@@ -38,16 +38,14 @@ Let's confirm that all of the daemons are running:
 Now, we can submit a test job to Trogdor.  Here's an example of a short bash script which makes it easier.
 
 > ./tests/bin/trogdor-run-produce-bench.sh
-[2018-04-12 10:32:04,055] DEBUG Sending POST with input {"id":"produce_bench_22137","spec":{"class":"org.apache.kafka.trogdor.workload.ProduceBenchSpec","startMs":0,"durationMs":1000,"producerNode":"node0","bootstrapServers":"localhost:9092","targetMessagesPerSec":10,"maxMessages":100,"keyGenerator":{"type":"sequential","size":4,"startOffset":0},"valueGenerator":{"type":"constant","size":512,"value":"AAA="},"totalTopics":10,"activeTopics":5,"topicPrefix":"foo","replicationFactor":1,"classLoader":{},"numPartitions":1}} to http://localhost:8889/coordinator/task/create (org.apache.kafka.trogdor.coordinator.CoordinatorClient)
-Created task.
-$TASK_ID = produce_bench_20462
+Sent CreateTaskRequest for task produce_bench_21634.$TASK_ID = produce_bench_21634
 
 To get the test results, we run --show-tasks:
 
 ./bin/trogdor.sh client --show-tasks localhost:8889
 Got coordinator tasks: {
   "tasks" : {
-"produce_bench_20462" : {
+"produce_bench_21634" : {
   "state" : "DONE",
   "spec" : {
 "class" : "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
@@ -55,8 +53,8 @@ To get the test results, we run --show-tasks:
 "durationMs" : 1000,
 "producerNode" : "node0",
 "bootstrapServers" : "localhost:9092",
-"targetMessagesPerSec" : 10,
-"maxMessages" : 100,
+"targetMessagesPerSec" : 1,
+"maxMessages" : 5,
 "keyGenerator" : {
   "type" : "sequential",
   "size" : 4,
@@ -67,22 +65,28 @@ To get the test results, we run --show-tasks:
   "size" : 512,
   "value" : 
"AAA="
 },
-"totalTopics" : 10,
-"activeTopics" : 5,
-"topicPrefix" : "foo",
-"replicationFactor" : 1,
-"classLoader" : { },
-"numPartitions" : 1
+"activeTopics" : {
+  "foo[1-3]" : {
+"numPartitions" : 10,
+"replicationFactor" : 1
+  }
+},
+"inactiveTopics" : {
+  "foo[4-5]" : {
+"numPartitions" : 10,
+"replicationFactor" : 1
+  }
+}
   },
-  "startedMs" : 1523552769850,
-  "doneMs" : 1523552780878,
+  "startedMs" : 1541435949784,
+  "doneMs" : 1541435955803,
   "cancelled" : false,
   "status" : {
-"totalSent" : 500,
-"averageLatencyMs" : 4.972,
-"p50LatencyMs" : 4,
-"p95L

[jira] [Created] (KAFKA-7622) Add findSessions functionality to ReadOnlySessionStore

2018-11-13 Thread Di Campo (JIRA)
Di Campo created KAFKA-7622:
---

 Summary: Add findSessions functionality to ReadOnlySessionStore
 Key: KAFKA-7622
 URL: https://issues.apache.org/jira/browse/KAFKA-7622
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Di Campo


When you create a session store from the DSL and get a 
{{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in a 
{{WindowStore}}, even if the key type is a {{Windowed}}. You therefore have 
to iterate through the store to find the time-related entries, which is likely 
less efficient than querying by time.

The purpose of this ticket is to make it possible to query the store with (key, time).

The proposal is to add methods like {{SessionStore}}'s {{findSessions}} (i.e. 
time-bound access) to {{ReadOnlySessionStore}}.
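
To make the difference between key-only and (key, time) access concrete, here is a minimal in-memory sketch in plain Java. Everything in it is hypothetical: `SessionSketch` and `ReadOnlySessionViewSketch` are illustration-only names, not Kafka Streams types, and the time-bound semantics (session end at or after the lower bound, session start at or before the upper bound) are an assumption about what a `findSessions`-like method would return. The list filter merely stands in for whatever range lookup a real store would perform.

```java
import java.util.ArrayList;
import java.util.List;

/** One session for a key, bounded by start and end timestamps (ms since the epoch). */
final class SessionSketch {
    final String key;
    final long startMs;
    final long endMs;
    final long value;

    SessionSketch(String key, long startMs, long endMs, long value) {
        this.key = key;
        this.startMs = startMs;
        this.endMs = endMs;
        this.value = value;
    }
}

/** Hypothetical read-only view over sessions; not the Kafka Streams interface. */
final class ReadOnlySessionViewSketch {
    private final List<SessionSketch> sessions;

    ReadOnlySessionViewSketch(List<SessionSketch> sessions) {
        this.sessions = new ArrayList<>(sessions);
    }

    /** Key-only access: returns every session stored for the key. */
    List<SessionSketch> fetch(String key) {
        List<SessionSketch> result = new ArrayList<>();
        for (SessionSketch s : sessions) {
            if (s.key.equals(key)) {
                result.add(s);
            }
        }
        return result;
    }

    /**
     * Time-bound access (assumed semantics): sessions for the key that end at or
     * after earliestSessionEndMs and start at or before latestSessionStartMs.
     */
    List<SessionSketch> findSessions(String key, long earliestSessionEndMs, long latestSessionStartMs) {
        List<SessionSketch> result = new ArrayList<>();
        for (SessionSketch s : fetch(key)) {
            if (s.endMs >= earliestSessionEndMs && s.startMs <= latestSessionStartMs) {
                result.add(s);
            }
        }
        return result;
    }
}
```

Without a method like `findSessions`, a caller has to write the filtering loop itself on top of `fetch(key)`, which is the workaround the ticket describes.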
 





[jira] [Updated] (KAFKA-7622) Add findSessions functionality to ReadOnlySessionStore

2018-11-13 Thread Di Campo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Campo updated KAFKA-7622:

Description: 
When you create a session store from the DSL and get a 
{{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in a 
{{SessionStore}}, even if the key type is a {{Windowed}}. You therefore have 
to iterate through the store to find the time-related entries, which is likely 
less efficient than querying by time.

The purpose of this ticket is to make it possible to query the store with (key, time).

The proposal is to add methods like {{SessionStore}}'s {{findSessions}} (i.e. 
time-bound access) to {{ReadOnlySessionStore}}.
  

  was:
When creating a session store from the DSL, and you get a 
{{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in a 
{{WindowStore}}, even of the key type is a {{Windowed}}. So you would have 
to iterate through it to find the time-related entries, which should be less 
efficient than querying by time.

So the purpose of this ticket is to be able to query the store with (key, time).

Proposal is to add {{SessionStore's findSessions}}-like methods (i.e. 
time-bound access) to {{ReadOnlySessionStore.}}
 


> Add findSessions functionality to ReadOnlySessionStore
> --
>
> Key: KAFKA-7622
> URL: https://issues.apache.org/jira/browse/KAFKA-7622
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Di Campo
>Priority: Major
>
> When creating a session store from the DSL, and you get a 
> {{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in 
> a {{SessionStore}}, even if the key type is a {{Windowed}}. So you would 
> have to iterate through it to find the time-related entries, which should be 
> less efficient than querying by time.
> So the purpose of this ticket is to be able to query the store with (key, 
> time).
> Proposal is to add {{SessionStore's findSessions}}-like methods (i.e. 
> time-bound access) to {{ReadOnlySessionStore}}.
>   





[jira] [Updated] (KAFKA-7622) Add findSessions functionality to ReadOnlySessionStore

2018-11-13 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-7622:
---
Labels: needs-kip  (was: needs-k)

> Add findSessions functionality to ReadOnlySessionStore
> --
>
> Key: KAFKA-7622
> URL: https://issues.apache.org/jira/browse/KAFKA-7622
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Di Campo
>Priority: Major
>  Labels: needs-kip
>
> When creating a session store from the DSL, and you get a 
> {{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in 
> a {{SessionStore}}, even if the key type is a {{Windowed}}. So you would 
> have to iterate through it to find the time-related entries, which should be 
> less efficient than querying by time.
> So the purpose of this ticket is to be able to query the store with (key, 
> time).
> Proposal is to add {{SessionStore's findSessions}}-like methods (i.e. 
> time-bound access) to {{ReadOnlySessionStore}}.
>   





[jira] [Updated] (KAFKA-7622) Add findSessions functionality to ReadOnlySessionStore

2018-11-13 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-7622:
---
Labels: needs-k  (was: )

> Add findSessions functionality to ReadOnlySessionStore
> --
>
> Key: KAFKA-7622
> URL: https://issues.apache.org/jira/browse/KAFKA-7622
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Di Campo
>Priority: Major
>  Labels: needs-kip
>
> When creating a session store from the DSL, and you get a 
> {{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in 
> a {{SessionStore}}, even if the key type is a {{Windowed}}. So you would 
> have to iterate through it to find the time-related entries, which should be 
> less efficient than querying by time.
> So the purpose of this ticket is to be able to query the store with (key, 
> time).
> Proposal is to add {{SessionStore's findSessions}}-like methods (i.e. 
> time-bound access) to {{ReadOnlySessionStore}}.
>   





[jira] [Commented] (KAFKA-7622) Add findSessions functionality to ReadOnlySessionStore

2018-11-13 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685508#comment-16685508
 ] 

Bill Bejeck commented on KAFKA-7622:


Hi [~xmar], this seems like a useful feature. 

Please note that since the proposal involves changing a public interface, a KIP 
([https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]) 
is required first. 

If you'd like to do the KIP yourself and would like some help with it, don't 
hesitate to ask on the dev mailing list, or you can ask here initially as well.

Thanks,

Bill

> Add findSessions functionality to ReadOnlySessionStore
> --
>
> Key: KAFKA-7622
> URL: https://issues.apache.org/jira/browse/KAFKA-7622
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Di Campo
>Priority: Major
>  Labels: needs-kip
>
> When creating a session store from the DSL, and you get a 
> {{ReadOnlySessionStore}}, you can fetch by key, but not by key and time as in 
> a {{SessionStore}}, even if the key type is a {{Windowed}}. So you would 
> have to iterate through it to find the time-related entries, which should be 
> less efficient than querying by time.
> So the purpose of this ticket is to be able to query the store with (key, 
> time).
> Proposal is to add {{SessionStore's findSessions}}-like methods (i.e. 
> time-bound access) to {{ReadOnlySessionStore}}.
>   





[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-13 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685591#comment-16685591
 ] 

Jason Gustafson commented on KAFKA-7610:


[~bchen225242] Yeah, it's a good point that static membership wouldn't have the 
same problem. I was also thinking about that.

I'm also in favor of the second option or something like it. I was thinking 
something like this. When a dynamic member first joins the group, they provide 
an empty memberId. When the coordinator receives this request, it will 
immediately generate a new memberId and respond to the JoinGroup with the 
generated memberId and an error code. It will also begin a session timer. In the 
common case, the consumer will receive the response and rejoin using the newly 
generated memberId. If the response is lost, however, the coordinator will 
expire the member after the session timeout. Do you think that would work?
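
The handshake proposed in this comment can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions, not coordinator code: the class `JoinGroupSketch`, its `Error` values (including `RETRY_WITH_MEMBER_ID`), and the explicit `nowMs` clock parameter are all invented for illustration, and session tracking for members that have already joined is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical model of the proposed two-step join; names and error codes are illustrative. */
final class JoinGroupSketch {
    enum Error { NONE, RETRY_WITH_MEMBER_ID, UNKNOWN_MEMBER_ID }

    static final class Response {
        final String memberId;
        final Error error;

        Response(String memberId, Error error) {
            this.memberId = memberId;
            this.error = error;
        }
    }

    private final long sessionTimeoutMs;
    // memberId -> time (ms) by which the member must rejoin before it is expired
    private final Map<String, Long> pendingDeadlines = new HashMap<>();
    private final Set<String> joined = new HashSet<>();

    JoinGroupSketch(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    Response handleJoin(String memberId, long nowMs) {
        expire(nowMs);
        if (memberId.isEmpty()) {
            // First contact: hand out an id right away and start the session timer.
            String generated = UUID.randomUUID().toString();
            pendingDeadlines.put(generated, nowMs + sessionTimeoutMs);
            return new Response(generated, Error.RETRY_WITH_MEMBER_ID);
        }
        if (pendingDeadlines.remove(memberId) != null || joined.contains(memberId)) {
            // Common case: the client received the id and rejoined in time.
            joined.add(memberId);
            return new Response(memberId, Error.NONE);
        }
        return new Response(memberId, Error.UNKNOWN_MEMBER_ID);
    }

    /** Drops pending members whose session timer ran out (for example, the response was lost). */
    void expire(long nowMs) {
        pendingDeadlines.values().removeIf(deadline -> deadline <= nowMs);
    }

    int pendingCount() {
        return pendingDeadlines.size();
    }
}
```

The point of the design is visible in `expire`: a client that never sees the first response leaves behind only a pending entry, and that entry is removed after one session timeout instead of lingering until the rebalance completes.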

> Detect consumer failures in initial JoinGroup
> -
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).





[jira] [Comment Edited] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-13 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685591#comment-16685591
 ] 

Jason Gustafson edited comment on KAFKA-7610 at 11/13/18 6:47 PM:
--

[~bchen225242] Yeah, it's a good point that static membership wouldn't have the 
same problem. I was also thinking about that.

I'm also in favor of the second option or something like it. I was thinking 
something like this. When a dynamic member first joins the group, they provide 
an empty memberId. When the coordinator receives this request, it will 
immediately generate a new memberId and respond to the JoinGroup with the 
generated memberId and an error code. It will also begin a session timer. In 
the common case, the consumer will receive the response and rejoin using the 
newly generated memberId. If the response is lost, however, the coordinator 
will expire the member after the session timeout. Do you think that would work?
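
To make the proposed handshake concrete, here is a minimal Java sketch. It is not the GroupCoordinator code: the class `JoinGroupSketch`, its methods, and the `member-N` id scheme are hypothetical, and REBALANCE_IN_PROGRESS is only the example error code floated in the issue description.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the two-step join; all names here are assumptions.
class JoinGroupSketch {
    static final class Response {
        final String memberId;
        final String error;
        Response(String memberId, String error) {
            this.memberId = memberId;
            this.error = error;
        }
    }

    private final long sessionTimeoutMs;
    // Generated memberId -> deadline (ms) by which the client must rejoin.
    private final Map<String, Long> pendingDeadlines = new HashMap<>();
    private final Set<String> members = new HashSet<>();
    private int nextId = 0;

    JoinGroupSketch(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    Response handleJoin(String memberId, long nowMs) {
        if (memberId.isEmpty()) {
            // First contact: hand out an id, start the session timer, ask for a retry.
            String generated = "member-" + (nextId++);
            pendingDeadlines.put(generated, nowMs + sessionTimeoutMs);
            return new Response(generated, "REBALANCE_IN_PROGRESS");
        }
        if (pendingDeadlines.remove(memberId) != null) {
            // The client received the response and rejoined with its generated id.
            members.add(memberId);
        }
        if (members.contains(memberId)) {
            return new Response(memberId, "NONE");
        }
        return new Response(memberId, "UNKNOWN_MEMBER_ID");
    }

    // If the first response was lost, the pending id is dropped after the timeout.
    void expire(long nowMs) {
        pendingDeadlines.values().removeIf(deadline -> deadline <= nowMs);
    }

    int pendingCount() { return pendingDeadlines.size(); }
    int memberCount() { return members.size(); }
}
```

The point of the design is that a client that never comes back costs only one pending id for at most one session timeout, rather than a dangling member that lives until the rebalance completes.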


was (Author: hachikuji):
[~bchen225242] Yeah, it's a good point that static membership wouldn't have the 
same problem. I was also thinking about that.

I'm also in favor of the second option or something like it. I was thinking 
something like this. When a dynamic member first joins the group, they provide 
an empty memberId. When the coordinator receives this request, it will 
immediately generate a new memberId and respond to the JoinGroup with the 
generated memberId an error code. It will also begin a session timer. In the 
common case, the consumer will receive the response and rejoin using the newly 
generated memberId. If the response is lost, however, the coordinator will 
expire the member after the session timeout. Do you think that would work?

> Detect consumer failures in initial JoinGroup
> -
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).





[jira] [Commented] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore

2018-11-13 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685639#comment-16685639
 ] 

Guozhang Wang commented on KAFKA-7080:
--

[~vvcephei] I checked all four related PRs, and it seems only one of them got 
into the 2.0 branch, and that one is not directly related to this JIRA. Could 
you confirm?

> WindowStoreBuilder incorrectly initializes CachingWindowStore
> -
>
> Key: KAFKA-7080
> URL: https://issues.apache.org/jira/browse/KAFKA-7080
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0, 1.0.1, 1.1.0, 2.0.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> When caching is enabled on the WindowStoreBuilder, it creates a 
> CachingWindowStore. However, it incorrectly passes storeSupplier.segments() 
> (the number of segments) to the segmentInterval argument.
>  
> The impact is low, since any valid number of segments is also a valid segment 
> size, but it likely results in much smaller segments than intended. For 
> example, the segments may be sized 3ms instead of 60,000ms.
>  
> Ideally the WindowBytesStoreSupplier interface would allow suppliers to 
> advertise their segment size instead of segment count. I plan to create a KIP 
> to propose this.
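
The argument mix-up described above can be illustrated with a small Java sketch. The types and method names below are hypothetical stand-ins, not the actual WindowStoreBuilder or CachingWindowStore code; the numbers are the ones from the example in the description (3 segments, 60,000ms intended interval).

```java
// Hypothetical stand-in for a store that takes a segment interval in ms.
class SegmentArgSketch {
    static final class CachingStoreConfig {
        final long segmentIntervalMs;
        CachingStoreConfig(long segmentIntervalMs) {
            this.segmentIntervalMs = segmentIntervalMs;
        }
    }

    // Mirrors the bug: the segment count is passed where an interval is expected.
    // It compiles because an int count widens silently to a long interval.
    static CachingStoreConfig buggy(int segments, long segmentIntervalMs) {
        return new CachingStoreConfig(segments);
    }

    // Intended behaviour: pass the interval itself.
    static CachingStoreConfig fixed(int segments, long segmentIntervalMs) {
        return new CachingStoreConfig(segmentIntervalMs);
    }
}
```

With 3 segments and an intended interval of 60,000ms, `buggy` yields a 3ms interval while `fixed` yields 60,000ms, which matches the impact described in the report.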





[jira] [Commented] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

2018-11-13 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685647#comment-16685647
 ] 

Guozhang Wang commented on KAFKA-4601:
--

cc [~bbejeck]. I think we can resolve this ticket for now. A few follow-up 
discussions came up while reviewing the PRs, but instead of leaving this ticket 
open and letting it drag on, I think it's better to create new tickets when we 
work on those follow-up tasks.

> Avoid duplicated repartitioning in KStream DSL
> --
>
> Key: KAFKA-4601
> URL: https://issues.apache.org/jira/browse/KAFKA-4601
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: performance
>
> Consider the following DSL:
> {code}
> Stream source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1");
> Stream mapped = source.map(..);
> KTable counts = mapped
> .groupByKey()
> .count("Counts");
> KStream sink = mapped.leftJoin(counts, ..);
> {code}
> The resulting topology looks like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-FILTER-04, KSTREAM-FILTER-07]
>   KSTREAM-FILTER-04:
>   children:   
> [KSTREAM-SINK-03]
>   KSTREAM-SINK-03:
>   topic:  X-Counts-repartition
>   KSTREAM-FILTER-07:
>   children:   
> [KSTREAM-SINK-06]
>   KSTREAM-SINK-06:
>   topic:  
> X-KSTREAM-MAP-01-repartition
> ProcessorTopology:
>   KSTREAM-SOURCE-08:
>   topics: 
> [X-KSTREAM-MAP-01-repartition]
>   children:   
> [KSTREAM-LEFTJOIN-09]
>   KSTREAM-LEFTJOIN-09:
>   states: [Counts]
>   KSTREAM-SOURCE-05:
>   topics: [X-Counts-repartition]
>   children:   
> [KSTREAM-AGGREGATE-02]
>   KSTREAM-AGGREGATE-02:
>   states: [Counts]
> {code}
> I.e. there are two repartition topics, one for the aggregate and one for the 
> join, which not only introduce unnecessary overheads but also mess up the 
> processing ordering (users are expecting each record to go through 
> aggregation first then the join operator). And in order to get the following 
> simpler topology users today need to add a {{through}} operator after {{map}} 
> manually to enforce repartitioning.
> {code}
> Stream source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1");
> Stream repartitioned = source.map(..).through("topic2");
> KTable counts = repartitioned
> .groupByKey()
> .count("Counts");
> KStream sink = repartitioned.leftJoin(counts, ..);
> {code}
> The resulting topology will then look like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-SINK-02]
>   KSTREAM-SINK-02:
>   topic:  topic 2
> ProcessorTopology:
>   KSTREAM-SOURCE-03:
>   topics: [topic 2]
>   children:   
> [KSTREAM-AGGREGATE-04, KSTREAM-LEFTJOIN-05]
>   KSTREAM-AGGREGATE-04:
>   states: [Counts]
>   KSTREAM-LEFTJOIN-05:
>   states: [Counts]
> {code} 
> This kind of optimization should be automatic in Streams, which we can 
> consider doing when extending from one-operator-at-a-time trans

[jira] [Commented] (KAFKA-6812) Async ConsoleProducer exits with 0 status even after data loss

2018-11-13 Thread Kamal Kang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685746#comment-16685746
 ] 

Kamal Kang commented on KAFKA-6812:
---

[~enether] -

[https://cwiki.apache.org/confluence/pages/resumedraft.action?draftId=95655323&draftShareId=20701d88-2553-4fa4-b260-9ba0063ad503.]
 

 

Does this look right as a starting point, and how do I start the discussion?

 

> Async ConsoleProducer exits with 0 status even after data loss
> --
>
> Key: KAFKA-6812
> URL: https://issues.apache.org/jira/browse/KAFKA-6812
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0
>Reporter: Andras Beni
>Assignee: Kamal Kang
>Priority: Minor
>
> When {{ConsoleProducer}} is run without {{--sync}} flag and one of the 
> batches times out, {{ErrorLoggingCallback}} logs the error:
> {code:java}
>  18/04/21 04:23:01 WARN clients.NetworkClient: [Producer 
> clientId=console-producer] Connection to node 10 could not be established. 
> Broker may not be available.
>  18/04/21 04:23:02 ERROR internals.ErrorLoggingCallback: Error when sending 
> message to topic my-topic with key: null, value: 8 bytes with error:
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> my-topic-0: 1530 ms has passed since batch creation plus linger time{code}
>  However, the tool exits with status code 0. 
>  In my opinion the tool should indicate in the exit status that data was 
> lost. Maybe it's reasonable to exit after the first error.
>   
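
One way to get the behaviour requested above is to count failed sends in the callback and derive the exit status from that count. The Java sketch below is an assumption about how this could look, not the ConsoleProducer implementation; `ExitStatusSketch` and its methods are hypothetical, and the callback only mirrors the general shape of a producer send callback (a null exception means success).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: track async send failures and surface them in the exit status.
class ExitStatusSketch {
    private final AtomicInteger failedSends = new AtomicInteger();

    // Called once per completed send; exception is null when the send succeeded.
    void onCompletion(Exception exception) {
        if (exception != null) {
            failedSends.incrementAndGet();
        }
    }

    // 0 only if every send succeeded; non-zero signals that data was lost.
    int exitStatus() {
        return failedSends.get() == 0 ? 0 : 1;
    }
}
```

A tool built this way would flush and close the producer first, then pass `exitStatus()` to `System.exit`, so that a timed-out batch is visible to the calling script.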





[jira] [Commented] (KAFKA-7606) Remove deprecated --zookeeper option from StreamsResetter

2018-11-13 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685783#comment-16685783
 ] 

Matthias J. Sax commented on KAFKA-7606:


[~mrsrinivas] Thanks for picking this up. Note that we can do this only in 3.0 
and not before, because we cannot remove deprecated APIs in minor releases. So 
far, it's unclear when 3.0 will happen. I expect that the next release after 
2.1 will be 2.2.

> Remove deprecated --zookeeper option from StreamsResetter
> -
>
> Key: KAFKA-7606
> URL: https://issues.apache.org/jira/browse/KAFKA-7606
> Project: Kafka
>  Issue Type: Task
>  Components: streams, tools
>Reporter: Matthias J. Sax
>Assignee: Srinivas Reddy
>Priority: Trivial
> Fix For: 3.0.0
>
>






[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-13 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685810#comment-16685810
 ] 

Boyang Chen commented on KAFKA-7610:


[~hachikuji] So we will still accept unknown members joining the group? If we 
do, one edge case I can think of is a bad consumer that keeps restarting itself 
and generates a lot of unknown join requests. What if we keep a separate map 
called *newMemberIds* that contains the member ids we have already handed out? 
This way we fence zombie registrations and remember the join attempts from new 
members, so that the next time those members join the group we recognize them, 
do a "real join", and expand the original member list. Saving a single id 
should be much more memory efficient than saving the full member metadata. Each 
time we finish a rebalance, we just erase the *newMemberIds* map.

We could define a new error code like UNASSIGNED_MEMBER to trigger immediate 
rejoin of new members. Does this extra protection make sense?
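
A rough Java sketch of the *newMemberIds* idea follows. Everything in it is an assumption made for illustration: the class `NewMemberIdsSketch`, the `member-N` id scheme, and the use of a plain string for the subscription. UNASSIGNED_MEMBER is the error code name suggested above, not an existing one.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: remember only the ids handed out, not full member metadata.
class NewMemberIdsSketch {
    static final class JoinResult {
        final String memberId;
        final String error;
        JoinResult(String memberId, String error) {
            this.memberId = memberId;
            this.error = error;
        }
    }

    private final Set<String> newMemberIds = new HashSet<>();
    // memberId -> subscription; stands in for the heavier member metadata.
    private final Map<String, String> members = new HashMap<>();
    private int nextId = 0;

    JoinResult handleJoin(String memberId, String subscription) {
        if (memberId.isEmpty()) {
            // Unknown member: store just the generated id and ask for an immediate rejoin.
            String generated = "member-" + (nextId++);
            newMemberIds.add(generated);
            return new JoinResult(generated, "UNASSIGNED_MEMBER");
        }
        if (newMemberIds.remove(memberId) || members.containsKey(memberId)) {
            // "Real join": the id is recognized, so the metadata is stored now.
            members.put(memberId, subscription);
            return new JoinResult(memberId, "NONE");
        }
        return new JoinResult(memberId, "UNKNOWN_MEMBER_ID");
    }

    // Ids that never came back are forgotten once the rebalance finishes.
    void completeRebalance() {
        newMemberIds.clear();
    }

    int issuedIdCount() { return newMemberIds.size(); }
    int memberCount() { return members.size(); }
}
```

In this sketch a consumer stuck in a restart loop leaves behind one short id string per attempt until the rebalance completes, and no member metadata at all.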



[jira] [Updated] (KAFKA-7601) Handle message format downgrades during upgrade of message format version

2018-11-13 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-7601:
---
Description: 
During an upgrade of the message format, there is a short time during which the 
configured message format version is not consistent across all replicas of a 
partition. As long as all brokers are using the same binary version (i.e. all 
have been updated to the latest code), this typically does not cause any 
problems. Followers will take whatever message format is used by the leader. 
However, it is possible for leadership to change several times between brokers 
which support the new format and those which support the old format. This can 
cause the version used in the log to flap between the different formats until 
the upgrade is complete. 

For example, suppose broker 1 has been updated to use v2 and broker 2 is still 
on v1. When broker 1 is the leader, all new messages will be written in the v2 
format. When broker 2 is leader, v1 will be used. If there is any instability 
in the cluster or if completion of the update is delayed, then the log will be 
seen to switch back and forth between v1 and v2. Once the update is completed 
and broker 2 begins using v2, then the message format will stabilize and 
everything will generally be ok.

Downgrades of the message format are problematic, even if they are just 
temporary. There are basically two issues:

1. We use the configured message format version to tell whether down-conversion 
is needed. We assume that this is always the maximum version used in the log, 
but that assumption fails in the case of a downgrade. In the worst case, old 
clients will see the new format and likely fail.

2. The logic we use for finding the truncation offset during the become 
follower transition does not handle flapping between message formats. When the 
new format is used by the leader, then the epoch cache will be updated 
correctly. When the old format is in use, the epoch cache won't be updated. 
This can lead to an incorrect result to OffsetsForLeaderEpoch queries.

We have actually observed the second problem. The scenario went something like 
this. Broker 1 is the leader of epoch 0 and writes some messages to the log 
using the v2 message format. Broker 2 then becomes the leader for epoch 1 and 
writes some messages in the v1 format. On broker 2, the last entry in the epoch 
cache is epoch 0. No entry is written for epoch 1 because it uses the old 
format. When broker 1 became a follower, it sent an OffsetsForLeaderEpoch query 
to broker 2 for epoch 0. Since epoch 0 was the last entry in the cache, the log 
end offset was returned. This resulted in localized log divergence.
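
The scenario above can be reproduced with a heavily simplified model of the epoch cache. The Java sketch below is an assumption about the lookup rule only (end offset of an epoch = start offset of the next higher cached epoch, else the log end offset); `EpochCacheSketch` and its methods are hypothetical and ignore everything else the broker does.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a leader epoch cache.
class EpochCacheSketch {
    // epoch -> first offset written in that epoch
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private long logEndOffset = 0L;

    // Appends `count` messages; only a format that carries epochs updates the cache.
    void append(int epoch, int count, boolean formatCarriesEpoch) {
        if (formatCarriesEpoch && !epochStartOffsets.containsKey(epoch)) {
            epochStartOffsets.put(epoch, logEndOffset);
        }
        logEndOffset += count;
    }

    // End offset for the requested epoch: the start of the next higher cached
    // epoch, or the log end offset if no higher epoch is known.
    long endOffsetFor(int requestedEpoch) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(requestedEpoch);
        return next == null ? logEndOffset : next.getValue();
    }
}
```

If broker 2 holds 5 messages from epoch 0 in the new format and then appends 3 messages in epoch 1 using the old format, `endOffsetFor(0)` returns 8 (the log end offset) instead of 5, so a follower truncating to that answer keeps offsets that may not match the leader's log.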

There are a few options to fix this problem. From a high level, we can either 
be stricter about preventing downgrades of the message format, or we can add 
additional logic to make downgrades safe. 

(Disallow downgrades): As an example of the first approach, the leader could 
always use the maximum of the last version written to the log and the 
configured message format version. 

(Allow downgrades): If we want to allow downgrades, then it makes sense to 
invalidate and remove all entries in the epoch cache following the message 
format downgrade. This would basically force us to revert to truncation to the 
high watermark, which is what you'd expect from a downgrade.  We would also 
need a solution for the problem of detecting when down-conversion is needed for 
a fetch request. One option I've been thinking about is enforcing the invariant 
that each segment uses only one message format version. Whenever the message 
format changes, we need to roll a new segment. Then we can simply remember 
which format is in use by each segment to tell whether down-conversion is 
needed for a given fetch request.
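
Both options reduce to small rules, sketched here in Java. This is only an illustration under assumptions: message format versions are modelled as plain integers, and the class and method names are hypothetical rather than taken from the broker code.

```java
// Hypothetical helpers illustrating the two options discussed above.
class FormatVersionSketch {
    // Disallow downgrades: never write an older format than the log already contains.
    static int effectiveVersion(int lastWrittenVersion, int configuredVersion) {
        return Math.max(lastWrittenVersion, configuredVersion);
    }

    // Allow downgrades with one format per segment: decide per segment whether
    // a fetch from a client needs down-conversion.
    static boolean needsDownConversion(int segmentFormatVersion, int maxVersionClientSupports) {
        return segmentFormatVersion > maxVersionClientSupports;
    }
}
```

For example, a leader configured for v1 whose log already contains v2 would keep writing v2 under the first rule, while under the second rule a v2 segment served to a v1-only client is down-converted and a v1 segment is not.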


  was:
During an upgrade of the message format, there is a short time during which the 
configured message format version is not consistent across all replicas of a 
partition. As long as all brokers are on the same binary version, this 
typically does not cause any problems. Followers will take whatever message 
format is used by the leader. However, it is possible for leadership to change 
several times between brokers which support the new format and those which 
support the old format. This can cause the version used in the log to flap 
between the different formats until the upgrade is complete. 

For example, suppose broker 1 has been updated to use v2 and broker 2 is still 
on v1. When broker 1 is the leader, all new messages will be written in the v2 
format. When broker 2 is leader, v1 will be used. If there is any instability 
in the cluster or if completion of the update is delayed, then the log will be 
seen to switch back and forth between v1 and v2. Once the update is completed 
and broker 1 begins using v2, then the message format will stabilize and 
eve

[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-13 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685884#comment-16685884
 ] 

Jason Gustafson commented on KAFKA-7610:


[~bchen225242] Hmm.. Good question. Perhaps a simple way to limit the memory 
from unknown group members is to not store the subscription until the first 
JoinGroup arrives using the generated memberId. I think the main gap at the 
moment is just having some way to detect these failures before the rebalance 
completes. If we want to protect the overall size of the group, perhaps a 
configuration would be more effective? For example, `group.max.size` or 
something like that. 



[jira] [Commented] (KAFKA-7621) Clients can't deal with server IP address change

2018-11-13 Thread Edoardo Comar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685918#comment-16685918
 ] 

Edoardo Comar commented on KAFKA-7621:
--

Hi, have you tried setting *networkaddress.cache.ttl*?

See https://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html
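
For reference, a minimal Java sketch of setting that property programmatically. `networkaddress.cache.ttl` is a JVM security property, so it is set through `java.security.Security`; the helper class name and the 30 second value are only illustrative, and whether this resolves the reported behaviour has not been confirmed here.

```java
import java.security.Security;

// Illustrative helper; call it early, before the JVM performs name lookups.
class DnsCacheTtlSketch {
    // Caches successful DNS lookups for at most `seconds` seconds.
    static void limitDnsCaching(int seconds) {
        Security.setProperty("networkaddress.cache.ttl", Integer.toString(seconds));
    }
}
```

The same property can also be set in the JVM's `java.security` file instead of in application code.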


> Clients can't deal with server IP address change
> 
>
> Key: KAFKA-7621
> URL: https://issues.apache.org/jira/browse/KAFKA-7621
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Bastian Voigt
>Priority: Major
>
> When the server IP address changes (and the corresponding DNS record changes, 
> of course), the client cannot deal with this. It keeps saying:
> {{Connection to node 0 could not be established. Broker may not be available}}





[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-13 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685961#comment-16685961
 ] 

Boyang Chen commented on KAFKA-7610:


[~hachikuji] I see your point.

> a simple way to limit the memory from unknown group members is to not store 
> the subscription until the first JoinGroup arrives using the generated 
> memberId

The issue is that right now we are fencing a real "unknown member id" when the 
member id given in the JoinGroup request is not within the member list. So the 
question becomes "how do we know this consumer has visited before and we have 
already allocated a new member id for it?". Any idea other than storing the 
allocated id information elsewhere? 

> If we want to protect the overall size of the group, perhaps a configuration 
> would be more effective? For example, `group.max.size` or something like that.

group.max.size is a good approach to limit memory usage. However, I'm wondering 
whether it would inconvenience users who need to scale up beyond 
group.max.size. What would be the expected behavior when we reach the member 
size limit? Would we just refuse any new member join request?



[jira] [Comment Edited] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-13 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685961#comment-16685961
 ] 

Boyang Chen edited comment on KAFKA-7610 at 11/14/18 1:04 AM:
--

[~hachikuji] I see your point.

> a simple way to limit the memory from unknown group members is to not store 
> the subscription until the first JoinGroup arrives using the generated 
> memberId

In the current coordinator logic, we are fencing a real "unknown member id" 
when the member id given in the JoinGroup request is not within the member 
list. So the question becomes "how do we know this consumer has visited before 
and we have already allocated a new member id for it?". Any idea other than 
storing the allocated id information elsewhere? 

> If we want to protect the overall size of the group, perhaps a configuration 
> would be more effective? For example, `group.max.size` or something like that.

group.max.size is a good approach to limit memory usage. However, I'm wondering 
whether it would inconvenience users who need to scale up beyond 
group.max.size. What would be the expected behavior when we reach the member 
size limit? Would we just refuse any new member join request?


was (Author: bchen225242):
[~hachikuji] I see your point.

> a simple way to limit the memory from unknown group members is to not store 
> the subscription until the first JoinGroup arrives using the generated 
> memberId

The issue is that right now we are fencing real "unknown member id" when the 
given member id in jg request is not within the member list. So the question 
becomes "how do we know this consumer has visited and we already allocate a new 
member id for it". Any idea other than storing this allocated id information 
else where? 

> If we want to protect the overall size of the group, perhaps a configuration 
>would be more effective? For example, `group.max.size` or something like that.

group.max.size is a good approach to limit the memory usage, however I'm just 
wondering whether this would create inconvenience to the user in case they need 
to scale up larger than group.max.size. What would be the expected behavior 
when we reach the member size limit, are we just refusing any new member join 
request then?



[jira] [Resolved] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

2018-11-13 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-4601.

Resolution: Fixed

Marking this resolved with https://github.com/apache/kafka/pull/5451.

As [~guozhang] said, we will address follow-up work in individual tickets.

> Avoid duplicated repartitioning in KStream DSL
> --
>
> Key: KAFKA-4601
> URL: https://issues.apache.org/jira/browse/KAFKA-4601
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: performance
>
> Consider the following DSL:
> {code}
> Stream source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1");
> Stream mapped = source.map(..);
> KTable counts = mapped
> .groupByKey()
> .count("Counts");
> KStream sink = mapped.leftJoin(counts, ..);
> {code}
> The resulting topology looks like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>     topics:   [topic1]
>     children: [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>     children: [KSTREAM-FILTER-04, KSTREAM-FILTER-07]
>   KSTREAM-FILTER-04:
>     children: [KSTREAM-SINK-03]
>   KSTREAM-SINK-03:
>     topic:    X-Counts-repartition
>   KSTREAM-FILTER-07:
>     children: [KSTREAM-SINK-06]
>   KSTREAM-SINK-06:
>     topic:    X-KSTREAM-MAP-01-repartition
> ProcessorTopology:
>   KSTREAM-SOURCE-08:
>     topics:   [X-KSTREAM-MAP-01-repartition]
>     children: [KSTREAM-LEFTJOIN-09]
>   KSTREAM-LEFTJOIN-09:
>     states:   [Counts]
>   KSTREAM-SOURCE-05:
>     topics:   [X-Counts-repartition]
>     children: [KSTREAM-AGGREGATE-02]
>   KSTREAM-AGGREGATE-02:
>     states:   [Counts]
> {code}
> I.e. there are two repartition topics, one for the aggregate and one for the 
> join. This not only introduces unnecessary overhead but also breaks the 
> expected processing order (users expect each record to go through 
> aggregation first, then the join operator). To get the simpler topology 
> below, users today need to manually add a {{through}} operator after {{map}} 
> to enforce repartitioning.
> {code}
> KStream source = builder.stream(Serdes.String(), Serdes.String(), "topic1");
> KStream repartitioned = source.map(..).through("topic2");
> KTable counts = repartitioned
>     .groupByKey()
>     .count("Counts");
> KStream sink = repartitioned.leftJoin(counts, ..);
> {code}
> The resulting topology will then look like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>     topics:   [topic1]
>     children: [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>     children: [KSTREAM-SINK-02]
>   KSTREAM-SINK-02:
>     topic:    topic2
> ProcessorTopology:
>   KSTREAM-SOURCE-03:
>     topics:   [topic2]
>     children: [KSTREAM-AGGREGATE-04, KSTREAM-LEFTJOIN-05]
>   KSTREAM-AGGREGATE-04:
>     states:   [Counts]
>   KSTREAM-LEFTJOIN-05:
>     states:   [Counts]
> {code} 
> This kind of optimization should be automatic in Streams, which we can 
> consider doing when extending from one-operator-at-a-time translation.
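
The difference between the two topologies above can be illustrated with a small stand-alone model. This is a sketch, not Kafka Streams code: RepartitionSketch, KeyedOp, perOperator and shared are hypothetical names, and the topic names it produces are made up for the illustration. It only counts how many repartition topics each translation strategy would create for the aggregate and the join that both hang off the same map node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model; it does not use or mirror the Kafka Streams API.
class RepartitionSketch {

    // A downstream operator that needs its input partitioned by key,
    // together with the key-changing upstream node it reads from.
    static final class KeyedOp {
        final String name;
        final String upstream;

        KeyedOp(String name, String upstream) {
            this.name = name;
            this.upstream = upstream;
        }
    }

    // One-operator-at-a-time translation: every key-dependent operator
    // gets its own repartition topic, even when the upstream is shared.
    static List<String> perOperator(List<KeyedOp> ops) {
        List<String> topics = new ArrayList<>();
        for (KeyedOp op : ops) {
            topics.add(op.upstream + "->" + op.name + "-repartition");
        }
        return topics;
    }

    // Shared translation: one repartition topic per key-changing upstream,
    // reused by all of its key-dependent children.
    static Set<String> shared(List<KeyedOp> ops) {
        Set<String> topics = new LinkedHashSet<>();
        for (KeyedOp op : ops) {
            topics.add(op.upstream + "-repartition");
        }
        return topics;
    }

    public static void main(String[] args) {
        List<KeyedOp> ops = Arrays.asList(
            new KeyedOp("AGGREGATE", "MAP"),
            new KeyedOp("LEFTJOIN", "MAP"));
        System.out.println("per operator: " + perOperator(ops));
        System.out.println("shared:       " + shared(ops));
    }
}
```

For the example in the description, the per-operator strategy yields two topics and the shared strategy yields one, which is the effect the manual {{through}} call achieves.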





[jira] [Commented] (KAFKA-7606) Remove deprecated --zookeeper option from StreamsResetter

2018-11-13 Thread Srinivas Reddy (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686073#comment-16686073
 ] 

Srinivas Reddy commented on KAFKA-7606:
---

Thank you [~mjsax] for the information. Yes, I will start working on it once 
the 3.0 work has started.

> Remove deprecated --zookeeper option from StreamsResetter
> -
>
> Key: KAFKA-7606
> URL: https://issues.apache.org/jira/browse/KAFKA-7606
> Project: Kafka
>  Issue Type: Task
>  Components: streams, tools
>Reporter: Matthias J. Sax
>Assignee: Srinivas Reddy
>Priority: Trivial
> Fix For: 3.0.0
>
>






[jira] [Created] (KAFKA-7623) SMT STRUCT to MASK or FILTER

2018-11-13 Thread Chenchu Lakshman kumar (JIRA)


Chenchu Lakshman kumar created KAFKA-7623:
-

 Summary: SMT STRUCT to MASK or FILTER
 Key: KAFKA-7623
 URL: https://issues.apache.org/jira/browse/KAFKA-7623
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect
Reporter: Chenchu Lakshman kumar


{
 "schema": {
 "type": "struct",
 "fields": [{
 "type": "string",
 "optional": false,
 "doc": "This field stores the value of `Message.getJMSMessageID()`.",
 "field": "messageID"
 }, {
 "type": "string",
 "optional": false,
 "doc": "This field stores the type of message that was received. This corresponds to the subinterfaces of `Message`. `BytesMessage` = `bytes`, `MapMessage` = `map`, `ObjectMessage` = `object`, `StreamMessage` = `stream` and `TextMessage` = `text`. The corresponding field will be populated with the values from the respective Message subinterface.",
 "field": "messageType"
 }, {
 "type": "int64",
 "optional": false,
 "doc": "Data from the `getJMSTimestamp()` method.",
 "field": "timestamp"
 }, {
 "type": "int32",
 "optional": false,
 "doc": "This field stores the value of `Message.getJMSDeliveryMode()`.",
 "field": "deliveryMode"
 }, {
 "type": "string",
 "optional": true,
 "doc": "This field stores the value of `Message.getJMSCorrelationID()`.",
 "field": "correlationID"
 }, {
 "type": "struct",
 "fields": [{
 "type": "string",
 "optional": false,
 "doc": "The type of JMS Destination, and either ``queue`` or ``topic``.",
 "field": "destinationType"
 }, {
 "type": "string",
 "optional": false,
 "doc": "The name of the destination. This will be the value of `Queue.getQueueName()` or `Topic.getTopicName()`.",
 "field": "name"
 }],
 "optional": true,
 "name": "io.confluent.connect.jms.Destination",
 "doc": "This schema is used to represent a JMS Destination, and is either `queue` or `topic`.",
 "field": "replyTo"
 }, {
 "type": "struct",
 "fields": [{
 "type": "string",
 "optional": false,
 "doc": "The type of JMS Destination, and either ``queue`` or ``topic``.",
 "field": "destinationType"
 }, {
 "type": "string",
 "optional": false,
 "doc": "The name of the destination. This will be the value of `Queue.getQueueName()` or `Topic.getTopicName()`.",
 "field": "name"
 }],
 "optional": true,
 "name": "io.confluent.connect.jms.Destination",
 "doc": "This schema is used to represent a JMS Destination, and is either `queue` or `topic`.",
 "field": "destination"
 }, {
 "type": "boolean",
 "optional": false,
 "doc": "This field stores the value of `Message.getJMSRedelivered()`.",
 "field": "redelivered"
 }, {
 "type": "string",
 "optional": true,
 "doc": "This field stores the value of `Message.getJMSType()`.",
 "field": "type"
 }, {
 "type": "int64",
 "optional": false,
 "doc": "This field stores the value of `Message.getJMSExpiration()`.",
 "field": "expiration"
 }, {
 "type": "int32",
 "optional": false,
 "doc": "This field stores the value of `Message.getJMSPriority()`.",
 "field": "priority"
 }, {
 "type": "map",
 "keys": {
 "type": "string",
 "optional": false
 },
 "values": {
 "type": "struct",
 "fields": [{
 "type": "string",
 "optional": false,
 "doc": "The java type of the property on the Message. One of ``boolean``, ``byte``, ``short``, ``integer``, ``long``, ``float``, ``double``, or ``string``.",
 "field": "propertyType"
 }, {