Re: Accumulating data in Kafka Connect source tasks

2016-01-29 Thread Randall Hauch
On January 28, 2016 at 7:07:02 PM, Ewen Cheslack-Postava (e...@confluent.io) 
wrote:
Randall, 

Great question. Ideally you wouldn't need this type of state since it 
should really be available in the source system. In your case, it might 
actually make sense to be able to grab that information from the DB itself, 
although that will also have issues if, for example, there have been 
multiple schema changes and you can no longer get a previous schema from 
the current state of the tables. 
I agree that ideally connectors would be stateless, or at least have no need 
for maintaining state across restarts. Unfortunately, that’s not always 
possible.

Reading the log but using the current schema does pose a problem if/when the 
schema has evolved since the point in the log that we’re currently reading. 
This is far more of an issue if you’re playing catch-up and there have been 
non-compatible schema changes.

Case in point: when MySQL inserts/updates/removes a row from a table, it writes 
an event in the log that includes (a) a table identifier and (b) the row values 
in column-order. There is no other information. Column renames might be okay, 
but adding or removing columns will likely result in mismatching the row values 
to the appropriate columns.

Luckily, MySQL includes the DDL statements in the log, so my connector parses 
these as part of processing and builds up the schema state as it goes along. 
This works beautifully, with the only issue being how to persist and recover 
this after restarts.
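
To make that concrete, here is a minimal, purely illustrative sketch of the kind of
bookkeeping involved; the class and method names are invented for this example and
are not my connector’s actual code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: tracks the ordered column names per table so that the
// positional row values in a binlog row event can be matched to columns.
public class SchemaState {
    private final Map<String, List<String>> columnsByTable = new HashMap<>();

    // Called whenever a DDL event (e.g. CREATE or ALTER TABLE) is read from the log.
    public void applyDdl(String tableId, List<String> orderedColumns) {
        columnsByTable.put(tableId, new ArrayList<>(orderedColumns));
    }

    // Called for each row event: pairs the positional values with column names.
    public Map<String, Object> mapRow(String tableId, Object[] rowValues) {
        List<String> columns = columnsByTable.get(tableId);
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), rowValues[i]);
        }
        return row;
    }
}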


The offset storage is probably pretty close to what you're looking for, 
although we obviously structure that very narrowly. Adding in some sort of 
other state store is an interesting idea, though I'd be curious how many 
other systems encounter similar challenges. I think one way to do this 
without huge changes and in a way that properly handles offset commits 
would be to expose a small API for setting local state and have Connect 
store that state right in the same topic (and message) as offsets. To 
handle offset commits and reviving tasks that hit a fault, we would just 
grab the current state as part of the process of committing offsets. Then 
offsets would just be a special case of that more general state. 
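
Just to sketch the shape such an API might take (to be clear, nothing like this
exists in Connect today, and the names below are invented):

import java.util.Map;

// Hypothetical sketch only; this interface is NOT part of Kafka Connect.
// The idea: a task stages arbitrary state, the framework persists it in the
// same topic (and message) as the offsets at commit time, and hands it back
// when a task is revived after a fault.
public interface TaskStateStore {
    // State staged here would be written atomically with the next offset commit.
    void setLocalState(Map<String, ?> state);

    // Returns the state recovered from the last successful commit, or null.
    Map<String, ?> getLocalState();
}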

However, I'm also wary of doing something like this. Right now every worker 
has to consume the entire offsets topic. This works fine because offsets, 
while capable of being pretty complex, are generally pretty small such that 
there's no concern having to tail it on all workers (and no concern for the 
load on brokers leading those partitions). Once you provide a generic state 
storage mechanism without clear constraints on how it should be used, 
someone will come along and abuse it. Also, with offsets it is very clear 
(to the connector developer) which task should write to which keys (where 
the key here is the partition of the source partitioned stream). With 
support for arbitrary state, ownership of different subsets of the key 
space is very unclear. I think you may not have that particular problem 
because you probably only have 1 partition anyway since you are reading a 
transaction log. 

In any case, you're right that this doesn't exist today. There is one very 
hacky way to get around this, which is to store that schema information in 
your "offsets". This may not be *too* bad -- it'll increase the size of 
offset data, but probably doesn't affect much else. The data size may not 
be that bad as long as offsets aren't committed too frequently. In terms of 
performance, I'm assuming these schema changes are relatively rare, and you 
can just include the same schema object in every offset you create during 
the periods between schema changes so you (and the GC) are probably only 
doing a negligible amount of extra work. 
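
Roughly, that hack would look like the following sketch; the offset field names
("file", "pos", "ddl_history") are made up, and the serialized schema payload would
just be whatever form the connector already uses internally:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

// Sketch of the "schema state piggybacked on the offsets" hack.
public class OffsetHackSketch {
    public static SourceRecord recordWithSchemaState(String binlogFile, long binlogPos,
                                                     String serializedSchemaHistory,
                                                     String topic, String value) {
        Map<String, String> sourcePartition = new HashMap<>();
        sourcePartition.put("server", "my-mysql");                 // illustrative partition key

        Map<String, Object> sourceOffset = new HashMap<>();
        sourceOffset.put("file", binlogFile);
        sourceOffset.put("pos", binlogPos);
        sourceOffset.put("ddl_history", serializedSchemaHistory);  // the hack: state rides along

        return new SourceRecord(sourcePartition, sourceOffset, topic,
                                Schema.STRING_SCHEMA, value);
    }
}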

Hmm, it sounds like hammering accumulated state into the offsets could be pretty 
problematic and potentially risky, especially if the state has very different 
size and frequency characteristics than the offsets.

Re: creating a consumer, Connect doesn't provide any utilities to do that 
since the goal is to handle everything Kafka-related for the connector 
developer so they can just focus on getting the data from the other system! 
We could consider exposing some of the worker config though, which I 
imagine is all you really need -- it'd just be convenient to have the 
connection info for the Kafka brokers. 
Having a way to get the worker config would be awesome, and IMO it’s a nice, 
minimalistic approach. If you think this is a good idea, I can log a JIRA and 
take it to the dev list. I’m willing to work on it, too. 

I’m starting to think that storing state on a separate dedicated topic is the 
best option, at least for me. First, connector tasks can easily record their 
state by simply adding more SourceRecord instances during polling. Second, that 
information might be useful for downstream consumers. And third, recording 
state this way requires no changes to Kafka Connect.
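
A rough sketch of what I mean, with made-up topic and helper names (this is not
working connector code, just the shape of it):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Sketch only: the topic name and the helper methods are placeholders.
public abstract class StatefulSourceTaskSketch extends SourceTask {

    private static final String STATE_TOPIC = "my-connector-schema-state"; // illustrative

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>(pollDataRecords());

        String schemaSnapshot = readSchemaSnapshotIfChanged(); // hypothetical helper
        if (schemaSnapshot != null) {
            Map<String, String> partition = new HashMap<>();
            partition.put("server", "my-mysql");
            Map<String, Object> offset = new HashMap<>();
            offset.put("pos", currentLogPosition());            // hypothetical helper

            // The state simply rides along as one more record, written to a
            // dedicated (ideally compacted) topic.
            records.add(new SourceRecord(partition, offset, STATE_TOPIC,
                                         Schema.STRING_SCHEMA, schemaSnapshot));
        }
        return records;
    }

    protected abstract List<SourceRecord> pollDataRecords() throws InterruptedException;
    protected abstract String readSchemaSnapshotIfChanged();
    protected abstract long currentLogPosition();
}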

Meaning of request.timeout.ms

2016-01-29 Thread tao xiao
Hi team,

I want to understand the meaning of request.timeout.ms as it is used in the
producer. As per the docs, this property is used to expire records that have
been waiting for a response from the server for more than request.timeout.ms,
which also means the records have been sitting in InFlightRequests for more
than request.timeout.ms.

But looking at the codebase, I discovered that request.timeout.ms is also used
for another purpose: to expire records that sit in the accumulator for more than
request.timeout.ms. I believe records that are in the accumulator should not be
in InFlightRequests, so request.timeout.ms is effectively used to expire two
types of records: records that sit in InFlightRequests for more than
request.timeout.ms and records that sit in the accumulator for more than
request.timeout.ms. Is this understanding correct?
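
For reference, this is where the property is set on the (new) producer; the values
below are only illustrative:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

// Illustrative only: a producer with request.timeout.ms set explicitly.
public class ProducerTimeoutExample {
    public static Producer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The setting in question: how long the producer waits for a broker response
        // to an in-flight request before considering the request failed.
        props.put("request.timeout.ms", "30000");
        return new KafkaProducer<String, String>(props);
    }
}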


Re: MongoDB Kafka Connect driver

2016-01-29 Thread Ewen Cheslack-Postava
Sunny,

As I said on Twitter, I'm stoked to hear you're working on a Mongo
connector! It struck me as a pretty natural source to tackle since it does
such a nice job of cleanly exposing the op log.

Regarding the problem of only getting deltas, unfortunately there is not a
trivial solution here -- if you want to generate the full updated record,
you're going to have to have a way to recover the original document.

In fact, I'm curious how you were thinking of even bootstrapping. Are you
going to do a full dump and then start reading the op log? Is there a good
way to do the dump and figure out the exact location in the op log that the
query generating the dump was initially performed? I know that internally
mongo effectively does these two steps, but I'm not sure if the necessary
info is exposed via normal queries.

If you want to reconstitute the data, I can think of a couple of options:

1. Try to reconstitute inline in the connector. This seems difficult to
make work in practice. At some point you basically have to query for the
entire data set to bring it into memory and then the connector is
effectively just applying the deltas to its in memory copy and then just
generating one output record containing the full document each time it
applies an update.
2. Make the connector send just the updates and have a separate stream
processing job perform the reconstitution and send to another topic. In
this case, the first topic should not be compacted, but the second one
could be.
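
To illustrate the core of option 2, here is a simplified sketch of just the
reconstitution step, with the Kafka consumer/producer plumbing left out; the
$set/$unset maps are a stand-in for real oplog entries, not Mongo's actual format:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for the reconstitution step of option 2: maintain the
// current full document per key and apply each delta as it arrives. A real job
// would read deltas from the first (non-compacted) topic and write the merged
// documents to the second (compacted) topic.
public class DocumentReconstituter {
    private final Map<String, Map<String, Object>> currentDocs = new HashMap<>();

    public Map<String, Object> applyDelta(String docId,
                                          Map<String, Object> setFields,
                                          Set<String> unsetFields) {
        Map<String, Object> doc = currentDocs.computeIfAbsent(docId, k -> new HashMap<>());
        doc.putAll(setFields);
        for (String field : unsetFields) {
            doc.remove(field);
        }
        return doc; // the full, updated document to emit downstream
    }

    public void delete(String docId) {
        currentDocs.remove(docId); // a tombstone would be emitted for compaction
    }
}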

Unfortunately, without additional hooks into the database, there's not much
you can do besides this pretty heavyweight process. There may be some
tricks you can use to reduce the amount of memory used during the process
(e.g. keep a small cache of actual records and for the rest only store
Kafka offsets for the last full value, performing a (possibly expensive)
random read as necessary to get the full document value back), but to get
full correctness you will need to perform this process.

In terms of Kafka Connect supporting something like this, I'm not sure how
general it could be made, or that you even want to perform the process
inline with the Kafka Connect job. If it's an issue that repeatedly arises
across a variety of systems, then we should consider how to address it more
generally.

-Ewen

On Tue, Jan 26, 2016 at 8:43 PM, Sunny Shah  wrote:

>
> Hi,
>
> We are trying to write a Kafka Connect connector for MongoDB. The issue
> is, MongoDB does not provide the entire changed document for update
> operations; it just provides the modified fields.
>
> If Kafka allows custom log compaction, then it is possible to eventually
> merge an entire document and subsequent updates to create an entire
> record again.
>
> As Ewen pointed out to me on Twitter, this is not possible, so what is
> the Kafka Connect way of solving this issue?
>
> @Ewen, thanks a lot for a really quick answer on Twitter.
>
> --
> Thanks and Regards,
>  Sunny
>
>



-- 
Thanks,
Ewen


Re: MongoDB Kafka Connect driver

2016-01-29 Thread Jay Kreps
Ah, agreed. This approach is actually quite common in change capture,
though. For many use cases getting the final value is actually preferable
to getting intermediates. The exception is usually if you want to do
analytics on something like number of changes.

On Fri, Jan 29, 2016 at 9:35 AM, Ewen Cheslack-Postava 
wrote:

> Jay,
>
> You can query after the fact, but you're not necessarily going to get the
> same value back. There could easily be dozens of changes to the document in
> the oplog so the delta you see may not even make sense given the current
> state of the document. Even if you can apply the delta, you'd still be
> seeing data that is newer than the update. You can of course take this
> shortcut, but it won't give correct results. And if the data has been
> deleted since then, you won't even be able to write the full record... As
> far as I know, the way the op log is exposed won't let you do something
> like pin a query to the state of the db at a specific point in the op log
> and you may be reading from the beginning of the op log, so I don't think
> there's a way to get correct results by just querying the DB for the full
> documents.
>
> Strictly speaking you don't need to get all the data in memory, you just
> need a record of the current set of values somewhere. This is what I was
> describing following those two options -- if you do an initial dump to
> Kafka, you could track only offsets in memory and read back full values as
> needed to apply deltas, but this of course requires random reads into your
> Kafka topic (but may perform fine in practice depending on the workload).
>
> -Ewen
>
> On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps  wrote:
>
> > Hey Ewen, how come you need to get it all in memory for approach (1)? I
> > guess the obvious thing to do would just be to query for the record
> > after-image when you get the diff--e.g. just read a batch of changes and
> > multi-get the final values. I don't know how bad the overhead of this
> would
> > be...batching might reduce it a fair amount. The guarantees for this are
> slightly different than the pure oplog too (you get the current value, not
> necessarily every intermediate) but that should be okay for most
> > uses.
> >
> > -Jay
> >
> > On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <
> e...@confluent.io>
> > wrote:
> >
> > > Sunny,
> > >
> > > As I said on Twitter, I'm stoked to hear you're working on a Mongo
> > > connector! It struck me as a pretty natural source to tackle since it
> > does
> > > such a nice job of cleanly exposing the op log.
> > >
> > > Regarding the problem of only getting deltas, unfortunately there is
> not
> > a
> > > trivial solution here -- if you want to generate the full updated
> record,
> > > you're going to have to have a way to recover the original document.
> > >
> > > In fact, I'm curious how you were thinking of even bootstrapping. Are
> you
> > > going to do a full dump and then start reading the op log? Is there a
> > good
> > > way to do the dump and figure out the exact location in the op log that
> > the
> > > query generating the dump was initially performed? I know that
> internally
> > > mongo effectively does these two steps, but I'm not sure if the
> necessary
> > > info is exposed via normal queries.
> > >
> > > If you want to reconstitute the data, I can think of a couple of
> options:
> > >
> > > 1. Try to reconstitute inline in the connector. This seems difficult to
> > > make work in practice. At some point you basically have to query for
> the
> > > entire data set to bring it into memory and then the connector is
> > > effectively just applying the deltas to its in memory copy and then
> just
> > > generating one output record containing the full document each time it
> > > applies an update.
> > > 2. Make the connector send just the updates and have a separate stream
> > > processing job perform the reconstitution and send to another topic. In
> > > this case, the first topic should not be compacted, but the second one
> > > could be.
> > >
> > > Unfortunately, without additional hooks into the database, there's not
> > much
> > > you can do besides this pretty heavyweight process. There may be some
> > > tricks you can use to reduce the amount of memory used during the
> process
> > > (e.g. keep a small cache of actual records and for the rest only store
> > > Kafka offsets for the last full value, performing a (possibly
> expensive)
> > > random read as necessary to get the full document value back), but to
> get
> > > full correctness you will need to perform this process.
> > >
> > > In terms of Kafka Connect supporting something like this, I'm not sure
> > how
> > > general it could be made, or that you even want to perform the process
> > > inline with the Kafka Connect job. If it's an issue that repeatedly
> > arises
> > > across a variety of systems, then we should consider how to address it
> > more
> > > 

Re: MongoDB Kafka Connect driver

2016-01-29 Thread Jay Kreps
Also, most databases provide a "full logging" option that lets you capture
the whole row in the log (I know Oracle and MySQL have this), but it sounds
like Mongo doesn't yet. That would be the ideal solution.
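
For reference, on the MySQL side this corresponds roughly to row-based logging
with full row images, e.g. in my.cnf (MySQL 5.6+):

[mysqld]
# Log every change as a row event rather than the statement that caused it
binlog_format    = ROW
# Include the full row image in each event, not just the changed columns
binlog_row_image = FULL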

-Jay

On Fri, Jan 29, 2016 at 9:38 AM, Jay Kreps  wrote:

> Ah, agreed. This approach is actually quite common in change capture,
> though. For many use cases getting the final value is actually preferable
> to getting intermediates. The exception is usually if you want to do
> analytics on something like number of changes.
>
> On Fri, Jan 29, 2016 at 9:35 AM, Ewen Cheslack-Postava 
> wrote:
>
>> Jay,
>>
>> You can query after the fact, but you're not necessarily going to get the
>> same value back. There could easily be dozens of changes to the document
>> in
>> the oplog so the delta you see may not even make sense given the current
>> state of the document. Even if you can apply the delta, you'd still be
>> seeing data that is newer than the update. You can of course take this
>> shortcut, but it won't give correct results. And if the data has been
>> deleted since then, you won't even be able to write the full record... As
>> far as I know, the way the op log is exposed won't let you do something
>> like pin a query to the state of the db at a specific point in the op log
>> and you may be reading from the beginning of the op log, so I don't think
>> there's a way to get correct results by just querying the DB for the full
>> documents.
>>
>> Strictly speaking you don't need to get all the data in memory, you just
>> need a record of the current set of values somewhere. This is what I was
>> describing following those two options -- if you do an initial dump to
>> Kafka, you could track only offsets in memory and read back full values as
>> needed to apply deltas, but this of course requires random reads into your
>> Kafka topic (but may perform fine in practice depending on the workload).
>>
>> -Ewen
>>
>> On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps  wrote:
>>
>> > Hey Ewen, how come you need to get it all in memory for approach (1)? I
>> > guess the obvious thing to do would just be to query for the record
>> > after-image when you get the diff--e.g. just read a batch of changes and
>> > multi-get the final values. I don't know how bad the overhead of this
>> would
>> > be...batching might reduce it a fair amount. The guarantees for this are
>> > slightly different than the pure oplog too (you get the current value,
>> > not necessarily every intermediate) but that should be okay for most
>> > uses.
>> >
>> > -Jay
>> >
>> > On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <
>> e...@confluent.io>
>> > wrote:
>> >
>> > > Sunny,
>> > >
>> > > As I said on Twitter, I'm stoked to hear you're working on a Mongo
>> > > connector! It struck me as a pretty natural source to tackle since it
>> > does
>> > > such a nice job of cleanly exposing the op log.
>> > >
>> > > Regarding the problem of only getting deltas, unfortunately there is
>> not
>> > a
>> > > trivial solution here -- if you want to generate the full updated
>> record,
>> > > you're going to have to have a way to recover the original document.
>> > >
>> > > In fact, I'm curious how you were thinking of even bootstrapping. Are
>> you
>> > > going to do a full dump and then start reading the op log? Is there a
>> > good
>> > > way to do the dump and figure out the exact location in the op log
>> that
>> > the
>> > > query generating the dump was initially performed? I know that
>> internally
>> > > mongo effectively does these two steps, but I'm not sure if the
>> necessary
>> > > info is exposed via normal queries.
>> > >
>> > > If you want to reconstitute the data, I can think of a couple of
>> options:
>> > >
>> > > 1. Try to reconstitute inline in the connector. This seems difficult
>> to
>> > > make work in practice. At some point you basically have to query for
>> the
>> > > entire data set to bring it into memory and then the connector is
>> > > effectively just applying the deltas to its in memory copy and then
>> just
>> > > generating one output record containing the full document each time it
>> > > applies an update.
>> > > 2. Make the connector send just the updates and have a separate stream
>> > > processing job perform the reconstitution and send to another topic.
>> In
>> > > this case, the first topic should not be compacted, but the second one
>> > > could be.
>> > >
>> > > Unfortunately, without additional hooks into the database, there's not
>> > much
>> > > you can do besides this pretty heavyweight process. There may be some
>> > > tricks you can use to reduce the amount of memory used during the
>> process
>> > > (e.g. keep a small cache of actual records and for the rest only store
>> > > Kafka offsets for the last full value, performing a (possibly
>> expensive)
>> > > random read as necessary to get the full document value back), but to
>> get

Re: How to bind all Kafka tcp port to private net address

2016-01-29 Thread Stephen Powis
Pretty sure you want to set this option in your server.properties file:

# Hostname the broker will bind to. If not set, the server will bind to all
> interfaces
> #host.name=localhost
>
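
For example, with the address from your original message (purely illustrative):

# server.properties
host.name=10.105.7.243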

On Thu, Jan 28, 2016 at 10:58 PM, costa xu  wrote:

> My version is kafka_2.11-0.9.0.0. I find that Kafka listens on multiple TCP
> ports on a Linux server.
>
> [gdata@gdataqosconnd2 kafka_2.11-0.9.0.0]$ netstat -plnt|grep java
> (Not all processes could be identified, non-owned process info
>  will not be shown, you would have to be root to see it all.)
> tcp0  0 10.105.7.243:9092   0.0.0.0:*
> LISTEN  31011/java
> tcp0  0 0.0.0.0:51367   0.0.0.0:*
> LISTEN  31011/java
> tcp0  0 0.0.0.0:11050.0.0.0:*
> LISTEN  31011/java
> tcp0  0 0.0.0.0:42592   0.0.0.0:*
> LISTEN  31011/java
>
> 10.105.7.243:9092 is the broker's port, and 0.0.0.0:1105 is the JMX port that
> I set in the start script.
> But I don't know what the ports 0.0.0.0:51367 and 0.0.0.0:42592 are. What's
> more, these ports change after each restart of the Kafka process.
>
> So I want to know how to bind the Kafka ports to a private interface just
> like '10.105.7.243'.
> If I cannot bind them, can I at least set fixed listening port numbers?
>
> My kafka server.properties is:
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> # see kafka.server.KafkaConfig for additional details and defaults
>
> # Server Basics #
>
> # The id of the broker. This must be set to a unique integer for each
> broker.
> broker.id=1
>
> # Socket Server Settings
> #
>
> listeners=PLAINTEXT://10.105.7.243:9092
>
> # The port the socket server listens on
> #port=9092
>
> # Hostname the broker will bind to. If not set, the server will bind to all
> interfaces
> #host.name=localhost
>
> # Hostname the broker will advertise to producers and consumers. If not
> set, it uses the
> # value for "host.name" if configured.  Otherwise, it will use the value
> returned from
> # java.net.InetAddress.getCanonicalHostName().
> #advertised.host.name=
>
> # The port to publish to ZooKeeper for clients to use. If this is not set,
> # it will publish the same port that the broker binds to.
> #advertised.port=
>
> # The number of threads handling network requests
> num.network.threads=3
>
> # The number of threads doing disk I/O
> num.io.threads=8
>
> # The send buffer (SO_SNDBUF) used by the socket server
> socket.send.buffer.bytes=102400
>
> # The receive buffer (SO_RCVBUF) used by the socket server
> socket.receive.buffer.bytes=102400
>
> # The maximum size of a request that the socket server will accept
> (protection against OOM)
> socket.request.max.bytes=104857600
>
>
> # Log Basics #
>
> # A comma separated list of directories under which to store log files
> log.dirs=/data/gdata/var/kafka-logs
>
> # The default number of log partitions per topic. More partitions allow
> greater
> # parallelism for consumption, but this will also result in more files
> across
> # the brokers.
> num.partitions=1
>
> # The number of threads per data directory to be used for log recovery at
> startup and flushing at shutdown.
> # This value is recommended to be increased for installations with data
> dirs located in RAID array.
> num.recovery.threads.per.data.dir=1
>
> # Log Flush Policy
> #
>
> # Messages are immediately written to the filesystem but by default we only
> fsync() to sync
> # the OS cache lazily. The following configurations control the flush of
> data to disk.
> # There are a few important trade-offs here:
> #1. Durability: Unflushed data may be lost if you are not using
> replication.
> #2. Latency: Very large flush intervals may lead to latency spikes when
> the flush does occur as there will be a lot of data to flush.
> #3. Throughput: The flush is generally the most expensive operation,
> and a small flush interval may lead to excessive seeks.
> # The settings below allow one to configure the flush policy to flush data

Re: MongoDB Kafka Connect driver

2016-01-29 Thread Jay Kreps
Hey Ewen, how come you need to get it all in memory for approach (1)? I
guess the obvious thing to do would just be to query for the record
after-image when you get the diff--e.g. just read a batch of changes and
multi-get the final values. I don't know how bad the overhead of this would
be...batching might reduce it a fair amount. The guarantees for this are
slightly different than the pure oplog too (you get the current value, not
necessarily every intermediate) but that should be okay for most uses.

-Jay

On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava 
wrote:

> Sunny,
>
> As I said on Twitter, I'm stoked to hear you're working on a Mongo
> connector! It struck me as a pretty natural source to tackle since it does
> such a nice job of cleanly exposing the op log.
>
> Regarding the problem of only getting deltas, unfortunately there is not a
> trivial solution here -- if you want to generate the full updated record,
> you're going to have to have a way to recover the original document.
>
> In fact, I'm curious how you were thinking of even bootstrapping. Are you
> going to do a full dump and then start reading the op log? Is there a good
> way to do the dump and figure out the exact location in the op log that the
> query generating the dump was initially performed? I know that internally
> mongo effectively does these two steps, but I'm not sure if the necessary
> info is exposed via normal queries.
>
> If you want to reconstitute the data, I can think of a couple of options:
>
> 1. Try to reconstitute inline in the connector. This seems difficult to
> make work in practice. At some point you basically have to query for the
> entire data set to bring it into memory and then the connector is
> effectively just applying the deltas to its in memory copy and then just
> generating one output record containing the full document each time it
> applies an update.
> 2. Make the connector send just the updates and have a separate stream
> processing job perform the reconstitution and send to another topic. In
> this case, the first topic should not be compacted, but the second one
> could be.
>
> Unfortunately, without additional hooks into the database, there's not much
> you can do besides this pretty heavyweight process. There may be some
> tricks you can use to reduce the amount of memory used during the process
> (e.g. keep a small cache of actual records and for the rest only store
> Kafka offsets for the last full value, performing a (possibly expensive)
> random read as necessary to get the full document value back), but to get
> full correctness you will need to perform this process.
>
> In terms of Kafka Connect supporting something like this, I'm not sure how
> general it could be made, or that you even want to perform the process
> inline with the Kafka Connect job. If it's an issue that repeatedly arises
> across a variety of systems, then we should consider how to address it more
> generally.
>
> -Ewen
>
> On Tue, Jan 26, 2016 at 8:43 PM, Sunny Shah  wrote:
>
> >
> > Hi ,
> >
> > We are trying to write a Kafka-connect connector for Mongodb. The issue
> > is, MongoDB does not provide an entire changed document for update
> > operations, It just provides the modified fields.
> >
> > if Kafka allows custom log compaction then It is possible to eventually
> > merge an entire document and subsequent update to to create an entire
> > record again.
> >
> > As Ewen pointed out to me on twitter, this is not possible, then What is
> > the Kafka-connect way of solving this issue?
> >
> > @Ewen, Thanks a lot for a really quick answer on twitter.
> >
> > --
> > Thanks and Regards,
> >  Sunny
> >
> >
>
>
>
> --
> Thanks,
> Ewen
>


Re: MongoDB Kafka Connect driver

2016-01-29 Thread Ewen Cheslack-Postava
Jay,

You can query after the fact, but you're not necessarily going to get the
same value back. There could easily be dozens of changes to the document in
the oplog so the delta you see may not even make sense given the current
state of the document. Even if you can apply the delta, you'd still be
seeing data that is newer than the update. You can of course take this
shortcut, but it won't give correct results. And if the data has been
deleted since then, you won't even be able to write the full record... As
far as I know, the way the op log is exposed won't let you do something
like pin a query to the state of the db at a specific point in the op log
and you may be reading from the beginning of the op log, so I don't think
there's a way to get correct results by just querying the DB for the full
documents.

Strictly speaking you don't need to get all the data in memory, you just
need a record of the current set of values somewhere. This is what I was
describing following those two options -- if you do an initial dump to
Kafka, you could track only offsets in memory and read back full values as
needed to apply deltas, but this of course requires random reads into your
Kafka topic (but may perform fine in practice depending on the workload).

-Ewen

On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps  wrote:

> Hey Ewen, how come you need to get it all in memory for approach (1)? I
> guess the obvious thing to do would just be to query for the record
> after-image when you get the diff--e.g. just read a batch of changes and
> multi-get the final values. I don't know how bad the overhead of this would
> be...batching might reduce it a fair amount. The guarantees for this are
> slightly different than the pure oplog too (you get the current value, not
> necessarily every intermediate) but that should be okay for most
> uses.
>
> -Jay
>
> On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava 
> wrote:
>
> > Sunny,
> >
> > As I said on Twitter, I'm stoked to hear you're working on a Mongo
> > connector! It struck me as a pretty natural source to tackle since it
> does
> > such a nice job of cleanly exposing the op log.
> >
> > Regarding the problem of only getting deltas, unfortunately there is not
> a
> > trivial solution here -- if you want to generate the full updated record,
> > you're going to have to have a way to recover the original document.
> >
> > In fact, I'm curious how you were thinking of even bootstrapping. Are you
> > going to do a full dump and then start reading the op log? Is there a
> good
> > way to do the dump and figure out the exact location in the op log that
> the
> > query generating the dump was initially performed? I know that internally
> > mongo effectively does these two steps, but I'm not sure if the necessary
> > info is exposed via normal queries.
> >
> > If you want to reconstitute the data, I can think of a couple of options:
> >
> > 1. Try to reconstitute inline in the connector. This seems difficult to
> > make work in practice. At some point you basically have to query for the
> > entire data set to bring it into memory and then the connector is
> > effectively just applying the deltas to its in memory copy and then just
> > generating one output record containing the full document each time it
> > applies an update.
> > 2. Make the connector send just the updates and have a separate stream
> > processing job perform the reconstitution and send to another topic. In
> > this case, the first topic should not be compacted, but the second one
> > could be.
> >
> > Unfortunately, without additional hooks into the database, there's not
> much
> > you can do besides this pretty heavyweight process. There may be some
> > tricks you can use to reduce the amount of memory used during the process
> > (e.g. keep a small cache of actual records and for the rest only store
> > Kafka offsets for the last full value, performing a (possibly expensive)
> > random read as necessary to get the full document value back), but to get
> > full correctness you will need to perform this process.
> >
> > In terms of Kafka Connect supporting something like this, I'm not sure
> how
> > general it could be made, or that you even want to perform the process
> > inline with the Kafka Connect job. If it's an issue that repeatedly
> arises
> > across a variety of systems, then we should consider how to address it
> more
> > generally.
> >
> > -Ewen
> >
> > On Tue, Jan 26, 2016 at 8:43 PM, Sunny Shah  wrote:
> >
> > >
> > > Hi ,
> > >
> > > We are trying to write a Kafka-connect connector for Mongodb. The issue
> > > is, MongoDB does not provide an entire changed document for update
> > > operations, It just provides the modified fields.
> > >
> > > if Kafka allows custom log compaction then It is possible to eventually
> > > merge an entire document and subsequent update to to create an entire
> > > record again.
> > >
> > > As Ewen pointed out 

Re: Accumulating data in Kafka Connect source tasks

2016-01-29 Thread Ewen Cheslack-Postava
On Fri, Jan 29, 2016 at 7:06 AM, Randall Hauch  wrote:

> On January 28, 2016 at 7:07:02 PM, Ewen Cheslack-Postava (
> e...@confluent.io) wrote:
>
> Randall,
>
> Great question. Ideally you wouldn't need this type of state since it
> should really be available in the source system. In your case, it might
> actually make sense to be able to grab that information from the DB itself,
>
> although that will also have issues if, for example, there have been
> multiple schema changes and you can no longer get a previous schema from
> the current state of the tables.
>
> I agree that ideally connectors would be stateless, or at least have no
> need for maintaining state across restarts. Unfortunately, that’s not
> always possible.
>
> Reading the log but using the current schema does pose a problem if/when
> the schema has evolved since the point in the log that we’re currently
> reading. This is far more of an issue if you’re playing catch up and
> there’s been non-compatible schema changes.
>
> Case in point: when MySQL inserts/updates/removes a row from a table, it
> writes an event in the log that includes (a) a table identifier and (b) the
> row values in column-order. There is no other information. Column renames
> might be okay, but adding or removing columns will likely result in
> mismatching the row values to the appropriate columns.
>
> Luckily, MySQL includes the DDL statements in the log, so my connector
> parses these as part of processing and builds up the schema state as it
> goes along. This works beautifully, with the only issue being how to
> persist and recover this after restarts.
>

Yeah, this is a common complaint about the MySQL binlog. I know James
mentioned this as well. It's a bit crazy that you need a full parser for
the DDL to make this work :/


>
> The offset storage is probably pretty close to what you're looking for,
> although we obviously structure that very narrowly. Adding in some sort of
>
> other state store is an interesting idea, though I'd be curious how many
> other systems encounter similar challenges. I think one way to do this
> without huge changes and in a way that properly handles offset commits
> would be to expose a small API for setting local state and have Connect
> store that state right in the same topic (and message) as offsets. To
> handle offset commits and reviving tasks that hit a fault, we would just
> grab the current state as part of the process of committing offsets. Then
> offsets would just be a special case of that more general state.
>
> However, I'm also wary of doing something like this. Right now every worker
>
> has to consume the entire offsets topic. This works fine because offsets,
> while capable of being pretty complex, are generally pretty small such that
>
> there's no concern having to tail it on all workers (and no concern for the
>
> load on brokers leading those partitions). Once you provide a generic state
>
> storage mechanism without clear constraints on how it should be used,
> someone will come along and abuse it. Also, with offsets it is very clear
> (to the connector developer) which task should write to which keys (where
> the key here is the partition of the source partitioned stream). With
> support for arbitrary state, ownership of different subsets of the key
> space is very unclear. I think you may not have that particular problem
> because you probably only have 1 partition anyway since you are reading a
> transaction log.
>
> In any case, you're right that this doesn't exist today. There is one very
>
> hacky way to get around this, which is to store that schema information in
>
> your "offsets". This may not be *too* bad -- it'll increase the size of
> offset data, but probably doesn't affect much else. The data size may not
> be that bad as long as offsets aren't committed too frequently. In terms of
>
> performance, I'm assuming these schema changes are relatively rare, and you
>
> can just include the same schema object in every offset you create during
> the periods between schema changes so you (and the GC) are probably only
> doing a negligible amount of extra work.
>
>
> Hmm, it sounds like hammering accumulated state into the offsets could be
> pretty problematic and potentially risky, especially if the state has very
> different size and frequency characteristics than the offsets.
>

Yes, as I said, it is a hack. I mainly mentioned it here because I think
for the amount of metadata you need to carry through, it could be a
workable solution.


>
> Re: creating a consumer, Connect doesn't provide any utilities to do that
> since the goal is to handle everything Kafka-related for the connector
> developer so they can just focus on getting the data from the other system!
>
> We could consider exposing some of the worker config though, which I
> imagine is all you really need -- it'd just be convenient to have the
> connection info for the Kafka brokers.
>
> Having a way to get the worker 

Kafka Latest offsets information

2016-01-29 Thread Kashif Usmani
Hello Community,

When I log in to my Kafka Manager (version 0.8.2.0), I can see the latest
offset information for each topic and partition. I want to be able to get the
latest offsets programmatically, so I followed the example at
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
but I do not get anything in response.

I then also read at
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
that offset information should be stored in ZooKeeper under
/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)

So I checked my Zookeeper (listed in Kafka Manager, under Cluster
Information) and I do not see anything under /consumer (basically an empty
document).
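
(For reference, a check like this can be done with the ZooKeeper shell that ships
with Kafka; the group and topic names below are placeholders:)

bin/zookeeper-shell.sh localhost:2181
ls /consumers
get /consumers/my-group/offsets/my-topic/0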

Can someone please help me find this information? Is there something
wrong with my Kafka configuration such that the latest offsets are not stored
under /consumer? But then, where is the Kafka Manager getting that information
from? Some other ZooKeeper?

Thanks
Kashif


Re: MongoDB Kafka Connect driver

2016-01-29 Thread James Cheng
Not sure if this will help anything, but just throwing it out there.

The Maxwell and mypipe projects both do CDC from MySQL and support 
bootstrapping. The way they do it is kind of "eventually consistent".

1) At time T1, record coordinates of the end of the binlog as of T1.
2) At time T2, do a full dump of the database into Kafka.
3) Connect back to the binlog in the coordinates recorded in step #1, and emit 
all those records into Kafka.

As Jay mentioned, MySQL supports full row images. At the start of step #3, the 
kafka topic contains all rows as of time T2. It is possible that during step 
#3, that you will emit rows that changed between T1 and T2. From the point of 
view of the consumer of the kafka topic, they would see rows that went "back in 
time". However, as step #3 progresses, and the consumer keeps reading, those 
rows would eventually converge down to their final state.

Maxwell: https://github.com/zendesk/maxwell
mypipe: https://github.com/mardambey/mypipe

Does that idea help in any way? Btw, a reason it is done this way is that it is 
"difficult" to do #1 and #2 above in a coordinated way without locking the 
database or without adding additional outside dependencies (LVM snapshots 
being a specific one).

Btw, I glanced at some docs about the MongoDB oplog. It seems that each oplog 
entry contains
1) A way to identify the document that the change applies to.
2) A series of MongoDB commands (set, unset) to alter the document in #1 to 
become the new document.

Thoughts:
For #1, does it identify a particular "version" of a document? (I don't know 
much about MongoDB.) If so, you might be able to use it to determine if the 
change should even be attempted to be applied to the object.
For #2, doesn't that mean you'll need to "understand" MongoDB's syntax and 
commands? Although maybe it is simply sets/unsets/deletes, in which case it's 
maybe pretty simple.

-James

> On Jan 29, 2016, at 9:39 AM, Jay Kreps  wrote:
>
> Also, most databases provide a "full logging" option that lets you capture
> the whole row in the log (I know Oracle and MySQL have this), but it sounds
> like Mongo doesn't yet. That would be the ideal solution.
>
> -Jay
>
> On Fri, Jan 29, 2016 at 9:38 AM, Jay Kreps  wrote:
>
>> Ah, agreed. This approach is actually quite common in change capture,
>> though. For many use cases getting the final value is actually preferable
>> to getting intermediates. The exception is usually if you want to do
>> analytics on something like number of changes.
>>
>> On Fri, Jan 29, 2016 at 9:35 AM, Ewen Cheslack-Postava 
>> wrote:
>>
>>> Jay,
>>>
>>> You can query after the fact, but you're not necessarily going to get the
>>> same value back. There could easily be dozens of changes to the document
>>> in
>>> the oplog so the delta you see may not even make sense given the current
>>> state of the document. Even if you can apply the delta, you'd still be
>>> seeing data that is newer than the update. You can of course take this
>>> shortcut, but it won't give correct results. And if the data has been
>>> deleted since then, you won't even be able to write the full record... As
>>> far as I know, the way the op log is exposed won't let you do something
>>> like pin a query to the state of the db at a specific point in the op log
>>> and you may be reading from the beginning of the op log, so I don't think
>>> there's a way to get correct results by just querying the DB for the full
>>> documents.
>>>
>>> Strictly speaking you don't need to get all the data in memory, you just
>>> need a record of the current set of values somewhere. This is what I was
>>> describing following those two options -- if you do an initial dump to
>>> Kafka, you could track only offsets in memory and read back full values as
>>> needed to apply deltas, but this of course requires random reads into your
>>> Kafka topic (but may perform fine in practice depending on the workload).
>>>
>>> -Ewen
>>>
>>> On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps  wrote:
>>>
 Hey Ewen, how come you need to get it all in memory for approach (1)? I
 guess the obvious thing to do would just be to query for the record
 after-image when you get the diff--e.g. just read a batch of changes and
 multi-get the final values. I don't know how bad the overhead of this
>>> would
 be...batching might reduce it a fair amount. The guarantees for this are
 slightly different than the pure oplog too (you get the current value, not
 necessarily every intermediate) but that should be okay for most
 uses.

 -Jay

 On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <
>>> e...@confluent.io>
 wrote:

> Sunny,
>
> As I said on Twitter, I'm stoked to hear you're working on a Mongo
> connector! It struck me as a pretty natural source to tackle since it
 does
> such a nice job of cleanly exposing the 

Questions from new user

2016-01-29 Thread allen chan
Use case: We are using Kafka as a broker in one of our Elasticsearch
clusters. Kafka caches the logs if Elasticsearch has any performance
issues. I have Kafka set to delete logs pretty quickly to keep things in
the file cache to limit IO.

Questions:
1. In 0.9 it seems like consumer offsets are stored only in Kafka. Is there
a way to configure Kafka to delete my production logs pretty quickly but
have a different retention behavior for the consumer offsets?

2. Our consumer lag monitoring shows us that a lot of times our consumers
are behind somewhere between 500 and 1000 messages. Looking at the JMX
metrics requestSizeAvg and requestSizeMax, it shows our average request
size is 500 bytes and max request size is 800,000 bytes. I assume the lag
is because that batch could only hold one message given the max is 100
bytes. I plan to enable compression and increase the max.bytes to 10mb to
fix this short term (roughly as sketched below). In a few blogs, people
mentioned the ultimate fix should be splitting the message into smaller
chunks in the producer and then having the consumer put it back together.
Is that handled natively in the Kafka producer/consumer, or does it have to
be handled outside of it?
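
For reference, the short-term producer-side change I have in mind looks roughly
like this (new-producer property names; the values are illustrative, and the
broker's message.max.bytes and the consumers' fetch size would need to be raised
to match):

import java.util.Properties;

// Illustrative sketch only, not a tested configuration.
public class ProducerBatchTuning {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "snappy");                          // enable compression
        props.put("max.request.size", String.valueOf(10 * 1024 * 1024));  // allow ~10 MB requests
        return props;
    }
}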

Thanks for the attention.
Allen Chan


Unable to send message

2016-01-29 Thread Jatin D Patel
Error logs from zookeeper

[2016-01-29 06:28:20,100] INFO Accepted socket connection from /
172.31.28.145:35607 (org.apache.zookeeper.server.NIOServerCnxnFactory)

[2016-01-29 06:28:20,124] INFO Client attempting to establish new session
at /172.31.28.145:35607

(org.apache.zookeeper.server.ZooKeeperServer)

[2016-01-29 06:28:20,128] INFO Established session 0x1528c0b6ef6 with
negotiated timeout 3 for client /172.31.28.145:35607
(org.apache.zookeeper.server.ZooKeeperServer)

[2016-01-29 06:28:20,265] INFO Processed session termination for sessionid:
0x1528c0b6ef6 (org.apache.zookeeper.server.PrepRequestProcessor)

[2016-01-29 06:28:20,269] INFO Closed socket connection for client /
172.31.28.145:35607 which had sessionid 0x1528c0b6ef6
(org.apache.zookeeper.server.NIOServerCnxn)

[2016-01-29 06:29:05,063] INFO Accepted socket connection from /
172.31.28.145:35718 (org.apache.zookeeper.server.NIOServerCnxnFactory)

[2016-01-29 06:29:05,117] INFO Client attempting to establish new session
at /172.31.28.145:35718 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-01-29 06:29:05,123] INFO Established session 0x1528c0b6ef7 with
negotiated timeout 3 for client /172.31.28.145:35718
(org.apache.zookeeper.server.ZooKeeperServer)

[2016-01-29 06:29:05,156] INFO Accepted socket connection from /
172.31.28.145:35720 (org.apache.zookeeper.server.NIOServerCnxnFactory)

[2016-01-29 06:29:05,160] INFO Client attempting to establish new session
at /172.31.28.145:35720 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-01-29 06:29:05,163] INFO Established session 0x1528c0b6ef8 with
negotiated timeout 3 for client /172.31.28.145:35720
(org.apache.zookeeper.server.ZooKeeperServer)

[2016-01-29 06:29:05,196] INFO Accepted socket connection from /
172.31.28.145:35721 (org.apache.zookeeper.server.NIOServerCnxnFactory)

[2016-01-29 06:29:05,204] INFO Client attempting to establish new session
at /172.31.28.145:35721 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-01-29 06:29:05,208] INFO Established session 0x1528c0b6ef9 with
negotiated timeout 3 for client /172.31.28.145:35721
(org.apache.zookeeper.server.ZooKeeperServer)

[2016-01-29 06:29:05,954] INFO Accepted socket connection from /
172.31.28.145:35730 (org.apache.zookeeper.server.NIOServerCnxnFactory)

[2016-01-29 06:29:05,955] INFO Client attempting to establish new session
at /172.31.28.145:35730 (org.apache.zookeeper.server.ZooKeeperServer)

[2016-01-29 06:29:05,957] INFO Established session 0x1528c0b6efa with
negotiated timeout 6000 for client /172.31.28.145:35730
(org.apache.zookeeper.server.ZooKeeperServer)

[2016-01-29 06:29:06,798] INFO Got user-level KeeperException when
processing sessionid:0x1528c0b6efa type:create cxid:0x2 zxid:0x79
txntype:-1 reqpath:n/a Error Path:/consumers Error:KeeperErrorCode =
NodeExists for /consumers (org.apache.zookeeper.server.PrepRequestProcessor)

[2016-01-29 06:29:08,410] INFO Got user-level KeeperException when
processing sessionid:0x1528c0b6efa type:create cxid:0x19 zxid:0x7d
txntype:-1 reqpath:n/a Error
Path:/consumers/console-consumer-21239/owners/kafkatopic
Error:KeeperErrorCode = NoNode for
/consumers/console-consumer-21239/owners/kafkatopic
(org.apache.zookeeper.server.PrepRequestProcessor)

[2016-01-29 06:29:08,422] INFO Got user-level KeeperException when
processing sessionid:0x1528c0b6efa type:create cxid:0x1a zxid:0x7e
txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-21239/owners
Error:KeeperErrorCode = NoNode for /consumers/console-consumer-21239/owners
(org.apache.zookeeper.server.PrepRequestProcessor)

[2016-01-29 06:30:06,475] INFO Got user-level KeeperException when
processing sessionid:0x1528c0b6efa type:setData cxid:0x24 zxid:0x82
txntype:-1 reqpath:n/a Error
Path:/consumers/console-consumer-21239/offsets/kafkatopic/0
Error:KeeperErrorCode = NoNode for
/consumers/console-consumer-21239/offsets/kafkatopic/0
(org.apache.zookeeper.server.PrepRequestProcessor)

[2016-01-29 06:30:06,478] INFO Got user-level KeeperException when
processing sessionid:0x1528c0b6efa type:create cxid:0x25 zxid:0x83
txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-21239/offsets
Error:KeeperErrorCode = NoNode for
/consumers/console-consumer-21239/offsets
(org.apache.zookeeper.server.PrepRequestProcessor)


Can someone please help?


Re: Accumulating data in Kafka Connect source tasks

2016-01-29 Thread James Cheng

> On Jan 29, 2016, at 7:06 AM, Randall Hauch  wrote:
>
> On January 28, 2016 at 7:07:02 PM, Ewen Cheslack-Postava (e...@confluent.io) 
> wrote:
> Randall,
>
> Great question. Ideally you wouldn't need this type of state since it
> should really be available in the source system. In your case, it might
> actually make sense to be able to grab that information from the DB itself,
> although that will also have issues if, for example, there have been
> multiple schema changes and you can no longer get a previous schema from
> the current state of the tables.
> I agree that ideally connectors would be stateless, or at least have no need 
> for maintaining state across restarts. Unfortunately, that’s not always 
> possible.
>
> Reading the log but using the current schema does pose a problem if/when the 
> schema has evolved since the point in the log that we’re currently reading. 
> This is far more of an issue if you’re playing catch up and there’s been 
> non-compatible schema changes.
>
> Case in point: when MySQL inserts/updates/removes a row from a table, it 
> writes an event in the log that includes (a) a table identifier and (b) the 
> row values in column-order. There is no other information. Column renames 
> might be okay, but adding or removing columns will likely result in 
> mismatching the row values to the appropriate columns.
>
> Luckily, MySQL includes the DDL statements in the log, so my connector parses 
> these as part of processing and builds up the schema state as it goes along. 
> This works beautifully, with the only issue being how to persist and recover 
> this after restarts.
>
>
> The offset storage is probably pretty close to what you're looking for,
> although we obviously structure that very narrowly. Adding in some sort of
> other state store is an interesting idea, though I'd be curious how many
> other systems encounter similar challenges. I think one way to do this
> without huge changes and in a way that properly handles offset commits
> would be to expose a small API for setting local state and have Connect
> store that state right in the same topic (and message) as offsets. To
> handle offset commits and reviving tasks that hit a fault, we would just
> grab the current state as part of the process of committing offsets. Then
> offsets would just be a special case of that more general state.
>
> However, I'm also wary of doing something like this. Right now every worker
> has to consume the entire offsets topic. This works fine because offsets,
> while capable of being pretty complex, are generally pretty small such that
> there's no concern having to tail it on all workers (and no concern for the
> load on brokers leading those partitions). Once you provide a generic state
> storage mechanism without clear constraints on how it should be used,
> someone will come along and abuse it. Also, with offsets it is very clear
> (to the connector developer) which task should write to which keys (where
> the key here is the partition of the source partitioned stream). With
> support for arbitrary state, ownership of different subsets of the key
> space is very unclear. I think you may not have that particular problem
> because you probably only have 1 partition anyway since you are reading a
> transaction log.
>
> In any case, you're right that this doesn't exist today. There is one very
> hacky way to get around this, which is to store that schema information in
> your "offsets". This may not be *too* bad -- it'll increase the size of
> offset data, but probably doesn't affect much else. The data size may not
> be that bad as long as offsets aren't committed too frequently. In terms of
> performance, I'm assuming these schema changes are relatively rare, and you
> can just include the same schema object in every offset you create during
> the periods between schema changes so you (and the GC) are probably only
> doing a negligible amount of extra work.
>
> Hmm, it sounds like hammering accumulated state into the offsets could be 
> pretty problematic and potentially risky, especially if the state has very 
> different size and frequency characteristics than the offsets.
>
> Re: creating a consumer, Connect doesn't provide any utilities to do that
> since the goal is to handle everything Kafka-related for the connector
> developer so they can just focus on getting the data from the other system!
> We could consider exposing some of the worker config though, which I
> imagine is all you really need -- it'd just be convenient to have the
> connection info for the Kafka brokers.
> Having a way to get the worker config would be awesome, and IMO it a nice 
> minimalistic approach. If you think this is a good idea, I can log a JIRA and 
> take it to the dev list. I’m willing to work on it, too.
>
> I’m starting to think that storing state on a separate dedicated topic is the 
> best option, at least for me. First, connector tasks can easily record their 
> state by 

Re: Meaning of request.timeout.ms

2016-01-29 Thread Jason Gustafson
That is correct. KIP-19 has the details:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
.

-Jason

On Fri, Jan 29, 2016 at 3:08 AM, tao xiao  wrote:

> Hi team,
>
> I want to understand the meaning of request.timeout.ms that is used in the
> producer. As per the doc this property is used to expire records that have
> been waiting for response from server for more than request.timeout.ms
> which also means the records have been sitting in InFlightRequests for more
> than request.timeout.ms.
>
> But looking at the codebase, I discovered that request.timeout.ms is used for
> another purpose: to expire records that sit in accumulator for more than
> request.timeout.ms. I believe records that are in accumulator should not
> be
> in InFlightRequests therefore request.timeout.ms is used to expire two
> types of records: Records that sit in InFlightRequests for more than
> request.timeout.ms and records that sit in the accumulator for more than
> request.timeout.ms. Is this understanding correct?
>