[jira] [Comment Edited] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

2016-06-09 Thread Maysam Yabandeh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15319273#comment-15319273
 ] 

Maysam Yabandeh edited comment on KAFKA-3693 at 6/10/16 4:00 AM:
-

*update:* the statement "controller is started AFTER the broker 16 has started 
shutting down" in this comment is incorrect. Refer to this 
[comment|https://issues.apache.org/jira/browse/KAFKA-3693?focusedCommentId=15315055=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15315055]
 for updated shutdown time.

Thanks for the grep hints [~junrao]. According to controller logs, it did not 
label broker 16 as dead:
{code}
$ grep "Newly added brokers" controller.log.2016-05-10.2 
2016-05-10 05:58:30,633 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Newly added brokers: 18, deleted brokers: , all live brokers: 
14,20,21,13,17,22,18,16,19,15
2016-05-10 06:17:48,512 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Newly added brokers: , deleted brokers: 14,21,13,22,18,19, all 
live brokers: 15,16
2016-05-10 06:29:57,981 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Newly added brokers: 14,21,13,22,18,19, deleted brokers: , all 
live brokers: 14,21,13,22,18,16,19,15
{code}

I am still not sure what is the mechanism by which the controller is supposed 
to mark the broker dead but it seems that the controller is started AFTER the 
broker 16 has started shutting down, so if this mechanism depends on a message 
from the broker to the controller it might have not received by the new 
controller:
{code}
$ grep "Controller starting up\|Controller-17-to-broker-16-send-thread" 
controller.log.2016-05-10.2 
2016-05-10 05:50:20,645 INFO controller.KafkaController: [Controller 17]: 
Controller starting up
2016-05-10 05:57:27,440 INFO controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Starting 
2016-05-10 05:57:28,309 INFO controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Controller 17 connected to Node(16, 
node16.com, 9092) for sending state change requests
2016-05-10 06:17:01,771 WARN controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Controller 17 epoch 269 fails to send 
request ... to broker Node(16, node16.com, 9092). Reconnecting to broker.
2016-05-10 06:17:02,076 WARN controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Controller 17's connection to broker 
Node(16, node16.com, 9092) was unsuccessful
...
2016-05-10 06:17:33,043 WARN controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Controller 17's connection to broker 
Node(16, node16.com, 9092) was unsuccessful
2016-05-10 06:17:33,344 INFO controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Controller 17 connected to Node(16, 
node16.com, 9092) for sending state change requests
{code}
Regarding the slow shutdown, that is also a question for us but investigating 
it had less priority compared to losing hw checkpoints.


was (Author: maysamyabandeh):
Thanks for the grep hints [~junrao]. According to controller logs, it did not 
label broker 16 as dead:
{code}
$ grep "Newly added brokers" controller.log.2016-05-10.2 
2016-05-10 05:58:30,633 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Newly added brokers: 18, deleted brokers: , all live brokers: 
14,20,21,13,17,22,18,16,19,15
2016-05-10 06:17:48,512 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Newly added brokers: , deleted brokers: 14,21,13,22,18,19, all 
live brokers: 15,16
2016-05-10 06:29:57,981 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Newly added brokers: 14,21,13,22,18,19, deleted brokers: , all 
live brokers: 14,21,13,22,18,16,19,15
{code}

I am still not sure what is the mechanism by which the controller is supposed 
to mark the broker dead but it seems that the controller is started AFTER the 
broker 16 has started shutting down, so if this mechanism depends on a message 
from the broker to the controller it might have not received by the new 
controller:
{code}
$ grep "Controller starting up\|Controller-17-to-broker-16-send-thread" 
controller.log.2016-05-10.2 
2016-05-10 05:50:20,645 INFO controller.KafkaController: [Controller 17]: 
Controller starting up
2016-05-10 05:57:27,440 INFO controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Starting 
2016-05-10 05:57:28,309 INFO controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Controller 17 connected to Node(16, 
node16.com, 9092) for sending state change requests
2016-05-10 06:17:01,771 WARN controller.RequestSendThread: 

[jira] [Comment Edited] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

2016-06-09 Thread Maysam Yabandeh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15316991#comment-15316991
 ] 

Maysam Yabandeh edited comment on KAFKA-3693 at 6/10/16 3:57 AM:
-

*update:* broker 16 started shutting down at 06:15:16,582. Refer to this 
[comment|https://issues.apache.org/jira/browse/KAFKA-3693?focusedCommentId=15315055=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15315055]
 for details. 

My apology. That was a copy-paste error. Broker 16 started shutting down at 
05:40:46,845
{code}
2016-05-10 05:40:46,845 INFO server.ReplicaFetcherThread: 
[ReplicaFetcherThread-0-15], Shutting down
{code}
until it completely shutdown at 06:17:27,160
{code}
2016-05-10 06:17:27,160 INFO server.KafkaServer: [Kafka Server 16], shut down 
completed
{code}
So it seems that the controller is sending the LeaderAndIsr request but the 
message is not delivered since broker 16 is being shut down. Instead the 
controller retries until the broker is fully restarted and then the message is 
delivered.

Did this correction resolve the confusion?


was (Author: maysamyabandeh):
My apology. That was a copy-paste error. Broker 16 started shutting down at 
05:40:46,845
{code}
2016-05-10 05:40:46,845 INFO server.ReplicaFetcherThread: 
[ReplicaFetcherThread-0-15], Shutting down
{code}
until it completely shutdown at 06:17:27,160
{code}
2016-05-10 06:17:27,160 INFO server.KafkaServer: [Kafka Server 16], shut down 
completed
{code}
So it seems that the controller is sending the LeaderAndIsr request but the 
message is not delivered since broker 16 is being shut down. Instead the 
controller retries until the broker is fully restarted and then the message is 
delivered.

Did this correction resolve the confusion?

> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> ---
>
> Key: KAFKA-3693
> URL: https://issues.apache.org/jira/browse/KAFKA-3693
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between highwatermark-checkpoint thread to write 
> replication-offset-checkpoint file and handleLeaderAndIsrRequest thread 
> reading from it causes the highwatermark for some partitions to be reset to 
> 0. In the good case, this results the replica to truncate its entire log to 0 
> and hence initiates fetching of terabytes of data from the lead broker, which 
> sometimes leads to hours of downtime. We observed the bad cases that the 
> reset offset can propagate to recovery-point-offset-checkpoint file, making a 
> lead broker to truncate the file. This seems to have the potential to lead to 
> data loss if the truncation happens at both follower and leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receive LeaderAndIsr from the controller
> # LeaderAndIsr message however does not contain all the partitions (probably 
> because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates the 
> allPartitions with the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
> var partition = allPartitions.get((topic, partitionId))
> if (partition == null) {
>   allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, 
> partitionId, time, this))
> {code}
> # replication-offset-checkpoint jumps in taking a snapshot of (the partial) 
> allReplicas' high watermark into replication-offset-checkpoint file {code}  
> def checkpointHighWatermarks() {
> val replicas = 
> allPartitions.values.map(_.getReplica(config.brokerId)).collect{case 
> Some(replica) => replica}{code} hence rewriting the previous highwatermarks.
> # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>   val checkpoint = 
> replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>   val offsetMap = checkpoint.read
>   if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
> info("No checkpointed highwatermark is found for partition 
> [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including a 
> subset of partitions is critical in making this race condition manifest or 
> not. But it is an important detail since it clarifies that a solution based 
> on not letting the highwatermark-checkpoint thread jumping in the middle of 
> processing a LeaderAndIsr message would not 

[jira] [Comment Edited] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

2016-06-09 Thread Maysam Yabandeh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15315055#comment-15315055
 ] 

Maysam Yabandeh edited comment on KAFKA-3693 at 6/10/16 3:54 AM:
-

*update:* the start and end of broker 16's shutdown that was put here was not 
correct, which had caused a lot of confusion; corrected in this update to avoid 
confusion for future leaders

Thanks [~junrao]. My understanding is that the LeaderAndIsr message with 
partial list has been failed to be delivered to broker 16 and it was finally 
delivered when broker 16 was back online again.

Let me paste more of relevant logs perhaps it would make the buggy scenario 
more clear.

At 06:17:02,012 the controller attempts to send the LeaderAndIsr message to 
broker 16

{code}
2016-05-10 06:17:02,012 TRACE change.logger: Controller 17 epoch 269 sending 
become-leader LeaderAndIsr request 
(Leader:16,ISR:16,LeaderEpoch:79,ControllerEpoch:269) to broker 16 for 
partition [topic.xyz,134]
{code}

This attempt however fails 
{code}
2016-05-10 06:17:02,076 WARN controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Controller 17's connection to broker 
Node(16, node16.com, 9876) was unsuccessful
{code}
since broker 16 has been shutting down since 06:15:16,582 until 06:17:27,160
{code}
2016-05-10 06:15:16,582 INFO server.KafkaServer: [Kafka Server 16], shutting 
down
2016-05-10 06:15:16,583 INFO server.KafkaServer: [Kafka Server 16], Starting 
controlled shutdown
2016-05-10 06:17:27,160 INFO server.KafkaServer: [Kafka Server 16], shut down 
completed
{code}

The controller repeats the attempt until it finally connects to broker 16 at 
06:17:33,344
{code}2016-05-10 06:17:33,344 INFO controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Controller 17 connected to Node(16, 
node16.com, 9876) for sending state change requests
{code}
when the broker 16 is restarted. 

Here is the first 5 state changes the broker 16 performs right after the 
restart:
{code}
2016-05-10 06:17:33,410 TRACE change.logger: Broker 16 cached leader info 
(LeaderAndIsrInfo:(Leader:21,ISR:21,LeaderEpoch:58,ControllerEpoch:269),ReplicationFactor:2),AllReplicas:20,21)
 for partition [topic.qwe,3] in response to UpdateMetadata request sent by 
controller 17 epoch 269 with correlation id 2398
2016-05-10 06:17:33,438 TRACE change.logger: Broker 16 cached leader info 
(LeaderAndIsrInfo:(Leader:14,ISR:14,LeaderEpoch:110,ControllerEpoch:269),ReplicationFactor:2),AllReplicas:20,14)
 for partition [topic.asd,88] in response to UpdateMetadata request sent by 
controller 17 epoch 269 with correlation id 2399
2016-05-10 06:17:33,440 TRACE change.logger: Broker 16 cached leader info 
(LeaderAndIsrInfo:(Leader:15,ISR:15,LeaderEpoch:18,ControllerEpoch:269),ReplicationFactor:2),AllReplicas:20,15)
 for partition [topic.zxc,8] in response to UpdateMetadata request sent by 
controller 17 epoch 269 with correlation id 2400
2016-05-10 06:17:33,442 TRACE change.logger: Broker 16 cached leader info 
(LeaderAndIsrInfo:(Leader:21,ISR:21,LeaderEpoch:61,ControllerEpoch:269),ReplicationFactor:2),AllReplicas:21,20)
 for partition [topic.iop,4] in response to UpdateMetadata request sent by 
controller 17 epoch 269 with correlation id 2401
2016-05-10 06:17:33,447 TRACE change.logger: Broker 16 received LeaderAndIsr 
request 
(LeaderAndIsrInfo:(Leader:16,ISR:16,LeaderEpoch:79,ControllerEpoch:269),ReplicationFactor:2),AllReplicas:16,20)
 correlation id 2402 from controller 17 epoch 269 for partition [topic.xyz,134]
{code}
The last of which is the LeaderAndIsr with incomplete list of partitions that 
we were talking about. I do not see any other track in the controller log 
indicating of any similar message sent to broker 16, which tells me the 
LeaderAndIsr message received at 06:17:33,447 must be the same one that was 
formed at 06:17:02,012 and has been attempted later when broker 16 was back 
online.

Does the above make sense?


was (Author: maysamyabandeh):
Thanks [~junrao]. My understanding is that the LeaderAndIsr message with 
partial list has been failed to be delivered to broker 16 and it was finally 
delivered when broker 16 was back online again.

Let me paste more of relevant logs perhaps it would make the buggy scenario 
more clear.

At 06:17:02,012 the controller attempts to send the LeaderAndIsr message to 
broker 16

{code}
2016-05-10 06:17:02,012 TRACE change.logger: Controller 17 epoch 269 sending 
become-leader LeaderAndIsr request 
(Leader:16,ISR:16,LeaderEpoch:79,ControllerEpoch:269) to broker 16 for 
partition [topic.xyz,134]
{code}

This attempt however fails 
{code}
2016-05-10 06:17:02,076 WARN controller.RequestSendThread: 
[Controller-17-to-broker-16-send-thread], Controller 17's connection to broker 
Node(16, node16.com, 9876) was unsuccessful
{code}
since broker 16 has been shutting down since 05:40:46,845 until 06:17:01,701
{code}

[jira] [Commented] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

2016-06-09 Thread Maysam Yabandeh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323801#comment-15323801
 ] 

Maysam Yabandeh commented on KAFKA-3693:


[~junrao] Yes, you seem to be right.
The controller missed the death of broker 16:
{code}
$ grep "Broker change listener fired" controller.log.2016-05-10.2 
2016-05-10 05:58:30,616 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Broker change listener fired for path /brokers/ids with 
children 21,20,19,17,22,18,15,16,13,14
2016-05-10 06:17:37,577 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Broker change listener fired for path /brokers/ids with 
children 20,17,15,16
2016-05-10 06:29:48,595 INFO 
controller.ReplicaStateMachine$BrokerChangeListener: [BrokerChangeListener on 
Controller 17]: Broker change listener fired for path /brokers/ids with 
children 21,20,19,22,17,18,15,16,13,14
{code}
And it was because broker 16 rejoined zk at 06:17:33,432 before the controller 
gets notified at 06:17:37,577 and hence does not notice that the ephemeral node 
of broker 16 was deleted at some point:
{code}
2016-05-10 05:33:57,907 INFO utils.ZkUtils: Registered broker 16 at path 
/brokers/ids/16 with addresses: PLAINTEXT -> EndPoint(node16.com,9092,PLAINTEXT)
2016-05-10 06:15:16,582 INFO server.KafkaServer: [Kafka Server 16], shutting 
down
2016-05-10 06:17:27,160 INFO server.KafkaServer: [Kafka Server 16], shut down 
completed
2016-05-10 06:17:30,916 INFO server.KafkaServer: starting
2016-05-10 06:17:33,432 INFO utils.ZkUtils: Registered broker 16 at path 
/brokers/ids/16 with addresses: PLAINTEXT -> EndPoint(node16.com,9092,PLAINTEXT)
{code}

So I guess the mystery of why the controller sends incomplete LeaderAndIsr 
requests to brokers is solved. If I may, i would like to emphasis again that 
making the broker code defensive against such cases would offer a stable fix to 
this problem since we do not know whether there are other existing cases that 
would result to similar incomplete LeaderAndIsr messages or perhaps there will 
be new cases caused by future patches. Simple solutions such as having the 
broker counting the number of partitions in the HW checkpoint file and 
rejecting the controller's message if it contains less partitions should still 
be compatible with security-enabled cases that you previously mentioned.

Regarding the shutdown process, i think i was mistaken; the shutdown process 
took only 2m:11s
{code}
2016-05-10 06:15:16,582 INFO server.KafkaServer: [Kafka Server 16], shutting 
down
2016-05-10 06:15:16,583 INFO server.KafkaServer: [Kafka Server 16], Starting 
controlled shutdown
2016-05-10 06:17:27,160 INFO server.KafkaServer: [Kafka Server 16], shut down 
completed
{code}
I misread the fetcher thread shut down as the start of the shut down process:
{code}
2016-05-10 05:40:46,845 INFO server.ReplicaFetcherThread: 
[ReplicaFetcherThread-0-15], Shutting down
{code}
Sorry for the confusion.

> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> ---
>
> Key: KAFKA-3693
> URL: https://issues.apache.org/jira/browse/KAFKA-3693
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between highwatermark-checkpoint thread to write 
> replication-offset-checkpoint file and handleLeaderAndIsrRequest thread 
> reading from it causes the highwatermark for some partitions to be reset to 
> 0. In the good case, this results the replica to truncate its entire log to 0 
> and hence initiates fetching of terabytes of data from the lead broker, which 
> sometimes leads to hours of downtime. We observed the bad cases that the 
> reset offset can propagate to recovery-point-offset-checkpoint file, making a 
> lead broker to truncate the file. This seems to have the potential to lead to 
> data loss if the truncation happens at both follower and leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receive LeaderAndIsr from the controller
> # LeaderAndIsr message however does not contain all the partitions (probably 
> because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates the 
> allPartitions with the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
> var partition = allPartitions.get((topic, partitionId))
> if (partition == null) {
>   allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, 
> partitionId, time, this))
> 

[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2016-06-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323726#comment-15323726
 ] 

Randall Hauch commented on KAFKA-3821:
--

[~ewencp], I'm not sure a {{data}} field is that much different than storing 
information in the offsets. I'd also worry that if the data isn't stored in the 
same place as the offsets, that there's a chance for failing after offsets are 
flushed but before data is flushed (or vice versa).

However, I have been thinking of something that may very well be, well, out in 
left field. Maybe connectors need some kind of _history events_ that they can 
produce and easily consume. We originally talked some time ago about possibly 
adding persisted state (e.g., a key-value store), but I think that's going in 
the wrong direction and that instead an _event stream_ is far better and more 
powerful because it gives you an ordered history. Naturally, these history 
events would be persisted to a Kafka topic, but it'd be one that's primarily 
for the connector (or any monitors). The semantics are very specific, though:

* The connector registers an _event handler_ that is called for every event, 
with one method that handles the events and a few other methods that are called 
at various lifecycle state transitions.
* Upon startup, the framework reads the topic from the very beginning, 
replaying all events and forwarding them to the _event handler_. When complete, 
the event handler's _end replay_ method is called to signal to it that all 
events have been replayed.
* The connector can write to it at any time, and all events are synchronously 
written to Kafka (this is thus a lower-volume topic). The connector's consumer 
sees these events after they've been read from Kafka, although much of the time 
these events could probably be ignored (since the connector is writing them), 
but occasionally there might be an event it didn't create and the connector 
wants/needs to respond.

I could see a couple of uses for this:

# Debezium's MySQL connector records all DDL statements (with offsets) it reads 
from the MySQL binlog so that, when the connector starts back up and is given 
an offset it is to start it, it can replay the DDL statements until it reaches 
but does not pass the designated offset to reconstruct an in-memory model of 
the database schemas. Currently we do this with a separate thread, but we've 
had to sidestep Kafka Connect and handle this ourselves because we have to have 
full control over producing, flushing, and consuming. And our history is 
flushed before the offsets for normal records might be flushed, so we know that 
we might have extra events in our history (and have to ignore them) should 
something go wrong and DDL get recorded without the affiliated offsets.
# Record occasional state changes, such as "starting a snapshot", "completing a 
snapshot", etc.
# Enable a management tool to submit events (rather "requests") to be written 
to the topic. The connector would receive them and can react accordingly, all 
without having to stop the connector, change the configuration, restart the 
connector, wait for something to complete, and change the configuration back. 
An example might be to perform an ad hoc snapshot of 3 specific tables, or 
"flush snapshot of current DDL statements to history" (allowing the topic to be 
log compacted).
# Task coordination, where a connector with multiple tasks needs one "leader" 
task to do work while the others wait, and when completed all the tasks can 
continue. (This might be contrived.)

Now, I've not fleshed this out very far and welcome any thoughts or criticisms. 
My connector writing experience is limited: I've written one complicated 
connector and one relatively simple prototype connector, and I'm not sure 
whether this is a general need or something far too specific to my limited 
context.

This isn't too different from offset storage, except that you want to be able 
to replay the history upon startup. Perhaps there's a way of incorporating 
these ideas into offset storage, but I suspect that's not really a good idea.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Ewen Cheslack-Postava
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the 

