Re: Kafka Streams window retention period question

2017-01-05 Thread Alexander Demidko
Great, thanks for the info guys!

On Thu, Jan 5, 2017 at 10:09 PM, Matthias J. Sax 
wrote:

> Hi Alex,
>
> if a window was purged because its retention time passed it will not
> accept any records anymore -- thus, if a very late record arrives, it
> will get dropped without any further notice.
>
> About stream time and partition: yes. And how time is advanced/tracked
> is independent of the window type.
>
>
> -Matthias
>
> On 1/5/17 9:14 PM, Alexander Demidko wrote:
> > Hi Matthias,
> >
> > Thanks for such a thorough response!
> >
> > I guess there are cases when a determinism might be preferred over
> > computing "more correct results" (e.g. in unit tests, where one manually
> > lays out an order of incoming events and wants to get an exact output),
> but
> > from now on I can simply assume that windows might be stored longer than
> > the specified time.
> >
> > Few more questions if you don't mind.
> >
> > - What should happen when the window 00:00..01:00 will finally get purged
> > (and the internal stream time will get bumped to say time 10:00) but
> then I
> > receive an event ? Will it create the 00:00..01:00 window
> again
> > or the event will be dropped because it's way older than the internal
> > stream time?
> >
> > - I got a bit confused when you mentioned a key in the window name "open
> > (first) >>>b<<<-window". To make it clear – I assume that because in
> Kafka
> > Streams hopping/tumbling windows are aligned, an internal stream time is
> > not related to the aggregation keys but just to the input partitions,
> > right? I.e. if I have only one partition there will be only one internal
> > stream time watermark regardless of how many keys do I have? Will this
> > behavior be the same for sliding windows? Feel free to just point me to
> the
> > code :)
> >
> > Alex
> >
> >
> >> Hi Alexander,
> >>
> >> first, both mailing lists should be fine :)
> >>
> >> About internal time tracking: Kafka Streams tracks an internal "stream
> >> time" that is determined as the minimum "partition time" over all its
> >> input partitions.
> >>
> >> The "partition time" is tracked for each input partition individually
> >> and is the minimum timestamp of all currently buffered records of the
> >> partition.
> >>
> >> So depending on how many records from which partitions are fetched on
> >> poll() "stream time" gets advanced accordingly -- this is kinda
> >> non-deterministic because we cannot predict what poll() will return.
> >>
> >> Because all buffered records are considered, "stream time" is advanced
> >> conservatively. The main idea about this is to keep windows open longer
> >> if we know in advance that there will be a late record (as we observe
> >> the late record in the buffer already). Thus, we can compute "more
> >> correct" results with regard to late arriving records.
> >> (Just a heads up, we might change this behavior in future releases --
> >> thus, it is not documented anywhere but in the code ;) )
> >>
> >>
> >>
> >> About retention time and purging windows: old windows should be dropped
> >> after "stream time" advances beyond the point on which it is guaranteed
> >> to maintain the window. It should happen "as soon as possible" but there
> >> is no strict guarantee (thus "kept at least").
> >>
> >> Furthermore, Streams applies a minimum retention time of 1 minute --
> >> thus, for your specific use case, the 30 seconds you do specify are not
> >> used (this is unfortunately not documented :( ). However, this is
> >> unrelated to the behavior you see -- I just mention it for completeness.
> >>
> >>
> >> Thus for your specific use case, stream time is most likely not advanced
> >> to 5:01 when record  is processed (as record with TS=0:15 is
> >> most likely in the buffer) and thus, the next b-record with TS=15
> >> seconds will be added to the still open (first) b-window and both
> >> values 2 and 10 get added to 12.
> >>
> >>
> >> Also keep in mind that we do some deduplication on KTable result using
> >> an internal cache. This can also influence what output record you see.
> >> For further details see:
> >>
> > http://docs.confluent.io/current/streams/developer-
> guide.html#memory-management
> >
> >
> >> -Matthias
> >
> >
> > On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
> > alexander.demi...@stitchfix.com> wrote:
> >
> >> Hi folks,
> >>
> >> I'm experimenting with Kafka Streams windowed aggregation and came
> across
> >> window retention period behavior I don't fully understand.
> >> I'm using custom timestamp extractor which gets the timestamp from the
> >> payload. Values are aggregated using tumbling time windows and summed by
> >> the key.
> >> I am using kafka and kafka-streams with 0.10.1.1 version.
> >>
> >> Full code can be found at https://gist.github.com/xdralex/
> >> 845bcf8f06ab0cfcf9785d9f95450b88, but in general I'm doing the
> following:
> >>
> >> val input: KStream[String, String] = builder.stream(Serdes.String(),
> >> Serdes.String(), "TimeInputTopic")
> >> val windo

Re: Does offsetsForTimes use createtime of logsegment file?

2017-01-05 Thread Vignesh
Thanks. I didn't realize ListOffsetRequestV1 is only available in 0.10.1
(which has KIP-33, the time index).
When the timestamp is set by the user (CreationTime) and is not always
increasing, would this method still return the offset of the first message
with a timestamp greater than or equal to the provided timestamp?


For example, in the scenario below:

Message1, Timestamp = T1, Offset = 0
Message2, Timestamp = T0 (or T2), Offset = 1
Message3, Timestamp = T1, Offset = 2


Would offsetsForTimes(T1) return the offset of the earliest message with
timestamp T1 (i.e. offset 0 in the example above)?


-Vignesh.
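
For reference while reading this question: the KafkaConsumer Javadoc describes
the result of offsetsForTimes as the earliest offset whose timestamp is greater
than or equal to the requested timestamp. The sketch below only models that
stated contract over the three-message scenario above. It does not model the
broker's time index, and the class name, method name, and numeric timestamps
are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

/** Toy model of one partition: each record is {offset, timestamp}. */
final class OffsetLookupSketch {

    /**
     * Returns the earliest offset whose timestamp is >= target, or empty
     * if no record qualifies. Records are expected in offset order.
     */
    static OptionalLong earliestOffsetAtOrAfter(List<long[]> records, long target) {
        for (long[] record : records) {
            long offset = record[0];
            long timestamp = record[1];
            if (timestamp >= target) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long t0 = 100L; // hypothetical value for T0
        long t1 = 200L; // hypothetical value for T1, with T0 < T1
        List<long[]> log = Arrays.asList(
                new long[] {0L, t1},  // Message1
                new long[] {1L, t0},  // Message2 (out-of-order timestamp)
                new long[] {2L, t1}); // Message3
        System.out.println(earliestOffsetAtOrAfter(log, t1));
    }
}
```

Under this model the lookup for T1 resolves to offset 0, because Message1 is
the first record in offset order that satisfies the condition. Whether the
broker's index-based lookup behaves identically for out-of-order timestamps is
exactly what the question asks and is not settled by this sketch.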

On Thu, Jan 5, 2017 at 8:19 PM, Ewen Cheslack-Postava 
wrote:

> On Wed, Jan 4, 2017 at 11:54 PM, Vignesh  wrote:
>
> > Hi,
> >
> > The offsetsForTimes function
> > (clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map))
> > returns the offset for a given timestamp. Does it use the message's timestamp
> > (which could be LogAppendTime or set by the user) or the creation time of the
> > log segment file?
> >
> >
> This is actually tied to how the ListOffsetsRequest is handled. But if
> you're on a recent version, then the KIP
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090
> made it use the more accurate version based on message timestamps.
>
>
> >
> > KIP-33 (Add a time based log index) adds a timestamp-based index, and it is
> > available only from 0.10.1. Does the above function work on 0.10.0? If so,
> > are there any differences in how it works between versions 0.10.0 and 0.10.1?
> >
> >
> The KIP was only adopted and implemented in 0.10.1+. It is not available in
> 0.10.0.
>
>
> > Thanks,
> > Vignesh.
> >
>


Re: Kafka Streams window retention period question

2017-01-05 Thread Matthias J. Sax
Hi Alex,

if a window was purged because its retention time passed it will not
accept any records anymore -- thus, if a very late record arrives, it
will get dropped without any further notice.

About stream time and partition: yes. And how time is advanced/tracked
is independent of the window type.


-Matthias
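
A rough sketch of the behavior described above, combined with the 1-minute
minimum retention mentioned in the quoted message below. The expiry formula
(a window counts as purged once stream time passes window start plus
retention) is a simplifying assumption made for illustration, not the actual
store implementation, and all names here are hypothetical.

```java
/** Simplified model of window retention and late-record handling. */
final class WindowRetentionSketch {

    /** Minimum retention that Streams applies, per the thread (1 minute). */
    static final long MIN_RETENTION_MS = 60_000L;

    /** The retention actually used: the configured until(), but never below the minimum. */
    static long effectiveRetentionMs(long untilMs) {
        return Math.max(untilMs, MIN_RETENTION_MS);
    }

    /** ASSUMPTION: a window is purged once stream time passes windowStart + retention. */
    static boolean isExpired(long windowStartMs, long untilMs, long streamTimeMs) {
        return streamTimeMs > windowStartMs + effectiveRetentionMs(untilMs);
    }

    /** A record for an already purged window is dropped silently. */
    static boolean acceptsRecord(long windowStartMs, long untilMs, long streamTimeMs) {
        return !isExpired(windowStartMs, untilMs, streamTimeMs);
    }

    public static void main(String[] args) {
        long windowStart = 0L;   // the 00:00..01:00 window
        long until = 30_000L;    // 30 seconds requested, raised to 1 minute
        System.out.println(acceptsRecord(windowStart, until, 15_000L));  // stream time 00:15
        System.out.println(acceptsRecord(windowStart, until, 600_000L)); // stream time 10:00
    }
}
```

With stream time still at 00:15 the window accepts the record. Once stream
time has been bumped to 10:00 the model treats the window as purged, so the
late record is dropped.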

On 1/5/17 9:14 PM, Alexander Demidko wrote:
> Hi Matthias,
> 
> Thanks for such a thorough response!
> 
> I guess there are cases when a determinism might be preferred over
> computing "more correct results" (e.g. in unit tests, where one manually
> lays out an order of incoming events and wants to get an exact output), but
> from now on I can simply assume that windows might be stored longer than
> the specified time.
> 
> Few more questions if you don't mind.
> 
> - What should happen when the window 00:00..01:00 will finally get purged
> (and the internal stream time will get bumped to say time 10:00) but then I
> receive an event ? Will it create the 00:00..01:00 window again
> or the event will be dropped because it's way older than the internal
> stream time?
> 
> - I got a bit confused when you mentioned a key in the window name "open
> (first) >>>b<<<-window". To make it clear – I assume that because in Kafka
> Streams hopping/tumbling windows are aligned, an internal stream time is
> not related to the aggregation keys but just to the input partitions,
> right? I.e. if I have only one partition there will be only one internal
> stream time watermark regardless of how many keys do I have? Will this
> behavior be the same for sliding windows? Feel free to just point me to the
> code :)
> 
> Alex
> 
> 
>> Hi Alexander,
>>
>> first, both mailing lists should be fine :)
>>
>> About internal time tracking: Kafka Streams tracks an internal "stream
>> time" that is determined as the minimum "partition time" over all its
>> input partitions.
>>
>> The "partition time" is tracked for each input partition individually
>> and is the minimum timestamp of all currently buffered records of the
>> partition.
>>
>> So depending on how many records from which partitions are fetched on
>> poll() "stream time" gets advanced accordingly -- this is kinda
>> non-deterministic because we cannot predict what poll() will return.
>>
>> Because all buffered records are considered, "stream time" is advanced
>> conservatively. The main idea about this is to keep windows open longer
>> if we know in advance that there will be a late record (as we observe
>> the late record in the buffer already). Thus, we can compute "more
>> correct" results with regard to late arriving records.
>> (Just a heads up, we might change this behavior in future releases --
>> thus, it is not documented anywhere but in the code ;) )
>>
>>
>>
>> About retention time and purging windows: old windows should be dropped
>> after "stream time" advances beyond the point on which it is guaranteed
>> to maintain the window. It should happen "as soon as possible" but there
>> is no strict guarantee (thus "kept at least").
>>
>> Furthermore, Streams applies a minimum retention time of 1 minute --
>> thus, for your specific use case, the 30 seconds you do specify are not
>> used (this is unfortunately not documented :( ). However, this is
>> unrelated to the behavior you see -- I just mention it for completeness.
>>
>>
>> Thus for your specific use case, stream time is most likely not advanced
>> to 5:01 when record  is processed (as record with TS=0:15 is
>> most likely in the buffer) and thus, the next b-record with TS=15
>> seconds will be added to the still open (first) b-window and both
>> values 2 and 10 get added to 12.
>>
>>
>> Also keep in mind that we do some deduplication on KTable result using
>> an internal cache. This can also influence what output record you see.
>> For further details see:
>>
> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
> 
> 
>> -Matthias
> 
> 
> On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
> alexander.demi...@stitchfix.com> wrote:
> 
>> Hi folks,
>>
>> I'm experimenting with Kafka Streams windowed aggregation and came across
>> window retention period behavior I don't fully understand.
>> I'm using custom timestamp extractor which gets the timestamp from the
>> payload. Values are aggregated using tumbling time windows and summed by
>> the key.
>> I am using kafka and kafka-streams with 0.10.1.1 version.
>>
>> Full code can be found at https://gist.github.com/xdralex/
>> 845bcf8f06ab0cfcf9785d9f95450b88, but in general I'm doing the following:
>>
>> val input: KStream[String, String] = builder.stream(Serdes.String(),
>> Serdes.String(), "TimeInputTopic")
>> val window: Windows[TimeWindow] = TimeWindows.of(6).
>> advanceBy(6).until(3)
>>
>> val aggregated: KTable[Windowed[String], JInt] = input
>>   .mapValues((v: String) => parse(v)._2)
>>   .groupByKey(Serdes.String(), Serdes.Integer())
>>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
>> "TimeStor

Re: Kafka Streams window retention period question

2017-01-05 Thread Sachin Mittal
What should happen when the window 00:00..01:00 will finally get purged
(and the internal stream time will get bumped to say time 10:00) but then I
receive an event ? Will it create the 00:00..01:00 window again
or the event will be dropped because it's way older than the internal
stream time?

Once the window is finally purged and you get a late message for that window,
it will create that window again, aggregate the single value into it, and then
drop the window again.
So this will not be of much use. My suggestion is to set until long enough
that no late messages arrive after the window is purged.
Another thing I have found is that if until is long, compaction (and
deletion, in later releases) will trigger only after until has passed, and
your changelog topics will be huge. This may result in a long local state
restore time when streams are resumed. In general, I also see more lag when
until is set very high.

Thanks
Sachin



On Fri, Jan 6, 2017 at 10:44 AM, Alexander Demidko <
alexander.demi...@stitchfix.com> wrote:

> Hi Matthias,
>
> Thanks for such a thorough response!
>
> I guess there are cases when a determinism might be preferred over
> computing "more correct results" (e.g. in unit tests, where one manually
> lays out an order of incoming events and wants to get an exact output), but
> from now on I can simply assume that windows might be stored longer than
> the specified time.
>
> Few more questions if you don't mind.
>
> - What should happen when the window 00:00..01:00 will finally get purged
> (and the internal stream time will get bumped to say time 10:00) but then I
> receive an event ? Will it create the 00:00..01:00 window again
> or the event will be dropped because it's way older than the internal
> stream time?
>
> - I got a bit confused when you mentioned a key in the window name "open
> (first) >>>b<<<-window". To make it clear – I assume that because in Kafka
> Streams hopping/tumbling windows are aligned, an internal stream time is
> not related to the aggregation keys but just to the input partitions,
> right? I.e. if I have only one partition there will be only one internal
> stream time watermark regardless of how many keys do I have? Will this
> behavior be the same for sliding windows? Feel free to just point me to the
> code :)
>
> Alex
>
>
> > Hi Alexander,
> >
> > first, both mailing lists should be fine :)
> >
> > About internal time tracking: Kafka Streams tracks an internal "stream
> > time" that is determined as the minimum "partition time" over all its
> > input partitions.
> >
> > The "partition time" is tracked for each input partition individually
> > and is the minimum timestamp of all currently buffered records of the
> > partition.
> >
> > So depending on how many records from which partitions are fetched on
> > poll() "stream time" gets advanced accordingly -- this is kinda
> > non-deterministic because we cannot predict what poll() will return.
> >
> > Because all buffered records are considered, "stream time" is advanced
> > conservatively. The main idea about this is to keep windows open longer
> > if we know in advance that there will be a late record (as we observe
> > the late record in the buffer already). Thus, we can compute "more
> > correct" results with regard to late arriving records.
> > (Just a heads up, we might change this behavior in future releases --
> > thus, it is not documented anywhere but in the code ;) )
> >
> >
> >
> > About retention time and purging windows: old windows should be dropped
> > after "stream time" advances beyond the point on which it is guaranteed
> > to maintain the window. It should happen "as soon as possible" but there
> > is no strict guarantee (thus "kept at least").
> >
> > Furthermore, Streams applies a minimum retention time of 1 minute --
> > thus, for your specific use case, the 30 seconds you do specify are not
> > used (this is unfortunately not documented :( ). However, this is
> > unrelated to the behavior you see -- I just mention it for completeness.
> >
> >
> > Thus for your specific use case, stream time is most likely not advanced
> > to 5:01 when record  is processed (as record with TS=0:15 is
> > most likely in the buffer) and thus, the next b-record with TS=15
> > seconds will be added to the still open (first) b-window and both
> > values 2 and 10 get added to 12.
> >
> >
> > Also keep in mind that we do some deduplication on KTable result using
> > an internal cache. This can also influence what output record you see.
> > For further details see:
> >
> http://docs.confluent.io/current/streams/developer-
> guide.html#memory-management
>
>
> > -Matthias
>
>
> On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
> alexander.demi...@stitchfix.com> wrote:
>
> > Hi folks,
> >
> > I'm experimenting with Kafka Streams windowed aggregation and came across
> > window retention period behavior I don't fully understand.
> > I'm using custom timestamp extractor which gets the timestamp from the
> > payload. Values are aggregated us

Re: One big kafka connect cluster or many small ones?

2017-01-05 Thread Stephane Maarek
Thanks!
So I just override the conf while doing the API call? It’d be great to have
this documented somewhere on the Confluent website. I couldn’t find it.

On 6 January 2017 at 3:42:45 pm, Ewen Cheslack-Postava (e...@confluent.io)
wrote:

On Thu, Jan 5, 2017 at 7:19 PM, Stephane Maarek <
steph...@simplemachines.com.au> wrote:

> Thanks a lot for the guidance, I think we’ll go ahead with one cluster. I
> just need to figure out how our CD pipeline can talk to our connect cluster
> securely (because it’ll need direct access to perform API calls).
>

The documentation isn't great here, but you can apply all the normal
security configs to Connect (in distributed mode, it's basically equivalent
to a consumer, so everything you can do with a consumer you can do with
Connect).


>
> Lastly, a question or maybe a piece of feedback… is it not possible to
> specify the key serializer and deserializer as part of the rest api job
> config?
> The issue is that sometimes our data is avro, sometimes it’s json. And it
> seems I’d need two separate clusters for that?
>

This is new! As of 0.10.1.0, we have
https://cwiki.apache.org/confluence/display/KAFKA/KIP-75+-+Add+per-connector+Converters
which allows you to include it in the connector config. It's called
"Converter" in Connect because it does a bit more than ser/des if you've
written them for Kafka, but they are basically just pluggable ser/des. We
knew folks would want this, it just took us awhile to find the bandwidth to
implement it. Now, you shouldn't need to do anything special or deploy
multiple clusters -- it's baked in and supported as long as you are willing
to override it on a per-connector basis (and this seems reasonable for most
folks since *ideally* you are *somewhat* standardized on a common
serialization format).

-Ewen
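
To make the per-connector override concrete, here is a minimal sketch that
builds the JSON body one would POST to the Connect REST API's /connectors
endpoint, with key.converter and value.converter set inside the connector
config as KIP-75 allows. The connector name, topic, and file path are
hypothetical, and the tiny JSON renderer is an illustration rather than a
replacement for a real JSON library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Builds a connector-creation payload with per-connector converters. */
final class ConnectorConfigSketch {

    /** Config for a file sink that reads JSON values, whatever the worker default is. */
    static Map<String, String> jsonSinkConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector");
        config.put("topics", "json-events");         // hypothetical topic
        config.put("file", "/tmp/json-events.txt");  // hypothetical output path
        // Per-connector overrides (KIP-75, available from 0.10.1.0):
        config.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
        config.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        config.put("value.converter.schemas.enable", "false");
        return config;
    }

    /** Renders {"name": ..., "config": {...}} with keys in insertion order. */
    static String toRestPayload(String name, Map<String, String> config) {
        StringJoiner entries = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : config.entrySet()) {
            entries.add(quote(e.getKey()) + ":" + quote(e.getValue()));
        }
        return "{\"name\":" + quote(name) + ",\"config\":" + entries + "}";
    }

    /** Minimal JSON string quoting: escapes backslashes and double quotes only. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        System.out.println(toRestPayload("json-file-sink", jsonSinkConfig()));
    }
}
```

A second connector on the same cluster could submit a config that names an
Avro converter instead, which is how one cluster can serve both formats.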


>
> On 6 January 2017 at 1:54:10 pm, Ewen Cheslack-Postava (e...@confluent.io)
> wrote:
>
> On Thu, Jan 5, 2017 at 3:12 PM, Stephane Maarek <
> steph...@simplemachines.com.au> wrote:
>
> > Hi,
> >
> > We like to operate in micro-services (dockerize and ship everything on
> ecs)
> > and I was wondering which approach was preferred.
> > We have one kafka cluster, one zookeeper cluster, etc, but when it comes
> to
> > kafka connect I have some doubts.
> >
> > Is it better to have one big kafka connect with multiple nodes, or many
> > small kafka connect clusters or standalone, for each connector / etl ?
> >
>
> You can do any of these, and it may depend on how you do
> orchestration/deployment.
>
> We built Connect to support running one big cluster running a bunch of
> connectors. It balances work automatically and provides a way to control
> scale up/down via increased parallelism. This means we don't need to make
> any assumptions about how you deploy, how you handle elastically scaling
> your clusters, etc. But if you run in an environment and have the tooling
> in place to do that already, you can also opt to run many smaller clusters
> and use that tooling to scale up/down. In that case you'd just make sure
> there were enough tasks for each connector so that when you scale the # of
> workers for a cluster up the rebalancing of work would ensure there were
> enough tasks for every worker to remain occupied.
>
> The main drawback of doing this is that Connect uses a few topics for
> configs, status, and offsets and you need these to be unique per cluster.
> This means you'll have 3N more topics. If you're running a *lot* of
> connectors, that could eventually become a problem. It also means you have
> that many more worker configs to handle, clusters to monitor, etc. And
> deploying a connector no longer becomes as simple as just making a call to
> the service's REST API since there isn't a single centralized service. The
> main benefits I can think of are a) if you already have preferred tooling
> for handling elasticity and b) better resource isolation between connectors
> (i.e. an OOM error in one connector won't affect any other connectors).
>
> For standalone mode, we'd generally recommend only using it when
> distributed mode doesn't make sense, e.g. for log file collection. Other
> than that, having the fault tolerance and high availability of distributed
> mode is preferred.
>
> On your specific points:
>
> >
> > The issues I’m trying to address are :
> > - Integration with our CI/CD pipeline
> >
>
> I'm not sure anything about Connect affects this. Is there a specific
> concern you have about the CI/CD pipeline & Connect?
>
>
> > - Efficient resources utilisation
> >
>
> Putting all the connectors into one cluster will probably result in better
> resource utilization unless you're already automatically tracking usage and
> scaling appropriately. The reason is that if you use a bunch of small
> clusters, you're now stuck trying to optimize N uses. Since Connect can
> already (roughly) balance work, putting all the work into one cluster and
> having connect split it up means you just need to watch utilization of the
> node

Re: Kafka Streams window retention period question

2017-01-05 Thread Alexander Demidko
Hi Matthias,

Thanks for such a thorough response!

I guess there are cases when a determinism might be preferred over
computing "more correct results" (e.g. in unit tests, where one manually
lays out an order of incoming events and wants to get an exact output), but
from now on I can simply assume that windows might be stored longer than
the specified time.

Few more questions if you don't mind.

- What should happen when the window 00:00..01:00 will finally get purged
(and the internal stream time will get bumped to say time 10:00) but then I
receive an event ? Will it create the 00:00..01:00 window again
or the event will be dropped because it's way older than the internal
stream time?

- I got a bit confused when you mentioned a key in the window name "open
(first) >>>b<<<-window". To make it clear – I assume that because in Kafka
Streams hopping/tumbling windows are aligned, an internal stream time is
not related to the aggregation keys but just to the input partitions,
right? I.e. if I have only one partition there will be only one internal
stream time watermark regardless of how many keys do I have? Will this
behavior be the same for sliding windows? Feel free to just point me to the
code :)

Alex


> Hi Alexander,
>
> first, both mailing lists should be fine :)
>
> About internal time tracking: Kafka Streams tracks an internal "stream
> time" that is determined as the minimum "partition time" over all its
> input partitions.
>
> The "partition time" is tracked for each input partition individually
> and is the minimum timestamp of all currently buffered records of the
> partition.
>
> So depending on how many records from which partitions are fetched on
> poll() "stream time" gets advanced accordingly -- this is kinda
> non-deterministic because we cannot predict what poll() will return.
>
> Because all buffered records are considered, "stream time" is advanced
> conservatively. The main idea about this is to keep windows open longer
> if we know in advance that there will be a late record (as we observe
> the late record in the buffer already). Thus, we can compute "more
> correct" results with regard to late arriving records.
> (Just a heads up, we might change this behavior in future releases --
> thus, it is not documented anywhere but in the code ;) )
>
>
>
> About retention time and purging windows: old windows should be dropped
> after "stream time" advances beyond the point on which it is guaranteed
> to maintain the window. It should happen "as soon as possible" but there
> is no strict guarantee (thus "kept at least").
>
> Furthermore, Streams applies a minimum retention time of 1 minute --
> thus, for your specific use case, the 30 seconds you do specify are not
> used (this is unfortunately not documented :( ). However, this is
> unrelated to the behavior you see -- I just mention it for completeness.
>
>
> Thus for your specific use case, stream time is most likely not advanced
> to 5:01 when record  is processed (as record with TS=0:15 is
> most likely in the buffer) and thus, the next b-record with TS=15
> seconds will be added to the still open (first) b-window and both
> values 2 and 10 get added to 12.
>
>
> Also keep in mind that we do some deduplication on KTable result using
> an internal cache. This can also influence what output record you see.
> For further details see:
>
http://docs.confluent.io/current/streams/developer-guide.html#memory-management


> -Matthias
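
A minimal sketch of the time-tracking rule quoted above, as described for the
release discussed in this thread (the quoted message notes the behavior may
change). Partition time is the minimum timestamp among the buffered records of
a partition, and stream time is the minimum partition time. The class and
method names are hypothetical, and every buffer is assumed to be non-empty.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Toy model of "partition time" and "stream time" as described in the thread. */
final class StreamTimeSketch {

    /** Minimum timestamp of all currently buffered records of one partition. */
    static long partitionTime(Collection<Long> bufferedTimestamps) {
        return Collections.min(bufferedTimestamps);
    }

    /** Minimum partition time over all input partitions. */
    static long streamTime(Collection<? extends Collection<Long>> partitions) {
        long min = Long.MAX_VALUE;
        for (Collection<Long> buffered : partitions) {
            min = Math.min(min, partitionTime(buffered));
        }
        return min;
    }

    public static void main(String[] args) {
        // One partition whose buffer holds the b-records at 00:05:01 and 00:00:15.
        List<List<Long>> partitions = Arrays.asList(Arrays.asList(301_000L, 15_000L));
        System.out.println(streamTime(partitions));
    }
}
```

In this model the buffered record at 00:00:15 holds stream time at 15 seconds
even though the record at 00:05:01 is processed first, which is why the first
b-window can still be open when the late record arrives.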


On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
alexander.demi...@stitchfix.com> wrote:

> Hi folks,
>
> I'm experimenting with Kafka Streams windowed aggregation and came across
> window retention period behavior I don't fully understand.
> I'm using custom timestamp extractor which gets the timestamp from the
> payload. Values are aggregated using tumbling time windows and summed by
> the key.
> I am using kafka and kafka-streams with 0.10.1.1 version.
>
> Full code can be found at https://gist.github.com/xdralex/
> 845bcf8f06ab0cfcf9785d9f95450b88, but in general I'm doing the following:
>
> val input: KStream[String, String] = builder.stream(Serdes.String(),
> Serdes.String(), "TimeInputTopic")
> val window: Windows[TimeWindow] = TimeWindows.of(6).
> advanceBy(6).until(3)
>
> val aggregated: KTable[Windowed[String], JInt] = input
>   .mapValues((v: String) => parse(v)._2)
>   .groupByKey(Serdes.String(), Serdes.Integer())
>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
> "TimeStore1")
>
> aggregated.foreach {
>   (w: Windowed[String], s: JInt) =>
> val start = new DateTime(w.window().start(), DateTimeZone.UTC)
> val end = new DateTime(w.window().end(), DateTimeZone.UTC)
> println(s"Aggregated: $start..$end - ${w.key()} - $s")
> }
>
> Here is the data being sent to TimeInputTopic:
> a,1970-01-01T00:00:00Z 1
> b,1970-01-01T00:00:01Z 2
> a,1970-01-01T00:00:02Z 3
> b,1970-01-01T00:05:01Z 10
> b,1970-01-01T00:00:15Z 10
>
> Here is the output:
> Aggregated: 1970-01-01T00

RE: Apache Kafka integration using Apache Camel

2017-01-05 Thread Gupta, Swati
Yes, the kafka console consumer displays the message correctly.
I also tested the same with a Java application, and it works fine. There seems
to be an issue with the Camel route trying to consume.

There is no error in the console, but the logs show the following:
kafka.KafkaCamelTestConsumer
Connected to the target VM, address: '127.0.0.1:65007', transport: 'socket'
PID_IS_UNDEFINED: INFO  DefaultCamelContext - Apache Camel 2.17.0 
(CamelContext: camel-1) is starting
PID_IS_UNDEFINED: INFO  ManagedManagementStrategy - JMX is enabled
PID_IS_UNDEFINED: INFO  DefaultTypeConverter - Loaded 183 type converters
PID_IS_UNDEFINED: INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint 
registry is in extended mode gathering usage statistics of all incoming and 
outgoing endpoints (cache limit: 1000)
PID_IS_UNDEFINED: INFO  DefaultCamelContext - AllowUseOriginalMessage is 
enabled. If access to the original message is not needed, then its recommended 
to turn this option off as it may improve performance.
PID_IS_UNDEFINED: INFO  DefaultCamelContext - StreamCaching is not in use. If 
using streams then its recommended to enable stream caching. See more details 
at http://camel.apache.org/stream-caching.html
PID_IS_UNDEFINED: INFO  KafkaConsumer - Starting Kafka consumer
PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values: 
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = 
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1024
group.id = testing
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 3
partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 4
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 3
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer

PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values: 
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = consumer-1
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1024
group.id = testing
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 3
partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 4
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 3
ssl.cipher.suites = nu

Re: Apache Kafka integration using Apache Camel

2017-01-05 Thread Ewen Cheslack-Postava
More generally, do you have any log errors/messages or additional info?
It's tough to debug issues like this from 3rd party libraries if they don't
provide logs/exception info that indicates why processing a specific
message failed.

-Ewen

On Thu, Jan 5, 2017 at 8:29 PM, UMESH CHAUDHARY  wrote:

> Did you test that kafka console consumer is displaying the produced
> message?
>
> On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati  wrote:
>
> > Hello All,
> >
> >
> >
> > I am trying to create a Consumer using Apache Camel for a topic in Apache
> > Kafka.
> > I am using Camel 2.17.0 and Kafka 0.10  and JDK 1.8.
> > I have attached a file, KafkaCamelTestConsumer.java which is a standalone
> > application trying to read from a topic  “test1”created in Apache Kafka
> > I am producing messages from the console and also was successful to
> > produce messages using a Camel program in the topic "test1", but not able
> > to consume messages. Ideally, it should get printed, but nothing seems to
> > happen. The log says that the route has started but does not process any
> > message.
> >
> > Please help to confirm if there is anything wrong with the below syntax:
> >
> > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
> >         + "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
> >         + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
> >         + "&autoCommitIntervalMs=1000&sessionTimeoutMs=3&autoCommitEnable=true")
> >     .split()
> >     .body()
> >     .process(new Processor() {
> >         @Override
> >         public void process(Exchange exchange) throws Exception {
> >             String messageKey = "";
> >             if (exchange.getIn() != null) {
> >                 Message message = exchange.getIn();
> >                 Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
> >                 String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
> >                 if (message.getHeader(KafkaConstants.KEY) != null)
> >                     messageKey = (String) message.getHeader(KafkaConstants.KEY);
> >                 Object data = message.getBody();
> >
> >                 System.out.println("topicName :: " + topicName
> >                         + " partitionId :: " + partitionId
> >                         + " messageKey :: " + messageKey
> >                         + " message :: " + data + "\n");
> >             }
> >         }
> >     })
> >     .to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
> > }
> > });
> >
> >
> >
> > I have also tried with the basic parameters as below and it still fails
> to
> > read messages.
> >
> > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")
> >
> > Any help on this will be greatly appreciated.
> >
> > Thanks in advance
> >
> >
> >
> > Thanks & Regards
> >
> > Swati
> >
> > --
> > This e-mail and any attachments to it (the "Communication") is, unless
> > otherwise stated, confidential, may contain copyright material and is for
> > the use only of the intended recipient. If you receive the Communication
> in
> > error, please notify the sender immediately by return e-mail, delete the
> > Communication and the return e-mail, and do not read, copy, retransmit or
> > otherwise deal with it. Any views expressed in the Communication are
> those
> > of the individual sender only, unless expressly stated to be those of
> > Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or
> any
> > of its related entities including ANZ Bank New Zealand Limited (together
> > "ANZ"). ANZ does not accept liability in connection with the integrity of
> > or errors in the Communication, computer virus, data corruption,
> > interference or delay arising from or in respect of the Communication.
> >
> >
>


Re: One big kafka connect cluster or many small ones?

2017-01-05 Thread Ewen Cheslack-Postava
On Thu, Jan 5, 2017 at 7:19 PM, Stephane Maarek <
steph...@simplemachines.com.au> wrote:

> Thanks a lot for the guidance, I think we’ll go ahead with one cluster. I
> just need to figure out how our CD pipeline can talk to our connect cluster
> securely (because it’ll need direct access to perform API calls).
>

The documentation isn't great here, but you can apply all the normal
security configs to Connect (in distributed mode, it's basically equivalent
to a consumer, so everything you can do with a consumer you can do with
Connect).


>
> Lastly, a question or maybe a piece of feedback… is it not possible to
> specify the key serializer and deserializer as part of the rest api job
> config?
> The issue is that sometimes our data is avro, sometimes it’s json. And it
> seems I’d need two separate clusters for that?
>

This is new! As of 0.10.1.0, we have
https://cwiki.apache.org/confluence/display/KAFKA/KIP-75+-+Add+per-connector+Converters
which allows you to include it in the connector config. It's called
"Converter" in Connect because it does a bit more than ser/des if you've
written them for Kafka, but they are basically just pluggable ser/des. We
knew folks would want this, it just took us a while to find the bandwidth to
implement it. Now, you shouldn't need to do anything special or deploy
multiple clusters -- it's baked in and supported as long as you are willing
to override it on a per-connector basis (and this seems reasonable for most
folks since *ideally* you are *somewhat* standardized on a common
serialization format).
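
As a rough illustration of what a per-connector override looks like, here is a minimal sketch in plain Java that builds two connector configs, one JSON and one Avro, that could live on the same cluster. The helper and the `com.example.*` connector classes are hypothetical; `key.converter` and `value.converter` are the config keys the KIP describes, and the converter class names are assumed to be on the worker's classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: builds the kind of key/value config a connector
// submission might carry. The helper and connector classes are hypothetical.
final class ConnectorConfigSketch {

    static Map<String, String> withConverters(String name,
                                              String connectorClass,
                                              String keyConverter,
                                              String valueConverter) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("name", name);
        config.put("connector.class", connectorClass);
        // Per-connector overrides of the worker-level converter settings.
        config.put("key.converter", keyConverter);
        config.put("value.converter", valueConverter);
        return config;
    }

    public static void main(String[] args) {
        String json = "org.apache.kafka.connect.json.JsonConverter";
        String avro = "io.confluent.connect.avro.AvroConverter";
        // The "com.example.*" connector classes are placeholders.
        System.out.println(withConverters("json-source", "com.example.JsonSource", json, json));
        System.out.println(withConverters("avro-source", "com.example.AvroSource", avro, avro));
    }
}
```

Connectors that leave these two keys out would keep using whatever the worker config specifies. The Avro converter would also need its schema registry settings, which are left out of this sketch.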

-Ewen


>
> On 6 January 2017 at 1:54:10 pm, Ewen Cheslack-Postava (e...@confluent.io)
> wrote:
>
> On Thu, Jan 5, 2017 at 3:12 PM, Stephane Maarek <
> steph...@simplemachines.com.au> wrote:
>
> > Hi,
> >
> > We like to operate in micro-services (dockerize and ship everything on
> ecs)
> > and I was wondering which approach was preferred.
> > We have one kafka cluster, one zookeeper cluster, etc, but when it comes
> to
> > kafka connect I have some doubts.
> >
> > Is it better to have one big kafka connect with multiple nodes, or many
> > small kafka connect clusters or standalone, for each connector / etl ?
> >
>
> You can do any of these, and it may depend on how you do
> orchestration/deployment.
>
> We built Connect to support running one big cluster running a bunch of
> connectors. It balances work automatically and provides a way to control
> scale up/down via increased parallelism. This means we don't need to make
> any assumptions about how you deploy, how you handle elastically scaling
> your clusters, etc. But if you run in an environment and have the tooling
> in place to do that already, you can also opt to run many smaller clusters
> and use that tooling to scale up/down. In that case you'd just make sure
> there were enough tasks for each connector so that when you scale the # of
> workers for a cluster up, the rebalancing of work would ensure there were
> enough tasks for every worker to remain occupied.
>
> The main drawback of doing this is that Connect uses a few topics for
> configs, status, and offsets and you need these to be unique per cluster.
> This means you'll have 3N more topics. If you're running a *lot* of
> connectors, that could eventually become a problem. It also means you have
> that many more worker configs to handle, clusters to monitor, etc. And
> deploying a connector no longer becomes as simple as just making a call to
> the service's REST API since there isn't a single centralized service. The
> main benefits I can think of are a) if you already have preferred tooling
> for handling elasticity and b) better resource isolation between
> connectors
> (i.e. an OOM error in one connector won't affect any other connectors).
>
> For standalone mode, we'd generally recommend only using it when
> distributed mode doesn't make sense, e.g. for log file collection. Other
> than that, having the fault tolerance and high availability of distributed
> mode is preferred.
>
> On your specific points:
>
> >
> > The issues I’m trying to address are :
> > - Integration with our CI/CD pipeline
> >
>
> I'm not sure anything about Connect affects this. Is there a specific
> concern you have about the CI/CD pipeline & Connect?
>
>
> > - Efficient resources utilisation
> >
>
> Putting all the connectors into one cluster will probably result in better
> resource utilization unless you're already automatically tracking usage
> and
> scaling appropriately. The reason is that if you use a bunch of small
> clusters, you're now stuck trying to optimize N uses. Since Connect can
> already (roughly) balance work, putting all the work into one cluster and
> having connect split it up means you just need to watch utilization of the
> nodes in that one cluster and scale up or down as appropriate.
>
>
> > - Easily add new jar files that connectors depend on with minimal
> downtime
> >
>
> This one is a bit interesting. You shouldn't have any downtime adding jars
> in the 

Re: Apache Kafka integration using Apache Camel

2017-01-05 Thread UMESH CHAUDHARY
Did you test that kafka console consumer is displaying the produced
message?

On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati  wrote:

> Hello All,
>
>
>
> I am trying to create a Consumer using Apache Camel for a topic in Apache
> Kafka.
> I am using Camel 2.17.0 and Kafka 0.10  and JDK 1.8.
> I have attached a file, KafkaCamelTestConsumer.java which is a standalone
> application trying to read from a topic  “test1”created in Apache Kafka
> I am producing messages from the console and also was successful to
> produce messages using a Camel program in the topic "test1", but not able
> to consume messages. Ideally, it should get printed, but nothing seems to
> happen. The log says that the route has started but does not process any
> message.
>
> Please help to confirm if there is anything wrong with the below syntax:
>
> from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
>         + "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
>         + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
>         + "&autoCommitIntervalMs=1000&sessionTimeoutMs=3&autoCommitEnable=true")
>     .split()
>     .body()
>     .process(new Processor() {
>         @Override
>         public void process(Exchange exchange) throws Exception {
>             String messageKey = "";
>             if (exchange.getIn() != null) {
>                 Message message = exchange.getIn();
>                 Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
>                 String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
>                 if (message.getHeader(KafkaConstants.KEY) != null)
>                     messageKey = (String) message.getHeader(KafkaConstants.KEY);
>                 Object data = message.getBody();
>
>                 System.out.println("topicName :: " + topicName
>                         + " partitionId :: " + partitionId
>                         + " messageKey :: " + messageKey
>                         + " message :: " + data + "\n");
>             }
>         }
>     })
>     .to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
> }
> });
>
>
>
> I have also tried with the basic parameters as below and it still fails to
> read messages.
>
> from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")
>
> Any help on this will be greatly appreciated.
>
> Thanks in advance
>
>
>
> Thanks & Regards
>
> Swati
>
>
>


Re: Consumer Rebalancing Question

2017-01-05 Thread Ewen Cheslack-Postava
Not sure I understand your question about flapping. The LeaveGroupRequest
is only sent on a graceful shutdown. If a consumer knows it is going to
shut down, it is good to proactively make sure the group knows it needs to
rebalance work because some of the partitions that were handled by the
consumer need to be handled by some other group members.

There's no "flapping" in the sense that the leave group requests should
just inform the other members that they need to take over some of the work.
I would normally think of "flapping" as meaning that things start/stop
unnecessarily. In this case, *someone* needs to deal with the rebalance and
pick up the work being dropped by the worker. There's no flapping because
it's a one-time event -- one worker is shutting down, decides to drop the
work, and a rebalance sorts it out and reassigns it to another member of
the group. This happens once and then the "issue" is resolved without any
additional interruptions.
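
To illustrate the "one-time event" point, here is a toy model in plain Java of a single rebalance after one member leaves. It is not the coordinator's code; the class and method names are hypothetical, and the assignment is a simplified range-style split of one topic's partitions over the sorted member ids.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Toy model of a range-style assignment for a single topic.
final class RebalanceSketch {

    /** Splits partitions 0..numPartitions-1 into contiguous ranges over the sorted members. */
    static Map<String, List<Integer>> assign(Collection<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(new TreeSet<>(members));
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return result;
        }
        int perMember = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            // The first `extra` members each take one additional partition.
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Before: three members share six partitions.
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 6));
        // c2 shuts down gracefully; one rebalance hands its work to the others.
        System.out.println(assign(Arrays.asList("c1", "c3"), 6));
    }
}
```

In this model the second call is the whole story: once the remaining members have their new ranges, nothing else changes until membership changes again.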

-Ewen

On Thu, Jan 5, 2017 at 3:01 PM, Pradeep Gollakota 
wrote:

> I see... doesn't that cause flapping though?
>
> On Wed, Jan 4, 2017 at 8:22 PM, Ewen Cheslack-Postava 
> wrote:
>
> > The coordinator will immediately move the group into a rebalance if it
> > needs it. The reason LeaveGroupRequest was added was to avoid having to
> > wait for the session timeout before completing a rebalance. So aside from
> > the latency of cleanup/committing offsets/rejoining after a heartbeat,
> > rolling bounces should be fast for consumer groups.
> >
> > -Ewen
> >
> > On Wed, Jan 4, 2017 at 5:19 PM, Pradeep Gollakota 
> > wrote:
> >
> > > Hi Kafka folks!
> > >
> > > When a consumer is closed, it will issue a LeaveGroupRequest. Does
> anyone
> > > know how long the coordinator waits before reassigning the partitions
> > that
> > > were assigned to the leaving consumer to a new consumer? I ask because
> > I'm
> > > trying to understand the behavior of consumers if you're doing a
> rolling
> > > restart.
> > >
> > > Thanks!
> > > Pradeep
> > >
> >
>


Re: MirrorMaker - Topics Identification and Replication

2017-01-05 Thread Ewen Cheslack-Postava
That all sounds right!

Usually the delay for picking up the metadata update and starting to
replicate the topic won't be an issue since it's a one-time issue (around
topic creation) and the time window is pretty small for that. In steady
state, none of the mentioned delays would apply.
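
For intuition, here is a small sketch in plain Java of what happens at each metadata refresh: the whitelist regex is matched against the topics the cluster currently reports, and anything that matches but is not yet subscribed gets picked up. This is a model of the behavior, not MirrorMaker's code; the class, method, and topic names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Toy model of regex-based subscription at a metadata refresh.
final class WhitelistRefreshSketch {

    /** Returns topics that match the whitelist but are not subscribed yet. */
    static Set<String> newlyMatched(Pattern whitelist,
                                    Set<String> subscribed,
                                    Collection<String> clusterTopics) {
        Set<String> added = new TreeSet<>();
        for (String topic : clusterTopics) {
            if (whitelist.matcher(topic).matches() && !subscribed.contains(topic)) {
                added.add(topic);
            }
        }
        return added;
    }

    public static void main(String[] args) {
        Pattern whitelist = Pattern.compile(".*");
        Set<String> subscribed = new HashSet<>(Arrays.asList("orders"));
        // "payments" was created after MM started; the next refresh sees it.
        System.out.println(newlyMatched(whitelist, subscribed, Arrays.asList("orders", "payments")));
    }
}
```

The delay discussed above is simply the time until the next refresh runs this kind of check; the refresh interval is controlled by the consumer's metadata.max.age.ms setting.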

-Ewen

On Thu, Jan 5, 2017 at 3:46 AM, Greenhorn Techie 
wrote:

> Thanks Ewen for your response.
>
> Just to summarise, here is my understanding. Apologies if something is
> mis-understood. I am new to Kafka and hence still short in knowledge.
>
>
>- MirrorMaker process automatically picks up new topics added on the
>source cluster and hence no restart of the process is needed at regular
>intervals to update the list of topics
>- For existing topics, MirrorMaker will replicate messages to the target
>kafka cluster as and when it sees data in the source kafka cluster's
> topics
>- However for new topics, there might be a delay of up to (default) 5
>min i.e. metadata refresh interval to start replicating the data to the
>target kafka cluster
>
> Please let me know if something is wrong in my understanding.
>
> Thanks
>
> On Tue, 3 Jan 2017 at 23:24 Ewen Cheslack-Postava 
> wrote:
>
> > Yes, the consumer will pick up the new topics when it refreshes metadata
> > (defaults to every 5 min) and start subscribing to the new topics.
> >
> > -Ewen
> >
> > On Tue, Jan 3, 2017 at 3:07 PM, Greenhorn Techie <
> > greenhorntec...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I am new to Kafka as well as MirrorMaker, so I am wondering whether MM
> > > would pick up new topics that are created on the source cluster
> > > automatically, provided the topic matches the whitelist pattern?
> > >
> > > For example, if I start MM as below, would it replicate any new topics
> > that
> > > are created after the MM process is launched?
> > >
> > > kafka-mirror-maker.sh --new.consumer --consumer.config
> /opt/kafka/config/
> > > consumer.properties --producer.config /opt/kafka/config/producer.
> > > properties
> > > --whitelist ".*"
> > >
> > > Or is there a need to restart MM process when a new topic / pattern is
> > > started on the source cluster?
> > >
> > > What should be the recommended approach in regards to this?
> > >
> > > Thanks
> > >
> >
>


Re: Does offsetsForTimes use createtime of logsegment file?

2017-01-05 Thread Ewen Cheslack-Postava
On Wed, Jan 4, 2017 at 11:54 PM, Vignesh  wrote:

> Hi,
>
> offsetsForTimes
>  KafkaConsumer.html#offsetsForTimes(java.util.Map)>
> function
> returns offset for a given timestamp. Does it use message's timestamp
> (which could be LogAppendTime or set by user) or creation time of
> logsegment file?
>
>
This is actually tied to how the ListOffsetsRequest is handled. But if
you're on a recent version, then the KIP
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090
made it use the more accurate version based on message timestamps.
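
As a sketch of the message-timestamp semantics (not the client API itself), the lookup can be modeled as "the earliest offset whose timestamp is greater than or equal to the requested time". The `Entry` type and method below are hypothetical, and the log is a plain in-memory list in offset order.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of a timestamp-based offset lookup; not the Kafka client API.
final class TimestampLookupSketch {

    /** Hypothetical record holder: an offset plus its message timestamp in ms. */
    static final class Entry {
        final long offset;
        final long timestampMs;

        Entry(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Returns the earliest offset whose timestamp is >= targetMs,
     * or -1 if no such message exists. Entries must be in offset order.
     */
    static long offsetForTime(List<Entry> log, long targetMs) {
        for (Entry e : log) {
            if (e.timestampMs >= targetMs) {
                return e.offset;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        List<Entry> log = Arrays.asList(
                new Entry(0, 1000), new Entry(1, 1500), new Entry(2, 2200));
        System.out.println(offsetForTime(log, 1200)); // offset 1 is the first at or after t=1200
        System.out.println(offsetForTime(log, 3000)); // nothing that late: -1 in this model
    }
}
```

The -1 return is a choice made for this sketch only; the real consumer method reports a missing result differently, so check the javadoc for your client version.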


>
> KIP-33
>  33+-+Add+a+time+based+log+index>
> adds timestamp based index, and it is available only from 0.10.1 . Does
>  above function work on 0.10.0 ? If so, are there any differences in how it
> works between versions 0.10.0 and 0.10.1 ?
>
>
The KIP was only adopted and implemented in 0.10.1+. It is not available in
0.10.0.


> Thanks,
> Vignesh.
>


Re: Is this a bug or just unintuitive behavior?

2017-01-05 Thread Ewen Cheslack-Postava
The basic issue here is just that the auto.offset.reset defaults to latest,
right? That's not a very good setting for a mirroring tool and this seems
like something we might just want to change the default for. It's debatable
whether it would even need a KIP.

We have other settings in MM where we override them if they aren't set
explicitly but we don't want the normal defaults. Most are producer
properties to avoid duplicates (the acks, retries, max.block.ms, and
max.in.flight.requests.per.connection settings), but there are a couple of
consumer ones too (auto.commit.enable and consumer.timeout.ms).

This is probably something like a 1-line MM patch if someone wants to
tackle it -- the question of whether it needs a KIP or not is,
unfortunately, the more complicated question :(
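
For anyone picking this up, the shape of the change is roughly the following, shown as a standalone sketch in plain Java rather than the actual MM source. The helper name and the group id are hypothetical; the idea is only "apply a mirroring-friendly default when the user has not set the property".

```java
import java.util.Properties;

// Sketch of overriding a client default only when the user left it unset.
final class MirrorMakerDefaultsSketch {

    /** Sets key to fallback only if the user did not configure it explicitly. */
    static void overrideIfAbsent(Properties props, String key, String fallback) {
        if (props.getProperty(key) == null) {
            props.setProperty(key, fallback);
        }
    }

    public static void main(String[] args) {
        Properties userConfig = new Properties();
        userConfig.setProperty("group.id", "mirror-group"); // hypothetical group id

        // Not set by the user, so the mirroring-friendly default applies.
        overrideIfAbsent(userConfig, "auto.offset.reset", "earliest");
        System.out.println(userConfig.getProperty("auto.offset.reset"));

        // An explicit user choice is left alone.
        userConfig.setProperty("auto.offset.reset", "latest");
        overrideIfAbsent(userConfig, "auto.offset.reset", "earliest");
        System.out.println(userConfig.getProperty("auto.offset.reset"));
    }
}
```

Until something like this lands, setting auto.offset.reset=earliest in the consumer config passed to MM should have the same effect for newly created topics.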

-Ewen

On Thu, Jan 5, 2017 at 1:10 PM, James Cheng  wrote:

>
> > On Jan 5, 2017, at 12:57 PM, Jeff Widman  wrote:
> >
> > Thanks James and Hans.
> >
> > Will this also happen when we expand the number of partitions in a topic?
> >
> > That also will trigger a rebalance, the consumer won't subscribe to the
> > partition until the rebalance finishes, etc.
> >
> > So it'd seem that any messages published to the new partition in between
> > the partition creation and the rebalance finishing won't be consumed by
> any
> > consumers that have offset=latest
> >
>
> It hadn't occurred to me until you mentioned it, but yes, I think it'd also
> happen in those cases.
>
> In the kafka consumer javadocs, they provide a list of things that would
> cause a rebalance:
> http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/
> KafkaConsumer.html#subscribe(java.util.Collection,%20org.
> apache.kafka.clients.consumer.ConsumerRebalanceListener) <
> http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/
> KafkaConsumer.html#subscribe(java.util.Collection,
> org.apache.kafka.clients.consumer.ConsumerRebalanceListener)>
>
> "As part of group management, the consumer will keep track of the list of
> consumers that belong to a particular group and will trigger a rebalance
> operation if one of the following events trigger -
>
> Number of partitions change for any of the subscribed list of topics
> Topic is created or deleted
> An existing member of the consumer group dies
> A new member is added to an existing consumer group via the join API
> "
>
> I'm guessing that this would affect any of those scenarios.
>
> -James
>
>
> >
> >
> >
> > On Thu, Jan 5, 2017 at 12:40 AM, James Cheng 
> wrote:
> >
> >> Jeff,
> >>
> >> Your analysis is correct. I would say that it is known but unintuitive
> >> behavior.
> >>
> >> As an example of a problem caused by this behavior, it's possible for
> >> mirrormaker to miss messages on newly created topics, even though it was
> >> subscribed to them before the topics were created.
> >>
> >> See the following JIRAs:
> >> https://issues.apache.org/jira/browse/KAFKA-3848 <
> >> https://issues.apache.org/jira/browse/KAFKA-3848>
> >> https://issues.apache.org/jira/browse/KAFKA-3370 <
> >> https://issues.apache.org/jira/browse/KAFKA-3370>
> >>
> >> -James
> >>
> >>> On Jan 4, 2017, at 4:37 PM, h...@confluent.io wrote:
> >>>
> >>> This sounds exactly as I would expect things to behave. If you consume
> >> from the beginning I would think you would get all the messages but not
> if
> >> you consume from the latest offset. You can separately tune the metadata
> >> refresh interval if you want to miss fewer messages but that still won't
> >> get you all messages from the beginning if you don't explicitly consume
> >> from the beginning.
> >>>
> >>> Sent from my iPhone
> >>>
>  On Jan 4, 2017, at 6:53 PM, Jeff Widman  wrote:
> 
>  I'm seeing consumers miss messages when they subscribe before the
> topic
> >> is
>  actually created.
> 
>  Scenario:
>  1) kafka 0.10.1.1 cluster with allow-topic no topics, but supports
> topic
>  auto-creation as soon as a message is published to the topic
>  2) consumer subscribes using topic string or a regex pattern.
> Currently
> >> no
>  topics match. Consumer offset is "latest"
>  3) producer publishes to a topic that matches the string or regex
> >> pattern.
>  4) broker immediately creates a topic, writes the message, and also
>  notifies the consumer group that a rebalance needs to happen to assign
> >> the
>  topic_partition to one of the consumers..
>  5) rebalance is fairly quick, maybe a second or so
>  6) a consumer is assigned to the newly-created topic_partition
> 
>  At this point, we've got a consumer steadily polling the recently
> >> created
>  topic_partition. However, the consumer.poll() never returns any
> messages
>  published between topic creation and when the consumer was assigned to
> >> the
>  topic_partition. I'm guessing this may be because when the consumer is
>  assigned to the topic_partition it doesn't find any, so it uses the
> >> latest
>  offset

Apache Kafka integration using Apache Camel

2017-01-05 Thread Gupta, Swati
Hello All,

I am trying to create a consumer using Apache Camel for a topic in Apache Kafka.
I am using Camel 2.17.0, Kafka 0.10, and JDK 1.8.
I have attached a file, KafkaCamelTestConsumer.java, which is a standalone
application that tries to read from a topic "test1" created in Apache Kafka.
I can produce messages to the topic "test1" from the console and from a Camel
program, but I am not able to consume them. Ideally, each message should get
printed, but nothing seems to happen. The log says that the route has started,
but it does not process any messages.

Please help to confirm if there is anything wrong with the below syntax:
from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
        + "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
        + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
        + "&autoCommitIntervalMs=1000&sessionTimeoutMs=3&autoCommitEnable=true")
    .split()
    .body()
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            String messageKey = "";
            if (exchange.getIn() != null) {
                Message message = exchange.getIn();
                Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
                String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
                if (message.getHeader(KafkaConstants.KEY) != null)
                    messageKey = (String) message.getHeader(KafkaConstants.KEY);
                Object data = message.getBody();

                System.out.println("topicName :: " + topicName
                        + " partitionId :: " + partitionId
                        + " messageKey :: " + messageKey
                        + " message :: " + data + "\n");
            }
        }
    })
    .to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
}
});

I have also tried with the basic parameters as below and it still fails to read 
messages.
from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")

Any help on this will be greatly appreciated.

Thanks in advance

Thanks & Regards
Swati




Re: Metric meaning

2017-01-05 Thread Ewen Cheslack-Postava
There's not currently anything more detailed than what is included in the
monitoring documentation:
http://kafka.apache.org/documentation/#monitoring
There's some work trying to automate the generation of that documentation (
https://issues.apache.org/jira/browse/KAFKA-3480). That, combined with some
additions to give longer descriptions for metrics, could potentially help
this situation.

That said, for the example you mentioned, this is just what the description
says: when a log segment gets flushed to disk (including the index and time
index files), this tracks how long that flush takes.

-Ewen

On Thu, Jan 5, 2017 at 7:21 AM, Robert Quinlivan 
wrote:

> Hello,
>
> Are there more detailed descriptions available for the metrics exposed by
> Kafka via JMX? The current documentation provides some information but a
> few metrics are not listed in detail – for example, "Log flush rate and
> time."
>
> --
> Robert Quinlivan
> Software Engineer, Signal
>


Re: Query on MirrorMaker Replication - Bi-directional/Failover replication

2017-01-05 Thread Ewen Cheslack-Postava
On Thu, Jan 5, 2017 at 3:07 AM, Greenhorn Techie 
wrote:

> Hi,
>
> We are planning to set up MirrorMaker based Kafka replication for DR
> purposes. The base requirement is to have a DR replication from primary
> (site1) to DR site (site2) using MirrorMaker.
>
> However, we need the solution to work in case of failover as well i.e.
> where in the event of the site1 kafka cluster failing, site2 kafka cluster
> would be made primary. Later when site1 cluster eventually comes back-up
> online, direction of replication would be from site2 to site1.
>
> But as I understand, the offsets on each of the clusters are different, so
> wondering how to design the solution given this constraint and
> requirements.
>

It turns out this is tricky. And once you start digging in you'll find it's
way more complicated than you might originally think.

Before going down the rabbit hole, I'd suggest taking a look at this great
talk by Jun Rao (one of the original authors of Kafka) about multi-DC Kafka
setups: https://www.youtube.com/watch?v=Dvk0cwqGgws

Additionally, I want to mention that while it is tempting to want to treat
multi-DC DR cases in a way that we get really convenient, strongly
consistent, highly available behavior because that makes it easier to
reason about and avoids pushing much of the burden down to applications,
that's not realistic or practical. And honestly, it's rarely even
necessary. DR cases really are DR. Usually it is possible to make some
tradeoffs you might not make under normal circumstances (the most important
one being the tradeoff between possibly seeing duplicates vs exactly once).
The tension here is often that one team is responsible for maintaining the
infrastructure and handling this DR failover scenario, and others are
responsible for the behavior of the applications. The infrastructure team
is responsible for figuring out the DR failover story but if they don't
solve it at the infrastructure layer then they get stuck having to
understand all the current (and future) applications built on that
infrastructure.

That said, here are the details I think you're looking for:

The short answer right now is that doing DR failover like that is not going
to be easy with MM. Confluent is building additional tools to deal with
multi-DC setups because of a bunch of these challenges:
https://www.confluent.io/product/multi-datacenter/

For your specific concern about reversing the direction of replication,
you'd need to build additional tooling to support this. The basic list of
steps would be something like this (assuming non-compacted topics):

1. Use MM normally to replicate your data. Be *very* sure you construct
your setup to ensure *everything* is mirrored (proper # of partitions,
replication factor, topic level configs, etc). (Note that this is something
the Confluent replication solution is addressing that's a significant gap
in MM.)
2. During replication, be sure to record offset deltas for every topic
partition. These are needed to reverse the direction of replication
correctly. Make sure to store them in the backup DC and somewhere very
reliable.
3. Observe DC failure.
4. Decide to do failover. Ensure replication has actually stopped (via your
own tooling, or probably better, by using ACLs to ensure no new data can be
produced from original DC to backup DC).
5. Record all the high watermarks for every topic partition so you know
which data was replicated from the original DC (vs which is new after
failover).
6. Allow failover to proceed. Make the backup DC primary.
7. Once the original DC is back alive, you want to reverse replication and
make it the backup. Look up the offset deltas and use them to initialize
offsets for the consumer group you'll use to do replication.
8. Go back to the original DC and make sure there isn't any "extra" data,
i.e. stuff that didn't get replicated but was successfully written to the
original DC's cluster. For topic partitions where there is data beyond the
expected offsets, you currently would need to just delete the entire set of
data, or at least to before the offset we expect to start at. (A truncate
operation might be a nice way to avoid having to dump *all* the data, but
doesn't currently exist.)
9. Once you've got the two clusters back in a reasonably synced state with
appropriate starting offsets committed, start up MM again in the reverse
direction.
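
One possible reading of the offset bookkeeping in steps 2, 5, 7 and 8, for a single topic partition, is sketched below in plain Java. This is an interpretation, not tooling that exists: the class and method names are hypothetical, and it assumes a non-compacted topic where the difference between a message's offset in the backup DC and in the original DC stays constant.

```java
// Sketch of per-partition offset arithmetic for reversing replication.
final class FailoverOffsetSketch {

    /** Step 2: delta between where a message landed in the backup DC and its original offset. */
    static long delta(long originalOffset, long backupOffset) {
        return backupOffset - originalOffset;
    }

    /** Maps the backup high watermark recorded at failover (step 5) back to the original DC. */
    static long expectedOriginalEnd(long backupHighWatermark, long delta) {
        return backupHighWatermark - delta;
    }

    /** Step 8: messages written to the original DC that were never replicated. */
    static long extraMessages(long originalLogEnd, long expectedOriginalEnd) {
        return Math.max(0L, originalLogEnd - expectedOriginalEnd);
    }

    public static void main(String[] args) {
        long d = delta(100L, 40L);                   // original offset 100 was written at backup offset 40
        long expected = expectedOriginalEnd(500L, d); // backup high watermark at failover was 500
        long extra = extraMessages(575L, expected);   // original log actually ends at 575
        System.out.println("delta=" + d + " expectedOriginalEnd=" + expected + " extra=" + extra);
    }
}
```

In this model the 15 extra messages are the ones step 8 says you would have to clear out, and the reverse replication consumer group would start from the recorded high watermark (500) on the backup cluster. Real tooling would track this for every topic partition and store it reliably.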

If this sounds tricky, it turns out that when you add compacted topics,
things get quite a bit messier.

-Ewen


>
> Thanks
>


Re: One big kafka connect cluster or many small ones?

2017-01-05 Thread Stephane Maarek
Thanks a lot for the guidance, I think we’ll go ahead with one cluster. I
just need to figure out how our CD pipeline can talk to our connect cluster
securely (because it’ll need direct access to perform API calls).

Lastly, a question or maybe a piece of feedback… is it not possible to
specify the key serializer and deserializer as part of the rest api job
config?
The issue is that sometimes our data is avro, sometimes it’s json. And it
seems I’d need two separate clusters for that?

On 6 January 2017 at 1:54:10 pm, Ewen Cheslack-Postava (e...@confluent.io)
wrote:

On Thu, Jan 5, 2017 at 3:12 PM, Stephane Maarek <
steph...@simplemachines.com.au> wrote:

> Hi,
>
> We like to operate in micro-services (dockerize and ship everything on
ecs)
> and I was wondering which approach was preferred.
> We have one kafka cluster, one zookeeper cluster, etc, but when it comes
to
> kafka connect I have some doubts.
>
> Is it better to have one big kafka connect with multiple nodes, or many
> small kafka connect clusters or standalone, for each connector / etl ?
>

You can do any of these, and it may depend on how you do
orchestration/deployment.

We built Connect to support running one big cluster running a bunch of
connectors. It balances work automatically and provides a way to control
scale up/down via increased parallelism. This means we don't need to make
any assumptions about how you deploy, how you handle elastically scaling
your clusters, etc. But if you run in an environment and have the tooling
in place to do that already, you can also opt to run many smaller clusters
and use that tooling to scale up/down. In that case you'd just make sure
there were enough tasks for each connector so that when you scale the # of
workers for a cluster up the rebalancing of work would ensure there were
enough tasks for every worker to remain occupied.

The main drawback of doing this is that Connect uses a few topics for
configs, status, and offsets and you need these to be unique per cluster.
This means you'll have 3N more topics. If you're running a *lot* of
connectors, that could eventually become a problem. It also means you have
that many more worker configs to handle, clusters to monitor, etc. And
deploying a connector is no longer as simple as just making a call to
the service's REST API since there isn't a single centralized service. The
main benefits I can think of are a) if you already have preferred tooling
for handling elasticity and b) better resource isolation between connectors
(i.e. an OOM error in one connector won't affect any other connectors).

For standalone mode, we'd generally recommend only using it when
distributed mode doesn't make sense, e.g. for log file collection. Other
than that, having the fault tolerance and high availability of distributed
mode is preferred.

On your specific points:

>
> The issues I’m trying to address are :
> - Integration with our CI/CD pipeline
>

I'm not sure anything about Connect affects this. Is there a specific
concern you have about the CI/CD pipeline & Connect?


> - Efficient resources utilisation
>

Putting all the connectors into one cluster will probably result in better
resource utilization unless you're already automatically tracking usage and
scaling appropriately. The reason is that if you use a bunch of small
clusters, you're now stuck trying to optimize N uses. Since Connect can
already (roughly) balance work, putting all the work into one cluster and
having connect split it up means you just need to watch utilization of the
nodes in that one cluster and scale up or down as appropriate.


> - Easily add new jar files that connectors depend on with minimal downtime
>

This one is a bit interesting. You shouldn't have any downtime adding jars
in the sense that you can do rolling bounces of Connect. The one caveat is
that the current limitation for how it rebalances work involves halting
work for all connectors/tasks, doing the rebalance, and then starting them
up again. We plan to improve this, but the timeframe for it is still
uncertain. Usually these rebalance steps should be pretty quick. The main
reason this can be a concern is that halting some connectors could take
some time (e.g. because they need to fully flush their data). This means
the period of time your connectors are not processing data during one of
those rebalances is controlled by the "worst" connector.

I would recommend trying a single cluster but monitoring whether you see
stalls due to rebalances. If you do, then moving to multiple clusters might
make sense. (This also, obviously, depends a lot on your SLA for data
delivery.)


> - Monitoring operations
>

Multiple clusters definitely seems messier and more complicated for this.
There will be more workers in a single cluster, but it's a single service
you need to monitor and maintain.

Hope that helps!

-Ewen


>
> Thanks for your guidance
>
> Regards,
> Stephane
>


Re: Kafka Connect offset.storage.topic not receiving messages (i.e. how to access Kafka Connect offset metadata?)

2017-01-05 Thread Ewen Cheslack-Postava
On Thu, Jan 5, 2017 at 11:30 AM, Phillip Mann  wrote:

> I am working on setting up a Kafka Connect Distributed Mode application
> which will be a Kafka to S3 pipeline. I am using Kafka 0.10.1.0-1 and Kafka
> Connect 3.1.1-1. So far things are going smoothly but one aspect that is
> important to the larger system I am working with requires knowing offset
> information of the Kafka -> FileSystem pipeline. According to the
> documentation, the offset.storage.topic configuration will be the location
> the distributed mode application uses for storing offset information. This
> makes sense given how Kafka stores consumer offsets in the 'new' Kafka.
> However, after doing some testing with the FileStreamSinkConnector, nothing
> is being written to my offset.storage.topic which is the default value:
> connect-offsets.
>

The documentation may need to be clarified about this -- this same question
has come up at least twice in the past 2 days.

The offset.storage.topic is actually only for source connectors. Source
connectors need to define their own format for offsets since we can't make
assumptions about how source systems define an offset in their stream of
data.

Sink connectors are just reading data out of Kafka and there is already a
good mechanism for tracking offsets, so we use that.
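
To illustrate the split described above, here is a minimal Python sketch of where each kind of connector keeps its offsets. The function name, the return labels, and the example offset shapes are hypothetical and only meant to show the idea; the only names taken from this thread are `offset.storage.topic` and its default value `connect-offsets`.

```python
def offset_location(connector_type, offset_storage_topic="connect-offsets"):
    """Describe where offsets live for a connector (illustrative labels only)."""
    if connector_type == "source":
        # Source connectors define their own offset format, so Connect
        # persists them in the worker's offset.storage.topic.
        return f"connect topic: {offset_storage_topic}"
    if connector_type == "sink":
        # Sink connectors read from Kafka, so their progress is tracked as
        # ordinary consumer group offsets, not in offset.storage.topic.
        return "kafka consumer group offsets"
    raise ValueError(f"unknown connector type: {connector_type!r}")


# Hypothetical shapes of the two kinds of offsets:
example_source_offset = {"partition": {"file": "app.log"}, "offset": {"position": 1024}}
example_sink_offset = {("my-topic", 0): 1500}  # (topic, partition) -> next offset

if __name__ == "__main__":
    for kind in ("source", "sink"):
        print(kind, "->", offset_location(kind))
```

This is why a sink-only setup leaves the offsets topic empty while still resuming correctly after a restart.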


>
> To be specific, I am using a Python Kafka producer to push data to a topic
> and using Kafka Connect with the FileStreamSinkConnector to output the data
> from the topic to a file. This works and behaves as I expect the connector
> to behave. Additionally, when I stop the connector and start the connector,
> the application remembers the state in the topic and there is no data
> duplication. However, when I go to the offset.storage.topic to see what
> offset metadata is stored, there is nothing in the topic.
>
> This is the command that I use:
>
> kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
> --topic connect-offsets --from-beginning
>
> I receive this message after letting this command run for a minute or so:
>
> Processed a total of 0 messages
>
> So to summarize, I have 2 questions:
>
>
>   1.  Why is offset metadata not being written to the topic that should be
> storing this even though my distributed application is keeping state
> correctly?
>
>
>   2.  How do I access offset metadata information for a Kafka Connect
> distributed mode application? This is 100% necessary for my team's Lambda
> Architecture implementation of our system.
>

We want to improve this (to expose both read and write operations, since
it's also sometimes useful to be able to manually reset committed offsets):
https://issues.apache.org/jira/browse/KAFKA-3820

For sink connectors, however, you can still get this information directly.
You can use the consumer offset checker to look up offsets for any consumer
group. For connect, the consumer group for a connector will be
connect-.

-Ewen


>
> Thanks for the help.
>


Re: One big kafka connect cluster or many small ones?

2017-01-05 Thread Ewen Cheslack-Postava
On Thu, Jan 5, 2017 at 3:12 PM, Stephane Maarek <
steph...@simplemachines.com.au> wrote:

> Hi,
>
> We like to operate in micro-services (dockerize and ship everything on ecs)
> and I was wondering which approach was preferred.
> We have one kafka cluster, one zookeeper cluster, etc, but when it comes to
> kafka connect I have some doubts.
>
> Is it better to have one big kafka connect with multiple nodes, or many
> small kafka connect clusters or standalone, for each connector / etl ?
>

You can do any of these, and it may depend on how you do
orchestration/deployment.

We built Connect to support running one big cluster running a bunch of
connectors. It balances work automatically and provides a way to control
scale up/down via increased parallelism. This means we don't need to make
any assumptions about how you deploy, how you handle elastically scaling
your clusters, etc. But if you run in an environment and have the tooling
in place to do that already, you can also opt to run many smaller clusters
and use that tooling to scale up/down. In that case you'd just make sure
there were enough tasks for each connector so that when you scale the # of
workers for a cluster up the rebalancing of work would ensure there were
enough tasks for every worker to remain occupied.

The main drawback of doing this is that Connect uses a few topics for
configs, status, and offsets and you need these to be unique per cluster.
This means you'll have 3N more topics. If you're running a *lot* of
connectors, that could eventually become a problem. It also means you have
that many more worker configs to handle, clusters to monitor, etc. And
deploying a connector is no longer as simple as just making a call to
the service's REST API since there isn't a single centralized service. The
main benefits I can think of are a) if you already have preferred tooling
for handling elasticity and b) better resource isolation between connectors
(i.e. an OOM error in one connector won't affect any other connectors).
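
As a rough illustration of the 3N point, the following Python sketch derives the internal topics that N separate distributed clusters would need. The `config.storage.topic`, `offset.storage.topic`, `status.storage.topic`, and `group.id` keys are the worker settings involved; the cluster names and the prefix-based naming scheme are assumptions made up for this example.

```python
def worker_topic_settings(cluster_name):
    """Build the per-cluster worker settings that must not be shared.

    Prefixing with the cluster name is just an illustrative convention;
    Connect only requires that the values differ between clusters.
    """
    return {
        "group.id": f"{cluster_name}-connect-cluster",
        "config.storage.topic": f"{cluster_name}-connect-configs",
        "offset.storage.topic": f"{cluster_name}-connect-offsets",
        "status.storage.topic": f"{cluster_name}-connect-status",
    }


def internal_topics(cluster_names):
    """Collect the internal topics of all clusters and check they are unique."""
    topics = []
    for name in cluster_names:
        settings = worker_topic_settings(name)
        topics += [v for k, v in settings.items() if k.endswith(".storage.topic")]
    if len(set(topics)) != len(topics):
        raise ValueError("internal topics must be unique per cluster")
    return topics


if __name__ == "__main__":
    clusters = ["etl-s3", "etl-jdbc", "etl-es"]  # hypothetical cluster names
    topics = internal_topics(clusters)
    print(len(topics), "internal topics for", len(clusters), "clusters")
    for topic in topics:
        print(" ", topic)
```

With one big cluster the same function yields three topics in total, regardless of how many connectors run on it.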

For standalone mode, we'd generally recommend only using it when
distributed mode doesn't make sense, e.g. for log file collection. Other
than that, having the fault tolerance and high availability of distributed
mode is preferred.

On your specific points:

>
> The issues I’m trying to address are :
>  - Integration with our CI/CD pipeline
>

I'm not sure anything about Connect affects this. Is there a specific
concern you have about the CI/CD pipeline & Connect?


>  - Efficient resources utilisation
>

Putting all the connectors into one cluster will probably result in better
resource utilization unless you're already automatically tracking usage and
scaling appropriately. The reason is that if you use a bunch of small
clusters, you're now stuck trying to optimize N uses. Since Connect can
already (roughly) balance work, putting all the work into one cluster and
having connect split it up means you just need to watch utilization of the
nodes in that one cluster and scale up or down as appropriate.


>  - Easily add new jar files that connectors depend on with minimal downtime
>

This one is a bit interesting. You shouldn't have any downtime adding jars
in the sense that you can do rolling bounces of Connect. The one caveat is
that the current limitation for how it rebalances work involves halting
work for all connectors/tasks, doing the rebalance, and then starting them
up again. We plan to improve this, but the timeframe for it is still
uncertain. Usually these rebalance steps should be pretty quick. The main
reason this can be a concern is that halting some connectors could take
some time (e.g. because they need to fully flush their data). This means
the period of time your connectors are not processing data during one of
those rebalances is controlled by the "worst" connector.
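
A simplified model of that effect, as a Python sketch: it assumes all connectors are halted in parallel, so the pause is roughly the slowest stop time plus the rebalance and restart time. The function, the parameter defaults, and the connector names are hypothetical, not measured values or Connect internals.

```python
def rebalance_pause_seconds(stop_times, rebalance_time=1.0, start_time=0.5):
    """Estimate how long *every* connector is paused during one rebalance.

    stop_times maps connector name -> seconds it needs to halt (e.g. to
    flush buffered data). Assumes halting happens in parallel, so the
    slowest connector dominates.
    """
    slowest = max(stop_times.values(), default=0.0)
    return slowest + rebalance_time + start_time


if __name__ == "__main__":
    stop_times = {"jdbc-source": 0.2, "file-sink": 0.1, "s3-sink": 30.0}
    pause = rebalance_pause_seconds(stop_times)
    worst = max(stop_times, key=stop_times.get)
    print(f"all connectors pause ~{pause:.1f}s, dominated by {worst}")
```

Moving the slow connector into its own cluster removes it from the `max`, which is the isolation argument for multiple clusters.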

I would recommend trying a single cluster but monitoring whether you see
stalls due to rebalances. If you do, then moving to multiple clusters might
make sense. (This also, obviously, depends a lot on your SLA for data
delivery.)


>  - Monitoring operations
>

Multiple clusters definitely seems messier and more complicated for this.
There will be more workers in a single cluster, but it's a single service
you need to monitor and maintain.

Hope that helps!

-Ewen


>
> Thanks for your guidance
>
> Regards,
> Stephane
>


Re: [VOTE] Vote for KIP-101 - Leader Epochs

2017-01-05 Thread Joel Koshy
(adding the dev list back - as it seems to have gotten dropped earlier in
this thread)

On Thu, Jan 5, 2017 at 6:36 PM, Joel Koshy  wrote:

> +1
>
> This is a very well-written KIP!
> Minor: there is still a mix of terms in the doc that references the
> earlier LeaderGenerationRequest (which is what I'm assuming it was
> called in previous versions of the wiki). Same for the diagrams which I'm
> guessing are a little harder to make consistent with the text.
>
>
>
> On Thu, Jan 5, 2017 at 5:54 PM, Jun Rao  wrote:
>
>> Hi, Ben,
>>
>> Thanks for the updated KIP. +1
>>
>> 1) In OffsetForLeaderEpochResponse, start_offset probably should be
>> end_offset since it's the end offset of that epoch.
>> 3) That's fine. We can fix KAFKA-1120 separately.
>>
>> Jun
>>
>>
>> On Thu, Jan 5, 2017 at 11:11 AM, Ben Stopford  wrote:
>>
>> > Hi Jun
>> >
>> > Thanks for raising these points. Thorough as ever!
>> >
>> > 1) Changes made as requested.
>> > 2) Done.
>> > 3) My plan for handling returning leaders is simply to force the Leader
>> > Epoch to increment if a leader returns. I don't plan to fix KAFKA-1120 as
>> > part of this KIP. It is really a separate issue with wider implications.
>> > I'd be happy to add KAFKA-1120 into the release though if we have time.
>> > 4) Agreed. Not sure exactly how that's going to play out, but I think
>> we're
>> > on the same page.
>> >
>> > Please could
>> >
>> > Cheers
>> > B
>> >
>> > On Thu, Jan 5, 2017 at 12:50 AM Jun Rao  wrote:
>> >
>> > > Hi, Ben,
>> > >
>> > > Thanks for the proposal. Looks good overall. A few comments below.
>> > >
>> > > 1. For LeaderEpochRequest, we need to include topic right? We probably
>> > want
>> > > to follow other requests by nesting partition inside topic? For
>> > > LeaderEpochResponse,
>> > > do we need to return leader_epoch? I was thinking that we could just
>> > return
>> > > an end_offset, which is the next offset of the last message in the
>> > > requested leader generation. Finally, would
>> OffsetForLeaderEpochRequest
>> > be
>> > > a better name?
>> > >
>> > > 2. We should bump up both the produce request and the fetch request
>> > > protocol version since both include the message set.
>> > >
>> > > 3. Extending LeaderEpoch to include Returning Leaders: To support
>> this,
>> > do
>> > > you plan to use the approach that stores  CZXID in the broker
>> > registration
>> > > and including the CZXID of the leader in /brokers/topics/[topic]/
>> > > partitions/[partitionId]/state in ZK?
>> > >
>> > > 4. Since there are a few other KIPs involving message format too, it
>> > would
>> > > be useful to consider if we could combine the message format changes
>> in
>> > the
>> > > same release.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Wed, Jan 4, 2017 at 9:24 AM, Ben Stopford 
>> wrote:
>> > >
>> > > > Hi All
>> > > >
>> > > > We’re having some problems with this thread being subsumed by the
>> > > > [Discuss] thread. Hopefully this one will appear distinct. If you
>> see
>> > > more
>> > > > than one, please use this one.
>> > > >
>> > > > KIP-101 should now be ready for a vote. As a reminder the KIP
>> proposes
>> > a
>> > > > change to the replication protocol to remove the potential for
>> replicas
>> > > to
>> > > > diverge.
>> > > >
>> > > > The KIP can be found here:  https://cwiki.apache.org/confl
>> > > > uence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+
>> > > > use+Leader+Epoch+rather+than+High+Watermark+for+Truncation <
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-
>> > > > +Alter+Replication+Protocol+to+use+Leader+Epoch+rather+
>> > > > than+High+Watermark+for+Truncation>
>> > > >
>> > > > Please let us know your vote.
>> > > >
>> > > > B
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > >
>> >
>>
>
>


Re: [VOTE] Vote for KIP-101 - Leader Epochs

2017-01-05 Thread Joel Koshy
+1

This is a very well-written KIP!
Minor: there is still a mix of terms in the doc that references the earlier
LeaderGenerationRequest (which is what I'm assuming it was called in
previous versions of the wiki). Same for the diagrams which I'm guessing
are a little harder to make consistent with the text.



On Thu, Jan 5, 2017 at 5:54 PM, Jun Rao  wrote:

> Hi, Ben,
>
> Thanks for the updated KIP. +1
>
> 1) In OffsetForLeaderEpochResponse, start_offset probably should be
> end_offset since it's the end offset of that epoch.
> 3) That's fine. We can fix KAFKA-1120 separately.
>
> Jun
>
>
> On Thu, Jan 5, 2017 at 11:11 AM, Ben Stopford  wrote:
>
> > Hi Jun
> >
> > Thanks for raising these points. Thorough as ever!
> >
> > 1) Changes made as requested.
> > 2) Done.
> > 3) My plan for handling returning leaders is simply to force the Leader
> > Epoch to increment if a leader returns. I don't plan to fix KAFKA-1120 as
> > part of this KIP. It is really a separate issue with wider implications.
> > I'd be happy to add KAFKA-1120 into the release though if we have time.
> > 4) Agreed. Not sure exactly how that's going to play out, but I think
> we're
> > on the same page.
> >
> > Please could
> >
> > Cheers
> > B
> >
> > On Thu, Jan 5, 2017 at 12:50 AM Jun Rao  wrote:
> >
> > > Hi, Ben,
> > >
> > > Thanks for the proposal. Looks good overall. A few comments below.
> > >
> > > 1. For LeaderEpochRequest, we need to include topic right? We probably
> > want
> > > to follow other requests by nesting partition inside topic? For
> > > LeaderEpochResponse,
> > > do we need to return leader_epoch? I was thinking that we could just
> > return
> > > an end_offset, which is the next offset of the last message in the
> > > requested leader generation. Finally, would OffsetForLeaderEpochRequest
> > be
> > > a better name?
> > >
> > > 2. We should bump up both the produce request and the fetch request
> > > protocol version since both include the message set.
> > >
> > > 3. Extending LeaderEpoch to include Returning Leaders: To support this,
> > do
> > > you plan to use the approach that stores  CZXID in the broker
> > registration
> > > and including the CZXID of the leader in /brokers/topics/[topic]/
> > > partitions/[partitionId]/state in ZK?
> > >
> > > 4. Since there are a few other KIPs involving message format too, it
> > would
> > > be useful to consider if we could combine the message format changes in
> > the
> > > same release.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Jan 4, 2017 at 9:24 AM, Ben Stopford  wrote:
> > >
> > > > Hi All
> > > >
> > > > We’re having some problems with this thread being subsumed by the
> > > > [Discuss] thread. Hopefully this one will appear distinct. If you see
> > > more
> > > > than one, please use this one.
> > > >
> > > > KIP-101 should now be ready for a vote. As a reminder the KIP
> proposes
> > a
> > > > change to the replication protocol to remove the potential for
> replicas
> > > to
> > > > diverge.
> > > >
> > > > The KIP can be found here:  https://cwiki.apache.org/confl
> > > > uence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+
> > > > use+Leader+Epoch+rather+than+High+Watermark+for+Truncation <
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-
> > > > +Alter+Replication+Protocol+to+use+Leader+Epoch+rather+
> > > > than+High+Watermark+for+Truncation>
> > > >
> > > > Please let us know your vote.
> > > >
> > > > B
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> >
>


Re: [VOTE] Vote for KIP-101 - Leader Epochs

2017-01-05 Thread Jun Rao
Hi, Ben,

Thanks for the updated KIP. +1

1) In OffsetForLeaderEpochResponse, start_offset probably should be
end_offset since it's the end offset of that epoch.
3) That's fine. We can fix KAFKA-1120 separately.
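
For readers following point 1, here is a small Python sketch of the end-offset semantics being discussed: the end offset of an epoch is the next offset after the last message written in that epoch. The function name and the list-of-pairs representation are assumptions for illustration and are not taken from the KIP's implementation.

```python
def end_offset_for_epoch(epoch_starts, log_end_offset, requested_epoch):
    """Return the end offset of requested_epoch.

    epoch_starts is a list of (leader_epoch, start_offset) pairs sorted by
    epoch. The end offset of an epoch is the start offset of the first
    later epoch, or the log end offset if no later epoch exists.
    """
    for epoch, start_offset in epoch_starts:
        if epoch > requested_epoch:
            return start_offset
    return log_end_offset


if __name__ == "__main__":
    # Epoch 0 wrote offsets 0..99, epoch 1 wrote 100..249, epoch 3 wrote 250..399.
    epochs = [(0, 0), (1, 100), (3, 250)]
    for e in (0, 1, 3):
        print("epoch", e, "ends at", end_offset_for_epoch(epochs, 400, e))
```
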

Jun


On Thu, Jan 5, 2017 at 11:11 AM, Ben Stopford  wrote:

> Hi Jun
>
> Thanks for raising these points. Thorough as ever!
>
> 1) Changes made as requested.
> 2) Done.
> 3) My plan for handling returning leaders is simply to force the Leader
> Epoch to increment if a leader returns. I don't plan to fix KAFKA-1120 as
> part of this KIP. It is really a separate issue with wider implications.
> I'd be happy to add KAFKA-1120 into the release though if we have time.
> 4) Agreed. Not sure exactly how that's going to play out, but I think we're
> on the same page.
>
> Please could
>
> Cheers
> B
>
> On Thu, Jan 5, 2017 at 12:50 AM Jun Rao  wrote:
>
> > Hi, Ben,
> >
> > Thanks for the proposal. Looks good overall. A few comments below.
> >
> > 1. For LeaderEpochRequest, we need to include topic right? We probably
> want
> > to follow other requests by nesting partition inside topic? For
> > LeaderEpochResponse,
> > do we need to return leader_epoch? I was thinking that we could just
> return
> > an end_offset, which is the next offset of the last message in the
> > requested leader generation. Finally, would OffsetForLeaderEpochRequest
> be
> > a better name?
> >
> > 2. We should bump up both the produce request and the fetch request
> > protocol version since both include the message set.
> >
> > 3. Extending LeaderEpoch to include Returning Leaders: To support this,
> do
> > you plan to use the approach that stores  CZXID in the broker
> registration
> > and including the CZXID of the leader in /brokers/topics/[topic]/
> > partitions/[partitionId]/state in ZK?
> >
> > 4. Since there are a few other KIPs involving message format too, it
> would
> > be useful to consider if we could combine the message format changes in
> the
> > same release.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Jan 4, 2017 at 9:24 AM, Ben Stopford  wrote:
> >
> > > Hi All
> > >
> > > We’re having some problems with this thread being subsumed by the
> > > [Discuss] thread. Hopefully this one will appear distinct. If you see
> > more
> > > than one, please use this one.
> > >
> > > KIP-101 should now be ready for a vote. As a reminder the KIP proposes
> a
> > > change to the replication protocol to remove the potential for replicas
> > to
> > > diverge.
> > >
> > > The KIP can be found here:  https://cwiki.apache.org/confl
> > > uence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+
> > > use+Leader+Epoch+rather+than+High+Watermark+for+Truncation <
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-
> > > +Alter+Replication+Protocol+to+use+Leader+Epoch+rather+
> > > than+High+Watermark+for+Truncation>
> > >
> > > Please let us know your vote.
> > >
> > > B
> > >
> > >
> > >
> > >
> > >
> >
>


One big kafka connect cluster or many small ones?

2017-01-05 Thread Stephane Maarek
Hi,

We like to operate in micro-services (dockerize and ship everything on ecs)
and I was wondering which approach was preferred.
We have one kafka cluster, one zookeeper cluster, etc, but when it comes to
kafka connect I have some doubts.

Is it better to have one big kafka connect with multiple nodes, or many
small kafka connect clusters or standalone, for each connector / etl ?

The issues I’m trying to address are :
 - Integration with our CI/CD pipeline
 - Efficient resources utilisation
 - Easily add new jar files that connectors depend on with minimal downtime
 - Monitoring operations

Thanks for your guidance

Regards,
Stephane


Re: Consumer Rebalancing Question

2017-01-05 Thread Pradeep Gollakota
I see... doesn't that cause flapping though?

On Wed, Jan 4, 2017 at 8:22 PM, Ewen Cheslack-Postava 
wrote:

> The coordinator will immediately move the group into a rebalance if it
> needs it. The reason LeaveGroupRequest was added was to avoid having to
> wait for the session timeout before completing a rebalance. So aside from
> the latency of cleanup/committing offsets/rejoining after a heartbeat,
> rolling bounces should be fast for consumer groups.
>
> -Ewen
>
> On Wed, Jan 4, 2017 at 5:19 PM, Pradeep Gollakota 
> wrote:
>
> > Hi Kafka folks!
> >
> > When a consumer is closed, it will issue a LeaveGroupRequest. Does anyone
> > know how long the coordinator waits before reassigning the partitions
> that
> > were assigned to the leaving consumer to a new consumer? I ask because
> I'm
> > trying to understand the behavior of consumers if you're doing a rolling
> > restart.
> >
> > Thanks!
> > Pradeep
> >
>


Re: Under-replicated Partitions while rolling Kafka nodes in AWS

2017-01-05 Thread James Cheng

> On Jan 5, 2017, at 7:55 AM, Jack Lund  wrote:
> 
> Hello, all.
> 
> We're running multiple Kafka clusters in AWS, and thus multiple Zookeeper
> clusters as well. When we roll out changes to our zookeeper nodes (which
> involves changes to the AMI, which means terminating the zookeeper instance
> and bringing up a new one in its place) we have to restart our Kafka
> brokers one at a time so they can pick up the new zookeeper IP address.
> 

FYI, zookeeper 3.4.8 fixes the issue where you have to restart zookeeper nodes 
when their DNS mapping changes. I'm not sure how it affects restarting kafka 
though, when the zookeeper DNS changes.

https://zookeeper.apache.org/doc/r3.4.8/releasenotes.html 

https://issues.apache.org/jira/browse/ZOOKEEPER-1506 


> What we've noticed is that, as the brokers are restarted, we get alerts for
> under-replicated partitions, which seems strange since it seems like the
> shutdown process should take care of moving any replicas and the leadership
> election process.
> 

During a controlled shutdown, you are right that *leadership* is moved from one 
broker to another. But the replica list does not change. A topic assigned to 
brokers 1 2 3 for example will only live on 1 2 3. If broker 1 is the leader 
for the topic, then during controlled shutdown of 1, leadership may move to 2 
or 3. But a broker 4 would never automatically take over as replica for the 
topic.

You can build such functionality yourself, if you wanted. You could, for 
example, move the topic to 2 3 4 before shutting down 1, and then move it back 
to 1 2 3 once 1 is back up. But that's a bunch of work you'd have to do
yourself.
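
To make the leadership-versus-replica-list distinction concrete, here is a toy Python model. It is a sketch under simplifying assumptions (a restarted broker rejoins the in-sync set immediately, and the new leader is the first remaining in-sync replica); the class and method names are invented for the example and do not mirror broker internals.

```python
from dataclasses import dataclass, field


@dataclass
class Partition:
    replicas: list                      # assigned replicas; only a reassignment changes this
    leader: int
    isr: set = field(default_factory=set)

    def __post_init__(self):
        if not self.isr:
            self.isr = set(self.replicas)

    def controlled_shutdown(self, broker):
        """Broker leaves: leadership may move, the replica list does not."""
        self.isr.discard(broker)
        if self.leader == broker:
            self.leader = next((r for r in self.replicas if r in self.isr), None)

    def broker_restarted(self, broker):
        """Broker returns and (in this toy model) catches up instantly."""
        if broker in self.replicas:
            self.isr.add(broker)

    @property
    def under_replicated(self):
        return len(self.isr) < len(self.replicas)


if __name__ == "__main__":
    p = Partition(replicas=[1, 2, 3], leader=1)
    p.controlled_shutdown(1)
    print("leader:", p.leader, "replicas:", p.replicas, "under-replicated:", p.under_replicated)
    p.broker_restarted(1)
    print("leader:", p.leader, "under-replicated:", p.under_replicated)
```

In this model the partition is under-replicated for exactly as long as broker 1 is down, which matches the alerts seen during a rolling restart.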

-James

> This is causing us some pain because it means that we get pages whenever we
> roll out changes to Zookeeper.
> 
> Does anybody have any ideas why this would be happening, and how we can
> avoid it?
> 
> Thanks.
> 
> -Jack Lund
> Braintree Payments



Re: Lost message with Kafka configuration

2017-01-05 Thread James Cheng

> On Jan 5, 2017, at 8:23 AM, Hoang Bao Thien  wrote:
> 
> Yes, the problem is from producer configuration. And James Cheng has told
> me how to fix it.
> However I still get another problem with a large file:
> 
> org.apache.kafka.common.errors.TimeoutException: Batch containing 36
> record(s) expired due to timeout while requesting metadata from brokers for
> MyTopic-0
> 

kafka-console-producer.sh defaults to retries=0. If there is a timeout, as that 
error indicates, I think it drops the messages it was trying to send.

As a test, try setting retries to something high, by doing "--producer-property 
retries="

See the description of "retries" at 
http://kafka.apache.org/documentation/#producerconfigs 
.
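
The effect of the retries setting can be shown with a toy Python model. This is not the Kafka producer's actual retry logic (which also involves backoff and deciding which errors are retriable); `FlakyBroker` and `send_with_retries` are hypothetical names used only to show why retries=0 loses a message on the first timeout.

```python
class FlakyBroker:
    """Stand-in for a broker that times out a fixed number of times."""

    def __init__(self, failures):
        self.failures = failures
        self.received = []

    def send(self, message):
        if self.failures > 0:
            self.failures -= 1
            raise TimeoutError("simulated timeout")
        self.received.append(message)


def send_with_retries(send, message, retries):
    """Try once, then up to `retries` more times; return True if delivered."""
    for _ in range(retries + 1):
        try:
            send(message)
            return True
        except TimeoutError:
            continue
    return False  # message is dropped


if __name__ == "__main__":
    for retries in (0, 5):
        broker = FlakyBroker(failures=2)
        delivered = send_with_retries(broker.send, "row-1", retries)
        print(f"retries={retries}: delivered={delivered}")
```
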

-James


> Best regards,
> 
> On Thu, Jan 5, 2017 at 10:23 AM, Protoss Hu 
> wrote:
> 
>> You mean the messages were lost on the way to broker before the broker
>> actually received?
>> 
>> Protoss Hu
>> Blog: http://hbprotoss.github.io/
>> Weibo: http://weibo.com/hbprotoss
>> 
>> On January 5, 2017 at 4:53 PM +0800, James Cheng wrote:
>>> kafka-console-producer.sh defaults to acks=0, which means that the
>> producer essentially throws messages at the broker and doesn't wait/retry
>> to make sure they are properly received.
>>> 
>>> In the kafka-console-producer.sh usage text:
>>> --request-required-acks >> request required acks> requests (default: 0)
>>> 
>>> Try re-running your test with "--request-required-acks -1" or
>> "--request-required-acks all" (They are equivalent) This will tell the
>> broker to wait for messages to be fully saved to all replicas before
>> returning an acknowledgement to the producer. You can read more about acks
>> in the producer configuration section of the kafka docs (
>> http://kafka.apache.org/documentation/#producerconfigs <
>> http://kafka.apache.org/documentation/#producerconfigs>)
>>> 
>>> -James
>>> 
>>>> On Jan 4, 2017, at 1:25 AM, Hoang Bao Thien wrote:
>>>>
>>>> Hi all,
>>>>
>>>> I have a problem with losing messages from Kafka.
>>>> The situation is as follows: I put a csv file with 286701 rows (size =
>>>> 110MB) into Kafka producer with command:
>>>> $ cat test.csv | kafka-console-producer.sh --broker-list localhost:9092
>>>> --topic MyTopic > /dev/null
>>>>
>>>> and then count the number of lines from the Kafka consumer
>>>> (kafka-console-consumer.sh --zookeeper localhost:2181 --topic MyTopic
>>>> --from-beginning)
>>>> However, I only get about 260K-270K, and this number of received messages
>>>> changes for each test.
>>>>
>>>> My configuration in the "config/server.properties" has some minor change
>>>> compared to the original file:
>>>>
>>>> log.retention.check.interval.hours=24
>>>> log.retention.hours=168
>>>> delete.topic.enable = true
>>>>
>>>> The remaining configurations are the same as default value.
>>>>
>>>> Could you please explain why the messages were lost in Kafka? And how to
>>>> fix this problem please?
>>>>
>>>> Thanks a lot.
>>>>
>>>> Best regards,
>>>> Alex
>>> 
>> 



Re: Is this a bug or just unintuitive behavior?

2017-01-05 Thread James Cheng

> On Jan 5, 2017, at 12:57 PM, Jeff Widman  wrote:
> 
> Thanks James and Hans.
> 
> Will this also happen when we expand the number of partitions in a topic?
> 
> That also will trigger a rebalance, the consumer won't subscribe to the
> partition until the rebalance finishes, etc.
> 
> So it'd seem that any messages published to the new partition in between
> the partition creation and the rebalance finishing won't be consumed by any
> consumers that have offset=latest
> 

It hadn't occurred to me until you mentioned it, but yes, I think it'd also
happen in those cases.

In the kafka consumer javadocs, they provide a list of things that would cause 
a rebalance:
http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)
 


"As part of group management, the consumer will keep track of the list of 
consumers that belong to a particular group and will trigger a rebalance 
operation if one of the following events trigger -

 - Number of partitions change for any of the subscribed list of topics
 - Topic is created or deleted
 - An existing member of the consumer group dies
 - A new member is added to an existing consumer group via the join API
"

I'm guessing that this would affect any of those scenarios.
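
Here is a minimal Python sketch of why those messages are skipped. It models a partition as a plain list and assumes the log starts at offset 0; `starting_position` and `messages_seen` are hypothetical helpers, and only the "earliest"/"latest" reset policy names come from the consumer configuration.

```python
def starting_position(log_end_offset, committed_offset, auto_offset_reset):
    """Pick the first offset a newly assigned consumer will read."""
    if committed_offset is not None:
        return committed_offset          # a committed offset always wins
    if auto_offset_reset == "earliest":
        return 0                         # assumes the log starts at offset 0
    if auto_offset_reset == "latest":
        return log_end_offset            # skips everything already in the log
    raise ValueError(f"unsupported reset policy: {auto_offset_reset!r}")


def messages_seen(log, committed_offset, auto_offset_reset):
    """Messages the consumer gets on its first poll after assignment."""
    start = starting_position(len(log), committed_offset, auto_offset_reset)
    return log[start:]


if __name__ == "__main__":
    # Two messages land in the new partition before the rebalance finishes.
    log = ["m0", "m1"]
    print("latest:  ", messages_seen(log, None, "latest"))
    print("earliest:", messages_seen(log, None, "earliest"))
```

The same logic applies to any partition that gains data before a consumer without a committed offset is assigned to it.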

-James


> 
> 
> 
> On Thu, Jan 5, 2017 at 12:40 AM, James Cheng  wrote:
> 
>> Jeff,
>> 
>> Your analysis is correct. I would say that it is known but unintuitive
>> behavior.
>> 
>> As an example of a problem caused by this behavior, it's possible for
>> mirrormaker to miss messages on newly created topics, even though it was
>> subscribed to them before topics were created.
>> 
>> See the following JIRAs:
>> https://issues.apache.org/jira/browse/KAFKA-3848 <
>> https://issues.apache.org/jira/browse/KAFKA-3848>
>> https://issues.apache.org/jira/browse/KAFKA-3370 <
>> https://issues.apache.org/jira/browse/KAFKA-3370>
>> 
>> -James
>> 
>>> On Jan 4, 2017, at 4:37 PM, h...@confluent.io wrote:
>>> 
>>> This sounds exactly as I would expect things to behave. If you consume
>> from the beginning I would think you would get all the messages but not if
>> you consume from the latest offset. You can separately tune the metadata
>> refresh interval if you want to miss fewer messages but that still won't
>> get you all messages from the beginning if you don't explicitly consume
>> from the beginning.
>>> 
>>> Sent from my iPhone
>>> 
>>>> On Jan 4, 2017, at 6:53 PM, Jeff Widman wrote:
>>>>
>>>> I'm seeing consumers miss messages when they subscribe before the topic is
>>>> actually created.
>>>>
>>>> Scenario:
>>>> 1) kafka 0.10.1.1 cluster with no topics, but supports topic
>>>> auto-creation as soon as a message is published to the topic
>>>> 2) consumer subscribes using topic string or a regex pattern. Currently no
>>>> topics match. Consumer offset is "latest"
>>>> 3) producer publishes to a topic that matches the string or regex pattern.
>>>> 4) broker immediately creates a topic, writes the message, and also
>>>> notifies the consumer group that a rebalance needs to happen to assign the
>>>> topic_partition to one of the consumers.
>>>> 5) rebalance is fairly quick, maybe a second or so
>>>> 6) a consumer is assigned to the newly-created topic_partition
>>>>
>>>> At this point, we've got a consumer steadily polling the recently created
>>>> topic_partition. However, the consumer.poll() never returns any messages
>>>> published between topic creation and when the consumer was assigned to the
>>>> topic_partition. I'm guessing this may be because when the consumer is
>>>> assigned to the topic_partition it doesn't find any committed offsets, so
>>>> it uses the latest offset, which happens to be after the messages that
>>>> were published to create the topic.
>>>>
>>>> This is surprising because the consumer technically was subscribed to the
>>>> topic before the messages were produced, so you'd think the consumer would
>>>> receive these messages.
>>>>
>>>> Is this known behavior? A bug in Kafka broker? Or a bug in my client
>>>> library?
>> 
>> 



Re: Is this a bug or just unintuitive behavior?

2017-01-05 Thread Jeff Widman
Thanks James and Hans.

Will this also happen when we expand the number of partitions in a topic?

That will also trigger a rebalance, and the consumer won't subscribe to the
partition until the rebalance finishes, etc.

So it would seem that any messages published to the new partition between
the partition creation and the end of the rebalance won't be consumed by any
consumers that have offset=latest.
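
To make the scenario concrete, here is a toy model in plain Python. It is not
Kafka client code: the partition is just a list, an offset is an index into
it, and first_fetch_offset is a made-up helper that only mimics the idea of
falling back to a reset policy when the group has no committed offset.

```python
# Toy model only: a partition is a list of messages and an offset is an index.
def first_fetch_offset(log, committed_offset, reset_policy):
    """Return the offset a newly assigned consumer would start reading from."""
    if committed_offset is not None:
        return committed_offset      # resume from the group's committed position
    if reset_policy == "earliest":
        return 0                     # start of the log
    if reset_policy == "latest":
        return len(log)              # log end at the moment of assignment
    raise ValueError("unknown reset policy: " + reset_policy)


# Three messages land in the new partition before the rebalance completes.
log = ["m0", "m1", "m2"]
start_latest = first_fetch_offset(log, None, "latest")
start_earliest = first_fetch_offset(log, None, "earliest")

# A message produced after the assignment is visible under either policy.
log.append("m3")
seen_with_latest = log[start_latest:]
seen_with_earliest = log[start_earliest:]
```

With no committed offset, the "latest" consumer starts after m0..m2 and only
ever sees m3, while the "earliest" consumer sees all four messages.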




On Thu, Jan 5, 2017 at 12:40 AM, James Cheng  wrote:

> Jeff,
>
> Your analysis is correct. I would say that it is known but unintuitive
> behavior.
>
> As an example of a problem caused by this behavior, it's possible for
> mirrormaker to miss messages on newly created topics, even though it was
> subscribed to them before the topics were created.
>
> See the following JIRAs:
> https://issues.apache.org/jira/browse/KAFKA-3848
> https://issues.apache.org/jira/browse/KAFKA-3370
>
> -James
>
> > On Jan 4, 2017, at 4:37 PM, h...@confluent.io wrote:
> >
> > This sounds exactly as I would expect things to behave. If you consume
> from the beginning I would think you would get all the messages but not if
> you consume from the latest offset. You can separately tune the metadata
> refresh interval if you want to miss fewer messages but that still won't
> get you all messages from the beginning if you don't explicitly consume
> from the beginning.
> >
> > Sent from my iPhone
> >
> >> On Jan 4, 2017, at 6:53 PM, Jeff Widman  wrote:
> >>
> >> I'm seeing consumers miss messages when they subscribe before the topic
> is
> >> actually created.
> >>
> >> Scenario:
> >> 1) kafka 0.10.1.1 cluster with allow-topic no topics, but supports topic
> >> auto-creation as soon as a message is published to the topic
> >> 2) consumer subscribes using topic string or a regex pattern. Currently
> no
> >> topics match. Consumer offset is "latest"
> >> 3) producer publishes to a topic that matches the string or regex
> pattern.
> >> 4) broker immediately creates a topic, writes the message, and also
> >> notifies the consumer group that a rebalance needs to happen to assign
> the
> >> topic_partition to one of the consumers..
> >> 5) rebalance is fairly quick, maybe a second or so
> >> 6) a consumer is assigned to the newly-created topic_partition
> >>
> >> At this point, we've got a consumer steadily polling the recently
> created
> >> topic_partition. However, the consumer.poll() never returns any messages
> >> published between topic creation and when the consumer was assigned to
> the
> >> topic_partition. I'm guessing this may be because when the consumer is
> >> assigned to the topic_partition it doesn't find any, so it uses the
> latest
> >> offset, which happens to be after the messages that were published to
> >> create the topic.
> >>
> >> This is surprising because the consumer technically was subscribed to
> the
> >> topic before the messages were produced, so you'd think the consumer
> would
> >> receive these messages.
> >>
> >> Is this known behavior? A bug in Kafka broker? Or a bug in my client
> >> library?
>
>


Re: What makes a KStream app exit?

2017-01-05 Thread Guozhang Wang
Re: "UnsupportedOperationException: null org.apache.kafka.streams.
processor.internals.StandbyContextImpl.recordCollector(
StandbyContextImpl.java:81)": I think this is a known issue that has been
fixed in trunk:

https://github.com/apache/kafka/commit/a4592a18641f84a1983c5fe7e697a8b0ab43eb25


Guozhang

On Fri, Dec 16, 2016 at 11:53 AM, Matthias J. Sax 
wrote:

> I guess. It's bugs, so always hard to be 100% sure.
>
> We know about a null-pointer bug in task assignment/creation -- so I
> assume it is what you see.
>
> -Matthias
>
> On 12/16/16 11:19 AM, Jon Yeargers wrote:
> > And these bugs would cause the behaviors Im seeing?
> >
> > On Fri, Dec 16, 2016 at 10:45 AM, Matthias J. Sax  >
> > wrote:
> >
> >> We just discovered a couple of bugs with regard to standby tasks... Not
> >> all bug fix PRs got merged yet.
> >>
> >> You can try running on trunk to get those fixes. Should only be a few
> >> days until the fixes get merged.
> >>
> >>
> >> -Matthias
> >>
> >> On 12/16/16 9:10 AM, Jon Yeargers wrote:
> >>> Have started having this issue with another KStream based app. Digging
> >>> through logs I ran across this message:
> >>>
> >>> When I've seen it before it certainly does kill the application. At the
> >> end
> >>> of the SNIP you can see the exit process starting.
> >>>
> >>>
> >>> 2016-12-16 17:04:51,507 [StreamThread-1] DEBUG
> >>> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1]
> >> creating
> >>> new standby task 0_0
> >>>
> >>> 2016-12-16 17:04:51,507 [StreamThread-1] INFO
> >>> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1]
> >> Creating
> >>> new standby task 0_0 with assigned partitions [[rtdetail_breakout-0]]
> >>>
> >>> 2016-12-16 17:04:51,508 [StreamThread-1] INFO
> >>> o.a.k.s.p.internals.StandbyTask - standby-task [0_0] Initializing
> state
> >>> stores
> >>>
> >>> 2016-12-16 17:04:51,508 [StreamThread-1] DEBUG
> >>> o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor
> >> fetching
> >>> committed offsets for partitions: [rtdetail_breakout-0]
> >>>
> >>> 2016-12-16 17:04:51,819 [StreamThread-1] ERROR
> >>> o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> >>> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> >>> RtDetailBreakoutProcessor failed on partition assignment
> >>>
> >>> java.lang.UnsupportedOperationException: null
> >>>
> >>> at
> >>> org.apache.kafka.streams.processor.internals.StandbyContextImpl.
> >> recordCollector(StandbyContextImpl.java:81)
> >>>
> >>> at
> >>> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(
> >> StoreChangeLogger.java:54)
> >>>
> >>> at
> >>> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(
> >> StoreChangeLogger.java:46)
> >>>
> >>> at
> >>> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(
> >> RocksDBWindowStore.java:197)
> >>>
> >>> at
> >>> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(
> >> MeteredWindowStore.java:66)
> >>>
> >>> at
> >>> org.apache.kafka.streams.state.internals.CachingWindowStore.init(
> >> CachingWindowStore.java:64)
> >>>
> >>> at
> >>> org.apache.kafka.streams.processor.internals.AbstractTask.
> >> initializeStateStores(AbstractTask.java:86)
> >>>
> >>> at
> >>> org.apache.kafka.streams.processor.internals.StandbyTask.<init>(
> >> StandbyTask.java:68)
> >>>
> >>> at
> >>> org.apache.kafka.streams.processor.internals.StreamThread.
> >> createStandbyTask(StreamThread.java:733)
> >>>
> >>> at
> >>> org.apache.kafka.streams.processor.internals.
> >> StreamThread.addStandbyTasks(StreamThread.java:757)
> >>>
> >>> at
> >>> org.apache.kafka.streams.processor.internals.StreamThread.access$200(
> >> StreamThread.java:69)
> >>>
> >>> at
> >>> org.apache.kafka.streams.processor.internals.StreamThread$1.
> >> onPartitionsAssigned(StreamThread.java:125)
> >>>
> >>> at
> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> >> onJoinComplete(ConsumerCoordinator.java:229)
> >>>
> >>> at
> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> >> joinGroupIfNeeded(AbstractCoordinator.java:313)
> >>>
> >>> at
> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> >> ensureActiveGroup(AbstractCoordinator.java:277)
> >>>
> >>> at
> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> >> ConsumerCoordinator.java:260)
> >>>
> >>> at
> >>> org.apache.kafka.clients.consumer.KafkaConsumer.
> >> pollOnce(KafkaConsumer.java:1013)
> >>>
> >>> at
> >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> >> KafkaConsumer.java:979)
> >>>
> >>> at
> >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> >> StreamThread.java:442)
> >>>
> >>> at
> >>> org.apache.kafka.streams.processor.internals.
> >> StreamThread.run(StreamThread.java:242)
> >>>
> >>> 2016-12-1

Kafka Connect offset.storage.topic not receiving messages (i.e. how to access Kafka Connect offset metadata?)

2017-01-05 Thread Phillip Mann
I am working on setting up a Kafka Connect Distributed Mode application which 
will be a Kafka to S3 pipeline. I am using Kafka 0.10.1.0-1 and Kafka Connect 
3.1.1-1. So far things are going smoothly but one aspect that is important to 
the larger system I am working with requires knowing offset information of the 
Kafka -> FileSystem pipeline. According to the documentation, the 
offset.storage.topic configuration will be the location the distributed mode 
application uses for storing offset information. This makes sense given how 
Kafka stores consumer offsets in the 'new' Kafka. However, after doing some 
testing with the FileStreamSinkConnector, nothing is being written to my 
offset.storage.topic which is the default value: connect-offsets.

To be specific, I am using a Python Kafka producer to push data to a topic and 
using Kafka Connect with the FileStreamSinkConnector to output the data from the 
topic to a file. This works and behaves as I expect the connector to behave. 
Additionally, when I stop the connector and start the connector, the 
application remembers the state in the topic and there is no data duplication. 
However, when I go to the offset.storage.topic to see what offset metadata is 
stored, there is nothing in the topic.

This is the command that I use:

kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 
--topic connect-offsets --from-beginning

I receive this message after letting this command run for a minute or so:

Processed a total of 0 messages

So to summarize, I have 2 questions:


  1.  Why is offset metadata not being written to the topic that should be 
storing this even though my distributed application is keeping state correctly?


  2.  How do I access offset metadata information for a Kafka Connect 
distributed mode application? This is 100% necessary for my team's Lambda 
Architecture implementation of our system.

Thanks for the help.


Re: [VOTE] Vote for KIP-101 - Leader Epochs

2017-01-05 Thread Ben Stopford
Hi Jun

Thanks for raising these points. Thorough as ever!

1) Changes made as requested.
2) Done.
3) My plan for handling returning leaders is simply to force the Leader
Epoch to increment if a leader returns. I don't plan to fix KAFKA-1120 as
part of this KIP. It is really a separate issue with wider implications.
I'd be happy to add KAFKA-1120 into the release though if we have time.
4) Agreed. Not sure exactly how that's going to play out, but I think we're
on the same page.

Please could

Cheers
B

On Thu, Jan 5, 2017 at 12:50 AM Jun Rao  wrote:

> Hi, Ben,
>
> Thanks for the proposal. Looks good overall. A few comments below.
>
> 1. For LeaderEpochRequest, we need to include topic right? We probably want
> to follow other requests by nesting partition inside topic? For
> LeaderEpochResponse,
> do we need to return leader_epoch? I was thinking that we could just return
> an end_offset, which is the next offset of the last message in the
> requested leader generation. Finally, would OffsetForLeaderEpochRequest be
> a better name?
>
> 2. We should bump up both the produce request and the fetch request
> protocol version since both include the message set.
>
> 3. Extending LeaderEpoch to include Returning Leaders: To support this, do
> you plan to use the approach that stores  CZXID in the broker registration
> and including the CZXID of the leader in /brokers/topics/[topic]/
> partitions/[partitionId]/state in ZK?
>
> 4. Since there are a few other KIPs involving message format too, it would
> be useful to consider if we could combine the message format changes in the
> same release.
>
> Thanks,
>
> Jun
>
>
> On Wed, Jan 4, 2017 at 9:24 AM, Ben Stopford  wrote:
>
> > Hi All
> >
> > We’re having some problems with this thread being subsumed by the
> > [Discuss] thread. Hopefully this one will appear distinct. If you see
> more
> > than one, please use this one.
> >
> > KIP-101 should now be ready for a vote. As a reminder the KIP proposes a
> > change to the replication protocol to remove the potential for replicas
> to
> > diverge.
> >
> > The KIP can be found here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
> >
> > Please let us know your vote.
> >
> > B
> >
> >
> >
> >
> >
>


Re: Connect: SourceTask poll & commit interaction

2017-01-05 Thread Shikhar Bhushan
I have created https://issues.apache.org/jira/browse/KAFKA-4598 for this.

On Wed, Dec 14, 2016 at 2:58 PM Shikhar Bhushan 
wrote:

> Hi Mathieu,
>
> I think you are right, there is currently no mutual exclusion between
> `task.commit()` and `task.poll()`. The solution you are thinking of with
> maintaining the committed offset state yourself seems reasonable, though
> inconvenient.
>
> It probably makes sense to add a new parameterized `commit()` method
> carrying the offset map (and possibly deprecate the existing one).
>
> Best,
>
> Shikhar
>
> On Sat, Dec 10, 2016 at 7:57 AM Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
> Hi Kafka Users,
>
> I'm looking for a bit of clarification on the documentation for
> implementing a SourceTask.  I'm reading a replication stream from a
> database in my SourceTask, and I'd like to use commit or commitRecord to
> advance the other system's replication stream pointer so that it knows I
> have successfully read & committed the records to Kafka.  This allows the
> other system to discard unneeded transaction logs.
>
> But I'm uncertain how to use either of SourceTask's commit or commitRecord
> correctly.
>
> For commit, the documentation says that it should "Commit the offsets, up
> to the offsets that have been returned by poll().".  When commit() is
> executed, will poll() currently be running on another thread?  I assume it
> must be, because poll should block, and that would imply you can't commit
> the tailing end of some activity.  If commit is invoked while poll is being
> invoked, I'm concerned that I can't reliably determine where to advance my
> replication stream pointer to -- if I store the location at the end of
> poll, commit might be invoked while poll is still returning some records,
> and advance the pointer further than actually guaranteed.
>
> commitRecord on the other hand is invoked per-record.  The documentation
> says "Commit an individual SourceRecord when the callback from the producer
> client is received."  But if I'm producing to N partitions on different
> brokers, I believe that the producer callback is not called in any
> guaranteed order, so I can't advance my replication stream pointer to any
> single record since an older record being delivered to another partition
> may not have been committed.
>
> The only solution I can see so far is to maintain the replication stream
> positions of all the source records that I've returned from poll, and
> advance the replication pointer in commitRecord only when the lowest
> outstanding record is committed.
>
> Is there anything I've misunderstood or misinterpreted?
>
> Thanks,
>
> Mathieu
>
>
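
The bookkeeping Mathieu describes above (remember every position returned
from poll, and advance the replication pointer only when the lowest
outstanding record has been committed) could look roughly like the sketch
below. It is plain Python rather than Connect code, and
ReplicationPointerTracker, on_poll and on_commit_record are hypothetical names
that are not part of the Kafka Connect API. Since there is no mutual exclusion
between commit and poll, a real task would presumably also need a lock around
this state.

```python
from collections import deque


class ReplicationPointerTracker:
    """Hypothetical helper; not part of the Kafka Connect API."""

    def __init__(self):
        self._pending = deque()    # positions returned from poll(), in source order
        self._acked = set()        # positions acknowledged but not yet released
        self.safe_position = None  # highest position that is safe to report upstream

    def on_poll(self, position):
        # Call once for every record handed back from poll().
        self._pending.append(position)

    def on_commit_record(self, position):
        # Call when the per-record commit callback fires for this position.
        self._acked.add(position)
        # Advance only across a contiguous prefix of acknowledged records.
        while self._pending and self._pending[0] in self._acked:
            self.safe_position = self._pending.popleft()
            self._acked.discard(self.safe_position)
        return self.safe_position


tracker = ReplicationPointerTracker()
for pos in (100, 101, 102):
    tracker.on_poll(pos)

after_101 = tracker.on_commit_record(101)  # 100 is still outstanding
after_100 = tracker.on_commit_record(100)  # releases 100 and 101 together
after_102 = tracker.on_commit_record(102)
```

An out-of-order acknowledgement (101 before 100) does not move the pointer;
it only advances once every earlier record has been acknowledged as well.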


Re: Aggregated windowed counts

2017-01-05 Thread Matthias J. Sax
On a clean restart on the same machine, the local RocksDB will just be
reused as it contains the complete state. Thus there is no need to read
the changelog topic at all.

The changelog topic is only read when a state is moved from one node to
another, or the state got corrupted due to a failure (i.e., the recovery case).

For both those cases, the whole changelog will be consumed. This might
take some time. But keep in mind that a changelog topic uses a
"compaction policy", thus it will eventually only contain a single entry
per window -- additionally, a "retention policy" is applied and entries
are deleted after the window retention time expires.
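
As a rough illustration of why the changelog stays bounded, here is a toy
model in plain Python. It is a simplification and not how the broker or Kafka
Streams implement it: compact and drop_expired are made-up helpers, keys are
(word, window_start_ms) pairs, and the expiry rule used here (window start
plus retention compared against a stream time) is an assumption chosen for
the example.

```python
def compact(changelog):
    """Keep only the most recent value written for each key."""
    latest = {}
    for key, value in changelog:
        latest[key] = value  # a later entry for the same key replaces the earlier one
    return latest


def drop_expired(state, stream_time_ms, retention_ms):
    """Drop windows whose (assumed) retention period has passed."""
    return {
        (word, start): count
        for (word, start), count in state.items()
        if start + retention_ms > stream_time_ms
    }


# Four changelog writes, three of them for the same window.
changelog = [
    (("kafka", 0), 1),
    (("kafka", 0), 2),
    (("kafka", 5000), 1),
    (("kafka", 0), 3),
]

compacted = compact(changelog)
retained = drop_expired(compacted, stream_time_ms=12000, retention_ms=10000)
```

After compaction only one entry per window is left, and the retention step
then removes the window that started at 0, so a restore would read far fewer
entries than were originally written.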

If you still have a too large changelog topic and rebuilding the state
takes too long, you can configure StandByTasks that rebuild the state on
a different machine in the background constantly (they do not do any
actual processing). In case of failure StandByTasks will be used to
quickly recreate the failed tasks.

See:
http://docs.confluent.io/current/streams/architecture.html#fault-tolerance


-Matthias

On 1/5/17 8:13 AM, Benjamin Black wrote:
> I understand now. The commit triggers the output of the window data,
> whether or not the window is complete. For example, if I use .print() as
> you suggest:
> 
> [KSTREAM-AGGREGATE-03]: [kafka@148363192] , (9<-null)
> [KSTREAM-AGGREGATE-03]: [kafka@1483631925000] , (5<-null)
> [KSTREAM-AGGREGATE-03]: [kafka@1483631925000] , (9<-null)
> [KSTREAM-AGGREGATE-03]: [kafka@148363193] , (2<-null)
> 
> The second line is an intermediate result of the third line. I suppose this
> is fine if we are storing the count with the time window, but not if we are
> trying to do a total count of each word. I'm guessing the only solution
> is a handcrafted solution using the lower level API as suggested in the
> stackoverflow post.
> 
> I have another question concerning how the Count ktable data is stored. If
> I understand correctly, on restart the process will re-create the state of
> the ktable by reading from the beginning the Count topic
> (wordcount-lambda-example-Counts-changelog). Over time wouldn't this be a
> lot of data? Or is there some mechanism used to only read from a position
> near the end?
> 
> On Wed, Jan 4, 2017 at 7:35 PM Matthias J. Sax 
> wrote:
> 
>> There is no such thing as a final window aggregate and you might see
>> intermediate results -- thus the counts do not add up.
>>
>> Please have a look here:
>>
>>
>> http://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable/38945277#38945277
>>
>> and here:
>>
>>
>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>>
>>
>> On each commit, the current intermediate result will be flushed from the
>> de-duplication cache -- thus, for smaller commit interval you see more
>> intermediate results and thus it seems to be more off.
>>
>> In .toStream((k, v) -> k.key()) you get rid of the window-id -- if you
>> keep it, you can see which result records belong to the same window. The
>> simplest way for testing would be to use .print() instead of .toStream()
>> to see the key as window-id plus record-key.
>>
>>
>> -Matthias
>>
>>
>> On 1/4/17 2:09 PM, Benjamin Black wrote:
>>> I'm hoping the DSL will do what I want :) Currently the example is
>>> continuously adding instead of bucketing, so if I modify it by adding a
>>> window to the count function:
>>>
>>> .groupBy((key, word) -> word)
>>> .count(TimeWindows.of(5000L), "Counts")
>>> .toStream((k, v) -> k.key());
>>>
>>> Then I do see bucketing happening. However, it isn't accurate. For
>> example,
>>> I type into the console "kafka" as 20 sentences, but the output I get is:
>>>
>>> kafka 4
>>> kafka 9
>>> kafka 2
>>> kafka 7
>>>
>>> Which equals 22. What am I doing wrong? What is the relationship between
>>> commit interval and time window. The smaller I make commit interval, the
>>> less accurate it becomes.
>>>
>>>
>>> On Wed, Jan 4, 2017 at 3:53 PM Matthias J. Sax 
>>> wrote:
>>>
>>>> Do you know about Kafka Streams? Its DSL gives you exactly what you
>>>> want to do.
>>>>
>>>> Check out the documentation and WordCount example:
>>>>
>>>> http://docs.confluent.io/current/streams/index.html
>>>>
>>>> https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java
>>>>
>>>> Let us know if you have further questions.
>>>>
>>>> -Matthias
>>>>
>>>> On 1/4/17 12:48 PM, Benjamin Black wrote:
>>>>> Hello,
>>>>>
>>>>> I'm looking for guidance on how to approach a counting problem. We want to
>>>>> consume a stream of data that consists of IDs and generate an output of the
>>>>> aggregated count with a window size of X seconds using processing time and
>>>>> a hopping time window. For example, using a window size of 1 second, if we
>>>>> get IDs 1, 2, 2, 2 in the 1st second, then the output would be 1=

Re: Problem with processor API partition assignments

2017-01-05 Thread Matthias J. Sax
It would also be helpful to know the number of partitions for each topic.

-Matthias

On 1/5/17 4:37 AM, Damian Guy wrote:
> Hi Brian,
> 
> It might be helpful if you provide some code showing your Topology.
> 
> Thanks,
> Damian
> 
> On Thu, 5 Jan 2017 at 10:59 Brian Krahmer  wrote:
> 
>> Hey guys,
>>
>>    I'm fighting an issue where I can currently only run one instance of
>> my streams application because when other instances come up, the
>> partition reassignment looks (to me) to be incorrect.
>>
>> I'm testing with docker-compose at the moment.  When I scale my
>> application to 3 instances and the 2nd and 3rd connect to kafka, causing
>> a rebalance, I get the following assignment on one of my instances:
>>
>> FleetData-0
>> FleetData-1
>> VehicleJourneyMapData-0
>> VehicleJourneyMapData-1
>> JourneyStarted-0
>> VehicleStateChanged-1
>> VehicleIgnitionData-0
>> VinVehicleMapData-1
>>
>> As you can see, the assignments are clearly not symmetric, which causes
>> problems, as I'm essentially doing join operations.  All topics have 3
>> partition in this testing scenario.  I'm using version 0.10.1.0.  Any
>> ideas?
>>
>> thanks,
>> brian
>>
>>
> 




Re: Lost message with Kafka configuration

2017-01-05 Thread Hoang Bao Thien
Yes, the problem is from producer configuration. And James Cheng has told
me how to fix it.
However, I still get another problem with a large file:

org.apache.kafka.common.errors.TimeoutException: Batch containing 36
record(s) expired due to timeout while requesting metadata from brokers for
MyTopic-0

Best regards,

On Thu, Jan 5, 2017 at 10:23 AM, Protoss Hu 
wrote:

> You mean the messages were lost on the way to broker before the broker
> actually received?
>
> Protoss Hu
> Blog: http://hbprotoss.github.io/
> Weibo: http://weibo.com/hbprotoss
>
> On Jan 5, 2017, 4:53 PM +0800, James Cheng wrote:
> > kafka-console-producer.sh defaults to acks=0, which means that the
> producer essentially throws messages at the broker and doesn't wait/retry
> to make sure they are properly received.
> >
> > In the kafka-console-producer.sh usage text:
> > --request-required-acks  > request required acks> requests (default: 0)
> >
> > Try re-running your test with "--request-required-acks -1" or
> "--request-required-acks all" (They are equivalent) This will tell the
> broker to wait for messages to be fully saved to all replicas before
> returning an acknowledgement to the producer. You can read more about acks
> in the producer configuration section of the kafka docs
> (http://kafka.apache.org/documentation/#producerconfigs)
> >
> > -James
> >
> > > On Jan 4, 2017, at 1:25 AM, Hoang Bao Thien 
> wrote:
> > >
> > > Hi all,
> > >
> > > I have a problem with losing messages from Kafka.
> > > The situation is as follows: I put a csv file with 286701 rows (size =
> > > 110MB) into Kafka producer with command:
> > > $ cat test.csv | kafka-console-producer.sh --broker-list localhost:9092
> > > --topic MyTopic > /dev/null
> > >
> > > and then count the number of lines from the Kafka consumer
> > > (kafka-console-consumer.sh --zookeeper localhost:2181 --topic MyTopic
> > > --from-beginning)
> > > However, I only get about 260K-270K, and this number of received
> messages
> > > changes for each test.
> > >
> > > My configuration in the "config/server.properties" has some minor
> change
> > > compared to the original file:
> > >
> > > log.retention.check.interval.hours=24
> > > log.retention.hours=168
> > > delete.topic.enable = true
> > >
> > > The remaining configurations are the same as default value.
> > >
> > > Could you please explain why the messages were lost in Kafka? And how
> to
> > > fix this problem please?
> > >
> > > Thanks a lot.
> > >
> > > Best regards,
> > > Alex
> >
>


Re: Aggregated windowed counts

2017-01-05 Thread Benjamin Black
I understand now. The commit triggers the output of the window data,
whether or not the window is complete. For example, if I use .print() as
you suggest:

[KSTREAM-AGGREGATE-03]: [kafka@148363192] , (9<-null)
[KSTREAM-AGGREGATE-03]: [kafka@1483631925000] , (5<-null)
[KSTREAM-AGGREGATE-03]: [kafka@1483631925000] , (9<-null)
[KSTREAM-AGGREGATE-03]: [kafka@148363193] , (2<-null)

The second line is an intermediate result of the third line. I suppose this
is fine if we are storing the count with the time window, but not if we are
trying to do a total count of each word. I'm guessing the only solution
is a handcrafted solution using the lower level API as suggested in the
stackoverflow post.
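
For what it's worth, here is a small sketch of how the output above can be
read as updates per window rather than as increments. It is plain Python, not
Streams code; latest_per_window and total_for are made-up helpers, and w1..w3
are placeholder labels standing in for the three window timestamps in the
output. The counts 9, 5, 9 and 2 are the ones printed above.

```python
def latest_per_window(updates):
    """Collapse (window_id, word, count) updates to the newest count per window."""
    state = {}
    for window_id, word, count in updates:
        state[(window_id, word)] = count  # a later update replaces the earlier one
    return state


def total_for(word, state):
    """Sum the newest per-window counts for one word."""
    return sum(count for (_, w), count in state.items() if w == word)


updates = [
    ("w1", "kafka", 9),
    ("w2", "kafka", 5),  # intermediate result for w2
    ("w2", "kafka", 9),  # supersedes the previous line
    ("w3", "kafka", 2),
]

naive_sum = sum(count for _, _, count in updates)
state = latest_per_window(updates)
total = total_for("kafka", state)
```

Adding every printed line gives 25, whereas keeping only the newest value per
window gives 9 + 9 + 2 = 20, which is only possible because the window id is
still part of the key.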

I have another question concerning how the Count ktable data is stored. If
I understand correctly, on restart the process will re-create the state of
the ktable by reading from the beginning the Count topic
(wordcount-lambda-example-Counts-changelog). Over time wouldn't this be a
lot of data? Or is there some mechanism used to only read from a position
near the end?

On Wed, Jan 4, 2017 at 7:35 PM Matthias J. Sax 
wrote:

> There is no such thing as a final window aggregate and you might see
> intermediate results -- thus the counts do not add up.
>
> Please have a look here:
>
>
> http://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable/38945277#38945277
>
> and here:
>
>
> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>
>
> On each commit, the current intermediate result will be flushed from the
> de-duplication cache -- thus, for smaller commit interval you see more
> intermediate results and thus it seems to be more off.
>
> In .toStream((k, v) -> k.key()) you get rid of the window-id -- if you
> keep it, you can see which result records belong to the same window. The
> simplest way for testing would be to use .print() instead of .toStream()
> to see the key as window-id plus record-key.
>
>
> -Matthias
>
>
> On 1/4/17 2:09 PM, Benjamin Black wrote:
> > I'm hoping the DSL will do what I want :) Currently the example is
> > continuously adding instead of bucketing, so if I modify it by adding a
> > window to the count function:
> >
> > .groupBy((key, word) -> word)
> > .count(TimeWindows.of(5000L), "Counts")
> > .toStream((k, v) -> k.key());
> >
> > Then I do see bucketing happening. However, it isn't accurate. For
> example,
> > I type into the console "kafka" as 20 sentences, but the output I get is:
> >
> > kafka 4
> > kafka 9
> > kafka 2
> > kafka 7
> >
> > Which equals 22. What am I doing wrong? What is the relationship between
> > commit interval and time window. The smaller I make commit interval, the
> > less accurate it becomes.
> >
> >
> > On Wed, Jan 4, 2017 at 3:53 PM Matthias J. Sax 
> > wrote:
> >
> >> Do you know about Kafka Streams? Its DSL gives you exactly what you
> >> want to do.
> >>
> >> Check out the documentation and WordCount example:
> >>
> >> http://docs.confluent.io/current/streams/index.html
> >>
> >>
> https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java
> >>
> >>
> >> Let us know if you have further questions.
> >>
> >>
> >> -Matthias
> >>
> >> On 1/4/17 12:48 PM, Benjamin Black wrote:
> >>> Hello,
> >>>
> >>> I'm looking for guidance on how to approach a counting problem. We want
> >> to
> >>> consume a stream of data that consists of IDs and generate an output of
> >> the
> >>> aggregated count with a window size of X seconds using processing time
> >> and
> >>> a hopping time window. For example, using a window size of 1 second, if
> >> we
> >>> get IDs 1, 2, 2, 2 in the 1st second, then the output would be 1=1,
> 2=3.
> >> If
> >>> we get IDs 1, 3, 3 in the 2nd second then the output would be 1=1, 3=2.
> >> The
> >>> aggregated count will then be turned into increment commands to a cache
> >> and
> >>> a database.
> >>>
> >>> Obviously we will need some state to be stored during the count of a
> >>> window, but we only need to keep it for the time period of the window
> >> (i.e.
> >>> a second). I was thinking this could be achieved by using a persistent
> >>> store, where the counts are reset during the punctuate and the store
> >> topic
> >>> uses log compression. Alternatively, we could simple have an in memory
> >>> store that is reset during the punctuate. My concern with the in memory
> >>> store is that I don't know when the input topic offset is committed or
> >> when
> >>> the output data is written and therefore we could lose data.
> Ultimately,
> >> at
> >>> the end of the second, the input offset and output data should be
> written
> >>> at the same time, reducing the likelihood of lost data. We would rather
> >>> lose data, than have duplicate counts. What is the correct approach? Is
> >>> there a better way of tackling the problem?
> >>>
> >>> I hav

Under-replicated Partitions while rolling Kafka nodes in AWS

2017-01-05 Thread Jack Lund
Hello, all.

We're running multiple Kafka clusters in AWS, and thus multiple Zookeeper
clusters as well. When we roll out changes to our zookeeper nodes (which
involves changes to the AMI, which means terminating the zookeeper instance
and bringing up a new one in its place) we have to restart our Kafka
brokers one at a time so they can pick up the new zookeeper IP address.

What we've noticed is that, as the brokers are restarted, we get alerts for
under-replicated partitions, which seems strange since it seems like the
shutdown process should take care of moving any replicas and the leadership
election process.

This is causing us some pain because it means that we get pages whenever we
roll out changes to Zookeeper.

Does anybody have any ideas why this would be happening, and how we can
avoid it?

Thanks.

-Jack Lund
 Braintree Payments


Does offsetsForTimes use createtime of logsegment file?

2017-01-05 Thread Vignesh
Hi,

The offsetsForTimes function returns the offset for a given timestamp. Does
it use the message's timestamp (which could be LogAppendTime or set by the
user) or the creation time of the log segment file?

KIP-33 adds a timestamp-based index, and it is available only from 0.10.1.
Does the above function work on 0.10.0? If so, are there any differences in
how it works between versions 0.10.0 and 0.10.1?

Thanks,
Vignesh.
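
Editor's sketch: the Java consumer documentation describes `offsetsForTimes` as returning, per partition, the earliest offset whose message timestamp is greater than or equal to the requested timestamp. The snippet below models only that lookup rule in plain Python. It does not call Kafka and says nothing about version differences; `offset_for_time` and the in-memory `log` list are hypothetical names used for illustration.

```python
def offset_for_time(log, target_ts):
    """Return (offset, timestamp) of the earliest record with timestamp >= target_ts.

    log: list of (offset, timestamp_ms) pairs in offset order
         (a hypothetical model of a single partition).
    Returns None when no record qualifies.
    """
    for offset, ts in log:
        if ts >= target_ts:
            return offset, ts
    return None


# Timestamps set by producers need not increase with the offset.
log = [(0, 1000), (1, 1500), (2, 1400), (3, 2000)]
hit = offset_for_time(log, 1450)
miss = offset_for_time(log, 2500)
print(hit, miss)
```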


Re: Lost message with Kafka configuration

2017-01-05 Thread Protoss Hu
You mean the messages were lost on the way to the broker, before the broker 
actually received them?

Protoss Hu
Blog: http://hbprotoss.github.io/
Weibo: http://weibo.com/hbprotoss

On Jan 5, 2017, at 4:53 PM (+0800), James Cheng wrote:
> kafka-console-producer.sh defaults to acks=0, which means that the producer 
> essentially throws messages at the broker and doesn't wait/retry to make sure 
> they are properly received.
>
> In the kafka-console-producer.sh usage text:
> --request-required-acks <String: request required acks>  The required acks of the producer requests (default: 0)
>
> Try re-running your test with "--request-required-acks -1" or
> "--request-required-acks all" (they are equivalent). This will tell the broker
> to wait for messages to be fully saved to all replicas before returning an
> acknowledgement to the producer. You can read more about acks in the producer
> configuration section of the Kafka docs
> (http://kafka.apache.org/documentation/#producerconfigs).
>
> -James
>
> > On Jan 4, 2017, at 1:25 AM, Hoang Bao Thien  wrote:
> >
> > Hi all,
> >
> > I have a problem with losing messages from Kafka.
> > The situation is as follows: I put a csv file with 286701 rows (size =
> > 110MB) into Kafka producer with command:
> > $ cat test.csv | kafka-console-producer.sh --broker-list localhost:9092
> > --topic MyTopic > /dev/null
> >
> > and then count the number of lines from the Kafka consumer
> > (kafka-console-consumer.sh --zookeeper localhost:2181 --topic MyTopic
> > --from-beginning)
> > However, I only get about 260K-270K, and this number of received messages
> > changes for each test.
> >
> > My configuration in the "config/server.properties" has some minor change
> > compared to the original file:
> >
> > log.retention.check.interval.hours=24
> > log.retention.hours=168
> > delete.topic.enable = true
> >
> > The remaining configurations are the same as default value.
> >
> > Could you please explain why the messages were lost in Kafka? And how to
> > fix this problem please?
> >
> > Thanks a lot.
> >
> > Best regards,
> > Alex
>


Metric meaning

2017-01-05 Thread Robert Quinlivan
Hello,

Are there more detailed descriptions available for the metrics exposed by
Kafka via JMX? The current documentation provides some information but a
few metrics are not listed in detail – for example, "Log flush rate and
time."

-- 
Robert Quinlivan
Software Engineer, Signal


Re: Problem with processor API partition assignments

2017-01-05 Thread Damian Guy
Hi Brian,

It might be helpful if you provide some code showing your Topology.

Thanks,
Damian

On Thu, 5 Jan 2017 at 10:59 Brian Krahmer  wrote:

> Hey guys,
>
> I'm fighting an issue where I can currently only run one instance of
> my streams application because when other instances come up, the
> partition reassignment looks (to me) to be incorrect.
>
> I'm testing with docker-compose at the moment.  When I scale my
> application to 3 instances and the 2nd and 3rd connect to kafka, causing
> a rebalance, I get the following assignment on one of my instances:
>
> FleetData-0
> FleetData-1
> VehicleJourneyMapData-0
> VehicleJourneyMapData-1
> JourneyStarted-0
> VehicleStateChanged-1
> VehicleIgnitionData-0
> VinVehicleMapData-1
>
> As you can see, the assignments are clearly not symmetric, which causes
> problems, as I'm essentially doing join operations.  All topics have 3
> partitions in this testing scenario.  I'm using version 0.10.1.0.  Any
> ideas?
>
> thanks,
> brian
>
>


Re: MirrorMaker - Topics Identification and Replication

2017-01-05 Thread Greenhorn Techie
Thanks Ewen for your response.

Just to summarise, here is my understanding. Apologies if I have
misunderstood something. I am new to Kafka and still short on knowledge.


   - The MirrorMaker process automatically picks up new topics added on the
   source cluster, so no restart of the process is needed at regular
   intervals to update the list of topics
   - For existing topics, MirrorMaker will replicate messages to the target
   Kafka cluster as and when it sees data in the source Kafka cluster's topics
   - However, for new topics there might be a delay of up to 5 min (the
   default metadata refresh interval) before data starts replicating to the
   target Kafka cluster
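
Editor's sketch: the summary above can be modeled in a few lines. The snippet assumes the whitelist is applied as a regular expression against the topic names seen at each metadata refresh, and that refreshes happen on a fixed schedule starting at time 0. Both function names and the topic names are hypothetical, and the 300,000 ms interval is the 5-minute default quoted in this thread.

```python
import math
import re


def topics_to_mirror(whitelist, known_topics):
    """Return the topics whose full name matches the whitelist regex."""
    pattern = re.compile(whitelist)
    return sorted(t for t in known_topics if pattern.fullmatch(t))


def pickup_time_ms(created_at_ms, refresh_interval_ms=300_000):
    """Time of the first metadata refresh at or after topic creation.

    Assumes refreshes at 0, interval, 2 * interval, ... (a simplification).
    """
    return math.ceil(created_at_ms / refresh_interval_ms) * refresh_interval_ms


before = topics_to_mirror(".*", ["orders", "payments"])
after = topics_to_mirror(".*", ["orders", "payments", "refunds"])
newly_picked_up = sorted(set(after) - set(before))

# A topic created 1 second after a refresh waits almost the full interval.
delay_ms = pickup_time_ms(1_000) - 1_000
print(newly_picked_up, delay_ms)
```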

Please let me know if something is wrong in my understanding.

Thanks

On Tue, 3 Jan 2017 at 23:24 Ewen Cheslack-Postava  wrote:

> Yes, the consumer will pick up the new topics when it refreshes metadata
> (defaults to every 5 min) and start subscribing to the new topics.
>
> -Ewen
>
> On Tue, Jan 3, 2017 at 3:07 PM, Greenhorn Techie <
> greenhorntec...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I am new to Kafka as well as MirrorMaker, so I am wondering whether MM would
> > pick up new topics that are created on the source cluster automatically,
> > provided the topic matches the whitelist pattern?
> >
> > For example, if I start MM as below, would it replicate any new topics that
> > are created after the MM process is launched?
> >
> > kafka-mirror-maker.sh --new.consumer --consumer.config /opt/kafka/config/consumer.properties
> > --producer.config /opt/kafka/config/producer.properties --whitelist ".*"
> >
> > Or is there a need to restart MM process when a new topic / pattern is
> > started on the source cluster?
> >
> > What should be the recommended approach in regards to this?
> >
> > Thanks
> >
>


Query on MirrorMaker Replication - Bi-directional/Failover replication

2017-01-05 Thread Greenhorn Techie
Hi,

We are planning to set up MirrorMaker-based Kafka replication for DR
purposes. The base requirement is to have DR replication from the primary
(site1) to the DR site (site2) using MirrorMaker.

However, we need the solution to work in case of failover as well, i.e.
in the event of the site1 Kafka cluster failing, the site2 Kafka cluster
would be made primary. Later, when the site1 cluster eventually comes back
online, the direction of replication would be from site2 to site1.

But as I understand it, the offsets on each of the clusters are different, so
I am wondering how to design the solution given this constraint and these
requirements.

Thanks


Problem with processor API partition assignments

2017-01-05 Thread Brian Krahmer

Hey guys,

  I'm fighting an issue where I can currently only run one instance of 
my streams application because when other instances come up, the 
partition reassignment looks (to me) to be incorrect.


I'm testing with docker-compose at the moment.  When I scale my 
application to 3 instances and the 2nd and 3rd connect to kafka, causing 
a rebalance, I get the following assignment on one of my instances:


FleetData-0
FleetData-1
VehicleJourneyMapData-0
VehicleJourneyMapData-1
JourneyStarted-0
VehicleStateChanged-1
VehicleIgnitionData-0
VinVehicleMapData-1

As you can see, the assignments are clearly not symmetric, which causes 
problems, as I'm essentially doing join operations.  All topics have 3 
partitions in this testing scenario.  I'm using version 0.10.1.0.  Any ideas?


thanks,
brian
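
Editor's sketch: one way to make "not symmetric" concrete is to group the assigned topic-partitions by topic and compare the partition numbers. The check below assumes that all listed topics take part in the same join and therefore need identical partition numbers on one instance. That assumption may not hold for every topology, and the helper names are hypothetical.

```python
from collections import defaultdict


def partitions_by_topic(assigned):
    """Group 'Topic-N' strings into {topic: set of partition numbers}."""
    grouped = defaultdict(set)
    for name in assigned:
        # Split on the last '-' so topic names containing '-' still work.
        topic, _, partition = name.rpartition("-")
        grouped[topic].add(int(partition))
    return dict(grouped)


def is_symmetric(assigned):
    """True when every topic contributes the same set of partition numbers."""
    partition_sets = list(partitions_by_topic(assigned).values())
    return all(p == partition_sets[0] for p in partition_sets)


assigned = [
    "FleetData-0", "FleetData-1",
    "VehicleJourneyMapData-0", "VehicleJourneyMapData-1",
    "JourneyStarted-0", "VehicleStateChanged-1",
    "VehicleIgnitionData-0", "VinVehicleMapData-1",
]
grouped = partitions_by_topic(assigned)
symmetric = is_symmetric(assigned)
print(grouped)
print(symmetric)
```

For the assignment listed in the message, two topics contribute partitions 0 and 1 while the others contribute only one partition each, so the check reports the mismatch.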



Re: Lost message with Kafka configuration

2017-01-05 Thread Hoang Bao Thien
Hi James et al.,

Thanks for your help.
It works well with that parameter, but only for one CSV file.
If I run >=5 CSV files, each of size 110MB, data is lost again (when I
compare the number of received messages with the number of messages in the
original files).
I get lots of errors after re-running the producers:

[2017-01-05 10:18:55,110] ERROR Error when sending message to topic MyTopic
with key: null, value: 380 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 36
record(s) expired due to timeout while requesting metadata from brokers for
MyTopic-0
[2017-01-05 10:18:55,110] ERROR Error when sending message to topic MyTopic
with key: null, value: 373 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 36
record(s) expired due to timeout while requesting metadata from brokers for
MyTopic-0


Could you please help me fix this problem?

Thanks a lot.

Best regards,
Alex


On Thu, Jan 5, 2017 at 9:53 AM, James Cheng  wrote:

> kafka-console-producer.sh defaults to acks=0, which means that the
> producer essentially throws messages at the broker and doesn't wait/retry
> to make sure they are properly received.
>
> In the kafka-console-producer.sh usage text:
> --request-required-acks <String: request required acks>  The required acks of the producer requests (default: 0)
>
> Try re-running your test with "--request-required-acks -1" or
> "--request-required-acks all" (they are equivalent). This will tell the
> broker to wait for messages to be fully saved to all replicas before
> returning an acknowledgement to the producer. You can read more about acks
> in the producer configuration section of the Kafka docs
> (http://kafka.apache.org/documentation/#producerconfigs).
>
> -James
>
> > On Jan 4, 2017, at 1:25 AM, Hoang Bao Thien wrote:
> >
> > Hi all,
> >
> > I have a problem with losing messages from Kafka.
> > The situation is as follows: I put a csv file with 286701 rows (size =
> > 110MB)  into Kafka producer with command:
> > $ cat test.csv | kafka-console-producer.sh --broker-list localhost:9092
> > --topic MyTopic > /dev/null
> >
> > and then count the number of lines from the Kafka consumer
> > (kafka-console-consumer.sh --zookeeper localhost:2181 --topic MyTopic
> > --from-beginning)
> > However, I only get about 260K-270K, and this number of received messages
> > changes for each test.
> >
> > My configuration in the "config/server.properties" has some minor change
> > compared to the original file:
> >
> > log.retention.check.interval.hours=24
> > log.retention.hours=168
> > delete.topic.enable = true
> >
> > The remaining configurations are the same as default value.
> >
> > Could you please explain why the messages were lost in Kafka? And how to
> > fix this problem please?
> >
> > Thanks a lot.
> >
> > Best regards,
> > Alex
>
>


Re: Lost message with Kafka configuration

2017-01-05 Thread James Cheng
kafka-console-producer.sh defaults to acks=0, which means that the producer 
essentially throws messages at the broker and doesn't wait/retry to make sure 
they are properly received.

In the kafka-console-producer.sh usage text:
--request-required-acks <String: request required acks>  The required acks of the producer requests (default: 0)

Try re-running your test with "--request-required-acks -1" or
"--request-required-acks all" (they are equivalent). This will tell the broker
to wait for messages to be fully saved to all replicas before returning an
acknowledgement to the producer. You can read more about acks in the producer
configuration section of the Kafka docs
(http://kafka.apache.org/documentation/#producerconfigs).

-James

> On Jan 4, 2017, at 1:25 AM, Hoang Bao Thien  wrote:
> 
> Hi all,
> 
> I have a problem with losing messages from Kafka.
> The situation is as follows: I put a csv file with 286701 rows (size =
> 110MB)  into Kafka producer with command:
> $ cat test.csv | kafka-console-producer.sh --broker-list localhost:9092
> --topic MyTopic > /dev/null
> 
> and then count the number of lines from the Kafka consumer
> (kafka-console-consumer.sh --zookeeper localhost:2181 --topic MyTopic
> --from-beginning)
> However, I only get about 260K-270K, and this number of received messages
> changes for each test.
> 
> My configuration in the "config/server.properties" has some minor change
> compared to the original file:
> 
> log.retention.check.interval.hours=24
> log.retention.hours=168
> delete.topic.enable = true
> 
> The remaining configurations are the same as default value.
> 
> Could you please explain why the messages were lost in Kafka? And how to
> fix this problem please?
> 
> Thanks a lot.
> 
> Best regards,
> Alex



Re: Is this a bug or just unintuitive behavior?

2017-01-05 Thread James Cheng
Jeff,

Your analysis is correct. I would say that it is known but unintuitive behavior.

As an example of a problem caused by this behavior, it's possible for 
MirrorMaker to miss messages on newly created topics, even though it was 
subscribed to them before the topics were created.

See the following JIRAs:
https://issues.apache.org/jira/browse/KAFKA-3848 

https://issues.apache.org/jira/browse/KAFKA-3370 


-James
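
Editor's sketch: the behavior Jeff describes in the quoted message below can be modeled without Kafka. The snippet assumes a newly assigned partition with no committed offset, so the starting position comes from the reset policy. `first_fetch_position` is a hypothetical helper, and the "earliest" branch assumes the log starts at offset 0.

```python
def first_fetch_position(log_end_offset, committed_offset, reset_policy):
    """Pick the starting offset for a newly assigned partition (simplified model)."""
    if committed_offset is not None:
        return committed_offset
    if reset_policy == "earliest":
        return 0  # simplification: assumes the log begins at offset 0
    if reset_policy == "latest":
        return log_end_offset
    raise ValueError(f"unsupported reset policy: {reset_policy}")


# Two messages (offsets 0 and 1) were written before the rebalance finished.
log = ["m0", "m1"]
start_latest = first_fetch_position(len(log), None, "latest")
start_earliest = first_fetch_position(len(log), None, "earliest")

log.append("m2")  # produced after the consumer was assigned

seen_with_latest = log[start_latest:]
seen_with_earliest = log[start_earliest:]
print(seen_with_latest, seen_with_earliest)
```

With "latest" the consumer starts after the two messages that created the topic, which matches the missed messages reported in the thread. With "earliest" it would read them.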

> On Jan 4, 2017, at 4:37 PM, h...@confluent.io wrote:
> 
> This sounds exactly as I would expect things to behave. If you consume from 
> the beginning I would think you would get all the messages but not if you 
> consume from the latest offset. You can separately tune the metadata refresh 
> interval if you want to miss fewer messages but that still won't get you all 
> messages from the beginning if you don't explicitly consume from the 
> beginning.
> 
> Sent from my iPhone
> 
>> On Jan 4, 2017, at 6:53 PM, Jeff Widman  wrote:
>> 
>> I'm seeing consumers miss messages when they subscribe before the topic is
>> actually created.
>> 
>> Scenario:
>> 1) kafka 0.10.1.1 cluster with no topics, but with topic auto-creation
>> enabled, so a topic is created as soon as a message is published to it
>> 2) consumer subscribes using topic string or a regex pattern. Currently no
>> topics match. Consumer offset is "latest"
>> 3) producer publishes to a topic that matches the string or regex pattern.
>> 4) broker immediately creates a topic, writes the message, and also
>> notifies the consumer group that a rebalance needs to happen to assign the
>> topic_partition to one of the consumers.
>> 5) rebalance is fairly quick, maybe a second or so
>> 6) a consumer is assigned to the newly-created topic_partition
>> 
>> At this point, we've got a consumer steadily polling the recently created
>> topic_partition. However, the consumer.poll() never returns any messages
>> published between topic creation and when the consumer was assigned to the
>> topic_partition. I'm guessing this may be because when the consumer is
>> assigned to the topic_partition it doesn't find any committed offset, so it
>> uses the latest offset, which happens to be after the messages that were
>> published to create the topic.
>> 
>> This is surprising because the consumer technically was subscribed to the
>> topic before the messages were produced, so you'd think the consumer would
>> receive these messages.
>> 
>> Is this known behavior? A bug in Kafka broker? Or a bug in my client
>> library?