Re: Kafka connect jdbc Sink connector issues when moving from MySQL to Oracle DB

2016-09-30 Thread Srikrishna Alla
Thanks for the response, Shikhar. The issue was happening because the table
metadata was returning the table's column names in upper case, while the
connector expects the column names to be in lower case. I fixed it by
creating the table with quoted, lower-case column names like "updated_by".
This way, I am no longer having this issue. Regarding the NVARCHAR2(4000)
issue, Oracle threw an error saying the size is out of range for that data
type.
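
For anyone else hitting the same case-sensitivity problem, here is a minimal
JDBC sketch of the kind of quoted-identifier DDL I mean (the table name, the
extra columns and the column types are only illustrative, not my actual
schema; CLOB is shown as one option for long strings, per your suggestion):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateAlertTable {
    public static void main(String[] args) throws Exception {
        // Connection details are placeholders; substitute your own host/service/user.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//dbhost:1521/ORCL", "user", "password");
             Statement stmt = conn.createStatement()) {
            // Quoting the identifiers makes Oracle store them exactly as written
            // (lower case) instead of folding them to upper case, so they match
            // the lower-case field names the sink connector sends.
            stmt.executeUpdate(
                "CREATE TABLE \"alerts\" ("
                + "\"app_name\" VARCHAR2(256) NOT NULL, "
                + "\"updated_by\" VARCHAR2(256), "
                + "\"message\" CLOB)");
        }
    }
}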

Thanks,
Sri

On Fri, Sep 30, 2016 at 2:03 PM, Shikhar Bhushan 
wrote:

> Hi Srikrishna,
>
> For future please address questions related to Confluent's connectors to
> the relevant ML (https://groups.google.com/forum/#!forum/confluent-
> platform
> ).
>
> The NVARCHAR2(4000) mapping for string types for Oracle was based on my
> reading of the documentation which states it can hold up to 4000
> characters, and when auto-creating a table we're aiming for the most
> broadly-applicable datatype that makes sense. You mention that it only
> supports up to 2000; is that because the max limit is overridable at the
> db level? Perhaps CLOB would make more sense?
>
> The error you are running into is because the connector refuses to issue
> ALTERs that add columns which are required (not optional and with no
> default value), as that is potentially unsafe. You will have to add the
> 'app_name' column manually. Alternatively, if you don't need that column to
> be propagated to the database, you can use the `fields.whitelist`
> configuration to whitelist the desired fields.
>
> Best,
>
> Shikhar
>
> On Fri, Sep 30, 2016 at 8:38 AM Srikrishna Alla wrote:
>
> > Hi,
> >
> > I am facing issues with jdbc Sink Connector when working with Oracle DB.
> > This functionality was working fine when I was using MySQL DB.
> >
> > The first error I had was when trying to create the table using
> > auto.create = true. It tried to create the table with STRING fields as
> > NVARCHAR2(4000) (which I see is the default mapping for the STRING schema
> > type). This failed because NVARCHAR2 only supports up to 2000.
> >
> > To rectify this, I created the table and ran the connector again,
> > expecting it to write to the DB. Now I am getting the following error -
> >
> > [2016-09-30 10:21:16,627] ERROR Task is being killed and will not recover
> > until manually restarted:
> > (org.apache.kafka.connect.runtime.WorkerSinkTask:303)
> > org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add
> > missing field SinkRecordField{type=STRING, name='app_name',
> > isOptional=false}, as it is not optional and does not have a default value
> >    at com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.DbStructure.amendIfNecessary(DbStructure.java:117)
> >    at com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:59)
> >    at com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.AlertWriter.write(AlertWriter.java:57)
> >    at com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.EtlJdbcSinkTask.put(EtlJdbcSinkTask.java:53)
> >    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:280)
> >    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
> >    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
> >    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
> >    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
> > [2016-09-30 10:21:16,629] ERROR Thread WorkerSinkTask-jdbc-sink-connector-0
> > exiting with uncaught exception:
> > (org.apache.kafka.connect.util.ShutdownableThread:84)
> > org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask
> > due to unrecoverable exception.
> >    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:304)
> >    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
> >    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
> >    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
> >    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
> > Exception in thread "WorkerSinkTask-jdbc-sink-connector-0"
> > org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask
> > due to unrecoverable exception.
> >    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:304)
> >    at
> >

list of stream or consumer objects per topic

2016-09-30 Thread Mudassir Maredia
I am using the Kafka 0.10 API.

//Sample code (key/value types assumed to be String)
 List<String> topicsList = new ArrayList<>();
 topicsList.add("topic1");
 topicsList.add("topic2");

 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
 consumer.subscribe(topicsList);

Problem:

For each topic, I want to spawn a separate thread that handles the data on
it. It seems that to achieve this I would have to create multiple
KafkaConsumer instances, which I don't want to do. Does anyone have an idea
how to achieve this simply?

Previously, with the 0.8 API, I used the createMessageStreams method, which
returns a collection of KafkaStream objects (one per topic). I want something
similar to that.

//0.8 code sample
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
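
In case it helps frame the question, here is a rough sketch of the kind of
thing I am trying to avoid writing by hand: a single 0.10 consumer whose poll
loop hands each topic's records to its own worker thread (broker address,
group id and the handler are placeholders, and offset-commit coordination is
ignored):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PerTopicDispatcher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "per-topic-demo");            // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        List<String> topics = Arrays.asList("topic1", "topic2");
        // One single-threaded executor per topic; the KafkaConsumer itself stays
        // on the polling thread because it is not thread-safe.
        Map<String, ExecutorService> workers = new HashMap<>();
        for (String topic : topics) {
            workers.put(topic, Executors.newSingleThreadExecutor());
        }

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (String topic : topics) {
                    // records(topic) returns only this topic's records from the batch
                    List<ConsumerRecord<String, String>> batch = new ArrayList<>();
                    for (ConsumerRecord<String, String> r : records.records(topic)) {
                        batch.add(r);
                    }
                    if (!batch.isEmpty()) {
                        workers.get(topic).submit(() -> handle(topic, batch));
                    }
                }
            }
        }
    }

    static void handle(String topic, List<ConsumerRecord<String, String>> batch) {
        System.out.println(topic + ": " + batch.size() + " records");  // placeholder processing
    }
}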


Thanks,

Moody


Re: Kafka connect jdbc Sink connector issues when moving from MySQL to Oracle DB

2016-09-30 Thread Shikhar Bhushan
Hi Srikrishna,

For future please address questions related to Confluent's connectors to
the relevant ML (https://groups.google.com/forum/#!forum/confluent-platform
).

The NVARCHAR2(4000) mapping for string types for Oracle was based on my
reading of the documentation which states it can hold up to 4000
characters, and when auto-creating a table we're aiming for the most
broadly-applicable datatype that makes sense. You mention that it only
supports up to 2000; is that because the max limit is overridable at the
db level? Perhaps CLOB would make more sense?

The error you are running into is because the connector refuses to issue
ALTERs that add columns which are required (not optional and with no
default value), as that is potentially unsafe. You will have to add the
'app_name' column manually. Alternatively, if you don't need that column to
be propagated to the database, you can use the `fields.whitelist`
configuration to whitelist the desired fields.
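
For example, a sink connector properties file along these lines (topic, table
and field names here are made up, and you should double-check the exact
config keys against the connector documentation):

name=oracle-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=alerts
connection.url=jdbc:oracle:thin:@//dbhost:1521/ORCL
auto.create=false
# Only whitelisted fields are written to the table; anything not listed
# (e.g. app_name) is dropped before the insert statement is built.
fields.whitelist=updated_by,message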

Best,

Shikhar

On Fri, Sep 30, 2016 at 8:38 AM Srikrishna Alla 
wrote:

> Hi,
>
> I am facing issues with jdbc Sink Connector when working with Oracle DB.
> This functionality was working fine when I was using MySQL DB.
>
> The first error I had was when trying to create the table using auto.create
> = true. It tried to create the table with STRING fields as NVARCHAR2(4000)
> (which I see is the default mapping for the STRING schema type). This failed
> because NVARCHAR2 only supports up to 2000.
>
> To rectify this, I created the table and ran the connector again, expecting
> it to write to the DB. Now I am getting the following error -
>
> [2016-09-30 10:21:16,627] ERROR Task is being killed and will not recover
> until manually restarted:
> (org.apache.kafka.connect.runtime.WorkerSinkTask:303)
> org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add
> missing field SinkRecordField{type=STRING, name='app_name',
> isOptional=false}, as it is not optional and does not have a default value
>at
>
> com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.DbStructure.amendIfNecessary(DbStructure.java:117)
>at
>
> com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:59)
>at
>
> com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.AlertWriter.write(AlertWriter.java:57)
>at
>
> com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.EtlJdbcSinkTask.put(EtlJdbcSinkTask.java:53)
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:280)
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
>at
>
> org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
> [2016-09-30 10:21:16,629] ERROR Thread WorkerSinkTask-jdbc-sink-connector-0
> exiting with uncaught exception:
> (org.apache.kafka.connect.util.ShutdownableThread:84)
> org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask
> due to unrecoverable exception.
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:304)
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
>at
>
> org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
> Exception in thread "WorkerSinkTask-jdbc-sink-connector-0"
> org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask
> due to unrecoverable exception.
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:304)
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
>at
>
> org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
>at
>
> org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
>
> Please let me know if anyone has seen this issue before.
>
> Thanks,
> Sri
>


Re: rack aware consumer

2016-09-30 Thread Ian Wrigley
Unfortunately, that’s not the way Kafka works wrt Consumers. When a partition 
is replicated, only one replica is the Leader — all reads and writes are done 
via the Leader. The other replicas are Followers; their only job is to keep up 
with the Leader. No read requests from Consumers go to Followers.

Ian.

---
Ian Wrigley
Director, Education Services
Confluent, Inc

> On Sep 30, 2016, at 12:32 PM, Ezra Stuetzel  wrote:
> 
> Hi,
> Yeah I am aware of MirrorMaker. We tried to simplify our architecture so as
> to avoid needing to use MirrorMaker and just rely on the rack replication
> for cross datacenter replication. I think the only missing piece to this is
> making consumers only read from a subset of the nodes in the cluster,
> specifically the rack/datacenter local nodes.
> Thanks,
> Ezra
> 
> 
> On Fri, Sep 30, 2016 at 8:03 AM, Marko Bonaći 
> wrote:
> 
>> AFAIK (not actually using myself), for cross DC replication people tend to
>> use MirrorMaker to transfer one cluster's data to another, usually a kind
>> of central DC that unifies all "regional" DCs, but the layout depends on
>> your business reqs.
>> Then your consumers are configured with only the local brokers' addresses,
>> exactly because of the reason you mentioned: the high latency of consuming
>> from a remote broker, and not being able to control partition assignment,
>> i.e. which broker becomes the leader if the current leader fails, since that
>> is governed by the rule that the most up-to-date in-sync replica becomes
>> the leader.
>> 
>> 
>> Marko Bonaći
>> Monitoring | Alerting | Anomaly Detection | Centralized Log Management
>> Solr & Elasticsearch Support
>> Sematext  | Contact
>> 
>> 
>> On Thu, Sep 29, 2016 at 7:30 PM, Ezra Stuetzel 
>> wrote:
>> 
>>> Hi,
>>> In kafka 0.10 is there a way to configure the consumer such that it is
>> rack
>>> aware? We replicate data across all our 'racks' and want consumers to
>>> choose brokers that are rack local whenever possible. Our configured
>> racks
>>> are actually in different datacenters so there is much higher network
>> cost
>>> of not consuming from nearest replica.
>>> 
>>> Configuring the consumer to only consume from specific hosts would also
>>> achieve what we are trying to do if that is possible?
>>> 
>>> Also, are there any major downsides to using the rack setting for cross
>>> datacenter replication?
>>> 
>>> Thanks,
>>> Ezra
>>> 
>> 



Re: rack aware consumer

2016-09-30 Thread Ezra Stuetzel
Hi,
Yeah I am aware of MirrorMaker. We tried to simplify our architecture so as
to avoid needing to use MirrorMaker and just rely on the rack replication
for cross datacenter replication. I think the only missing piece to this is
making consumers only read from a subset of the nodes in the cluster,
specifically the rack/datacenter local nodes.
Thanks,
Ezra


On Fri, Sep 30, 2016 at 8:03 AM, Marko Bonaći 
wrote:

> AFAIK (not actually using myself), for cross DC replication people tend to
> use MirrorMaker to transfer one cluster's data to another, usually a kind
> of central DC that unifies all "regional" DCs, but the layout depends on
> your business reqs.
> Then your consumers are configured with only the local brokers' addresses,
> exactly because of the reason you mentioned: the high latency of consuming
> from a remote broker, and not being able to control partition assignment,
> i.e. which broker becomes the leader if the current leader fails, since that
> is governed by the rule that the most up-to-date in-sync replica becomes
> the leader.
>
>
> Marko Bonaći
> Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> Solr & Elasticsearch Support
> Sematext  | Contact
> 
>
> On Thu, Sep 29, 2016 at 7:30 PM, Ezra Stuetzel 
> wrote:
>
> > Hi,
> > In kafka 0.10 is there a way to configure the consumer such that it is
> rack
> > aware? We replicate data across all our 'racks' and want consumers to
> > choose brokers that are rack local whenever possible. Our configured
> racks
> > are actually in different datacenters so there is much higher network
> cost
> > of not consuming from nearest replica.
> >
> > Configuring the consumer to only consume from specific hosts would also
> > achieve what we are trying to do if that is possible?
> >
> > Also, are there any major downsides to using the rack setting for cross
> > datacenter replication?
> >
> > Thanks,
> > Ezra
> >
>


Re: Kafka streaming and topic filter whitelist

2016-09-30 Thread Gary Ogden
So how exactly would that work? For example, I can currently do this:

KStream<String, String> textLines =
    builder.stream(stringSerde, stringSerde, SYSTEM_TOPIC);

Are you saying that I could put a regex in place of the SYSTEM_TOPIC and
that one KStream would be streaming from multiple topics that match that
regex?

If so, that could be useful.
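
Something along these lines is what I have in mind; just a sketch, and I am
assuming the 0.10.1 KStreamBuilder has a Pattern overload of stream() (the
pattern itself is only an example):

import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();

// One KStream fed by every topic whose name matches the regex.
KStream<String, String> textLines =
        builder.stream(stringSerde, stringSerde, Pattern.compile("system-.*"));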

Gary


On 30 September 2016 at 13:35, Damian Guy  wrote:

> Hi Gary,
>
> In the upcoming 0.10.1 release you can do regex subscription - will that
> help?
>
> Thanks,
> Damian
>
> On Fri, 30 Sep 2016 at 14:57 Gary Ogden  wrote:
>
> > Is it possible to use the topic filter whitelist within a Kafka Streaming
> > application? Or can it only be done in a consumer job?
> >
>


Re: Kafka streaming and topic filter whitelist

2016-09-30 Thread Damian Guy
Hi Gary,

In the upcoming 0.10.1 release you can do regex subscription - will that
help?

Thanks,
Damian

On Fri, 30 Sep 2016 at 14:57 Gary Ogden  wrote:

> Is it possible to use the topic filter whitelist within a Kafka Streaming
> application? Or can it only be done in a consumer job?
>


Kafka connect jdbc Sink connector issues when moving from MySQL to Oracle DB

2016-09-30 Thread Srikrishna Alla
Hi,

I am facing issues with jdbc Sink Connector when working with Oracle DB.
This functionality was working fine when I was using MySQL DB.

The first error I had was when trying to create the table using auto.create =
true. It tried to create the table with STRING fields as NVARCHAR2(4000)
(which I see is the default mapping for the STRING schema type). This failed
because NVARCHAR2 only supports up to 2000.

To rectify this, I created the table and ran the connector again, expecting it
to write to the DB. Now I am getting the following error -

[2016-09-30 10:21:16,627] ERROR Task is being killed and will not recover
until manually restarted:
(org.apache.kafka.connect.runtime.WorkerSinkTask:303)
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add
missing field SinkRecordField{type=STRING, name='app_name',
isOptional=false}, as it is not optional and does not have a default value
   at
com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.DbStructure.amendIfNecessary(DbStructure.java:117)
   at
com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:59)
   at
com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.AlertWriter.write(AlertWriter.java:57)
   at
com.att.bdcoe.data_platform.etl.kafka_connectors.jdbc_connector.sink.EtlJdbcSinkTask.put(EtlJdbcSinkTask.java:53)
   at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:280)
   at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
   at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
   at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
   at
org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
[2016-09-30 10:21:16,629] ERROR Thread WorkerSinkTask-jdbc-sink-connector-0
exiting with uncaught exception:
(org.apache.kafka.connect.util.ShutdownableThread:84)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask
due to unrecoverable exception.
   at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:304)
   at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
   at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
   at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
   at
org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Exception in thread "WorkerSinkTask-jdbc-sink-connector-0"
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask
due to unrecoverable exception.
   at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:304)
   at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
   at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
   at
org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
   at
org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)

Please let me know if anyone has seen this issue before.

Thanks,
Sri


Re: Topic with many paritions and many consumers, recommendations wanted.

2016-09-30 Thread Gerrit Jansen van Vuuren
Take a look at the kafka-fast client (https://github.com/gerritjvv/kafka-fast);
it uses a different approach where you can have more than one consumer per
topic+partition (i.e. no fixed relation between topic partitions and
consumers). It uses Redis, but only for offsets and work distribution, not
for the messages themselves.

On Fri, Sep 30, 2016 at 4:07 PM, craig w  wrote:

> I have a scenario where, for a given topic, I'll have 500 consumers (1
> consumer per instance of an app). I've set up the topic so it has 500
> partitions, thus ensuring each consumer will eventually get work (the data
> produced into Kafka uses the default partitioning strategy).
>
> note: These consumer app instances are run in containers via Marathon
> (using Mesos).
>
> Several times a day the consumer apps can be intentionally restarted (to
> upgrade the app, etc). When a rolling restart occurs, Kafka begins its
> rebalancing process. This process can take 10 minutes or so as the rolling
> restart itself takes a few minutes. As a result, what I've seen is that a
> consumer will have its partitions reassigned, consume a new message, start
> working on it, and then a reassignment occurs again. The work being
> performed when a message is received is effectively lost since messages
> being processed take 30s - 2 hours to process, and a re-assignment occurs.
>
> One suggestion from someone was to create a separate "app" in marathon for
> each instance, therefore I'd have 500 apps in marathon, and assign each one
> a specific partition number instead of letting Kafka assign partitions
> automatically to the consumers. This is problematic because I need to be
> able to increase/decrease the number of instances of the app based on
> demand coming into the system.
>
> To work around this, we have a custom component that consumes kafka topics
> and puts messages into redis lists (one per Kafka topic). Our consumers then
> do a BLPOP (blocking pop operation), which ensures each message is processed
> only once and also avoids rebalancing in Kafka when the consumer apps are
> restarted.
>
> I'm considering using a different queueing system such as ActiveMQ,
> RabbitMQ...to avoid this kafka to redis scenario. Is kafka the right fit?
> Is there a better approach to doing this with kafka?
>
> Thanks in advance,
> Craig
>


Re: rack aware consumer

2016-09-30 Thread Marko Bonaći
AFAIK (not actually using myself), for cross DC replication people tend to
use MirrorMaker to transfer one cluster's data to another, usually a kind
of central DC that unifies all "regional" DCs, but the layout depends on
your business reqs.
Then your consumers are configured with only the local brokers' addresses,
exactly because of the reason you mentioned: the high latency of consuming
from a remote broker, and not being able to control partition assignment,
i.e. which broker becomes the leader if the current leader fails, since that
is governed by the rule that the most up-to-date in-sync replica becomes
the leader.


Marko Bonaći
Monitoring | Alerting | Anomaly Detection | Centralized Log Management
Solr & Elasticsearch Support
Sematext  | Contact


On Thu, Sep 29, 2016 at 7:30 PM, Ezra Stuetzel 
wrote:

> Hi,
> In kafka 0.10 is there a way to configure the consumer such that it is rack
> aware? We replicate data across all our 'racks' and want consumers to
> choose brokers that are rack local whenever possible. Our configured racks
> are actually in different datacenters so there is much higher network cost
> of not consuming from nearest replica.
>
> Configuring the consumer to only consume from specific hosts would also
> achieve what we are trying to do if that is possible?
>
> Also, are there any major downsides to using the rack setting for cross
> datacenter replication?
>
> Thanks,
> Ezra
>


Topic with many paritions and many consumers, recommendations wanted.

2016-09-30 Thread craig w
I have a scenario where, for a given topic, I'll have 500 consumers (1
consumer per instance of an app). I've set up the topic so it has 500
partitions, thus ensuring each consumer will eventually get work (the data
produced into Kafka uses the default partitioning strategy).

note: These consumer app instances are run in containers via Marathon
(using Mesos).

Several times a day the consumer apps can be intentionally restarted (to
upgrade the app, etc). When a rolling restart occurs, Kafka begins its
rebalancing process. This process can take 10 minutes or so as the rolling
restart itself takes a few minutes. As a result, what I've seen is that a
consumer will have its partitions reassigned, consume a new message, start
working on it, and then a reassignment occurs again. The work being
performed when a message is received is effectively lost since messages
being processed take 30s - 2 hours to process, and a re-assignment occurs.

One suggestion from someone was to create a separate "app" in marathon for
each instance, therefore I'd have 500 apps in marathon, and assign each one
a specific partition number instead of letting Kafka assign partitions
automatically to the consumers. This is problematic because I need to be
able to increase/decrease the number of instances of the app based on
demand coming into the system.
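
For reference, the one-app-per-partition variant would look roughly like this
with the 0.10 consumer, using assign() instead of subscribe() so no group
rebalancing is involved (topic name, group id and the way the partition id is
injected are made up):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class StaticPartitionWorker {
    public static void main(String[] args) {
        int partitionId = Integer.parseInt(args[0]);  // e.g. injected per Marathon app
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "static-worker");            // used only for offset commits
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment pins this instance to one partition, so restarting
            // other instances never triggers a rebalance here.
            consumer.assign(Collections.singletonList(
                    new TopicPartition("work-topic", partitionId)));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    process(record);  // 30s to 2 hours of work in the real system
                }
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        // placeholder
    }
}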

To work around this, we have a custom component that consumes kafka topics
and puts messages into redis lists (one per Kafka topic). Our consumers then
do a BLPOP (blocking pop operation), which ensures each message is processed
only once and also avoids rebalancing in Kafka when the consumer apps are
restarted.
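
The consumer side of that workaround is essentially just the following (Jedis
used purely for illustration; the host and list name are made up):

import java.util.List;
import redis.clients.jedis.Jedis;

public class RedisListWorker {
    public static void main(String[] args) {
        // The list is filled by the kafka-to-redis bridge component described above.
        try (Jedis jedis = new Jedis("redis-host", 6379)) {
            while (true) {
                // BLPOP blocks until an element is available; exactly one worker
                // receives each popped message, so there is no duplicate processing.
                List<String> popped = jedis.blpop(0, "alerts-topic-list");
                process(popped.get(1));  // element [0] is the list name, [1] the value
            }
        }
    }

    static void process(String message) {
        // placeholder: 30s to 2 hours of work in the real system
    }
}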

I'm considering using a different queueing system such as ActiveMQ,
RabbitMQ...to avoid this kafka to redis scenario. Is kafka the right fit?
Is there a better approach to doing this with kafka?

Thanks in advance,
Craig


Kafka streaming and topic filter whitelist

2016-09-30 Thread Gary Ogden
Is it possible to use the topic filter whitelist within a Kafka Streaming
application? Or can it only be done in a consumer job?


producer hangs when partition leader is powered off

2016-09-30 Thread dimitri tombroff
Hello all;

I can easily reproduce an annoying scenario using Kafka brokers
kafka_2.10-0.8.2.0, but I believe it has nothing to do with the brokers, only
with the producer API.

Here is the problem: a producer is continuously writing, using the sync
producer API, to a topic (2 partitions, replicated) on a three-node Kafka
cluster. I then power off the Kafka broker that is the leader for at least
one of the partitions.

What I observe is that my Java client code is stuck for a very long time
(several minutes). I tested kafka_2.10-0.8.2.0 and the latest client.
Right now I see no option other than to kill and restart my Java application
when I detect that, but of course that is cumbersome.

I tried playing with the request timeout but without success.
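
For reference, this is the kind of timeout tuning I mean, sketched with the
new Java producer (org.apache.kafka.clients.producer); I have not validated
that it solves the hang, and the broker list, topic and values are
placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BoundedBlockingProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        props.put("request.timeout.ms", "5000");  // give up on a request stuck on a dead leader
        props.put("max.block.ms", "5000");        // bound blocking on metadata / full buffer

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Waiting on the returned Future gives sync-producer-like behaviour,
            // but the wait is bounded by the timeouts above.
            producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
        }
    }
}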

Clearly the producer is stuck on the standard socket calls in a case where
the target node is unreachable and of course did not cleanly close any
socket, but it seems to me that the producer should react on its own, no?
Thanks in advance for any advice,

Below are the stack traces. I can repeatedly generate them; it is always
stuck there.

Dimi


Here is the stack trace with  kafka_2.10-0.8.2.0 consumer API:

"Thread-16-kafka_bolt-executor[3 3]" #121 prio=5 os_prio=0
tid=0x7fd0100b1000 nid=0x32f0 runnable [0x7fcfef8f8000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
at sun.nio.ch.IOUtil.write(IOUtil.java:148)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
- locked <0xf97caf78> (a java.lang.Object)
at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
at
kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
at
kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73)
- locked <0xf994cbd8> (a java.lang.Object)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:104)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:104)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:104)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:103)
at
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:103)
at
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:102)
at
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.Producer.send(Producer.scala:76)
- locked <0xc386faf0> (a java.lang.Object)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
   



And here is the one with 2.10 0.10.0.1

"Thread-16-kafka_bolt-executor[3 3]" #125 prio=5 os_prio=0
tid=0x7f5dd882 nid=0x5e59 runnable [0x7f5db50d2000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
at sun.nio.ch.IOUtil.write(IOUtil.java:148)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
- locked <0xff4e66a8> (a java.lang.Object)
at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
at
org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:57)
at