[jira] [Commented] (KAFKA-2934) Offset storage file configuration in Connect standalone mode is not included in StandaloneConfig

2016-06-09 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323719#comment-15323719
 ] 

Ewen Cheslack-Postava commented on KAFKA-2934:
--

[~davispw] How are you doing that without relying on a bunch of other internal 
APIs? ConnectStandalone always instantiates a FileOffsetBackingStore and 
ConnectDistributed always instantiates a KafkaOffsetBackingStore...

> Offset storage file configuration in Connect standalone mode is not included 
> in StandaloneConfig
> 
>
> Key: KAFKA-2934
> URL: https://issues.apache.org/jira/browse/KAFKA-2934
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.10.0.0
>
>
> The config is coded directly in FileOffsetBackingStore rather than being 
> listed (and validated) in StandaloneConfig. This also means it wouldn't be 
> included if we autogenerated docs from the config classes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references

2016-06-09 Thread Jeff Klukas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323718#comment-15323718
 ] 

Jeff Klukas commented on KAFKA-3801:


Let me give more context for the example. We have an application that produces 
JSON messages to a Kafka topic interleaved with occasional checkpoint messages 
that are of {{Long}} type.

If I want to create a KStream of just the checkpoint messages, I need to filter 
out the JSON messages before deserializing. Here's what it looks like:

{{KStream checkpointStream = builder.stream(Serdes.Long(), 
Serdes.ByteArray(), inputTopicName)}}
{{.filter((key, bytes) -> bytes.length == 
8).mapValues(LongDeserializer::deserialize)}}

I need to use ByteArraySerde when calling {{stream}}, then I do the 
deserialization in a {{mapValues}} invocation after filtering out messages of 
the wrong type.

Another option would be to materialize the stream to a topic after the filter 
and then call {{builder.stream(Serdes.Long(), Serdes.Long(), newTopicName)}}, 
but I'd like to avoid unnecessary materialization.

So in the current scheme, I need to create an instance of {{LongDeserializer}} 
separately so that I can then call its {{deserialize}} method in {{mapValues}}.

This situation probably won't occur frequently, so I understand if it's decided 
not to bother considering this change.

> Provide static serialize() and deserialize() for use as method references
> -
>
> Key: KAFKA-3801
> URL: https://issues.apache.org/jira/browse/KAFKA-3801
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.1.0
>
>
> While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}} 
> are abstracted away in Kafka Streams through the use of `Serdes` classes, 
> there are some instances where developers may want to call them directly. The 
> serializers and deserializers for simple types don't require any 
> configuration and could be static, but currently it's necessary to create an 
> instance to use those methods.
> I'd propose moving serialization logic into a {{static public byte[] 
> serialize(? data)}} method and deserialization logic into a {{static public ? 
> deserialize(byte[] data)}} method. The existing instance methods would simply 
> call the static versions.
> See a full example for LongSerializer and LongDeserializer here:
> https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1
> In Java 8, these static methods then become available for method references 
> in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the 
> user needing to create an instance of {{LongDeserializer}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2934) Offset storage file configuration in Connect standalone mode is not included in StandaloneConfig

2016-06-09 Thread Peter Davis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323697#comment-15323697
 ] 

Peter Davis commented on KAFKA-2934:


After upgrading to 0.10, discovered that this fix causes 
`offset.storage.file.filename` to be a *required* property, even if a 
FileOffsetBackingStore is not used.  We are consuming from JMS (which has no 
need to store offsets as acknowledged messages are removed from the source) so 
we are using a MemoryOffsetBackingStore.

https://github.com/apache/kafka/pull/734 argues there are only two sensible 
choices for a backing store but I think my use case shows that is not true!

Would you be willing to revisit the pull request to make the offset backing 
store class configurable, and make the file optional?

> Offset storage file configuration in Connect standalone mode is not included 
> in StandaloneConfig
> 
>
> Key: KAFKA-2934
> URL: https://issues.apache.org/jira/browse/KAFKA-2934
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.10.0.0
>
>
> The config is coded directly in FileOffsetBackingStore rather than being 
> listed (and validated) in StandaloneConfig. This also means it wouldn't be 
> included if we autogenerated docs from the config classes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3816) Provide more context in Kafka Connect log messages

2016-06-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323684#comment-15323684
 ] 

Randall Hauch edited comment on KAFKA-3816 at 6/10/16 1:11 AM:
---

{quote}
One thing we might want to do in conjunction with this is to allow the 
framework to allocate threads for connectors and set this up. Background 
threads that monitor sources are a particular case that we don't provide any 
help to connector developers for and would mean we might be missing this info 
in those threads.
{quote}

The simple approach is to have the connector code that creates threads set this 
up, but I agree it's fairly complex and will likely be ignored or not done. We 
can provide a factory method for threads, perhaps on the context, but we'd 
probably have to provide multiple overloaded methods to provide the flexibility 
they need. 

Perhaps a slightly easier way is to provide to the contexts a new interface 
called {{LoggingContext}} that has methods that connector and task 
implementations can call:

{code:java}
public interface LoggingContext {
 public void use(String contextName);
}
{code}

The reason I'd suggest an interface is just in case we want to provide 
additional helper methods in the future. For example, with Java 8 it's really 
easy to temporarily perform logic within a custom context: 

{code:java}
loggingContext.withContext("newContextName", ()->{ ... });
{code}

where that lambda might log messages, and after the lambda completes it 
automatically sets the MDC context back to what it was before this method is 
called. (I'm not saying we need this now, but I wouldn't be surprised if we 
want to add a few helper methods in the future.)

I'm also not a fan of hiding too much (e.g., if Kafka Connector provides 
subclasses of {{SourceTask}} that handle some of the threading automatically). 
In Debezium our {{SourceTask}} implementation in our MySQL connector actually 
doesn't create any threads; instead, it creates 1 or more "reader" components 
(e.g., snapshot reader, binlog reader, etc.) based upon what the task needs to 
do, and the task then runs these sequentially. Each reader implementation can 
create their own threads if needed or uses libraries that create threads. And 
while libraries rarely give you the control you need for thread creation, they 
often take callbacks or make other calls to methods that you do provide, and 
its in those methods that you have a chance to set the MDC parameters. This 
would be easy if the connector code can call methods, but much harder if you 
just provide a thread factory that sets it up for those threads (and only those 
threads).

{quote}
For the naming – the scope makes sense but we want a reliable way to extract 
this from the connector without it having to provide extra info if possible. 
Basing this on class name could work. My guess is that 3 parameters should be 
sufficient (and if not, we probably need to do some refactoring to simplify 
that code), but we'll just have to try it to find out.
{quote}

Yeah, I agree. Perhaps a non-abstract method (e.g., {{getConnectorTypeName()}}) 
on the Connector class that defaults to grabbing the class short name. Then, 
subclasses can always override this to provide something shorter and more 
meaningful.


was (Author: rhauch):
{quote}
One thing we might want to do in conjunction with this is to allow the 
framework to allocate threads for connectors and set this up. Background 
threads that monitor sources are a particular case that we don't provide any 
help to connector developers for and would mean we might be missing this info 
in those threads.
{quote}

The simple approach is to have the connector code that creates threads set this 
up, but I agree it's fairly complex and will likely be ignored or not done. We 
can provide a factory method for threads, perhaps on the context, but we'd 
probably have to provide multiple overloaded methods to provide the flexibility 
they need. 

Perhaps a slightly easier way is to provide to the contexts a new interface 
called {{LoggingContext}} that has methods that connector and task 
implementations can call:

{code:java}
public interface LoggingContext {
 public void use(String contextName);
}
{code}

The reason I'd suggest an interface is just in case we want to provide 
additional helper methods in the future. For example, with Java 8 it's really 
easy to temporarily perform logic within a custom context: 

{code:java}
loggingContext.withContext("newContextName", ()->{ ... });
{code}

where that lambda might log messages, and after the lambda completes it 
automatically sets the MDC context back to what it was before this method is 
called. (I'm not saying we need this now, but I wouldn't be surprised if we 
want to add a few helper methods in the future.)

I'm also not a fan of hiding too much (e.g., if Kafka Connector provides 
subclasses of 

[jira] [Comment Edited] (KAFKA-3816) Provide more context in Kafka Connect log messages

2016-06-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323684#comment-15323684
 ] 

Randall Hauch edited comment on KAFKA-3816 at 6/10/16 1:07 AM:
---

{quote}
One thing we might want to do in conjunction with this is to allow the 
framework to allocate threads for connectors and set this up. Background 
threads that monitor sources are a particular case that we don't provide any 
help to connector developers for and would mean we might be missing this info 
in those threads.
{quote}

The simple approach is to have the connector code that creates threads set this 
up, but I agree it's fairly complex and will likely be ignored or not done. We 
can provide a factory method for threads, perhaps on the context, but we'd 
probably have to provide multiple overloaded methods to provide the flexibility 
they need. 

Perhaps a slightly easier way is to provide to the contexts a new interface 
called {{LoggingContext}} that has methods that connector and task 
implementations can call:

{code:java}
public interface LoggingContext {
 public void use(String contextName);
}
{code}

The reason I'd suggest an interface is just in case we want to provide 
additional helper methods in the future. For example, with Java 8 it's really 
easy to temporarily perform logic within a custom context: 

{code:java}
loggingContext.withContext("newContextName", ()->{ ... });
{code}

where that lambda might log messages, and after the lambda completes it 
automatically sets the MDC context back to what it was before this method is 
called. (I'm not saying we need this now, but I wouldn't be surprised if we 
want to add a few helper methods in the future.)

I'm also not a fan of hiding too much (e.g., if Kafka Connector provides 
subclasses of {{SourceTask}} that handle some of the threading automatically). 
In Debezium our {{SourceTask}} implementation actually doesn't create the 
threads; instead, it creates 1 or more "reader" components (depending upon what 
needs to be done) that are used sequentially, and each reader implementations 
(e.g., snapshot reader, binlog reader, etc.) create their own threads if needed 
or uses libraries that create threads. And while libraries rarely give you the 
control you need for thread creation, they often take callbacks or make other 
calls to methods that you provide, and its in those methods that you have a 
chance to set the MDC parameters.

{quote}
For the naming – the scope makes sense but we want a reliable way to extract 
this from the connector without it having to provide extra info if possible. 
Basing this on class name could work. My guess is that 3 parameters should be 
sufficient (and if not, we probably need to do some refactoring to simplify 
that code), but we'll just have to try it to find out.
{quote}

Yeah, I agree. Perhaps a non-abstract method (e.g., {{getConnectorTypeName()}}) 
on the Connector class that defaults to grabbing the class short name. Then, 
subclasses can always override this to provide something shorter and more 
meaningful.


was (Author: rhauch):
{quote}
One thing we might want to do in conjunction with this is to allow the 
framework to allocate threads for connectors and set this up. Background 
threads that monitor sources are a particular case that we don't provide any 
help to connector developers for and would mean we might be missing this info 
in those threads.
{quote}

The simple approach is to have the connector code that creates threads set this 
up, but I agree it's fairly complex and will likely be ignored or not done. We 
can provide a factory method for threads, perhaps on the context, but we'd 
probably have to provide multiple overloaded methods to provide the flexibility 
they need. 

Perhaps a slightly easier way is to provide to the contexts a new interface 
called {{LoggingContext}} that has methods that connector and task 
implementations can call:

{code:java}
public interface LoggingContext {
 public void use(String contextName);
}
{code:java}

The reason I'd suggest an interface is just in case we want to provide 
additional helper methods in the future. For example, with Java 8 it's really 
easy to temporarily perform logic within a custom context: 

{code:java}
loggingContext.withContext("newContextName", ()->{ ... });
{code}

where that lambda might log messages, and after the lambda completes it 
automatically sets the MDC context back to what it was before this method is 
called. (I'm not saying we need this now, but I wouldn't be surprised if we 
want to add a few helper methods in the future.)

I'm also not a fan of hiding too much (e.g., if Kafka Connector provides 
subclasses of {{SourceTask}} that handle some of the threading automatically). 
In Debezium our {{SourceTask}} implementation actually doesn't create the 
threads; instead, it creates 1 or more "reader" components 

[jira] [Commented] (KAFKA-3816) Provide more context in Kafka Connect log messages

2016-06-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323684#comment-15323684
 ] 

Randall Hauch commented on KAFKA-3816:
--

{quote}
One thing we might want to do in conjunction with this is to allow the 
framework to allocate threads for connectors and set this up. Background 
threads that monitor sources are a particular case that we don't provide any 
help to connector developers for and would mean we might be missing this info 
in those threads.
{quote}

The simple approach is to have the connector code that creates threads set this 
up, but I agree it's fairly complex and will likely be ignored or not done. We 
can provide a factory method for threads, perhaps on the context, but we'd 
probably have to provide multiple overloaded methods to provide the flexibility 
they need. 

Perhaps a slightly easier way is to provide to the contexts a new interface 
called {{LoggingContext}} that has methods that connector and task 
implementations can call:

{code:java}
public interface LoggingContext {
 public void use(String contextName);
}
{code:java}

The reason I'd suggest an interface is just in case we want to provide 
additional helper methods in the future. For example, with Java 8 it's really 
easy to temporarily perform logic within a custom context: 

{code:java}
loggingContext.withContext("newContextName", ()->{ ... });
{code}

where that lambda might log messages, and after the lambda completes it 
automatically sets the MDC context back to what it was before this method is 
called. (I'm not saying we need this now, but I wouldn't be surprised if we 
want to add a few helper methods in the future.)

I'm also not a fan of hiding too much (e.g., if Kafka Connector provides 
subclasses of {{SourceTask}} that handle some of the threading automatically). 
In Debezium our {{SourceTask}} implementation actually doesn't create the 
threads; instead, it creates 1 or more "reader" components (depending upon what 
needs to be done) that are used sequentially, and each reader implementations 
(e.g., snapshot reader, binlog reader, etc.) create their own threads if needed 
or uses libraries that create threads. And while libraries rarely give you the 
control you need for thread creation, they often take callbacks or make other 
calls to methods that you provide, and its in those methods that you have a 
chance to set the MDC parameters.

{quote}
For the naming – the scope makes sense but we want a reliable way to extract 
this from the connector without it having to provide extra info if possible. 
Basing this on class name could work. My guess is that 3 parameters should be 
sufficient (and if not, we probably need to do some refactoring to simplify 
that code), but we'll just have to try it to find out.
{quote}

Yeah, I agree. Perhaps a non-abstract method (e.g., {{getConnectorTypeName()}}) 
on the Connector class that defaults to grabbing the class short name. Then, 
subclasses can always override this to provide something shorter and more 
meaningful.

> Provide more context in Kafka Connect log messages
> --
>
> Key: KAFKA-3816
> URL: https://issues.apache.org/jira/browse/KAFKA-3816
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Ewen Cheslack-Postava
>
> Currently it is relatively difficult to correlate individual log messages 
> with the various threads and activities that are going on within a Kafka 
> Connect worker, let along a cluster of workers. Log messages should provide 
> more context to make it easier and to allow log scraping tools to coalesce 
> related log messages.
> One simple way to do this is by using _mapped diagnostic contexts_, or MDC. 
> This is supported by the SLF4J API, and by the Logback and Log4J logging 
> frameworks.
> Basically, the framework would be changed so that each thread is configured 
> with one or more MDC parameters using the 
> {{org.slf4j.MDC.put(String,String)}} method in SLF4J. Once that thread is 
> configured, all log messages made using that thread have that context. The 
> logs can then be configured to use those parameters.
> It would be ideal to define a convention for connectors and the Kafka Connect 
> framework. A single set of MDC parameters means that the logging framework 
> can use the specific parameters on its message formats.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

2016-06-09 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323675#comment-15323675
 ] 

Jun Rao commented on KAFKA-3693:


[~maysamyabandeh], the controller detects broker failure through ZK watchers. 
Every broker registers a path in ZK on startup, and deletes that path on 
shutdown. The controller will be notified when a broker registration path in ZK 
is added or deleted. It's not very clear to me why the controller didn't detect 
the failure of broker 16. Currently, we have a known issue that if a broker is 
restarted too quickly, the controller could miss the change and think that the 
broker never went down. Did that happen?

Also, what's the output from broker 16 when it's shutdown? I am still curious 
why the shutdown takes 37 minutes to complete.

> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> ---
>
> Key: KAFKA-3693
> URL: https://issues.apache.org/jira/browse/KAFKA-3693
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between highwatermark-checkpoint thread to write 
> replication-offset-checkpoint file and handleLeaderAndIsrRequest thread 
> reading from it causes the highwatermark for some partitions to be reset to 
> 0. In the good case, this results the replica to truncate its entire log to 0 
> and hence initiates fetching of terabytes of data from the lead broker, which 
> sometimes leads to hours of downtime. We observed the bad cases that the 
> reset offset can propagate to recovery-point-offset-checkpoint file, making a 
> lead broker to truncate the file. This seems to have the potential to lead to 
> data loss if the truncation happens at both follower and leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receive LeaderAndIsr from the controller
> # LeaderAndIsr message however does not contain all the partitions (probably 
> because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates the 
> allPartitions with the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
> var partition = allPartitions.get((topic, partitionId))
> if (partition == null) {
>   allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, 
> partitionId, time, this))
> {code}
> # replication-offset-checkpoint jumps in taking a snapshot of (the partial) 
> allReplicas' high watermark into replication-offset-checkpoint file {code}  
> def checkpointHighWatermarks() {
> val replicas = 
> allPartitions.values.map(_.getReplica(config.brokerId)).collect{case 
> Some(replica) => replica}{code} hence rewriting the previous highwatermarks.
> # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>   val checkpoint = 
> replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>   val offsetMap = checkpoint.read
>   if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
> info("No checkpointed highwatermark is found for partition 
> [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including a 
> subset of partitions is critical in making this race condition manifest or 
> not. But it is an important detail since it clarifies that a solution based 
> on not letting the highwatermark-checkpoint thread jumping in the middle of 
> processing a LeaderAndIsr message would not suffice.
> The solution we are thinking of is to force initializing allPartitions by the 
> partitions listed in the replication-offset-checkpoint (and perhaps 
> recovery-point-offset-checkpoint file too) when a server starts.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3816) Provide more context in Kafka Connect log messages

2016-06-09 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323651#comment-15323651
 ] 

Ewen Cheslack-Postava commented on KAFKA-3816:
--

This would be awesome, and I'm hoping this can also help us with 
framework-level debugging too -- one of the really tricky things to understand 
in logs is which AbstractCoordinator messages were coming from a consumer vs 
distributed herder.

One thing we might want to do in conjunction with this is to allow the 
framework to allocate threads for connectors and set this up. Background 
threads that monitor sources are a particular case that we don't provide any 
help to connector developers for and would mean we might be missing this info 
in those threads.

For the naming -- the scope makes sense but we want a reliable way to extract 
this from the connector without it having to provide extra info if possible. 
Basing this on class name could work. My guess is that 3 parameters should be 
sufficient (and if not, we probably need to do some refactoring to simplify 
that code), but we'll just have to try it to find out.

> Provide more context in Kafka Connect log messages
> --
>
> Key: KAFKA-3816
> URL: https://issues.apache.org/jira/browse/KAFKA-3816
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Ewen Cheslack-Postava
>
> Currently it is relatively difficult to correlate individual log messages 
> with the various threads and activities that are going on within a Kafka 
> Connect worker, let along a cluster of workers. Log messages should provide 
> more context to make it easier and to allow log scraping tools to coalesce 
> related log messages.
> One simple way to do this is by using _mapped diagnostic contexts_, or MDC. 
> This is supported by the SLF4J API, and by the Logback and Log4J logging 
> frameworks.
> Basically, the framework would be changed so that each thread is configured 
> with one or more MDC parameters using the 
> {{org.slf4j.MDC.put(String,String)}} method in SLF4J. Once that thread is 
> configured, all log messages made using that thread have that context. The 
> logs can then be configured to use those parameters.
> It would be ideal to define a convention for connectors and the Kafka Connect 
> framework. A single set of MDC parameters means that the logging framework 
> can use the specific parameters on its message formats.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2016-06-09 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323648#comment-15323648
 ] 

