Re: KStreams to KTable join
Guozhang, "1) if the coming record's key is null, then when it flows into the join processor inside the topology this record will be dropped as it cannot be joined with any records from the other stream." Can you please elaborate on the notion of key? By keys, do you mean kafka partition keys? For a json kstream to ktable example, can you please show me a sample input? For me, the ktable has: {"user_name": "Joe", "location": "US", "gender": "male"} {"user_name": "Julie", "location": "US", "gender": "female"} {"user_name": "Kawasaki", "location": "Japan", "gender": "male"} The kstream gets a event (KStreams) {"user": "Joe", "custom": {"choice":"vegan"}} Is this data right or do I need to have a key and then a json - as in: "joe", {"user_name": "Joe", "location": "US", "gender": "male"} - Shekar On Mon, Jun 26, 2017 at 4:42 PM, Guozhang Wang wrote: > I think your issue is in two folds: > > 1) if the coming record's key is null, then when it flows into the join > processor inside the topology this record will be dropped as it cannot be > joined with any records from the other stream. > > 2) the NPE you are getting when giving it the non-null keyed record seems > because, you are using "SnowServerDeserialzer" (is it set as the default > key deserializer) which expects a SnowServerPOJOClass while the key "joe" > is typed String. You need to override the key deserialize when constructing > the "cache" KTable as well: > > > KTable cache = builder.table(Serdes.String(), > rawSerde, "cache", "local-cache"); > > > Guozhang > > > On Sun, Jun 25, 2017 at 11:30 PM, Shekar Tippur wrote: > > > Guozhang > > > > I am using 0.10.2.1 version > > > > - Shekar > > > > On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang > > wrote: > > > > > Hi Shekar, > > > > > > Could you demonstrate your input data. More specifically, what are the > > key > > > types of your input streams, and are they not-null values? It seems the > > > root cause is similar to the other thread you asked on the mailing > list. > > > > > > Also, could you provide your used Kafka Streams version? > > > > > > > > > Guozhang > > > > > > > > > On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur > > wrote: > > > > > > > Hello, > > > > > > > > I am having trouble implementing streams to table join. > > > > > > > > I have 2 POJO's each representing streams and table data structures. > > raw > > > > topic contains streams and cache topic contains table structure. The > > join > > > > is not happening since the print statement is not being called. > > > > > > > > Appreciate any pointers. > > > > > > > > - Shekar > > > > > > > > raw.leftJoin(cache, new ValueJoiner > > > CachePOJOClass,RawPOJOClass>() { > > > > > > > > @Override > > > > public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) { > > > > > > > > String src=r.getSource(); > > > > String cSrc=c.getSnowHost(); > > > > Custom custom=new Custom(); > > > > > > > > if (src.matches(snowSrc)){ > > > > System.out.println("In apply code"); > > > > custom.setAdditionalProperty("custom",cSrc.getAll()); > > > > r.setCustom(custom); > > > > } > > > > return r; > > > > } > > > > }).to("parser"); > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > > > > -- > -- Guozhang >
Re: mirroring Kafka while preserving the order
MirrorMaker acts as a consumer+producer. So it will consume from the source topic and produce to the destination topic. That means that the destination partition is chosen using the same technique as the normal producer: * if the source record has a key, the key will be hashed and the hash will be used to choose a partition. If the source partition was chosen using some different hashing algorithm or a custom partitioner, then you may end up writing to a different destination. * if the source record does NOT have a key, then the destination partition will be randomly chosen. MirrorMaker supports custom message handlers. You can use those to map the source partition to the destination partition, which will allow you to avoid the above two problems. Here's an example of how to do it. https://github.com/gwenshap/kafka-examples/tree/master/MirrorMakerHandler -James Sent from my iPhone > On Jun 29, 2017, at 9:57 AM, Tom Bentley wrote: > > I believe so. You need to be careful that the mirror maker producer doesn't > reorder messages; in particular if retries > 0 then > max.in.flight.requests.per.connection must be 1. If > retries=0 then it doesn't matter what max.in.flight.requests.per.connection > is. > > > >> On 29 June 2017 at 05:52, Sunil Parmar wrote: >> >> Is it possible to configure mirror maker using message handler to preserve >> the order of messages in each topic partition. In the particular use case >> we're looking at both source and destination topics have same number of >> partitions. >> >> Sunil >>
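For reference, a minimal sketch of such a handler, assuming the MirrorMakerMessageHandler hook in kafka.tools.MirrorMaker that the linked example is built on (the class name here is made up):

import java.util.Collections;
import java.util.List;
import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;

// Send every mirrored record to the same partition number it was read from.
// Only safe when the source and destination topics have the same partition count.
public class PartitionPreservingHandler implements MirrorMaker.MirrorMakerMessageHandler {
    public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
        return Collections.singletonList(
                new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(),
                                                   record.key(), record.value()));
    }
}

It is picked up by putting the compiled class on the CLASSPATH and passing --message.handler PartitionPreservingHandler (plus --message.handler.args if your handler takes an argument) to kafka-mirror-maker.sh.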
Re: Weird broker lock-up causing an almost global downtime
Hi, we're using Kafka 0.10.1.1 and the streams app is using 0.10.2.1 On Thu, Jun 29, 2017, at 04:21 PM, Bill Bejeck wrote: > Hi Vincent, > > What version of Kafka/Kafka Streams are you running, more specifically > when > this error occurred? > > Thanks, > Bill > > On Wed, Jun 28, 2017 at 12:24 PM, Bill Bejeck wrote: > > > Thanks for the info Vincent. > > > > -Bill > > > > On Wed, Jun 28, 2017 at 12:19 PM, Vincent Rischmann > > wrote: > > > >> I'm not sure what exactly to search for, but here are a couple of > >> things. > >> > >> First, looking at the broker 1, there's this: > >> https://pastebin.com/raw/XMLpjj7J which I find weird, because I'm sure I > >> didn't create those topics manually...which means to me that the streams > >> application was still up (despite me thinking it was shut down) and > >> therefore it recreated the topics ? Or is there another explanation here > >> ? > >> > >> After that, there is a lot of stuff like this > >> https://pastebin.com/raw/7PfRWuZb until 15:51 when there's this > >> https://pastebin.com/raw/Pnhs4JT1. Still after that at 15:53 the topics > >> are again deleted it seems https://pastebin.com/raw/wTL9PcWJ > >> > >> The broker with the problem was the broker 5. Here's a piece of logs > >> just after I ran the reset script. https://pastebin.com/raw/PgCDZFb8 > >> > >> After that there are a LOT of UnknownTopicOrPartitionException and then > >> this: https://pastebin.com/raw/b7k7yNdQ > >> This repeats for a while until 15:50 when it seems the broker thinks > >> he's the only one in the cluster ? https://pastebin.com/raw/Qy0TGjnx I > >> didn't include everything but it looks like every topic/partition had > >> it's ISR shrink. > >> > >> This goes on for a long time. At 15:57 there's this > >> https://pastebin.com/raw/VrTGdXBZ > >> After that there a lot of NotLeaderForPartitionException > >> https://pastebin.com/raw/PhDQts66 > >> > >> Then at 16:04 according to the log I killed -9 the process. At 16:13 I > >> restarted it. It spent 12 minutes recreating index files and then > >> proceeded to create the topics that I tried to delete: > >> https://pastebin.com/raw/bz9MYRsJ > >> Then just one minute later it deleted the topics > >> https://pastebin.com/raw/EQ2Kuwj6 > >> > >> And after that it operated normally. > >> > >> Sorry if it's a bit random and unorganized, I hope it helps anyway. I > >> can search for some specific things if you'd like. > >> > >> > >> On Wed, Jun 28, 2017, at 12:17 AM, Bill Bejeck wrote: > >> > Thanks Vincent. > >> > > >> > That's a good start for now. > >> > > >> > If you get a chance to forward some logs that would be great. > >> > > >> > -Bill > >> > > >> > On Tue, Jun 27, 2017 at 6:10 PM, Vincent Rischmann > >> > wrote: > >> > > >> > > Sure. > >> > > > >> > > The application reads a topic of keyless events and based on some > >> > > criteria of the event, it creates a new key and uses that for > >> > > `selectKey`. > >> > > Then I groupByKey and count with 3 differents windows. Each count is > >> > > then stored in a database. > >> > > > >> > > The three windows are tumbling windows: > >> > > - 1 minute window, 1 day retention > >> > > - 1 hour window, 7 day retention > >> > > - 1 day window, 15 days retention > >> > > That's basically it for the structure. > >> > > The input topic has 64 partitions. > >> > > > >> > > Tomorrow I can get some logs from Kafka/Zookeeper if that would help. 
> >> > > > >> > > On Tue, Jun 27, 2017, at 11:41 PM, Bill Bejeck wrote: > >> > > > Hi Vincent, > >> > > > > >> > > > Thanks for reporting this issue. Could you give us some more > >> details > >> > > > (number topics, partitions per topic and the structure of your Kafka > >> > > > Streams application) so we attempt to reproduce and diagnose the > >> issue? > >> > > > > >> > > > Thanks! > >> > > > Bill > >> > > > > >> > > > On 2017-06-27 14:46 (-0400), Vincent Rischmann > >> wrote: > >> > > > > Hello. so I had a weird problem this afternoon. I was deploying a > >> > > > > streams application and wanted to delete already existing internal > >> > > > > states data so I ran kafka-streams-application-reset.sh to do > >> it, as > >> > > > > recommended. it wasn't the first time I ran it and it had always > >> worked > >> > > > > before, in staging or in production. > >> > > > > Anyway, I run the command and around 2/3 minutes later we realize > >> a lot > >> > > > > of stuff using the cluster is basically down, unable to fetch or > >> > > > > produce. After investigating logs from the producers and the > >> brokers I > >> > > > > saw that one broker was not responding, despite the process being > >> up. > >> > > It > >> > > > > kept spewing `UnknownTopicOrPartitionException` in the logs, > >> other > >> > > > > brokers were spewing `NotLeaderForPartitionException` regularly. > >> A > >> > > > > zookeeper node logged a lot of this: > >> > > > > > 2017-06-27 15:51:32,897 [myid:2] - INFO [ProcessThread(sid:2 > >> cport:- > >> > > > > > 1)::PrepRequestPr
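Stepping back from the logs: the topology Vincent describes earlier in the thread boils down to something like the sketch below (0.10.2-style DSL; "events", Event, eventSerde and deriveKey() are illustrative names, not the real ones):

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Event> events = builder.stream(Serdes.String(), eventSerde, "events");

KGroupedStream<String, Event> grouped = events
        .selectKey((key, event) -> deriveKey(event))   // input records have null keys
        .groupByKey(Serdes.String(), eventSerde);      // forces an internal repartition topic

grouped.count(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)).until(TimeUnit.DAYS.toMillis(1)), "counts-per-minute");
grouped.count(TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.DAYS.toMillis(7)), "counts-per-hour");
grouped.count(TimeWindows.of(TimeUnit.DAYS.toMillis(1)).until(TimeUnit.DAYS.toMillis(15)), "counts-per-day");
// each windowed KTable<Windowed<String>, Long> is then written out to the database

The selectKey()/groupByKey() pair and the three windowed stores are what create the internal repartition and changelog topics (such as the event-counter-per-day-store-repartition seen in the Zookeeper log above), and those internal topics are exactly what kafka-streams-application-reset.sh deletes.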
Re: Requires suggestions for Producer request throttling
Request quotas was just added to 0.11. Does that help in your use case? https://cwiki.apache.org/confluence/display/KAFKA/KIP-124+-+Request+rate+quotas -hans > On Jun 29, 2017, at 12:55 AM, sukumar.np wrote: > > Hi Team, > > > > We are having a Kafka cluster with multiple Topics in it and shared with > multiple services(clients). Each service will have multiple events source > from where they will be pushing messages to Kafka brokers. Once a service > starts producing message at a high rate, it will affect other > services(clients) because it will fill the disk quickly which will bring the > cluster down. So, we want to throttle Producer request once it crosses the > specified threshold(Which can be set on Topic basis, not for each service). > > > > After checking with Quota feature available in Kafka we found that It allows > pushing data to a queue and will keep responses in delay queue(if it requires > getting throttled). If we apply quota for our use case then below problems > can happen: > > a). Since quota observes message rate for a window and starts to throttle > the producer responses, meanwhile all incoming messages will be added to the > queue. It may fill the disks quickly as there are many producers for the same > Topics and will create an outage for Kafka service. > > b). For sync Producers, because of throttling response will be delayed, which > will result in hang the user-thread or app-servers. > > > > So we don't want to go for applying quota for our use case. Can you please > share some suggestions to handle this use-case in our Kafka broker. Like, > before messages get appended to log, it should validate for throttling and if > it requires being throttled. Throttling mechanism should be either slow down > the request rate up to specified time frame or throw some generic exception > from broker side to clients. > > > > Our Kafka setup like, > > Having 3 brokers in a cluster and each Topic has replication factor 3 and > using Kafka-0.10.0.1. > > > > Looking forward to your suggestions. > > > > Thanks > > Sukumar N > > > > >
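For context, a quota is applied per client-id (rather than per topic, as asked below). A sketch of what that looks like, with illustrative entity names and rates:

# 0.10.x: cap produce throughput for client.id=serviceA to ~1 MB/s
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type clients --entity-name serviceA \
  --add-config 'producer_byte_rate=1048576'

# 0.11+ (KIP-124): additionally cap the share of broker request-handler time
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type clients --entity-name serviceA \
  --add-config 'request_percentage=50'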
Requires suggestions for Producer request throttling
Hi Team,

We have a Kafka cluster with multiple topics that is shared by multiple services (clients). Each service has multiple event sources from which it pushes messages to the Kafka brokers. Once one service starts producing messages at a high rate, it affects the other services, because it fills the disks quickly and can bring the cluster down. So we want to throttle producer requests once they cross a specified threshold (set per topic, not per service).

After checking the quota feature available in Kafka, we found that it still accepts the incoming data and only keeps the responses in a delay queue if the client needs to be throttled. If we apply quotas for our use case, the following problems can happen:

a) Since a quota observes the message rate over a window and then starts throttling the producer responses, all incoming messages are still appended in the meantime. With many producers writing to the same topics, this can still fill the disks quickly and create an outage of the Kafka service.

b) For synchronous producers, the throttled (delayed) responses can hang the user threads or app servers.

So we don't want to apply quotas for our use case. Can you please share some suggestions for handling this on the broker side? For example, before messages are appended to the log, the broker would check whether the request should be throttled; the throttling mechanism would either slow down the request rate for a specified time frame or return some generic exception to the client.

Our Kafka setup: 3 brokers in a cluster, each topic with replication factor 3, running Kafka 0.10.0.1.

Looking forward to your suggestions.

Thanks
Sukumar N
metric.reporters not working
Hi,

I'm trying to use metric.reporters (https://pastebin.com/185HAjq5) but can't get any useful values. All I get is 0.0 or -Infinity. What is wrong?

I'm referencing it the same way as this library does: https://github.com/SimpleFinance/kafka-dropwizard-reporter

metric.reporters=kafkaReporter.MetricsExtractor

Any help is appreciated. Please guide me to the right channel if this is not it.

Best regards,
Ansel
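In case it helps to compare against a bare-bones reporter: below is a sketch that only uses the public org.apache.kafka.common.metrics.MetricsReporter interface (class name and output are illustrative; this is not the dropwizard bridge). Keep in mind that values are computed lazily via KafkaMetric.value(), so many rate/min/max metrics legitimately read as 0.0 or -Infinity until the client has actually produced or consumed some traffic.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

public class LoggingMetricsReporter implements MetricsReporter {
    private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) { }

    public void init(List<KafkaMetric> initial) {
        for (KafkaMetric m : initial) metrics.put(m.metricName(), m);
    }

    public void metricChange(KafkaMetric metric) {
        metrics.put(metric.metricName(), metric);   // called whenever a metric is (re)registered
    }

    public void metricRemoval(KafkaMetric metric) {
        metrics.remove(metric.metricName());
    }

    // Call (or schedule) this while the client is busy; values are computed at read time.
    public void report() {
        for (KafkaMetric m : metrics.values())
            System.out.printf("%s / %s = %f%n", m.metricName().group(), m.metricName().name(), m.value());
    }

    public void close() { }
}

It is enabled the same way, with the fully-qualified class name on the classpath, e.g. metric.reporters=com.example.LoggingMetricsReporter.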
Re: Weird broker lock-up causing an almost global downtime
Hi Vincent, What version of Kafka/Kafka Streams are you running, more specifically when this error occurred? Thanks, Bill On Wed, Jun 28, 2017 at 12:24 PM, Bill Bejeck wrote: > Thanks for the info Vincent. > > -Bill > > On Wed, Jun 28, 2017 at 12:19 PM, Vincent Rischmann > wrote: > >> I'm not sure what exactly to search for, but here are a couple of >> things. >> >> First, looking at the broker 1, there's this: >> https://pastebin.com/raw/XMLpjj7J which I find weird, because I'm sure I >> didn't create those topics manually...which means to me that the streams >> application was still up (despite me thinking it was shut down) and >> therefore it recreated the topics ? Or is there another explanation here >> ? >> >> After that, there is a lot of stuff like this >> https://pastebin.com/raw/7PfRWuZb until 15:51 when there's this >> https://pastebin.com/raw/Pnhs4JT1. Still after that at 15:53 the topics >> are again deleted it seems https://pastebin.com/raw/wTL9PcWJ >> >> The broker with the problem was the broker 5. Here's a piece of logs >> just after I ran the reset script. https://pastebin.com/raw/PgCDZFb8 >> >> After that there are a LOT of UnknownTopicOrPartitionException and then >> this: https://pastebin.com/raw/b7k7yNdQ >> This repeats for a while until 15:50 when it seems the broker thinks >> he's the only one in the cluster ? https://pastebin.com/raw/Qy0TGjnx I >> didn't include everything but it looks like every topic/partition had >> it's ISR shrink. >> >> This goes on for a long time. At 15:57 there's this >> https://pastebin.com/raw/VrTGdXBZ >> After that there a lot of NotLeaderForPartitionException >> https://pastebin.com/raw/PhDQts66 >> >> Then at 16:04 according to the log I killed -9 the process. At 16:13 I >> restarted it. It spent 12 minutes recreating index files and then >> proceeded to create the topics that I tried to delete: >> https://pastebin.com/raw/bz9MYRsJ >> Then just one minute later it deleted the topics >> https://pastebin.com/raw/EQ2Kuwj6 >> >> And after that it operated normally. >> >> Sorry if it's a bit random and unorganized, I hope it helps anyway. I >> can search for some specific things if you'd like. >> >> >> On Wed, Jun 28, 2017, at 12:17 AM, Bill Bejeck wrote: >> > Thanks Vincent. >> > >> > That's a good start for now. >> > >> > If you get a chance to forward some logs that would be great. >> > >> > -Bill >> > >> > On Tue, Jun 27, 2017 at 6:10 PM, Vincent Rischmann >> > wrote: >> > >> > > Sure. >> > > >> > > The application reads a topic of keyless events and based on some >> > > criteria of the event, it creates a new key and uses that for >> > > `selectKey`. >> > > Then I groupByKey and count with 3 differents windows. Each count is >> > > then stored in a database. >> > > >> > > The three windows are tumbling windows: >> > > - 1 minute window, 1 day retention >> > > - 1 hour window, 7 day retention >> > > - 1 day window, 15 days retention >> > > That's basically it for the structure. >> > > The input topic has 64 partitions. >> > > >> > > Tomorrow I can get some logs from Kafka/Zookeeper if that would help. >> > > >> > > On Tue, Jun 27, 2017, at 11:41 PM, Bill Bejeck wrote: >> > > > Hi Vincent, >> > > > >> > > > Thanks for reporting this issue. Could you give us some more >> details >> > > > (number topics, partitions per topic and the structure of your Kafka >> > > > Streams application) so we attempt to reproduce and diagnose the >> issue? >> > > > >> > > > Thanks! 
>> > > > Bill >> > > > >> > > > On 2017-06-27 14:46 (-0400), Vincent Rischmann >> wrote: >> > > > > Hello. so I had a weird problem this afternoon. I was deploying a >> > > > > streams application and wanted to delete already existing internal >> > > > > states data so I ran kafka-streams-application-reset.sh to do >> it, as >> > > > > recommended. it wasn't the first time I ran it and it had always >> worked >> > > > > before, in staging or in production. >> > > > > Anyway, I run the command and around 2/3 minutes later we realize >> a lot >> > > > > of stuff using the cluster is basically down, unable to fetch or >> > > > > produce. After investigating logs from the producers and the >> brokers I >> > > > > saw that one broker was not responding, despite the process being >> up. >> > > It >> > > > > kept spewing `UnknownTopicOrPartitionException` in the logs, >> other >> > > > > brokers were spewing `NotLeaderForPartitionException` regularly. >> A >> > > > > zookeeper node logged a lot of this: >> > > > > > 2017-06-27 15:51:32,897 [myid:2] - INFO [ProcessThread(sid:2 >> cport:- >> > > > > > 1)::PrepRequestProcessor@649] - Got user-level KeeperException >> when >> > > > > > processing sessionid:0x159cadf860e0089 type:setData >> cxid:0x249af08 >> > > > > > zxid:0xb06b3722e txntype:-1 reqpath:n/a Error >> > > Path:/brokers/topics/event-counter-per-day-store- >> > > > > > repartition/partitions/4/state Error:KeeperErrorCode = >> BadVersion for >> > > > > > /brokers/topics/event-count
RE: don't read message in consumer
Uncomment the line #advertised.listeners with your address & port Create a fresh topic and retry your test. Let us know what happens. KR, On 29 Jun 2017 2:06 pm, "Anton Mushin" wrote: > Thanks for your reply. > > My server.properties: > > broker.id=0 > #delete.topic.enable=true > #advertised.listeners=PLAINTEXT://your.host.name:9092 > num.network.threads=3 > num.io.threads=8 > socket.send.buffer.bytes=102400 > socket.receive.buffer.bytes=102400 > socket.request.max.bytes=104857600 > log.dirs=/tmp/kafka-logs > num.partitions=1 > num.recovery.threads.per.data.dir=1 > log.retention.hours=168 > log.segment.bytes=1073741824 > log.retention.check.interval.ms=30 > zookeeper.connect=localhost:2181 > zookeeper.connection.timeout.ms=6000 > > Best, > Anton > > -Original Message- > From: M. Manna [mailto:manme...@gmail.com] > Sent: Thursday, June 29, 2017 4:52 PM > To: users@kafka.apache.org > Subject: Re: don't read message in consumer > > Please share your server configuration. How are you advertising the > listeners? > > On 29 Jun 2017 13:44, "Anton Mushin" wrote: > > > Hi everyone, > > I use on kafka_2.11-0.10.1.1, and I'm trying check it work. I have > > Zookeeper and Kafka on one host. > > I'm calling console producer: "kafka-console-producer.sh --broker-list > > 10.0.0.19:9092 --topic test" > > I expect message in consumer. Consumer is calling as: > > "kafka-console-consumer.sh --zookeeper localhost:2181 --topic test > > --from-beginning" > > But I don't get messages in consumer, and don't catch any exceptions. > > If I will call console producer as : "kafka-console-producer.sh > > --broker-list localhost:9092 --topic test" - I'm getting messages in > > consumer. > > > > Problem is what external users can connect only to 10.0.0.19:9092, but > > their messages don't processing. > > > > Could you help me this, please? > > > > Best, > > Anton > > > > >
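Concretely, that means setting the advertised listener in server.properties to the address your external clients actually use (10.0.0.19:9092, taken from your producer command; adjust host and port as needed) and restarting the broker:

advertised.listeners=PLAINTEXT://10.0.0.19:9092

The advertised listener is what the broker hands back to clients in its metadata responses, so it has to be an address those clients can reach; if it is left at the default (the broker's own hostname), an external producer may manage the initial bootstrap connection to 10.0.0.19 but then fail to reach the broker for the actual produce requests.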
Re: Kafka Sometimes Fails to Start on Boot
I have tried changing the port to no avail. You can also see in the first email running a: `netstat -tulpn` produces output saying that nothing is using port . Also seeing as how 90% of the time it works it really doesn't seem like other software would sometimes be booted, and sometimes not. On Thu, Jun 29, 2017 at 1:51 AM, Tom Bentley wrote: > Have you tried changing the configured JMX port? After all, it's possible > the conflict is between kafka and some other software running on the same > server. > > On 28 June 2017 at 21:06, Eric Coan wrote: > > > Hello, > > > > > > Unfortunately Kafka does indeed startup and run for a little bit before > > crashing with the above exception, so doing one simple check wouldn't > work. > > I could theoretically keep this script running forever, and constantly > > checking for it being up. However that's really a hacky solution, and I'd > > prefer to not do that if I don't have too. > > > > On Wed, Jun 28, 2017 at 1:43 PM, M. Manna wrote: > > > > > Can you not put a service wrapper for startup? It will attempt a > restart > > if > > > the executable isn't up and running successfully. > > > > > > I am not familiar with Unix side, but in Windows you can use a > powershell > > > to utilise such thing. It's a better approach. > > > > > > Let me know what you think. > > > > > > On 28 Jun 2017 8:34 pm, "Eric Coan" wrote: > > > > > > > I am using the same configuration for all brokers. However, each > broker > > > is > > > > running on a completely separate host (I'm not running all three > > brokers > > > on > > > > the same host). I can get all three running if I manually start kafka > > > > again, however it's just occasionally on boot one fails to start with > > > this > > > > error. > > > > > > > > On Wed, Jun 28, 2017 at 1:25 PM, M. Manna > wrote: > > > > > > > > > Aren't u using the same JMX port for all brokers? I dont think > > it > > > > will > > > > > work for more than 1 broker. > > > > > > > > > > > > > > > > > > > > On 28 Jun 2017 8:22 pm, "Eric Coan" wrote: > > > > > > > > > > > Hey, > > > > > > > > > > > > No worries. I'm starting the brokers with a script yes (that ends > > up > > > > > > generating the command I pasted: > > > > > > > > > > > > ``` > > > > > > > > > > > > KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true > > > > > > -Dcom.sun.management.jmxremote.authenticate=false > > > > > > -Dcom.sun.management.jmxremote.ssl=false > > > -Djava.rmi.server.hostname=$ > > > > > FQDN > > > > > > -Djava.net.preferIPv4Stack=true" JMX_PORT= > > SCALA_VERSION=2.12.2 > > > > > > JAVA_HOME=/usr > > > > > > $KAFKA_INSTALL_PATH//bin/kafka-server-start.sh -daemon > > > > > > $KAFKA_INSTALL_PATH/config/server.properties --override > > > > > > zookeeper.connect="XX.XX.XX.XX:XX" --override broker.id > > ="$broker_id" > > > > > > --override > > > > > > listeners="SSL://$LOCAL_IPV4:9092" --override broker.rack="$AZ" > > > > > > ``` > > > > > > > > > > > > The script beforehand populates the variables such as the FQDN, > the > > > > > broker > > > > > > Id, Zookeeper IPs to connect to, Kafka Install Path, etc. The > > > important > > > > > > part of the command really is: > > > > > > > > > > > > ``` > > > > > > KAFKA_JMX_OPTS="..." JMX_PORT= SCALA_VERSION=2.12.2 > > > JAVA_HOME=/usr > > > > > > $KAFKA_INSTALL_PATH/bin/kafka-server-start.sh -daemon .. > > > > > > ``` > > > > > > > > > > > > On Wed, Jun 28, 2017 at 1:08 PM, M. 
Manna > > > wrote: > > > > > > > > > > > > > Please forgive my autocorrect options :( > > > > > > > > > > > > > > On 28 Jun 2017 8:06 pm, "M. Manna" wrote: > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > OS is not an issue, I have a 3 broker setup and I have > > experienced > > > > this > > > > > > > too. > > > > > > > > > > > > > > How are toy atarting the brokers? Is this a concurrent start or > > > have > > > > > you > > > > > > > got some startup scriptto bring up all the brokers? > > > > > > > > > > > > > > KR, > > > > > > > > > > > > > > On 28 Jun 2017 6:47 pm, "Eric Coan" > > wrote: > > > > > > > > > > > > > > > Hello, > > > > > > > > > > > > > > > > I've recently been doing research into getting our Kafka > > cluster > > > > > > running > > > > > > > > outside of Mesos (for a couple of reasons). However I'm > > noticing > > > > > about > > > > > > > 10% > > > > > > > > of the time Kafka fails to start on boot (or more accurately > > > > starts, > > > > > > and > > > > > > > > immediately exits). I find it weird since all brokers are > using > > > the > > > > > > exact > > > > > > > > same configuration, on the same OS (Ubuntu 16.04) > > > > > > > > > > > > > > > > There's nothing in my LOG4J directory, however I did find a > > > > singular > > > > > > log > > > > > > > > line within $KAFKA_DIR/logs/kafkaServer.out that shed the > > actual > > > > > light > > > > > > > as > > > > > > > > to why it's failing: > > > > > > > > > > > > > > > > ``` > > > > > > > > Error: Exception thrown by the agent : java.rmi
RE: Kafka logs - server.log & controller.log
All the brokers write to server.log. The broker that happens to be the controller will also write to the controller.log file.

-Dave

-----Original Message-----
From: karan alang [mailto:karan.al...@gmail.com]
Sent: Wednesday, June 28, 2017 6:04 PM
To: users@kafka.apache.org
Subject: Kafka logs - server.log & controller.log

Hi All - in Kafka, I see two different log files - server.log & controller.log. What specifically gets logged in each of these files? i.e. what are these two files meant to log?
RE: don't read message in consumer
Thanks for your reply.

My server.properties:

broker.id=0
#delete.topic.enable=true
#advertised.listeners=PLAINTEXT://your.host.name:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=30
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

Best,
Anton

-----Original Message-----
From: M. Manna [mailto:manme...@gmail.com]
Sent: Thursday, June 29, 2017 4:52 PM
To: users@kafka.apache.org
Subject: Re: don't read message in consumer

Please share your server configuration. How are you advertising the listeners?

On 29 Jun 2017 13:44, "Anton Mushin" wrote:

> Hi everyone,
> I use on kafka_2.11-0.10.1.1, and I'm trying check it work. I have
> Zookeeper and Kafka on one host.
> I'm calling console producer: "kafka-console-producer.sh --broker-list
> 10.0.0.19:9092 --topic test"
> I expect message in consumer. Consumer is calling as:
> "kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
> --from-beginning"
> But I don't get messages in consumer, and don't catch any exceptions.
> If I will call console producer as : "kafka-console-producer.sh
> --broker-list localhost:9092 --topic test" - I'm getting messages in
> consumer.
>
> Problem is what external users can connect only to 10.0.0.19:9092, but
> their messages don't processing.
>
> Could you help me this, please?
>
> Best,
> Anton
Re: don't read message in consumer
Please share your server configuration. How are you advertising the listeners? On 29 Jun 2017 13:44, "Anton Mushin" wrote: > Hi everyone, > I use on kafka_2.11-0.10.1.1, and I'm trying check it work. I have > Zookeeper and Kafka on one host. > I'm calling console producer: "kafka-console-producer.sh --broker-list > 10.0.0.19:9092 --topic test" > I expect message in consumer. Consumer is calling as: > "kafka-console-consumer.sh --zookeeper localhost:2181 --topic test > --from-beginning" > But I don't get messages in consumer, and don't catch any exceptions. > If I will call console producer as : "kafka-console-producer.sh > --broker-list localhost:9092 --topic test" - I'm getting messages in > consumer. > > Problem is what external users can connect only to 10.0.0.19:9092, but > their messages don't processing. > > Could you help me this, please? > > Best, > Anton > >
don't read message in consumer
Hi everyone,

I'm using kafka_2.11-0.10.1.1 and trying to check that it works. I have Zookeeper and Kafka on one host.

I'm calling the console producer as: "kafka-console-producer.sh --broker-list 10.0.0.19:9092 --topic test" and I expect the message to show up in the consumer. The consumer is called as: "kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning". But I don't get messages in the consumer, and I don't see any exceptions. If I call the console producer as "kafka-console-producer.sh --broker-list localhost:9092 --topic test" instead, I do get the messages in the consumer.

The problem is that external users can only connect to 10.0.0.19:9092, but their messages aren't processed.

Could you help me with this, please?

Best,
Anton
Re: [DISCUSS] Streams DSL/StateStore Refactoring
I've updated the experimental code with a couple of ways of doing joins. One following the fluent approach and one following the builder approach. The 2 examples can be found here: https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java#L714 The code looks like: @Test public void shouldBeFluentIsh() throws Exception { final KStream one = null; final KStream two = null; final Serde serde = null; final ValueJoiner vj = null; // inner join one.join2(two, vj, JoinWindows.of(10)) .withKeySerde(serde) .withThisValueSerde(serde) .withOtherValueSerde(serde) .stream(); // left join one.join2(two, vj, JoinWindows.of(10)) .withJoinType(JoinType.LEFT) .stream(); } @Test public void shouldUseBuilder() throws Exception { final KStream one = null; final KStream two = null; final Serde serde = null; final ValueJoiner vj = null; //inner one.join(Joins.streamStreamJoin(two, vj, JoinWindows.of(10)).build()); //left one.join(Joins.streamStreamJoin(two, vj, JoinWindows.of(10)).withJoinType(JoinType.LEFT).build()); } I'm not going to say which way i'm leaning, yet! Thanks, Damian On Thu, 29 Jun 2017 at 11:47 Damian Guy wrote: > >> However, I don't understand your argument about putting aggregate() >> after the withXX() -- all the calls to withXX() set optional parameters >> for aggregate() and not for groupBy() -- but a groupBy().withXX() >> indicates that the withXX() belongs to the groupBy(). IMHO, this might >> be quite confusion for developers. >> >> > I see what you are saying, but the grouped stream is effectively a no-op > until you call one of the aggregate/count/reduce etc functions. So the > optional params are ones that are applicable to any of the operations you > can perform on this grouped stream. Then the final > count()/reduce()/aggregate() call has any of the params that are > required/specific to that function. > > >> >> -Matthias >> >> On 6/28/17 2:55 AM, Damian Guy wrote: >> >> I also think that mixing optional parameters with configs is a bad >> idea. >> >> Have not proposal for this atm but just wanted to mention it. Hope to >> >> find some time to come up with something. >> >> >> >> >> > Yes, i don't like the mix of config either. But the only real config >> here >> > is the logging config - which we don't really need as it can already be >> > done via a custom StateStoreSupplier. >> > >> > >> >> What I don't like in the current proposal is the >> >> .grouped().withKeyMapper() -- the current solution with .groupBy(...) >> >> and .groupByKey() seems better. For clarity, we could rename to >> >> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find >> >> some better names). >> >> >> >> >> > it could be groupByKey(), groupBy() or something different bt >> > >> > >> > >> >> The proposed pattern "chains" grouping and aggregation too close >> >> together. I would rather separate both more than less, ie, do into the >> >> opposite direction. >> >> >> >> I am also wondering, if we could so something more "fluent". The >> initial >> >> proposal was like: >> >> >> groupedStream.count() >> .withStoreName("name") >> .withCachingEnabled(false) >> .withLoggingEnabled(config) >> .table() >> >> >> >> The .table() statement in the end was kinda alien. >> >> >> > >> > I agree, but then all of the withXXX methods need to be on KTable which >> is >> > worse in my opinion. You also need something that is going to "build" >> the >> > internal processors and add them to the topology. 
>> > >> > >> >> The current proposal put the count() into the end -- ie, the optional >> >> parameter for count() have to specified on the .grouped() call -- this >> >> does not seems to be the best way either. >> >> >> >> >> > I actually prefer this method as you are building a grouped stream that >> you >> > will aggregate. So table.grouped(...).withOptionalStuff().aggregate(..) >> etc >> > seems natural to me. >> > >> > >> >> I did not think this through in detail, but can't we just do the >> initial >> >> proposal with the .table() ? >> >> >> >> groupedStream.count().withStoreName("name").mapValues(...) >> >> >> >> Each .withXXX(...) return the current KTable and all the .withXXX() are >> >> just added to the KTable interface. Or do I miss anything why this >> wont' >> >> work or any obvious disadvantage? >> >> >> >> >> >> >> > See above. >> > >> > >> >> >> >> -Matthias >> >> >> >> On 6/22/17 4:06 AM, Damian Guy wrote: >> >>> Thanks everyone. My latest attempt is below. It builds on the fluent >> >>> approach, but i think it is slightly nicer. >> >>> I agree with some of what Eno said about mixing configy stuff in the >> DSL, >> >>> but i think that enabling caching and enabling logging are things that >> >>> aren't actually config. I'd probably not add withLogC
Re: mirroring Kafka while preserving the order
I believe so. You need to be careful that the mirror maker producer doesn't reorder messages; in particular if retries > 0 then max.in.flight.requests.per.connection must be 1. If retries=0 then it doesn't matter what max.in.flight.requests.per.connection is. On 29 June 2017 at 05:52, Sunil Parmar wrote: > Is it possible to configure mirror maker using message handler to preserve > the order of messages in each topic partition. In the particular use case > we're looking at both source and destination topics have same number of > partitions. > > Sunil >
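In configuration terms, that means the producer properties file passed to MirrorMaker via --producer.config would contain something like the following (the retries value itself is illustrative):

# either: allow retries but keep only one request in flight, so retries cannot reorder
retries=10
max.in.flight.requests.per.connection=1
# or: retries=0, in which case max.in.flight.requests.per.connection can be left alone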
Re: Kafka Sometimes Fails to Start on Boot
Have you tried changing the configured JMX port? After all, it's possible the conflict is between kafka and some other software running on the same server. On 28 June 2017 at 21:06, Eric Coan wrote: > Hello, > > > Unfortunately Kafka does indeed startup and run for a little bit before > crashing with the above exception, so doing one simple check wouldn't work. > I could theoretically keep this script running forever, and constantly > checking for it being up. However that's really a hacky solution, and I'd > prefer to not do that if I don't have too. > > On Wed, Jun 28, 2017 at 1:43 PM, M. Manna wrote: > > > Can you not put a service wrapper for startup? It will attempt a restart > if > > the executable isn't up and running successfully. > > > > I am not familiar with Unix side, but in Windows you can use a powershell > > to utilise such thing. It's a better approach. > > > > Let me know what you think. > > > > On 28 Jun 2017 8:34 pm, "Eric Coan" wrote: > > > > > I am using the same configuration for all brokers. However, each broker > > is > > > running on a completely separate host (I'm not running all three > brokers > > on > > > the same host). I can get all three running if I manually start kafka > > > again, however it's just occasionally on boot one fails to start with > > this > > > error. > > > > > > On Wed, Jun 28, 2017 at 1:25 PM, M. Manna wrote: > > > > > > > Aren't u using the same JMX port for all brokers? I dont think > it > > > will > > > > work for more than 1 broker. > > > > > > > > > > > > > > > > On 28 Jun 2017 8:22 pm, "Eric Coan" wrote: > > > > > > > > > Hey, > > > > > > > > > > No worries. I'm starting the brokers with a script yes (that ends > up > > > > > generating the command I pasted: > > > > > > > > > > ``` > > > > > > > > > > KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true > > > > > -Dcom.sun.management.jmxremote.authenticate=false > > > > > -Dcom.sun.management.jmxremote.ssl=false > > -Djava.rmi.server.hostname=$ > > > > FQDN > > > > > -Djava.net.preferIPv4Stack=true" JMX_PORT= > SCALA_VERSION=2.12.2 > > > > > JAVA_HOME=/usr > > > > > $KAFKA_INSTALL_PATH//bin/kafka-server-start.sh -daemon > > > > > $KAFKA_INSTALL_PATH/config/server.properties --override > > > > > zookeeper.connect="XX.XX.XX.XX:XX" --override broker.id > ="$broker_id" > > > > > --override > > > > > listeners="SSL://$LOCAL_IPV4:9092" --override broker.rack="$AZ" > > > > > ``` > > > > > > > > > > The script beforehand populates the variables such as the FQDN, the > > > > broker > > > > > Id, Zookeeper IPs to connect to, Kafka Install Path, etc. The > > important > > > > > part of the command really is: > > > > > > > > > > ``` > > > > > KAFKA_JMX_OPTS="..." JMX_PORT= SCALA_VERSION=2.12.2 > > JAVA_HOME=/usr > > > > > $KAFKA_INSTALL_PATH/bin/kafka-server-start.sh -daemon .. > > > > > ``` > > > > > > > > > > On Wed, Jun 28, 2017 at 1:08 PM, M. Manna > > wrote: > > > > > > > > > > > Please forgive my autocorrect options :( > > > > > > > > > > > > On 28 Jun 2017 8:06 pm, "M. Manna" wrote: > > > > > > > > > > > > Hi, > > > > > > > > > > > > OS is not an issue, I have a 3 broker setup and I have > experienced > > > this > > > > > > too. > > > > > > > > > > > > How are toy atarting the brokers? Is this a concurrent start or > > have > > > > you > > > > > > got some startup scriptto bring up all the brokers? 
> > > > > > > > > > > > KR, > > > > > > > > > > > > On 28 Jun 2017 6:47 pm, "Eric Coan" > wrote: > > > > > > > > > > > > > Hello, > > > > > > > > > > > > > > I've recently been doing research into getting our Kafka > cluster > > > > > running > > > > > > > outside of Mesos (for a couple of reasons). However I'm > noticing > > > > about > > > > > > 10% > > > > > > > of the time Kafka fails to start on boot (or more accurately > > > starts, > > > > > and > > > > > > > immediately exits). I find it weird since all brokers are using > > the > > > > > exact > > > > > > > same configuration, on the same OS (Ubuntu 16.04) > > > > > > > > > > > > > > There's nothing in my LOG4J directory, however I did find a > > > singular > > > > > log > > > > > > > line within $KAFKA_DIR/logs/kafkaServer.out that shed the > actual > > > > light > > > > > > as > > > > > > > to why it's failing: > > > > > > > > > > > > > > ``` > > > > > > > Error: Exception thrown by the agent : java.rmi.server. > > > > > ExportException: > > > > > > > Port already in use: ; nested exception is: > > > > > > > java.net.BindException: Address already in use (Bind > > > failed) > > > > > > > ``` > > > > > > > > > > > > > > However, I can verify nothing is running on this port right > > before > > > > > > > invocation using netstat -tulpn which shows: > > > > > > > > > > > > > > ``` > > > > > > > upstart.sh[1127]: Active Internet connections (only servers) > > > > > > > upstart.sh[1127]: Proto Recv-Q Send-Q Local Address > > > > Foreign > > > > > > > Address State PID/Pr > > > > > > > upstart.sh[1127]: t