Re: Understanding out of order message processing w/ Streaming

2016-10-13 Thread Michael Noll
> But if they arrive out of order, I have to detect / process that myself in
> the processor logic.

Yes -- if your processing logic depends on the specific ordering of
messages (which is the case for you), then you must manually implement this
ordering-specific logic at the moment.

Other use cases may not need to do that and "just work" even with
out-of-order data.  If, for example, you are counting objects or are
computing the sum of numbers, then you do not need to do anything special.
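
As a minimal sketch of this distinction (plain Python rather than Kafka Streams code; the `Bid` record and both function names are made up for illustration): a sum gives the same result for any arrival order, while a "latest price" needs an explicit timestamp comparison, with the timestamp kept next to the value.

```python
from typing import Dict, Iterable, NamedTuple, Tuple


class Bid(NamedTuple):
    """Hypothetical bid record; timestamp_ms is the event time of the bid."""
    product: str
    price: float
    timestamp_ms: int


def total_bid_volume(bids: Iterable[Bid]) -> float:
    # Addition is commutative, so arrival order does not change the result.
    return sum(b.price for b in bids)


def latest_prices(bids: Iterable[Bid]) -> Dict[str, Tuple[float, int]]:
    # Keep (price, timestamp) per product and ignore records that are older
    # than the one already stored. Because the timestamp is part of the
    # stored value, the comparison does not depend on in-memory history.
    state: Dict[str, Tuple[float, int]] = {}
    for bid in bids:
        current = state.get(bid.product)
        if current is None or bid.timestamp_ms > current[1]:
            state[bid.product] = (bid.price, bid.timestamp_ms)
    return state


in_order = [Bid("A", 3.0, 1000), Bid("A", 5.0, 1100)]
out_of_order = list(reversed(in_order))

print(total_bid_volume(in_order) == total_bid_volume(out_of_order))
print(latest_prices(out_of_order)["A"])
```

The second function is the order-specific logic that has to be written by hand; the first needs none.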





On Wed, Oct 12, 2016 at 10:22 PM, Ali Akhtar  wrote:

> Thanks Matthias.
>
> So, if I'm understanding this right, Kafka will not discard messages
> which arrive out of order.
>
> What it will do is show messages in the order in which they arrive.
>
> But if they arrive out of order, I have to detect / process that myself in
> the processor logic.
>
> Is that correct?
>
> Thanks.
>
> On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax 
> wrote:
>
> > Last question first: A KTable is basically an infinite window over the
> > whole stream providing a single result (that gets updated when new
> > data arrives). If you use windows, you cut the overall stream into
> > finite subsets and get a result per window. Thus, I guess you do not
> > need windows (if I understood your use case correctly).
> >
> > However, in the current state of the Kafka Streams DSL, you will not be
> > able to use KTable (directly -- see suggestion to fix this below) because
> > it does (currently) not allow you to access the timestamp of the current
> > record (thus, you cannot know if a record is late or not). You will
> > need to use the Processor API, which allows you to access the current
> > record's timestamp via the Context object given in init().
> >
> > Your reasoning about partitions and Streams instances is correct.
> > However, the following two statements are not:
> >
> > > - Because I'm using a KTable, the timestamp of the messages is
> > > extracted, and I'm not shown the older bid because I've already
> > > processed the later bid. The older bid is ignored.
> >
> > and
> >
> > > - Because of this, the replica already knows which timestamps it
> > > has processed, and is able to ignore the older messages.
> >
> > Late arriving records are not dropped but processed regularly. Thus,
> > your KTable aggregate function will be called for the late arriving
> > record, too (but as described above, you currently have no way to know
> > it is a late record).
> >
> >
> > Last but not least, your last statement is a valid concern:
> >
> > > Also, what will happen if bid 2 arrived and got processed, and then
> > > the particular replica crashed, and was restarted. The restarted
> > > replica won't have any memory of which timestamps it has previously
> > > processed.
> > >
> > > So if bid 2 got processed, replica crashed and restarted, and then
> > > bid 1 arrived, what would happen in that case?
> >
> > In order to make this work, you would need to store the timestamp in
> > your store next to the actual data. Thus, you can compare the timestamp
> > of the latest result (safely stored in operator state) with the
> > timestamp of the current record.
> >
> > Does this make sense?
> >
> > To fix your issue, you could add a .transformValues() before your KTable,
> > which allows you to access the timestamp of a record. If you add this
> > timestamp to your value and pass it to KTable afterwards, you can
> > access it and it also gets stored reliably.
> >
> > (key, value) => transformValues => (key, {value, timestamp}) => aggregate
> >
> > Hope this helps.
> >
> > -Matthias
> >
> >
> > On 10/11/16 9:12 PM, Ali Akhtar wrote:
> > > P.S, does my scenario require using windows, or can it be achieved
> > > using just KTable?
> > >
> > > On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar 
> > > wrote:
> > >
> > >> Heya,
> > >>
> > >> Say I'm building a live auction site, with different products.
> > >> Different users will bid on different products. And each time
> > >> they do, I want to update the product's price, so it should
> > >> always have the latest price in place.
> > >>
> > >> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
> > >> the same product 100 ms later.
> > >>
> > >> The second bid arrives first and the price is updated to $5. Then
> > >> the first bid arrives. I want the price to not be updated in this
> > >> case, as this bid is older than the one I've already processed.
> > >>
> > >> Here's my understanding of how I can achieve this with Kafka
> > >> Streaming - is my understanding correct?
> > >>
> > >> - I have a topic for receiving bids. The topic has N partitions,
> > >> and I have N replicas of my application which hooks up w/ Kafka
> > >> Streaming, up and running.
> > >>
> > >> - I assume each replica of my app will listen to a different
> > >> partition of the topic.
> > >>
> > >> - A user makes a bid on product A.
> > >>
> > >> - This is pushed to the topic with the key bid_a
> > >>
> > >> - Another user makes a bid

Re: Consumer offsets reset for _all_ topics after increasing partitions for one topic

2016-10-13 Thread Juho Autio
Created a bug: https://issues.apache.org/jira/browse/KAFKA-4299

On Wed, Oct 5, 2016 at 12:34 PM, Juho Autio  wrote:

> Does anyone know about this? Altering topic partitions seems to reset
> consumer offsets.
>
> On Tue, Sep 27, 2016 at 1:28 PM, Juho Autio  wrote:
>
>> I increased partitions for one existing topic (2->10), but was surprised
>> to see that it entirely reset the committed offsets of my consumer group.
>>
>> All topics & partitions were reset to the earliest offset available, and
>> the consumer read everything again.
>>
>> Documentation doesn't mention anything like this. Is this how it's
>> supposed to work, or a bug?
>>
>> I would've expected the consumer offsets to not decrease at all,
>> especially for the topics that I didn't even touch.
>>
>> For the altered topic I would've expected that consuming the previously
>> existing partitions 0 and 1 would've continued from the position where they
>> were, and naturally starting to read the new added partitions from 0.
>>
>> I added partitions according to the "Modifying topics" section of Kafka
>> 0.10.0 Documentation:
>>
>> "To add partitions you can do
>>
>>  > bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic altered_topic --partitions 10
>> "
>>
>> Previously this topic had 2 partitions.
>>
>> For the consumer I'm using:
>> kafka.javaapi.consumer.ConsumerConnector.createMessageStreamsByFilter()
>>
>> And version is:
>>
>> <dependency>
>>   <groupId>org.apache.kafka</groupId>
>>   <artifactId>kafka_2.11</artifactId>
>>   <version>0.10.0.1</version>
>> </dependency>
>>
>> Kafka cluster itself is kafka_2.11-0.10.0.1.
>>
>
>


-- 
*Juho Autio*
Senior Analytics Developer

Data Ops, Games
Rovio Entertainment Ltd
Mobile: + 358 (0)45 313 0122
juho.au...@rovio.com
www.rovio.com



Re: Mirror Maker - Message Format Issue?

2016-10-13 Thread Ismael Juma
Hi Craig,

I think you may be running into:

https://issues.apache.org/jira/browse/KAFKA-4073

Ismael

On Thu, Oct 13, 2016 at 5:51 AM, Craig Swift <
craig.sw...@returnpath.com.invalid> wrote:

> Hello,
>
> Just to close this issue out. The 0.8 producer going to the 0.10 cluster was
> the root issue. The mirror maker by default was unable to produce the
> message to the destination cluster. The workaround was to include a
> MirrorMakerMessageHandler that did nothing but repackage the message again.
> In the future it might be nice if the mirror handled this auto-magically,
> but at least the ability to alter the behavior provided an easy fix. Hope
> this helps someone else, thanks.
>
> Craig
>
> Hello,
> >
> > I think we're misunderstanding the docs on some level and I need a little
> > clarification. We have the following setup:
> >
> > 1) 0.8.2 producer -> writing to Kafka 0.10.0.1 cluster w/ version 0.10
> > message format (source cluster).
> > 2) 0.10.0.1 mirror using the 'new consumer' reading from the source
> > cluster and writing to Kafka 0.10.0.1 cluster w/ version 0.8.2 message
> > format (destination cluster). We need some of the features like SSL,
> > hence using the new consumer.
> > 3) Lots of old 0.8.2 consumers reading from the destination cluster that
> > still need to be upgraded.
> >
> > We're seeing errors from the mirror maker when trying to produce to the
> > destination cluster like the following:
> >
> > java.lang.IllegalArgumentException: Invalid timestamp -1
> > at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >
> > Is the root problem the 0.8.2 producer sending data to the source cluster
> > or the new 0.10 mirror writing data to the destination cluster in 0.8.2
> > format? From the docs we were under the impression that the data would be
> > stored in the source cluster in 0.10 format regardless of the producer and
> > the mirror could produce to the destination cluster regardless of its
> > message format setting.
> >
> > Is this current setup non-functional or is there a way to make this work?
> > For example, if the mirror producing is the issue could we implement a
> > custom MirrorMakerMessageHandler? Any advice and clarification would be
> > helpful, thanks.
> >
> > Craig
> >
>


Re: Re: Re: I found kafka lost message

2016-10-13 Thread yangyuqi
Another question: if the number of consumers exceeds the number of brokers, will data be lost?
And if some brokers are rebooted, will data be lost?
Thanks.




- Original Message -
From: Guozhang Wang 
To: yangy...@sina.com
Cc: users 
Subject: Re: Re: I found kafka lost message
Date: 2016-10-12 07:05

One common cause of lost messages is consumer auto-committing (the related 
configs are "enable.auto.commit" and "auto.commit.interval.ms"): from the Kafka 
consumer's point of view, once the messages are returned from the "poll" call 
they are considered "consumed", and if an offset commit is triggered it will set 
the offset to the last message's offset. So if you have some exceptions / errors 
in the middle of processing some of your consumed records, upon restarting they 
will not be re-fetched, hence possibly causing "data loss" on your end.

Guozhang
On Mon, Oct 10, 2016 at 3:33 AM,   wrote:
Hi Guozhang,

First, thank you for answering my question and giving me some suggestions. 
But I'm sure I have read the introduction to Kafka.

In my producer, my code is (C code):

    res = rd_kafka_conf_set(kafka_conf, "request.required.acks", "-1", NULL, 0);
    res = rd_kafka_topic_conf_set(topic_conf, "produce.offset.report", "true",
                                  errstr, sizeof(errstr));

In my consumer, my code is (kafka-python):

    self.__consumer = KafkaConsumer(
        bootstrap_servers=self.__kafka_config["hosts"],
        group_id=self.__kafka_config["group"],
        auto_offset_reset="earliest",
    )
    self.__consumer.subscribe([self.__kafka_config["data_topic"]])
    for message in self.__consumer:

I think this code is fairly standard. What would you suggest changing?

Finally, I should explain: message loss is not frequent. Sometimes over a 
couple of days we find only one or two lost messages, but on some days we find 
more than 20 lost messages. We have over 1,200,000 messages per day.

So, do you have any suggestions? By the way, can you speak Chinese?

Thank you very much & best wishes,
Jerry




- Original Message -
From: Guozhang Wang 
To: "users@kafka.apache.org" , yangy...@sina.com
Subject: Re: I found kafka lost message
Date: 2016-10-10 01:25

Jerry,
Message loss scenarios are usually due to a misconfigured producer or consumer. 
Have you read the client configs web docs?
http://kafka.apache.org/documentation#producerconfigs

http://kafka.apache.org/documentation#newconsumerconfigs


If not, I'd suggest reading those first to see if you can tune some of 
these configs to get better delivery guarantees.
Guozhang

On Fri, Oct 7, 2016 at 9:48 PM,   wrote:
Hello everybody,

I built a Kafka cluster (including 5 domains) using kafka_2.11-0.10.0.0 and 
kafka-python-1.3.1. I created a topic with 100 partitions and 2 replicas, then 
used 20 consumers to receive messages.

But I found that Kafka sometimes loses messages! I found that some partitions' 
offsets were missing at the consumer. Afterwards, I added a global index to 
every message from the producer to confirm this problem, and I also found gaps 
in the global index!

Why does Kafka lose messages?  What can I do to fix the problem?

Thanks!
Jerry










Call to poll () hangs when invalid port number is given in bootstrap.servers

2016-10-13 Thread Anirudh P
Hi,

We have run into a problem where the call to poll() hangs when an
incorrect port number is specified in the bootstrap.servers property. The
Kafka version is 0.9.0.0. We have tried using both assign and subscribe.

The poll problem seems to have been discussed in quite a few threads but we
were wondering if it was something that we are doing wrong or if it is a
known problem for which there is a fix.

Thank you,
Anirudh
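
One possible workaround, independent of the client library, is a pre-flight check of the bootstrap.servers value before the consumer is created. This is only a sketch under assumptions: `parse_bootstrap_servers` and `any_reachable` are hypothetical helper names, the check only proves that a TCP port accepts connections (not that a Kafka broker is listening), and IPv6 literals are not handled.

```python
import socket
from typing import List, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split a 'host1:port1,host2:port2' string into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        host, _, port = entry.strip().rpartition(":")
        servers.append((host, int(port)))
    return servers


def any_reachable(value: str, timeout_s: float = 2.0) -> bool:
    """Return True if a TCP connection to at least one listed server succeeds."""
    for host, port in parse_bootstrap_servers(value):
        try:
            with socket.create_connection((host, port), timeout=timeout_s):
                return True
        except (OSError, OverflowError):
            # Refused, timed out, unresolvable, or port outside 0-65535.
            continue
    return False
```

An application could call `any_reachable(...)` with its configured server list and fail fast with a clear error instead of entering a poll() call that never returns.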


Re: Force producer topic metadata refresh.

2016-10-13 Thread Alexandru Ionita
Hello Ismael,

yes, I went through that PR and this is what I was actually looking for.
Thanks!

2016-10-12 14:37 GMT+02:00 Ismael Juma :

> Hi Alexandru,
>
> I think your issue will be fixed by KAFKA-4254. There's a PR available and
> should be merged shortly. Can you please verify?
>
> Thanks,
> Ismael
>
> On Wed, Oct 12, 2016 at 11:00 AM, Alexandru Ionita <
> alexandru.ion...@gmail.com> wrote:
>
> > OK. Then my question is: why isn't the producer trying to recover from
> > this error by updating its topic metadata right away instead of waiting
> > for the "metadata.max.age.ms" to expire?
> >
> > 2016-10-12 11:43 GMT+02:00 Manikumar :
> >
> > > We have a similar setting, "metadata.max.age.ms", in the new producer API.
> > > Its default value is 300 sec.
> > >
> > > On Wed, Oct 12, 2016 at 3:04 PM, Alexandru Ionita <
> > > alexandru.ion...@gmail.com> wrote:
> > >
> > > > Hello kafka users!!
> > > >
> > > > I'm trying to implement/use a mechanism to make a Kafka producer
> > > > imperatively update its topic metadata for a particular topic.
> > > >
> > > > Here is the use case:
> > > >
> > > > We are adding partitions on topics programmatically because we want to
> > > > very strictly control how messages are published to particular
> > > > partitions.
> > > >
> > > > We are using AdminUtils.addPartitions to achieve this.
> > > > We then store the ID of the newly added partition in ZooKeeper so that
> > > > we persist a mapping to a partition ID for our particular domain key.
> > > >
> > > > The problem we are facing right now is that the Kafka producer won't
> > > > refresh its topic metadata until after a while, preventing the producer
> > > > from posting to those partitions by throwing an error:
> > > >
> > > > Caused by: java.lang.IllegalArgumentException: Invalid partition given with record: 56 is not in the range [0...55].
> > > > at org.apache.kafka.clients.producer.KafkaProducer.partition(KafkaProducer.java:717) ~[kafka-clients-0.10.0.1.jar:na]
> > > > at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:459) ~[kafka-clients-0.10.0.1.jar:na]
> > > > at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430) ~[kafka-clients-0.10.0.1.jar:na]
> > > > at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353) ~[kafka-clients-0.10.0.1.jar:na]
> > > >
> > > > As I read somewhere (https://github.com/SOHU-Co/kafka-node/issues/175),
> > > > the producer should try to recover from such an error by pulling the
> > > > latest version of the topic metadata.
> > > >
> > > > This isn't happening, and I keep getting those errors for about 60
> > > > seconds until the producer is eventually able to publish to that
> > > > partition.
> > > >
> > > > In the previous version of Kafka (0.8) there was a producer setting
> > > > called topic.metadata.refresh.interval.ms that was meant to make the
> > > > producer pull that information. This is what I found related to that
> > > > setting in the 0.8 documentation: "The producer generally refreshes the
> > > > topic metadata from brokers when there is a failure (partition missing,
> > > > leader not available...)"
> > > >
> > > > Any ideas and comments on this are much appreciated.
> > > > Thanks
> > > >
> > >
> >
>
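
Until a fix such as the one tracked in KAFKA-4254 is available, an application-side stopgap for the "Invalid partition" window described in this thread could be a bounded retry with backoff around the send call. The sketch below is generic Python, not Kafka client code: `send_with_retry`, `send`, and `is_retriable` are hypothetical names, and the delays are arbitrary. It only helps if the total retry time exceeds the producer's metadata refresh interval, so it would normally be paired with a lower "metadata.max.age.ms".

```python
import time


def send_with_retry(send, record, is_retriable,
                    attempts=5, initial_delay_s=0.5, sleep=time.sleep):
    """Call send(record), retrying with exponential backoff on retriable errors.

    `send` and `is_retriable` are supplied by the application; the last error
    is re-raised once the attempts are exhausted or the error is not retriable.
    """
    delay = initial_delay_s
    for attempt in range(1, attempts + 1):
        try:
            return send(record)
        except Exception as exc:
            if attempt == attempts or not is_retriable(exc):
                raise
            sleep(delay)
            delay *= 2
```

The `sleep` parameter is injectable so the backoff can be tested without waiting.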


Kafka 0.10 stores offsets in ZooKeeper instead of a Kafka topic

2016-10-13 Thread Samy Dindane

Hi,

I use Kafka 0.10 with ZK 3.4.6 and my consumers' offsets aren't stored in the 
__consumer_offsets topic but in ZK instead.

That happens whether I let the consumer commit automatically, or commit 
manually with enable.auto.commit set to false.
Same behavior with `offsets.storage=kafka`, which isn't surprising as this 
configuration value is dropped in 0.10.

`kafka-console-consumer --topic __consumer_offsets --zookeeper 
localhost:/kafka-exp --bootstrap-server localhost:9092` shows nothing while my 
program is committing offsets.
Not sure it matters, but I consume the topic using a Spark 2.0.0 app.

Is there anything specific I should do to store consumers' offsets in a Kafka 
topic instead of ZooKeeper?

Thank you for your help!

Samy


Re: Kafka 0.10 stores offsets in ZooKeeper instead of a Kafka topic

2016-10-13 Thread Ben Davison
I *think* Spark 2.0.0 has a Kafka 0.8 consumer, which would still use the
old ZooKeeper method.

To use the new consumer offsets, the consumer needs to be at least Kafka 0.9
compatible.




Re: Kafka 0.10 stores offsets in ZooKeeper instead of a Kafka topic

2016-10-13 Thread Vasilij Syc
Spark 2.0 has experimental support for Kafka 0.10, and you have to explicitly
declare it in your build, e.g. spark-streaming-kafka-0-10.



Re: Kafka 0.10 stores offsets in ZooKeeper instead of a Kafka topic

2016-10-13 Thread Samy Dindane

Ben, Vasilij, thanks for your answers.

I forgot to mention I use spark-streaming-kafka-0-10.






Problem with key-value in the KvStore

2016-10-13 Thread Hamza HACHANI
Hi,

I've designed two processors, each with a different topology.

The issue is that in the first topology, in one node, I was able to associate 
different messages (key, value) where the key could be the same, and so I was 
able to do something like countByKey.

In the second example, when I tried to do the same, I noticed that this was not 
possible: any new value associated with a key is overwritten by the next value, 
so the uniqueness of the key is enforced.

I think this is really weird.

Does anybody have an explanation or a suggestion?

Thanks in advance,


Hamza


RE: HL7 messages to Kafka consumer

2016-10-13 Thread Martin Gainty



> From: artemerv...@gmail.com
> Date: Thu, 13 Oct 2016 00:49:50 -0400
> Subject: Re: HL7 messages to Kafka consumer
> To: users@kafka.apache.org
> 
> Nifi HL7 processor is built using HAPI API, which supports z-segments
> http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/CustomModelClasses.html
MG>lets go back to the spec so we are on the same page:

Z segments contain clinical or patient data that the HL7 Standard may not have 
defined in other areas. Essentially, it is the “catch all” for data that does 
not fit into the HL7 Standard message definitions. Z segments can be inserted 
in ANY message at any time, and Z segments can carry ANY data you want. In HL7 
messaging, all Z segments within it start with the letter “Z”.

http://healthstandards.com/blog/2006/10/05/what-are-z-segments/
so in your above example your segment has a predefined class which must 
be known a priori and already coded. This class contains 2 datatypes:

ca.uhn.hl7v2.model.v25.datatype.NM; //Numeric String
ca.uhn.hl7v2.model.v25.datatype.ST; //Single String

The spec says Z segments can carry ANY data you want.

So what happens when a health-care facility wants a Z segment which contains a 
backup doctor contact info segment? Right now http://hl7api.sourceforge.net/ 
contains no "backup-doctor-contact-info-segment":
http://hl7api.sourceforge.net/v25/apidocs/ca/uhn/hl7v2/model/v25/datatype/package-frame.html

If I do add a new Backup-Doctor-Contact-Info segment which does not conform to 
your schema, I will be thrown an HL7Exception, because the implemented 
interface Group is:

an abstraction representing >1 message parts which may be repeated together.
* An implementation of Group should enforce constraints on the contents of 
the group
* and throw an exception if an attempt is made to add a Structure that the 
Group instance
* does not recognize.
http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/model/Group.html

Any implication that all segments are predefined (even Z segments) does not 
conform with the above statements and will not automatically guarantee that the 
asserts from HL7api are met.
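
To make the Z-segment definition quoted above concrete, here is a minimal sketch in plain Python (it does not use HAPI). The helper names and the sample message are made up for illustration, `ZBD` is a hypothetical "backup doctor" segment, and the parsing assumes the default HL7 v2 delimiters (carriage return between segments, `|` between fields) rather than reading them from MSH.

```python
def split_segments(message: str):
    """Split an HL7 v2 message into (segment_id, fields) pairs.

    Assumes default delimiters; a real parser reads them from the MSH segment.
    """
    segments = []
    normalized = message.replace("\r\n", "\r").replace("\n", "\r")
    for line in normalized.split("\r"):
        if line:
            fields = line.split("|")
            segments.append((fields[0], fields[1:]))
    return segments


def z_segments(message: str):
    """Return only the locally defined segments, i.e. IDs starting with 'Z'."""
    return [seg for seg in split_segments(message) if seg[0].startswith("Z")]


# Made-up sample message; ZBD is not part of the HL7 standard.
sample = "\r".join([
    "MSH|^~\\&|SENDER|FACILITY|RECEIVER|FACILITY|201610130000||ADT^A01|1|P|2.5",
    "PID|1||12345||DOE^JOHN",
    "ZBD|1|SMITH^JANE|555-0100",
])
print(z_segments(sample))
```

A generic splitter like this accepts any Z segment because it attaches no schema to it, which is the opposite of the typed-class approach discussed above.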

MG>this has little to do with Kafka, so let's take this offline
> On Wed, Oct 12, 2016 at 10:10 PM, Martin Gainty  wrote:
> 
> >
> >
> >
> > > From: dbis...@gmail.com
> > > Date: Wed, 12 Oct 2016 20:42:04 -0400
> > > Subject: RE: HL7 messages to Kafka consumer
> > > To: users@kafka.apache.org
> > >
> > > I did it with HAPI API and Kafka producer way back when and it worked
> > well.
> > > Times have changed, If you consider using Apache Nifi, besides native HL7
> > > processor,
> > MG>since this is where I get 99% of the applications I work on, I have to
> > ask: will Nifi process Z segments?
> > MG>if Nifi does not process Z segments you might want to delay being
> > a Nifi evangelist and go with
> > MG>aforementioned solution
> >  you can push to Kafka by dragging a processor on canvas. HL7
> > > processor also is built on HAPI API. Here's an example but instead of
> > Kafka
> > > it's pushing to Solr, replacing solr processor with Kafka will do a
> > trick.
> > MG>kafka server.properties does support a zk provider so kafka server can
> > ingest resultset(s) from zk
> > # Zookeeper #
> > # Zookeeper connection string (see zookeeper docs for details).
> > # This is a comma separated host:port pairs, each corresponding to a zk
> > # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
> > # You can also append an optional chroot string to the urls to specify the
> > # root directory for all kafka znodes.
> > zookeeper.connect=localhost:2181
> > # Timeout in ms for connecting to zookeeper
> > zookeeper.connection.timeout.ms=6000
> > MG>kafkas clear advantage over zk is to control flow by pausing or
> > resuming partitions to your kafka consumer
> > MG>possible side-effect of relying only on zks provider would disable this
> > control-flow capability of kafka
> > > Old and new consumer API is available.
> > >
> > > https://community.hortonworks.com/articles/20318/visualize-
> > patients-complaints-to-their-doctors-usi.html
> > >
> > > On Oct 12, 2016 4:33 PM, "Martin Gainty"  wrote:
> > >
> > > > provisionally accomplished task by embedding A01,A03 and A08 HL7
> > > > Event-types into SOAP 1.2 Envelopes
> > > > I remember having difficulty transporting over a non-dedicated
> > transport
> > > > such as what Kafka implements
> > > > Producer Embeds Fragment1 into SOAPEnvelope
> > > > Producer Sends Fragment1-SOAPEnvelope of A01
> > > > Consumer pulls Fragment1 of A01 from SOAP1.2 Body and places
> > SOAPEnvelope
> > > > into cache
> > > > Consumer quiesces connection presumably so other SOAP 1.2 messages can
> > be
> > > > transported
> > > > Consumer re-activates connection when sufficient bandwidth is detected
> > > > (higher priority SOAP 1.2 envelopes have been transmitted)
> > > > Producer Embeds Fragment2 into SOAPEnvelope
> > > >
> > > > Producer Sends Fragment2-SOAPEnvelope of A01

RE: Problem with key-value in the KvStore

2016-10-13 Thread Hamza HACHANI
Sorry, I was talking nonsense.

Please disregard my previous message.

Kafka does ensure the uniqueness of the key.
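
As an illustration of that behaviour, here is a toy model in plain Python. It is not the Kafka Streams state store API: `KeyValueStore` and `count_by_key` are made-up names. It shows that a put replaces the previous value for a key, so a per-key count has to be done as a read-modify-write.

```python
class KeyValueStore:
    """Toy stand-in for a key-value state store: one value per key."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value  # replaces any previous value for this key

    def get(self, key):
        return self._data.get(key)


def count_by_key(store, key):
    """Increment the counter stored under `key` (read, modify, write back)."""
    current = store.get(key) or 0
    store.put(key, current + 1)
    return current + 1


store = KeyValueStore()
store.put("k", "first")
store.put("k", "second")   # overwrites "first"
print(store.get("k"))

counts = KeyValueStore()
for key in ["a", "b", "a"]:
    count_by_key(counts, key)
print(counts.get("a"), counts.get("b"))
```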


From: Hamza HACHANI 
Sent: Thursday, 13 October 2016 01:38:42
To: users@kafka.apache.org
Subject: Problem with key-value in the KvStore

Hi,

I've designed two processors, each with a different topology.

The issue is that in the first topology, in one node, I was able to associate 
different messages (key, value) where the key could be the same, and so I was 
able to do something like countByKey.

In the second example, when I tried to do the same, I noticed that this was not 
possible: any new value associated with a key is overwritten by the next value, 
so the uniqueness of the key is enforced.

I think this is really weird.

Does anybody have an explanation or a suggestion?

Thanks in advance,


Hamza


Occasional NPE in NamedCache

2016-10-13 Thread Frank Lyaruu
Hi Kafka people,

I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
0.10.1 release candidate.

It runs OK for a few thousand messages, and then it dies with the
following exception:

Exception in thread "StreamThread-1" java.lang.NullPointerException
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
    at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
    at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
    at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
    at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
    at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:187)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:182)
    at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:92)
    at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:177)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:427)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:235)

I know this isn't a great bug report, as I can't seem to reproduce this in
a more sandboxed situation. Any tips / ideas for further 

Re: Understanding out of order message processing w/ Streaming

2016-10-13 Thread Ali Akhtar
Makes sense. Thanks

On 13 Oct 2016 12:42 pm, "Michael Noll"  wrote:

> > But if they arrive out of order, I have to detect / process that myself
> > in the processor logic.
>
> Yes -- if your processing logic depends on the specific ordering of
> messages (which is the case for you), then you must manually implement this
> ordering-specific logic at the moment.
>
> Other use cases may not need to do that and "just work" even with
> out-of-order data.  If, for example, you are counting objects or are
> computing the sum of numbers, then you do not need to do anything special.
>
>
>
>
>
> On Wed, Oct 12, 2016 at 10:22 PM, Ali Akhtar  wrote:
>
> > Thanks Matthias.
> >
> > So, if I'm understanding this right, Kafka will not discard messages
> > which arrive out of order.
> >
> > What it will do is show messages in the order in which they arrive.
> >
> > But if they arrive out of order, I have to detect / process that myself
> > in the processor logic.
> >
> > Is that correct?
> >
> > Thanks.
> >
> > On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax wrote:
> >
> > > Last question first: A KTable is basically an infinite window over the
> > > whole stream providing a single result (that gets updated when new
> > > data arrives). If you use windows, you cut the overall stream into
> > > finite subsets and get a result per window. Thus, I guess you do not
> > > need windows (if I understood your use case correctly).
> > >
> > > However, in the current state of the Kafka Streams DSL, you will not be
> > > able to use KTable (directly -- see suggestion to fix this below) because
> > > it does (currently) not allow access to the timestamp of the current
> > > record (thus, you cannot know if a record is late or not). You will
> > > need to use the Processor API, which allows you to access the current
> > > record's timestamp via the Context object given in init().
> > >
> > > Your reasoning about partitions and Streams instances is correct.
> > > However, the following two are not
> > >
> > > > - Because I'm using a KTable, the timestamp of the messages is
> > > > extracted, and I'm not shown the older bid because I've already
> > > > processed the later bid. The older bid is ignored.
> > >
> > > and
> > >
> > > > - Because of this, the replica already knows which timestamps it
> > > > has processed, and is able to ignore the older messages.
> > >
> > > Late-arriving records are not dropped but processed regularly. Thus,
> > > your KTable aggregate function will be called for the late-arriving
> > > record, too (but as described above, you currently have no way to know
> > > it is a late record).
> > >
> > >
> > > Last but not least, your last statement is a valid concern:
> > >
> > > > Also, what will happen if bid 2 arrived and got processed, and then
> > > > the particular replica crashed, and was restarted. The restarted
> > > > replica won't have any memory of which timestamps it has previously
> > > > processed.
> > > >
> > > > So if bid 2 got processed, replica crashed and restarted, and then
> > > > bid 1 arrived, what would happen in that case?
> > >
> > > In order to make this work, you would need to store the timestamp in
> > > your store next to the actual data. Thus, you can compare the timestamp
> > > of the latest result (safely stored in operator state) with the
> > > timestamp of the current record.
> > >
> > > Does this make sense?
> > >
> > > To fix your issue, you could add a .transformValues() before your KTable,
> > > which allows you to access the timestamp of a record. If you add this
> > > timestamp to your value and pass it to the KTable afterwards, you can
> > > access it and it also gets stored reliably.
> > >
> > > <key, value> => transformValues => <key, {value, timestamp}> => aggregate
> > >
> > > Hope this helps.
> > >
> > > - -Matthias
> > >
> > >
> > > On 10/11/16 9:12 PM, Ali Akhtar wrote:
> > > > P.S, does my scenario require using windows, or can it be achieved
> > > > using just KTable?
> > > >
> > > > On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar 
> > > > wrote:
> > > >
> > > >> Heya,
> > > >>
> > > >> Say I'm building a live auction site, with different products.
> > > >> Different users will bid on different products. And each time
> > > >> they do, I want to update the product's price, so it should
> > > >> always have the latest price in place.
> > > >>
> > > >> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
> > > >> the same product 100 ms later.
> > > >>
> > > >> The second bid arrives first and the price is updated to $5. Then
> > > >> the first bid arrives. I want the price to not be updated in this
> > > >> case, as this bid is older than the one I've already processed.
> > > >>
> > > >> Here's my understanding of how I can achieve this with Kafka
> > > >> Streaming - is my understanding correct?
> > > >>
> > > >> - I have a topic for receiving bids. The topic has N partitions,
> > > >> and I have N replicas of my applicatio

Re: Understanding out of order message processing w/ Streaming

2016-10-13 Thread Ali Akhtar
I am probably being too OCD anyway. It will almost never happen that
messages from another VM in the same network on EC2 arrive out of order.
Right?
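
For the rare case where it does happen, the approach Matthias suggested earlier in this thread (keep the timestamp next to the value in the store and compare it with the incoming record's timestamp) can be sketched as follows. This is a minimal illustration, assuming a plain dict in place of the Kafka Streams state store; `apply_bid` and the field names are hypothetical:

```python
# Hypothetical sketch: a dict stands in for the operator's state store, and
# each stored entry keeps the bid's timestamp next to its price.

def apply_bid(store, product, price, timestamp_ms):
    """Update the stored price unless a newer bid was already applied.

    Returns True if the store was updated, False if the bid was late.
    """
    current = store.get(product)
    if current is not None and current["timestamp_ms"] >= timestamp_ms:
        return False  # late (or duplicate) record: keep the newer price
    store[product] = {"price": price, "timestamp_ms": timestamp_ms}
    return True


store = {}
# Bid 2 ($5, placed at t=1100) arrives before bid 1 ($3, placed at t=1000).
second_applied = apply_bid(store, "product-a", 5, 1100)
first_applied = apply_bid(store, "product-a", 3, 1000)

print(second_applied, first_applied)  # True False
print(store["product-a"]["price"])    # 5
```

Because the timestamp lives in the store rather than in process memory, the comparison should still work after a restart, provided the store itself is restored.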


Re: I found kafka lsot message

2016-10-13 Thread Hans Jespersen
Watch this talk. Kafka will not lose messages if configured correctly.

http://www.confluent.io/kafka-summit-2016-ops-when-it-absolutely-positively-has-to-be-there
 


-hans
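
To illustrate the auto-commit pitfall Guozhang describes in the quoted message below, here is a simplified simulation. It assumes a list in place of a partition and an integer in place of the committed offset; no Kafka client is involved, and real auto-commit is periodic rather than strictly "before processing". With kafka-python, the commit-after-processing variant roughly corresponds to creating the consumer with `enable_auto_commit=False` and calling `consumer.commit()` once a record has been handled.

```python
# Simulation only: a list stands in for a partition and an int for the
# committed offset. No Kafka client is involved.

def consume(log, start_offset, commit_before_processing, fail_at=None):
    """Process records from start_offset; return (processed, committed_offset).

    fail_at is the offset at which processing crashes (None = never).
    """
    processed = []
    committed = start_offset
    batch_end = len(log)
    if commit_before_processing:
        # Auto-commit style: the whole polled batch is marked consumed.
        committed = batch_end
    for offset in range(start_offset, batch_end):
        if offset == fail_at:
            return processed, committed  # crash mid-batch
        processed.append(log[offset])
        if not commit_before_processing:
            committed = offset + 1  # commit only what was fully processed
    return processed, committed


log = ["m0", "m1", "m2", "m3"]

# Commit first, crash at offset 2, then restart from the committed offset.
done, committed = consume(log, 0, commit_before_processing=True, fail_at=2)
after_restart, _ = consume(log, committed, commit_before_processing=True)
lost = [m for m in log if m not in done + after_restart]

# Commit after processing: the restart re-fetches from offset 2.
done2, committed2 = consume(log, 0, commit_before_processing=False, fail_at=2)
after_restart2, _ = consume(log, committed2, commit_before_processing=False)
lost2 = [m for m in log if m not in done2 + after_restart2]

print(lost)   # ['m2', 'm3']
print(lost2)  # []
```

The trade-off is that committing after processing can redeliver a record after a crash, so the processing step should tolerate duplicates.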





> On Oct 13, 2016, at 2:48 AM,   wrote:
> 
> Another question: if the number of consumers exceeds the number of brokers, 
> will data be lost? If a broker is rebooted, will data be lost? 
> Thanks.
> 
> 
> 
> 
> - Original Message -
> From: Guozhang Wang 
> To: yangy...@sina.com
> Cc: users 
> Subject: Re: Re: I found kafka lsot message
> Date: 2016-10-12 07:05
> 
> One common cause of lost messages is consumer auto-committing (the related 
> configs are "enable.auto.commit" and "auto.commit.interval.ms"): from the 
> Kafka consumer's point of view, once the messages are returned from the "poll" 
> call they are considered "consumed", and when offsets are committed the 
> committed position moves past the last returned message. So if you hit 
> exceptions / errors in the middle of processing some of your consumed records, 
> they will not be re-fetched upon restarting, hence possibly causing "data 
> loss" on your end.
> 
> Guozhang
> On Mon, Oct 10, 2016 at 3:33 AM,   wrote:
> Hi Guozhang,
> First, thank you for answering my question and for the suggestions. But I'm 
> sure I have already read the introductory material on Kafka.
> In my producer, my code is (C):
>     res = rd_kafka_conf_set(kafka_conf, "request.required.acks", "-1", NULL, 0);
>     res = rd_kafka_topic_conf_set(topic_conf, "produce.offset.report", "true",
>                                   errstr, sizeof(errstr));
> In my consumer, my code is (kafka-python):
>     self.__consumer = KafkaConsumer(
>         bootstrap_servers=self.__kafka_config["hosts"],
>         group_id=self.__kafka_config["group"],
>         auto_offset_reset="earliest",
>     )
>     self.__consumer.subscribe([self.__kafka_config["data_topic"]])
>     for message in self.__consumer:
> 
> I think this code is fairly standard. What would you suggest changing?
> Finally, I should explain: message loss is not frequent. Sometimes we find 
> only one or two lost messages over a couple of days, but on some days we find 
> more than 20 lost messages. We handle over 1,200,000 messages per day.
> So, do you have any suggestions? By the way, can you speak Chinese? Thank you 
> very much & best wishes,
> Jerry
> 
> 
> 
> 
> - Original Message -
> From: Guozhang Wang 
> To: "users@kafka.apache.org" , yangy...@sina.com
> Subject: Re: I found kafka lsot message
> Date: 2016-10-10 01:25
> 
> Jerry,
> Message loss scenarios are usually due to a misconfigured producer or 
> consumer. Have you read the client config web docs?
> http://kafka.apache.org/documentation#producerconfigs
> 
> http://kafka.apache.org/documentation#newconsumerconfigs
> 
> 
> If not, I'd suggest reading those first to see whether you can tune some of 
> these configs to get better delivery guarantees.
> Guozhang
> 
> On Fri, Oct 7, 2016 at 9:48 PM,   wrote:
> Hello everybody, I built a Kafka cluster (comprising 5 domains) using 
> kafka_2.11-0.10.0.0 and kafka-python-1.3.1. I created a topic with 100 
> partitions and a replication factor of 2, then used 20 consumers to receive 
> messages.
> 
> But I found that Kafka sometimes loses messages! I found that some 
> partitions' offsets were missing at the consumer. Afterwards, to confirm the 
> problem, I added a global index to every message from the producer, and I 
> found gaps in the global index as well!
> 
> Why does Kafka lose messages? What can I do to fix the problem?
> 
> Thanks!
> Jerry
> 
> 
> 
> 
> 
> 
> 
> 
> 
> -- 
> -- Guozhang
> 
> 
> 
> 
> 
> -- 
> -- Guozhang
> 
> 
> 



Re: Occasional NPE in NamedCache

2016-10-13 Thread Damian Guy
Hi Frank,

Thanks for reporting. Can you provide a sample of the join you are running?

Thanks,
Damian


Re: Occasional NPE in NamedCache

2016-10-13 Thread Damian Guy
Hi, I believe I found the problem. If possible, could you please try with
this: https://github.com/dguy/kafka/tree/cache-bug

Thanks,
Damian


Re: [kafka-clients] [VOTE] 0.10.1.0 RC2

2016-10-13 Thread Damian Guy
Hi Jason,

Really sorry, but we are going to need to cut another RC.  There was a
report on the user list w.r.t the NamedCache (in KafkaStreams) throwing a
NullPointerException. I've looked into it and it is definitely a bug that
needs to be fixed. The JIRA is https://issues.apache.org/jira/browse/KAFKA-4300 and
the PR is https://github.com/apache/kafka/pull/2024

Thanks,
Damian

On Wed, 12 Oct 2016 at 20:05 Dana Powers  wrote:

> +1; all kafka-python integration tests pass.
>
> -Dana
>
>
> On Wed, Oct 12, 2016 at 10:41 AM, Jason Gustafson 
> wrote:
> > Hello Kafka users, developers and client-developers,
> >
> > One more RC for 0.10.1.0. I think we're getting close!
> >
> > Release plan:
> > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.
> >
> > Release notes for the 0.10.1.0 release:
> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Saturday, Oct 15, 11am PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/
> >
> > * Javadoc:
> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/javadoc/
> >
> > * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc2 tag:
> > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8702d66434b86092a3738472f9186d6845ab0720
> >
> > * Documentation:
> > http://kafka.apache.org/0101/documentation.html
> >
> > * Protocol:
> > http://kafka.apache.org/0101/protocol.html
> >
> > * Tests:
> > Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/68/
> > System tests:
> > http://confluent-kafka-0-10-1-system-test-results.s3-us-west-2.amazonaws.com/2016-10-11--001.1476197348--apache--0.10.1--d981dd2/
> >
> > Thanks,
> >
> > Jason
> >
>


KafkaProducer 0.9.0.1 Client - Async `send` stops sending

2016-10-13 Thread Praveen
I am running a map-reduce job to queue data from HDFS into Kafka.
The mappers simply open a file and send the data to Kafka using
KafkaProducer 0.9.0.1 client.

The issue I am debugging is that the Kafka Producer async send fails to
`send` to brokers after some time. I was able to capture the behavior with
logs enabled.

Kafka Producer starts off doing the following:
1. NetworkClient - Initiates connection with nodes
2. NetworkClient - Sends Metadata request

3. KafkaProducer - Sends records down to the rest of the system
4. RecordAccumulator - Allocates new byte message buffer
5. Sender - Creates producer requests to send the message to the brokers.

Steps 3-5 repeat for a while.

Then, all of a sudden, no more records are sent, and the
network client just sends metadata requests every minute.

My code is waiting on all the async producer sends to complete. I am
checking this with a  `Future.get()`. When I look at the heap dump, I can
still see byte buffers in the RecordAccumulator's BufferPool. So it looks
like Kafka is not attempting to send those records anymore.

I would appreciate any insight into what's happening here.

I've also attached my thread dump and producer configs.

Thanks,
Praveen
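
One way to narrow this kind of stall down is to wait on each send future with a timeout instead of blocking indefinitely, so that stuck sends show up as a count rather than a hang. The sketch below only illustrates the pattern with Python's standard `concurrent.futures.Future` objects as stand-ins; it does not use a Kafka client, and `wait_for_sends` is a hypothetical helper. In the Java client the analogous call would be the `Future.get(timeout, unit)` overload.

```python
from concurrent.futures import Future, TimeoutError


def wait_for_sends(futures, timeout_s):
    """Return (completed, stalled) counts, waiting at most timeout_s per future."""
    completed = 0
    stalled = 0
    for future in futures:
        try:
            future.result(timeout=timeout_s)
            completed += 1
        except TimeoutError:
            stalled += 1  # never acknowledged within the timeout
    return completed, stalled


# Stand-ins for send results: one acknowledged, one that never completes.
acked = Future()
acked.set_result("acknowledged")
never_acked = Future()

completed, stalled = wait_for_sends([acked, never_acked], timeout_s=0.1)
print(completed, stalled)  # 1 1
```

Logging which records stalled, together with the broker each was destined for, may help tell a client-side problem from an unreachable partition leader.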


Re: [kafka-clients] [VOTE] 0.10.1.0 RC2

2016-10-13 Thread Jason Gustafson
No worries, it wouldn't be interesting unless there were a couple of
last-minute blockers! We've also got KAFKA-4298 to get in.

-Jason

On Thu, Oct 13, 2016 at 10:04 AM, Damian Guy  wrote:

> Hi Jason,
>
> Really sorry, but we are going to need to cut another RC.  There was a
> report on the user list w.r.t the NamedCache (in KafkaStreams) throwing a
> NullPointerException. I've looked into it and it is definitely a bug that
> needs to be fixed. The JIRA is
> https://issues.apache.org/jira/browse/KAFKA-4300 and the PR is
> https://github.com/apache/kafka/pull/2024
>
> Thanks,
> Damian
>
> On Wed, 12 Oct 2016 at 20:05 Dana Powers  wrote:
>
>> +1; all kafka-python integration tests pass.
>>
>> -Dana
>>
>>
>> On Wed, Oct 12, 2016 at 10:41 AM, Jason Gustafson 
>> wrote:
>> > Hello Kafka users, developers and client-developers,
>> >
>> > One more RC for 0.10.1.0. I think we're getting close!
>> >
>> > Release plan:
>> > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.
>> >
>> > Release notes for the 0.10.1.0 release:
>> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/RELEASE_NOTES.html
>> >
>> > *** Please download, test and vote by Saturday, Oct 15, 11am PT
>> >
>> > Kafka's KEYS file containing PGP keys we use to sign the release:
>> > http://kafka.apache.org/KEYS
>> >
>> > * Release artifacts to be voted upon (source and binary):
>> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/
>> >
>> > * Maven artifacts to be voted upon:
>> > https://repository.apache.org/content/groups/staging/
>> >
>> > * Javadoc:
>> > http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/javadoc/
>> >
>> > * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc2 tag:
>> > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8702d66434b86092a3738472f9186d6845ab0720
>> >
>> > * Documentation:
>> > http://kafka.apache.org/0101/documentation.html
>> >
>> > * Protocol:
>> > http://kafka.apache.org/0101/protocol.html
>> >
>> > * Tests:
>> > Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/68/
>> > System tests:
>> > http://confluent-kafka-0-10-1-system-test-results.s3-us-west-2.amazonaws.com/2016-10-11--001.1476197348--apache--0.10.1--d981dd2/
>> >
>> > Thanks,
>> >
>> > Jason
>> >
>>
>


Re: Occasional NPE in NamedCache

2016-10-13 Thread Frank Lyaruu
The issue seems to be gone. Amazing work, thanks...!

On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy  wrote:

> Hi, I believe I found the problem. If possible, could you please try with
> this: https://github.com/dguy/kafka/tree/cache-bug
>
> Thanks,
> Damian
>
> On Thu, 13 Oct 2016 at 17:46 Damian Guy  wrote:
>
> > Hi Frank,
> >
> > Thanks for reporting. Can you provide a sample of the join you are
> > running?
> >
> > Thanks,
> > Damian
> >
> > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu  wrote:
> >
> > Hi Kafka people,
> >
> > I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
> > 0.10.1 release candidate.
> >
> > It runs OK for a few thousand messages, and then it dies with the
> > following exception:
> >
> > Exception in thread "StreamThread-1" java.lang.NullPointerException
> >     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
> >     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> >     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
> >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
> >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
> >     at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
> >     at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
> >     at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
> >     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
> >     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >     at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
> >     at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
> >     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
> >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
> >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
> >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
> >     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
> >     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
> >     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> >     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
> >     at org.apache.kafka.streams.state.internals.CachingKeyV

Re: Occasional NPE in NamedCache

2016-10-13 Thread Guozhang Wang
Thanks Frank for reporting the bug, and many thanks to Damian for the quick
catch!

On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu  wrote:

> The issue seems to be gone. Amazing work, thanks...!
>
> On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy  wrote:
>
> > Hi, I believe I found the problem. If possible, could you please try with
> > this: https://github.com/dguy/kafka/tree/cache-bug
> >
> > Thanks,
> > Damian
> >
> > On Thu, 13 Oct 2016 at 17:46 Damian Guy  wrote:
> >
> > > Hi Frank,
> > >
> > > Thanks for reporting. Can you provide a sample of the join you are
> > > running?
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu  wrote:
> > >
> > > Hi Kafka people,
> > >
> > > I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
> > > 0.10.1 release candidate.
> > >
> > > It runs OK for a few thousand messages, and then it dies with the
> > > following exception:
> > >
> > > Exception in thread "StreamThread-1" java.lang.NullPointerException

Re: Occasional NPE in NamedCache

2016-10-13 Thread Guozhang Wang
BTW this is tracked and resolved as
https://issues.apache.org/jira/browse/KAFKA-4300.

On Thu, Oct 13, 2016 at 1:17 PM, Guozhang Wang  wrote:

> Thanks Frank for reporting the bug, and many thanks to Damian for the
> quick catch!
>
> On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu  wrote:
>
>> The issue seems to be gone. Amazing work, thanks...!
>>
>> On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy  wrote:
>>
>> > Hi, I believe I found the problem. If possible, could you please try with
>> > this: https://github.com/dguy/kafka/tree/cache-bug
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Thu, 13 Oct 2016 at 17:46 Damian Guy  wrote:
>> >
>> > > Hi Frank,
>> > >
>> > > Thanks for reporting. Can you provide a sample of the join you are
>> > > running?
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu  wrote:
>> > >
>> > > Hi Kafka people,
>> > >
>> > > I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
>> > > 0.10.1 release candidate.
>> > >
>> > > It runs OK for a few thousand messages, and then it dies with the
>> > > following exception:
>> > >
>> > > Exception in thread "StreamThread-1" java.lang.NullPointerException

Re: How can I delete a topic programatically?

2016-10-13 Thread Ratha v
Hi Jianbin,
I tried the command shown at the end of this message, providing my ZooKeeper
host, but it fails with the error in [1]. I use Kafka 0.10, and I can see that
my topic is available using KafkaManagementTool.

How can I overcome this issue?

[1] Error while executing topic command : Topic targettopic does not exist
on ZK path zookeeper.xx.com:2181/chroot

[2016-10-14 11:58:59,919] ERROR java.lang.IllegalArgumentException: Topic streamtargettopic does not exist on ZK path zookeeper.xx.com:2181/chroot
    at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:169)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:69)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

 bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic
my_topic_name


On 12 October 2016 at 17:02, Ratha v  wrote:

> Thank you..
>
> On 12 October 2016 at 16:30, Jianbin Wei  wrote:
>
>> You can check this
>>
>> http://kafka.apache.org/documentation.html#basic_ops_add_topic
>>
>> But from our experience it is best to delete topics one by one, i.e.,
>> make sure Kafka is in good shape before and after deleting a topic before
>> working on next one.
>>
>> Regards,
>>
>> -- Jianbin
>>
>> > On Oct 11, 2016, at 9:26 PM, Ratha v  wrote:
>> >
>> > Thanks. Which bash script do I need to run?
>> >
>> >> On 12 October 2016 at 15:20, Ali Akhtar  wrote:
>> >>
>> >> The last time I tried, I couldn't find a way to do it, other than to
>> >> trigger the bash script for topic deletion programmatically.
>> >>
>> >>> On Wed, Oct 12, 2016 at 9:18 AM, Ratha v wrote:
>> >>>
>> >>> Hi all;
>> >>>
>> >>> I have two topics (source and target). I do some processing on the
>> >>> messages available in the source topic and I merge both topics.
>> >>> That is;
>> >>>
>> >>> builder.stream(sourceTopic).to(targetTopic)
>> >>>
>> >>> Once merged I no longer require the sourceTopic. I want to delete it.
>> >>>
>> >>> How can I do that programmatically in Java? I use the high-level client
>> >>> APIs, Kafka v0.10.0.1
>> >>>
>> >>>
>> >>> Thanks
>> >>> --
>> >>> -Ratha
>> >>> http://vvratha.blogspot.com/
>> >>>
>> >>
>> >
>> >
>> >
>> > --
>> > -Ratha
>> > http://vvratha.blogspot.com/
>>
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/
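
The workaround suggested in this thread, triggering the topic deletion
script from code, could be sketched in Java roughly as follows. The
`--zookeeper`, `--delete`, and `--topic` flags are the ones used in the
command above; the class name TopicDeleter, its methods, and the script
path, host, and topic values are placeholders assumed for illustration.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical wrapper around the kafka-topics.sh script shipped with Kafka.
final class TopicDeleter {

    // Builds the same command line that is shown in the thread.
    static List<String> deleteCommand(String script, String zkConnect, String topic) {
        return Arrays.asList(script, "--zookeeper", zkConnect, "--delete", "--topic", topic);
    }

    // Runs the script and returns its exit code; the script's output is
    // forwarded to this process's stdout/stderr.
    static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        return process.waitFor();
    }

    public static void main(String[] args) {
        // Path, host, and topic are placeholders, not values from a real cluster.
        List<String> cmd = deleteCommand("bin/kafka-topics.sh", "zk_host:2181", "my_topic_name");
        System.out.println(String.join(" ", cmd));
        // To actually delete the topic, call run(cmd) on a machine with Kafka installed.
    }
}
```

Note that on Kafka 0.10 the brokers only act on the deletion request when
they are started with delete.topic.enable=true; otherwise the topic is
merely marked for deletion.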


kafka-topics.sh ---delete --topic ''x'' is not working for kafka V 0.10.1

2016-10-13 Thread Ratha v
Hi all,

I am trying to delete an existing topic (I verified that it exists using the
Kafka management console) with the following command:

#./kafka-topics.sh --zookeeper zookeeper.xx.com:2181/chroot --delete --topic testTopic

But it says the topic is not available in ZooKeeper [1].
I create those topics at runtime. (I think they are created in the Kafka
cluster?)

How can I delete the topic using this bash script?

[1] Error while executing topic command : Topic targettopic does not exist
on ZK path zookeeper.xx.com:2181/chroot

[2016-10-14 11:58:59,919] ERROR java.lang.IllegalArgumentException: Topic streamtargettopic does not exist on ZK path zookeeper.xx.com:2181/chroot
    at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:169)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:69)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

-- 
-Ratha
http://vvratha.blogspot.com/


Re: kafka-topics.sh ---delete --topic ''x'' is not working for kafka V 0.10.1

2016-10-13 Thread Ratha v
I overcame the issue.

The issue was that I had "/chroot" at the end of the ZooKeeper address. That
is how it is written in the documentation:
kafka.apache.org/documentation.html#basic_ops_modify_topic.

Thank you.

On 14 October 2016 at 12:25, Ratha v  wrote:

> Hi all,
>
> I am trying to delete an existing topic (I verified that it exists using the
> Kafka management console) with the following command:
>
> #./kafka-topics.sh --zookeeper zookeeper.xx.com:2181/chroot --delete --topic testTopic
>
> But it says the topic is not available in ZooKeeper [1].
> I create those topics at runtime. (I think they are created in the Kafka
> cluster?)
>
> How can I delete the topic using this bash script?
>
> [1] Error while executing topic command : Topic targettopic does not exist
> on ZK path zookeeper.xx.com:2181/chroot
>
> [2016-10-14 11:58:59,919] ERROR java.lang.IllegalArgumentException: Topic streamtargettopic does not exist on ZK path zookeeper.xx.com:2181/chroot
>     at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:169)
>     at kafka.admin.TopicCommand$.main(TopicCommand.scala:69)
>     at kafka.admin.TopicCommand.main(TopicCommand.scala)
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/
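
For readers who hit the same error, a small sketch of why the "/chroot"
suffix matters. A path appended to a ZooKeeper connect string makes the
client resolve every znode relative to that path, and Kafka keeps topic
metadata under /brokers/topics. If the brokers were registered without a
chroot, a tool pointed at "host:2181/chroot" therefore looks in the wrong
place, which is presumably what happened above. The class ZkConnect and its
methods are hypothetical helpers written for this illustration; they are
not part of Kafka or ZooKeeper.

```java
// Hypothetical helper for illustration; not part of Kafka or ZooKeeper.
final class ZkConnect {

    // Returns the chroot part of a connect string such as
    // "host1:2181,host2:2181/chroot", or "" when no chroot is given.
    static String chroot(String connect) {
        int slash = connect.indexOf('/');
        if (slash < 0) {
            return "";
        }
        String path = connect.substring(slash);
        return path.equals("/") ? "" : path;
    }

    // Absolute znode under which a tool using this connect string
    // would look for the given topic.
    static String topicPath(String connect, String topic) {
        return chroot(connect) + "/brokers/topics/" + topic;
    }

    public static void main(String[] args) {
        // With the chroot suffix, the lookup happens below /chroot.
        System.out.println(topicPath("zookeeper.xx.com:2181/chroot", "testTopic"));
        // Without it, the lookup happens at the ZooKeeper root.
        System.out.println(topicPath("zookeeper.xx.com:2181", "testTopic"));
    }
}
```

The connect string passed to kafka-topics.sh should match the
zookeeper.connect value the brokers were started with, including the
presence or absence of a chroot path.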