Ewen Cheslack-Postava commented on KAFKA-3821:
--

I was thinking about this a bit more, along with the fact that having to 
shoehorn extra data into offsets is not ideal.

Maybe a better way to expose this would be to provide a separate {{data}} field 
or something like that, which is also key-value based (similar to source 
partition/source offset) and has as flexible a data structure. We could manage 
the two types of data together and they'd have the same basic semantics, but 
allow you to decouple state/data changes that really only have to happen once 
in awhile from the offsets, which really do need to be associated with every 
message. We could possibly then use a subclass like {{DataOnlySourceRecord}} 
which doesn't trigger any data to be written, but still applies the data 
changes. (I think it might be nice to introduce a parent interface for both 
instead of still calling it {{SourceRecord}}, but I'm not sure we could do that 
in a compatible way.)

[~rhauch] Thoughts? Would this be a better fit for what you're trying to 
accomplish? The change to the output of {{poll()}} to contain more than just 
records to be written to Kafka is a bit weird, but might make sense for these 
use cases and provides framework support for having those changes get 
"committed" asynchronously but still at a safe point.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Ewen Cheslack-Postava
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-09 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323616#comment-15323616
 ] 

Guozhang Wang commented on KAFKA-3811:
--

Yeah I think they are related. Feel free to re-assign to yourself.

One thing that I'm working on is to optimize some metrics overhead due to 
frequent calls to `time.milliseconds`, some details are discussed in 
https://github.com/apache/kafka/pull/1447. So we may have some overlaps in the 
code base, and hence some rebasing may be needed along the process. Just FYI.

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3820) Provide utilities for tracking source offsets

2016-06-09 Thread Liquan Pei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liquan Pei reassigned KAFKA-3820:
-

Assignee: Liquan Pei  (was: Ewen Cheslack-Postava)

> Provide utilities for tracking source offsets
> -
>
> Key: KAFKA-3820
> URL: https://issues.apache.org/jira/browse/KAFKA-3820
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Liquan Pei
>
> OffsetStorageReader does not (and is not expected to) be immediately updated 
> when a SourceRecord is returned from poll(). However, this can be a bit 
> confusing to connector developers as they may return that data, then expect a 
> subsequent read from OffsetStorageReader should match that. In other words, 
> rather than tracking which offset they are at themselves in variables 
> maintained by the task implementation, the connector developer expected 
> OffsetStorageReader to do this for them.
> Part of the confusion comes from the fact that data is sent asynchronously 
> after returned from poll(), which explains the semantics we have. However, it 
> does also mean many connectors have similarly structured code where they keep 
> track of the current offset themselves. It might be nice to provide some 
> utilities, probably through the Context object, to get the last returned 
> offset for each source partition being processed by a task.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3234) Minor documentation edits: clarify minISR; some topic-level configs are missing

2016-06-09 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323554#comment-15323554
 ] 

James Cheng commented on KAFKA-3234:


Hi, what's the status on this JIRA? 

I didn't realize that this JIRA existed, and so filed 
https://issues.apache.org/jira/browse/KAFKA-3809 to add auto-generated 
documentation for topic-level configs. If this one will be moving forward, I 
can close mine as a dup.


> Minor documentation edits: clarify minISR; some topic-level configs are 
> missing
> ---
>
> Key: KAFKA-3234
> URL: https://issues.apache.org/jira/browse/KAFKA-3234
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Joel Koshy
>Assignee: Joel Koshy
> Fix For: 0.10.0.1
>
>
> Based on an offline conversation with [~junrao] and [~gwenshap]
> The current documentation is somewhat confusing on minISR in that it says 
> that it offers a trade-off between consistency and availability. From the 
> user's view-point, consistency (at least in the usual sense of the term) is 
> achieved by disabling unclean leader election - since no replica that was out 
> of ISR can be elected as the leader. So a consumer will never see a message 
> that was not acknowledged to a producer that set acks to "all". Or to put it 
> another way, setting minISR alone will not prevent exposing uncommitted 
> messages - disabling unclean leader election is the stronger requirement. You 
> can achieve the same effect though by setting minISR equal to  the number of 
> replicas.
> There is also some stale documentation that needs to be removed:
> {quote}
> In our current release we choose the second strategy and favor choosing a 
> potentially inconsistent replica when all replicas in the ISR are dead. In 
> the future, we would like to make this configurable to better support use 
> cases where downtime is preferable to inconsistency.
> {quote}
> Finally, it was reported on the mailing list (from Elias Levy) that 
> compression.type should be added under the topic configs. Same goes for 
> unclean leader election. Would be good to have these auto-generated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references

2016-06-09 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323561#comment-15323561
 ] 

Guozhang Wang commented on KAFKA-3801:
--

Hello [~jeff.klu...@gmail.com] could you present a concrete example that a lot 
of serde instances need to be created on-the-fly? Today for most primitive 
types, users should be able to use {{Serdes.Long}} etc to directly get the 
serde objects.

And also specifically in the description, {{KStream.mapValues}} should not 
require a serde object, right?

> Provide static serialize() and deserialize() for use as method references
> -
>
> Key: KAFKA-3801
> URL: https://issues.apache.org/jira/browse/KAFKA-3801
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.1.0
>
>
> While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}} 
> are abstracted away in Kafka Streams through the use of `Serdes` classes, 
> there are some instances where developers may want to call them directly. The 
> serializers and deserializers for simple types don't require any 
> configuration and could be static, but currently it's necessary to create an 
> instance to use those methods.
> I'd propose moving serialization logic into a {{static public byte[] 
> serialize(? data)}} method and deserialization logic into a {{static public ? 
> deserialize(byte[] data)}} method. The existing instance methods would simply 
> call the static versions.
> See a full example for LongSerializer and LongDeserializer here:
> https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1
> In Java 8, these static methods then become available for method references 
> in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the 
> user needing to create an instance of {{LongDeserializer}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3809) Auto-generate documentation for topic-level configuration

2016-06-09 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323557#comment-15323557
 ] 

Ismael Juma commented on KAFKA-3809:


I think there is benefit in doing it in two parts as the auto-generation would 
probably require less discussion than the deprecation of lots of configs.

> Auto-generate documentation for topic-level configuration
> -
>
> Key: KAFKA-3809
> URL: https://issues.apache.org/jira/browse/KAFKA-3809
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>Assignee: James Cheng
> Attachments: configuration.html, topic_config.html
>
>
> The documentation for topic-level configuration is not auto-generated from 
> the code. configuration.html still contains hand-maintained documentation.
> I noticed this because I wanted to set message.timestamp.type on a topic, and 
> didn't see that it was supported, but grepped through the code and it looked 
> like it was.
> The code to auto-generate the docs is quite close, but needs some additional 
> work. In particular, topic-level configuration is different from all the 
> other ConfigDefs in that topic-level configuration docs list the broker-level 
> config that they inherit from. We would need to have a way to show what 
> broker-level config applies to each topic-level config.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3817) KTableRepartitionMap should handle null inputs

2016-06-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323535#comment-15323535
 ] 

ASF GitHub Bot commented on KAFKA-3817:
---

GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/1488

KAFKA-3817: handle null keys in KTableRepartitionMap



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka 
K3817-handle-null-groupedkey

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1488


commit 7512dd0a03db2a62be469a3d2de0bf8f445ce8e6
Author: Guozhang Wang 
Date:   2016-06-09T22:41:01Z

handle null keys in KTable repartition map




> KTableRepartitionMap should handle null inputs
> --
>
> Key: KAFKA-3817
> URL: https://issues.apache.org/jira/browse/KAFKA-3817
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
> Fix For: 0.10.0.1
>
>
> When calling {{KTable.groupBy}} on the result of a KTable-KTable join, NPEs 
> are raised:
> {{org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$
> > KTableMapProcessor.process(KTableRepartitionMap.java:88)}}
> The root cause is that the join is expected to emit null values when no match 
> is found, but KTableRepartitionMap is not set up to handle this case.
> On the users email list, [~guozhang] described a plan of action:
> I think this is actually a bug in KTableRepartitionMap
> that it actually should expect null grouped keys; this would be a
> straight-forward fix for this operator, but I can make a pass over all the
> repartition operators just to make sure they are all gracefully handling
> null keys.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1488: KAFKA-3817: handle null keys in KTableRepartitionM...

2016-06-09 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/1488

KAFKA-3817: handle null keys in KTableRepartitionMap



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka 
K3817-handle-null-groupedkey

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1488


commit 7512dd0a03db2a62be469a3d2de0bf8f445ce8e6
Author: Guozhang Wang 
Date:   2016-06-09T22:41:01Z

handle null keys in KTable repartition map




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes

2016-06-09 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323527#comment-15323527
 ] 

James Cheng commented on KAFKA-3806:


One impact, which I don't know if it's a large issue, is that the 
most-recent-offset for a given consumer group is stored in memory in the 
broker. There is a process in the broker that continually reads the 
__consumer_offsets topic, and stores them into an in-memory structure, so that 
when a consumer group starts up, the broker can efficiently tell the consumer 
what its most recent offset was.

If you set the offsets.retention.minutes to a very large value, you will end up 
with fewer garbage collections, which means the in-memory aggregate size will 
be larger.


> Adjust default values of log.retention.hours and offsets.retention.minutes
> --
>
> Key: KAFKA-3806
> URL: https://issues.apache.org/jira/browse/KAFKA-3806
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Michal Turek
>Priority: Minor
>
> Combination of default values of log.retention.hours (168 hours = 7 days) and 
> offsets.retention.minutes (1440 minutes = 1 day) may be dangerous in special 
> cases. Offset retention should be always greater than log retention.
> We have observed the following scenario and issue:
> - Producing of data to a topic was disabled two days ago by producer update, 
> topic wasn't deleted.
> - Consumer consumed all data and properly committed offsets to Kafka.
> - Consumer made no more offset commits for that topic because there was no 
> more incoming data and there was nothing to confirm. (We have auto-commit 
> disabled, I'm not sure how behaves enabled auto-commit.)
> - After one day: Kafka cleared too old offsets according to 
> offsets.retention.minutes.
> - After two days: Long-term running consumer was restarted after update, it 
> didn't find any committed offsets for that topic since they were deleted by 
> offsets.retention.minutes so it started consuming from the beginning.
> - The messages were still in Kafka due to larger log.retention.hours, about 5 
> days of messages were read again.
> Known workaround to solve this issue:
> - Explicitly configure log.retention.hours and offsets.retention.minutes, 
> don't use defaults.
> Proposals:
> - Prolong default value of offsets.retention.minutes to be at least twice 
> larger than log.retention.hours.
> - Check these values during Kafka startup and log a warning if 
> offsets.retention.minutes is smaller than log.retention.hours.
> - Add a note to migration guide about differences between storing of offsets 
> in ZooKeeper and Kafka (http://kafka.apache.org/documentation.html#upgrade).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes

2016-06-09 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323521#comment-15323521
 ] 

Jun Rao commented on KAFKA-3806:


[~dwatzke], I think the original intention is that consumers in Kafka are 
typically real time. If a consumer is down for more than a day, chances are 
that it will never come back again and therefore it's offsets won't be needed.

[~tu...@avast.com], it's true that for a given consumer-group,topic,partition, 
after compaction, there will be only one message stored in the offset topic 
eventually. What we wanted to protect against is lots of short lived consumer 
groups. For example, currently, each time when one runs tools like 
console-consumer, a new consumer group is created. In an environment where many 
people are running those tools for a short period of time, it would be useful 
to garbage collect the unused offsets. If you know this is not the case, you 
can increase offsets.retention.minutes.

Yes, ideally, perhaps we should only start the expiration clock once the 
consumer becomes inactive, instead when an offset is last committed. For 
example, one simple improvement is if there is no new update to the offset but 
the consumer is still alive, we can automatically write the last offset with a 
new expiration time. This will address [~wushujames]'s issue better since the 
consumer no longer has to recommit the same offset.

> Adjust default values of log.retention.hours and offsets.retention.minutes
> --
>
> Key: KAFKA-3806
> URL: https://issues.apache.org/jira/browse/KAFKA-3806
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Michal Turek
>Priority: Minor
>
> Combination of default values of log.retention.hours (168 hours = 7 days) and 
> offsets.retention.minutes (1440 minutes = 1 day) may be dangerous in special 
> cases. Offset retention should be always greater than log retention.
> We have observed the following scenario and issue:
> - Producing of data to a topic was disabled two days ago by producer update, 
> topic wasn't deleted.
> - Consumer consumed all data and properly committed offsets to Kafka.
> - Consumer made no more offset commits for that topic because there was no 
> more incoming data and there was nothing to confirm. (We have auto-commit 
> disabled, I'm not sure how behaves enabled auto-commit.)
> - After one day: Kafka cleared too old offsets according to 
> offsets.retention.minutes.
> - After two days: Long-term running consumer was restarted after update, it 
> didn't find any committed offsets for that topic since they were deleted by 
> offsets.retention.minutes so it started consuming from the beginning.
> - The messages were still in Kafka due to larger log.retention.hours, about 5 
> days of messages were read again.
> Known workaround to solve this issue:
> - Explicitly configure log.retention.hours and offsets.retention.minutes, 
> don't use defaults.
> Proposals:
> - Prolong default value of offsets.retention.minutes to be at least twice 
> larger than log.retention.hours.
> - Check these values during Kafka startup and log a warning if 
> offsets.retention.minutes is smaller than log.retention.hours.
> - Add a note to migration guide about differences between storing of offsets 
> in ZooKeeper and Kafka (http://kafka.apache.org/documentation.html#upgrade).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3818) Change Mirror Maker default assignment strategy to round robin

2016-06-09 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-3818:
--

Assignee: Vahid Hashemian

> Change Mirror Maker default assignment strategy to round robin
> --
>
> Key: KAFKA-3818
> URL: https://issues.apache.org/jira/browse/KAFKA-3818
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> It might make more sense to use round robin assignment by default for MM 
> since it gives a better balance between the instances, in particular when the 
> number of MM instances exceeds the typical number of partitions per topic. 
> There doesn't seem to be any need to keep range assignment since 
> copartitioning is not an issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Question on Kafka Consumer

2016-06-09 Thread Govindarajan Srinivasaraghavan
Hello All,

I am working on building an application on cloud and below are the high
level requirements.

- Ingest data into Kafka with say 100 partitions (Incoming rate - 100,000
msgs/sec)
- Read data from Kafka and process each data (Do some computation, compare
with old data, find location etc) real time

I need some suggestions on how to distribute and manage kafka consumers for
my scenario.

- If I deploy my application with kafka consumer threads then how can I
distribute load and rebalance it if I add more partitions or during failure.
- Should I be using some kind of data streaming platform like storm, flink,
samza to get the data from kafka and stream it realtime?

Thanks a bunch and appreciate all your help.

Regards,
Govind


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2016-06-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323429#comment-15323429
 ] 

Randall Hauch commented on KAFKA-3821:
--

Source tasks produce records by returning them from their {{poll()}} method, 
and the records in the resulting list are ordered, and the lists of records are 
ordered based upon the sequence of calls to {{poll()}}. Therefore, a 
straightforward way to update the offset explicitly while maintaining this 
order is to request it via inclusion of a special {{SourceRecord}} within the 
list returned by {{poll()}}. Some ideas include:

* a {{SourceRecord}} object with a null (or special) topic, or a null key and 
value; or
* a {{SourceRecord}} object with a null key and value; or
* a subclass of {{SourceRecord}} that denotes this particular kind of request.

Neither is clean or elegant. The first two are very implicit, while the second 
requires more processing overhead and logic for every {{SourceRecord}} (though 
there are ways around this that may be acceptable).

Any other thoughts?

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Ewen Cheslack-Postava
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2016-06-09 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-3821:


 Summary: Allow Kafka Connect source tasks to produce offset 
without writing to topics
 Key: KAFKA-3821
 URL: https://issues.apache.org/jira/browse/KAFKA-3821
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 0.9.0.1
Reporter: Randall Hauch
Assignee: Ewen Cheslack-Postava


Provide a way for a {{SourceTask}} implementation to record a new offset for a 
given partition without necessarily writing a source record to a topic.

Consider a connector task that uses the same offset when producing an unknown 
number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
database). Once the task completes those records, the connector wants to update 
the offsets (e.g., the snapshot is complete) but has no more records to be 
written to a topic. With this change, the task could simply supply an updated 
offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3814) Connectors should be able to know their name

2016-06-09 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-3814.
--
Resolution: Invalid

Apologies. There is a {{name}} property. Not sure what I was thinking.

> Connectors should be able to know their name
> 
>
> Key: KAFKA-3814
> URL: https://issues.apache.org/jira/browse/KAFKA-3814
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
>
> Currently a connector implementation does not have access to the unique name 
> that Kafka Connect framework uses for each connector. For example, the 
> [http://docs.confluent.io/3.0.0/connect/userguide.html#rest-interface|REST 
> API] of the workers make use of this name.
> The connectors could use this name in a number of ways, not the least of 
> which is using the name in log messages to help the administrator correlate 
> messages with configured connectors.
> It is possible to work around this by using an additional configuration 
> property, although it is not ideal since the property value and connector 
> name might diverge, making it harder to correlate the two.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3814) Connectors should be able to know their name

2016-06-09 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323385#comment-15323385
 ] 

Ewen Cheslack-Postava commented on KAFKA-3814:
--

[~rhauch] Is this true? I would think you could get it out of the configuration 
you're passed in start()? It might still make sense to include this 
automatically (and tasks might want to know their task ID too), but the info 
you want might already be available.

> Connectors should be able to know their name
> 
>
> Key: KAFKA-3814
> URL: https://issues.apache.org/jira/browse/KAFKA-3814
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
>
> Currently a connector implementation does not have access to the unique name 
> that Kafka Connect framework uses for each connector. For example, the 
> [http://docs.confluent.io/3.0.0/connect/userguide.html#rest-interface|REST 
> API] of the workers make use of this name.
> The connectors could use this name in a number of ways, not the least of 
> which is using the name in log messages to help the administrator correlate 
> messages with configured connectors.
> It is possible to work around this by using an additional configuration 
> property, although it is not ideal since the property value and connector 
> name might diverge, making it harder to correlate the two.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-09 Thread aarti gupta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323347#comment-15323347
 ] 

aarti gupta commented on KAFKA-3811:


Wondering how this relates to https://issues.apache.org/jira/browse/KAFKA-3715, 
https://github.com/apache/kafka/pull/1446

Do user defined metrics have a level?

[~guozhang] would you like me to take this up? (if you think it overlaps with 
https://issues.apache.org/jira/browse/KAFKA-3715)

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3820) Provide utilities for tracking source offsets

2016-06-09 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-3820:


 Summary: Provide utilities for tracking source offsets
 Key: KAFKA-3820
 URL: https://issues.apache.org/jira/browse/KAFKA-3820
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava


OffsetStorageReader does not (and is not expected to) be immediately updated 
when a SourceRecord is returned from poll(). However, this can be a bit 
confusing to connector developers as they may return that data, then expect a 
subsequent read from OffsetStorageReader should match that. In other words, 
rather than tracking which offset they are at themselves in variables 
maintained by the task implementation, the connector developer expected 
OffsetStorageReader to do this for them.

Part of the confusion comes from the fact that data is sent asynchronously 
after returned from poll(), which explains the semantics we have. However, it 
does also mean many connectors have similarly structured code where they keep 
track of the current offset themselves. It might be nice to provide some 
utilities, probably through the Context object, to get the last returned offset 
for each source partition being processed by a task.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3753) Add size() to the StateStore interface for metrics reporting

