Re: Kafka Question

2022-05-03 Thread Robin Moffatt
Kafka itself includes Kafka Streams (
https://kafka.apache.org/31/documentation/streams/), so you can do this
processing in Kafka. There's a Filter transformation that would be a good
place to start:
https://kafka.apache.org/31/documentation/streams/developer-guide/dsl-api.html#stateless-transformations
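
As a rough sketch (the topic names "temperatures" and "temperature-alerts", the string
serdes, and the 50/80 thresholds are illustrative assumptions for Emily's example, not
anything Kafka defines), the out-of-range readings could be filtered onto an alerts topic
that a small notifier service then consumes to send the email or SMS:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class TemperatureAlertsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "temperature-alerts");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // one temperature reading per message, value is the reading as text
        KStream<String, String> readings = builder.stream("temperatures");
        // keep only readings outside the control limits and write them to an alerts topic
        readings
            .filter((key, value) -> {
                double t = Double.parseDouble(value);
                return t > 80.0 || t < 50.0;
            })
            .to("temperature-alerts");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}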


-- 

Robin Moffatt | Principal Developer Advocate | ro...@confluent.io | @rmoff


On Tue, 3 May 2022 at 03:09, Liam Clarke-Hutchinson 
wrote:

> Hi Emily,
>
> Nope, Kafka doesn't have that capability built in, it's just a distributed
> log that's great for streaming events. However, you can easily write a
> program that consumes those events from Kafka and then does what you want
> :)
>
> Cheers,
>
> Liam
>
> On Tue, 3 May 2022 at 06:30, Emily Schepisi
>  wrote:
>
> > Hello,
> >
> > I have a question about Kafka. If I put an upper and lower control limit
> on
> > the data, and the log records an event where the upper or lower control
> > limit is breached, will Kafka be able to send a notification via email or
> > text message to the user?
> >
> > Example: I'm tracking the daily temperature and set the upper control
> limit
> > at 80 degrees and the lower control limit at 50 degrees. The event log on
> > Kafka recorded the temperature on Monday at 90 degrees, so it's higher
> than
> > the upper control limit. Does Kafka have the capability to send a text
> > message or email to let me know that the temperature is outside of the
> > control limit?
> >
> > Thank you,
> >
> > Emily
> >
>


Re: Kafka Question

2022-05-02 Thread Liam Clarke-Hutchinson
Hi Emily,

Nope, Kafka doesn't have that capability built in; it's just a distributed
log that's great for streaming events. However, you can easily write a
program that consumes those events from Kafka and then does what you want
:)
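
For example, a minimal sketch of such a program with the Java consumer (the topic name,
the 50/80 thresholds, and the sendAlert() helper are made-up placeholders; the actual
email or SMS sending would go through whatever service you already use):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TemperatureAlerter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "temperature-alerter");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("temperatures"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    double temp = Double.parseDouble(record.value());
                    if (temp > 80.0 || temp < 50.0) {
                        sendAlert("Reading " + temp + " is outside the 50-80 control limits");
                    }
                }
            }
        }
    }

    // placeholder: wire this up to SMTP, an SMS gateway, or whatever notification service you use
    static void sendAlert(String message) {
        System.out.println("ALERT: " + message);
    }
}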

Cheers,

Liam

On Tue, 3 May 2022 at 06:30, Emily Schepisi
 wrote:

> Hello,
>
> I have a question about Kafka. If I put an upper and lower control limit on
> the data, and the log records an event where the upper or lower control
> limit is breached, will Kafka be able to send a notification via email or
> text message to the user?
>
> Example: I'm tracking the daily temperature and set the upper control limit
> at 80 degrees and the lower control limit at 50 degrees. The event log on
> Kafka recorded the temperature on Monday at 90 degrees, so it's higher than
> the upper control limit. Does Kafka have the capability to send a text
> message or email to let me know that the temperature is outside of the
> control limit?
>
> Thank you,
>
> Emily
>


Re: Kafka question on Stream Processing

2019-04-24 Thread Bruno Cadonna
Hi Gagan,

If you want to read a message, you need to poll it from the broker. The
brokers have only a very limited notion of message content: they know that a
message has a key, a value, and some metadata, but they cannot interpret the
contents of those components. The clients are responsible for reading and
processing the messages, and to do that they need to poll the messages from
the brokers.

For the processing you want to do, you could use Kafka Streams. See
https://kafka.apache.org/documentation/streams/ for more information. Have
a look at the branch DSL operation there.
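
A minimal sketch of branch() for a case like this (the topic names, the
"task,dueTimeMillis" value layout, and the routing into a "due" and a "deferred" topic
are all illustrative assumptions; newer Kafka Streams releases express the same idea
with split()):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class TaskRouter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "task-router");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // value is assumed to look like "send an email,1556121000000"; adapt to your real format
        KStream<String, String> tasks = builder.stream("tasks");

        KStream<String, String>[] branches = tasks.branch(
            (key, value) -> dueTime(value) <= System.currentTimeMillis(), // already due
            (key, value) -> true                                          // everything else
        );
        branches[0].to("tasks-due");      // a downstream consumer executes these right away
        branches[1].to("tasks-deferred"); // picked up again later by another pass

        new KafkaStreams(builder.build(), props).start();
    }

    static long dueTime(String value) {
        return Long.parseLong(value.split(",")[1]);
    }
}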

Best regards,
Bruno

On Wed, Apr 24, 2019 at 1:54 AM Gagan Sabharwal  wrote:

> Hi team,
>
> Say we have a client which has pushed a message to a topic. The message has
> a simple structure:
>
> Task - Time of task
> Send an email - 1530
>
> Now say that this message is consumed by a consumer subscribed to this
> topic.
> Since the topic already has storage, what I intend to do is just read the
> message (not poll it) and, if the current time is before 1530, send it to the
> tail of the partition of that topic. Does Kafka provide such an API? Next time
> the consumer reads the message and sees that the current time is after
> 1530, it will poll the message and execute the task.
>
> Regards
> Gagan
>


Re: [kafka question] leader of partitions is none

2017-07-17 Thread Del Barrio, Alberto
Hi,
I had similar problems where new topics were assigned no leader, or in some
cases a leader of -1. In my case it was due to a very busy ZK cluster. Once I
reduced the load on the ZK cluster, I did a rolling restart of the Kafka
brokers and everything went back to normal. I was worried about restarting
them, thinking that if topics had no leader nor ISR the messages in them could
be lost during the restart, but I didn't lose any messages.

Try having a look at the information stored inside ZooKeeper: for example,
check that the new topics appear there and whether a leader and ISR have been
written for each partition.
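
For example (reusing the host, chroot, and topic name from the commands quoted
below; adjust to your environment), something like this shows which assignment,
leader, and ISR the controller has actually written:

/opt/apps_install/kafka_2.9.2-0.8.1/bin/kafka-topics.sh \
  --zookeeper 10.16.10.94:2181/kafka-0.8.1 --describe --topic meteor_hot_article3

and directly inside ZooKeeper:

bin/zookeeper-shell.sh 10.16.10.94:2181
ls /kafka-0.8.1/brokers/ids
get /kafka-0.8.1/brokers/topics/meteor_hot_article3/partitions/0/state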



On 13 July 2017 at 10:57, Qin MingHui(大数据中心/外包) 
wrote:

> Hi, I need some help with Kafka. Kafka is a very nice piece of software, and we
> have been using it for a long time. But now I have encountered a problem.
> When I create a topic on the Kafka cluster, the leader of every
> partition is none, like this:
>
>
>
>
>
> The command is : /opt/apps_install/kafka_2.9.2-0.8.1/bin/kafka-topics.sh
> --zookeeper 10.16.10.94:2181/kafka-0.8.1 --create --topic
> meteor_hot_article3 --replica-assignment 4:7,5:8:2,6:9
>
>
>
> The ZooKeeper cluster is healthy and no Kafka broker is down; everything
> looks normal.
>
>
>
> Then I use this command : 
> /opt/apps_install/kafka_2.9.2-0.8.1/bin/kafka-topics.sh
> --zookeeper 10.16.10.94:2181/kafka-0.8.1 --create --topic topicName
> --partitions 10 --replication-factor 3
>
>
>
> The leader is none too. I am desperate and I don’t know what to do, so
> please help me.
>
>
>
> My Kafka version is 0.8.1 (Scala 2.9.2 build) and the ZooKeeper version is 3.4.5.
>
>
>



-- 


Alberto del Barrio | DevOps Engineer, 360dialog







Re: Kafka question

2015-04-07 Thread Jack
That would be really useful. Thanks for the write-up, Guozhang. I will give
it a shot and let you know.

On Tue, Apr 7, 2015 at 10:06 AM, Guozhang Wang  wrote:

> Jack,
>
> Okay I see your point now. I was originally thinking that in each run, you
> 1) first create the topic, 2) start producing to the topic, 3) start
> consuming from the topic, and then 4) delete the topic, stop producers /
> consumers before complete, but it sounds like you actually only create the
> topic once.
>
> If that is the case and you always use a different group id, then yes with
> the current consumer you have to make sure that at the boundary of each
> run, when you stop the consumers you also have to halt the producers from
> continue producing until the starting of the next run. In the new consumer
> that we are currently developing, it allows you to specify the starting
> offset for your consumption and you could then do some offset check
> pointing outside Kafka on the consumer side and use the check pointed
> offsets when you resume in each run.
>
> You can find the new consumer's API here (check position() / seek()
> specifically) and let me know if you think that will work for your case.
>
>
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 8:39 PM, Jack  wrote:
>
> > How about the first run then? If we use "largest" as "auto.offset.reset"
> > value, what value will these consumers get? I assume it will point to the
> > latest position in the log. Is that true? Just you know, we can't have a
> > warm up run so that the later runs can use the committed offset by that
> > run.
> >
> > To give you a little bit more context, for every run, we create a unique
> > group.id so essentially, we want the offset to point to a safe position
> so
> > that consumer won't miss any messages appended after that point. So is
> > there a way other than setting "auto.offset.reset" to "smallest" which we
> > know it works, but it took forever to get the data (since the log is
> long).
> >
> > Thanks again.
> >
> > -Jack
> >
> > On Mon, Apr 6, 2015 at 5:34 PM, Guozhang Wang 
> wrote:
> >
> > > Did you turn on automatic offset committing? If yes then this issue
> > should
> > > not happen as later runs will just consume data from the last committed
> > > offset.
> > >
> > > Guozhang
> > >
> > > On Mon, Apr 6, 2015 at 5:16 PM, Jack  wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > When I switched to auto.offset.reset to smallest, it will work.
> > However,
> > > it
> > > > will generate a lot of data and it will slow down the verification.
> > > >
> > > > Thanks,
> > > >
> > > > -Jack
> > > >
> > > > On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Jack,
> > > > >
> > > > > Could you just change "auto.offset.reset" to smallest and see if
> this
> > > > issue
> > > > > goes away? It is not related to the producer end.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Apr 6, 2015 at 4:14 PM, Jack  wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > Thanks so much for replying, first of all.
> > > > > >
> > > > > > Here is the config we have:
> > > > > >
> > > > > > group.id -> 'some unique id'
> > > > > > zookeeper.connect -> 'zookeeper host'
> > > > > > auto.commit.enabled -> false
> > > > > > 'auto.offset.reset' -> largest
> > > > > > consumer.timeout.ms -> -1
> > > > > > fetch.message.max.bytes -> 10M
> > > > > >
> > > > > > So it seems like we need to make sure the submitted future
> returns
> > > > before
> > > > > > performing action actions which eventually generate the message
> we
> > > > > expect.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > -Jack
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Jack,
> > > > > > >
> > > > > > > Your theory is correct if your consumer config set
> > > auto.offset.reset
> > > > to
> > > > > > > latest and you do not have any committed offsets before. Could
> > you
> > > > list
> > > > > > > your consumer configs and see if that is the case?
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Mon, Apr 6, 2015 at 3:15 PM, Jack  wrote:
> > > > > > >
> > > > > > > > Hi folks,
> > > > > > > >
> > > > > > > > I have a quick question.
> > > > > > > >
> > > > > > > > We are using 0.8.1 and running into this weird problem. We
> are
> > > > using
> > > > > > > > HighLevelConsumer for this topic. We created 64 partitions
> for
> > > this
> > > > > > > > message.
> > > > > > > >
> > > > > > > > In our service, we first create a Consumer object as usual,
> and
> > > > then
> > > > > we
> > > > > > > > went ahead, calls 'createMessageStreans' with
> > > > Map('topic_name'->64).
> > > > > It
> > > > > > > > returns us a Se[KafkaStream], For each stream object in the
> > > > sequence,
> > > > > > we
> > > > > > > > submit a task like the following to the pool.

Re: Kafka question

2015-04-07 Thread Guozhang Wang
Jack,

Okay, I see your point now. I was originally thinking that in each run you
1) first create the topic, 2) start producing to the topic, 3) start
consuming from the topic, and then 4) delete the topic and stop the producers
and consumers before completing, but it sounds like you actually only create
the topic once.

If that is the case and you always use a different group id, then yes, with
the current consumer you have to make sure that at the boundary of each run,
when you stop the consumers, you also halt the producers until the start of
the next run. The new consumer that we are currently developing allows you to
specify the starting offset for your consumption; you could then checkpoint
offsets outside Kafka on the consumer side and use the checkpointed offsets
when you resume in each run.

You can find the new consumer's API here (check position() / seek()
specifically) and let me know if you think that will work for your case.

http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
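
For illustration, a rough sketch of that checkpointing pattern against the consumer API
as it eventually shipped (the topic, partition, and the loadCheckpoint() / saveCheckpoint()
helpers are placeholders for whatever external store you choose):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CheckpointedRun {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "verification-run-42"); // unique per run, as in your setup
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("my-topic", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, loadCheckpoint(tp)); // resume from the offset stored outside Kafka
            for (int i = 0; i < 10; i++) {         // one bounded verification "run"
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                saveCheckpoint(tp, consumer.position(tp)); // next offset to read on resume
            }
        }
    }

    // placeholders: persist these wherever suits you (a file, a database, ...)
    static long loadCheckpoint(TopicPartition tp) { return 0L; }
    static void saveCheckpoint(TopicPartition tp, long offset) { }
}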

Guozhang

On Mon, Apr 6, 2015 at 8:39 PM, Jack  wrote:

> How about the first run then? If we use "largest" as "auto.offset.reset"
> value, what value will these consumers get? I assume it will point to the
> latest position in the log. Is that true? Just you know, we can't have a
> warm up run so that the later runs can use the committed offset by that
> run.
>
> To give you a little bit more context, for every run, we create a unique
> group.id so essentially, we want the offset to point to a safe position so
> that consumer won't miss any messages appended after that point. So is
> there a way other than setting "auto.offset.reset" to "smallest" which we
> know it works, but it took forever to get the data (since the log is long).
>
> Thanks again.
>
> -Jack
>
> On Mon, Apr 6, 2015 at 5:34 PM, Guozhang Wang  wrote:
>
> > Did you turn on automatic offset committing? If yes then this issue
> should
> > not happen as later runs will just consume data from the last committed
> > offset.
> >
> > Guozhang
> >
> > On Mon, Apr 6, 2015 at 5:16 PM, Jack  wrote:
> >
> > > Hi Guozhang,
> > >
> > > When I switched to auto.offset.reset to smallest, it will work.
> However,
> > it
> > > will generate a lot of data and it will slow down the verification.
> > >
> > > Thanks,
> > >
> > > -Jack
> > >
> > > On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Jack,
> > > >
> > > > Could you just change "auto.offset.reset" to smallest and see if this
> > > issue
> > > > goes away? It is not related to the producer end.
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Apr 6, 2015 at 4:14 PM, Jack  wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > Thanks so much for replying, first of all.
> > > > >
> > > > > Here is the config we have:
> > > > >
> > > > > group.id -> 'some unique id'
> > > > > zookeeper.connect -> 'zookeeper host'
> > > > > auto.commit.enabled -> false
> > > > > 'auto.offset.reset' -> largest
> > > > > consumer.timeout.ms -> -1
> > > > > fetch.message.max.bytes -> 10M
> > > > >
> > > > > So it seems like we need to make sure the submitted future returns
> > > before
> > > > > performing action actions which eventually generate the message we
> > > > expect.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > -Jack
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang 
> > > > wrote:
> > > > >
> > > > > > Jack,
> > > > > >
> > > > > > Your theory is correct if your consumer config set
> > auto.offset.reset
> > > to
> > > > > > latest and you do not have any committed offsets before. Could
> you
> > > list
> > > > > > your consumer configs and see if that is the case?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Mon, Apr 6, 2015 at 3:15 PM, Jack  wrote:
> > > > > >
> > > > > > > Hi folks,
> > > > > > >
> > > > > > > I have a quick question.
> > > > > > >
> > > > > > > We are using 0.8.1 and running into this weird problem. We are
> > > using
> > > > > > > HighLevelConsumer for this topic. We created 64 partitions for
> > this
> > > > > > > message.
> > > > > > >
> > > > > > > In our service, we first create a Consumer object as usual, and
> > > then
> > > > we
> > > > > > > went ahead, calls 'createMessageStreans' with
> > > Map('topic_name'->64).
> > > > It
> > > > > > > returns us a Se[KafkaStream], For each stream object in the
> > > sequence,
> > > > > we
> > > > > > > submit a task like the following to the pool.
> > > > > > >
> > > > > > > threadpool.submit(new Runnable {
> > > > > > >override def run() = {
> > > > > > >   stream.iterator().foreach { msg => ...}
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > > The problem we ran into is that after all the above
> established,
> > > any
> > > > > > > message showing up in kafka, we should be able to get it from
> > > > consumer
> > > > > side. But in reality, for some reason, occasionally, we don't see these messages.

Re: Kafka question

2015-04-06 Thread Jack
How about the first run then? If we use "largest" as the "auto.offset.reset"
value, what offset will these consumers get? I assume it will point to the
latest position in the log. Is that true? Just so you know, we can't have a
warm-up run so that later runs can use the offset committed by that run.

To give you a little bit more context: for every run we create a unique
group.id, so essentially we want the offset to point to a safe position, so
that the consumer won't miss any messages appended after that point. Is there
a way other than setting "auto.offset.reset" to "smallest", which we know
works but takes forever to get through the data (since the log is long)?

Thanks again.

-Jack

On Mon, Apr 6, 2015 at 5:34 PM, Guozhang Wang  wrote:

> Did you turn on automatic offset committing? If yes then this issue should
> not happen as later runs will just consume data from the last committed
> offset.
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 5:16 PM, Jack  wrote:
>
> > Hi Guozhang,
> >
> > When I switched to auto.offset.reset to smallest, it will work. However,
> it
> > will generate a lot of data and it will slow down the verification.
> >
> > Thanks,
> >
> > -Jack
> >
> > On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang 
> wrote:
> >
> > > Jack,
> > >
> > > Could you just change "auto.offset.reset" to smallest and see if this
> > issue
> > > goes away? It is not related to the producer end.
> > >
> > > Guozhang
> > >
> > > On Mon, Apr 6, 2015 at 4:14 PM, Jack  wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Thanks so much for replying, first of all.
> > > >
> > > > Here is the config we have:
> > > >
> > > > group.id -> 'some unique id'
> > > > zookeeper.connect -> 'zookeeper host'
> > > > auto.commit.enabled -> false
> > > > 'auto.offset.reset' -> largest
> > > > consumer.timeout.ms -> -1
> > > > fetch.message.max.bytes -> 10M
> > > >
> > > > So it seems like we need to make sure the submitted future returns
> > before
> > > > performing action actions which eventually generate the message we
> > > expect.
> > > >
> > > > Cheers,
> > > >
> > > > -Jack
> > > >
> > > >
> > > >
> > > > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Jack,
> > > > >
> > > > > Your theory is correct if your consumer config set
> auto.offset.reset
> > to
> > > > > latest and you do not have any committed offsets before. Could you
> > list
> > > > > your consumer configs and see if that is the case?
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Apr 6, 2015 at 3:15 PM, Jack  wrote:
> > > > >
> > > > > > Hi folks,
> > > > > >
> > > > > > I have a quick question.
> > > > > >
> > > > > > We are using 0.8.1 and running into this weird problem. We are
> > using
> > > > > > HighLevelConsumer for this topic. We created 64 partitions for
> this
> > > > > > message.
> > > > > >
> > > > > > In our service, we first create a Consumer object as usual, and
> > then
> > > we
> > > > > > went ahead, calls 'createMessageStreans' with
> > Map('topic_name'->64).
> > > It
> > > > > > returns us a Se[KafkaStream], For each stream object in the
> > sequence,
> > > > we
> > > > > > submit a task like the following to the pool.
> > > > > >
> > > > > > threadpool.submit(new Runnable {
> > > > > >override def run() = {
> > > > > >   stream.iterator().foreach { msg => ...}
> > > > > >   }
> > > > > > }
> > > > > >
> > > > > > The problem we ran into is that after all the above established,
> > any
> > > > > > message showing up in kafka, we should be able to get it from
> > > consumer
> > > > > > side. But in reality, for some reason, occasionally, we don't see
> > > these
> > > > > > message (we do see these message in the log though).
> > > > > >
> > > > > >  Some team members believe that the stream might get a later
> > offset,
> > > > thus
> > > > > > not being able to see the earlier messages.
> > > > > >
> > > > > > I really doubt that statement and want to see if anyone could
> shed
> > > any
> > > > > > light upon this?
> > > > > >
> > > > > > One possible theory from me is that the offset won't be given
> until
> > > > > > stream.iterator().next is called, but since the task submission
> is
> > > > > > asynchronous (we don't wait for each submission and then produce
> > > > message
> > > > > to
> > > > > > kafka), that could get us a later offset, which might not
> contains
> > > the
> > > > > > message we want). One possible solution to that is perform any
> > action
> > > > > which
> > > > > > produce messages to kafka, after all these submitted tasks
> returns.
> > > > > >
> > > > > > Any thoughts?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > -Jack
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Kafka question

2015-04-06 Thread Guozhang Wang
Did you turn on automatic offset committing? If yes then this issue should
not happen as later runs will just consume data from the last committed
offset.

Guozhang

On Mon, Apr 6, 2015 at 5:16 PM, Jack  wrote:

> Hi Guozhang,
>
> When I switched to auto.offset.reset to smallest, it will work. However, it
> will generate a lot of data and it will slow down the verification.
>
> Thanks,
>
> -Jack
>
> On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang  wrote:
>
> > Jack,
> >
> > Could you just change "auto.offset.reset" to smallest and see if this
> issue
> > goes away? It is not related to the producer end.
> >
> > Guozhang
> >
> > On Mon, Apr 6, 2015 at 4:14 PM, Jack  wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks so much for replying, first of all.
> > >
> > > Here is the config we have:
> > >
> > > group.id -> 'some unique id'
> > > zookeeper.connect -> 'zookeeper host'
> > > auto.commit.enabled -> false
> > > 'auto.offset.reset' -> largest
> > > consumer.timeout.ms -> -1
> > > fetch.message.max.bytes -> 10M
> > >
> > > So it seems like we need to make sure the submitted future returns
> before
> > > performing action actions which eventually generate the message we
> > expect.
> > >
> > > Cheers,
> > >
> > > -Jack
> > >
> > >
> > >
> > > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Jack,
> > > >
> > > > Your theory is correct if your consumer config set auto.offset.reset
> to
> > > > latest and you do not have any committed offsets before. Could you
> list
> > > > your consumer configs and see if that is the case?
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Apr 6, 2015 at 3:15 PM, Jack  wrote:
> > > >
> > > > > Hi folks,
> > > > >
> > > > > I have a quick question.
> > > > >
> > > > > We are using 0.8.1 and running into this weird problem. We are
> using
> > > > > HighLevelConsumer for this topic. We created 64 partitions for this
> > > > > message.
> > > > >
> > > > > In our service, we first create a Consumer object as usual, and
> then
> > we
> > > > > went ahead, calls 'createMessageStreans' with
> Map('topic_name'->64).
> > It
> > > > > returns us a Se[KafkaStream], For each stream object in the
> sequence,
> > > we
> > > > > submit a task like the following to the pool.
> > > > >
> > > > > threadpool.submit(new Runnable {
> > > > >override def run() = {
> > > > >   stream.iterator().foreach { msg => ...}
> > > > >   }
> > > > > }
> > > > >
> > > > > The problem we ran into is that after all the above established,
> any
> > > > > message showing up in kafka, we should be able to get it from
> > consumer
> > > > > side. But in reality, for some reason, occasionally, we don't see
> > these
> > > > > message (we do see these message in the log though).
> > > > >
> > > > >  Some team members believe that the stream might get a later
> offset,
> > > thus
> > > > > not being able to see the earlier messages.
> > > > >
> > > > > I really doubt that statement and want to see if anyone could shed
> > any
> > > > > light upon this?
> > > > >
> > > > > One possible theory from me is that the offset won't be given until
> > > > > stream.iterator().next is called, but since the task submission is
> > > > > asynchronous (we don't wait for each submission and then produce
> > > message
> > > > to
> > > > > kafka), that could get us a later offset, which might not contains
> > the
> > > > > message we want). One possible solution to that is perform any
> action
> > > > which
> > > > > produce messages to kafka, after all these submitted tasks returns.
> > > > >
> > > > > Any thoughts?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > -Jack
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: Kafka question

2015-04-06 Thread Jack
Hi Guozhang,

When I switch auto.offset.reset to smallest, it works. However, it generates
a lot of data and slows down the verification.

Thanks,

-Jack

On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang  wrote:

> Jack,
>
> Could you just change "auto.offset.reset" to smallest and see if this issue
> goes away? It is not related to the producer end.
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 4:14 PM, Jack  wrote:
>
> > Hi Guozhang,
> >
> > Thanks so much for replying, first of all.
> >
> > Here is the config we have:
> >
> > group.id -> 'some unique id'
> > zookeeper.connect -> 'zookeeper host'
> > auto.commit.enabled -> false
> > 'auto.offset.reset' -> largest
> > consumer.timeout.ms -> -1
> > fetch.message.max.bytes -> 10M
> >
> > So it seems like we need to make sure the submitted future returns before
> > performing action actions which eventually generate the message we
> expect.
> >
> > Cheers,
> >
> > -Jack
> >
> >
> >
> > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang 
> wrote:
> >
> > > Jack,
> > >
> > > Your theory is correct if your consumer config set auto.offset.reset to
> > > latest and you do not have any committed offsets before. Could you list
> > > your consumer configs and see if that is the case?
> > >
> > > Guozhang
> > >
> > > On Mon, Apr 6, 2015 at 3:15 PM, Jack  wrote:
> > >
> > > > Hi folks,
> > > >
> > > > I have a quick question.
> > > >
> > > > We are using 0.8.1 and running into this weird problem. We are using
> > > > HighLevelConsumer for this topic. We created 64 partitions for this
> > > > message.
> > > >
> > > > In our service, we first create a Consumer object as usual, and then
> we
> > > > went ahead, calls 'createMessageStreans' with Map('topic_name'->64).
> It
> > > > returns us a Se[KafkaStream], For each stream object in the sequence,
> > we
> > > > submit a task like the following to the pool.
> > > >
> > > > threadpool.submit(new Runnable {
> > > >override def run() = {
> > > >   stream.iterator().foreach { msg => ...}
> > > >   }
> > > > }
> > > >
> > > > The problem we ran into is that after all the above established, any
> > > > message showing up in kafka, we should be able to get it from
> consumer
> > > > side. But in reality, for some reason, occasionally, we don't see
> these
> > > > message (we do see these message in the log though).
> > > >
> > > >  Some team members believe that the stream might get a later offset,
> > thus
> > > > not being able to see the earlier messages.
> > > >
> > > > I really doubt that statement and want to see if anyone could shed
> any
> > > > light upon this?
> > > >
> > > > One possible theory from me is that the offset won't be given until
> > > > stream.iterator().next is called, but since the task submission is
> > > > asynchronous (we don't wait for each submission and then produce
> > message
> > > to
> > > > kafka), that could get us a later offset, which might not contains
> the
> > > > message we want). One possible solution to that is perform any action
> > > which
> > > > produce messages to kafka, after all these submitted tasks returns.
> > > >
> > > > Any thoughts?
> > > >
> > > > Thanks,
> > > >
> > > > -Jack
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Kafka question

2015-04-06 Thread Guozhang Wang
Jack,

Could you just change "auto.offset.reset" to smallest and see if this issue
goes away? It is not related to the producer end.

Guozhang

On Mon, Apr 6, 2015 at 4:14 PM, Jack  wrote:

> Hi Guozhang,
>
> Thanks so much for replying, first of all.
>
> Here is the config we have:
>
> group.id -> 'some unique id'
> zookeeper.connect -> 'zookeeper host'
> auto.commit.enabled -> false
> 'auto.offset.reset' -> largest
> consumer.timeout.ms -> -1
> fetch.message.max.bytes -> 10M
>
> So it seems like we need to make sure the submitted future returns before
> performing action actions which eventually generate the message we expect.
>
> Cheers,
>
> -Jack
>
>
>
> On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang  wrote:
>
> > Jack,
> >
> > Your theory is correct if your consumer config set auto.offset.reset to
> > latest and you do not have any committed offsets before. Could you list
> > your consumer configs and see if that is the case?
> >
> > Guozhang
> >
> > On Mon, Apr 6, 2015 at 3:15 PM, Jack  wrote:
> >
> > > Hi folks,
> > >
> > > I have a quick question.
> > >
> > > We are using 0.8.1 and running into this weird problem. We are using
> > > HighLevelConsumer for this topic. We created 64 partitions for this
> > > message.
> > >
> > > In our service, we first create a Consumer object as usual, and then we
> > > went ahead, calls 'createMessageStreans' with Map('topic_name'->64). It
> > > returns us a Se[KafkaStream], For each stream object in the sequence,
> we
> > > submit a task like the following to the pool.
> > >
> > > threadpool.submit(new Runnable {
> > >override def run() = {
> > >   stream.iterator().foreach { msg => ...}
> > >   }
> > > }
> > >
> > > The problem we ran into is that after all the above established, any
> > > message showing up in kafka, we should be able to get it from consumer
> > > side. But in reality, for some reason, occasionally, we don't see these
> > > message (we do see these message in the log though).
> > >
> > >  Some team members believe that the stream might get a later offset,
> thus
> > > not being able to see the earlier messages.
> > >
> > > I really doubt that statement and want to see if anyone could shed any
> > > light upon this?
> > >
> > > One possible theory from me is that the offset won't be given until
> > > stream.iterator().next is called, but since the task submission is
> > > asynchronous (we don't wait for each submission and then produce
> message
> > to
> > > kafka), that could get us a later offset, which might not contains the
> > > message we want). One possible solution to that is perform any action
> > which
> > > produce messages to kafka, after all these submitted tasks returns.
> > >
> > > Any thoughts?
> > >
> > > Thanks,
> > >
> > > -Jack
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: Kafka question

2015-04-06 Thread Jack
Hi Guozhang,

Thanks so much for replying, first of all.

Here is the config we have:

group.id -> 'some unique id'
zookeeper.connect -> 'zookeeper host'
auto.commit.enabled -> false
'auto.offset.reset' -> largest
consumer.timeout.ms -> -1
fetch.message.max.bytes -> 10M

So it seems like we need to make sure the submitted future returns before
performing the actions which eventually generate the message we expect.

Cheers,

-Jack



On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang  wrote:

> Jack,
>
> Your theory is correct if your consumer config set auto.offset.reset to
> latest and you do not have any committed offsets before. Could you list
> your consumer configs and see if that is the case?
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 3:15 PM, Jack  wrote:
>
> > Hi folks,
> >
> > I have a quick question.
> >
> > We are using 0.8.1 and running into this weird problem. We are using
> > HighLevelConsumer for this topic. We created 64 partitions for this
> > message.
> >
> > In our service, we first create a Consumer object as usual, and then we
> > went ahead, calls 'createMessageStreans' with Map('topic_name'->64). It
> > returns us a Se[KafkaStream], For each stream object in the sequence, we
> > submit a task like the following to the pool.
> >
> > threadpool.submit(new Runnable {
> >override def run() = {
> >   stream.iterator().foreach { msg => ...}
> >   }
> > }
> >
> > The problem we ran into is that after all the above established, any
> > message showing up in kafka, we should be able to get it from consumer
> > side. But in reality, for some reason, occasionally, we don't see these
> > message (we do see these message in the log though).
> >
> >  Some team members believe that the stream might get a later offset, thus
> > not being able to see the earlier messages.
> >
> > I really doubt that statement and want to see if anyone could shed any
> > light upon this?
> >
> > One possible theory from me is that the offset won't be given until
> > stream.iterator().next is called, but since the task submission is
> > asynchronous (we don't wait for each submission and then produce message
> to
> > kafka), that could get us a later offset, which might not contains the
> > message we want). One possible solution to that is perform any action
> which
> > produce messages to kafka, after all these submitted tasks returns.
> >
> > Any thoughts?
> >
> > Thanks,
> >
> > -Jack
> >
>
>
>
> --
> -- Guozhang
>


Re: Kafka question

2015-04-06 Thread Guozhang Wang
Jack,

Your theory is correct if your consumer config sets auto.offset.reset to
latest and you do not have any previously committed offsets. Could you list
your consumer configs and see if that is the case?

Guozhang

On Mon, Apr 6, 2015 at 3:15 PM, Jack  wrote:

> Hi folks,
>
> I have a quick question.
>
> We are using 0.8.1 and running into this weird problem. We are using
> HighLevelConsumer for this topic. We created 64 partitions for this
> topic.
>
> In our service, we first create a Consumer object as usual, and then we
> go ahead and call 'createMessageStreams' with Map('topic_name' -> 64). It
> returns us a Seq[KafkaStream]. For each stream object in the sequence, we
> submit a task like the following to the pool.
>
> threadpool.submit(new Runnable {
>   override def run() = {
>     stream.iterator().foreach { msg => ... }
>   }
> })
>
> The problem we ran into is that once all of the above is established, we
> should be able to see on the consumer side any message that shows up in
> Kafka. But in reality, for some reason, we occasionally don't see these
> messages (we do see them in the broker's log, though).
>
> Some team members believe that the stream might get a later offset and thus
> not be able to see the earlier messages.
>
> I really doubt that statement and want to see if anyone could shed any
> light on this.
>
> One possible theory of mine is that the offset won't be assigned until
> stream.iterator().next is called, but since the task submission is
> asynchronous (we don't wait for each submission before producing messages to
> Kafka), that could get us a later offset, which might not contain the
> message we want. One possible solution would be to perform any action that
> produces messages to Kafka only after all these submitted tasks have returned.
>
> Any thoughts?
>
> Thanks,
>
> -Jack
>



-- 
-- Guozhang