accessing state-store ala WordCount example

2016-12-06 Thread Jon Yeargers
I copied out some of the WordCountInteractive demo code to see how the REST
access works. I have an aggregator:

groupByKey().aggregate(LogLine::new,
        new aggregate(),
        TimeWindows.of(60 * 60 * 1000L),
        collectorSerde, "agg_stream");


I incorporated the Jetty server bits from the sample. When I run it I can
see results via the '/states/instances' endpoint but nothing from
'/states/keyvalues/agg_stream/all'.

The aggregator is churning away so I'd assume the state store would have
plenty of key/value pairs but it comes up empty.

What's the proper way to use this?
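
For reference, the lookup that the demo's REST layer wraps is
KafkaStreams#store(). A minimal sketch (the 'streams' instance and the
String key type are assumptions; note that a windowed aggregate like the
one above is materialized as a window store rather than a plain key-value
store, so it has to be fetched as one):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Look up the store backing the windowed aggregate by its name ("agg_stream").
ReadOnlyWindowStore<String, LogLine> store =
        streams.store("agg_stream", QueryableStoreTypes.<String, LogLine>windowStore());

// Fetch the windows for one key over the last hour.
long now = System.currentTimeMillis();
try (WindowStoreIterator<LogLine> iter =
             store.fetch("some-key", now - 60 * 60 * 1000L, now)) {
    while (iter.hasNext()) {
        KeyValue<Long, LogLine> entry = iter.next();  // entry.key is the window start timestamp
        System.out.println(entry.key + " -> " + entry.value);
    }
}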


Re: Creating a connector with Kafka Connect Distributed returning 500 error

2016-12-06 Thread Konstantine Karantasis
Hi Phillip,

May I ask which Kafka version you used?

The trunk repo of Apache Kafka briefly contained a bug in the Connect
framework (during the past week) that produced failures similar to the one
you describe (only in distributed mode). A fix was pushed yesterday.

3) Some useful step-by-step information is provided in the quickstart guide
here:
https://kafka.apache.org/quickstart#quickstart_kafkaconnect

as well as in the documentation of Confluent:
http://docs.confluent.io/3.1.0/connect/quickstart.html#

Alternatively, you might want to follow the quickstart guide of one of the
open source connectors, here:
http://docs.confluent.io/3.1.0/connect/connectors.html

2) From what you mention above, it seems likely that you're hitting this
temporary bug. But again, that depends on which Kafka version you've been
using.

1) Generating logs at one of the more verbose levels (e.g. DEBUG, TRACE) is
usually a useful source of information.
Alternatively, you may choose to run Connect in debug mode by setting the
environment variable KAFKA_DEBUG and attaching a remote debugger to it
(such as IntelliJ's remote debugging capability). With respect to live
debugging, we are planning to post a step-by-step guide for Kafka and Kafka
Connect soon.
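
A minimal sketch of the latter (the properties file path and the debug port
are assumptions; only KAFKA_DEBUG itself is named above):

export KAFKA_DEBUG=y        # picked up by the kafka-run-class.sh launcher
bin/connect-distributed.sh config/connect-distributed.properties
# then attach the IDE's remote debugger (e.g. IntelliJ "Remote") to the
# advertised JDWP port (typically 5005)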

Regards,
Konstantine

On Tue, Dec 6, 2016 at 11:22 AM, Phillip Mann  wrote:

> I am working on migrating from Camus to Kafka Connect. I am working on the
> implementation of Kafka Connect and specifically focused on distributed
> mode. I am able to start a worker successfully on my local machine which I
> assume communicates with my Kafka cluster. I am further able to run two GET
> commands such as / and /connector-plugins which return the correct JSON.
> However, when I try to POST a command to create a connector, I receive a
> 500 error and a time out. Specifically, I use this command to POST for
> testing:
>
> curl -X POST -H "Content-Type: application/json" --data '{"name":
> "local-file-sink", "config": {"connector.class":"FileStreamSinkConnector",
> "tasks.max":"1", "file":"test.sink.txt", "topics":"myTopic" }}'
> localhost:8083/connectors
>
> and eventually I get this response:
>
> {"error_code": 500, "message": "Request timed out"}
>
> I am lost as to what is going on. The logs from my Kafka Connect
> distributed worker show this:
>
> [2016-12-05 14:34:32,436] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:22:34:32
> +] "GET /connector-plugins HTTP/1.1" 200 315  2
> (org.apache.kafka.connect.runtime.rest.RestServer:60)
> [2016-12-05 15:05:25,422] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:05:25
> +] "GET /connector-plugins HTTP/1.1" 200 315  3
> (org.apache.kafka.connect.runtime.rest.RestServer:60)
> [2016-12-05 15:05:28,389] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:05:28
> +] "GET /connector-plugins HTTP/1.1" 200 315  2
> (org.apache.kafka.connect.runtime.rest.RestServer:60)
> [2016-12-05 15:07:38,644] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:06:08
> +] "GET /connectors HTTP/1.1" 500 48  90003 (org.apache.kafka.connect.
> runtime.rest.RestServer:60)
> [2016-12-05 15:07:44,450] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:07:44
> +] "GET /connector-plugins HTTP/1.1" 200 315  1
> (org.apache.kafka.connect.runtime.rest.RestServer:60)
> [2016-12-05 15:13:06,703] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:11:36
> +] "POST /connectors HTTP/1.1" 500 48  90003 (org.apache.kafka.connect.
> runtime.rest.RestServer:60)
> [2016-12-05 15:15:38,506] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:14:08
> +] "POST /connectors HTTP/1.1" 500 48  90005 (org.apache.kafka.connect.
> runtime.rest.RestServer:60)
>
> where you can see the error codes and the commands.
>
> I guess my main questions and issues are:
>
>   1.  How can I better debug Kafka Connect so I can try and fix this?
>   2.  Is there anything that I'm doing that is glaringly wrong?
>   3.  Is there any step-by-step documentation or blog posts on getting a
> Kafka Connect distributed worker and connector to run? I have not really
> seen anything, not even best-practices style documentation. Maybe I am
> just way too early of an adopter.
>
> I look forward to hearing back from the community and thank you for your
> help!
>
>


Re: Re: Topic discovery when supporting multiple kafka clusters

2016-12-06 Thread Yifan Ying
Hi Brian,

We have 5 brokers and ~80 topics, and the total number of partitions is
around 7k not counting replicas (so it's close to the limit that Netflix
recommends). Most topics have a replication factor of 2. CPU usage is only
around 25%. Each topic has around 3-4 consumers on average. Disk space is
our current bottleneck, as some topics carry relatively large messages, so
we have had to lower retention for some topics to only 1 hour. When adding
our 5th broker, we had trouble migrating the __consumer_offsets topic
because of https://issues.apache.org/jira/browse/KAFKA-4362, so
__consumer_offsets has to live on the first 4 brokers even as we keep
adding brokers.

We want to add a new cluster for a specific group of topics that carries
large messages and needs a much longer retention. This is also meant to
reduce operational complexity. I am open to any suggestions on scaling the
current cluster, but I'm also curious to learn how people do topic
discovery.

On Tue, Dec 6, 2016 at 12:37 PM, Brian Krahmer  wrote:

> You didn't mention anything about your current configuration, just that
> you are 'out of resources'.  Perhaps you misunderstand how to size your
> partitions per topic, and how partition allocation works.  If your brokers
> are maxed on cpu, and you double the number of brokers but keep the replica
> count the same, I would expect cpu usage to nearly get cut in half.  How
> many brokers do you have, how many topics do you have and how many
> partitions per topic do you have?  What is your resource utilization for
> bandwidth, CPU, and memory?  How many average consumers do you have for
> each topic?
>
> brian
>
>
>
> On 06.12.2016 21:23, Yifan Ying wrote:
>
>> Hi Aseem, the concern is to create too many partitions in total in one
>> cluster no matter how many brokers I have in this cluster. I think the two
>> articles that I mentioned explain why too many partitions in one cluster
>> could cause issues.
>>
>>
>>
>


-- 
Yifan


Re: One Kafka Broker Went Rogue

2016-12-06 Thread Thomas DeVoe
Hi All,

This happened again on our kafka cluster - a single kafka broker seems to
"forget" the existence of the rest of the cluster and shrinks all of its
ISRs to only exist on that node. The other two nodes get stuck in a loop
trying to connect to this rogue node and never even register that it is no
longer part of the cluster. Strangely, the network connection between all
of these nodes is fine at that time, and restarting the node resolves it
(though with some data loss due to unclean leader elections).

Anyone have any ideas? Help would be greatly appreciated.

Thanks,






*Tom DeVoe*
Software Engineer, Data

6 East 32nd Street, 2nd Floor
New York, NY 10016




On Tue, Nov 29, 2016 at 1:29 PM, Thomas DeVoe  wrote:

> Hi,
>
> I encountered a strange issue in our kafka cluster, where randomly a
> single broker entered a state where it seemed to think it was the only
> broker in the cluster (it shrank all of its ISRs to just existing on
> itself). Some details about the kafka cluster:
>
> - running in an EC2 VPC on AWS
> - 3 nodes (d2.xlarge)
> - Kafka version : 0.10.1.0
>
> More information about the incident:
>
> Around 19:57 yesterday, one of the nodes somehow lost its connection to
> the cluster and started reporting messages like this for what seemed to be
> all of its hosted topic partitions:
>
> [2016-11-28 19:57:05,426] INFO Partition [arches_stage,0] on broker 1002:
>> Shrinking ISR for partition [arches_stage,0] from 1003,1002,1001 to 1002
>> (kafka.cluster.Partition)
>> [2016-11-28 19:57:05,466] INFO Partition [connect-offsets,13] on broker
>> 1002: Shrinking ISR for partition [connect-offsets,13] from 1003,1002,1001
>> to 1002 (kafka.cluster.Partition)
>> [2016-11-28 19:57:05,489] INFO Partition [lasagna_prod_memstore,2] on
>> broker 1002: Shrinking ISR for partition [lasagna_prod_memstore,2] from
>> 1003,1002,1001 to 1002 (kafka.cluster.Partition)
>> ...
>>
>
> It then added the ISRs from the other machines back in:
>
> [2016-11-28 19:57:18,013] INFO Partition [arches_stage,0] on broker 1002:
>> Expanding ISR for partition [arches_stage,0] from 1002 to 1002,1003
>> (kafka.cluster.Partition)
>> [2016-11-28 19:57:18,015] INFO Partition [connect-offsets,13] on broker
>> 1002: Expanding ISR for partition [connect-offsets,13] from 1002 to
>> 1002,1003 (kafka.cluster.Partition)
>> [2016-11-28 19:57:18,018] INFO Partition [lasagna_prod_memstore,2] on
>> broker 1002: Expanding ISR for partition [lasagna_prod_memstore,2] from
>> 1002 to 1002,1003 (kafka.cluster.Partition)
>> ...
>> [2016-11-28 19:57:18,222] INFO Partition [arches_stage,0] on broker 1002:
>> Expanding ISR for partition [arches_stage,0] from 1002,1003 to
>> 1002,1003,1001 (kafka.cluster.Partition)
>> [2016-11-28 19:57:18,224] INFO Partition [connect-offsets,13] on broker
>> 1002: Expanding ISR for partition [connect-offsets,13] from 1002,1003 to
>> 1002,1003,1001 (kafka.cluster.Partition)
>> [2016-11-28 19:57:18,227] INFO Partition [lasagna_prod_memstore,2] on
>> broker 1002: Expanding ISR for partition [lasagna_prod_memstore,2] from
>> 1002,1003 to 1002,1003,1001 (kafka.cluster.Partition)
>
>
> and eventually removed them again before going on its merry way:
>
> [2016-11-28 19:58:05,408] INFO Partition [arches_stage,0] on broker 1002:
>> Shrinking ISR for partition [arches_stage,0] from 1002,1003,1001 to 1002
>> (kafka.cluster.Partition)
>> [2016-11-28 19:58:05,415] INFO Partition [connect-offsets,13] on broker
>> 1002: Shrinking ISR for partition [connect-offsets,13] from 1002,1003,1001
>> to 1002 (kafka.cluster.Partition)
>> [2016-11-28 19:58:05,416] INFO Partition [lasagna_prod_memstore,2] on
>> broker 1002: Shrinking ISR for partition [lasagna_prod_memstore,2] from
>> 1002,1003,1001 to 1002 (kafka.cluster.Partition)
>
>
> Node 1002 continued running from that point on normally (aside from the
> fact that all of its partitions were under-replicated). Also, there were no
> WARN/ERROR messages before/after this.
>
>
> The other two nodes were not so happy however, with both failing to
> connect via the ReplicaFetcherThread to the node in question. They
> reported this around the same time as that error:
>
> [2016-11-28 19:57:16,087] WARN [ReplicaFetcherThread-0-1002], Error in
>> fetch kafka.server.ReplicaFetcherThread$FetchRequest@6eb44718
>> (kafka.server.ReplicaFetcherThread)
>> java.io.IOException: Connection to 1002 was disconnected before the
>> 

Fwd: How to disable auto commit for SimpleConsumer kafka 0.8.1

2016-12-06 Thread Anjani Gupta
I want to disable auto commit for the kafka SimpleConsumer. I am using the
0.8.1 version. For the high-level consumer, config options can be set and
passed via consumerConfig as follows:
kafka.consumer.Consumer.createJavaConsumerConnector(this.consumerConfig);
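
For example (a minimal sketch of that high-level consumer setup; the
ZooKeeper address is a placeholder, and in 0.8.x the relevant property is
auto.commit.enable):

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");  // placeholder
props.put("group.id", "testGroup");
props.put("auto.commit.enable", "false");          // turns off auto commit for the high-level consumer
ConsumerConnector connector =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));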

How can I achieve the same for SimpleConsumer? I mainly want to disable
auto commit. I tried setting auto commit to false in consumer.properties
and restarted the Kafka server, ZooKeeper and the producer, but that does
not work. I think I need to apply this setting through code, not in
consumer.properties. Can anyone help here?

Here is what my code looks like:

List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
topicAndPartitionList.add(topicAndPartition);
OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(
        new OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0,
                correlationId, clientName));

Map<TopicAndPartition, OffsetMetadataAndError> offsets =
        offsetFetchResponse.offsets();
long readOffset = offsets.get(topicAndPartition).offset();

FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 10)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);

// Consume messages from fetchResponse

Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new HashMap<>();
requestInfo.put(topicAndPartition,
        new OffsetMetadataAndError(readOffset, "metadata", (short) 0));
OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(
        new OffsetCommitRequest("testGroup", requestInfo, (short) 0,
                correlationId, clientName));


If the above code crashes before committing the offset, I still get the
latest offset as the result of offsets.get(topicAndPartition).offset() in
the next run, which makes me think that the offset is auto-committed as the
code executes.


Re: NotLeaderForPartitionException

2016-12-06 Thread Apurva Mehta
Hi Sven,

You will see this exception during leader election. When the leader for a
partition moves to another broker, there is a period during which the
replicas would still connect to the original leader, at which point they
will raise this exception. This should be a very short period, after which
they will connect to and replicate from the new leader correctly.

This is not a fatal error, and you will see it if you are bouncing brokers
(since all the leaders on that broker will have to move after the bounce).
You may also see it if some brokers have connectivity issues: they may be
considered dead, and their partitions would be moved elsewhere.

Hope this helps,
Apurva

On Tue, Dec 6, 2016 at 10:06 AM, Sven Ludwig  wrote:

> Hello,
>
> in our Kafka clusters we sometimes observe a specific ERROR log-statement,
> and therefore we have doubts whether it is already running stably in our
> configuration. This occurs every now and then, like two or three times in a
> day. It is actually the predominant ERROR log-statement in our cluster.
> Example:
>
> [2016-12-06 17:14:50,909] ERROR [ReplicaFetcherThread-0-3], Error for
> partition [,] to broker
> 3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is
> not the leader for that topic-partition. (kafka.server.
> ReplicaFetcherThread)
>
> We already asked Google, but we did not find sufficient answers to our
> questions, therefore I am asking on the mailing list:
>
> 1. What are the possible reasons for this particular error?
>
> 2. What are the implications of it?
>
> 3. What can be done to prevent it?
>
> Best Regards,
> Sven
>


Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
Here's the solution (props to Damian G)

JsonSerializer<AggKey> keySerializer = new JsonSerializer<>();
JsonDeserializer<AggKey> keyDeserializer = new JsonDeserializer<>(AggKey.class);
Serde<AggKey> keySerde = Serdes.serdeFrom(keySerializer, keyDeserializer);

then for the aggregator call 'groupByKey(keySerde, prtRecordSerde)'.

The documentation says the 'no param' groupByKey() will use the default
serializers, but that doesn't seem to be true here.
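
Wired into the windowed aggregation quoted further down the thread, that
looks roughly like this (a sketch assembled from the snippets below, not
code lifted verbatim from the fix):

// keySerde is the Serde<AggKey> built above; the other names come from the quoted thread.
KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
        rtRekey.groupByKey(keySerde, prtRecordSerde)
               .aggregate(BqRtDetailLogLine_aggregate::new,
                          new PRTAggregate(),
                          TimeWindows.of(60 * 60 * 1000L),
                          collectorSerde, "prt_minute_agg_stream");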

On Tue, Dec 6, 2016 at 12:28 PM, Jon Yeargers 
wrote:

> Hmm. That's odd as the aggregation works ok if I use a String value for
> the key (and the corresponding String serde).
>
> This error only started occurring when I tried to substitute my 'custom'
> key for the original String.
>
> On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski 
> wrote:
>
>> Yeah, I knew that already, this part of the error:
>>
>> > > >>> > > org.apache.kafka.streams.processor.internals.
>> > RecordCollector.send(
>> > > >>> > RecordCollector.java:73)
>>
>> points to this line: https://github.com/apache/kafka/blob/0.10.1/streams/
>> src/main/java/org/apache/kafka/streams/processor/
>> internals/RecordCollector.java#L73
>>
>> which means that your error happens on the value, not the key.
>>
>> –
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> 0.10.1.0
>>
>> On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski 
>> wrote:
>>
>> > Jon,
>> >
>> > Are you using 0.10.1 or 0.10.0.1?
>> >
>> > –
>> > Best regards,
>> > Radek Gruchalski
>> > ra...@gruchalski.com
>> >
>> >
>> > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
>> > wrote:
>> >
>> > Hi Jon,
>> >
>> > At a glance the code looks ok, i.e, i believe the aggregate() should
>> have
>> > picked up the default Serde set in your StreamsConfig. However, you
>> could
>> > try adding the Serdes to the groupBy(..)
>> >
>> > i.e.,
>> > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers 
>> wrote:
>> >
>> > > It's just a bunch of public 'int' and 'String' values. There's an
>> empty
>> > > constructor and a copy constructor.
>> > >
>> > > For functions I override 'equals' and the requirements for 'serde'
>> > (close,
>> > > configure, serializer and deserializer).
>> > >
>> > > @Override
>> > > public Serializer serializer() {
>> > > JsonSerializer jsonSerializer = new JsonSerializer<>();
>> > > return jsonSerializer;
>> > > }
>> > >
>> > > @Override
>> > > public Deserializer deserializer() {
>> > > JsonDeserializer jsonDeserializer = new
>> > > JsonDeserializer<>();
>> > > return jsonDeserializer;
>> > > }
>> > >
>> > >
>> > >
>> > >
>> > > Which relates to:
>> > >
>> > > public class JsonSerializer implements Serializer {
>> > >
>> > > private Gson gson = new Gson();
>> > >
>> > > @Override
>> > > public void configure(Map map, boolean b) {
>> > >
>> > > }
>> > >
>> > > @Override
>> > > public byte[] serialize(String topic, T t) {
>> > > return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
>> > > }
>> > >
>> > > @Override
>> > > public void close() {
>> > >
>> > > }
>> > > }
>> > >
>> > >
>> > >
>> > > public class JsonDeserializer implements Deserializer {
>> > >
>> > > private Gson gson = new Gson();
>> > > private Class deserializedClass;
>> > >
>> > > public JsonDeserializer(Class deserializedClass) {
>> > > this.deserializedClass = deserializedClass;
>> > > }
>> > >
>> > > public JsonDeserializer() {
>> > > }
>> > >
>> > > @Override
>> > > @SuppressWarnings("unchecked")
>> > > public void configure(Map map, boolean b) {
>> > > if(deserializedClass == null) {
>> > > deserializedClass = (Class) map.get("serializedClass");
>> > > }
>> > > }
>> > >
>> > > @Override
>> > > public T deserialize(String s, byte[] bytes) {
>> > > if(bytes == null){
>> > > return null;
>> > > }
>> > >
>> > > return gson.fromJson(new String(bytes),deserializedClass);
>> > >
>> > > }
>> > >
>> > > @Override
>> > > public void close() {
>> > >
>> > > }
>> > > }
>> > >
>> > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <
>> ra...@gruchalski.com>
>> > > wrote:
>> > >
>> > > > Do you mind sharing the code of AggKey class?
>> > > >
>> > > > –
>> > > > Best regards,
>> > > > Radek Gruchalski
>> > > > ra...@gruchalski.com
>> > > >
>> > > >
>> > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
>> > > jon.yearg...@cedexis.com)
>> > > > wrote:
>> > > >
>> > > > The 2nd.
>> > > >
>> > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <
>> > ra...@gruchalski.com>
>> > > > wrote:
>> > > >
>> > > >> Is the error happening at this stage?
>> > > >>
>> > > >> KStream rtRekey = rtDetailLines.map((key,
>> > > value)
>> > > >> -> new KeyValue<>(new AggKey(value), value));
>> > > >>
>> > > >> or here:
>> > > >>
>> > > >> 

Re: Re: Topic discovery when supporting multiple kafka clusters

2016-12-06 Thread Brian Krahmer
You didn't mention anything about your current configuration, just that 
you are 'out of resources'.  Perhaps you misunderstand how to size your 
partitions per topic, and how partition allocation works.  If your 
brokers are maxed on cpu, and you double the number of brokers but keep 
the replica count the same, I would expect cpu usage to nearly get cut 
in half.  How many brokers do you have, how many topics do you have and 
how many partitions per topic do you have?  What is your resource 
utilization for bandwidth, CPU, and memory?  How many average consumers 
do you have for each topic?


brian


On 06.12.2016 21:23, Yifan Ying wrote:

Hi Aseem, the concern is to create too many partitions in total in one
cluster no matter how many brokers I have in this cluster. I think the two
articles that I mentioned explain why too many partitions in one cluster
could cause issues.






Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
Hmm. That's odd as the aggregation works ok if I use a String value for the
key (and the corresponding String serde).

This error only started occurring when I tried to substitute my 'custom'
key for the original String.

On Tue, Dec 6, 2016 at 12:24 PM, Radek Gruchalski 
wrote:

> Yeah, I knew that already, this part of the error:
>
> > > >>> > > org.apache.kafka.streams.processor.internals.
> > RecordCollector.send(
> > > >>> > RecordCollector.java:73)
>
> points to this line: https://github.com/apache/kafka/blob/0.10.1/
> streams/src/main/java/org/apache/kafka/streams/processor/internals/
> RecordCollector.java#L73
>
> which means that your error happens on the value, not the key.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> 0.10.1.0
>
> On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski 
> wrote:
>
> > Jon,
> >
> > Are you using 0.10.1 or 0.10.0.1?
> >
> > –
> > Best regards,
> > Radek Gruchalski
> > ra...@gruchalski.com
> >
> >
> > On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
> > wrote:
> >
> > Hi Jon,
> >
> > At a glance the code looks ok, i.e, i believe the aggregate() should
> have
> > picked up the default Serde set in your StreamsConfig. However, you
> could
> > try adding the Serdes to the groupBy(..)
> >
> > i.e.,
> > rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
> >
> > Thanks,
> > Damian
> >
> > On Tue, 6 Dec 2016 at 18:42 Jon Yeargers 
> wrote:
> >
> > > It's just a bunch of public 'int' and 'String' values. There's an
> empty
> > > constructor and a copy constructor.
> > >
> > > For functions I override 'equals' and the requirements for 'serde'
> > (close,
> > > configure, serializer and deserializer).
> > >
> > > @Override
> > > public Serializer serializer() {
> > > JsonSerializer jsonSerializer = new JsonSerializer<>();
> > > return jsonSerializer;
> > > }
> > >
> > > @Override
> > > public Deserializer deserializer() {
> > > JsonDeserializer jsonDeserializer = new
> > > JsonDeserializer<>();
> > > return jsonDeserializer;
> > > }
> > >
> > >
> > >
> > >
> > > Which relates to:
> > >
> > > public class JsonSerializer implements Serializer {
> > >
> > > private Gson gson = new Gson();
> > >
> > > @Override
> > > public void configure(Map map, boolean b) {
> > >
> > > }
> > >
> > > @Override
> > > public byte[] serialize(String topic, T t) {
> > > return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> > > }
> > >
> > > @Override
> > > public void close() {
> > >
> > > }
> > > }
> > >
> > >
> > >
> > > public class JsonDeserializer implements Deserializer {
> > >
> > > private Gson gson = new Gson();
> > > private Class deserializedClass;
> > >
> > > public JsonDeserializer(Class deserializedClass) {
> > > this.deserializedClass = deserializedClass;
> > > }
> > >
> > > public JsonDeserializer() {
> > > }
> > >
> > > @Override
> > > @SuppressWarnings("unchecked")
> > > public void configure(Map map, boolean b) {
> > > if(deserializedClass == null) {
> > > deserializedClass = (Class) map.get("serializedClass");
> > > }
> > > }
> > >
> > > @Override
> > > public T deserialize(String s, byte[] bytes) {
> > > if(bytes == null){
> > > return null;
> > > }
> > >
> > > return gson.fromJson(new String(bytes),deserializedClass);
> > >
> > > }
> > >
> > > @Override
> > > public void close() {
> > >
> > > }
> > > }
> > >
> > > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski <
> ra...@gruchalski.com>
> > > wrote:
> > >
> > > > Do you mind sharing the code of AggKey class?
> > > >
> > > > –
> > > > Best regards,
> > > > Radek Gruchalski
> > > > ra...@gruchalski.com
> > > >
> > > >
> > > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> > > jon.yearg...@cedexis.com)
> > > > wrote:
> > > >
> > > > The 2nd.
> > > >
> > > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <
> > ra...@gruchalski.com>
> > > > wrote:
> > > >
> > > >> Is the error happening at this stage?
> > > >>
> > > >> KStream rtRekey = rtDetailLines.map((key,
> > > value)
> > > >> -> new KeyValue<>(new AggKey(value), value));
> > > >>
> > > >> or here:
> > > >>
> > > >> KTable ktRtDetail =
> > > >> rtRekey.groupByKey().aggregate(
> > > >> BqRtDetailLogLine_aggregate::new,
> > > >> new PRTAggregate(),
> > > >> TimeWindows.of(60 * 60 * 1000L),
> > > >> collectorSerde, "prt_minute_agg_stream");
> > > >>
> > > >> –
> > > >>
> > > >> Best regards,
> > > >> Radek Gruchalski
> > > >> ra...@gruchalski.com
> > > >>
> > > >>
> > > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> > > jon.yearg...@cedexis.com)
> > > >> wrote:
> > > >>
> > > >> If I comment out the aggregation step and just .print the .map step
> I
> > > >> don't hit the error. It's coming from aggregating the non-String
> key.
> > > >>
> > > >> On Tue, Dec 

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Yeah, I knew that already, this part of the error:

> > >>> > > org.apache.kafka.streams.processor.internals.
> RecordCollector.send(
> > >>> > RecordCollector.java:73)

points to this line:
https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java#L73

which means that your error happens on the value, not the key.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 9:18:53 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

0.10.1.0

On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski 
wrote:

> Jon,
>
> Are you using 0.10.1 or 0.10.0.1?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
> wrote:
>
> Hi Jon,
>
> At a glance the code looks ok, i.e, i believe the aggregate() should have
> picked up the default Serde set in your StreamsConfig. However, you could
> try adding the Serdes to the groupBy(..)
>
> i.e.,
> rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 18:42 Jon Yeargers 
wrote:
>
> > It's just a bunch of public 'int' and 'String' values. There's an empty
> > constructor and a copy constructor.
> >
> > For functions I override 'equals' and the requirements for 'serde'
> (close,
> > configure, serializer and deserializer).
> >
> > @Override
> > public Serializer serializer() {
> > JsonSerializer jsonSerializer = new JsonSerializer<>();
> > return jsonSerializer;
> > }
> >
> > @Override
> > public Deserializer deserializer() {
> > JsonDeserializer jsonDeserializer = new
> > JsonDeserializer<>();
> > return jsonDeserializer;
> > }
> >
> >
> >
> >
> > Which relates to:
> >
> > public class JsonSerializer implements Serializer {
> >
> > private Gson gson = new Gson();
> >
> > @Override
> > public void configure(Map map, boolean b) {
> >
> > }
> >
> > @Override
> > public byte[] serialize(String topic, T t) {
> > return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> > }
> >
> > @Override
> > public void close() {
> >
> > }
> > }
> >
> >
> >
> > public class JsonDeserializer implements Deserializer {
> >
> > private Gson gson = new Gson();
> > private Class deserializedClass;
> >
> > public JsonDeserializer(Class deserializedClass) {
> > this.deserializedClass = deserializedClass;
> > }
> >
> > public JsonDeserializer() {
> > }
> >
> > @Override
> > @SuppressWarnings("unchecked")
> > public void configure(Map map, boolean b) {
> > if(deserializedClass == null) {
> > deserializedClass = (Class) map.get("serializedClass");
> > }
> > }
> >
> > @Override
> > public T deserialize(String s, byte[] bytes) {
> > if(bytes == null){
> > return null;
> > }
> >
> > return gson.fromJson(new String(bytes),deserializedClass);
> >
> > }
> >
> > @Override
> > public void close() {
> >
> > }
> > }
> >
> > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski 

> > wrote:
> >
> > > Do you mind sharing the code of AggKey class?
> > >
> > > –
> > > Best regards,
> > > Radek Gruchalski
> > > ra...@gruchalski.com
> > >
> > >
> > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> > jon.yearg...@cedexis.com)
> > > wrote:
> > >
> > > The 2nd.
> > >
> > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <
> ra...@gruchalski.com>
> > > wrote:
> > >
> > >> Is the error happening at this stage?
> > >>
> > >> KStream rtRekey = rtDetailLines.map((key,
> > value)
> > >> -> new KeyValue<>(new AggKey(value), value));
> > >>
> > >> or here:
> > >>
> > >> KTable ktRtDetail =
> > >> rtRekey.groupByKey().aggregate(
> > >> BqRtDetailLogLine_aggregate::new,
> > >> new PRTAggregate(),
> > >> TimeWindows.of(60 * 60 * 1000L),
> > >> collectorSerde, "prt_minute_agg_stream");
> > >>
> > >> –
> > >>
> > >> Best regards,
> > >> Radek Gruchalski
> > >> ra...@gruchalski.com
> > >>
> > >>
> > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> > jon.yearg...@cedexis.com)
> > >> wrote:
> > >>
> > >> If I comment out the aggregation step and just .print the .map step
I
> > >> don't hit the error. It's coming from aggregating the non-String
key.
> > >>
> > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <
> ra...@gruchalski.com>
> > >> wrote:
> > >>
> > >>> Jon,
> > >>>
> > >>> Looking at your code:
> > >>>
> > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > >>> Serdes.String().getClass().getName());
> > >>>
> > >>> and later:
> > >>>
> > >>> KStream rtDetailLines =
> > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> > >>>
> > >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> > >>> suggests.
> > >>> You may have to write your own Serializer / Deserializer for
> > >>> RtDetailLogLine.
> > >>>
> > >>> –
> > >>> Best regards,
> > >>> Radek Gruchalski
> > >>> 

Re: Topic discovery when supporting multiple kafka clusters

2016-12-06 Thread Yifan Ying
Hi Aseem, the concern is to create too many partitions in total in one
cluster no matter how many brokers I have in this cluster. I think the two
articles that I mentioned explain why too many partitions in one cluster
could cause issues.

On Tue, Dec 6, 2016 at 12:08 PM, Aseem Bansal  wrote:

> @Yifan Ying Why not add more brokers in your cluster? That will not
> increase the partitions. Does increasing the number of brokers cause you
> any problem? How many brokers do you have in the cluster already?
>
> On Wed, Dec 7, 2016 at 12:35 AM, Yifan Ying  wrote:
>
> > Thanks Asaf, Aseem.
> >
> > Assigning topics to only a specific set of brokers will probably cause
> > uneven traffic and it won't prevent topics to be re-assigned to other
> > brokers when brokers fail.
> >
> > Like I said, the original cluster is close to out of resources. I
> remember
> > there's some limit on # of partitions that each Kafka cluster can have.
> > Netflix recommends to keep it below 10k to improve availability and
> reduce
> > latency,
> > http://techblog.netflix.com/2016/04/kafka-inside-keystone-pipeline.html.
> > Jun Rao also wrote a blog(
> > https://www.confluent.io/blog/how-to-choose-the-number-of-
> > topicspartitions-in-a-kafka-cluster/)
> > about how too many partitions could hurt availability and latency. That's
> > why we want to create another cluster instead of expanding the current
> one.
> > I know a lot of companies are maintaining multiple clusters, and I'm
> > curious how people are doing topic discovery.
> >
> >
> >
> > On Tue, Dec 6, 2016 at 4:04 AM, Aseem Bansal 
> wrote:
> >
> > > What configurations allow you to assign topics to specific brokers?
> > >
> > > I can see https://kafka.apache.org/documentation#basic_ops_automigrate
> .
> > > This should allow you to move things around but does that keep anything
> > > from being re-assigned to the old ones?
> > >
> > > On Tue, Dec 6, 2016 at 5:25 PM, Asaf Mesika 
> > wrote:
> > >
> > > > Why not re-use same cluster? You can assign topics to be live only
> > > within a
> > > > specific set of brokers. Thus you have one "bus" for messages,
> > > simplifying
> > > > your applications code and configurations
> > > >
> > > > On Mon, Dec 5, 2016 at 9:43 PM Yifan Ying 
> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Initially, we have only one Kafka cluster shared across all teams.
> > But
> > > > now
> > > > > this cluster is very close to out of resources (disk space, # of
> > > > > partitions, etc.). So we are considering adding another Kafka
> > cluster.
> > > > But
> > > > > what's the best practice of topic discovery, so that applications
> > know
> > > > > which cluster their topics live? We have been using Zookeeper for
> > > service
> > > > > discovery, maybe it's also good for this purpose?
> > > > >
> > > > > Thanks
> > > > >
> > > > > --
> > > > > Yifan
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > Yifan
> >
>



-- 
Yifan


Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
0.10.1.0

On Tue, Dec 6, 2016 at 11:11 AM, Radek Gruchalski 
wrote:

> Jon,
>
> Are you using 0.10.1 or 0.10.0.1?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com)
> wrote:
>
> Hi Jon,
>
> At a glance the code looks ok, i.e, i believe the aggregate() should have
> picked up the default Serde set in your StreamsConfig. However, you could
> try adding the Serdes to the groupBy(..)
>
> i.e.,
> rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 18:42 Jon Yeargers  wrote:
>
> > It's just a bunch of public 'int' and 'String' values. There's an empty
> > constructor and a copy constructor.
> >
> > For functions I override 'equals'  and the requirements for 'serde'
> (close,
> > configure, serializer and deserializer).
> >
> >  @Override
> > public Serializer serializer() {
> > JsonSerializer jsonSerializer = new JsonSerializer<>();
> > return jsonSerializer;
> > }
> >
> > @Override
> > public Deserializer deserializer() {
> > JsonDeserializer jsonDeserializer = new
> > JsonDeserializer<>();
> > return jsonDeserializer;
> > }
> >
> >
> >
> >
> > Which relates to:
> >
> > public class JsonSerializer implements Serializer {
> >
> > private Gson gson = new Gson();
> >
> > @Override
> > public void configure(Map map, boolean b) {
> >
> > }
> >
> > @Override
> > public byte[] serialize(String topic, T t) {
> > return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> > }
> >
> > @Override
> > public void close() {
> >
> > }
> > }
> >
> >
> >
> > public class JsonDeserializer implements Deserializer {
> >
> > private Gson gson = new Gson();
> > private Class deserializedClass;
> >
> > public JsonDeserializer(Class deserializedClass) {
> > this.deserializedClass = deserializedClass;
> > }
> >
> > public JsonDeserializer() {
> > }
> >
> > @Override
> > @SuppressWarnings("unchecked")
> > public void configure(Map map, boolean b) {
> > if(deserializedClass == null) {
> > deserializedClass = (Class) map.get("serializedClass");
> > }
> > }
> >
> > @Override
> > public T deserialize(String s, byte[] bytes) {
> > if(bytes == null){
> > return null;
> > }
> >
> > return gson.fromJson(new String(bytes),deserializedClass);
> >
> > }
> >
> > @Override
> > public void close() {
> >
> > }
> > }
> >
> > On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski 
> > wrote:
> >
> > > Do you mind sharing the code of AggKey class?
> > >
> > > –
> > > Best regards,
> > > Radek Gruchalski
> > > ra...@gruchalski.com
> > >
> > >
> > > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> > jon.yearg...@cedexis.com)
> > > wrote:
> > >
> > > The 2nd.
> > >
> > > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski <
> ra...@gruchalski.com>
> > > wrote:
> > >
> > >> Is the error happening at this stage?
> > >>
> > >> KStream rtRekey = rtDetailLines.map((key,
> > value)
> > >>  -> new KeyValue<>(new AggKey(value), value));
> > >>
> > >> or here:
> > >>
> > >> KTable ktRtDetail =
> > >> rtRekey.groupByKey().aggregate(
> > >> BqRtDetailLogLine_aggregate::new,
> > >> new PRTAggregate(),
> > >> TimeWindows.of(60 * 60 * 1000L),
> > >> collectorSerde, "prt_minute_agg_stream");
> > >>
> > >> –
> > >>
> > >> Best regards,
> > >> Radek Gruchalski
> > >> ra...@gruchalski.com
> > >>
> > >>
> > >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> > jon.yearg...@cedexis.com)
> > >> wrote:
> > >>
> > >> If I comment out the aggregation step and just .print the .map step I
> > >> don't hit the error. It's coming from aggregating the non-String key.
> > >>
> > >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski <
> ra...@gruchalski.com>
> > >> wrote:
> > >>
> > >>> Jon,
> > >>>
> > >>> Looking at your code:
> > >>>
> > >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > >>> Serdes.String().getClass().getName());
> > >>>
> > >>> and later:
> > >>>
> > >>> KStream rtDetailLines =
> > >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> > >>>
> > >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> > >>> suggests.
> > >>> You may have to write your own Serializer / Deserializer for
> > >>> RtDetailLogLine.
> > >>>
> > >>> –
> > >>> Best regards,
> > >>> Radek Gruchalski
> > >>> ra...@gruchalski.com
> > >>>
> > >>>
> > >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (
> > >>> jon.yearg...@cedexis.com) wrote:
> > >>>
> > >>> Using 0.10.1.0
> > >>>
> > >>> This is my topology:
> > >>>
> > >>> Properties config = new Properties();
> > >>> 

Re: Topic discovery when supporting multiple kafka clusters

2016-12-06 Thread Aseem Bansal
@Yifan Ying Why not add more brokers in your cluster? That will not
increase the partitions. Does increasing the number of brokers cause you
any problem? How many brokers do you have in the cluster already?

On Wed, Dec 7, 2016 at 12:35 AM, Yifan Ying  wrote:

> Thanks Asaf, Aseem.
>
> Assigning topics to only a specific set of brokers will probably cause
> uneven traffic and it won't prevent topics to be re-assigned to other
> brokers when brokers fail.
>
> Like I said, the original cluster is close to out of resources. I remember
> there's some limit on # of partitions that each Kafka cluster can have.
> Netflix recommends to keep it below 10k to improve availability and reduce
> latency,
> http://techblog.netflix.com/2016/04/kafka-inside-keystone-pipeline.html.
> Jun Rao also wrote a blog(
> https://www.confluent.io/blog/how-to-choose-the-number-of-
> topicspartitions-in-a-kafka-cluster/)
> about how too many partitions could hurt availability and latency. That's
> why we want to create another cluster instead of expanding the current one.
> I know a lot of companies are maintaining multiple clusters, and I'm
> curious how people are doing topic discovery.
>
>
>
> On Tue, Dec 6, 2016 at 4:04 AM, Aseem Bansal  wrote:
>
> > What configurations allow you to assign topics to specific brokers?
> >
> > I can see https://kafka.apache.org/documentation#basic_ops_automigrate.
> > This should allow you to move things around but does that keep anything
> > from being re-assigned to the old ones?
> >
> > On Tue, Dec 6, 2016 at 5:25 PM, Asaf Mesika 
> wrote:
> >
> > > Why not re-use same cluster? You can assign topics to be live only
> > within a
> > > specific set of brokers. Thus you have one "bus" for messages,
> > simplifying
> > > your applications code and configurations
> > >
> > > On Mon, Dec 5, 2016 at 9:43 PM Yifan Ying  wrote:
> > >
> > > > Hi,
> > > >
> > > > Initially, we have only one Kafka cluster shared across all teams.
> But
> > > now
> > > > this cluster is very close to out of resources (disk space, # of
> > > > partitions, etc.). So we are considering adding another Kafka
> cluster.
> > > But
> > > > what's the best practice of topic discovery, so that applications
> know
> > > > which cluster their topics live? We have been using Zookeeper for
> > service
> > > > discovery, maybe it's also good for this purpose?
> > > >
> > > > Thanks
> > > >
> > > > --
> > > > Yifan
> > > >
> > >
> >
>
>
>
> --
> Yifan
>


Re: kafka 0.10.1 / log-cleaner stopped / timeindex issues

2016-12-06 Thread Ismael Juma
Hi, can you please file a JIRA ticket so this doesn't get lost?

Thanks,
Ismael

On 6 Dec 2016 5:06 pm, "Schumann,Robert"  wrote:

> Hi all,
>
> we are facing an issue with latest kafka 0.10.1 and the log cleaner thread
> with regards to the timeindex files. From the log of the log-cleaner we see
> after startup that it tries to cleanup a topic called xdc_listing-status-v2
> [1]. The topic is setup with log compaction [2] and the kafka cluster
> configuration has log.cleaner enabled [3]. Looking at the log and the newly
> created file [4], the cleaner seems to refer to tombstones prior to
> epoch_time=0 - maybe because it finds messages which don’t have a
> timestamp at all (?). All producers and consumers are using 0.10.1 and the
> topics have been created completely new, so I’m not sure, where this issue
> would come from. The original timeindex file [5] seems to show only valid
> timestamps for the mentioned offsets. I would also like to mention that the
> issue happened in two independent datacenters at the same time, so I would
> rather expect an application/producer issue instead of random disk
> failures. We didn’t have the problem with 0.10.0 for around half a year, it
> appeared shortly after the upgrade to 0.10.1.
>
> The puzzling message from the cleaner “cleaning prior to Fri Dec 02
> 16:35:50 CET 2016, discarding tombstones prior to Thu Jan 01 01:00:00 CET
> 1970” also confuses me a bit. Does that mean, it does not find any log
> segments which can be cleaned up or the last timestamp of the last log
> segment is somehow broken/missing?
>
> I’m also a bit wondering, why the log cleaner thread stops completely
> after an error with one topic. I would at least expect that it keeps on
> cleaning up other topics, but apparently it doesn’t do that, e.g. it’s not
> even cleaning the __consumer_offsets anymore.
>
> Does anybody have the same issues or can explain, what’s going on? Thanks
> for any help or suggestions.
>
> Cheers
> Robert
>
> [1]
> [2016-12-06 12:49:17,885] INFO Starting the log cleaner
> (kafka.log.LogCleaner)
> [2016-12-06 12:49:17,895] INFO [kafka-log-cleaner-thread-0], Starting
> (kafka.log.LogCleaner)
> [2016-12-06 12:49:17,947] INFO Cleaner 0: Beginning cleaning of log
> xdc_listing-status-v2-1. (kafka.log.LogCleaner)
> [2016-12-06 12:49:17,948] INFO Cleaner 0: Building offset map for
> xdc_listing-status-v2-1... (kafka.log.LogCleaner)
> [2016-12-06 12:49:17,989] INFO Cleaner 0: Building offset map for log
> xdc_listing-status-v2-1 for 1 segments in offset range [0, 194991).
> (kafka.log.LogCleaner)
> [2016-12-06 12:49:24,572] INFO Cleaner 0: Offset map for log
> xdc_listing-status-v2-1 complete. (kafka.log.LogCleaner)
> [2016-12-06 12:49:24,577] INFO Cleaner 0: Cleaning log
> xdc_listing-status-v2-1 (cleaning prior to Fri Dec 02 16:35:50 CET 2016,
> discarding tombstones prior to Thu Jan 01 01:00:00 CET 1970)...
> (kafka.log.LogCleaner)
> [2016-12-06 12:49:24,580] INFO Cleaner 0: Cleaning segment 0 in log
> xdc_listing-status-v2-1 (largest timestamp Fri Dec 02 16:35:50 CET 2016)
> into 0, retaining deletes. (kafka.log.LogCleaner)
> [2016-12-06 12:49:24,968] ERROR [kafka-log-cleaner-thread-0], Error due
> to  (kafka.log.LogCleaner)
> kafka.common.InvalidOffsetException: Attempt to append an offset (-1) to
> slot 9 no larger than the last offset appended (11832) to
> /var/lib/kafka/xdc_listing-status-v2-1/.
> timeindex.cleaned.
> at kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply$mcV$sp(
> TimeIndex.scala:117)
> at kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply(TimeIndex.
> scala:107)
> at kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply(TimeIndex.
> scala:107)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:107)
> at kafka.log.LogSegment.append(LogSegment.scala:106)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:518)
> at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(
> LogCleaner.scala:404)
> at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(
> LogCleaner.scala:400)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:400)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:364)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.clean(LogCleaner.scala:363)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(
> LogCleaner.scala:239)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:218)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-12-06 12:49:24,971] INFO [kafka-log-cleaner-thread-0], Stopped
> (kafka.log.LogCleaner)
>
> [2]
> $ kafka-configs-topics xdc_listing-status-v2 --describe
> Configs for topic 

Creating a connector with Kafka Connect Distributed returning 500 error

2016-12-06 Thread Phillip Mann
I am working on migrating from Camus to Kafka Connect. I am working on the 
implementation of Kafka Connect and specifically focused on distributed mode. I 
am able to start a worker successfully on my local machine which I assume 
communicates with my Kafka cluster. I am further able to run two GET commands 
such as / and /connector-plugins which return the correct JSON. However, when I 
try to POST a command to create a connector, I receive a 500 error and a time 
out. Specifically, I use this command to POST for testing:

curl -X POST -H "Content-Type: application/json" --data '{"name": 
"local-file-sink", "config": {"connector.class":"FileStreamSinkConnector", 
"tasks.max":"1", "file":"test.sink.txt", "topics":"myTopic" }}' 
localhost:8083/connectors

and eventually I get this response:

{"error_code": 500, "message": "Request timed out"}

I am lost as to what is going on. The logs from my Kafka Connect distributed 
worker show this:

[2016-12-05 14:34:32,436] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:22:34:32 +] 
"GET /connector-plugins HTTP/1.1" 200 315  2 
(org.apache.kafka.connect.runtime.rest.RestServer:60)
[2016-12-05 15:05:25,422] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:05:25 +] 
"GET /connector-plugins HTTP/1.1" 200 315  3 
(org.apache.kafka.connect.runtime.rest.RestServer:60)
[2016-12-05 15:05:28,389] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:05:28 +] 
"GET /connector-plugins HTTP/1.1" 200 315  2 
(org.apache.kafka.connect.runtime.rest.RestServer:60)
[2016-12-05 15:07:38,644] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:06:08 +] 
"GET /connectors HTTP/1.1" 500 48  90003 
(org.apache.kafka.connect.runtime.rest.RestServer:60)
[2016-12-05 15:07:44,450] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:07:44 +] 
"GET /connector-plugins HTTP/1.1" 200 315  1 
(org.apache.kafka.connect.runtime.rest.RestServer:60)
[2016-12-05 15:13:06,703] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:11:36 +] 
"POST /connectors HTTP/1.1" 500 48  90003 
(org.apache.kafka.connect.runtime.rest.RestServer:60)
[2016-12-05 15:15:38,506] INFO 0:0:0:0:0:0:0:1 - - [05/Dec/2016:23:14:08 +] 
"POST /connectors HTTP/1.1" 500 48  90005 
(org.apache.kafka.connect.runtime.rest.RestServer:60)

where you can see the error codes and the commands.

I guess my main questions and issues are:

  1.  How can I better debug Kafka Connect so I can try and fix this?
  2.  Is there anything that I'm doing that is glaringly wrong?
  3.  Is there any step-by-step documentation or blog posts on getting a Kafka 
Connect distributed worker and connector to run? I have not really seen 
anything, not even best-practices style documentation. Maybe I am just way too 
early of an adopter.

I look forward to hearing back from the community and thank you for your help!



Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Jon,

Are you using 0.10.1 or 0.10.0.1?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 7:55:30 PM, Damian Guy (damian@gmail.com) wrote:

Hi Jon,

At a glance the code looks ok, i.e, i believe the aggregate() should have
picked up the default Serde set in your StreamsConfig. However, you could
try adding the Serdes to the groupBy(..)

i.e.,
rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)

Thanks,
Damian

On Tue, 6 Dec 2016 at 18:42 Jon Yeargers  wrote:

> It's just a bunch of public 'int' and 'String' values. There's an empty
> constructor and a copy constructor.
>
> For functions I override 'equals'  and the requirements for 'serde' (close,
> configure, serializer and deserializer).
>
>  @Override
> public Serializer serializer() {
> JsonSerializer jsonSerializer = new JsonSerializer<>();
> return jsonSerializer;
> }
>
> @Override
> public Deserializer deserializer() {
> JsonDeserializer jsonDeserializer = new
> JsonDeserializer<>();
> return jsonDeserializer;
> }
>
>
>
>
> Which relates to:
>
> public class JsonSerializer implements Serializer {
>
> private Gson gson = new Gson();
>
> @Override
> public void configure(Map map, boolean b) {
>
> }
>
> @Override
> public byte[] serialize(String topic, T t) {
> return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> }
>
> @Override
> public void close() {
>
> }
> }
>
>
>
> public class JsonDeserializer implements Deserializer {
>
> private Gson gson = new Gson();
> private Class deserializedClass;
>
> public JsonDeserializer(Class deserializedClass) {
> this.deserializedClass = deserializedClass;
> }
>
> public JsonDeserializer() {
> }
>
> @Override
> @SuppressWarnings("unchecked")
> public void configure(Map map, boolean b) {
> if(deserializedClass == null) {
> deserializedClass = (Class) map.get("serializedClass");
> }
> }
>
> @Override
> public T deserialize(String s, byte[] bytes) {
> if(bytes == null){
> return null;
> }
>
> return gson.fromJson(new String(bytes),deserializedClass);
>
> }
>
> @Override
> public void close() {
>
> }
> }
>
> On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski 
> wrote:
>
> > Do you mind sharing the code of AggKey class?
> >
> > –
> > Best regards,
> > Radek Gruchalski
> > ra...@gruchalski.com
> >
> >
> > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> > wrote:
> >
> > The 2nd.
> >
> > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski 
> > wrote:
> >
> >> Is the error happening at this stage?
> >>
> >> KStream rtRekey = rtDetailLines.map((key,
> value)
> >>  -> new KeyValue<>(new AggKey(value), value));
> >>
> >> or here:
> >>
> >> KTable ktRtDetail =
> >> rtRekey.groupByKey().aggregate(
> >> BqRtDetailLogLine_aggregate::new,
> >> new PRTAggregate(),
> >> TimeWindows.of(60 * 60 * 1000L),
> >> collectorSerde, "prt_minute_agg_stream");
> >>
> >> –
> >>
> >> Best regards,
> >> Radek Gruchalski
> >> ra...@gruchalski.com
> >>
> >>
> >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> >> wrote:
> >>
> >> If I comment out the aggregation step and just .print the .map step I
> >> don't hit the error. It's coming from aggregating the non-String key.
> >>
> >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
> >> wrote:
> >>
> >>> Jon,
> >>>
> >>> Looking at your code:
> >>>
> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> and later:
> >>>
> >>> KStream rtDetailLines =
> >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> >>>
> >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> >>> suggests.
> >>> You may have to write your own Serializer / Deserializer for
> >>> RtDetailLogLine.
> >>>
> >>> –
> >>> Best regards,
> >>> Radek Gruchalski
> >>> ra...@gruchalski.com
> >>>
> >>>
> >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (
> >>> jon.yearg...@cedexis.com) wrote:
> >>>
> >>> Using 0.10.1.0
> >>>
> >>> This is my topology:
> >>>
> >>> Properties config = new Properties();
> >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>> AggKey.class.getName());
> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> JsonSerializer sumRecordsSerializer = new
> >>> JsonSerializer<>();
> >>> JsonDeserializer 

Re: Consumer poll - no results

2016-12-06 Thread Mohit Anchlia
I see this message in the logs:

[2016-12-06 13:54:16,586] INFO [GroupCoordinator 0]: Preparing to
restabilize group DemoConsumer with old generation 3
(kafka.coordinator.GroupCoordinator)



On Tue, Dec 6, 2016 at 10:53 AM, Mohit Anchlia 
wrote:

> I have a consumer polling a topic of Kafka 0.10. Even though the topic has
> messages, the consumer poll is not fetching them. The thread dump
> reveals:
>
> "main" #1 prio=5 os_prio=0 tid=0x7f3ba4008800 nid=0x798 runnable
> [0x7f3baa6c3000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.
> java:93)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x0006c6d1f8b8> (a sun.nio.ch.Util$3)
> - locked <0x0006c6d1f8a8> (a java.util.Collections$
> UnmodifiableSet)
> - locked <0x0006c6d1f0b8> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(
> Selector.java:470)
> at org.apache.kafka.common.network.Selector.poll(
> Selector.java:286)
> at org.apache.kafka.clients.NetworkClient.poll(
> NetworkClient.java:260)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
> - locked <0x0006c6d1eff8> (a org.apache.kafka.clients.
> consumer.internals.ConsumerNetworkClient)
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1031)
>


Re: Topic discovery when supporting multiple kafka clusters

2016-12-06 Thread Yifan Ying
Thanks Asaf, Aseem.

Assigning topics to only a specific set of brokers will probably cause
uneven traffic and it won't prevent topics to be re-assigned to other
brokers when brokers fail.

Like I said, the original cluster is close to out of resources. I remember
there's some limit on # of partitions that each Kafka cluster can have.
Netflix recommends to keep it below 10k to improve availability and reduce
latency,
http://techblog.netflix.com/2016/04/kafka-inside-keystone-pipeline.html.
Jun Rao also wrote a blog(
https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/)
about how too many partitions could hurt availability and latency. That's
why we want to create another cluster instead of expanding the current one.
I know a lot of companies are maintaining multiple clusters, and I'm
curious how people are doing topic discovery.



On Tue, Dec 6, 2016 at 4:04 AM, Aseem Bansal  wrote:

> What configurations allow you to assign topics to specific brokers?
>
> I can see https://kafka.apache.org/documentation#basic_ops_automigrate.
> This should allow you to move things around but does that keep anything
> from being re-assigned to the old ones?
>
> On Tue, Dec 6, 2016 at 5:25 PM, Asaf Mesika  wrote:
>
> > Why not re-use same cluster? You can assign topics to be live only
> within a
> > specific set of brokers. Thus you have one "bus" for messages,
> simplifying
> > your applications code and configurations
> >
> > On Mon, Dec 5, 2016 at 9:43 PM Yifan Ying  wrote:
> >
> > > Hi,
> > >
> > > Initially, we have only one Kafka cluster shared across all teams. But
> > now
> > > this cluster is very close to out of resources (disk space, # of
> > > partitions, etc.). So we are considering adding another Kafka cluster.
> > But
> > > what's the best practice of topic discovery, so that applications know
> > > which cluster their topics live? We have been using Zookeeper for
> service
> > > discovery, maybe it's also good for this purpose?
> > >
> > > Thanks
> > >
> > > --
> > > Yifan
> > >
> >
>



-- 
Yifan


Re: Implementing custom key serializer

2016-12-06 Thread Damian Guy
Hi Jon,

At a glance the code looks OK, i.e., I believe the aggregate() should have
picked up the default Serde set in your StreamsConfig. However, you could
try adding the Serdes to the groupByKey(..)

i.e.,
rtRekey.groupByKey(new AggKeySerDe(), yourValueSerde).aggregate(...)
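
For reference, one way that could look end-to-end, strictly as a sketch (it reuses the
Gson-based JsonSerializer / JsonDeserializer and the AggKey, RtDetailLogLine and
collectorSerde names from Jon's code later in this thread, and is not a confirmed fix):

// Build explicit serdes for the re-keyed stream and pass them to groupByKey(),
// so the default String key serde is never used for AggKey.
Serde<AggKey> aggKeySerde = Serdes.serdeFrom(
        new JsonSerializer<AggKey>(),
        new JsonDeserializer<>(AggKey.class));

Serde<RtDetailLogLine> prtRecordSerde = Serdes.serdeFrom(
        new JsonSerializer<RtDetailLogLine>(),
        new JsonDeserializer<>(RtDetailLogLine.class));

KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
        rtRekey.groupByKey(aggKeySerde, prtRecordSerde)
               .aggregate(BqRtDetailLogLine_aggregate::new,
                          new PRTAggregate(),
                          TimeWindows.of(60 * 60 * 1000L),
                          collectorSerde, "prt_minute_agg_stream");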

Thanks,
Damian

On Tue, 6 Dec 2016 at 18:42 Jon Yeargers  wrote:

> It's just a bunch of public 'int' and 'String' values. There's an empty
> constructor and a copy constructor.
>
> For functions I override 'equals'  and the requirements for 'serde' (close,
> configure, serializer and deserializer).
>
>  @Override
> public Serializer serializer() {
> JsonSerializer jsonSerializer = new JsonSerializer<>();
> return jsonSerializer;
> }
>
> @Override
> public Deserializer deserializer() {
> JsonDeserializer jsonDeserializer = new
> JsonDeserializer<>();
> return jsonDeserializer;
> }
>
>
>
>
> Which relates to:
>
> public class JsonSerializer implements Serializer {
>
> private Gson gson = new Gson();
>
> @Override
> public void configure(Map map, boolean b) {
>
> }
>
> @Override
> public byte[] serialize(String topic, T t) {
> return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
> }
>
> @Override
> public void close() {
>
> }
> }
>
>
>
> public class JsonDeserializer implements Deserializer {
>
> private Gson gson = new Gson();
> private Class deserializedClass;
>
> public JsonDeserializer(Class deserializedClass) {
> this.deserializedClass = deserializedClass;
> }
>
> public JsonDeserializer() {
> }
>
> @Override
> @SuppressWarnings("unchecked")
> public void configure(Map map, boolean b) {
> if(deserializedClass == null) {
> deserializedClass = (Class) map.get("serializedClass");
> }
> }
>
> @Override
> public T deserialize(String s, byte[] bytes) {
> if(bytes == null){
> return null;
> }
>
> return gson.fromJson(new String(bytes),deserializedClass);
>
> }
>
> @Override
> public void close() {
>
> }
> }
>
> On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski 
> wrote:
>
> > Do you mind sharing the code of AggKey class?
> >
> > –
> > Best regards,
> > Radek Gruchalski
> > ra...@gruchalski.com
> >
> >
> > On December 6, 2016 at 7:26:51 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> > wrote:
> >
> > The 2nd.
> >
> > On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski 
> > wrote:
> >
> >> Is the error happening at this stage?
> >>
> >> KStream rtRekey = rtDetailLines.map((key,
> value)
> >>  -> new KeyValue<>(new AggKey(value), value));
> >>
> >> or here:
> >>
> >> KTable ktRtDetail =
> >> rtRekey.groupByKey().aggregate(
> >> BqRtDetailLogLine_aggregate::new,
> >> new PRTAggregate(),
> >> TimeWindows.of(60 * 60 * 1000L),
> >> collectorSerde, "prt_minute_agg_stream");
> >>
> >> –
> >>
> >> Best regards,
> >> Radek Gruchalski
> >> ra...@gruchalski.com
> >>
> >>
> >> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (
> jon.yearg...@cedexis.com)
> >> wrote:
> >>
> >> If I comment out the aggregation step and just .print the .map step I
> >> don't hit the error. It's coming from aggregating the non-String key.
> >>
> >> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
> >> wrote:
> >>
> >>> Jon,
> >>>
> >>> Looking at your code:
> >>>
> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> and later:
> >>>
> >>> KStream rtDetailLines =
> >>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
> >>>
> >>> Is RtDetailLogLine inheriting from String? It is not, as the error
> >>> suggests.
> >>> You may have to write your own Serializer / Deserializer for
> >>> RtDetailLogLine.
> >>>
> >>> –
> >>> Best regards,
> >>> Radek Gruchalski
> >>> ra...@gruchalski.com
> >>>
> >>>
> >>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (
> >>> jon.yearg...@cedexis.com) wrote:
> >>>
> >>> Using 0.10.1.0
> >>>
> >>> This is my topology:
> >>>
> >>> Properties config = new Properties();
> >>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> >>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> >>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> >>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>> AggKey.class.getName());
> >>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>
> >>> JsonSerializer sumRecordsSerializer = new
> >>> JsonSerializer<>();
> >>> JsonDeserializer sumRecordsDeserializer =
> >>> new
> >>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> >>> Serde collectorSerde =
> >>> 

Consumer poll - no results

2016-12-06 Thread Mohit Anchlia
I have a consumer polling a topic on Kafka 0.10. Even though the topic has
messages, the consumer poll is not fetching any of them. The thread dump
reveals:

"main" #1 prio=5 os_prio=0 tid=0x7f3ba4008800 nid=0x798 runnable
[0x7f3baa6c3000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0006c6d1f8b8> (a sun.nio.ch.Util$3)
- locked <0x0006c6d1f8a8> (a
java.util.Collections$UnmodifiableSet)
- locked <0x0006c6d1f0b8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at
org.apache.kafka.common.network.Selector.select(Selector.java:470)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- locked <0x0006c6d1eff8> (a
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)


Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
It's just a bunch of public 'int' and 'String' values. There's an empty
constructor and a copy constructor.

For functions I override 'equals'  and the requirements for 'serde' (close,
configure, serializer and deserializer).

 @Override
public Serializer<AggKey> serializer() {
    JsonSerializer<AggKey> jsonSerializer = new JsonSerializer<>();
    return jsonSerializer;
}

@Override
public Deserializer<AggKey> deserializer() {
    JsonDeserializer<AggKey> jsonDeserializer = new JsonDeserializer<>();
    return jsonDeserializer;
}




Which relates to:

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {

    }
}



public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if (deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        if (bytes == null) {
            return null;
        }

        return gson.fromJson(new String(bytes), deserializedClass);
    }

    @Override
    public void close() {

    }
}

On Tue, Dec 6, 2016 at 10:37 AM, Radek Gruchalski 
wrote:

> Do you mind sharing the code of AggKey class?
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> The 2nd.
>
> On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski 
> wrote:
>
>> Is the error happening at this stage?
>>
>> KStream rtRekey = rtDetailLines.map((key, value)
>>  -> new KeyValue<>(new AggKey(value), value));
>>
>> or here:
>>
>> KTable ktRtDetail =
>> rtRekey.groupByKey().aggregate(
>> BqRtDetailLogLine_aggregate::new,
>> new PRTAggregate(),
>> TimeWindows.of(60 * 60 * 1000L),
>> collectorSerde, "prt_minute_agg_stream");
>>
>> –
>>
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> If I comment out the aggregation step and just .print the .map step I
>> don't hit the error. It's coming from aggregating the non-String key.
>>
>> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
>> wrote:
>>
>>> Jon,
>>>
>>> Looking at your code:
>>>
>>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>>
>>> and later:
>>>
>>> KStream rtDetailLines =
>>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>>
>>> Is RtDetailLogLine inheriting from String? It is not, as the error
>>> suggests.
>>> You may have to write your own Serializer / Deserializer for
>>> RtDetailLogLine.
>>>
>>> –
>>> Best regards,
>>> Radek Gruchalski
>>> ra...@gruchalski.com
>>>
>>>
>>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (
>>> jon.yearg...@cedexis.com) wrote:
>>>
>>> Using 0.10.1.0
>>>
>>> This is my topology:
>>>
>>> Properties config = new Properties();
>>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
>>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> AggKey.class.getName());
>>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>>
>>> JsonSerializer sumRecordsSerializer = new
>>> JsonSerializer<>();
>>> JsonDeserializer sumRecordsDeserializer =
>>> new
>>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>>> Serde collectorSerde =
>>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>>>
>>> StringSerializer stringSerializer = new StringSerializer();
>>> StringDeserializer stringDeserializer = new StringDeserializer();
>>> Serde stringSerde =
>>> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>>>
>>> JsonDeserializer prtRecordDeserializer = new
>>> JsonDeserializer<>(RtDetailLogLine.class);
>>> JsonSerializer prtRecordJsonSerializer = new
>>> JsonSerializer<>();
>>> Serde prtRecordSerde =
>>> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>>>
>>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>>>
>>> KStream rtDetailLines =
>>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>>
>>> // change the 

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Do you mind sharing the code of AggKey class?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 7:26:51 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

The 2nd.

On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski 
wrote:

> Is the error happening at this stage?
>
> KStream rtRekey = rtDetailLines.map((key, value) ->
> new KeyValue<>(new AggKey(value), value));
>
> or here:
>
> KTable ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> –
>
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> If I comment out the aggregation step and just .print the .map step I
> don't hit the error. It's coming from aggregating the non-String key.
>
> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
> wrote:
>
>> Jon,
>>
>> Looking at your code:
>>
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> and later:
>>
>> KStream rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> Is RtDetailLogLine inheriting from String? It is not, as the error
>> suggests.
>> You may have to write your own Serializer / Deserializer for
>> RtDetailLogLine.
>>
>> –
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> Using 0.10.1.0
>>
>> This is my topology:
>>
>> Properties config = new Properties();
>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> JsonSerializer sumRecordsSerializer = new
>> JsonSerializer<>();
>> JsonDeserializer sumRecordsDeserializer =
>> new
>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>> Serde collectorSerde =
>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>>
>> StringSerializer stringSerializer = new StringSerializer();
>> StringDeserializer stringDeserializer = new StringDeserializer();
>> Serde stringSerde =
>> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>>
>> JsonDeserializer prtRecordDeserializer = new
>> JsonDeserializer<>(RtDetailLogLine.class);
>> JsonSerializer prtRecordJsonSerializer = new
>> JsonSerializer<>();
>> Serde prtRecordSerde =
>> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>>
>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>>
>> KStream rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> // change the keying
>> KStream rtRekey = rtDetailLines.map((key, value)
>> -> new KeyValue<>(new AggKey(value), value));
>>
>> KTable ktRtDetail =
>> rtRekey.groupByKey().aggregate(
>> BqRtDetailLogLine_aggregate::new,
>> new PRTAggregate(),
>> TimeWindows.of(60 * 60 * 1000L),
>> collectorSerde, "prt_minute_agg_stream");
>>
>> ktRtDetail.toStream().print();
>>
>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>>
>> kafkaStreams.start();
>>
>>
>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:
>>
>> > Hi Jon,
>> >
>> > A couple of things: Which version are you using?
>> > Can you share the code you are using to the build the topology?
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers 
>> wrote:
>> >
>> > > Im using .map to convert my (k/v) string/Object to Object/Object but
>> > when I
>> > > chain this to an aggregation step Im getting this exception:
>> > >
>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
>> > > java.lang.String
>> > > at
>> > >
>> > > org.apache.kafka.common.serialization.StringSerializer.serialize(
>> > StringSerializer.java:24)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(
>> > RecordCollector.java:73)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.SinkNode.
>> > process(SinkNode.java:72)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$
>> > KStreamFilterProcessor.process(KStreamFilter.java:44)
>> > > at
>> > >
>> > > 

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
The 2nd.

On Tue, Dec 6, 2016 at 10:01 AM, Radek Gruchalski 
wrote:

> Is the error happening at this stage?
>
> KStream rtRekey = rtDetailLines.map((key, value) ->
> new KeyValue<>(new AggKey(value), value));
>
> or here:
>
> KTable ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> –
>
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> If I comment out the aggregation step and just .print the .map step I
> don't hit the error. It's coming from aggregating the non-String key.
>
> On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
> wrote:
>
>> Jon,
>>
>> Looking at your code:
>>
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> and later:
>>
>> KStream rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> Is RtDetailLogLine inheriting from String? It is not, as the error
>> suggests.
>> You may have to write your own Serializer / Deserializer for
>> RtDetailLogLine.
>>
>> –
>> Best regards,
>> Radek Gruchalski
>> ra...@gruchalski.com
>>
>>
>> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
>> wrote:
>>
>> Using 0.10.1.0
>>
>> This is my topology:
>>
>> Properties config = new Properties();
>> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
>> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> Serdes.String().getClass().getName());
>>
>> JsonSerializer sumRecordsSerializer = new
>> JsonSerializer<>();
>> JsonDeserializer sumRecordsDeserializer =
>> new
>> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
>> Serde collectorSerde =
>> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>>
>> StringSerializer stringSerializer = new StringSerializer();
>> StringDeserializer stringDeserializer = new StringDeserializer();
>> Serde stringSerde =
>> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>>
>> JsonDeserializer prtRecordDeserializer = new
>> JsonDeserializer<>(RtDetailLogLine.class);
>> JsonSerializer prtRecordJsonSerializer = new
>> JsonSerializer<>();
>> Serde prtRecordSerde =
>> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>>
>> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>>
>> KStream rtDetailLines =
>> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>>
>> // change the keying
>> KStream rtRekey = rtDetailLines.map((key, value)
>> -> new KeyValue<>(new AggKey(value), value));
>>
>> KTable ktRtDetail =
>> rtRekey.groupByKey().aggregate(
>> BqRtDetailLogLine_aggregate::new,
>> new PRTAggregate(),
>> TimeWindows.of(60 * 60 * 1000L),
>> collectorSerde, "prt_minute_agg_stream");
>>
>> ktRtDetail.toStream().print();
>>
>> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>>
>> kafkaStreams.start();
>>
>>
>> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:
>>
>> > Hi Jon,
>> >
>> > A couple of things: Which version are you using?
>> > Can you share the code you are using to the build the topology?
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers 
>> wrote:
>> >
>> > > Im using .map to convert my (k/v) string/Object to Object/Object but
>> > when I
>> > > chain this to an aggregation step Im getting this exception:
>> > >
>> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
>> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
>> > > java.lang.String
>> > > at
>> > >
>> > > org.apache.kafka.common.serialization.StringSerializer.serialize(
>> > StringSerializer.java:24)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(
>> > RecordCollector.java:73)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.SinkNode.
>> > process(SinkNode.java:72)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$
>> > KStreamFilterProcessor.process(KStreamFilter.java:44)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
>> > ProcessorNode.java:82)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > 

Re: Best approach to frequently restarting consumer process

2016-12-06 Thread Gwen Shapira
Can you clarify what you mean by "restart"? If you call
consumer.close() and consumer.subscribe() you will definitely trigger
a rebalance.

It doesn't matter if it's the "same consumer knocking"; we already
rebalance when you call consumer.close().

Since we want both consumer.close() and consumer.subscribe() to cause
rebalance immediately (and not wait for heartbeat), I don't think
we'll be changing their behavior.

Depending on why consumers need to restart, I'm wondering if you can
restart other threads in your application but keep the consumer up and
running to avoid the rebalances.
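
If the thing that actually needs restarting is the application logic rather than the
consumer itself, one shape for that is sketched below: a single long-lived loop owns the
KafkaConsumer (so the group never sees it leave), and only the processing component is
torn down and recreated on the restart interval. The RecordProcessor interface and the
supplier are illustrative names, not an existing API.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;
import java.util.function.Supplier;

public class LongLivedConsumerLoop {

    // the part of the application that gets "restarted" every few minutes
    public interface RecordProcessor {
        void process(ConsumerRecord<String, String> record);
        void close();
    }

    public static void run(Properties consumerProps, String topic,
                           Supplier<RecordProcessor> newProcessor,
                           long restartIntervalMs) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(topic));
            RecordProcessor processor = newProcessor.get();
            long lastRestart = System.currentTimeMillis();
            while (true) {
                if (System.currentTimeMillis() - lastRestart > restartIntervalMs) {
                    processor.close();                 // restart only the app logic
                    processor = newProcessor.get();
                    lastRestart = System.currentTimeMillis();
                }
                // the consumer keeps polling throughout, so it never leaves the group
                ConsumerRecords<String, String> records = consumer.poll(500);
                for (ConsumerRecord<String, String> record : records) {
                    processor.process(record);
                }
            }
        }
    }
}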

On Tue, Dec 6, 2016 at 7:18 AM, Harald Kirsch  wrote:
> We have consumer processes which need to restart frequently, say, every 5
> minutes. We have 10 of them so we are facing two restarts every minute on
> average.
>
> 1) It seems that nearly every time a consumer restarts  the group is
> rebalanced. Even if the restart takes less than the heartbeat interval.
>
> 2) My guess is that the group manager just cannot know that the same
> consumer is knocking at the door again.
>
> Are my suspicions (1) and (2) correct? Is there a chance to fix this such
> that a restart within the heartbeat interval does not lead to a re-balance?
> Would a well defined client.id help?
>
> Regards
> Harald
>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


How to disable auto commit for SimpleConsumer kafka 0.8.1

2016-12-06 Thread Anjani Gupta
I want to disable auto commit for the Kafka SimpleConsumer. I am using the 0.8.1
version. For the high-level consumer, config options can be set and passed via
consumerConfig as follows:
kafka.consumer.Consumer.createJavaConsumerConnector(this.consumerConfig);

How can I achieve the same for SimpleConsumer? I mainly want to disable
auto commit. I tried setting auto commit to false in consumer.properties
and restarted kafka server, zookeeper and producer. But, that does not
work. I think I need to apply this setting through code, not in
consumer.properties. Can anyone help here?

Here is what my code looks like:

List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
topicAndPartitionList.add(topicAndPartition);
OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(
        new OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0,
                correlationId, clientName));

Map<TopicAndPartition, OffsetMetadataAndError> offsets =
        offsetFetchResponse.offsets();
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition,
                offsets.get(topicAndPartition).offset(), 10)
        .build();

long readOffset = offsets.get(topicAndPartition).offset();
FetchResponse fetchResponse = consumer.fetch(req);

// Consume messages from fetchResponse

Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new HashMap<>();
requestInfo.put(topicAndPartition,
        new OffsetMetadataAndError(readOffset, "metadata", (short) 0));
OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(
        new OffsetCommitRequest("testGroup", requestInfo, (short) 0,
                correlationId, clientName));


If the above code crashes before committing the offset, I still get the latest
offset as the result of offsets.get(topicAndPartition).offset() in the next run,
which makes me think that auto commit of the offset happens as the code is executed.


Re: Some general questions...

2016-12-06 Thread Gwen Shapira
Yeah, that's a good point - Kafka on Windows has few quirks because
most core Kafka developers are not windows experts and the big
deployments are almost all on Linux.

We discovered that most of our .NET users actually run Kafka on Linux.
It turns out that installing a few Linux VMs and running Kafka there is
fairly easy, but a programming language is not something you can
easily change.
Fortunately, thanks to librdkafka we can implement a good .NET client
without worrying about windows internals :)

On Tue, Dec 6, 2016 at 7:07 AM, Harald Kirsch  wrote:
> This sounds like you might want to run the Kafka broker on Windows. Have a
> look at https://issues.apache.org/jira/browse/KAFKA-1194 for possible issues
> with regard to log cleaning.
>
> Regards,
> Harald.
>
>
>
> On 06.12.2016 00:50, Doyle, Keith wrote:
>>
>>
>>
>> We’re beginning to make use of Kafka, and it is encouraging.  But there
>> are a couple of questions I’ve had a hard time finding answers for.
>>
>>
>>
>> We’re using the rdkafka-dotnet client on the consumer side and it’s
>> straightforward as far as it goes.  However, documentation seems to be
>> scant—the Wiki points to a FAQ which has, like, two questions neither of
>> which are the questions we have.   And I can’t find a mailing list,
>> forum, blog, or other community where questions can be asked.  I found
>> some indication in the Git repository that there may be some API docs,
>> but it’s not at all clear exactly where those are.
>>
>>
>>
>> So I’m posting that question here because I can’t find anywhere else
>> that might be even remotely relevant to post it—where can I find out
>> more info about rdkafka and particularly rdkafka-dotnet, and some way to
>> ask questions that aren’t answered in the documentation?
>>
>>
>>
>> And second, my current question about rdkafka-dotnet, is the example
>> consumers both seem to read an entire message into memory.   We don’t
>> want to presume any particular message size, and may not want to cache
>> the entire message in memory while processing it.   Is there an
>> interface where we can consume messages via a stream, so that we can
>> read chunks of a message and process them based on some kind of batch
>> size that will allow us better control over memory usage?
>>
>>
>>
>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


NotLeaderForPartitionException

2016-12-06 Thread Sven Ludwig
Hello,
 
in our Kafka clusters we sometimes observe a specific ERROR log-statement, and 
therefore we have doubts whether it is already running stably in our 
configuration. This occurs every now and then, like two or three times in a 
day. It is actually the predominant ERROR log-statement in our cluster. Example:
 
[2016-12-06 17:14:50,909] ERROR [ReplicaFetcherThread-0-3], Error for partition 
[,] to broker 
3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is
not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
 
We already asked Google, but we did not find sufficient answers to our 
questions, therefore I am asking on the mailing list:
 
1. What are the possible reasons for this particular error?
 
2. What are the implications of it?
 
3. What can be done to prevent it?
 
Best Regards,
Sven


Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Is the error happening at this stage?

KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value) ->
new KeyValue<>(new AggKey(value), value));

or here:

KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
rtRekey.groupByKey().aggregate(
BqRtDetailLogLine_aggregate::new,
new PRTAggregate(),
TimeWindows.of(60 * 60 * 1000L),
collectorSerde, "prt_minute_agg_stream");

–

Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 6:47:38 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

If I comment out the aggregation step and just .print the .map step I don't
hit the error. It's coming from aggregating the non-String key.

On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
wrote:

> Jon,
>
> Looking at your code:
>
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> and later:
>
> KStream rtDetailLines = 
> kStreamBuilder.stream(stringSerde,
> prtRecordSerde, TOPIC);
>
> Is RtDetailLogLine inheriting from String? It is not, as the error
> suggests.
> You may have to write your own Serializer / Deserializer for
> RtDetailLogLine.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> Using 0.10.1.0
>
> This is my topology:
>
> Properties config = new Properties();
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> JsonSerializer sumRecordsSerializer = new
> JsonSerializer<>();
> JsonDeserializer sumRecordsDeserializer = new
> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> Serde collectorSerde =
> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>
> StringSerializer stringSerializer = new StringSerializer();
> StringDeserializer stringDeserializer = new StringDeserializer();
> Serde stringSerde =
> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>
> JsonDeserializer prtRecordDeserializer = new
> JsonDeserializer<>(RtDetailLogLine.class);
> JsonSerializer prtRecordJsonSerializer = new
> JsonSerializer<>();
> Serde prtRecordSerde =
> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>
> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>
> KStream rtDetailLines =
> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>
> // change the keying
> KStream rtRekey = rtDetailLines.map((key, value)
> -> new KeyValue<>(new AggKey(value), value));
>
> KTable ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> ktRtDetail.toStream().print();
>
> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>
> kafkaStreams.start();
>
>
> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:
>
> > Hi Jon,
> >
> > A couple of things: Which version are you using?
> > Can you share the code you are using to the build the topology?
> >
> > Thanks,
> > Damian
> >
> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers 
> wrote:
> >
> > > Im using .map to convert my (k/v) string/Object to Object/Object but
> > when I
> > > chain this to an aggregation step Im getting this exception:
> > >
> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > > java.lang.String
> > > at
> > >
> > > org.apache.kafka.common.serialization.StringSerializer.serialize(
> > StringSerializer.java:24)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(
> > RecordCollector.java:73)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.SinkNode.
> > process(SinkNode.java:72)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$
> > KStreamFilterProcessor.process(KStreamFilter.java:44)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KStreamMap$
> > KStreamMapProcessor.process(KStreamMap.java:43)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > 

Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
If I comment out the aggregation step and just .print the .map step I don't
hit the error. It's coming from aggregating the non-String key.

On Tue, Dec 6, 2016 at 9:44 AM, Radek Gruchalski 
wrote:

> Jon,
>
> Looking at your code:
>
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> and later:
>
> KStream rtDetailLines = 
> kStreamBuilder.stream(stringSerde,
> prtRecordSerde, TOPIC);
>
> Is RtDetailLogLine inheriting from String? It is not, as the error
> suggests.
> You may have to write your own Serializer / Deserializer for
> RtDetailLogLine.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
> wrote:
>
> Using 0.10.1.0
>
> This is my topology:
>
> Properties config = new Properties();
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> JsonSerializer sumRecordsSerializer = new
> JsonSerializer<>();
> JsonDeserializer sumRecordsDeserializer =
> new
> JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
> Serde collectorSerde =
> Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);
>
> StringSerializer stringSerializer = new StringSerializer();
> StringDeserializer stringDeserializer = new StringDeserializer();
> Serde stringSerde =
> Serdes.serdeFrom(stringSerializer,stringDeserializer);
>
> JsonDeserializer prtRecordDeserializer = new
> JsonDeserializer<>(RtDetailLogLine.class);
> JsonSerializer prtRecordJsonSerializer = new
> JsonSerializer<>();
> Serde prtRecordSerde =
> Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);
>
> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>
> KStream rtDetailLines =
> kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);
>
> // change the keying
> KStream rtRekey = rtDetailLines.map((key, value)
> -> new KeyValue<>(new AggKey(value), value));
>
> KTable ktRtDetail =
> rtRekey.groupByKey().aggregate(
> BqRtDetailLogLine_aggregate::new,
> new PRTAggregate(),
> TimeWindows.of(60 * 60 * 1000L),
> collectorSerde, "prt_minute_agg_stream");
>
> ktRtDetail.toStream().print();
>
> KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>
> kafkaStreams.start();
>
>
> On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:
>
> > Hi Jon,
> >
> > A couple of things: Which version are you using?
> > Can you share the code you are using to the build the topology?
> >
> > Thanks,
> > Damian
> >
> > On Tue, 6 Dec 2016 at 14:44 Jon Yeargers 
> wrote:
> >
> > > Im using .map to convert my (k/v) string/Object to Object/Object but
> > when I
> > > chain this to an aggregation step Im getting this exception:
> > >
> > > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > > java.lang.String
> > > at
> > >
> > > org.apache.kafka.common.serialization.StringSerializer.serialize(
> > StringSerializer.java:24)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.RecordCollector.send(
> > RecordCollector.java:73)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.SinkNode.
> > process(SinkNode.java:72)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KStreamFilter$
> > KStreamFilterProcessor.process(KStreamFilter.java:44)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KStreamMap$
> > KStreamMapProcessor.process(KStreamMap.java:43)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > SourceNode.process(SourceNode.java:66)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > StreamTask.process(StreamTask.java:181)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:436)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > 

Re: Implementing custom key serializer

2016-12-06 Thread Radek Gruchalski
Jon,

Looking at your code:

config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

and later:

KStream<String, RtDetailLogLine> rtDetailLines =
kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

Is RtDetailLogLine inheriting from String? It is not, as the error suggests.
You may have to write your own Serializer / Deserializer for
RtDetailLogLine.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On December 6, 2016 at 6:28:23 PM, Jon Yeargers (jon.yearg...@cedexis.com)
wrote:

Using 0.10.1.0

This is my topology:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

JsonSerializer sumRecordsSerializer = new
JsonSerializer<>();
JsonDeserializer sumRecordsDeserializer = new
JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
Serde collectorSerde =
Serdes.serdeFrom(sumRecordsSerializer,sumRecordsDeserializer);

StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
Serde stringSerde =
Serdes.serdeFrom(stringSerializer,stringDeserializer);

JsonDeserializer prtRecordDeserializer = new
JsonDeserializer<>(RtDetailLogLine.class);
JsonSerializer prtRecordJsonSerializer = new
JsonSerializer<>();
Serde prtRecordSerde =
Serdes.serdeFrom(prtRecordJsonSerializer,prtRecordDeserializer);

KStreamBuilder kStreamBuilder = new KStreamBuilder();

KStream rtDetailLines =
kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

// change the keying
KStream rtRekey = rtDetailLines.map((key, value)
-> new KeyValue<>(new AggKey(value), value));

KTable ktRtDetail =
rtRekey.groupByKey().aggregate(
BqRtDetailLogLine_aggregate::new,
new PRTAggregate(),
TimeWindows.of(60 * 60 * 1000L),
collectorSerde, "prt_minute_agg_stream");

ktRtDetail.toStream().print();

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);

kafkaStreams.start();


On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:

> Hi Jon,
>
> A couple of things: Which version are you using?
> Can you share the code you are using to the build the topology?
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 14:44 Jon Yeargers 
wrote:
>
> > Im using .map to convert my (k/v) string/Object to Object/Object but
> when I
> > chain this to an aggregation step Im getting this exception:
> >
> > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > java.lang.String
> > at
> >
> > org.apache.kafka.common.serialization.StringSerializer.serialize(
> StringSerializer.java:24)
> > at
> >
> > org.apache.kafka.streams.processor.internals.RecordCollector.send(
> RecordCollector.java:73)
> > at
> >
> > org.apache.kafka.streams.processor.internals.SinkNode.
> process(SinkNode.java:72)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamFilter$
> KStreamFilterProcessor.process(KStreamFilter.java:44)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamMap$
> KStreamMapProcessor.process(KStreamMap.java:43)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> SourceNode.process(SourceNode.java:66)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:181)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:436)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> >
> > My key object implements Serde and returns a JsonSerializer for the
> > 'Serializer()' override.
> >
> > In the config for the topology Im
> > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > AggKey.class.getName());
> >
> > Where else do I need to specify the (de)serializer for my key class?
> >
>


Re: Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
Using 0.10.1.0

This is my topology:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());

JsonSerializer<BqRtDetailLogLine_aggregate> sumRecordsSerializer = new JsonSerializer<>();
JsonDeserializer<BqRtDetailLogLine_aggregate> sumRecordsDeserializer =
        new JsonDeserializer<>(BqRtDetailLogLine_aggregate.class);
Serde<BqRtDetailLogLine_aggregate> collectorSerde =
        Serdes.serdeFrom(sumRecordsSerializer, sumRecordsDeserializer);

StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
Serde<String> stringSerde =
        Serdes.serdeFrom(stringSerializer, stringDeserializer);

JsonDeserializer<RtDetailLogLine> prtRecordDeserializer =
        new JsonDeserializer<>(RtDetailLogLine.class);
JsonSerializer<RtDetailLogLine> prtRecordJsonSerializer = new JsonSerializer<>();
Serde<RtDetailLogLine> prtRecordSerde =
        Serdes.serdeFrom(prtRecordJsonSerializer, prtRecordDeserializer);

KStreamBuilder kStreamBuilder = new KStreamBuilder();

KStream<String, RtDetailLogLine> rtDetailLines =
        kStreamBuilder.stream(stringSerde, prtRecordSerde, TOPIC);

// change the keying
KStream<AggKey, RtDetailLogLine> rtRekey = rtDetailLines.map((key, value)
        -> new KeyValue<>(new AggKey(value), value));

KTable<Windowed<AggKey>, BqRtDetailLogLine_aggregate> ktRtDetail =
        rtRekey.groupByKey().aggregate(
                BqRtDetailLogLine_aggregate::new,
                new PRTAggregate(),
                TimeWindows.of(60 * 60 * 1000L),
                collectorSerde, "prt_minute_agg_stream");

ktRtDetail.toStream().print();

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);

kafkaStreams.start();


On Tue, Dec 6, 2016 at 6:48 AM, Damian Guy  wrote:

> Hi Jon,
>
> A couple of things: Which version are you using?
> Can you share the code you are using to the build the topology?
>
> Thanks,
> Damian
>
> On Tue, 6 Dec 2016 at 14:44 Jon Yeargers  wrote:
>
> > Im using .map to convert my (k/v) string/Object to Object/Object but
> when I
> > chain this to an aggregation step Im getting this exception:
> >
> > Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> > java.lang.String
> > at
> >
> > org.apache.kafka.common.serialization.StringSerializer.serialize(
> StringSerializer.java:24)
> > at
> >
> > org.apache.kafka.streams.processor.internals.RecordCollector.send(
> RecordCollector.java:73)
> > at
> >
> > org.apache.kafka.streams.processor.internals.SinkNode.
> process(SinkNode.java:72)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamFilter$
> KStreamFilterProcessor.process(KStreamFilter.java:44)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamMap$
> KStreamMapProcessor.process(KStreamMap.java:43)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> SourceNode.process(SourceNode.java:66)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:181)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:436)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> >
> > My key object implements Serde and returns a JsonSerializer for the
> > 'Serializer()' override.
> >
> > In the config for the topology Im
> > setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > AggKey.class.getName());
> >
> > Where else do I need to specify the (de)serializer for my key class?
> >
>


kafka 0.10.1 / log-cleaner stopped / timeindex issues

2016-12-06 Thread Schumann,Robert
Hi all,

we are facing an issue with latest kafka 0.10.1 and the log cleaner thread with 
regards to the timeindex files. From the log of the log-cleaner we see after 
startup that it tries to cleanup a topic called xdc_listing-status-v2 [1]. The 
topic is setup with log compaction [2] and the kafka cluster configuration has 
log.cleaner enabled [3]. Looking at the log and the newly created file [4], the 
cleaner seems to refer to tombstones prior to epoch_time=0 - maybe because he 
finds messages, which don’t have a timestamp at all (?). All producers and 
consumers are using 0.10.1 and the topics have been created completely new, so 
I’m not sure, where this issue would come from. The original timeindex file [5] 
seems to show only valid timestamps for the mentioned offsets. I would also 
like to mention that the issue happened in two independent datacenters at the 
same time, so I would rather expect an application/producer issue instead of 
random disk failures. We didn’t have the problem with 0.10.0 for around half a 
year, it appeared shortly after the upgrade to 0.10.1.

The puzzling message from the cleaner “cleaning prior to Fri Dec 02 16:35:50 
CET 2016, discarding tombstones prior to Thu Jan 01 01:00:00 CET 1970” also 
confuses me a bit. Does that mean, it does not find any log segments which can 
be cleaned up or the last timestamp of the last log segment is somehow 
broken/missing?

I’m also a bit wondering, why the log cleaner thread stops completely after an 
error with one topic. I would at least expect that it keeps on cleaning up 
other topics, but apparently it doesn’t do that, e.g. it’s not even cleaning 
the __consumer_offsets anymore.

Does anybody have the same issues or can explain, what’s going on? Thanks for 
any help or suggestions.

Cheers
Robert

[1]
[2016-12-06 12:49:17,885] INFO Starting the log cleaner (kafka.log.LogCleaner)
[2016-12-06 12:49:17,895] INFO [kafka-log-cleaner-thread-0], Starting  
(kafka.log.LogCleaner)
[2016-12-06 12:49:17,947] INFO Cleaner 0: Beginning cleaning of log 
xdc_listing-status-v2-1. (kafka.log.LogCleaner)
[2016-12-06 12:49:17,948] INFO Cleaner 0: Building offset map for 
xdc_listing-status-v2-1... (kafka.log.LogCleaner)
[2016-12-06 12:49:17,989] INFO Cleaner 0: Building offset map for log 
xdc_listing-status-v2-1 for 1 segments in offset range [0, 194991). 
(kafka.log.LogCleaner)
[2016-12-06 12:49:24,572] INFO Cleaner 0: Offset map for log 
xdc_listing-status-v2-1 complete. (kafka.log.LogCleaner)
[2016-12-06 12:49:24,577] INFO Cleaner 0: Cleaning log xdc_listing-status-v2-1 
(cleaning prior to Fri Dec 02 16:35:50 CET 2016, discarding tombstones prior to 
Thu Jan 01 01:00:00 CET 1970)... (kafka.log.LogCleaner)
[2016-12-06 12:49:24,580] INFO Cleaner 0: Cleaning segment 0 in log 
xdc_listing-status-v2-1 (largest timestamp Fri Dec 02 16:35:50 CET 2016) into 
0, retaining deletes. (kafka.log.LogCleaner)
[2016-12-06 12:49:24,968] ERROR [kafka-log-cleaner-thread-0], Error due to  
(kafka.log.LogCleaner)
kafka.common.InvalidOffsetException: Attempt to append an offset (-1) to slot 9 
no larger than the last offset appended (11832) to 
/var/lib/kafka/xdc_listing-status-v2-1/.timeindex.cleaned.
at 
kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply$mcV$sp(TimeIndex.scala:117)
at kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply(TimeIndex.scala:107)
at kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply(TimeIndex.scala:107)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:107)
at kafka.log.LogSegment.append(LogSegment.scala:106)
at kafka.log.Cleaner.cleanInto(LogCleaner.scala:518)
at 
kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:404)
at 
kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:400)
at scala.collection.immutable.List.foreach(List.scala:381)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:400)
at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:364)
at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
at scala.collection.immutable.List.foreach(List.scala:381)
at kafka.log.Cleaner.clean(LogCleaner.scala:363)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:239)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:218)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-12-06 12:49:24,971] INFO [kafka-log-cleaner-thread-0], Stopped  
(kafka.log.LogCleaner)

[2]
$ kafka-configs-topics xdc_listing-status-v2 --describe
Configs for topic 'xdc_listing-status-v2' are cleanup.policy=compact

[3]
broker.id=74
listeners = PLAINTEXT://0.0.0.0:9092,TRACE://localhost:9093
advertised.listeners = PLAINTEXT://broker2:9092
num.network.threads=2
num.io.threads=2
inter.broker.protocol.version=0.10.1.0
socket.send.buffer.bytes=1048576

Re: Storing Kafka Message JSON to deep storage like S3

2016-12-06 Thread Zachary Smith
You may want to look at Secor also
https://github.com/pinterest/secor

On Tue, Dec 6, 2016 at 10:53 AM, noah  wrote:

> If you are willing to setup Kafka Connect, my company has built this
> connector: https://github.com/spredfast/kafka-connect-s3
>


Re: Storing Kafka Message JSON to deep storage like S3

2016-12-06 Thread Hans Jespersen

I know several people that use the qubole Kafka Sink Connector for S3 ( see 
https://github.com/qubole/streamx ) to store 
Kafka messages in S3 for long term archiving. You can also do this with the 
Confluent HDFS Kafka Connector if you have access to a Hadoop cluster

-hans




> On Dec 6, 2016, at 3:25 AM, Aseem Bansal  wrote:
> 
> Hi
> 
> Has anyone done a storage of Kafka JSON messages to deep storage like S3.
> We are looking to back up all of our raw Kafka JSON messages for
> Exploration. S3, HDFS, MongoDB come to mind initially.
> 
> I know that it can be stored in kafka itself but storing them in Kafka
> itself does not seem like a good option as we won't be able to query it and
> the configurations of machines containing kafka will have to be increased
> as we go. Something like S3 we won't have to manage.



Re: Storing Kafka Message JSON to deep storage like S3

2016-12-06 Thread noah
If you are willing to set up Kafka Connect, my company has built this
connector: https://github.com/spredfast/kafka-connect-s3
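
Whichever connector you end up with, the wiring is the standard Connect connector
config; only the connector class and its S3-specific settings differ. A sketch of a
standalone-mode connector properties file follows — the connector.class value and the
commented S3 properties are placeholders, so take the real names from the docs of the
connector you choose:

# connector instance name and parallelism
name=s3-archive-sink
tasks.max=1
# topics whose JSON messages should be archived
topics=raw-json-events
# placeholder: use the sink class shipped by the S3 connector you picked
connector.class=com.example.s3.S3SinkConnector
# placeholders: bucket/region/flush settings are named differently by each connector
#s3.bucket=my-kafka-archive
#s3.region=us-east-1

Run it with bin/connect-standalone.sh together with a worker properties file, or POST
the equivalent JSON to a distributed worker's /connectors endpoint.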


Best approach to frequently restarting consumer process

2016-12-06 Thread Harald Kirsch
We have consumer processes which need to restart frequently, say, every 
5 minutes. We have 10 of them so we are facing two restarts every minute 
on average.


1) It seems that nearly every time a consumer restarts  the group is 
rebalanced. Even if the restart takes less than the heartbeat interval.


2) My guess is that the group manager just cannot know that the same 
consumer is knocking at the door again.


Are my suspicions (1) and (2) correct? Is there a chance to fix this 
such that a restart within the heartbeat interval does not lead to a 
re-balance? Would a well defined client.id help?


Regards
Harald



Re: Some general questions...

2016-12-06 Thread Harald Kirsch
This sounds like you might want to run the Kafka broker on Windows. Have 
a look at https://issues.apache.org/jira/browse/KAFKA-1194 for possible 
issues with regard to log cleaning.


Regards,
Harald.


On 06.12.2016 00:50, Doyle, Keith wrote:



We’re beginning to make use of Kafka, and it is encouraging.  But there
are a couple of questions I’ve had a hard time finding answers for.



We’re using the rdkafka-dotnet client on the consumer side and it’s
straightforward as far as it goes.  However, documentation seems to be
scant—the Wiki points to a FAQ which has, like, two questions neither of
which are the questions we have.   And I can’t find a mailing list,
forum, blog, or other community where questions can be asked.  I found
some indication in the Git repository that there may be some API docs,
but it’s not at all clear exactly where those are.



So I’m posting that question here because I can’t find anywhere else
that might be even remotely relevant to post it—where can I find out
more info about rdkafka and particularly rdkafka-dotnet, and some way to
ask questions that aren’t answered in the documentation?



And second, my current question about rdkafka-dotnet, is the example
consumers both seem to read an entire message into memory.   We don’t
want to presume any particular message size, and may not want to cache
the entire message in memory while processing it.   Is there an
interface where we can consume messages via a stream, so that we can
read chunks of a message and process them based on some kind of batch
size that will allow us better control over memory usage?





Re: Implementing custom key serializer

2016-12-06 Thread Damian Guy
Hi Jon,

A couple of things: Which version are you using?
Can you share the code you are using to build the topology?

Thanks,
Damian

On Tue, 6 Dec 2016 at 14:44 Jon Yeargers  wrote:

> Im using .map to convert my (k/v) string/Object to Object/Object but when I
> chain this to an aggregation step Im getting this exception:
>
> Exception in thread "StreamThread-1" java.lang.ClassCastException:
> com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
> java.lang.String
> at
>
> org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
> at
>
> org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
> at
>
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>
> My key object implements Serde and returns a JsonSerializer for the
> 'Serializer()' override.
>
> In the config for the topology Im
> setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> AggKey.class.getName());
>
> Where else do I need to specify the (de)serializer for my key class?
>


Implementing custom key serializer

2016-12-06 Thread Jon Yeargers
I'm using .map to convert my (k/v) string/Object to Object/Object, but when I
chain this to an aggregation step I'm getting this exception:

Exception in thread "StreamThread-1" java.lang.ClassCastException:
com.company.prtminuteagg.types.RtDetailLogLine cannot be cast to
java.lang.String
at
org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at
org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:73)
at
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at
org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

My key class implements Serde and returns a JsonSerializer from the
serializer() override.

In the config for the topology I'm
setting: config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
AggKey.class.getName());

Where else do I need to specify the (de)serializer for my key class?
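
For reference, one common way out of this kind of ClassCastException is to pass
the serdes explicitly at the steps where the key/value types change, instead of
relying only on the default serde config. A minimal sketch, assuming the 0.10.1
Streams API; AggKey, RtDetailLogLine, JsonSerializer and JsonDeserializer stand
in for the poster's own classes, and the topic/store names are made up:

Serde<AggKey> aggKeySerde = Serdes.serdeFrom(
        new JsonSerializer<AggKey>(), new JsonDeserializer<>(AggKey.class));
Serde<RtDetailLogLine> logLineSerde = Serdes.serdeFrom(
        new JsonSerializer<RtDetailLogLine>(), new JsonDeserializer<>(RtDetailLogLine.class));

KStreamBuilder builder = new KStreamBuilder();
KStream<String, RtDetailLogLine> source =
        builder.stream(Serdes.String(), logLineSerde, "input-topic");

source.map((k, v) -> new KeyValue<>(new AggKey(v), v))
      // pass the serdes explicitly so the repartition/sink nodes use them
      // instead of falling back to the default (String) serdes from the config
      .groupByKey(aggKeySerde, logLineSerde)
      .aggregate(RtDetailLogLine::new,
              (key, value, agg) -> agg,          // aggregator body is illustrative
              TimeWindows.of(60 * 60 * 1000L),
              logLineSerde,
              "agg_store");

The same serdes can also be handed to through()/to() if the stream is written
out with the new key type.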


Re: Is CreateTopics and DeleteTopics ready for production usage?

2016-12-06 Thread Ismael Juma
As Apurva said, KIP-4 is still in progress and, as part of it, we will
introduce an AdminClient which will then be used by the various tools.

It's a large piece of work and it makes sense to merge it in stages. The
protocol side is ready (KIP vote passed, code and tests were reviewed and
merged) and we are not aware of issues with those requests. In other words,
you are welcome to try them out and provide feedback. :)

Ismael

On 5 Dec 2016 9:48 pm, "Apurva Mehta"  wrote:

> I should clarify, that those requests may work, but are not used in any
> active code. The integration with the rest of the system is yet to happen.
>
> On Mon, Dec 5, 2016 at 1:45 PM, Apurva Mehta  wrote:
>
> > It isn't ready yet. It is part of the work related to
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 4+-+Command+line+and+centralized+administrative+operations
> >
> > Thanks,
> > Apurva
> >
> > On Mon, Dec 5, 2016 at 11:10 AM, Dmitry Lazurkin 
> > wrote:
> >
> >> Hello.
> >>
> >> Are requests CreateTopics and DeleteTopics ready for production usage?
> >>
> >> Why TopicCommand doesn't use CreateTopics / DeleteTopics?
> >>
> >> Thanks.
> >>
> >>
> >
>


Re: Topic discovery when supporting multiple kafka clusters

2016-12-06 Thread Aseem Bansal
What configurations allow you to assign topics to specific brokers?

I can see https://kafka.apache.org/documentation#basic_ops_automigrate.
This should allow you to move things around, but does it keep anything
from being assigned back to the old brokers?

On Tue, Dec 6, 2016 at 5:25 PM, Asaf Mesika  wrote:

> Why not re-use same cluster? You can assign topics to be live only within a
> specific set of brokers. Thus you have one "bus" for messages, simplifying
> your applications code and configurations
>
> On Mon, Dec 5, 2016 at 9:43 PM Yifan Ying  wrote:
>
> > Hi,
> >
> > Initially, we have only one Kafka cluster shared across all teams. But
> now
> > this cluster is very close to out of resources (disk space, # of
> > partitions, etc.). So we are considering adding another Kafka cluster.
> But
> > what's the best practice of topic discovery, so that applications know
> > which cluster their topics live? We have been using Zookeeper for service
> > discovery, maybe it's also good for this purpose?
> >
> > Thanks
> >
> > --
> > Yifan
> >
>
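
For reference, a sketch of how topics are usually pinned to specific brokers
with the stock tooling (the broker ids, topic and file names below are made
up): a topic can be created with an explicit replica assignment so it only
ever lives on chosen brokers, and an existing topic can be moved with a
reassignment plan. Note that nothing stops other topics from later being
placed on the old brokers unless they too are created with explicit
assignments.

# create a 3-partition topic whose replicas live only on brokers 1, 2 and 3
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic team-a-events \
  --replica-assignment 1:2,2:3,3:1

# move an existing topic onto those brokers; the JSON file lists the desired
# replica set per partition
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file move-team-a.json --execute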


Re: Storing Kafka Message JSON to deep storage like S3

2016-12-06 Thread Asaf Mesika
We rolled our own since we couldn't find one (as of 1.5 years ago). The code
is quite simple and short.


On Tue, Dec 6, 2016 at 1:55 PM Aseem Bansal  wrote:

> I just meant that is there an existing tool which does that. Basically I
> tell it "Listen to all X streams and write them to S3/HDFS at Y path as
> JSON". I know spark streaming can be used and there is flume. But I am not
> sure about their scalability/reliability. That's why I thought to initiate
> a discussion here to see whether someone knows about that already.
>
> On Tue, Dec 6, 2016 at 5:14 PM, Sharninder  wrote:
>
> > What do you mean by streaming way? The logic to push to S3 will be in
> your
> > consumer, so it totally depends on how you want to read and store. I
> think
> > that's an easier way to do what you want to, instead of trying to backup
> > kafka and then read messages from there. Not even sure that's possible.
> >
> > On Tue, Dec 6, 2016 at 5:11 PM, Aseem Bansal 
> wrote:
> >
> > > I get that we can read them and store them in batches but is there some
> > > streaming way?
> > >
> > > On Tue, Dec 6, 2016 at 5:09 PM, Aseem Bansal 
> > wrote:
> > >
> > > > Because we need to do exploratory data analysis and machine learning.
> > We
> > > > need to backup the messages somewhere so that the data scientists can
> > > > query/load them.
> > > >
> > > > So we need something like a router that just opens up a new consumer
> > > group
> > > > which just keeps on storing them to S3.
> > > >
> > > > On Tue, Dec 6, 2016 at 5:05 PM, Sharninder Khera <
> sharnin...@gmail.com
> > >
> > > > wrote:
> > > >
> > > >> Why not just have a parallel consumer read all messages from
> whichever
> > > >> topics you're interested in and store them wherever you want to? You
> > > don't
> > > >> need to "backup" Kafka messages.
> > > >>
> > > >> _
> > > >> From: Aseem Bansal 
> > > >> Sent: Tuesday, December 6, 2016 4:55 PM
> > > >> Subject: Storing Kafka Message JSON to deep storage like S3
> > > >> To:  
> > > >>
> > > >>
> > > >> Hi
> > > >>
> > > >> Has anyone done a storage of Kafka JSON messages to deep storage
> like
> > > S3.
> > > >> We are looking to back up all of our raw Kafka JSON messages for
> > > >> Exploration. S3, HDFS, MongoDB come to mind initially.
> > > >>
> > > >> I know that it can be stored in kafka itself but storing them in
> Kafka
> > > >> itself does not seem like a good option as we won't be able to query
> > it
> > > >> and
> > > >> the configurations of machines containing kafka will have to be
> > > increased
> > > >> as we go. Something like S3 we won't have to manage.
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > --
> > Sharninder
> >
>
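
For anyone looking for a starting point, a minimal sketch of such a home-grown
bridge: a dedicated consumer group that accumulates records and flushes them
to S3 in micro-batches. The bucket and topic names are made up, and
uploadBatch() is a placeholder for whatever S3 client you use (e.g. the AWS
SDK's putObject):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaToS3Backup {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "s3-backup");        // its own group, so other consumers are unaffected
        props.put("enable.auto.commit", "false");  // commit only after a batch is safely uploaded
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("raw-events"));
            List<String> batch = new ArrayList<>();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    batch.add(record.value());             // the raw JSON payload
                }
                if (batch.size() >= 10000) {               // micro-batch threshold; tune for your volume
                    uploadBatch("my-backup-bucket", batch);
                    consumer.commitSync();                 // mark progress only once the batch is durable
                    batch.clear();
                }
            }
        }
    }

    // Placeholder for the actual S3 write, one object per batch.
    private static void uploadBatch(String bucket, List<String> lines) {
        // depends on the S3 client in use
    }
}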


Re: Detecting when all the retries are expired for a message

2016-12-06 Thread Rajini Sivaram
I believe batches in RecordAccumulator are expired after request.timeout.ms,
so they wouldn't get retried in this case. I think the config options are
quite confusing, making it hard to figure out the behavior without looking
into the code.

On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika  wrote:

> Vatsal:
>
> I don't think they merged the fix for this bug (retries doesn't work) in
> 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
>
>
> On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal 
> wrote:
>
> > Hello,
> >
> > Bumping up this thread in case anyone of you have any say on this issue.
> >
> > Regards,
> > Vatsal
> >
> > -Original Message-
> > From: Mevada, Vatsal
> > Sent: 02 December 2016 16:16
> > To: Kafka Users 
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > I executed the same producer code for a single record file with following
> > config:
> >
> > properties.put("bootstrap.servers", bootstrapServer);
> > properties.put("key.serializer",
> > StringSerializer.class.getCanonicalName());
> > properties.put("value.serializer",
> > StringSerializer.class.getCanonicalName());
> > properties.put("acks", "-1");
> > properties.put("retries", 5);
> > properties.put("request.timeout.ms", 1);
> >
> > I have kept request.timeout.ms=1 to make sure that message delivery will
> > fail with TimeoutException. Since the retries are 5 then the program
> > should take at-least 5 ms (50 seconds) to complete for single record.
> > However the program is completing almost instantly with only one callback
> > with TimeoutException. I suspect that producer is not going for any
> > retries. Or am I missing something in my code?
> >
> > My Kafka version is 0.10.0.1.
> >
> > Regards,
> > Vatsal
> > Am I missing any configuration or
> > -Original Message-
> > From: Ismael Juma [mailto:isma...@gmail.com]
> > Sent: 02 December 2016 13:30
> > To: Kafka Users 
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > The callback is called after the retries have been exhausted.
> >
> > Ismael
> >
> > On 2 Dec 2016 3:34 am, "Mevada, Vatsal"  wrote:
> >
> > > @Ismael:
> > >
> > > I can handle TimeoutException in the callback. However as per the
> > > documentation of Callback(link: https://kafka.apache.org/0100/
> > > javadoc/org/apache/kafka/clients/producer/Callback.html),
> > > TimeoutException is a retriable exception and it says that it "may be
> > > covered by increasing #.retries". So even if I get TimeoutException in
> > > callback, wouldn't it try to send message again until all the retries
> > > are done? Would it be safe to assume that message delivery is failed
> > > permanently just by encountering TimeoutException in callback?
> > >
> > > Here is a snippet from above mentioned documentation:
> > > "exception - The exception thrown during processing of this record.
> > > Null if no error occurred. Possible thrown exceptions include:
> > > Non-Retriable exceptions (fatal, the message will never be sent):
> > > InvalidTopicException OffsetMetadataTooLargeException
> > > RecordBatchTooLargeException RecordTooLargeException
> > > UnknownServerException Retriable exceptions (transient, may be covered
> > > by increasing #.retries): CorruptRecordException
> > > InvalidMetadataException NotEnoughReplicasAfterAppendException
> > > NotEnoughReplicasException OffsetOutOfRangeException TimeoutException
> > > UnknownTopicOrPartitionException"
> > >
> > > @asaf :My kafka - API version is 0.10.0.1. So I think I should not
> > > face the issue that you are mentioning. I mentioned documentation link
> > > of 0.9 by mistake.
> > >
> > > Regards,
> > > Vatsal
> > > -Original Message-
> > > From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> > > Sent: 02 December 2016 00:32
> > > To: Kafka Users 
> > > Subject: Re: Detecting when all the retries are expired for a message
> > >
> > > There's a critical bug in that section that has only been fixed in
> > > 0.9.0.2 which has not been release yet. Without the fix it doesn't
> > really retry.
> > > I forked the kafka repo, applied the fix, built it and placed it in
> > > our own Nexus Maven repository until 0.9.0.2 will be released.
> > >
> > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
> > >
> > > Feel free to use it.
> > >
> > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:
> > >
> > > > The callback should give you what you are asking for. Has it not
> > > > worked as you expect when you tried it?
> > > >
> > > > Ismael
> > > >
> > > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > > > 
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > >
> > > > >
> > > > > I am reading a file and dumping each record on Kafka. Here is my
> > 
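
Pulling the thread together: per Ismael's reply the callback fires once the
producer has given up on the record, whether because its retries were
exhausted or, as Rajini notes above, because the batch was expired after
request.timeout.ms. So a non-null exception in the callback can be treated as
a permanent failure for that record. A minimal sketch, with onDeliveryFailed()
and onFatalFailure() as placeholder hooks:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;

public class DeliveryFailureHandling {

    static void sendWithFailureHandling(KafkaProducer<String, String> producer,
                                        String topic, String payload) {
        producer.send(new ProducerRecord<>(topic, payload), (metadata, exception) -> {
            if (exception == null) {
                return;  // delivered: metadata carries the partition and offset
            }
            // By the time the callback runs the producer will not try this record
            // again on its own, so the failure is final from the application's side.
            if (exception instanceof RetriableException) {
                // e.g. TimeoutException: transient in nature, but the producer has
                // stopped; re-queue the payload yourself if you want more attempts.
                onDeliveryFailed(payload, exception);
            } else {
                // e.g. RecordTooLargeException: retrying would never have helped.
                onFatalFailure(payload, exception);
            }
        });
    }

    // Placeholder hooks: log, dead-letter, or re-queue as appropriate.
    static void onDeliveryFailed(String payload, Exception e) { e.printStackTrace(); }
    static void onFatalFailure(String payload, Exception e) { e.printStackTrace(); }
}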

Re: Storing Kafka Message JSON to deep storage like S3

2016-12-06 Thread Aseem Bansal
I just meant: is there an existing tool which does that? Basically I tell it
"listen to all X streams and write them to S3/HDFS at Y path as JSON". I know
Spark Streaming can be used, and there is Flume, but I am not sure about
their scalability/reliability. That's why I thought I would start a
discussion here to see whether someone already knows of such a tool.

On Tue, Dec 6, 2016 at 5:14 PM, Sharninder  wrote:

> What do you mean by streaming way? The logic to push to S3 will be in your
> consumer, so it totally depends on how you want to read and store. I think
> that's an easier way to do what you want to, instead of trying to backup
> kafka and then read messages from there. Not even sure that's possible.
>
> On Tue, Dec 6, 2016 at 5:11 PM, Aseem Bansal  wrote:
>
> > I get that we can read them and store them in batches but is there some
> > streaming way?
> >
> > On Tue, Dec 6, 2016 at 5:09 PM, Aseem Bansal 
> wrote:
> >
> > > Because we need to do exploratory data analysis and machine learning.
> We
> > > need to backup the messages somewhere so that the data scientists can
> > > query/load them.
> > >
> > > So we need something like a router that just opens up a new consumer
> > group
> > > which just keeps on storing them to S3.
> > >
> > > On Tue, Dec 6, 2016 at 5:05 PM, Sharninder Khera  >
> > > wrote:
> > >
> > >> Why not just have a parallel consumer read all messages from whichever
> > >> topics you're interested in and store them wherever you want to? You
> > don't
> > >> need to "backup" Kafka messages.
> > >>
> > >> _
> > >> From: Aseem Bansal 
> > >> Sent: Tuesday, December 6, 2016 4:55 PM
> > >> Subject: Storing Kafka Message JSON to deep storage like S3
> > >> To:  
> > >>
> > >>
> > >> Hi
> > >>
> > >> Has anyone done a storage of Kafka JSON messages to deep storage like
> > S3.
> > >> We are looking to back up all of our raw Kafka JSON messages for
> > >> Exploration. S3, HDFS, MongoDB come to mind initially.
> > >>
> > >> I know that it can be stored in kafka itself but storing them in Kafka
> > >> itself does not seem like a good option as we won't be able to query
> it
> > >> and
> > >> the configurations of machines containing kafka will have to be
> > increased
> > >> as we go. Something like S3 we won't have to manage.
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> >
>
>
>
> --
> --
> Sharninder
>


Re: Topic discovery when supporting multiple kafka clusters

2016-12-06 Thread Asaf Mesika
Why not re-use the same cluster? You can assign topics to live only on a
specific set of brokers. That way you have one "bus" for messages,
simplifying your application code and configuration.

On Mon, Dec 5, 2016 at 9:43 PM Yifan Ying  wrote:

> Hi,
>
> Initially, we have only one Kafka cluster shared across all teams. But now
> this cluster is very close to out of resources (disk space, # of
> partitions, etc.). So we are considering adding another Kafka cluster. But
> what's the best practice of topic discovery, so that applications know
> which cluster their topics live? We have been using Zookeeper for service
> discovery, maybe it's also good for this purpose?
>
> Thanks
>
> --
> Yifan
>


Re: Storing Kafka Message JSON to deep storage like S3

2016-12-06 Thread Sudev A C
Hi Aseem,

You can run Apache Flume to consume messages from Kafka and write them to
S3/HDFS in a micro-batch streaming fashion.

Writes to S3/HDFS should be done in micro batches; you could write every
message individually (not even sure S3 supports append), but it won't be performant.

https://flume.apache.org/

Thanks
Sudev

On Tue, Dec 6, 2016 at 5:14 PM Sharninder  wrote:

> What do you mean by streaming way? The logic to push to S3 will be in your
> consumer, so it totally depends on how you want to read and store. I think
> that's an easier way to do what you want to, instead of trying to backup
> kafka and then read messages from there. Not even sure that's possible.
>
> On Tue, Dec 6, 2016 at 5:11 PM, Aseem Bansal  wrote:
>
> > I get that we can read them and store them in batches but is there some
> > streaming way?
> >
> > On Tue, Dec 6, 2016 at 5:09 PM, Aseem Bansal 
> wrote:
> >
> > > Because we need to do exploratory data analysis and machine learning.
> We
> > > need to backup the messages somewhere so that the data scientists can
> > > query/load them.
> > >
> > > So we need something like a router that just opens up a new consumer
> > group
> > > which just keeps on storing them to S3.
> > >
> > > On Tue, Dec 6, 2016 at 5:05 PM, Sharninder Khera  >
> > > wrote:
> > >
> > >> Why not just have a parallel consumer read all messages from whichever
> > >> topics you're interested in and store them wherever you want to? You
> > don't
> > >> need to "backup" Kafka messages.
> > >>
> > >> _
> > >> From: Aseem Bansal 
> > >> Sent: Tuesday, December 6, 2016 4:55 PM
> > >> Subject: Storing Kafka Message JSON to deep storage like S3
> > >> To:  
> > >>
> > >>
> > >> Hi
> > >>
> > >> Has anyone done a storage of Kafka JSON messages to deep storage like
> > S3.
> > >> We are looking to back up all of our raw Kafka JSON messages for
> > >> Exploration. S3, HDFS, MongoDB come to mind initially.
> > >>
> > >> I know that it can be stored in kafka itself but storing them in Kafka
> > >> itself does not seem like a good option as we won't be able to query
> it
> > >> and
> > >> the configurations of machines containing kafka will have to be
> > increased
> > >> as we go. Something like S3 we won't have to manage.
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> >
>
>
>
> --
> --
> Sharninder
>


Re: Storing Kafka Message JSON to deep storage like S3

2016-12-06 Thread Sharninder
What do you mean by a streaming way? The logic to push to S3 will be in your
consumer, so it totally depends on how you want to read and store. I think
that's an easier way to do what you want, instead of trying to back up
Kafka and then read messages from there. I'm not even sure that's possible.

On Tue, Dec 6, 2016 at 5:11 PM, Aseem Bansal  wrote:

> I get that we can read them and store them in batches but is there some
> streaming way?
>
> On Tue, Dec 6, 2016 at 5:09 PM, Aseem Bansal  wrote:
>
> > Because we need to do exploratory data analysis and machine learning. We
> > need to backup the messages somewhere so that the data scientists can
> > query/load them.
> >
> > So we need something like a router that just opens up a new consumer
> group
> > which just keeps on storing them to S3.
> >
> > On Tue, Dec 6, 2016 at 5:05 PM, Sharninder Khera 
> > wrote:
> >
> >> Why not just have a parallel consumer read all messages from whichever
> >> topics you're interested in and store them wherever you want to? You
> don't
> >> need to "backup" Kafka messages.
> >>
> >> _
> >> From: Aseem Bansal 
> >> Sent: Tuesday, December 6, 2016 4:55 PM
> >> Subject: Storing Kafka Message JSON to deep storage like S3
> >> To:  
> >>
> >>
> >> Hi
> >>
> >> Has anyone done a storage of Kafka JSON messages to deep storage like
> S3.
> >> We are looking to back up all of our raw Kafka JSON messages for
> >> Exploration. S3, HDFS, MongoDB come to mind initially.
> >>
> >> I know that it can be stored in kafka itself but storing them in Kafka
> >> itself does not seem like a good option as we won't be able to query it
> >> and
> >> the configurations of machines containing kafka will have to be
> increased
> >> as we go. Something like S3 we won't have to manage.
> >>
> >>
> >>
> >>
> >>
> >
> >
>



-- 
--
Sharninder


Re: Storing Kafka Message JSON to deep storage like S3

2016-12-06 Thread Aseem Bansal
I get that we can read them and store them in batches, but is there some
streaming way to do it?

On Tue, Dec 6, 2016 at 5:09 PM, Aseem Bansal  wrote:

> Because we need to do exploratory data analysis and machine learning. We
> need to backup the messages somewhere so that the data scientists can
> query/load them.
>
> So we need something like a router that just opens up a new consumer group
> which just keeps on storing them to S3.
>
> On Tue, Dec 6, 2016 at 5:05 PM, Sharninder Khera 
> wrote:
>
>> Why not just have a parallel consumer read all messages from whichever
>> topics you're interested in and store them wherever you want to? You don't
>> need to "backup" Kafka messages.
>>
>> _
>> From: Aseem Bansal 
>> Sent: Tuesday, December 6, 2016 4:55 PM
>> Subject: Storing Kafka Message JSON to deep storage like S3
>> To:  
>>
>>
>> Hi
>>
>> Has anyone done a storage of Kafka JSON messages to deep storage like S3.
>> We are looking to back up all of our raw Kafka JSON messages for
>> Exploration. S3, HDFS, MongoDB come to mind initially.
>>
>> I know that it can be stored in kafka itself but storing them in Kafka
>> itself does not seem like a good option as we won't be able to query it
>> and
>> the configurations of machines containing kafka will have to be increased
>> as we go. Something like S3 we won't have to manage.
>>
>>
>>
>>
>>
>
>


Re: Storing Kafka Message JSON to deep storage like S3

2016-12-06 Thread Sharninder Khera
Why not just have a parallel consumer read all messages from whichever topics 
you're interested in and store them wherever you want to? You don't need to 
"backup" Kafka messages. 

_
From: Aseem Bansal 
Sent: Tuesday, December 6, 2016 4:55 PM
Subject: Storing Kafka Message JSON to deep storage like S3
To:  


Hi

Has anyone done a storage of Kafka JSON messages to deep storage like S3.
We are looking to back up all of our raw Kafka JSON messages for
Exploration. S3, HDFS, MongoDB come to mind initially.

I know that it can be stored in kafka itself but storing them in Kafka
itself does not seem like a good option as we won't be able to query it and
the configurations of machines containing kafka will have to be increased
as we go. Something like S3 we won't have to manage.





Storing Kafka Message JSON to deep storage like S3

2016-12-06 Thread Aseem Bansal
Hi

Has anyone stored Kafka JSON messages in deep storage like S3? We are
looking to back up all of our raw Kafka JSON messages for exploration. S3,
HDFS, and MongoDB come to mind initially.

I know the messages can be kept in Kafka itself, but that does not seem like
a good option: we won't be able to query them, and the machines running
Kafka would have to keep growing as the data does. With something like S3 we
won't have to manage that.


Re: Processing older records Kafka Consumer

2016-12-06 Thread Asaf Mesika
seek() will do the trick. Just make sure that when you run it, you only seek
on partitions the current consumer is assigned (call assignment() and filter
to the ones assigned to you now).

On Tue, Dec 6, 2016 at 12:30 PM Amit K  wrote:

> Sorry for not providing complete information.
>
> I use the auto-commit. Most of the other properties are more or less the
> default one.
>
> Actually further analysis reveled that the records are consumed by consumer
> but some dependent component was down (unfortunately it went completely
> un-detected :( ). Hence now I need to reconsume them all for last 2 days.
>
> Will seek() be helpful, like having another application tuned to same topic
> and start consuming in it from approx offset that was there 2 days before?
>
> Thanks for help in advance!
>
> On Tue, Dec 6, 2016 at 3:35 PM, Asaf Mesika  wrote:
>
> > Do you use auto-commit or committing your self? I'm trying to figure out
> > how the offset moved if it was stuck.
> >
> > On Tue, Dec 6, 2016 at 10:28 AM Amit K  wrote:
> >
> > > Hi,
> > >
> > > Is there any way to re-consume the older records from Kafka broker with
> > > kafka consumer?
> > >
> > > I am using kafka 0.9.0.0 In one of the scenario, I saw records for 2
> days
> > > from today were not consumed as consumer was stuck. When the consumer
> > > restarted, it started processing records from today but older records
> for
> > > last 2 days are not processed.
> > >
> > > Is there any way to achieve the same?
> > > Any help will be highly appreciated.
> > >
> > > Thanks,
> > > Amit
> > >
> >
>
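
A rough sketch of that with the 0.9 consumer API; if it runs as a separate
application, give it a fresh group.id so it owns all partitions and its own
offsets. estimateOffsetTwoDaysAgo() and reprocess() are placeholders, since
0.9.0.0 has no time-based offset lookup from the client and the target offset
has to be estimated or taken from your own bookkeeping:

import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class Reconsume {

    static void reconsumeFrom(KafkaConsumer<String, String> consumer, String topic) {
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(0);                                  // join the group so partitions get assigned
        for (TopicPartition tp : consumer.assignment()) {  // only the partitions assigned to this instance
            consumer.seek(tp, estimateOffsetTwoDaysAgo(tp));
        }
        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                reprocess(record);                         // your handling logic
            }
        }
    }

    // Placeholders: the offset for "2 days ago" has to come from your own
    // records, or use seekToBeginning() and skip forward while reading.
    static long estimateOffsetTwoDaysAgo(TopicPartition tp) { return 0L; }
    static void reprocess(ConsumerRecord<String, String> record) { }
}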


Re: Processing older records Kafka Consumer

2016-12-06 Thread Amit K
Sorry for not providing complete information.

I use auto-commit. Most of the other properties are more or less the
defaults.

Actually, further analysis revealed that the records were consumed by the
consumer, but a dependent component was down (unfortunately it went
completely undetected :( ). Hence I now need to re-consume everything from
the last 2 days.

Will seek() be helpful, e.g. having another application subscribed to the
same topic start consuming from the approximate offset from 2 days ago?

Thanks for the help in advance!

On Tue, Dec 6, 2016 at 3:35 PM, Asaf Mesika  wrote:

> Do you use auto-commit or committing your self? I'm trying to figure out
> how the offset moved if it was stuck.
>
> On Tue, Dec 6, 2016 at 10:28 AM Amit K  wrote:
>
> > Hi,
> >
> > Is there any way to re-consume the older records from Kafka broker with
> > kafka consumer?
> >
> > I am using kafka 0.9.0.0 In one of the scenario, I saw records for 2 days
> > from today were not consumed as consumer was stuck. When the consumer
> > restarted, it started processing records from today but older records for
> > last 2 days are not processed.
> >
> > Is there any way to achieve the same?
> > Any help will be highly appreciated.
> >
> > Thanks,
> > Amit
> >
>


Re: Detecting when all the retries are expired for a message

2016-12-06 Thread Asaf Mesika
Vatsal:

I don't think the fix for this bug (retries don't work) that went into 0.9.x
has been merged into 0.10.0.1: https://github.com/apache/kafka/pull/1547


On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal 
wrote:

> Hello,
>
> Bumping up this thread in case anyone of you have any say on this issue.
>
> Regards,
> Vatsal
>
> -Original Message-
> From: Mevada, Vatsal
> Sent: 02 December 2016 16:16
> To: Kafka Users 
> Subject: RE: Detecting when all the retries are expired for a message
>
> I executed the same producer code for a single record file with following
> config:
>
> properties.put("bootstrap.servers", bootstrapServer);
> properties.put("key.serializer",
> StringSerializer.class.getCanonicalName());
> properties.put("value.serializer",
> StringSerializer.class.getCanonicalName());
> properties.put("acks", "-1");
> properties.put("retries", 5);
> properties.put("request.timeout.ms", 1);
>
> I have kept request.timeout.ms=1 to make sure that message delivery will
> fail with TimeoutException. Since the retries are 5 then the program
> should take at-least 5 ms (50 seconds) to complete for single record.
> However the program is completing almost instantly with only one callback
> with TimeoutException. I suspect that producer is not going for any
> retries. Or am I missing something in my code?
>
> My Kafka version is 0.10.0.1.
>
> Regards,
> Vatsal
> Am I missing any configuration or
> -Original Message-
> From: Ismael Juma [mailto:isma...@gmail.com]
> Sent: 02 December 2016 13:30
> To: Kafka Users 
> Subject: RE: Detecting when all the retries are expired for a message
>
> The callback is called after the retries have been exhausted.
>
> Ismael
>
> On 2 Dec 2016 3:34 am, "Mevada, Vatsal"  wrote:
>
> > @Ismael:
> >
> > I can handle TimeoutException in the callback. However as per the
> > documentation of Callback(link: https://kafka.apache.org/0100/
> > javadoc/org/apache/kafka/clients/producer/Callback.html),
> > TimeoutException is a retriable exception and it says that it "may be
> > covered by increasing #.retries". So even if I get TimeoutException in
> > callback, wouldn't it try to send message again until all the retries
> > are done? Would it be safe to assume that message delivery is failed
> > permanently just by encountering TimeoutException in callback?
> >
> > Here is a snippet from above mentioned documentation:
> > "exception - The exception thrown during processing of this record.
> > Null if no error occurred. Possible thrown exceptions include:
> > Non-Retriable exceptions (fatal, the message will never be sent):
> > InvalidTopicException OffsetMetadataTooLargeException
> > RecordBatchTooLargeException RecordTooLargeException
> > UnknownServerException Retriable exceptions (transient, may be covered
> > by increasing #.retries): CorruptRecordException
> > InvalidMetadataException NotEnoughReplicasAfterAppendException
> > NotEnoughReplicasException OffsetOutOfRangeException TimeoutException
> > UnknownTopicOrPartitionException"
> >
> > @asaf :My kafka - API version is 0.10.0.1. So I think I should not
> > face the issue that you are mentioning. I mentioned documentation link
> > of 0.9 by mistake.
> >
> > Regards,
> > Vatsal
> > -Original Message-
> > From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> > Sent: 02 December 2016 00:32
> > To: Kafka Users 
> > Subject: Re: Detecting when all the retries are expired for a message
> >
> > There's a critical bug in that section that has only been fixed in
> > 0.9.0.2 which has not been release yet. Without the fix it doesn't
> really retry.
> > I forked the kafka repo, applied the fix, built it and placed it in
> > our own Nexus Maven repository until 0.9.0.2 will be released.
> >
> > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
> >
> > Feel free to use it.
> >
> > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:
> >
> > > The callback should give you what you are asking for. Has it not
> > > worked as you expect when you tried it?
> > >
> > > Ismael
> > >
> > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > > 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > >
> > > > I am reading a file and dumping each record on Kafka. Here is my
> > > > producer
> > > > code:
> > > >
> > > >
> > > >
> > > > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> > > >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> > > >             KafkaProducer producer = initKafkaProducer(bootstrapServers)) {
> > > >         String line;
> > > >         while ((line = bf.readLine()) 

Re: Processing older records Kafka Consumer

2016-12-06 Thread Asaf Mesika
Do you use auto-commit, or do you commit offsets yourself? I'm trying to
figure out how the offset moved if the consumer was stuck.

On Tue, Dec 6, 2016 at 10:28 AM Amit K  wrote:

> Hi,
>
> Is there any way to re-consume the older records from Kafka broker with
> kafka consumer?
>
> I am using kafka 0.9.0.0 In one of the scenario, I saw records for 2 days
> from today were not consumed as consumer was stuck. When the consumer
> restarted, it started processing records from today but older records for
> last 2 days are not processed.
>
> Is there any way to achieve the same?
> Any help will be highly appreciated.
>
> Thanks,
> Amit
>


Processing older records Kafka Consumer

2016-12-06 Thread Amit K
Hi,

Is there any way to re-consume older records from the Kafka broker with the
Kafka consumer?

I am using Kafka 0.9.0.0. In one scenario, I saw that records from the last
2 days were not consumed because the consumer was stuck. When the consumer
restarted, it started processing records from today, but the older records
from the last 2 days were not processed.

Is there any way to achieve the same?
Any help will be highly appreciated.

Thanks,
Amit


RE: Detecting when all the retries are expired for a message

2016-12-06 Thread Mevada, Vatsal
Hello,

Bumping up this thread in case any of you have any input on this issue.

Regards,
Vatsal

-Original Message-
From: Mevada, Vatsal 
Sent: 02 December 2016 16:16
To: Kafka Users 
Subject: RE: Detecting when all the retries are expired for a message

I executed the same producer code for a single record file with following 
config:

properties.put("bootstrap.servers", bootstrapServer);
properties.put("key.serializer", 
StringSerializer.class.getCanonicalName());
properties.put("value.serializer", 
StringSerializer.class.getCanonicalName());
properties.put("acks", "-1");
properties.put("retries", 5);
properties.put("request.timeout.ms", 1);

I have kept request.timeout.ms=1 to make sure that message delivery will fail
with TimeoutException. Since retries is 5, the program should take at least
5 ms (50 seconds) to complete for a single record. However the program is
completing almost instantly, with only one callback with TimeoutException. I
suspect that the producer is not going for any retries. Or am I missing
something in my code?

My Kafka version is 0.10.0.1.

Regards,
Vatsal
Am I missing any configuration or
-Original Message-
From: Ismael Juma [mailto:isma...@gmail.com]
Sent: 02 December 2016 13:30
To: Kafka Users 
Subject: RE: Detecting when all the retries are expired for a message

The callback is called after the retries have been exhausted.

Ismael

On 2 Dec 2016 3:34 am, "Mevada, Vatsal"  wrote:

> @Ismael:
>
> I can handle TimeoutException in the callback. However as per the 
> documentation of Callback(link: https://kafka.apache.org/0100/ 
> javadoc/org/apache/kafka/clients/producer/Callback.html),
> TimeoutException is a retriable exception and it says that it "may be 
> covered by increasing #.retries". So even if I get TimeoutException in 
> callback, wouldn't it try to send message again until all the retries 
> are done? Would it be safe to assume that message delivery is failed 
> permanently just by encountering TimeoutException in callback?
>
> Here is a snippet from above mentioned documentation:
> "exception - The exception thrown during processing of this record. 
> Null if no error occurred. Possible thrown exceptions include: 
> Non-Retriable exceptions (fatal, the message will never be sent): 
> InvalidTopicException OffsetMetadataTooLargeException 
> RecordBatchTooLargeException RecordTooLargeException 
> UnknownServerException Retriable exceptions (transient, may be covered 
> by increasing #.retries): CorruptRecordException 
> InvalidMetadataException NotEnoughReplicasAfterAppendException
> NotEnoughReplicasException OffsetOutOfRangeException TimeoutException 
> UnknownTopicOrPartitionException"
>
> @asaf :My kafka - API version is 0.10.0.1. So I think I should not 
> face the issue that you are mentioning. I mentioned documentation link 
> of 0.9 by mistake.
>
> Regards,
> Vatsal
> -Original Message-
> From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> Sent: 02 December 2016 00:32
> To: Kafka Users 
> Subject: Re: Detecting when all the retries are expired for a message
>
> There's a critical bug in that section that has only been fixed in
> 0.9.0.2 which has not been release yet. Without the fix it doesn't really 
> retry.
> I forked the kafka repo, applied the fix, built it and placed it in 
> our own Nexus Maven repository until 0.9.0.2 will be released.
>
> https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
>
> Feel free to use it.
>
> On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:
>
> > The callback should give you what you are asking for. Has it not 
> > worked as you expect when you tried it?
> >
> > Ismael
> >
> > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal 
> > 
> > wrote:
> >
> > > Hi,
> > >
> > >
> > >
> > > I am reading a file and dumping each record on Kafka. Here is my 
> > > producer
> > > code:
> > >
> > >
> > >
> > > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> > >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> > >             KafkaProducer producer = initKafkaProducer(bootstrapServers)) {
> > >         String line;
> > >         while ((line = bf.readLine()) != null) {
> > >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> > >                 if (e != null) {
> > >                     e.printStackTrace();
> > >                 }
> > >