2016-06-09 Thread Jeff Klukas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323342#comment-15323342
 ] 

Jeff Klukas commented on KAFKA-3753:


I agree that {{size}} is a bit ambiguous. I'd expect many folks would recognize 
the analogy to Java's {{Map.size}}, but there's no reason we need to stick with 
that.

RocksDB has nice consistency in their naming of properties, where {{size}} 
refers to bytes and {{num-entries}} refers to a count.

The PR discussion also brings up the point that since we can only get an 
estimated count from RocksDB, the name of this method should indicate that the 
result is not necessarily exact.

I'd be happy to see this method called {{estimatedCount}}, 
{{estimatedNumEntries}}, {{approximateCount}}, or some other variant.

> Add size() to the StateStore interface for metrics reporting
> 
>
> Key: KAFKA-3753
> URL: https://issues.apache.org/jira/browse/KAFKA-3753
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: api
> Fix For: 0.10.1.0
>
>
> As a developer building a Kafka Streams application, I'd like to have 
> visibility into what's happening with my state stores. How can I know if a 
> particular store is growing large? How can I know if a particular store is 
> frequently needing to hit disk?
> I'm interested to know if there are existing mechanisms for extracting this 
> information or if other people have thoughts on how we might approach this.
> I can't think of a way to provide metrics generically, so each state store 
> implementation would likely need to handle this separately. Given that the 
> default RocksDBStore will likely be the most-used, it would be a first target 
> for adding metrics.
> I'd be interested in knowing the total number of entries in the store, the 
> total size on disk and in memory, rates of gets and puts, and hit/miss ratio 
> for the MemoryLRUCache. Some of these numbers are likely calculable through 
> the RocksDB API, others may simply not be accessible.
> Would there be value to the wider community in having state stores register 
> metrics?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3819) Provide utilities for polling source connectors

2016-06-09 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-3819:


 Summary: Provide utilities for polling source connectors
 Key: KAFKA-3819
 URL: https://issues.apache.org/jira/browse/KAFKA-3819
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava


Source connectors that need to poll for data are currently responsible for 
managing their own sleeping/backoff if they don't have any new data available. 
This is becoming a very common pattern. It's also easy to implement it 
incorrectly, e.g. by using Thread.sleep and not properly interrupting on stop().

We should probably provide some utilities, maybe just exposed via the Context 
object to implement this for connector developers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3819) Provide utilities for polling source connectors

2016-06-09 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-3819:
-
Issue Type: New Feature  (was: Bug)

> Provide utilities for polling source connectors
> ---
>
> Key: KAFKA-3819
> URL: https://issues.apache.org/jira/browse/KAFKA-3819
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>
> Source connectors that need to poll for data are currently responsible for 
> managing their own sleeping/backoff if they don't have any new data 
> available. This is becoming a very common pattern. It's also easy to 
> implement it incorrectly, e.g. by using Thread.sleep and not properly 
> interrupting on stop().
> We should probably provide some utilities, maybe just exposed via the Context 
> object to implement this for connector developers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3818) Change Mirror Maker default assignment strategy to round robin

2016-06-09 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-3818:
--

 Summary: Change Mirror Maker default assignment strategy to round 
robin
 Key: KAFKA-3818
 URL: https://issues.apache.org/jira/browse/KAFKA-3818
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


It might make more sense to use round robin assignment by default for MM since 
it gives a better balance between the instances, in particular when the number 
of MM instances exceeds the typical number of partitions per topic. There 
doesn't seem to be any need to keep range assignment since copartitioning is 
not an issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3753) Add size() to the StateStore interface for metrics reporting

2016-06-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323324#comment-15323324
 ] 

Jay Kreps commented on KAFKA-3753:
--

I think size is a bit ambiguous, it could mean the number of bytes and it could 
mean the number of rows. Will that confuse people? (Not sure, actually asking). 
An alternative would be something like sizeOnDisk().

> Add size() to the StateStore interface for metrics reporting
> 
>
> Key: KAFKA-3753
> URL: https://issues.apache.org/jira/browse/KAFKA-3753
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: api
> Fix For: 0.10.1.0
>
>
> As a developer building a Kafka Streams application, I'd like to have 
> visibility into what's happening with my state stores. How can I know if a 
> particular store is growing large? How can I know if a particular store is 
> frequently needing to hit disk?
> I'm interested to know if there are existing mechanisms for extracting this 
> information or if other people have thoughts on how we might approach this.
> I can't think of a way to provide metrics generically, so each state store 
> implementation would likely need to handle this separately. Given that the 
> default RocksDBStore will likely be the most-used, it would be a first target 
> for adding metrics.
> I'd be interested in knowing the total number of entries in the store, the 
> total size on disk and in memory, rates of gets and puts, and hit/miss ratio 
> for the MemoryLRUCache. Some of these numbers are likely calculable through 
> the RocksDB API, others may simply not be accessible.
> Would there be value to the wider community in having state stores register 
> metrics?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3817) KTableRepartitionMap should handle null inputs

2016-06-09 Thread Jeff Klukas (JIRA)
Jeff Klukas created KAFKA-3817:
--

 Summary: KTableRepartitionMap should handle null inputs
 Key: KAFKA-3817
 URL: https://issues.apache.org/jira/browse/KAFKA-3817
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.0
Reporter: Jeff Klukas
Assignee: Guozhang Wang
 Fix For: 0.10.0.1


When calling {{KTable.groupBy}} on the result of a KTable-KTable join, NPEs are 
raised:

{{org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$
> KTableMapProcessor.process(KTableRepartitionMap.java:88)}}

The root cause is that the join is expected to emit null values when no match 
is found, but KTableRepartitionMap is not set up to handle this case.

On the users email list, [~guozhang] described a plan of action:

I think this is actually a bug in KTableRepartitionMap
that it actually should expect null grouped keys; this would be a
straight-forward fix for this operator, but I can make a pass over all the
repartition operators just to make sure they are all gracefully handling
null keys.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3802) log mtimes reset on broker restart

2016-06-09 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3802:
---
Fix Version/s: 0.10.0.1

> log mtimes reset on broker restart
> --
>
> Key: KAFKA-3802
> URL: https://issues.apache.org/jira/browse/KAFKA-3802
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Andrew Otto
> Fix For: 0.10.0.1
>
>
> Folks over in 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201605.mbox/%3CCAO8=cz0ragjad1acx4geqcwj+rkd1gmdavkjwytwthkszfg...@mail.gmail.com%3E
>  are commenting about this issue.
> In 0.9, any data log file that was on
> disk before the broker has it's mtime modified to the time of the broker
> restart.
> This causes problems with log retention, as all the files then look like
> they contain recent data to kafka.  We use the default log retention of 7
> days, but if all the files are touched at the same time, this can cause us
> to retain up to 2 weeks of log data, which can fill up our disks.
> This happens *most* of the time, but seemingly not all.  We have seen broker 
> restarts where mtimes were not changed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2016-06-09 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323299#comment-15323299
 ] 

Evan Huus commented on KAFKA-2082:
--

This issue has been "In Progress" since April 2015, any actual progress in that 
time?

