Re: How to clear a particular partition?

2017-08-18 Thread Hans Jespersen
Yes thanks Manikumar! I just tested this and it is indeed all in and working 
great in 0.11! I thought I would have to wait until 1.0 to be able to use and 
recommend this in production.

I published 100 messages 

seq 100 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic mytest

then deleted 90 of those 100 messages with the new kafka-delete-records.sh 
command line tool

./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 
--offset-json-file ./offsetfile.json

where offsetfile.json contains

{"partitions": [{"topic": “mytest", "partition": 0, "offset": 90}], 
"version":1 }

and then consumed the messages from the beginning to verify that 90 of the 100
messages were indeed deleted.

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
--topic mytest --from-beginning
91
92
93
94
95
96
97
98
99
100
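
As an aside, if you'd rather do this from code than from the shell script,
later client releases (1.1+, if I remember right) expose the same operation on
the Java AdminClient. A rough sketch, reusing the topic and offset from the
example above:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class DeleteRecordsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Delete everything before offset 90 in partition 0 of "mytest",
            // equivalent to the offsetfile.json used with kafka-delete-records.sh.
            TopicPartition tp = new TopicPartition("mytest", 0);
            admin.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(90L)))
                 .all()
                 .get();
        }
    }
}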


-hans




> On Aug 18, 2017, at 10:32 AM, Manikumar  wrote:
> 
> This feature got released in Kafka 0.11.0.0. You can
> use kafka-delete-records.sh script to delete data.
> 
> On Sun, Aug 13, 2017 at 11:27 PM, Hans Jespersen  wrote:
> 
>> This is an area that is being worked on. See KIP-107 for details.
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 107:+Add+purgeDataBefore()+API+in+AdminClient>
>> 
>> -hans
>> 
>> 
>>> On Aug 10, 2017, at 10:52 AM, Sven Ludwig  wrote:
>>> 
>>> Hello,
>>> 
>>> assume that all producers and consumers regarding a topic-partition have
>> been shutdown.
>>> 
>>> Is it possible in this situation to empty that topic-partition, while
>> the other topic-partitions keep working?
>>> 
>>> Like for example, is it possible to trigger a log truncation to 0 on the
>> leader for that partition using some admin tool?
>>> 
>>> Kind Regards,
>>> Sven
>>> 
>> 
>> 



Re: Making sure all of you know about Kafka Summit

2017-08-18 Thread M. Manna
I guess it's kinda late since I am already in transit for work.

Is there any plan to do something in Europe, e.g. London or some other place?

On 18 Aug 2017 4:41 pm, "Gwen Shapira"  wrote:

> Hi,
>
> I figured everyone in this list kinda cares about Kafka, so just making
> sure you all know.
>
> Kafka Summit SF happens in about a week:
> https://kafka-summit.org/events/kafka-summit-sf/
>
> August 28 in San Francisco. It is not too late to register.
>
> The talks are pretty great (and very relevant to everyone here) - from new
> Kafka features to how different companies run Kafka in different ways.
>
> Even better, most of the Apache Kafka committers will attend. So if you
> ever wanted to discuss Kafka internals with the people who are writing this
> thing and reviewing every line of code, this is a good opportunity. Also a
> good time to discuss your favorite KIPs in person and argue about that
> JIRA.
>
> There will also be good food, parties and fun swag.
>
> As a member of the mailing list and a fellow procrastinator you can use the
> code kafpcf17 for 30% off your conference pass.
>
> Looking forward to seeing you there and to toasting to Apache Kafka 1.0 :)
>
> Gwen
>


Making sure all of you know about Kafka Summit

2017-08-18 Thread Gwen Shapira
Hi,

I figured everyone in this list kinda cares about Kafka, so just making
sure you all know.

Kafka Summit SF happens in about a week:
https://kafka-summit.org/events/kafka-summit-sf/

August 28 in San Francisco. It is not too late to register.

The talks are pretty great (and very relevant to everyone here) - from new
Kafka features to how different companies run Kafka in different ways.

Even better, most of the Apache Kafka committers will attend. So if you
ever wanted to discuss Kafka internals with the people who are writing this
thing and reviewing every line of code, this is a good opportunity. Also a
good time to discuss your favorite KIPs in person and argue about that
JIRA.

There will also be good food, parties and fun swag.

As a member of the mailing list and a fellow procrastinator you can use the
code kafpcf17 for 30% off your conference pass.

Looking forward to seeing you there and to toasting to Apache Kafka 1.0 :)

Gwen


Kafka Transactions in Connect

2017-08-18 Thread Bryan Baugher
I'm interested in knowing if there's any plan or idea to add transactions to
Connect.

We make use of the JDBC source connector and its bulk extract mode. It
would be great if the connector could create a transaction around the
entire extraction in order to ensure the entire table's data made it into
Kafka before being available.


Re: Different Data Types under same topic

2017-08-18 Thread SenthilKumar K
+ dev experts for inputs.


--Senthil

On Fri, Aug 18, 2017 at 9:15 PM, SenthilKumar K 
wrote:

> Hi Users, we plan to use Kafka for one of our use cases: collecting data
> from different servers and persisting it into a message bus.
>
> The flow would be:
> Source --> Kafka  -->  Streaming Engine --> Reports
>
> We would like to store different types of data in the same topic, while
> keeping the data easy to access.
>
> Here is some sample data:
> {"code" :"100" , "type" : "a" , "data" : " hello"}
> {"code" :"100" , "type" : "b" , "data" : " hello"}
> {"code" :"100" , "type" : "c" , "data" : " hello"}
>
> In this case we want to create a topic called *topic_100* and store all the
> data there, but the access pattern is by type.
>
> Example:
>  1) Read only *type: "a"* data
>
>
> One option is to partition the data by type so that all records of the same
> type go to the same partition. The problem is that the data is then not
> evenly distributed across the cluster.
>
> What is the preferred approach to using the same topic for different types
> of data?
>
>
> --Senthil
>


Re: How to clear a particular partition?

2017-08-18 Thread Sean Glover
Alternatively, you can set topic-level overrides for retention.bytes. If you
also turn file.delete.delay.ms down to 0, the change should take effect almost
immediately after the next log cleanup cycle.

# Apply topic config override
$ kafka-configs --alter --entity-type topics --entity-name test --zookeeper
localhost:32181 --add-config file.delete.delay.ms=0,retention.bytes=0
  Completed Updating config for entity: topic 'test'.

# Remove topic config overrides
$ kafka-configs --alter --entity-type topics --entity-name test --zookeeper
localhost:32181 --delete-config file.delete.delay.ms,retention.bytes
Completed Updating config for entity: topic 'test'.
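
If you'd rather apply the same overrides from code, the Java AdminClient's
alterConfigs call can do the same thing. A rough sketch (assuming 0.11+
clients; note that alterConfigs replaces the full set of overrides for the
topic, so include every override you want to keep):

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicRetentionOverride {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test");
            // Same overrides as the kafka-configs command above.
            Config overrides = new Config(Arrays.asList(
                    new ConfigEntry("file.delete.delay.ms", "0"),
                    new ConfigEntry("retention.bytes", "0")));
            admin.alterConfigs(Collections.singletonMap(topic, overrides)).all().get();
        }
    }
}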


On Fri, Aug 18, 2017 at 1:32 PM, Manikumar 
wrote:

> This feature got released in Kafka 0.11.0.0. You can
> use kafka-delete-records.sh script to delete data.
>
> On Sun, Aug 13, 2017 at 11:27 PM, Hans Jespersen 
> wrote:
>
> > This is an area that is being worked on. See KIP-107 for details.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient <
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 107:+Add+purgeDataBefore()+API+in+AdminClient>
> >
> > -hans
> >
> >
> > > On Aug 10, 2017, at 10:52 AM, Sven Ludwig  wrote:
> > >
> > > Hello,
> > >
> > > assume that all producers and consumers regarding a topic-partition
> have
> > been shutdown.
> > >
> > > Is it possible in this situation to empty that topic-partition, while
> > the other topic-partitions keep working?
> > >
> > > Like for example, is it possible to trigger a log truncation to 0 on
> the
> > leader for that partition using some admin tool?
> > >
> > > Kind Regards,
> > > Sven
> > >
> >
> >
>



-- 
Senior Software Engineer, Lightbend, Inc.



@seg1o 


Re: How to clear a particular partition?

2017-08-18 Thread Manikumar
This feature got released in Kafka 0.11.0.0. You can
use kafka-delete-records.sh script to delete data.

On Sun, Aug 13, 2017 at 11:27 PM, Hans Jespersen  wrote:

> This is an area that is being worked on. See KIP-107 for details.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 107:+Add+purgeDataBefore()+API+in+AdminClient>
>
> -hans
>
>
> > On Aug 10, 2017, at 10:52 AM, Sven Ludwig  wrote:
> >
> > Hello,
> >
> > assume that all producers and consumers regarding a topic-partition have
> been shutdown.
> >
> > Is it possible in this situation to empty that topic-partition, while
> the other topic-partitions keep working?
> >
> > Like for example, is it possible to trigger a log truncation to 0 on the
> leader for that partition using some admin tool?
> >
> > Kind Regards,
> > Sven
> >
>
>


Different Data Types under same topic

2017-08-18 Thread SenthilKumar K
Hi Users, we plan to use Kafka for one of our use cases: collecting data
from different servers and persisting it into a message bus.

The flow would be:
Source --> Kafka  -->  Streaming Engine --> Reports

We would like to store different types of data in the same topic, while
keeping the data easy to access.

Here is some sample data:
{"code" :"100" , "type" : "a" , "data" : " hello"}
{"code" :"100" , "type" : "b" , "data" : " hello"}
{"code" :"100" , "type" : "c" , "data" : " hello"}

In this case we want to create a topic called *topic_100* and store all the
data there, but the access pattern is by type.

Example :
 1) Read only *Type : "a"* data


One option is to partition the data by type so that all records of the same
type go to the same partition. The problem is that the data is then not
evenly distributed across the cluster.
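
For illustration, a type-based partitioner along those lines might look roughly
like this (a sketch only; it assumes the type is sent as the record key, and
the class name is made up):

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical example: route records to a partition based on the "type"
// carried in the record key (e.g. produce with key = "a", "b", "c").
public class TypePartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // All records of the same type land on the same partition.
        String type = key == null ? "" : key.toString();
        return (type.hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

It would be registered on the producer with the partitioner.class config.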

What is the preferred approach to using the same topic for different types of
data?


--Senthil


Re: Querying consumer groups programmatically (from Golang)

2017-08-18 Thread Dan Markhasin
We are also collecting consumer group metrics from Kafka. We didn't want
to add extra unnecessary dependencies (such as Burrow, which is also
overkill for what we need), so we just run a script every minute on the
brokers that parses the output of kafka-consumer-groups.sh and uploads it
to an HTTP-listening Logstash, which then indexes it into our central
Elasticsearch cluster where we keep all of our application metrics. From
there, it's easy to visualize the data with Kibana.

Unfortunately, as Jens pointed out, the output format seems to change a lot
(it changed between 0.10.0.1 and 0.10.1.0 if I'm not mistaken, and has
changed again in 0.11.0.0), and each time we upgrade we have to adjust our
script to account for the format difference.
I agree it would be great if Kafka either didn't change the format so much
or, better yet, exposed it via JMX.
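
For what it's worth, if the CLI parsing keeps breaking, newer Kafka releases
(2.0+, if I remember correctly) expose consumer-group APIs on the Java
AdminClient, which sidesteps the output-format problem entirely. A minimal
sketch (the group name is just an example):

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets for one consumer group, keyed by topic-partition.
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("my-group")
                         .partitionsToOffsetAndMetadata()
                         .get();
            offsets.forEach((tp, om) ->
                    System.out.println(tp + " -> committed offset " + om.offset()));
        }
    }
}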

Dan

On 12 August 2017 at 15:55, Jens Rantil  wrote:

> Hi,
>
> I am one of the maintainers of prometheus-kafka-consumer-
> group-exporter[1],
> which exports consumer group offsets and lag to Prometheus. The way we
> currently scrape this information is by periodically executing
> `kafka-consumer-groups.sh --describe` for each group and parse the output.
>
> Recently the output from `kafka-consumer-groups.sh --describe` was
> changed[2]. While I am working on a patch[3] to accommodate the new
> output format I was wondering if there is an easier, possibly more
> stable[4] and more future proof, way for our project to extract the
> information we are interested in. Does anyone know of a Go library that
> could extract the metrics we need? Or would it make sense to refactor
> `kafka-consumer-groups.sh` to support a more structured output? I'd love to
> hear your input.
>
> Also, if Kafka exported the same metrics through JMX our project would not
> exist, but maybe that's another story...
>
> Cheers,
> Jens
>
> [1] https://github.com/kawamuray/prometheus-kafka-consumer-group-exporter
> [2]
> https://github.com/kawamuray/prometheus-kafka-consumer-
> group-exporter/issues/24
> [3]
> https://github.com/kawamuray/prometheus-kafka-consumer-
> group-exporter/pull/29
> [4] We've also encountered `kafka-consumer-groups.sh` hanging a few times
> in production. There's a race condition somewhere in the script, most
> likely when a topic is rebalancing. Currently we kill the process if it
> doesn't finish within a timeout. See
> https://github.com/kawamuray/prometheus-kafka-consumer-
> group-exporter/blob/e4cdc3b1245f636d89d7e227066f02
> 578d732165/kafka/collector.go#L44
> .
>
> --
> Want to communicate with me securely? You can find my PGP key here
> .
>


Re: Avro With Kafka

2017-08-18 Thread Stephen Durfey
Yes, the Confluent SerDes support nested Avro records. Under the covers they
use the Avro classes (DatumReader and DatumWriter) to carry out those
operations, so as long as you're sending valid Avro data to be produced or
consumed, the Confluent SerDes will handle it just fine.
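
For example, a rough sketch of producing a record with a nested field through
the Confluent Avro serializer could look like the following (the topic name,
registry URL, and schema here are made up for illustration):

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NestedAvroProducer {

    // A nested schema: the "address" field is itself a record.
    private static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
      + "{\"name\":\"name\",\"type\":\"string\"},"
      + "{\"name\":\"address\",\"type\":{\"type\":\"record\",\"name\":\"Address\","
      + "\"fields\":[{\"name\":\"city\",\"type\":\"string\"}]}}]}";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        Schema addressSchema = schema.getField("address").schema();

        GenericRecord address = new GenericData.Record(addressSchema);
        address.put("city", "Dublin");

        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "alice");
        user.put("address", address);   // the nested record is serialized as-is

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("users", "alice", user));
        }
    }
}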


From: Kidong Lee 
Sent: Thursday, August 17, 2017 11:56:16 PM
To: users@kafka.apache.org
Subject: Re: Avro With Kafka

You can send Avro records to Kafka and consume them without a schema registry.

In my approach, the Avro schema file (avsc) must be on the classpath on both
the producer and the consumer side.

On the producer side, first write an Avro serializer for the value and set the
key.serializer and value.serializer properties in the Kafka producer
configuration.

For instance, the following class is the Avro serializer for the value:

import domain.Event;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;


/**
 * Created by mykidong on 2016-05-17.
 */
public class KafkaAvroEventSerializer implements Serializer<Event> {

    private static Logger log = LoggerFactory.getLogger(KafkaAvroEventSerializer.class);

    private Schema schema;

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // get the avro avsc schema path from the kafka configuration.
        String avroSchemaPath = (String) map.get("event.avro.schema.path");

        Schema.Parser parser = new Schema.Parser();
        try {
            // construct the avro schema instance from the classpath.
            schema = parser.parse(getClass().getResourceAsStream(avroSchemaPath));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public byte[] serialize(String topic, Event event) {
        try {
            GenericRecord datum = new GenericData.Record(schema);
            datum.put("eventType", event.getEventType());
            datum.put("value", event.getValue());

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
            Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(datum, encoder);
            encoder.flush();

            byte[] avroBytes = out.toByteArray();
            out.close();

            return avroBytes;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
    }
}


The Kafka producer will then send an Event (replace Event with your own message type):

Properties kafkaProp = new Properties();
..
kafkaProp.put("key.serializer",
        "org.apache.kafka.common.serialization.IntegerSerializer");

// the value avro serializer written above.
kafkaProp.put("value.serializer", "your.package.KafkaAvroEventSerializer");
..

KafkaProducer<Integer, Event> producer = new KafkaProducer<>(kafkaProp);

producer.send(new ProducerRecord<>(eventType, event));


On the consumer side, the Avro schema instances should be cached, because every
message consumed from Kafka must be deserialized, and re-parsing the schema
each time adds latency.

The Avro schema instances can be constructed from the classpath and mapped by
event type key like this:

import api.dao.AvroSchemaRegistryDao;
import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;


public class MapAvroSchemaRegistryDao implements AvroSchemaRegistryDao, InitializingBean {

    private static Logger log = LoggerFactory.getLogger(MapAvroSchemaRegistryDao.class);

    private final Object LOCK = new Object();
    private ConcurrentMap<String, Schema> schemaMap = new ConcurrentHashMap<>();

    private Properties eventTypeProps;

    public void setEventTypeProps(Properties eventTypeProps) {
        this.eventTypeProps = eventTypeProps;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        for (String eventType : eventTypeProps.stringPropertyNames()) {
            String schemaPath = eventTypeProps.getProperty(eventType);

            Schema.Parser parser = new Schema.Parser();
            try {
                Schema schema = parser.parse(getClass().getResourceAsStream(schemaPath));

                schemaMap.put(eventType, schema);

                log.info("loaded avro schema " + eventType);

            } catch (Exception e) {
                log.error("load fail avro schema " + eventType);
                throw new RuntimeException(e);

Re: Different Schemas on same Kafka Topic

2017-08-18 Thread Stephen Durfey
You're welcome, I'm glad it was helpful. I think it is a good idea to maintain
a single schema per topic that can be evolved, and to configure the schema
registry with the Avro compatibility rules that fit your use case. While it is
possible to have many different incompatible schemas per topic, it's much
easier for both consumers and producers to reason about if only one is
maintained. It's also much easier to develop against if you provide that
guarantee to consumers.


From: Sreejith S 
Sent: Thursday, August 17, 2017 11:13:59 PM
To: users@kafka.apache.org
Subject: Re: Different Schemas on same Kafka Topic

Thank you Stephen for a very detailed write up. Really helpful.

I was stuck on the concept of one schema per topic. Let me try this in my
use case.

Thank you very much. And thank you Svante.

Regards,
Sreejith

On 18-Aug-2017 12:17 am, "Stephen Durfey"  wrote:

> There's a lot to unpack here, so I'll do my best to answer.
>
> > How? You are always registering a schema against a topic using the topic
> > name, and the schema registry is assigning a unique id across the registry
> > cluster. Where is the globally unique schema id here?
>
>
> When I say globally unique, I mean that the id for a particular schema
> (when I say schema I'm referring to each unique version of the schema as it
> evolves) is unique across all schemas that the registry knows about. The id
> for a particular schema can appear in many different topics, but will
> always refer to one and only one schema (so long as you dont lose your
> _schemas topic, but thats a different discussion). Schemas are stored
> uniquely, but can be used by many subjects (a subject being usually the
> -key/value) [1]. So, you can have one schema appearing inside
> many different topics, and have the same id re-used, since the ids are
> unique per schema, not per subject.
>
> > I think in the Producer/Consumer API you will have more freedom to pass a
> > schema id of your choice and ask Avro to serialize/deserialize. But in the
> > Connect framework all these things are abstracted.
> >
>
> I disagree with this. When using the schema registry it is up to the
> serializer used to interact with it. For this part I'm specifically talking
> about the confluent kafka SerDe's. If those are being used, the behavior
> will be the same regardless of whether it is used in a generic
> KafkaProducer or in Kafka Connect. That serializer will interact with the
> schema registry (if configured to do so), and will register schemas on
> behalf of the producer. The schema registry must be in control of all
> schema IDs (see here: [2]) and it cannot be delegated to the producer.
> Otherwise it would be possible for multple producers to generate the same
> ID, and thus during deserialization the consumer wouldn't know which schema
> to deserialize with. In Kafka Connect, the SerDe operations are carried out
> by the specified DataConverter in the worker properties. In the quickstart
> version it defaults to using the AvroConverter, which uses the confluent
> SerDe's
>
> > It's a good pointer on using the NONE compatibility type, so that even if
> > the schema registry holds the same id for a topic, each schema version under
> > it is an entirely different schema. Is my understanding correct?
> >
> > But, when one defines NONE, the purpose of the schema registry itself is
> > lost, right?
> >
>
> I don't recommend using NONE. I've only ever used NONE during testing to
> allow a non-passive change to a schema to correct a previous mistake in a
> schema. This was done because deleting schemas wasn't an option (I believe
> in confluent 3.3.0 you can delete the association between a schema and a
> subject, but still cannot delete the schema itself). So, as you mention
> setting the value to NONE defeats the purpose (mostly) of the schema
> registry. If you only ever plan on dealing with the data in terms of
> generic records, NONE is fine, but you need a way of dealing with the
> multitude of types in your topic.
>
> [1]
> https://github.com/confluentinc/schema-registry/blob/
> 8eb664dbc84b1c2db3666fa0771eeb0e0909f892/avro-serializer/
> src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerDe.java#
> L83-L89
>
> [2]
> https://github.com/confluentinc/schema-registry/
> blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/
> AbstractKafkaAvroSerializer.java#L74
>
> On Thu, Aug 17, 2017 at 1:10 PM, Sreejith S  wrote:
>
> > Hi Stephen,
> >
> > Thank you very much.
> >
> > Please give clarity on the statement.
> >
> > "each unique avro schema has a unique id associated with it. That id
> > can be used across multiple different topics. The enforcement of which
> > schemas are allowed in a particular topic comes down to the combination
> of
> > the subject (usually topic-name-key/value) and version (the version
> itself
> > starts at 1 inside the subject, and itself has an id that ties to the
> > globally unique schema id). ".
> >
> > How ?. You are always registering a

Re: Global KTable value is null in Kafka Stream left join

2017-08-18 Thread Damian Guy
Hi,

If the userData value is null, that usually means there wasn't a record with
the provided key in the global table. So you should probably check that you
have the expected data in the global table, and also check that your
KeyValueMapper (the second argument to leftJoin) is returning the correct key.
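
To make that concrete, here is a stripped-down Java sketch of the same join
shape (topic and store names are made up; the important parts are the key
mapper and the null check in the joiner):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class GlobalTableJoinSketch {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // Global table keyed by userId (String values used here just to keep it simple).
        GlobalKTable<String, String> users =
                builder.globalTable(Serdes.String(), Serdes.String(), "user-topic", "user-store");

        KStream<String, String> events =
                builder.stream(Serdes.String(), Serdes.String(), "event-topic");

        KStream<String, String> joined = events.leftJoin(
                users,
                // KeyValueMapper: must return exactly the key used in the global table,
                // otherwise the right-hand value will always be null.
                (eventId, event) -> extractUserId(event),
                // ValueJoiner: with a left join, userData can legitimately be null
                // when no record with that key exists in the global table.
                (event, userData) -> userData == null ? event + "|<no-user>" : event + "|" + userData);

        joined.to(Serdes.String(), Serdes.String(), "joined-topic");
        // ... build and start KafkaStreams as usual
    }

    private static String extractUserId(String event) {
        // placeholder for whatever key extraction your events need
        return event;
    }
}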

Thanks,
Damian



On Fri, 18 Aug 2017 at 12:13 Duy Truong  wrote:

> Hi everyone,
>
> When using a left join, I can't get the value of the GlobalKTable record in
> the ValueJoiner parameter (the 3rd parameter). Here is my code:
>
> val userTable: GlobalKTable[String, UserData] =
> builder.globalTable(Serdes.String(), userDataSede, userTopic,
> userDataStore)
>
> val jvnStream: KStream[String, JVNModel] = sourceStream.leftJoin(userTable,
>   (eventId: String, dataLog: DataLog) => {
> dataLog.rawData.userId
>   },
>   (dataLog, userData: UserData) => {
> // userData is null.
>
>   })
>
> What do I have to do to resolve this issue?
>
> Thanks
> --
> *Duy Truong*
>


Global KTable value is null in Kafka Stream left join

2017-08-18 Thread Duy Truong
Hi everyone,

When using a left join, I can't get the value of the GlobalKTable record in
the ValueJoiner parameter (the 3rd parameter). Here is my code:

val userTable: GlobalKTable[String, UserData] =
builder.globalTable(Serdes.String(), userDataSede, userTopic, userDataStore)

val jvnStream: KStream[String, JVNModel] = sourceStream.leftJoin(userTable,
  (eventId: String, dataLog: DataLog) => {
dataLog.rawData.userId
  },
  (dataLog, userData: UserData) => {
// userData is null.

  })

What do I have to do to resolve this issue?

Thanks
-- 
*Duy Truong*


Question on Kafka Producer Transaction Id

2017-08-18 Thread Sameer Kumar
Hi,

I have a question about the Kafka transactional.id config, related to the
atomic writes feature of Kafka 0.11. If I have multiple producers across
different JVMs, do I need to set transactional.id differently for each JVM?
Does transactional.id control the beginning and end of transactions?

If it is not set uniquely, how would the following case be handled?

T1.begin (Producer 1)

T1.begin (Producer 2)
T2.end (Producer 2)
T2.end (Producer 1)
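
For reference, here is roughly what the transactional flow controlled by
transactional.id looks like on a single producer (a minimal sketch; the config
values are just examples):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        // The transactional.id identifies this producer instance across restarts.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-1-txn");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
                producer.commitTransaction();
            } catch (Exception e) {
                // In real code, fatal errors (e.g. fencing) should close the producer
                // instead of aborting; this sketch just aborts and rethrows.
                producer.abortTransaction();
                throw e;
            }
        }
    }
}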

-Sameer.


Re: Continue to consume messages when exception occurs in Kafka Stream

2017-08-18 Thread Duy Truong
OK, I got it, thank you Damian, Eno.

On Fri, Aug 18, 2017 at 4:30 PM, Damian Guy  wrote:

> Duy, if it is in your logic then you need to handle the exception yourself.
> If you don't then it will bubble out and kill the thread.
>
> On Fri, 18 Aug 2017 at 10:27 Duy Truong 
> wrote:
>
> > Hi Eno,
> >
> > Sorry for late reply, it's not a deserialization exception, it's a
> pattern
> > matching exception in my logic.
> >
> > val jvnStream: KStream[String, JVNModel] = sourceStream.leftJoin(
> userTable,
> >   (eventId: String, datatup: (DataLog, Option[CrawlData])) => {
> > datatup._1.rawData.userId
> >   },
> >   (tuple, fbData: FacebookData) => {
> > val (dmpData, Some(crawData)) = tuple // exception here
> >
> > // something here
> >
> >   })
> >
> > Thanks
> >
> >
> > On Thu, Aug 17, 2017 at 11:11 PM, Duy Truong  >
> > wrote:
> >
> > > Hi everyone,
> > >
> > > My Kafka Streams app has an exception (my business exception), and then it
> > > doesn't consume messages anymore. Is there any way to make my app continue
> > > consuming messages when the exception occurs?
> > >
> > > Thanks
> > >
> > > --
> > > *Duy Truong*
> > >
> >
> >
> >
> > --
> > *Duy Truong*
> >
>



-- 
*Duy Truong*


Re: Continue to consume messages when exception occurs in Kafka Stream

2017-08-18 Thread Damian Guy
Duy, if it is in your logic then you need to handle the exception yourself.
If you don't, it will bubble out and kill the thread.
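
In other words, catch the exception inside the operation and decide there what
to do with the bad record. A minimal Java sketch of that pattern (the topic
names are made up):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class SkipBadRecords {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source =
                builder.stream(Serdes.String(), Serdes.String(), "input-topic");

        // Catch failures inside the processing logic itself; anything that
        // escapes a processor kills the stream thread.
        KStream<String, String> cleaned = source.mapValues(value -> {
            try {
                return transform(value);   // your business logic
            } catch (Exception e) {
                return null;               // mark the record as bad
            }
        });

        // Drop the records we chose to skip.
        cleaned.filter((key, value) -> value != null)
               .to(Serdes.String(), Serdes.String(), "output-topic");
        // ... build and start KafkaStreams as usual
    }

    private static String transform(String value) {
        // placeholder for the real (possibly throwing) logic
        return value.toUpperCase();
    }
}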

On Fri, 18 Aug 2017 at 10:27 Duy Truong  wrote:

> Hi Eno,
>
> Sorry for late reply, it's not a deserialization exception, it's a pattern
> matching exception in my logic.
>
> val jvnStream: KStream[String, JVNModel] = sourceStream.leftJoin(userTable,
>   (eventId: String, datatup: (DataLog, Option[CrawlData])) => {
> datatup._1.rawData.userId
>   },
>   (tuple, fbData: FacebookData) => {
> val (dmpData, Some(crawData)) = tuple // exception here
>
> // something here
>
>   })
>
> Thanks
>
>
> On Thu, Aug 17, 2017 at 11:11 PM, Duy Truong 
> wrote:
>
> > Hi everyone,
> >
> > My Kafka Streams app has an exception (my business exception), and then it
> > doesn't consume messages anymore. Is there any way to make my app continue
> > consuming messages when the exception occurs?
> >
> > Thanks
> >
> > --
> > *Duy Truong*
> >
>
>
>
> --
> *Duy Truong*
>


Re: Continue to consume messages when exception occurs in Kafka Stream

2017-08-18 Thread Duy Truong
Hi Eno,

Sorry for the late reply; it's not a deserialization exception, it's a
pattern-matching exception in my logic.

val jvnStream: KStream[String, JVNModel] = sourceStream.leftJoin(userTable,
  (eventId: String, datatup: (DataLog, Option[CrawlData])) => {
datatup._1.rawData.userId
  },
  (tuple, fbData: FacebookData) => {
val (dmpData, Some(crawData)) = tuple // exception here

// something here

  })

Thanks


On Thu, Aug 17, 2017 at 11:11 PM, Duy Truong 
wrote:

> Hi everyone,
>
> My Kafka Streams app has an exception (my business exception), and then it
> doesn't consume messages anymore. Is there any way to make my app continue
> consuming messages when the exception occurs?
>
> Thanks
>
> --
> *Duy Truong*
>



-- 
*Duy Truong*


Re: Topic Creation fails - Need help

2017-08-18 Thread Raghav
Broker is 100% running. ZK path shows /broker/ids/1

On Fri, Aug 18, 2017 at 1:02 AM, Yang Cui  wrote:

> Please use the ZK client to check the path /brokers/ids in ZooKeeper.
>
> Sent from my iPhone
>
> > On Aug 18, 2017, at 3:14 PM, Raghav  wrote:
> >
> > Hi
> >
> > I have a 1 broker and 1 zookeeper on the same VM. I am using Kafka
> 10.2.1.
> > I am trying to create a topic using below command:
> >
> > "bin/kafka-topics.sh --create --zookeeper localhost:2181
> > --replication-factor 1 --partitions 16 --topic topicTest04"
> >
> > It fails with the below error. Just wondering why admin tools don't think
> > that there is any broker available while the broker is up ? Any input is
> > greatly appreciated.
> >
> > *"Error while executing topic command : replication factor: 1 larger than
> > available brokers: 0*
> > *[2017-08-18 07:05:47,813] ERROR
> > org.apache.kafka.common.errors.InvalidReplicationFactorException:
> > replication factor: 1 larger than available brokers: 0*
> > * (kafka.admin.TopicCommand$)"*
> >
> > Thanks.
> >
> > --
> > Raghav
>



-- 
Raghav


Re: Topic Creation fails - Need help

2017-08-18 Thread Yang Cui
Please use the ZK client to check the path /brokers/ids in ZooKeeper.

Sent from my iPhone

> On Aug 18, 2017, at 3:14 PM, Raghav  wrote:
> 
> Hi
> 
> I have a 1 broker and 1 zookeeper on the same VM. I am using Kafka 10.2.1.
> I am trying to create a topic using below command:
> 
> "bin/kafka-topics.sh --create --zookeeper localhost:2181
> --replication-factor 1 --partitions 16 --topic topicTest04"
> 
> It fails with the below error. Just wondering why admin tools don't think
> that there is any broker available while the broker is up ? Any input is
> greatly appreciated.
> 
> *"Error while executing topic command : replication factor: 1 larger than
> available brokers: 0*
> *[2017-08-18 07:05:47,813] ERROR
> org.apache.kafka.common.errors.InvalidReplicationFactorException:
> replication factor: 1 larger than available brokers: 0*
> * (kafka.admin.TopicCommand$)"*
> 
> Thanks.
> 
> -- 
> Raghav


Re: Topic Creation fails - Need help

2017-08-18 Thread Yang Cui
Your broker is not running.

Sent from my iPhone

> On Aug 18, 2017, at 3:14 PM, Raghav  wrote:
> 
> Hi
> 
> I have a 1 broker and 1 zookeeper on the same VM. I am using Kafka 10.2.1.
> I am trying to create a topic using below command:
> 
> "bin/kafka-topics.sh --create --zookeeper localhost:2181
> --replication-factor 1 --partitions 16 --topic topicTest04"
> 
> It fails with the below error. Just wondering why admin tools don't think
> that there is any broker available while the broker is up ? Any input is
> greatly appreciated.
> 
> *"Error while executing topic command : replication factor: 1 larger than
> available brokers: 0*
> *[2017-08-18 07:05:47,813] ERROR
> org.apache.kafka.common.errors.InvalidReplicationFactorException:
> replication factor: 1 larger than available brokers: 0*
> * (kafka.admin.TopicCommand$)"*
> 
> Thanks.
> 
> -- 
> Raghav


Topic Creation fails - Need help

2017-08-18 Thread Raghav
Hi

I have a 1 broker and 1 zookeeper on the same VM. I am using Kafka 10.2.1.
I am trying to create a topic using below command:

"bin/kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 16 --topic topicTest04"

It fails with the below error. Just wondering why admin tools don't think
that there is any broker available while the broker is up ? Any input is
greatly appreciated.

*"Error while executing topic command : replication factor: 1 larger than
available brokers: 0*
*[2017-08-18 07:05:47,813] ERROR
org.apache.kafka.common.errors.InvalidReplicationFactorException:
replication factor: 1 larger than available brokers: 0*
* (kafka.admin.TopicCommand$)"*

Thanks.

-- 
Raghav


Re: Querying consumer groups programmatically (from Golang)

2017-08-18 Thread Gabriel Machado
Hello,

Could you tell me if Burrow or Remora is compatible with SSL-enabled Kafka
clusters?


Gabriel.

2017-08-16 15:39 GMT+02:00 Gabriel Machado :

> Hi Jens and Ian,
>
> Very useful projects :).
> What's the difference between the two projects?
> Do they support Kafka SSL clusters?
>
> Thanks,
> Gabriel.
>
> 2017-08-13 3:29 GMT+02:00 Ian Duffy :
>
>> Hi Jens,
>>
>> We did something similar to this at Zalando.
>>
>> https://github.com/zalando-incubator/remora
>>
>> It effectively supplies the kafka consumer group supply command as a http
>> endpoint.
>>
>> On 12 August 2017 at 16:42, Subhash Sriram 
>> wrote:
>>
>> > Hi Jens,
>> >
>> > Have you looked at Burrow?
>> >
>> > https://github.com/linkedin/Burrow/blob/master/README.md
>> >
>> > Thanks,
>> > Subhash
>> >
>> > Sent from my iPhone
>> >
>> > > On Aug 12, 2017, at 8:55 AM, Jens Rantil 
>> wrote:
>> > >
>> > > Hi,
>> > >
>> > > I am one of the maintainers of prometheus-kafka-consumer-
>> > group-exporter[1],
>> > > which exports consumer group offsets and lag to Prometheus. The way we
>> > > currently scrape this information is by periodically executing
>> > > `kafka-consumer-groups.sh --describe` for each group and parse the
>> > output.
>> > >
>> > > Recently the output from `kafka-consumer-groups.sh --describe` was
> > > changed[2]. While I am working on a patch[3] to accommodate the new
>> > > output format I was wondering if there is an easier, possibly more
>> > > stable[4] and more future proof, way for our project to extract the
>> > > information we are interested in. Does anyone know of a Go library
>> that
>> > > could extract the metrics we need? Or would it make sense to refactor
>> > > `kafka-consumer-groups.sh` to support a more structured output? I'd
>> love
>> > to
>> > > hear your input.
>> > >
>> > > Also, if Kafka exported the same metrics through JMX our project would
>> > not
>> > > exist, but maybe that's another story...
>> > >
>> > > Cheers,
>> > > Jens
>> > >
>> > > [1] https://github.com/kawamuray/prometheus-kafka-consumer-
>> > group-exporter
>> > > [2]
>> > > https://github.com/kawamuray/prometheus-kafka-consumer-
>> > group-exporter/issues/24
>> > > [3]
>> > > https://github.com/kawamuray/prometheus-kafka-consumer-
>> > group-exporter/pull/29
>> > > [4] We've also encountered `kafka-consumer-groups.sh` hanging a few
>> times
>> > > in production. There's a race condition somewhere in the script, most
>> > > likely when a topic is rebalancing. Currently we kill the process if
>> it
>> > > doesn't finish within a timeout. See
>> > > https://github.com/kawamuray/prometheus-kafka-consumer-
>> > group-exporter/blob/e4cdc3b1245f636d89d7e227066f02
>> > 578d732165/kafka/collector.go#L44
>> > > .
>> > >
>> > > --
>> > > Want to communicate with me securely? You can find my PGP key here
>> > > .
>> >
>>
>
>