> Kafka Replication ends up in a bad state
> 
>
> Key: KAFKA-2082
> URL: https://issues.apache.org/jira/browse/KAFKA-2082
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8.2.1
>Reporter: Evan Huus
>Assignee: Sriharsha Chintalapani
>Priority: Critical
>  Labels: zkclient-problems
> Attachments: KAFKA-2082.patch
>
>
> While running integration tests for Sarama (the go client) we came across a 
> pattern of connection losses that reliably puts kafka into a bad state: 
> several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
> with hundreds of thousands of lines like:
> {noformat}
> [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
> request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
> partition [many_partition,1] failed due to Leader not local for partition 
> [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
> request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
> partition [many_partition,6] failed due to Leader not local for partition 
> [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
> request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
> partition [many_partition,21] failed due to Leader not local for partition 
> [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
> request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
> partition [many_partition,26] failed due to Leader not local for partition 
> [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
> request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
> partition [many_partition,1] failed due to Leader not local for partition 
> [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
> request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
> partition [many_partition,6] failed due to Leader not local for partition 
> [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
> request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
> partition [many_partition,21] failed due to Leader not local for partition 
> [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
> [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
> request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
> partition [many_partition,26] failed due to Leader not local for partition 
> [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
> {noformat}
> This can be easily and reliably reproduced using the {{toxiproxy-final}} 
> branch of https://github.com/Shopify/sarama which includes a vagrant script 
> for provisioning the appropriate cluster: 
> - {{git clone https://github.com/Shopify/sarama.git}}
> - {{git checkout test-jira-kafka-2082}}
> - {{vagrant up}}
> - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
> After the test finishes (it fails because the cluster ends up in a bad 
> state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
> the bad nodes. The vagrant script provisions five zookeepers and five brokers 
> in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
> Additional context: the test produces continually to the cluster while 
> randomly cutting and restoring zookeeper connections (all connections to 
> zookeeper are run through a simple proxy on the same vm to make this easy). 
> The majority of the time this works very well and does a good job exercising 
> our producer's retry and failover code. However, under certain patterns of 
> connection loss (the {{TEST_SEED}} in the instructions is important), kafka 
> gets confused. The test never cuts more than two connections at a time, so 
> zookeeper should always have quorum, and the topic (with three replicas) 
> should always be writable.
> Completely restarting the cluster via {{vagrant reload}} seems to put it back 
> into a sane state.



--
This message was sent by Atlassian JIRA

Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-09 Thread Jay Kreps
Super sorry to come in late on this one. Rajini, I had two quick questions
I think we should be able to answer:

   1. Do client ids make sense in a world which has users? If not should we
   unify them the way Hadoop did (without auth the user is a kind of best
   effort honor system identity). This came up in the discussion thread but I
   didn't really see a crisp answer. Basically, what is the definition of
   "client id" and what is the definition of "user" and how do the concepts
   relate?
   2. If both client ids and users are sensible distinct notions and we
   want to maintain both, why don't we just support quotas on both? If they
   both make sense then you would have a reason to set quotas at both levels.
   Why have this "mode" that you set that swaps between only being able to use
   one and the other? I should be able to set quotas at both levels. Going
   forward the model we had discussed with quotas was potentially being able
   to set quotas for many things independently (say at the topic level), and I
   don't think it would make sense to extend this mode approach to those.

-Jay

On Wed, Jun 8, 2016 at 12:56 PM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> I would like to initiate the vote for KIP-55.
>
> The KIP details are here: KIP-55: Secure quotas for authenticated users
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-55%3A+Secure+Quotas+for+Authenticated+Users
> >
> .
>
> The JIRA  KAFKA-3492   >has
> a draft PR here: https://github.com/apache/kafka/pull/1256.
>
> Thank you...
>
> Regards,
>
> Rajini
>


[jira] [Commented] (KAFKA-3816) Provide more context in Kafka Connect log messages

2016-06-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323179#comment-15323179
 ] 

Randall Hauch commented on KAFKA-3816:
--

One example of a set of 3 parameters would work for connectors and (hopefully) 
framework components:

* {{scope}} - A short alias for the type of connector or framework component. 
For example, {{jdbc}}, {{file}}, {{mysql}}, {{mongo}}, {{postgres}}, etc. for 
connector types, or {{config}}, {{monitor}}, {{worker}}, etc. for framework 
components.
* {{component}} - The name of the component, such as the connector or component 
name. Ideally all threads for a single connector would use the same value.
* {{componentContext}} - A short name for a separate thread running within the 
connector's task. For example, {{main}} for the SourceTask implementation 
class, or the thread name for threads that each handle multiple source 
resources (e.g., tables, collections, etc.). In some cases when a thread is 
allocated for a specific resource (e.g., table or collection), the name of that 
resource could be used instead. Each thread associated with a connector would 
use a distinct value. 

Thus, a log message for a MySQL connector named "product-connector" might have 
log messages that look like:

{code}
2016-06-09 16:56:52,191 INFO   MySQL|product-connector|snapshot  Started 
snapshot
2016-06-09 16:56:52,192 INFO   MySQL|product-connector|snapshot  Completed 
snapshot
2016-06-09 16:56:52,193 INFO   MySQL|product-connector|binlog  Started reading 
binlog
{code}

I'm not sure if this is sufficient to handle all framework contexts, though.

> Provide more context in Kafka Connect log messages
> --
>
> Key: KAFKA-3816
> URL: https://issues.apache.org/jira/browse/KAFKA-3816
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Ewen Cheslack-Postava
>
> Currently it is relatively difficult to correlate individual log messages 
> with the various threads and activities that are going on within a Kafka 
> Connect worker, let along a cluster of workers. Log messages should provide 
> more context to make it easier and to allow log scraping tools to coalesce 
> related log messages.
> One simple way to do this is by using _mapped diagnostic contexts_, or MDC. 
> This is supported by the SLF4J API, and by the Logback and Log4J logging 
> frameworks.
> Basically, the framework would be changed so that each thread is configured 
> with one or more MDC parameters using the 
> {{org.slf4j.MDC.put(String,String)}} method in SLF4J. Once that thread is 
> configured, all log messages made using that thread have that context. The 
> logs can then be configured to use those parameters.
> It would be ideal to define a convention for connectors and the Kafka Connect 
> framework. A single set of MDC parameters means that the logging framework 
> can use the specific parameters on its message formats.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3815) Support command line arguments in Kafka Connect distribute worker

2016-06-09 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-3815:


 Summary: Support command line arguments in Kafka Connect 
distribute worker
 Key: KAFKA-3815
 URL: https://issues.apache.org/jira/browse/KAFKA-3815
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 0.9.0.1
Reporter: Randall Hauch
Assignee: Ewen Cheslack-Postava


Change the Kafka Connect distribute worker so that one connector could be 
configured via the command line. This would make it much easier to define 
immutable containers (e.g., Docker, Kubernetes), where each container runs a 
single distributed worker with a single configured connector. A "force" flag 
might specify whether any existing configuration could be overwritten by the 
configuration passed via the command line.

In fact, distributed environments that run immutable containers, especially 
Kubernetes and OpenShift, would benefit greatly from being able to run each 
Kafka Connect connector in one or more containers that are configured exactly 
the same way and running as a single Kafka Connect group. Because the Kafka 
Connect group has only a single configured connector, the group experiences no 
unnecessary rebalances that would normally occur in other topologies with 
multiple connectors deployed to one Kafka Connect group.

Ideally, the distributed worker could also be run in read-only mode so that the 
connector configuration cannot be changed via the REST API. This would only 
help to reinforce the connector as being immutable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3753) Add size() to the StateStore interface for metrics reporting

2016-06-09 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3753:
-
Summary: Add size() to the StateStore interface for metrics reporting  
(was: Metrics for StateStores)

> Add size() to the StateStore interface for metrics reporting
> 
>
> Key: KAFKA-3753
> URL: https://issues.apache.org/jira/browse/KAFKA-3753
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: api
> Fix For: 0.10.1.0
>
>
> As a developer building a Kafka Streams application, I'd like to have 
> visibility into what's happening with my state stores. How can I know if a 
> particular store is growing large? How can I know if a particular store is 
> frequently needing to hit disk?
> I'm interested to know if there are existing mechanisms for extracting this 
> information or if other people have thoughts on how we might approach this.
> I can't think of a way to provide metrics generically, so each state store 
> implementation would likely need to handle this separately. Given that the 
> default RocksDBStore will likely be the most-used, it would be a first target 
> for adding metrics.
> I'd be interested in knowing the total number of entries in the store, the 
> total size on disk and in memory, rates of gets and puts, and hit/miss ratio 
> for the MemoryLRUCache. Some of these numbers are likely calculable through 
> the RocksDB API, others may simply not be accessible.
> Would there be value to the wider community in having state stores register 
> metrics?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3814) Connectors should be able to know their name

2016-06-09 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-3814:


 Summary: Connectors should be able to know their name
 Key: KAFKA-3814
 URL: https://issues.apache.org/jira/browse/KAFKA-3814
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 0.9.0.1
Reporter: Randall Hauch
Assignee: Ewen Cheslack-Postava
Priority: Minor


Currently a connector implementation does not have access to the unique name 
that Kafka Connect framework uses for each connector. For example, the 
[http://docs.confluent.io/3.0.0/connect/userguide.html#rest-interface|REST API] 
of the workers make use of this name.

The connectors could use this name in a number of ways, not the least of which 
is using the name in log messages to help the administrator correlate messages 
with configured connectors.

It is possible to work around this by using an additional configuration 
property, although it is not ideal since the property value and connector name 
might diverge, making it harder to correlate the two.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3813) Let SourceConnector implementations access the offset reader

2016-06-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323112#comment-15323112
 ] 

Randall Hauch commented on KAFKA-3813:
--

The current API defines {{ConnectorContext}} interface and uses it when 
initializing {{SourceConnector}} and {{SinkConnector}} instances, and there is 
no specialization of {{ConnectorContext}}.

There seem to be (at least) two approaches to implementing this improvement:

# Add {{SourceConnectorContext}} and {{SinkConnectorContext}} specializations 
of the {{ConnectorContext}}, add a {{offsetStorageReader()}} method to 
{{SourceConnectorContext}}, and change the rest of the Kafka Connect framework 
to use these. This would likely be a breaking change, but it would distinguish 
between the contexts for source and sink connectors, allowing the contexts to 
evolve differently for different kinds of connectors.
# Add {{offsetStorageReader()}} to {{ConnectorContext}}. This does add a method 
and would be byte-code compatible, and thus would be far less invasive and more 
straightforward.

Thoughts?

> Let SourceConnector implementations access the offset reader
> 
>
> Key: KAFKA-3813
> URL: https://issues.apache.org/jira/browse/KAFKA-3813
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Assignee: Ewen Cheslack-Postava
>
> When a source connector is started, having access to the 
> {{OffsetStorageReader}} would allow it to more intelligently configure its 
> tasks based upon the stored offsets. (Currently only the {{SourceTask}} 
> implementations can access the offset reader (via the {{SourceTaskContext}}), 
> but the {{SourceConnector}} does not have access to the offset reader.)
> Of course, accessing the stored offsets is not likely to be useful for sink 
> connectors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3813) Let SourceConnector implementations access the offset reader

2016-06-09 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-3813:


 Summary: Let SourceConnector implementations access the offset 
reader
 Key: KAFKA-3813
 URL: https://issues.apache.org/jira/browse/KAFKA-3813
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 0.10.0.0
Reporter: Randall Hauch
Assignee: Ewen Cheslack-Postava


When a source connector is started, having access to the 
{{OffsetStorageReader}} would allow it to more intelligently configure its 
tasks based upon the stored offsets. (Currently only the {{SourceTask}} 
implementations can access the offset reader (via the {{SourceTaskContext}}), 
but the {{SourceConnector}} does not have access to the offset reader.)

Of course, accessing the stored offsets is not likely to be useful for sink 
connectors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3805) Running multiple instances of a Streams app on the same machine results in Java_org_rocksdb_RocksDB_write error

2016-06-09 Thread Andres Gomez Ferrer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323070#comment-15323070
 ] 

Andres Gomez Ferrer commented on KAFKA-3805:


ok, thanks!!

> Running multiple instances of a Streams app on the same machine results in 
> Java_org_rocksdb_RocksDB_write error
> ---
>
> Key: KAFKA-3805
> URL: https://issues.apache.org/jira/browse/KAFKA-3805
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: * Ubuntu Server 16.04 LTS
> - Java: OpenJDK Runtime Environment (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
> * OSX 10.11.5
>- Java: Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
>Reporter: Andres Gomez Ferrer
>Assignee: Eno Thereska
>Priority: Critical
> Attachments: hs_err_pid23047.log, kafka-streams-testing.zip
>
>
> I reproduce the error working with simple two instances of  my kafka streams 
> application reading from one topic writing on other and using a RocksDB store.
> I reproduce it starting one instance, sending some messages on the input 
> topic and later starting another Kafka streams instance. Then, the first 
> instance died and drop a core dump.
> --
>  A fatal error has been detected by the Java Runtime Environment:
>   SIGSEGV (0xb) at pc=0x7f9f488fd8e8, pid=23047, tid=140322171361024
>  JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
>  Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
>  Problematic frame:
>  C  [librocksdbjni712288404493406713..so+0x15b8e8]  
> Java_org_rocksdb_RocksDB_write0+0x48
>  Core dump written. Default location: /root/core or core.23047
>  An error report file with more information is saved as:
>  /root/hs_err_pid23047.log
>  If you would like to submit a bug report, please visit:
>http://bugreport.java.com/bugreport/crash.jsp
>  The crash happened outside the Java Virtual Machine in native code.
>  See problematic frame for where to report the bug.
> --
> The core dump is there: 
> https://s3-eu-west-1.amazonaws.com/arodriguezg-test/core
> Note: The core dump is collected at Ubuntu Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3805) Running multiple instances of a Streams app on the same machine results in Java_org_rocksdb_RocksDB_write error

2016-06-09 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15323059#comment-15323059
 ] 

Eno Thereska commented on KAFKA-3805:
-

[~agomez] I created another JIRA for the case when one uses the same state dir 
for multiple processes (https://issues.apache.org/jira/browse/KAFKA-3812). The 
fix for that requires more thought and I don't want to delay the use of 
different state dirs meanwhile.

> Running multiple instances of a Streams app on the same machine results in 
> Java_org_rocksdb_RocksDB_write error
> ---
>
> Key: KAFKA-3805
> URL: https://issues.apache.org/jira/browse/KAFKA-3805
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: * Ubuntu Server 16.04 LTS
> - Java: OpenJDK Runtime Environment (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
> * OSX 10.11.5
>- Java: Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
>Reporter: Andres Gomez Ferrer
>Assignee: Eno Thereska
>Priority: Critical
> Attachments: hs_err_pid23047.log, kafka-streams-testing.zip
>
>
> I reproduce the error working with simple two instances of  my kafka streams 
> application reading from one topic writing on other and using a RocksDB store.
> I reproduce it starting one instance, sending some messages on the input 
> topic and later starting another Kafka streams instance. Then, the first 
> instance died and drop a core dump.
> --
>  A fatal error has been detected by the Java Runtime Environment:
>   SIGSEGV (0xb) at pc=0x7f9f488fd8e8, pid=23047, tid=140322171361024
>  JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
>  Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
>  Problematic frame:
>  C  [librocksdbjni712288404493406713..so+0x15b8e8]  
> Java_org_rocksdb_RocksDB_write0+0x48
>  Core dump written. Default location: /root/core or core.23047
>  An error report file with more information is saved as:
>  /root/hs_err_pid23047.log
>  If you would like to submit a bug report, please visit:
>http://bugreport.java.com/bugreport/crash.jsp
>  The crash happened outside the Java Virtual Machine in native code.
>  See problematic frame for where to report the bug.
> --
> The core dump is there: 
> https://s3-eu-west-1.amazonaws.com/arodriguezg-test/core
> Note: The core dump is collected at Ubuntu Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3812) State store locking is incorrect

2016-06-09 Thread Eno Thereska (JIRA)
Eno Thereska created KAFKA-3812:
---

 Summary: State store locking is incorrect
 Key: KAFKA-3812
 URL: https://issues.apache.org/jira/browse/KAFKA-3812
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Eno Thereska
Assignee: Eno Thereska
 Fix For: 0.10.1.0


In {{ProcessorStateManager}} in the {{lockStateDirectory}} method, we close a 
channel if the lock is null. However, as described in 
https://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileLock.html, 
"...closing a channel releases all locks held by the Java virtual machine on 
the underlying file regardless of whether the locks were acquired via that 
channel or via another channel open on the same file. It is strongly 
recommended that, within a program, a unique channel be used to acquire all 
locks on any given file.". Thus closing this channel sometimes leads to the 
other lock in {{ProcessorStateManager}}, called {{directoryLock}} to be 
released. 

Problem is reproduced as part of the steps in KAFKA-3805, when two processes 
are using the same state directory.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-09 Thread Tom Crayford
+1 (non-binding)

On Wed, Jun 8, 2016 at 8:56 PM, Rajini Sivaram  wrote:

> I would like to initiate the vote for KIP-55.
>
> The KIP details are here: KIP-55: Secure quotas for authenticated users
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-55%3A+Secure+Quotas+for+Authenticated+Users
> >
> .
>
> The JIRA  KAFKA-3492   >has
> a draft PR here: https://github.com/apache/kafka/pull/1256.
>
> Thank you...
>
> Regards,
>
> Rajini
>


[jira] [Work started] (KAFKA-3805) Running multiple instances of a Streams app on the same machine results in Java_org_rocksdb_RocksDB_write error

2016-06-09 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3805 started by Eno Thereska.
---
> Running multiple instances of a Streams app on the same machine results in 
> Java_org_rocksdb_RocksDB_write error
> ---
>
> Key: KAFKA-3805
> URL: https://issues.apache.org/jira/browse/KAFKA-3805
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: * Ubuntu Server 16.04 LTS
> - Java: OpenJDK Runtime Environment (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
> * OSX 10.11.5
>- Java: Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
>Reporter: Andres Gomez Ferrer
>Assignee: Eno Thereska
>Priority: Critical
> Attachments: hs_err_pid23047.log, kafka-streams-testing.zip
>
>
> I reproduce the error working with simple two instances of  my kafka streams 
> application reading from one topic writing on other and using a RocksDB store.
> I reproduce it starting one instance, sending some messages on the input 
> topic and later starting another Kafka streams instance. Then, the first 
> instance died and drop a core dump.
> --
>  A fatal error has been detected by the Java Runtime Environment:
>   SIGSEGV (0xb) at pc=0x7f9f488fd8e8, pid=23047, tid=140322171361024
>  JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
>  Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
>  Problematic frame:
>  C  [librocksdbjni712288404493406713..so+0x15b8e8]  
> Java_org_rocksdb_RocksDB_write0+0x48
>  Core dump written. Default location: /root/core or core.23047
>  An error report file with more information is saved as:
>  /root/hs_err_pid23047.log
>  If you would like to submit a bug report, please visit:
>http://bugreport.java.com/bugreport/crash.jsp
>  The crash happened outside the Java Virtual Machine in native code.
>  See problematic frame for where to report the bug.
> --
> The core dump is there: 
> https://s3-eu-west-1.amazonaws.com/arodriguezg-test/core
> Note: The core dump is collected at Ubuntu Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] scalability limits in the coordinator

2016-06-09 Thread Onur Karaman
I think the value of adding a "offsets.replica.fetch.max.bytes" config is
that we don't break/change the meaning of "replica.fetch.max.bytes".

We can also set "offsets.replica.fetch.max.bytes" to be a value safely
larger than what we expect to ever allow the __consumer_offsets topic max
message size to be without doing the larger change of bumping up the global
"replica.fetch.max.bytes".

On Thu, Jun 9, 2016 at 10:40 AM, Becket Qin  wrote:

> I think taking bigger one of the fetch size and message size limit is
> probably good enough. If we have a separate
> "offset.replica.fetch.max.bytes", I guess the value will always be set to
> max message size of the __consumer_offsets topic, which does not seem to
> have much value.
>
> On Thu, Jun 9, 2016 at 3:15 AM, Onur Karaman  >
> wrote:
>
> > Maybe another approach can be to add a new
> > "offsets.replica.fetch.max.bytes" config on the brokers.
> >
> > On Thu, Jun 9, 2016 at 3:03 AM, Onur Karaman 
> > wrote:
> >
> > > I made a PR with a tweak to Jun's/Becket's proposal:
> > > https://github.com/apache/kafka/pull/1484
> > >
> > > It just tweaks the fetch behavior specifically for replicas fetching
> from
> > > the __consumer_offsets topic when the fetcher's
> "replica.fetch.max.bytes"
> > > is less than the __consumer_offset leader's "message.max.bytes" to take
> > the
> > > max of the two.
> > >
> > > I'm honestly not that happy with this solution, as I'd rather not
> change
> > > the "replica.fetch.max.bytes" config from being a limit to a
> > > recommendation. I'd definitely be happy to hear other alternatives!
> > >
> > > On Sun, May 29, 2016 at 1:57 PM, Onur Karaman <
> > > onurkaraman.apa...@gmail.com> wrote:
> > >
> > >> Sorry I know next to nothing about Kafka Connect. I didn't understand
> > the
> > >> Kafka Connect / MM idea you brought up. Can you go into more detail?
> > >>
> > >> Otherwise I think our remaining options are:
> > >> - Jun's suggestion to bump up the KafkaConfig.messageMaxBytes for
> > >> __consumer_offsets topic and change the fetch behavior when message
> size
> > >> is
> > >> larger than fetch size
> > >> - option 6: support sending the regex over the wire instead of the
> fully
> > >> expanded topic subscriptions. This should cut down the message size
> from
> > >> the subscription side. Again this only helps when pattern-based
> > >> subscriptions are done.
> > >>
> > >> minor correction to an earlier comment I made regarding the message
> > size:
> > >> message size ~ sum(s_i + a_i for i in range [1, |C|])
> > >>
> > >> On Thu, May 26, 2016 at 3:35 PM, Jason Gustafson 
> > >> wrote:
> > >>
> > >> > Hey Onur,
> > >> >
> > >> > Thanks for the investigation. It seems the conclusion is that the
> > >> compact
> > >> > format helps, but perhaps not enough to justify adding a new
> > assignment
> > >> > schema? I'm not sure there's much more room for savings unless we
> > change
> > >> > something more fundamental in the assignment approach. We spent some
> > >> time
> > >> > thinking before about whether we could let the consumers compute
> their
> > >> > assignment locally from a smaller set of information, but the
> > difficulty
> > >> > (I'm sure you remember) is reaching consensus on topic metadata.
> Kafka
> > >> > Connect has a similar problem where all the workers need to agree on
> > >> > connector configurations. Since all configs are stored in a single
> > topic
> > >> > partition, the approach we take there is to propagate the offset in
> > the
> > >> > assignment protocol. Not sure if we could do something similar for
> > MM...
> > >> > Anyway, it seems like the best workaround at the moment is Jun's
> > initial
> > >> > suggestion. What do you think?
> > >> >
> > >> > -Jason
> > >> >
> > >> > On Wed, May 25, 2016 at 10:47 PM, Onur Karaman <
> > >> > onurkaraman.apa...@gmail.com
> > >> > > wrote:
> > >> >
> > >> > > I gave the topic index assignment trick a try against the same
> > >> > environment.
> > >> > > The implementation just changed the assignment serialization and
> > >> > > deserialization logic. It didn't change SyncGroupResponse, meaning
> > it
> > >> > > continues to exclude the subscription from the SyncGroupResponse
> and
> > >> > > assumes the member has kept track of its last subscription.
> > >> > >
> > >> > > Assignment topic indexing with compression:
> > >> > > 1 consumer 34346 bytes
> > >> > > 5 consumers 177687 bytes
> > >> > > 10 consumers 331897 bytes
> > >> > > 20 consumers 572467 bytes
> > >> > > 30 consumers 811269 bytes
> > >> > > 40 consumers 1047188 bytes * the tipping point
> > >> > > 50 consumers 1290092 bytes
> > >> > > 60 consumers 1527806 bytes
> > >> > > 70 consumers 1769259 bytes
> > >> > > 80 consumers 2000118 bytes
> > >> > > 90 consumers 2244392 bytes
> > >> > > 100 consumers 2482415 bytes
> > >> > >
> > >> > > Assignment topic indexing without compression:
> > >> > > 1 consumer 

[jira] [Created] (KAFKA-3811) Introduce KStream metrics recording levels

2016-06-09 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3811:
-

 Summary: Introduce KStream metrics recording levels
 Key: KAFKA-3811
 URL: https://issues.apache.org/jira/browse/KAFKA-3811
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Greg Fodor
Assignee: Guozhang Wang


Follow-up from the discussions here:

https://github.com/apache/kafka/pull/1447
https://issues.apache.org/jira/browse/KAFKA-3769

The proposal is to introduce configuration to control the granularity/volumes 
of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
introduce non-trivial overhead and are possibly less useful once a job has been 
optimized. 

Proposal from guozhangwang:

level0 (stream thread global): per-record process / punctuate latency, commit 
latency, poll latency, etc

level1 (per processor node, and per state store): IO latency, per-record .. 
latency, forward throughput, etc.

And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-06-09 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3811:
--
Summary: Introduce Kafka Streams metrics recording levels  (was: Introduce 
KStream metrics recording levels)

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] scalability limits in the coordinator

2016-06-09 Thread Becket Qin
I think taking bigger one of the fetch size and message size limit is
probably good enough. If we have a separate
"offset.replica.fetch.max.bytes", I guess the value will always be set to
max message size of the __consumer_offsets topic, which does not seem to
have much value.

On Thu, Jun 9, 2016 at 3:15 AM, Onur Karaman 
wrote:

> Maybe another approach can be to add a new
> "offsets.replica.fetch.max.bytes" config on the brokers.
>
> On Thu, Jun 9, 2016 at 3:03 AM, Onur Karaman 
> wrote:
>
> > I made a PR with a tweak to Jun's/Becket's proposal:
> > https://github.com/apache/kafka/pull/1484
> >
> > It just tweaks the fetch behavior specifically for replicas fetching from
> > the __consumer_offsets topic when the fetcher's "replica.fetch.max.bytes"
> > is less than the __consumer_offset leader's "message.max.bytes" to take
> the
> > max of the two.
> >
> > I'm honestly not that happy with this solution, as I'd rather not change
> > the "replica.fetch.max.bytes" config from being a limit to a
> > recommendation. I'd definitely be happy to hear other alternatives!
> >
> > On Sun, May 29, 2016 at 1:57 PM, Onur Karaman <
> > onurkaraman.apa...@gmail.com> wrote:
> >
> >> Sorry I know next to nothing about Kafka Connect. I didn't understand
> the
> >> Kafka Connect / MM idea you brought up. Can you go into more detail?
> >>
> >> Otherwise I think our remaining options are:
> >> - Jun's suggestion to bump up the KafkaConfig.messageMaxBytes for
> >> __consumer_offsets topic and change the fetch behavior when message size
> >> is
> >> larger than fetch size
> >> - option 6: support sending the regex over the wire instead of the fully
> >> expanded topic subscriptions. This should cut down the message size from
> >> the subscription side. Again this only helps when pattern-based
> >> subscriptions are done.
> >>
> >> minor correction to an earlier comment I made regarding the message
> size:
> >> message size ~ sum(s_i + a_i for i in range [1, |C|])
> >>
> >> On Thu, May 26, 2016 at 3:35 PM, Jason Gustafson 
> >> wrote:
> >>
> >> > Hey Onur,
> >> >
> >> > Thanks for the investigation. It seems the conclusion is that the
> >> compact
> >> > format helps, but perhaps not enough to justify adding a new
> assignment
> >> > schema? I'm not sure there's much more room for savings unless we
> change
> >> > something more fundamental in the assignment approach. We spent some
> >> time
> >> > thinking before about whether we could let the consumers compute their
> >> > assignment locally from a smaller set of information, but the
> difficulty
> >> > (I'm sure you remember) is reaching consensus on topic metadata. Kafka
> >> > Connect has a similar problem where all the workers need to agree on
> >> > connector configurations. Since all configs are stored in a single
> topic
> >> > partition, the approach we take there is to propagate the offset in
> the
> >> > assignment protocol. Not sure if we could do something similar for
> MM...
> >> > Anyway, it seems like the best workaround at the moment is Jun's
> initial
> >> > suggestion. What do you think?
> >> >
> >> > -Jason
> >> >
> >> > On Wed, May 25, 2016 at 10:47 PM, Onur Karaman <
> >> > onurkaraman.apa...@gmail.com
> >> > > wrote:
> >> >
> >> > > I gave the topic index assignment trick a try against the same
> >> > environment.
> >> > > The implementation just changed the assignment serialization and
> >> > > deserialization logic. It didn't change SyncGroupResponse, meaning
> it
> >> > > continues to exclude the subscription from the SyncGroupResponse and
> >> > > assumes the member has kept track of its last subscription.
> >> > >
> >> > > Assignment topic indexing with compression:
> >> > > 1 consumer 34346 bytes
> >> > > 5 consumers 177687 bytes
> >> > > 10 consumers 331897 bytes
> >> > > 20 consumers 572467 bytes
> >> > > 30 consumers 811269 bytes
> >> > > 40 consumers 1047188 bytes * the tipping point
> >> > > 50 consumers 1290092 bytes
> >> > > 60 consumers 1527806 bytes
> >> > > 70 consumers 1769259 bytes
> >> > > 80 consumers 2000118 bytes
> >> > > 90 consumers 2244392 bytes
> >> > > 100 consumers 2482415 bytes
> >> > >
> >> > > Assignment topic indexing without compression:
> >> > > 1 consumer 211904 bytes
> >> > > 5 consumers 677184 bytes
> >> > > 10 consumers 1211154 bytes * the tipping point
> >> > > 20 consumers 2136196 bytes
> >> > > 30 consumers 3061238 bytes
> >> > > 40 consumers 3986280 bytes
> >> > > 50 consumers 4911322 bytes
> >> > > 60 consumers 5836284 bytes
> >> > > 70 consumers 6761246 bytes
> >> > > 80 consumers 7686208 bytes
> >> > > 90 consumers 8611170 bytes
> >> > > 100 consumers 9536132 bytes
> >> > >
> >> > > Assignment topic indexing seems to reduce the size by 500KB without
> >> > > compression and 80KB with compression. So assignment topic indexing
> >> makes
> >> > > some difference in both with and without compression but in our case
> >> was
> >> > > not 

Re: [DISCUSS] KIP-48 Support for delegation tokens as an authentication mechanism

2016-06-09 Thread Harsha
Jun & Ismael,
 Unfortunately I couldn't attend the KIP meeting
 when delegation tokens discussed. Appreciate if
 you can update the thread if you have any
 further questions.
Thanks,
Harsha

On Tue, May 24, 2016, at 11:32 AM, Liquan Pei wrote:
> It seems that the links to images in the KIP are broken.
> 
> Liquan
> 
> On Tue, May 24, 2016 at 9:33 AM, parth brahmbhatt <
> brahmbhatt.pa...@gmail.com> wrote:
> 
> > 110. What does getDelegationTokenAs mean?
> > In the current proposal we only allow a user to get delegation token for
> > the identity that it authenticated as using another mechanism, i.e. A user
> > that authenticate using a keytab for principal us...@example.com will get
> > delegation tokens for that user only. In future I think we will have to
> > extend support such that we allow some set of users (
> > kafka-rest-u...@example.com, storm-nim...@example.com) to acquire
> > delegation tokens on behalf of other users whose identity they have
> > verified independently.  Kafka brokers will have ACLs to control which
> > users are allowed to impersonate other users and get tokens on behalf of
> > them. Overall Impersonation is a whole different problem in my opinion and
> > I think we can tackle it in separate KIP.
> >
> > 111. What's the typical rate of getting and renewing delegation tokens?
> > Typically this should be very very low, 1 request per minute is a
> > relatively high estimate. However it depends on the token expiration. I am
> > less worried about the extra load it puts on controller vs the added
> > complexity and the value it offers.
> >
> > Thanks
> > Parth
> >
> >
> >
> > On Tue, May 24, 2016 at 7:30 AM, Ismael Juma  wrote:
> >
> > > Thanks Rajini. It would probably require a separate KIP as it will
> > > introduce user visible changes. We could also update KIP-48 to have this
> > > information, but it seems cleaner to do it separately. We can discuss
> > that
> > > in the KIP call today.
> > >
> > > Ismael
> > >
> > > On Tue, May 24, 2016 at 3:19 PM, Rajini Sivaram <
> > > rajinisiva...@googlemail.com> wrote:
> > >
> > > > Ismael,
> > > >
> > > > I have created a JIRA (
> > https://issues.apache.org/jira/browse/KAFKA-3751)
> > > > for adding SCRAM as a SASL mechanism. Would that need another KIP? If
> > > > KIP-48 will use this mechanism, can this just be a JIRA that gets
> > > reviewed
> > > > when the PR is ready?
> > > >
> > > > Thank you,
> > > >
> > > > Rajini
> > > >
> > > > On Tue, May 24, 2016 at 2:46 PM, Ismael Juma 
> > wrote:
> > > >
> > > > > Thanks Rajini, SCRAM seems like a good candidate.
> > > > >
> > > > > Gwen had independently mentioned this as a SASL mechanism that might
> > be
> > > > > useful for Kafka and I have been meaning to check it in more detail.
> > > Good
> > > > > to know that you are willing to contribute an implementation. Maybe
> > we
> > > > > should file a separate JIRA for this?
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Tue, May 24, 2016 at 2:12 PM, Rajini Sivaram <
> > > > > rajinisiva...@googlemail.com> wrote:
> > > > >
> > > > > > SCRAM (Salted Challenge Response Authentication Mechanism) is a
> > > better
> > > > > > mechanism than Digest-MD5. Java doesn't come with a built-in SCRAM
> > > > > > SaslServer or SaslClient, but I will be happy to add support in
> > Kafka
> > > > > since
> > > > > > it would be a useful mechanism to support anyway.
> > > > > > https://tools.ietf.org/html/rfc7677 describes the protocol for
> > > > > > SCRAM-SHA-256.
> > > > > >
> > > > > > On Tue, May 24, 2016 at 2:37 AM, Jun Rao  wrote:
> > > > > >
> > > > > > > Parth,
> > > > > > >
> > > > > > > Thanks for the explanation. A couple of more questions.
> > > > > > >
> > > > > > > 110. What does getDelegationTokenAs mean?
> > > > > > >
> > > > > > > 111. What's the typical rate of getting and renewing delegation
> > > > tokens?
> > > > > > > That may have an impact on whether they should be directed to the
> > > > > > > controller.
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Mon, May 23, 2016 at 1:19 PM, parth brahmbhatt <
> > > > > > > brahmbhatt.pa...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > Thanks for reviewing.
> > > > > > > >
> > > > > > > > * We could add a Cluster action to add acls on who can request
> > > > > > delegation
> > > > > > > > tokens. I don't see the use case for that yet but down the line
> > > > when
> > > > > we
> > > > > > > > start supporting getDelegationTokenAs it will be necessary.
> > > > > > > > * Yes we recommend tokens to be only used/distributed over
> > secure
> > > > > > > channels.
> > > > > > > > * Depending on what design we end up choosing Invalidation will
> > > be
> > > > > > > > responsibility of every broker or controller.
> > > > > > > > * I am not sure if I documented somewhere that 

[jira] [Commented] (KAFKA-3805) Running multiple instances of a Streams app on the same machine results in Java_org_rocksdb_RocksDB_write error

2016-06-09 Thread Andres Gomez Ferrer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322738#comment-15322738
 ] 

Andres Gomez Ferrer commented on KAFKA-3805:


Hey [~enothereska] the PR works fine! Thanks :) 

In addition: The patch doesn't apply directly on v0.10.0.0, you need to use the 
patch on trunk branch if you want that its works! 

> Running multiple instances of a Streams app on the same machine results in 
> Java_org_rocksdb_RocksDB_write error
> ---
>
> Key: KAFKA-3805
> URL: https://issues.apache.org/jira/browse/KAFKA-3805
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: * Ubuntu Server 16.04 LTS
> - Java: OpenJDK Runtime Environment (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
> * OSX 10.11.5
>- Java: Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
>Reporter: Andres Gomez Ferrer
>Assignee: Eno Thereska
>Priority: Critical
> Attachments: hs_err_pid23047.log, kafka-streams-testing.zip
>
>
> I reproduce the error working with simple two instances of  my kafka streams 
> application reading from one topic writing on other and using a RocksDB store.
> I reproduce it starting one instance, sending some messages on the input 
> topic and later starting another Kafka streams instance. Then, the first 
> instance died and drop a core dump.
> --
>  A fatal error has been detected by the Java Runtime Environment:
>   SIGSEGV (0xb) at pc=0x7f9f488fd8e8, pid=23047, tid=140322171361024
>  JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
>  Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
>  Problematic frame:
>  C  [librocksdbjni712288404493406713..so+0x15b8e8]  
> Java_org_rocksdb_RocksDB_write0+0x48
>  Core dump written. Default location: /root/core or core.23047
>  An error report file with more information is saved as:
>  /root/hs_err_pid23047.log
>  If you would like to submit a bug report, please visit:
>http://bugreport.java.com/bugreport/crash.jsp
>  The crash happened outside the Java Virtual Machine in native code.
>  See problematic frame for where to report the bug.
> --
> The core dump is there: 
> https://s3-eu-west-1.amazonaws.com/arodriguezg-test/core
> Note: The core dump is collected at Ubuntu Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3753) Metrics for StateStores

2016-06-09 Thread Jeff Klukas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322683#comment-15322683
 ] 

Jeff Klukas commented on KAFKA-3753:


Since there's refactoring going on right now with the metrics interface for 
streams (https://issues.apache.org/jira/browse/KAFKA-3715), I think we should 
delay actually adding size metrics to a different issue.

The PR I attached here adds the size() method so that it can be used for a 
metric in the future.

> Metrics for StateStores
> ---
>
> Key: KAFKA-3753
> URL: https://issues.apache.org/jira/browse/KAFKA-3753
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: api
> Fix For: 0.10.1.0
>
>
> As a developer building a Kafka Streams application, I'd like to have 
> visibility into what's happening with my state stores. How can I know if a 
> particular store is growing large? How can I know if a particular store is 
> frequently needing to hit disk?
> I'm interested to know if there are existing mechanisms for extracting this 
> information or if other people have thoughts on how we might approach this.
> I can't think of a way to provide metrics generically, so each state store 
> implementation would likely need to handle this separately. Given that the 
> default RocksDBStore will likely be the most-used, it would be a first target 
> for adding metrics.
> I'd be interested in knowing the total number of entries in the store, the 
> total size on disk and in memory, rates of gets and puts, and hit/miss ratio 
> for the MemoryLRUCache. Some of these numbers are likely calculable through 
> the RocksDB API, others may simply not be accessible.
> Would there be value to the wider community in having state stores register 
> metrics?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1487: kafka-run-class.sh now runs under Cygwin.

2016-06-09 Thread deragon
GitHub user deragon opened a pull request:

https://github.com/apache/kafka/pull/1487

kafka-run-class.sh now runs under Cygwin.

kafka-run-class.sh now runs under Cygwin; paths and classpath are set up 
properly.

**WARNING:**  The script was not tested on a Linux machine, only under 
Cygwin.  Prior to merge it into trunk, if accepted, please run a quick test to 
ensure nothing broke.  From my own code review, there should not be any 
problem, but we can never be too sure.

I do not have the environment to test it under Linux at this moment.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/deragon/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1487


commit 7d1ad2f57a855126e7ce0e520a8bb327f030abe6
Author: deragon <32nx9812masakjds>
Date:   2016-06-09T15:04:55Z

kafka-run-class.sh now runs under Cygwin; paths and classpath are setup 
properly.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3753) Metrics for StateStores

2016-06-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322678#comment-15322678
 ] 

ASF GitHub Bot commented on KAFKA-3753:
---

GitHub user jklukas opened a pull request:

https://github.com/apache/kafka/pull/1486

KAFKA-3753: Add size() method to KeyValueStore interface

See https://issues.apache.org/jira/browse/KAFKA-3753

This contribution is my original work and I license the work to the project 
under the project's open source license.

cc @guozhangwang @kichristensen @ijuma 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jklukas/kafka kvstore-size

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1486.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1486


commit c4ea7cd0ed4d822e65cee3fd283f81c29ce11d57
Author: Jeff Klukas 
Date:   2016-06-09T15:08:33Z

Add size() method to KeyValueStore interface




> Metrics for StateStores
> ---
>
> Key: KAFKA-3753
> URL: https://issues.apache.org/jira/browse/KAFKA-3753
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: api
> Fix For: 0.10.1.0
>
>
> As a developer building a Kafka Streams application, I'd like to have 
> visibility into what's happening with my state stores. How can I know if a 
> particular store is growing large? How can I know if a particular store is 
> frequently needing to hit disk?
> I'm interested to know if there are existing mechanisms for extracting this 
> information or if other people have thoughts on how we might approach this.
> I can't think of a way to provide metrics generically, so each state store 
> implementation would likely need to handle this separately. Given that the 
> default RocksDBStore will likely be the most-used, it would be a first target 
> for adding metrics.
> I'd be interested in knowing the total number of entries in the store, the 
> total size on disk and in memory, rates of gets and puts, and hit/miss ratio 
> for the MemoryLRUCache. Some of these numbers are likely calculable through 
> the RocksDB API, others may simply not be accessible.
> Would there be value to the wider community in having state stores register 
> metrics?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1486: KAFKA-3753: Add size() method to KeyValueStore int...

2016-06-09 Thread jklukas
GitHub user jklukas opened a pull request:

https://github.com/apache/kafka/pull/1486

KAFKA-3753: Add size() method to KeyValueStore interface

See https://issues.apache.org/jira/browse/KAFKA-3753

This contribution is my original work and I license the work to the project 
under the project's open source license.

cc @guozhangwang @kichristensen @ijuma 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jklukas/kafka kvstore-size

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1486.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1486


commit c4ea7cd0ed4d822e65cee3fd283f81c29ce11d57
Author: Jeff Klukas 
Date:   2016-06-09T15:08:33Z

Add size() method to KeyValueStore interface




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3805) Running multiple instances of a Streams app on the same machine results in Java_org_rocksdb_RocksDB_write error

2016-06-09 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322641#comment-15322641
 ] 

Eno Thereska commented on KAFKA-3805:
-

Thanks [~agomez]. If you get a chance, could you please try the PR? It should 
fix both cases when using the same state dir and different state dirs. 

> Running multiple instances of a Streams app on the same machine results in 
> Java_org_rocksdb_RocksDB_write error
> ---
>
> Key: KAFKA-3805
> URL: https://issues.apache.org/jira/browse/KAFKA-3805
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: * Ubuntu Server 16.04 LTS
> - Java: OpenJDK Runtime Environment (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
> * OSX 10.11.5
>- Java: Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
>Reporter: Andres Gomez Ferrer
>Assignee: Eno Thereska
>Priority: Critical
> Attachments: hs_err_pid23047.log, kafka-streams-testing.zip
>
>
> I reproduce the error working with simple two instances of  my kafka streams 
> application reading from one topic writing on other and using a RocksDB store.
> I reproduce it starting one instance, sending some messages on the input 
> topic and later starting another Kafka streams instance. Then, the first 
> instance died and drop a core dump.
> --
>  A fatal error has been detected by the Java Runtime Environment:
>   SIGSEGV (0xb) at pc=0x7f9f488fd8e8, pid=23047, tid=140322171361024
>  JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
>  Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
>  Problematic frame:
>  C  [librocksdbjni712288404493406713..so+0x15b8e8]  
> Java_org_rocksdb_RocksDB_write0+0x48
>  Core dump written. Default location: /root/core or core.23047
>  An error report file with more information is saved as:
>  /root/hs_err_pid23047.log
>  If you would like to submit a bug report, please visit:
>http://bugreport.java.com/bugreport/crash.jsp
>  The crash happened outside the Java Virtual Machine in native code.
>  See problematic frame for where to report the bug.
> --
> The core dump is there: 
> https://s3-eu-west-1.amazonaws.com/arodriguezg-test/core
> Note: The core dump is collected at Ubuntu Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3805) Running multiple instances of a Streams app on the same machine results in Java_org_rocksdb_RocksDB_write error

2016-06-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322637#comment-15322637
 ] 

ASF GitHub Bot commented on KAFKA-3805:
---

GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/1485

KAFKA-3805: Check if DB is null. Do not release locked resources



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka KAFKA-3805-locks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1485.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1485


commit bfcbcfcee954b5581e240cd89f48b4bc1c82daeb
Author: Eno Thereska 
Date:   2016-06-09T14:46:23Z

Check if DB is null. Do not release locked resources




> Running multiple instances of a Streams app on the same machine results in 
> Java_org_rocksdb_RocksDB_write error
> ---
>
> Key: KAFKA-3805
> URL: https://issues.apache.org/jira/browse/KAFKA-3805
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: * Ubuntu Server 16.04 LTS
> - Java: OpenJDK Runtime Environment (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
> * OSX 10.11.5
>- Java: Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
>Reporter: Andres Gomez Ferrer
>Assignee: Eno Thereska
>Priority: Critical
> Attachments: hs_err_pid23047.log, kafka-streams-testing.zip
>
>
> I reproduce the error working with simple two instances of  my kafka streams 
> application reading from one topic writing on other and using a RocksDB store.
> I reproduce it starting one instance, sending some messages on the input 
> topic and later starting another Kafka streams instance. Then, the first 
> instance died and drop a core dump.
> --
>  A fatal error has been detected by the Java Runtime Environment:
>   SIGSEGV (0xb) at pc=0x7f9f488fd8e8, pid=23047, tid=140322171361024
>  JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
>  Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
>  Problematic frame:
>  C  [librocksdbjni712288404493406713..so+0x15b8e8]  
> Java_org_rocksdb_RocksDB_write0+0x48
>  Core dump written. Default location: /root/core or core.23047
>  An error report file with more information is saved as:
>  /root/hs_err_pid23047.log
>  If you would like to submit a bug report, please visit:
>http://bugreport.java.com/bugreport/crash.jsp
>  The crash happened outside the Java Virtual Machine in native code.
>  See problematic frame for where to report the bug.
> --
> The core dump is there: 
> https://s3-eu-west-1.amazonaws.com/arodriguezg-test/core
> Note: The core dump is collected at Ubuntu Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1485: KAFKA-3805: Check if DB is null. Do not release lo...

2016-06-09 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/1485

KAFKA-3805: Check if DB is null. Do not release locked resources



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka KAFKA-3805-locks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1485.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1485


commit bfcbcfcee954b5581e240cd89f48b4bc1c82daeb
Author: Eno Thereska 
Date:   2016-06-09T14:46:23Z

Check if DB is null. Do not release locked resources




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (KAFKA-3805) Running multiple instances of a Streams app on the same machine results in Java_org_rocksdb_RocksDB_write error

2016-06-09 Thread Andres Gomez Ferrer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322302#comment-15322302
 ] 

Andres Gomez Ferrer edited comment on KAFKA-3805 at 6/9/16 11:48 AM:
-

[~enothereska] Yes sure, I attached the Kafka streaming example application 
that I'm using. It's so simple

App1: java -jar kafka-streams-testing-assembly-1.0.jar dir1

App2: java -jar kafka-streams-testing-assembly-1.0.jar dir2

I tried it again and the issue persists.


was (Author: agomez):
[~enothereska] Yes sure, I attached the Kafka streaming example application 
that I'm using. It's so simple

App1: java -jar kafka-streams-testing-assembly-1.0.jar dir1

App2: java -jar kafka-streams-testing-assembly-1.0.jar dir2

I try it again and the issue persists.

> Running multiple instances of a Streams app on the same machine results in 
> Java_org_rocksdb_RocksDB_write error
> ---
>
> Key: KAFKA-3805
> URL: https://issues.apache.org/jira/browse/KAFKA-3805
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: * Ubuntu Server 16.04 LTS
> - Java: OpenJDK Runtime Environment (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
> * OSX 10.11.5
>- Java: Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
>Reporter: Andres Gomez Ferrer
>Assignee: Eno Thereska
>Priority: Critical
> Attachments: hs_err_pid23047.log, kafka-streams-testing.zip
>
>
> I reproduce the error working with simple two instances of  my kafka streams 
> application reading from one topic writing on other and using a RocksDB store.
> I reproduce it starting one instance, sending some messages on the input 
> topic and later starting another Kafka streams instance. Then, the first 
> instance died and drop a core dump.
> --
>  A fatal error has been detected by the Java Runtime Environment:
>   SIGSEGV (0xb) at pc=0x7f9f488fd8e8, pid=23047, tid=140322171361024
>  JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
>  Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
>  Problematic frame:
>  C  [librocksdbjni712288404493406713..so+0x15b8e8]  
> Java_org_rocksdb_RocksDB_write0+0x48
>  Core dump written. Default location: /root/core or core.23047
>  An error report file with more information is saved as:
>  /root/hs_err_pid23047.log
>  If you would like to submit a bug report, please visit:
>http://bugreport.java.com/bugreport/crash.jsp
>  The crash happened outside the Java Virtual Machine in native code.
>  See problematic frame for where to report the bug.
> --
> The core dump is there: 
> https://s3-eu-west-1.amazonaws.com/arodriguezg-test/core
> Note: The core dump is collected at Ubuntu Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3805) Running multiple instances of a Streams app on the same machine results in Java_org_rocksdb_RocksDB_write error

2016-06-09 Thread Andres Gomez Ferrer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322302#comment-15322302
 ] 

Andres Gomez Ferrer edited comment on KAFKA-3805 at 6/9/16 10:18 AM:
-

[~enothereska] Yes sure, I attached the Kafka streaming example application 
that I'm using. It's so simple

App1: java -jar kafka-streams-testing-assembly-1.0.jar dir1

App2: java -jar kafka-streams-testing-assembly-1.0.jar dir2

I try it again and the issue persists.


was (Author: agomez):
[~enothereska] Yes sure, I attached the Kafka streaming example application 
that I'm using. It's so simpl

App1: java -jar kafka-streams-testing-assembly-1.0.jar dir1

App2: java -jar kafka-streams-testing-assembly-1.0.jar dir2

I try it again and the issue persists.

> Running multiple instances of a Streams app on the same machine results in 
> Java_org_rocksdb_RocksDB_write error
> ---
>
> Key: KAFKA-3805
> URL: https://issues.apache.org/jira/browse/KAFKA-3805
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: * Ubuntu Server 16.04 LTS
> - Java: OpenJDK Runtime Environment (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
> * OSX 10.11.5
>- Java: Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
>Reporter: Andres Gomez Ferrer
>Assignee: Eno Thereska
>Priority: Critical
> Attachments: hs_err_pid23047.log, kafka-streams-testing.zip
>
>
> I reproduce the error working with simple two instances of  my kafka streams 
> application reading from one topic writing on other and using a RocksDB store.
> I reproduce it starting one instance, sending some messages on the input 
> topic and later starting another Kafka streams instance. Then, the first 
> instance died and drop a core dump.
> --
>  A fatal error has been detected by the Java Runtime Environment:
>   SIGSEGV (0xb) at pc=0x7f9f488fd8e8, pid=23047, tid=140322171361024
>  JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
>  Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
>  Problematic frame:
>  C  [librocksdbjni712288404493406713..so+0x15b8e8]  
> Java_org_rocksdb_RocksDB_write0+0x48
>  Core dump written. Default location: /root/core or core.23047
>  An error report file with more information is saved as:
>  /root/hs_err_pid23047.log
>  If you would like to submit a bug report, please visit:
>http://bugreport.java.com/bugreport/crash.jsp
>  The crash happened outside the Java Virtual Machine in native code.
>  See problematic frame for where to report the bug.
> --
> The core dump is there: 
> https://s3-eu-west-1.amazonaws.com/arodriguezg-test/core
> Note: The core dump is collected at Ubuntu Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3805) Running multiple instances of a Streams app on the same machine results in Java_org_rocksdb_RocksDB_write error

2016-06-09 Thread Andres Gomez Ferrer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322302#comment-15322302
 ] 

Andres Gomez Ferrer commented on KAFKA-3805:


[~enothereska] Yes sure, I attached the Kafka streaming example application 
that I'm using. It's so simpl

App1: java -jar kafka-streams-testing-assembly-1.0.jar dir1

App2: java -jar kafka-streams-testing-assembly-1.0.jar dir2

I try it again and the issue persists.

> Running multiple instances of a Streams app on the same machine results in 
> Java_org_rocksdb_RocksDB_write error
> ---
>
> Key: KAFKA-3805
> URL: https://issues.apache.org/jira/browse/KAFKA-3805
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: * Ubuntu Server 16.04 LTS
> - Java: OpenJDK Runtime Environment (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
> * OSX 10.11.5
>- Java: Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
>Reporter: Andres Gomez Ferrer
>Assignee: Eno Thereska
>Priority: Critical
> Attachments: hs_err_pid23047.log, kafka-streams-testing.zip
>
>
> I reproduce the error working with simple two instances of  my kafka streams 
> application reading from one topic writing on other and using a RocksDB store.
> I reproduce it starting one instance, sending some messages on the input 
> topic and later starting another Kafka streams instance. Then, the first 
> instance died and drop a core dump.
> --
>  A fatal error has been detected by the Java Runtime Environment:
>   SIGSEGV (0xb) at pc=0x7f9f488fd8e8, pid=23047, tid=140322171361024
>  JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
>  Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
>  Problematic frame:
>  C  [librocksdbjni712288404493406713..so+0x15b8e8]  
> Java_org_rocksdb_RocksDB_write0+0x48
>  Core dump written. Default location: /root/core or core.23047
>  An error report file with more information is saved as:
>  /root/hs_err_pid23047.log
>  If you would like to submit a bug report, please visit:
>http://bugreport.java.com/bugreport/crash.jsp
>  The crash happened outside the Java Virtual Machine in native code.
>  See problematic frame for where to report the bug.
> --
> The core dump is there: 
> https://s3-eu-west-1.amazonaws.com/arodriguezg-test/core
> Note: The core dump is collected at Ubuntu Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] scalability limits in the coordinator

2016-06-09 Thread Onur Karaman
Maybe another approach can be to add a new
"offsets.replica.fetch.max.bytes" config on the brokers.

On Thu, Jun 9, 2016 at 3:03 AM, Onur Karaman  wrote:

> I made a PR with a tweak to Jun's/Becket's proposal:
> https://github.com/apache/kafka/pull/1484
>
> It just tweaks the fetch behavior specifically for replicas fetching from
> the __consumer_offsets topic when the fetcher's "replica.fetch.max.bytes"
> is less than the __consumer_offset leader's "message.max.bytes" to take the
> max of the two.
>
> I'm honestly not that happy with this solution, as I'd rather not change
> the "replica.fetch.max.bytes" config from being a limit to a
> recommendation. I'd definitely be happy to hear other alternatives!
>
> On Sun, May 29, 2016 at 1:57 PM, Onur Karaman <
> onurkaraman.apa...@gmail.com> wrote:
>
>> Sorry I know next to nothing about Kafka Connect. I didn't understand the
>> Kafka Connect / MM idea you brought up. Can you go into more detail?
>>
>> Otherwise I think our remaining options are:
>> - Jun's suggestion to bump up the KafkaConfig.messageMaxBytes for
>> __consumer_offsets topic and change the fetch behavior when message size
>> is
>> larger than fetch size
>> - option 6: support sending the regex over the wire instead of the fully
>> expanded topic subscriptions. This should cut down the message size from
>> the subscription side. Again this only helps when pattern-based
>> subscriptions are done.
>>
>> minor correction to an earlier comment I made regarding the message size:
>> message size ~ sum(s_i + a_i for i in range [1, |C|])
>>
>> On Thu, May 26, 2016 at 3:35 PM, Jason Gustafson 
>> wrote:
>>
>> > Hey Onur,
>> >
>> > Thanks for the investigation. It seems the conclusion is that the
>> compact
>> > format helps, but perhaps not enough to justify adding a new assignment
>> > schema? I'm not sure there's much more room for savings unless we change
>> > something more fundamental in the assignment approach. We spent some
>> time
>> > thinking before about whether we could let the consumers compute their
>> > assignment locally from a smaller set of information, but the difficulty
>> > (I'm sure you remember) is reaching consensus on topic metadata. Kafka
>> > Connect has a similar problem where all the workers need to agree on
>> > connector configurations. Since all configs are stored in a single topic
>> > partition, the approach we take there is to propagate the offset in the
>> > assignment protocol. Not sure if we could do something similar for MM...
>> > Anyway, it seems like the best workaround at the moment is Jun's initial
>> > suggestion. What do you think?
>> >
>> > -Jason
>> >
>> > On Wed, May 25, 2016 at 10:47 PM, Onur Karaman <
>> > onurkaraman.apa...@gmail.com
>> > > wrote:
>> >
>> > > I gave the topic index assignment trick a try against the same
>> > environment.
>> > > The implementation just changed the assignment serialization and
>> > > deserialization logic. It didn't change SyncGroupResponse, meaning it
>> > > continues to exclude the subscription from the SyncGroupResponse and
>> > > assumes the member has kept track of its last subscription.
>> > >
>> > > Assignment topic indexing with compression:
>> > > 1 consumer 34346 bytes
>> > > 5 consumers 177687 bytes
>> > > 10 consumers 331897 bytes
>> > > 20 consumers 572467 bytes
>> > > 30 consumers 811269 bytes
>> > > 40 consumers 1047188 bytes * the tipping point
>> > > 50 consumers 1290092 bytes
>> > > 60 consumers 1527806 bytes
>> > > 70 consumers 1769259 bytes
>> > > 80 consumers 2000118 bytes
>> > > 90 consumers 2244392 bytes
>> > > 100 consumers 2482415 bytes
>> > >
>> > > Assignment topic indexing without compression:
>> > > 1 consumer 211904 bytes
>> > > 5 consumers 677184 bytes
>> > > 10 consumers 1211154 bytes * the tipping point
>> > > 20 consumers 2136196 bytes
>> > > 30 consumers 3061238 bytes
>> > > 40 consumers 3986280 bytes
>> > > 50 consumers 4911322 bytes
>> > > 60 consumers 5836284 bytes
>> > > 70 consumers 6761246 bytes
>> > > 80 consumers 7686208 bytes
>> > > 90 consumers 8611170 bytes
>> > > 100 consumers 9536132 bytes
>> > >
>> > > Assignment topic indexing seems to reduce the size by 500KB without
>> > > compression and 80KB with compression. So assignment topic indexing
>> makes
>> > > some difference in both with and without compression but in our case
>> was
>> > > not nearly enough.
>> > >
>> > > This can be explained by the fact that we aren't actually hitting the
>> > worst
>> > > case scenario of each consumer being assigned a partition from every
>> > topic.
>> > > The reason is simple: a topic can only fully span all the consumers
>> if it
>> > > has at least as many partitions as there are consumers. Given that
>> there
>> > > are 8 partitions per topic and we have 100 consumers, it makes sense
>> that
>> > > we aren't close to this worse case scenario where topic indexing would
>> > make
>> > > a bigger difference.
>> > >
>> > > I tweaked the 

[jira] [Updated] (KAFKA-3805) Running multiple instances of a Streams app on the same machine results in Java_org_rocksdb_RocksDB_write error

2016-06-09 Thread Andres Gomez Ferrer (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andres Gomez Ferrer updated KAFKA-3805:
---
Attachment: kafka-streams-testing.zip

> Running multiple instances of a Streams app on the same machine results in 
> Java_org_rocksdb_RocksDB_write error
> ---
>
> Key: KAFKA-3805
> URL: https://issues.apache.org/jira/browse/KAFKA-3805
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: * Ubuntu Server 16.04 LTS
> - Java: OpenJDK Runtime Environment (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
> * OSX 10.11.5
>- Java: Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
>Reporter: Andres Gomez Ferrer
>Assignee: Eno Thereska
>Priority: Critical
> Attachments: hs_err_pid23047.log, kafka-streams-testing.zip
>
>
> I reproduce the error working with simple two instances of  my kafka streams 
> application reading from one topic writing on other and using a RocksDB store.
> I reproduce it starting one instance, sending some messages on the input 
> topic and later starting another Kafka streams instance. Then, the first 
> instance died and drop a core dump.
> --
>  A fatal error has been detected by the Java Runtime Environment:
>   SIGSEGV (0xb) at pc=0x7f9f488fd8e8, pid=23047, tid=140322171361024
>  JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
>  Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
>  Problematic frame:
>  C  [librocksdbjni712288404493406713..so+0x15b8e8]  
> Java_org_rocksdb_RocksDB_write0+0x48
>  Core dump written. Default location: /root/core or core.23047
>  An error report file with more information is saved as:
>  /root/hs_err_pid23047.log
>  If you would like to submit a bug report, please visit:
>http://bugreport.java.com/bugreport/crash.jsp
>  The crash happened outside the Java Virtual Machine in native code.
>  See problematic frame for where to report the bug.
> --
> The core dump is there: 
> https://s3-eu-west-1.amazonaws.com/arodriguezg-test/core
> Note: The core dump is collected at Ubuntu Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] scalability limits in the coordinator

2016-06-09 Thread Onur Karaman
I made a PR with a tweak to Jun's/Becket's proposal:
https://github.com/apache/kafka/pull/1484

It just tweaks the fetch behavior specifically for replicas fetching from
the __consumer_offsets topic when the fetcher's "replica.fetch.max.bytes"
is less than the __consumer_offset leader's "message.max.bytes" to take the
max of the two.

I'm honestly not that happy with this solution, as I'd rather not change
the "replica.fetch.max.bytes" config from being a limit to a
recommendation. I'd definitely be happy to hear other alternatives!

On Sun, May 29, 2016 at 1:57 PM, Onur Karaman 
wrote:

> Sorry I know next to nothing about Kafka Connect. I didn't understand the
> Kafka Connect / MM idea you brought up. Can you go into more detail?
>
> Otherwise I think our remaining options are:
> - Jun's suggestion to bump up the KafkaConfig.messageMaxBytes for
> __consumer_offsets topic and change the fetch behavior when message size is
> larger than fetch size
> - option 6: support sending the regex over the wire instead of the fully
> expanded topic subscriptions. This should cut down the message size from
> the subscription side. Again this only helps when pattern-based
> subscriptions are done.
>
> minor correction to an earlier comment I made regarding the message size:
> message size ~ sum(s_i + a_i for i in range [1, |C|])
>
> On Thu, May 26, 2016 at 3:35 PM, Jason Gustafson 
> wrote:
>
> > Hey Onur,
> >
> > Thanks for the investigation. It seems the conclusion is that the compact
> > format helps, but perhaps not enough to justify adding a new assignment
> > schema? I'm not sure there's much more room for savings unless we change
> > something more fundamental in the assignment approach. We spent some time
> > thinking before about whether we could let the consumers compute their
> > assignment locally from a smaller set of information, but the difficulty
> > (I'm sure you remember) is reaching consensus on topic metadata. Kafka
> > Connect has a similar problem where all the workers need to agree on
> > connector configurations. Since all configs are stored in a single topic
> > partition, the approach we take there is to propagate the offset in the
> > assignment protocol. Not sure if we could do something similar for MM...
> > Anyway, it seems like the best workaround at the moment is Jun's initial
> > suggestion. What do you think?
> >
> > -Jason
> >
> > On Wed, May 25, 2016 at 10:47 PM, Onur Karaman <
> > onurkaraman.apa...@gmail.com
> > > wrote:
> >
> > > I gave the topic index assignment trick a try against the same
> > environment.
> > > The implementation just changed the assignment serialization and
> > > deserialization logic. It didn't change SyncGroupResponse, meaning it
> > > continues to exclude the subscription from the SyncGroupResponse and
> > > assumes the member has kept track of its last subscription.
> > >
> > > Assignment topic indexing with compression:
> > > 1 consumer 34346 bytes
> > > 5 consumers 177687 bytes
> > > 10 consumers 331897 bytes
> > > 20 consumers 572467 bytes
> > > 30 consumers 811269 bytes
> > > 40 consumers 1047188 bytes * the tipping point
> > > 50 consumers 1290092 bytes
> > > 60 consumers 1527806 bytes
> > > 70 consumers 1769259 bytes
> > > 80 consumers 2000118 bytes
> > > 90 consumers 2244392 bytes
> > > 100 consumers 2482415 bytes
> > >
> > > Assignment topic indexing without compression:
> > > 1 consumer 211904 bytes
> > > 5 consumers 677184 bytes
> > > 10 consumers 1211154 bytes * the tipping point
> > > 20 consumers 2136196 bytes
> > > 30 consumers 3061238 bytes
> > > 40 consumers 3986280 bytes
> > > 50 consumers 4911322 bytes
> > > 60 consumers 5836284 bytes
> > > 70 consumers 6761246 bytes
> > > 80 consumers 7686208 bytes
> > > 90 consumers 8611170 bytes
> > > 100 consumers 9536132 bytes
> > >
> > > Assignment topic indexing seems to reduce the size by 500KB without
> > > compression and 80KB with compression. So assignment topic indexing
> makes
> > > some difference in both with and without compression but in our case
> was
> > > not nearly enough.
> > >
> > > This can be explained by the fact that we aren't actually hitting the
> > worst
> > > case scenario of each consumer being assigned a partition from every
> > topic.
> > > The reason is simple: a topic can only fully span all the consumers if
> it
> > > has at least as many partitions as there are consumers. Given that
> there
> > > are 8 partitions per topic and we have 100 consumers, it makes sense
> that
> > > we aren't close to this worse case scenario where topic indexing would
> > make
> > > a bigger difference.
> > >
> > > I tweaked the group leader's assignment code to print out the
> assignments
> > > and found that each consumer was getting either 238 or 239 partitions.
> > Each
> > > of these partitions were from unique topics. So the consumers were
> really
> > > getting partitions from 239 topics instead of the full worst case
> > scenario

[jira] [Commented] (KAFKA-3810) replication of internal topics should not be limited by replica.fetch.max.bytes

2016-06-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322267#comment-15322267
 ] 

ASF GitHub Bot commented on KAFKA-3810:
---

GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/1484

KAFKA-3810: replication of internal topics should not be limited by 
replica.fetch.max.bytes

From the kafka-dev mailing list discussion: [DISCUSS] scalability limits in 
the coordinator

There's a scalability limit on the new consumer / coordinator regarding the 
amount of group metadata we can fit into one message. This restricts a 
combination of consumer group size, topic subscription sizes, topic assignment 
sizes, and any remaining member metadata.

Under more strenuous use cases like mirroring clusters with thousands of 
topics, this limitation can be reached even after applying gzip to the 
__consumer_offsets topic.

Various options were proposed in the discussion:
1. Config change: reduce the number of consumers in the group. This isn't 
always a realistic answer in more strenuous use cases like MirrorMaker clusters 
or for auditing.
2. Config change: split the group into smaller groups which together will 
get full coverage of the topics. This gives each group member a smaller 
subscription.(ex: g1 has topics starting with a-m while g2 has topics starting 
with n-z). This would be operationally painful to manage.
3. Config change: split the topics among members of the group. Again this 
gives each group member a smaller subscription. This would also be 
operationally painful to manage.
4. Config change: bump up KafkaConfig.messageMaxBytes (a topic-level 
config) and KafkaConfig.replicaFetchMaxBytes (a broker-level config). Applying 
messageMaxBytes to just the __consumer_offsets topic seems relatively harmless, 
but bumping up the broker-level replicaFetchMaxBytes would probably need more 
attention.
5. Config change: try different compression codecs. Based on 2 minutes of 
googling, it seems like lz4 and snappy are faster than gzip but have worse 
compression, so this probably won't help.
6. Implementation change: support sending the regex over the wire instead 
of the fully expanded topic subscriptions. I think people said in the past that 
different languages have subtle differences in regex, so this doesn't play 
nicely with cross-language groups.
7. Implementation change: maybe we can reverse the mapping? Instead of 
mapping from member to subscriptions, we can map a subscription to a list of 
members.
8. Implementation change: maybe we can try to break apart the subscription 
and assignments from the same SyncGroupRequest into multiple records? They can 
still go to the same message set and get appended together. This way the limit 
become the segment size, which shouldn't be a problem. This can be tricky to 
get right because we're currently keying these messages on the group, so I 
think records from the same rebalance might accidentally compact one another, 
but my understanding of compaction isn't that great.
9. Implementation change: try to apply some tricks on the assignment 
serialization to make it smaller.
10. Config and Implementation change: bump up the __consumer_offsets topic 
messageMaxBytes and (from Jun Rao) fix how we deal with the case when a message 
is larger than the fetch size. Today, if the fetch size is smaller than the 
fetch size, the consumer will get stuck. Instead, we can simply return the full 
message if it's larger than the fetch size w/o requiring the consumer to 
manually adjust the fetch size.
11. Config and Implementation change: same as above but only apply the 
special fetch logic when fetching from internal topics

This PR provides an implementation of option 11.

That being said, I'm not very happy with this approach as it essentially 
doesn't honor the "replica.fetch.max.bytes" config. Better alternatives are 
definitely welcome!

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-3810

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1484


commit f8c1bb2d07df13b90ec338ddc4de08d58aab153d
Author: Onur Karaman 
Date:   2016-06-09T09:43:25Z

replication of internal topics should not be limited by 
replica.fetch.max.bytes




> replication of internal topics should not be limited by 
> replica.fetch.max.bytes
> ---
>
> Key: KAFKA-3810
> URL: https://issues.apache.org/jira/browse/KAFKA-3810
> 

[GitHub] kafka pull request #1484: KAFKA-3810: replication of internal topics should ...

2016-06-09 Thread onurkaraman
GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/1484

KAFKA-3810: replication of internal topics should not be limited by 
replica.fetch.max.bytes

From the kafka-dev mailing list discussion: [DISCUSS] scalability limits in 
the coordinator

There's a scalability limit on the new consumer / coordinator regarding the 
amount of group metadata we can fit into one message. This restricts a 
combination of consumer group size, topic subscription sizes, topic assignment 
sizes, and any remaining member metadata.

Under more strenuous use cases like mirroring clusters with thousands of 
topics, this limitation can be reached even after applying gzip to the 
__consumer_offsets topic.

Various options were proposed in the discussion:
1. Config change: reduce the number of consumers in the group. This isn't 
always a realistic answer in more strenuous use cases like MirrorMaker clusters 
or for auditing.
2. Config change: split the group into smaller groups which together will 
get full coverage of the topics. This gives each group member a smaller 
subscription.(ex: g1 has topics starting with a-m while g2 has topics starting 
with n-z). This would be operationally painful to manage.
3. Config change: split the topics among members of the group. Again this 
gives each group member a smaller subscription. This would also be 
operationally painful to manage.
4. Config change: bump up KafkaConfig.messageMaxBytes (a topic-level 
config) and KafkaConfig.replicaFetchMaxBytes (a broker-level config). Applying 
messageMaxBytes to just the __consumer_offsets topic seems relatively harmless, 
but bumping up the broker-level replicaFetchMaxBytes would probably need more 
attention.
5. Config change: try different compression codecs. Based on 2 minutes of 
googling, it seems like lz4 and snappy are faster than gzip but have worse 
compression, so this probably won't help.
6. Implementation change: support sending the regex over the wire instead 
of the fully expanded topic subscriptions. I think people said in the past that 
different languages have subtle differences in regex, so this doesn't play 
nicely with cross-language groups.
7. Implementation change: maybe we can reverse the mapping? Instead of 
mapping from member to subscriptions, we can map a subscription to a list of 
members.
8. Implementation change: maybe we can try to break apart the subscription 
and assignments from the same SyncGroupRequest into multiple records? They can 
still go to the same message set and get appended together. This way the limit 
become the segment size, which shouldn't be a problem. This can be tricky to 
get right because we're currently keying these messages on the group, so I 
think records from the same rebalance might accidentally compact one another, 
but my understanding of compaction isn't that great.
9. Implementation change: try to apply some tricks on the assignment 
serialization to make it smaller.
10. Config and Implementation change: bump up the __consumer_offsets topic 
messageMaxBytes and (from Jun Rao) fix how we deal with the case when a message 
is larger than the fetch size. Today, if the fetch size is smaller than the 
fetch size, the consumer will get stuck. Instead, we can simply return the full 
message if it's larger than the fetch size w/o requiring the consumer to 
manually adjust the fetch size.
11. Config and Implementation change: same as above but only apply the 
special fetch logic when fetching from internal topics

This PR provides an implementation of option 11.

That being said, I'm not very happy with this approach as it essentially 
doesn't honor the "replica.fetch.max.bytes" config. Better alternatives are 
definitely welcome!

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-3810

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1484


commit f8c1bb2d07df13b90ec338ddc4de08d58aab153d
Author: Onur Karaman 
Date:   2016-06-09T09:43:25Z

replication of internal topics should not be limited by 
replica.fetch.max.bytes




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3805) Running multiple instances of a Streams app on the same machine results in Java_org_rocksdb_RocksDB_write error

2016-06-09 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322261#comment-15322261
 ] 

Eno Thereska commented on KAFKA-3805:
-

[~agomez] I cannot reproduce problem when using two different data.dirs:
{{App1: streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
"/tmp/kafka-streams");}}
{{App2: streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
"/tmp/kafka-streams2");}}

Could you share your config if this is still an issue? Thanks.

> Running multiple instances of a Streams app on the same machine results in 
> Java_org_rocksdb_RocksDB_write error
> ---
>
> Key: KAFKA-3805
> URL: https://issues.apache.org/jira/browse/KAFKA-3805
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: * Ubuntu Server 16.04 LTS
> - Java: OpenJDK Runtime Environment (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
> * OSX 10.11.5
>- Java: Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
>Reporter: Andres Gomez Ferrer
>Assignee: Eno Thereska
>Priority: Critical
> Attachments: hs_err_pid23047.log
>
>
> I reproduce the error working with simple two instances of  my kafka streams 
> application reading from one topic writing on other and using a RocksDB store.
> I reproduce it starting one instance, sending some messages on the input 
> topic and later starting another Kafka streams instance. Then, the first 
> instance died and drop a core dump.
> --
>  A fatal error has been detected by the Java Runtime Environment:
>   SIGSEGV (0xb) at pc=0x7f9f488fd8e8, pid=23047, tid=140322171361024
>  JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 
> 1.8.0_91-8u91-b14-0ubuntu4~16.04.1-b14)
>  Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
>  Problematic frame:
>  C  [librocksdbjni712288404493406713..so+0x15b8e8]  
> Java_org_rocksdb_RocksDB_write0+0x48
>  Core dump written. Default location: /root/core or core.23047
>  An error report file with more information is saved as:
>  /root/hs_err_pid23047.log
>  If you would like to submit a bug report, please visit:
>http://bugreport.java.com/bugreport/crash.jsp
>  The crash happened outside the Java Virtual Machine in native code.
>  See problematic frame for where to report the bug.
> --
> The core dump is there: 
> https://s3-eu-west-1.amazonaws.com/arodriguezg-test/core
> Note: The core dump is collected at Ubuntu Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-09 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322254#comment-15322254
 ] 

Matthias J. Sax commented on KAFKA-3775:


I want to push this discussion further. As [~guozhang] suggested, it might be 
better to hand in a custom {{PartitionGrouper}} instead of patching 
{{StreamPartitionAssignor}}. Would this work for you use-case [~kawamuray]? If 
yes, we could close this as "not a problem" a keep the list of configurable 
parameters short.

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which has huge number of partitions and 
> message traffic, it is a problem that we don't have a way of throttling the 
> maximum amount of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic which has 
> more than 10MB/sec traffic of each partition we saw that all partitions 
> assigned to the first instance and soon the app dead by OOM.
> I know that there's some workarounds considerable here. for example:
> - Start multiple instances at once so the partitions distributed evenly.
>   => Maybe works. but as Kafka Streams is a library but not an execution 
> framework, there's no predefined procedure of starting Kafka Streams apps so 
> some users might wanna take an option to start the first single instance and 
> check if it works as expected with lesster number of partitions(I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works. but still have two problems IMO:
>   - Still leads traffic explosion with high throughput processing as it 
> accepts all incoming messages from hundreads of partitions.
>   - In the first place, by the distributed system principle, it's wired that 
> users don't have a away to control maximum "partitions" assigned to a single 
> shard(an instance of KafkaStreams here). Users should be allowed to provide 
> the maximum amount of partitions that is considered as possible to be 
> processed with single instance(or host).
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks(a notion of 
> partition) assigned to the processId(which is the notion of single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor(TaskAssignor) to 
> tolerate the incomplete assignment. That is, Kafka Streams should continue 
> working for the part of partitions even there are some partitions left 
> unassigned, in order to satisfy this> "user may want to take an option to 
> start the first single instance and check if it works as expected with 
> lesster number of partitions(I want :p)".
> I've implemented the rough POC for this. PTAL and if it make sense I will 
> continue sophisticating it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3810) replication of internal topics should not be limited by replica.fetch.max.bytes

2016-06-09 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-3810:
---

 Summary: replication of internal topics should not be limited by 
replica.fetch.max.bytes
 Key: KAFKA-3810
 URL: https://issues.apache.org/jira/browse/KAFKA-3810
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman


>From the kafka-dev mailing list discussion:
[\[DISCUSS\] scalability limits in the 
coordinator|http://mail-archives.apache.org/mod_mbox/kafka-dev/201605.mbox/%3ccamquqbzddtadhcgl6h4smtgo83uqt4s72gc03b3vfghnme3...@mail.gmail.com%3E]

There's a scalability limit on the new consumer / coordinator regarding the 
amount of group metadata we can fit into one message. This restricts a 
combination of consumer group size, topic subscription sizes, topic assignment 
sizes, and any remaining member metadata.

Under more strenuous use cases like mirroring clusters with thousands of 
topics, this limitation can be reached even after applying gzip to the 
__consumer_offsets topic.

Various options were proposed in the discussion:
# Config change: reduce the number of consumers in the group. This isn't always 
a realistic answer in more strenuous use cases like MirrorMaker clusters or for 
auditing.
# Config change: split the group into smaller groups which together will get 
full coverage of the topics. This gives each group member a smaller 
subscription.(ex: g1 has topics starting with a-m while g2 has topics starting 
with n-z). This would be operationally painful to manage.
# Config change: split the topics among members of the group. Again this gives 
each group member a smaller subscription. This would also be operationally 
painful to manage.
# Config change: bump up KafkaConfig.messageMaxBytes (a topic-level config) and 
KafkaConfig.replicaFetchMaxBytes (a broker-level config). Applying 
messageMaxBytes to just the __consumer_offsets topic seems relatively harmless, 
but bumping up the broker-level replicaFetchMaxBytes would probably need more 
attention.
# Config change: try different compression codecs. Based on 2 minutes of 
googling, it seems like lz4 and snappy are faster than gzip but have worse 
compression, so this probably won't help.
# Implementation change: support sending the regex over the wire instead of the 
fully expanded topic subscriptions. I think people said in the past that 
different languages have subtle differences in regex, so this doesn't play 
nicely with cross-language groups.
# Implementation change: maybe we can reverse the mapping? Instead of mapping 
from member to subscriptions, we can map a subscription to a list of members.
# Implementation change: maybe we can try to break apart the subscription and 
assignments from the same SyncGroupRequest into multiple records? They can 
still go to the same message set and get appended together. This way the limit 
become the segment size, which shouldn't be a problem. This can be tricky to 
get right because we're currently keying these messages on the group, so I 
think records from the same rebalance might accidentally compact one another, 
but my understanding of compaction isn't that great.
# Implementation change: try to apply some tricks on the assignment 
serialization to make it smaller.
# Config and Implementation change: bump up the __consumer_offsets topic 
messageMaxBytes and (from [~junrao]) fix how we deal with the case when a 
message is larger than the fetch size. Today, if the fetch size is smaller than 
the fetch size, the consumer will get stuck. Instead, we can simply return the 
full message if it's larger than the fetch size w/o requiring the consumer to 
manually adjust the fetch size.
# Config and Implementation change: same as above but only apply the special 
fetch logic when fetching from internal topics



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3799) Turn on endpoint validation in SSL system tests

2016-06-09 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3799:
--
Affects Version/s: 0.10.0.0
   Status: Patch Available  (was: Open)

> Turn on endpoint validation in SSL system tests
> ---
>
> Key: KAFKA-3799
> URL: https://issues.apache.org/jira/browse/KAFKA-3799
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 0.10.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>
> Endpoint validation is off by default and currently system tests are run 
> without endpoint validation. It will be better to run system tests with 
> endpoint validation turned on. KAFKA-3665 will be enabling validation by 
> default as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3562) Null Pointer Exception Found when delete topic and Using New Producer

2016-06-09 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3562:
--
Status: Patch Available  (was: Open)

> Null Pointer Exception Found when delete topic and Using New Producer
> -
>
> Key: KAFKA-3562
> URL: https://issues.apache.org/jira/browse/KAFKA-3562
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.9.0.0
>Reporter: Pengwei
>Assignee: Rajini Sivaram
>Priority: Minor
> Fix For: 0.10.0.1
>
>
> Exception in thread “Thread-2” java.lang.NullPointerException
> at 
> org.apache.kafka.clients.producer.internals.DefaultPartitioner.partition(DefaultPartitioner.java:70)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.partition(KafkaProducer.java:687)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:432)
> at 
> com.huawei.kafka.internal.remove.ProducerMsgThread.run(ProducerMsgThread.java:36)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3788) Potential message lost when switching to new segment

2016-06-09 Thread Arkadiusz Firus (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arkadiusz Firus resolved KAFKA-3788.

Resolution: Not A Problem

> Potential message lost when switching to new segment
> 
>
> Key: KAFKA-3788
> URL: https://issues.apache.org/jira/browse/KAFKA-3788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: Arkadiusz Firus
>Assignee: Jay Kreps
>Priority: Minor
>  Labels: easyfix
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> If a new segment is needed method roll() from class kafka.log.Log is invoked. 
> It prepares new segment and schedules _asynchronous_ flush of the previous 
> segment.
> Asynchronous call can lead to a problematic situation. As far as I know 
> neither Linux nor Windows guarantees that the order of files persisted to 
> disk will be the same as the order of writes to files. This means that 
> records from the new segment can be flushed before the old ones which in case 
> of power outage can lead to gaps between records.
> Changing asynchronous invocation to synchronous one will solve the problem 
> because we have guarantee that all records from the previous segment will be 
> persisted to hard drive before we write any record to the new segment.
> I am guessing that asynchronous invocation was chosen to increase performance 
> but switching between segments is not so often. So it is not a big gain.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3788) Potential message lost when switching to new segment

2016-06-09 Thread Arkadiusz Firus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322105#comment-15322105
 ] 

Arkadiusz Firus commented on KAFKA-3788:


Thank you very much for the explanation. I should have checked how the log is 
read during start-up.

> Potential message lost when switching to new segment
> 
>
> Key: KAFKA-3788
> URL: https://issues.apache.org/jira/browse/KAFKA-3788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: Arkadiusz Firus
>Assignee: Jay Kreps
>Priority: Minor
>  Labels: easyfix
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> If a new segment is needed method roll() from class kafka.log.Log is invoked. 
> It prepares new segment and schedules _asynchronous_ flush of the previous 
> segment.
> Asynchronous call can lead to a problematic situation. As far as I know 
> neither Linux nor Windows guarantees that the order of files persisted to 
> disk will be the same as the order of writes to files. This means that 
> records from the new segment can be flushed before the old ones which in case 
> of power outage can lead to gaps between records.
> Changing asynchronous invocation to synchronous one will solve the problem 
> because we have guarantee that all records from the previous segment will be 
> persisted to hard drive before we write any record to the new segment.
> I am guessing that asynchronous invocation was chosen to increase performance 
> but switching between segments is not so often. So it is not a big gain.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes

2016-06-09 Thread Michal Turek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322097#comment-15322097
 ] 

Michal Turek commented on KAFKA-3806:
-

Hi Jun and James,

I probably don't see all the assumptions and consequences but I feel there is 
something wrong. I consider the committed offsets to be only tiny metadata for 
huge log data. Each committed offset is in its nature only one single number + 
identification of topic-partition + identification of consumer group. It was 
exactly this when it was stored in ZooKeeper. The new approach - storing of the 
offsets by writing to a special Kafka topic is "only" an implementation detail 
:-). The topic may store a lot of subsequent commits, but each offset commit 
invalidate and fully overwrite all previous ones. I thought the Kafka's log 
compaction feature 
(https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction) apply here 
and only the last commit will survive the compaction, so there is nearly no 
storage overhead long term. Am I wrong? Please correct me if so.

Default log.retention.hours = 7 days is pretty fine, but I would expect default 
for offsets.retention.minutes to be half a year or so. Remember it is basically 
only reasonably small group of tiny numbers. Can you explain me, what is the 
reason to have offsets.retention.minutes so small, only 1 day? What will be the 
consequences if we configure it to one month or one year? Will something wrong 
happen? I feel it's obvious for you but I don't see anything.

I fully agree there should be some TTL expiration for very old "dead" values to 
be able to GC them and free resources. Even the tiny metadata may grow in time. 
But "one day" doesn't belong to the "very old" category for me at all.

If prolonging offsets.retention.minutes was dangerous, would it be possible to 
prevent deletion of the committed offsets in case that the topic still exists 
and the consumer group is active or was active during offsets.retention.minutes 
timeout? I don't know Kafka code, but I would expect behavior like this to 
reliably prevent any (meta)data loss.

Thanks for explanation!
Michal

> Adjust default values of log.retention.hours and offsets.retention.minutes
> --
>
> Key: KAFKA-3806
> URL: https://issues.apache.org/jira/browse/KAFKA-3806
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Michal Turek
>Priority: Minor
>
> Combination of default values of log.retention.hours (168 hours = 7 days) and 
> offsets.retention.minutes (1440 minutes = 1 day) may be dangerous in special 
> cases. Offset retention should be always greater than log retention.
> We have observed the following scenario and issue:
> - Producing of data to a topic was disabled two days ago by producer update, 
> topic wasn't deleted.
> - Consumer consumed all data and properly committed offsets to Kafka.
> - Consumer made no more offset commits for that topic because there was no 
> more incoming data and there was nothing to confirm. (We have auto-commit 
> disabled, I'm not sure how behaves enabled auto-commit.)
> - After one day: Kafka cleared too old offsets according to 
> offsets.retention.minutes.
> - After two days: Long-term running consumer was restarted after update, it 
> didn't find any committed offsets for that topic since they were deleted by 
> offsets.retention.minutes so it started consuming from the beginning.
> - The messages were still in Kafka due to larger log.retention.hours, about 5 
> days of messages were read again.
> Known workaround to solve this issue:
> - Explicitly configure log.retention.hours and offsets.retention.minutes, 
> don't use defaults.
> Proposals:
> - Prolong default value of offsets.retention.minutes to be at least twice 
> larger than log.retention.hours.
> - Check these values during Kafka startup and log a warning if 
> offsets.retention.minutes is smaller than log.retention.hours.
> - Add a note to migration guide about differences between storing of offsets 
> in ZooKeeper and Kafka (http://kafka.apache.org/documentation.html#upgrade).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes

2016-06-09 Thread David Watzke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322076#comment-15322076
 ] 

David Watzke commented on KAFKA-3806:
-

[~junrao] I understand the need for the automatic consumer group offsets 
cleanup but why would you insist on keeping the default so low? It seems like a 
very aggresive default that would complicate the code for everyone who's using 
kafka (by having to implement noop offset commits just to reset timers)... It's 
just I don't see a reason why would anybody prefer this over a saner default 
that would be closer to the default retention (time) value. Thanks

> Adjust default values of log.retention.hours and offsets.retention.minutes
> --
>
> Key: KAFKA-3806
> URL: https://issues.apache.org/jira/browse/KAFKA-3806
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Michal Turek
>Priority: Minor
>
> Combination of default values of log.retention.hours (168 hours = 7 days) and 
> offsets.retention.minutes (1440 minutes = 1 day) may be dangerous in special 
> cases. Offset retention should be always greater than log retention.
> We have observed the following scenario and issue:
> - Producing of data to a topic was disabled two days ago by producer update, 
> topic wasn't deleted.
> - Consumer consumed all data and properly committed offsets to Kafka.
> - Consumer made no more offset commits for that topic because there was no 
> more incoming data and there was nothing to confirm. (We have auto-commit 
> disabled, I'm not sure how behaves enabled auto-commit.)
> - After one day: Kafka cleared too old offsets according to 
> offsets.retention.minutes.
> - After two days: Long-term running consumer was restarted after update, it 
> didn't find any committed offsets for that topic since they were deleted by 
> offsets.retention.minutes so it started consuming from the beginning.
> - The messages were still in Kafka due to larger log.retention.hours, about 5 
> days of messages were read again.
> Known workaround to solve this issue:
> - Explicitly configure log.retention.hours and offsets.retention.minutes, 
> don't use defaults.
> Proposals:
> - Prolong default value of offsets.retention.minutes to be at least twice 
> larger than log.retention.hours.
> - Check these values during Kafka startup and log a warning if 
> offsets.retention.minutes is smaller than log.retention.hours.
> - Add a note to migration guide about differences between storing of offsets 
> in ZooKeeper and Kafka (http://kafka.apache.org/documentation.html#upgrade).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3799) Turn on endpoint validation in SSL system tests

2016-06-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322045#comment-15322045
 ] 

ASF GitHub Bot commented on KAFKA-3799:
---

GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/1483

KAFKA-3799: Enable SSL endpoint validation in system tests

Generate certificates with hostname in SubjectAlternativeName and enable 
hostname validation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-3799

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1483.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1483


commit 0c86afa6f0c1fb69b5fb700ff7b27c352e3ab3a8
Author: Rajini Sivaram 
Date:   2016-06-09T06:53:54Z

KAFKA-3799: Enable SSL endpoint validation in system tests




> Turn on endpoint validation in SSL system tests
> ---
>
> Key: KAFKA-3799
> URL: https://issues.apache.org/jira/browse/KAFKA-3799
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>
> Endpoint validation is off by default and currently system tests are run 
> without endpoint validation. It will be better to run system tests with 
> endpoint validation turned on. KAFKA-3665 will be enabling validation by 
> default as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1483: KAFKA-3799: Enable SSL endpoint validation in syst...

2016-06-09 Thread rajinisivaram
GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/1483

KAFKA-3799: Enable SSL endpoint validation in system tests

Generate certificates with hostname in SubjectAlternativeName and enable 
hostname validation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-3799

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1483.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1483


commit 0c86afa6f0c1fb69b5fb700ff7b27c352e3ab3a8
Author: Rajini Sivaram 
Date:   2016-06-09T06:53:54Z

KAFKA-3799: Enable SSL endpoint validation in system tests




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1482: MINOR: Fix a variable name semantically correct.

2016-06-09 Thread uchan-nos
GitHub user uchan-nos opened a pull request:

https://github.com/apache/kafka/pull/1482

MINOR: Fix a variable name semantically correct.

Hi all,
This is my first commit to Kafka.

"msec / 1000" turns into sec, isn't it?
I just have fixed a variable name.
@granders 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uchan-nos/kafka elapsed-ms

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1482.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1482


commit 886e1e7f41b9d95b09f153c1e0f157d2016c943c
Author: kota-uchida 
Date:   2016-06-09T06:12:37Z

MINOR: Fix a variable name semantically correct.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---