Re: Kafka HDFS Connector
Anybody have any idea on this? Thanks, Pari

On 20 June 2016 at 14:36, Pariksheet Barapatre wrote:
> Hello All,
>
> I have data coming from sensors into a Kafka cluster in comma-delimited text format.
>
> How can I offload this data from Kafka to Hive periodically? I guess Kafka Connect should solve my problem, but when I checked the documentation, the examples only cover Avro-formatted data. Can you please share some guidance on this?
>
> Many thanks,
> Pari
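[Editor's note: for anyone finding this thread later, a rough sketch of what a Confluent HDFS sink connector config with Hive integration looks like. The property names below are taken from the Confluent kafka-connect-hdfs documentation of that era and should be verified against the docs for your version; topic name, URLs and sizes are placeholders. As far as I understand, the stock examples use Avro because the connector needs a schema to create and evolve the Hive table, so comma-delimited text would first have to be parsed into a schema'd record (via a converter or an upstream transformation) before this connector can write it.]

    name=hdfs-sink
    connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
    tasks.max=1
    topics=sensor-readings
    hdfs.url=hdfs://namenode:8020
    flush.size=1000
    hive.integration=true
    hive.metastore.uris=thrift://hive-metastore:9083
    hive.database=default
    schema.compatibility=BACKWARD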
Re: RocksDBWindowStore.putInternal does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG?
A bit more investigation shows that because change logging is always enabled in both RocksDBKeyValueStoreSupplier and RocksDBWindowStoreSupplier, the aggregated key/values get written to a changelog topic in Kafka. RocksDBWindowStore always stores keys with a timestamp attached, while RocksDBStore stores raw keys. If the aggregation name remains the same and the code is changed to use windowed aggregation, the startup recovery then expects a timestamp attached to every key it restores. So if an aggregation is changed from non-windowed to windowed in the code, its changelog needs to be cleared from Kafka. I deleted the logs from the Kafka nodes and all is working just fine.

Thanks,
Unmesh

On Wed, Jun 22, 2016 at 9:29 AM, Unmesh Joshi wrote:
> I could reproduce it with the following steps. Adding the stack trace at the end.
>
> 1. Create a stream and consume it without windowing.
>
> KTable<String, Integer> aggregation = locationViews
>         .map((key, value) -> {
>             GenericRecord parsedRecord = parse(value);
>             String parsedKey = parsedRecord.get("region").toString() + parsedRecord.get("location").toString();
>             return new KeyValue<>(parsedKey, 1);
>         }).reduceByKey((v1, v2) -> v1 + v2, Serdes.String(), Serdes.Integer(), "aggregated");
>
> aggregation.foreach((k, v) -> System.out.println(k + ", " + v));
>
> 2. Stop the consumer and then change the code to add windowing.
>
> KTable<Windowed<String>, Integer> aggregation = locationViews
>         .map((key, value) -> {
>             GenericRecord parsedRecord = parse(value);
>             String parsedKey = parsedRecord.get("region").toString() + parsedRecord.get("location").toString();
>             return new KeyValue<>(parsedKey, 1);
>         }).reduceByKey((v1, v2) -> v1 + v2, TimeWindows.of("aggregated", 10), Serdes.String(), Serdes.Integer());
>
> aggregation.foreach((k, v) -> System.out.println(k + ", " + v));
>
> If I already had a consumer without windowing, it throws the following exception when the consumer is run as in step 2.
> > > java.lang.IndexOutOfBoundsException > at java.nio.Buffer.checkIndex(Buffer.java:546) > at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:416) > at > org.apache.kafka.streams.state.internals.WindowStoreUtils.timestampFromBinaryKey(WindowStoreUtils.java:63) > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:295) > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:44) > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:190) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116) > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:184) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66) > at > org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > > On Wed, Jun 22, 2016 at 1:12 AM, Guozhang Wang wrote: > >> Could you share your stack trace upon failure? >> >> >> >> On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi >> wrote: >> >> > HI, >> > >> > I am using confluent-3.0.0-2.11, with kafka and streams versions >> > org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and >> > org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The probl
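[Editor's note: if it helps anyone hitting the same IndexOutOfBoundsException — as far as I can tell from the 0.10.0.0 sources, the window store writes its changelog keys as [serialized key][8-byte timestamp][4-byte sequence], while the non-windowed store writes raw keys. A minimal Java sketch (my own illustration, not Kafka code) of why restoring raw keys through the windowed restore path blows up:]

import java.nio.ByteBuffer;

public class WindowKeyLayoutSketch {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Windowed changelog key: [key bytes][8-byte timestamp][4-byte sequence]
    static byte[] toWindowedBinaryKey(byte[] keyBytes, long timestamp, int seq) {
        ByteBuffer buf = ByteBuffer.allocate(keyBytes.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
        buf.put(keyBytes).putLong(timestamp).putInt(seq);
        return buf.array();
    }

    // What the windowed restore path effectively does: read the timestamp from the key's tail
    static long timestampFromBinaryKey(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(String[] args) {
        byte[] windowed = toWindowedBinaryKey("regionAlocation1".getBytes(), System.currentTimeMillis(), 0);
        System.out.println(timestampFromBinaryKey(windowed));   // fine: timestamp is there

        byte[] raw = "regionAlocation1".getBytes();             // key as written by the non-windowed store
        try {
            timestampFromBinaryKey(raw);
        } catch (IndexOutOfBoundsException e) {
            // same failure mode as in the restore stack trace above
            System.out.println("raw key has no timestamp tail: " + e);
        }
    }
}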
Re: RocksDBWindowStore.putInternal does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG?
I could reproduce it with the following steps. Adding the stack trace at the end.

1. Create a stream and consume it without windowing.

KTable<String, Integer> aggregation = locationViews
        .map((key, value) -> {
            GenericRecord parsedRecord = parse(value);
            String parsedKey = parsedRecord.get("region").toString() + parsedRecord.get("location").toString();
            return new KeyValue<>(parsedKey, 1);
        }).reduceByKey((v1, v2) -> v1 + v2, Serdes.String(), Serdes.Integer(), "aggregated");

aggregation.foreach((k, v) -> System.out.println(k + ", " + v));

2. Stop the consumer and then change the code to add windowing.

KTable<Windowed<String>, Integer> aggregation = locationViews
        .map((key, value) -> {
            GenericRecord parsedRecord = parse(value);
            String parsedKey = parsedRecord.get("region").toString() + parsedRecord.get("location").toString();
            return new KeyValue<>(parsedKey, 1);
        }).reduceByKey((v1, v2) -> v1 + v2, TimeWindows.of("aggregated", 10), Serdes.String(), Serdes.Integer());

aggregation.foreach((k, v) -> System.out.println(k + ", " + v));

If I already had a consumer without windowing, it throws the following exception when the consumer is run as in step 2.

java.lang.IndexOutOfBoundsException
    at java.nio.Buffer.checkIndex(Buffer.java:546)
    at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:416)
    at org.apache.kafka.streams.state.internals.WindowStoreUtils.timestampFromBinaryKey(WindowStoreUtils.java:63)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:295)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:44)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:190)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:184)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) On Wed, Jun 22, 2016 at 1:12 AM, Guozhang Wang wrote: > Could you share your stack trace upon failure? > > > > On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi > wrote: > > > HI, > > > > I am using confluent-3.0.0-2.11, with kafka and streams versions > > org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and > > org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem > seems > > to be with null keys because the original messages are not produced with > > keys, and I am creating a key value pair in the map function before > > aggregating to KTable. The RocksDBWindowStore putInternal is expecting > the > > timestamp to be appended to the Key, which was not the case. > > It somehow corrected itself, once I started producing messages with some > > non null key. > > > > The code is here > > > > > https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java > > > > > > Thanks, > > Unmesh > > > > > > > > > > > > On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang > > wrote: > > > > > Hello Unmesh, > > > > > > Timestamp extractor is always applied at the beginning of the topology > > for > > > each incoming record, and the extracted timestamp is carried throughout > > the > > > topology. > > > > > > Could you share yo
Re: AdminUtils
On Wed, Jun 22, 2016 at 12:32 AM, Chris Barlock wrote: > > What is KIP-4? https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations > Until this is available, I think I'm stuck with > AdminUtils. Is there any (java)doc available for it, or am I going to > have to dig through the scala files and figure out what has changed? You can generate the scaladoc yourself, but it's probably less work to just look at the file: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala Ismael
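[Editor's note: for what it's worth, this is roughly what the 0.10.0.0 calls look like from Java, going by AdminUtils.scala at the time. The main change from 0.8.2.1 is that the methods take a ZkUtils instead of a ZkClient, and createTopic gained a RackAwareMode parameter; treat the exact signatures as something to double-check against the source.]

import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;

public class AdminUtilsSketch {
    public static void main(String[] args) {
        // zkUrl, session timeout ms, connection timeout ms, "is ZK security enabled"
        ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, false);
        try {
            if (!AdminUtils.topicExists(zkUtils, "my-topic")) {
                // topic, partitions, replication factor, topic-level configs, rack awareness
                AdminUtils.createTopic(zkUtils, "my-topic", 1, 1,
                        new Properties(), RackAwareMode.Enforced$.MODULE$);
            }
            // AdminUtils.deleteTopic(zkUtils, "my-topic");
            // AdminUtils.fetchTopicMetadataFromZk("my-topic", zkUtils);
        } finally {
            zkUtils.close();
        }
    }
}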
Re: AdminUtils
Ismael: What is KIP-4? Until this is available, I think I'm stuck with AdminUtils. Is there any (java)doc available for it, or am I going to have to dig through the scala files and figure out what has changed? The new REST interface is a possibility, but, if I recall, it does not support, at least, createTopic. Chris IBM Middleware Research Triangle Park, NC (919) 224-2240 Internet: barl...@us.ibm.com From: Ismael Juma To: users@kafka.apache.org Date: 06/21/2016 05:47 PM Subject:Re: AdminUtils Sent by:isma...@gmail.com Hi Chris, Yes, `AdminUtils` is not public API. The plan is to introduce `AdminClient` as part of KIP-4. The Kafka protocol additions for `createTopic` and `deleteTopic` are currently being discussed and it looks like they will be part of the next Kafka release based on current progress. The API of `AdminClient` hasn't been discussed yet, but it should be possible to support the methods you mentioned. Maybe you can provide your feedback as the KIP-4 discussions happen in kafka-dev? Ismael On Tue, Jun 21, 2016 at 10:39 PM, Chris Barlock wrote: > While working on upgrading from 0.8.2.1 to 0.10.0.0, I found out that > AdminUtils has changed -- and not in a backwards-compatible manner. I > gather this is not a public API since I can't find any Javadoc for it. So, > in 0.10.0.0 are there public replacements for: > > AdminUtils.fetchTopicMetadataFromZk > AdminUtils.topicExists > AdminUtils.createTopic > AdminUtils.deleteTopic > > ? > > Thanks, > > Chris > > >
Re: AdminUtils
Hi Chris, Yes, `AdminUtils` is not public API. The plan is to introduce `AdminClient` as part of KIP-4. The Kafka protocol additions for `createTopic` and `deleteTopic` are currently being discussed and it looks like they will be part of the next Kafka release based on current progress. The API of `AdminClient` hasn't been discussed yet, but it should be possible to support the methods you mentioned. Maybe you can provide your feedback as the KIP-4 discussions happen in kafka-dev? Ismael On Tue, Jun 21, 2016 at 10:39 PM, Chris Barlock wrote: > While working on upgrading from 0.8.2.1 to 0.10.0.0, I found out that > AdminUtils has changed -- and not in a backwards-compatible manner. I > gather this is not a public API since I can't find any Javadoc for it. So, > in 0.10.0.0 are there public replacements for: > > AdminUtils.fetchTopicMetadataFromZk > AdminUtils.topicExists > AdminUtils.createTopic > AdminUtils.deleteTopic > > ? > > Thanks, > > Chris > > >
0.9 KafkaConsumer Memory Usage
I'm using 0.9.0.1 consumers on 0.9.0.1 brokers. In a single Java service, we have 4 producers and 5 consumers. They are all KafkaProducer and KafkaConsumer instances (the new consumer). Since the 0.9 upgrade, this service is now OOMing after being up for a few minutes. Heap dumps show >80MB of objects related to topic and partition metadata (hundreds of thousands of org.apache.kafka.common.Node objects). Digging through the heap, I see references to all sorts of topics in the cluster that this service is not producing to or consuming from. We are not using any pattern matching on topics.

Unusual things about this service and cluster:
* Smallish heap (was fine with 128mb on the 0.8 consumer, but even bumping up to 256mb we still OOM on 0.9)
* We have a large number of dev/test topics on this cluster (several hundred) and thousands of partitions as a result. The service is only concerned with ~15 of these topics.

It seems pretty likely that our issue is that metadata for all partitions is being cached locally. Is there a way to keep that from happening? Or are there other known memory leak issues that could be causing us to OOM?
AdminUtils
While working on upgrading from 0.8.2.1 to 0.10.0.0, I found out that AdminUtils has changed -- and not in a backwards-compatible manner. I gather this is not a public API since I can't find any Javadoc for it. So, in 0.10.0.0 are there public replacements for:

AdminUtils.fetchTopicMetadataFromZk
AdminUtils.topicExists
AdminUtils.createTopic
AdminUtils.deleteTopic

?

Thanks,
Chris
Re: RocksDBWindowStore.putInternal does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG?
Could you share your stack trace upon failure? On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi wrote: > HI, > > I am using confluent-3.0.0-2.11, with kafka and streams versions > org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and > org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem seems > to be with null keys because the original messages are not produced with > keys, and I am creating a key value pair in the map function before > aggregating to KTable. The RocksDBWindowStore putInternal is expecting the > timestamp to be appended to the Key, which was not the case. > It somehow corrected itself, once I started producing messages with some > non null key. > > The code is here > > https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java > > > Thanks, > Unmesh > > > > > > On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang > wrote: > > > Hello Unmesh, > > > > Timestamp extractor is always applied at the beginning of the topology > for > > each incoming record, and the extracted timestamp is carried throughout > the > > topology. > > > > Could you share your stack trace upon failure with your source code? And > > what version of Kafka Streams are you using? Some old version of the > > library requires the key (as for your case, "String parsedKey = > > parsedRecord.get("region").toString() + parsedRecord.get("location"). > > toString();") to be not null, but it has been resolved in the recent > fixes. > > > > Guozhang > > > > On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi > > wrote: > > > > > Hi, > > > > > > I was trying to experiment with Kafka streams, and had following code > > > > > > KTable, Integer> aggregated = locationViews > > > .map((key, value) -> { > > > GenericRecord parsedRecord = parse(value); > > > String parsedKey = parsedRecord.get("region").toString() + > > > parsedRecord.get("location").toString(); > > > return new KeyValue<>(parsedKey, 1); > > > }).reduceByKey((v1, v2) -> v1 + v2, > > > TimeWindows.of("aggregated", 5000), Serdes.String(), > > > Serdes.Integer()); > > > > > > This code fails in RocksDBWindowStore.putInternal, where its trying > > > to get timestamp from message key. > > > > > > I am not producing message with key, so I tried putting > > > TIMESTAMP_EXTRACTOR_CLASS_CONFIG. > > > > > > > streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, > > > WallclockTimestampExtractor.class.getName()); > > > > > > But RocksDBWindowStore, still expects timestamp in the message key and > > > does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG. > > > > > > Is it the case that Kafka message should always have a key if we have > > > to use Windowing? or this is an issue with RocksDBWindowStore? > > > > > > > > > Thanks, > > > > > > Unmesh > > > > > > > > > > > -- > > -- Guozhang > > > -- -- Guozhang
RE: Error while sending message to Kafka broker on SSL with Kafka 0.10.0.0
Now I changed the test producer call like this:

C:\development\kafka\kafka_2.11-0.10.0.0\kafka_2.11-0.10.0.0\bin\windows>.\kafka-console-producer.bat --broker-list localhost:8393 --topic test --producer.config ..\..\config\producer.properties

and updated producer.properties like this:

security.protocol=SSL
ssl.truststore.location=C:/test.jks
ssl.truststore.password=test

Now I see this error message:

C:\development\kafka\kafka_2.11-0.10.0.0\kafka_2.11-0.10.0.0\bin\windows>.\kafka-console-producer.bat --broker-list localhost:8393 --topic test --producer.config ..\..\config\producer.properties
[2016-06-21 10:23:34,738] WARN The configuration metadata.broker.list = localhost:8393 was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2016-06-21 10:23:55,180] WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: An established connection was aborted by the software in your host machine
    at sun.nio.ch.SocketDispatcher.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:51)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:89)
    at sun.nio.ch.IOUtil.write(IOUtil.java:60)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:450)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47)
    at org.apache.kafka.common.network.Selector.close(Selector.java:471)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:348)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)

Please let me know what I am missing.
Thanks
Subhash Agrawal.

From: Subhash Agrawal
Sent: Monday, June 20, 2016 4:20 PM
To: d...@kafka.apache.org; 'users@kafka.apache.org'
Subject: Error while sending message to Kafka broker on SSL with Kafka 0.10.0.0

Hi All,
I am seeing error while sending message via test producer on SSL port. I am able to successfully send message to non-SSL port. Here is my broker configuration.

listeners=SSL://:9093
security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.keystore.location=C:/test.jks
ssl.keystore.password=test
ssl.key.password=test
ssl.truststore.location=C:/test.jks
ssl.truststore.password=test

and I see no error while starting up kafka server as per its console output.

[2016-06-20 15:59:32,627] INFO Registered broker 0 at path /brokers/ids/0 with addresses: SSL -> EndPoint(SAGRAWAL-PC.opentext.net,9093,SSL) (kafka.utils.ZkUtils)
[2016-06-20 15:59:32,635] INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2016-06-20 15:59:32,636] INFO Kafka commitId : b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser)
[2016-06-20 15:59:32,637] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

C:\development\kafka\kafka_2.11-0.10.0.0\kafka_2.11-0.10.0.0\bin\windows>.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

When I want to send message using test producer on SSL port, I keep on seeing this warning message.
C:\development\kafka\kafka_2.11-0.10.0.0\kafka_2.11-0.10.0.0\bin\windows>.\kafka-console-producer.bat --broker-list localhost:9093 --topic test
aaa
[2016-06-20 16:13:38,565] WARN Bootstrap broker localhost:9093 disconnected (org.apache.kafka.clients.NetworkClient)
[2016-06-20 16:13:38,781] WARN Bootstrap broker localhost:9093 disconnected (org.apache.kafka.clients.NetworkClient)
[2016-06-20 16:13:39,010] WARN Bootstrap broker localhost:9093 disconnected (org.apache.kafka.clients.NetworkClient)
Terminate batch job (Y/N)? y

Any idea what I should look into?
Thanks
Subhash Agrawal
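[Editor's note: one thing that stands out in the broker config above is ssl.client.auth=required, which means the producer must present a client certificate as well -- a truststore alone is not enough, and the broker will drop the connection during the handshake, which looks exactly like the repeated disconnects shown here. A producer.properties along these lines (keystore path and passwords are placeholders) might get further:]

security.protocol=SSL
ssl.truststore.location=C:/test.jks
ssl.truststore.password=test
ssl.keystore.location=C:/client.jks
ssl.keystore.password=test
ssl.key.password=test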
0.9 client persistently high CPU usage
Hi all, We've got a problem with high CPU usage on a 0.9 client. We've got a monitoring system that polls kafka topics for metadata (to get the last message offset) every so often, and this has started using very high CPU continuously. We're seeing the following being spammed in the logs every 100ms: 2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | Initialize connection to node -1 for sending metadata request 2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | Initiating connection to node -1 at :9092. 2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | Error connecting to node -1 at :9092: java.nio.channels.ClosedByInterruptException: null at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.8.0_60-ea] at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659) ~[na:1.8.0_60-ea] at org.apache.kafka.common.network.Selector.connect(Selector.java:153) ~[monitoring-collector.jar:na] at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:489) [monitoring-collector.jar:na] at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:47) [monitoring-collector.jar:na] at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:624) [monitoring-collector.jar:na] at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:543) [monitoring-collector.jar:na] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:254) [monitoring-collector.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) [monitoring-collector.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) [monitoring-collector.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) [monitoring-collector.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134) [monitoring-collector.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184) [monitoring-collector.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:290) [monitoring-collector.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:272) [monitoring-collector.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1299) [monitoring-collector.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1106) [monitoring-collector.jar:na] ... 2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | Give up sending metadata request since no node is available This is a single-broker cluster (id: 1), all on a single machine. There is nothing being logged in the broker logs. Can anyone help work out what is going wrong, and how we could fix it? In particular, the '-1' node id is suspicious, but we can't work out where this value is coming from. Thanks, SimonC
Re: [DISCUSS] Java 8 as a minimum requirement
+1 On Tue, 21 Jun 2016 at 09:59 Marcus Gründler wrote: > Hi Ismael, > > thanks for the pointer to the latest WebSphere documentation - I wasn’t > aware > of that release. > > We currently have customers that run our software frontend on an older > WebSphere version that runs on Java 7 and push data to kafka brokers in the > backend. Replacing Kafka brokers wouldn’t be an issue here since we are in > control of the backend part. > > The WebSphere server where our frontend part (and therefore our kafka > client) > is running is a kind of general infrastructure of that customer and upgrade > cycles are slow and independent of our release cycles. > > Of course that could be solved by a kind of proxy in front of the kafka > brokers, > so maybe we shouldn’t pay too much tribute to legacy systems :-) > > Regards, Marcus > > > > Am 17.06.2016 um 15:44 schrieb Ismael Juma : > > > > Hi Marcus, > > > > Thanks for your feedback. > > > > With regards to IBM WebSphere, the latest stable release (8.5.5) supports > > Java 8 according to the documentation: > > > > http://www-01.ibm.com/support/docview.wss?uid=swg27005002 > > > > Having said that, it is fair to discuss servers and clients separately. > In > > Kafka, you can't use newer clients with older brokers, but you can use > > older clients with newer brokers. As such, the scenario we're talking > about > > is that of users who can upgrade their brokers and clients to the latest > > Kafka version, but are stuck with an older version of WebSphere, right? > Are > > you aware of such users? > > > > Ismael > > On Fri, Jun 17, 2016 at 10:34 AM, Marcus Gründler < > > marcus.gruend...@aixigo.de> wrote: > > > >> -1 > >> Hi Ismael, > >> > >> Although I really like the Java 8 features and understand the advantages > >> you > >> mentioned about Java 8 migration, I would suggest to stay with Java 7 as > >> a minimum requirement for a while. > >> > >> I think there are two aspects to consider - Kafka Server and Kafka > >> clients. On > >> the server part it would make sense to switch to Java 8 because you can > run > >> the broker independently from any enclosing runtime (no JEE server etc.) > >> > >> But if you change the requirement for Kafka clients, you would cut Kafka > >> support for quite a lot of real world deployments that run for example > on > >> an IBM WebSphere JEE Server (*sigh*). Since up to today there is no > >> WebSphere version that supports Java 8. > >> > >> And I think a split of Kafka server with Java8 and Kafka client JARs in > >> Java7 > >> would be too complicated to maintain. > >> > >> So my conclusion is - stay with Java 7 for a while. > >> > >> Regards, Marcus > >> > >> > >>> Am 16.06.2016 um 22:45 schrieb Ismael Juma : > >>> > >>> Hi all, > >>> > >>> I would like to start a discussion on making Java 8 a minimum > requirement > >>> for Kafka's next feature release (let's say Kafka 0.10.1.0 for now). > This > >>> is the first discussion on the topic so the idea is to understand how > >>> people feel about it. If people feel it's too soon, then we can pick up > >> the > >>> conversation again after Kafka 0.10.1.0. If the feedback is mostly > >>> positive, I will start a vote thread. > >>> > >>> Let's start with some dates. Java 7 hasn't received public updates > since > >>> April 2015[1], Java 8 was released in March 2014[2] and Java 9 is > >> scheduled > >>> to be released in March 2017[3]. 
> >>> > >>> The first argument for dropping support for Java 7 is that the last > >> public > >>> release by Oracle contains a large number of known security > >>> vulnerabilities. The effectiveness of Kafka's security features is > >> reduced > >>> if the underlying runtime is not itself secure. > >>> > >>> The second argument for moving to Java 8 is that it adds a number of > >>> compelling features: > >>> > >>> * Lambda expressions and method references (particularly useful for the > >>> Kafka Streams DSL) > >>> * Default methods (very useful for maintaining compatibility when > adding > >>> methods to interfaces) > >>> * java.util.stream (helpful for making collection transformations more > >>> concise) > >>> * Lots of improvements to java.util.concurrent (CompletableFuture, > >>> DoubleAdder, DoubleAccumulator, StampedLock, LongAdder, > LongAccumulator) > >>> * Other nice things: SplittableRandom, Optional (and many others I have > >> not > >>> mentioned) > >>> > >>> The third argument is that it will simplify our testing matrix, we > won't > >>> have to test with Java 7 any longer (this is particularly useful for > >> system > >>> tests that take hours to run). It will also make it easier to support > >> Scala > >>> 2.12, which requires Java 8. > >>> > >>> The fourth argument is that many other open-source projects have taken > >> the > >>> leap already. Examples are Cassandra[4], Lucene[5], Akka[6], Hadoop > 3[7], > >>> Jetty[8], Eclipse[9], IntelliJ[10] and many others[11]. Even Android > will > >>> support Java 8 in the next version (although it will take
Re: Zookeeper offsets in new consumer
We have a lot of tooling that's still dependent on offsets being in ZooKeeper, but we were hoping to upgrade to the new consumer to solve another issue and would prefer not to have to do both at the same time.

On Tue, Jun 21, 2016 at 1:17 AM Gerard Klijs wrote:
> No, why would you want to store the offsets in zookeeper? One of the improvements is to not depend on zookeeper for the offsets. And there is tooling to get the offsets (although the consumer group must exist).
>
> On Mon, Jun 20, 2016 at 10:57 PM Bryan Baugher wrote:
> > Hi everyone,
> >
> > With the new Kafka consumer[1] is it possible to use zookeeper-based offset storage?
> >
> > Bryan
> >
> > [1] - http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
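[Editor's note: on the tooling side, the usual way to read Kafka-committed offsets externally is the consumer-groups command; flags below are as of 0.9/0.10 and worth double-checking against your version, with group name and broker address as placeholders.]

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server broker1:9092 --describe --group my-group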
How to get earliest valid offset
Hi All,

We are using the new Kafka 0.9 release (Scala 2.10), but on the consumer side we are still using the old high-level and low-level APIs. I am trying to fetch the earliest valid offset for a topic, but it returns the latest offset if the data (log segment) has been deleted after the retention interval configured in the server properties. In the previous version, even after data was deleted, we were able to get the original earliest valid offset.

For example: suppose I have added 2 messages at offsets 40 and 41, the log file is then deleted, and I insert a new message at offset 42, which creates a new log file. When I fetch the earliest valid offset I get 42, but I was expecting 40. I am not committing offsets anywhere, and I do not want to do that.

I am using this code to get the offset:

PartitionOffsetRequestInfo partitionOffsetRequestInfo =
        new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 10);
Map<TopicAndPartition, PartitionOffsetRequestInfo> map = new HashMap<>();
map.put(topicAndPartition, partitionOffsetRequestInfo);
OffsetRequest request = new OffsetRequest(map, (short) 0, CLIENT_ID);
OffsetResponse startOffsets = simpleConsumer.getOffsetsBefore(request);
if (startOffsets.offsets(topicName, partition).length > 0) {
    long validOffset = startOffsets.offsets(topicName, partition)[0];
    simpleConsumer.close();
    return validOffset;
}
simpleConsumer.close();

Can you help me with this?

Thanks,
snehalata
Kafka cluster
Hi,

I want to create a Kafka cluster for HA. Do we need to create 3 brokers, or is it okay to create only 2? We are using only 1 partition for every topic, so there is no parallelism while fetching data. Please suggest.

Thanks,
Snehalata
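[Editor's note: partition count and replication factor are independent knobs -- a topic can keep a single partition (no consumer parallelism) and still be replicated for HA. With 3 brokers you can use replication-factor 3 and tolerate a broker failure; with 2 brokers the best you can do is replication-factor 2. A sketch with placeholder host and topic names, in the same form as the commands elsewhere in this digest:]

bin/kafka-topics.sh --create --zookeeper zk1:2181 --replication-factor 3 --partitions 1 --topic my-topic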
Re: [DISCUSS] Java 8 as a minimum requirement
Hi Ismael, thanks for the pointer to the latest WebSphere documentation - I wasn’t aware of that release. We currently have customers that run our software frontend on an older WebSphere version that runs on Java 7 and push data to kafka brokers in the backend. Replacing Kafka brokers wouldn’t be an issue here since we are in control of the backend part. The WebSphere server where our frontend part (and therefore our kafka client) is running is a kind of general infrastructure of that customer and upgrade cycles are slow and independent of our release cycles. Of course that could be solved by a kind of proxy in front of the kafka brokers, so maybe we shouldn’t pay too much tribute to legacy systems :-) Regards, Marcus > Am 17.06.2016 um 15:44 schrieb Ismael Juma : > > Hi Marcus, > > Thanks for your feedback. > > With regards to IBM WebSphere, the latest stable release (8.5.5) supports > Java 8 according to the documentation: > > http://www-01.ibm.com/support/docview.wss?uid=swg27005002 > > Having said that, it is fair to discuss servers and clients separately. In > Kafka, you can't use newer clients with older brokers, but you can use > older clients with newer brokers. As such, the scenario we're talking about > is that of users who can upgrade their brokers and clients to the latest > Kafka version, but are stuck with an older version of WebSphere, right? Are > you aware of such users? > > Ismael > On Fri, Jun 17, 2016 at 10:34 AM, Marcus Gründler < > marcus.gruend...@aixigo.de> wrote: > >> -1 >> Hi Ismael, >> >> Although I really like the Java 8 features and understand the advantages >> you >> mentioned about Java 8 migration, I would suggest to stay with Java 7 as >> a minimum requirement for a while. >> >> I think there are two aspects to consider - Kafka Server and Kafka >> clients. On >> the server part it would make sense to switch to Java 8 because you can run >> the broker independently from any enclosing runtime (no JEE server etc.) >> >> But if you change the requirement for Kafka clients, you would cut Kafka >> support for quite a lot of real world deployments that run for example on >> an IBM WebSphere JEE Server (*sigh*). Since up to today there is no >> WebSphere version that supports Java 8. >> >> And I think a split of Kafka server with Java8 and Kafka client JARs in >> Java7 >> would be too complicated to maintain. >> >> So my conclusion is - stay with Java 7 for a while. >> >> Regards, Marcus >> >> >>> Am 16.06.2016 um 22:45 schrieb Ismael Juma : >>> >>> Hi all, >>> >>> I would like to start a discussion on making Java 8 a minimum requirement >>> for Kafka's next feature release (let's say Kafka 0.10.1.0 for now). This >>> is the first discussion on the topic so the idea is to understand how >>> people feel about it. If people feel it's too soon, then we can pick up >> the >>> conversation again after Kafka 0.10.1.0. If the feedback is mostly >>> positive, I will start a vote thread. >>> >>> Let's start with some dates. Java 7 hasn't received public updates since >>> April 2015[1], Java 8 was released in March 2014[2] and Java 9 is >> scheduled >>> to be released in March 2017[3]. >>> >>> The first argument for dropping support for Java 7 is that the last >> public >>> release by Oracle contains a large number of known security >>> vulnerabilities. The effectiveness of Kafka's security features is >> reduced >>> if the underlying runtime is not itself secure. 
>>> >>> The second argument for moving to Java 8 is that it adds a number of >>> compelling features: >>> >>> * Lambda expressions and method references (particularly useful for the >>> Kafka Streams DSL) >>> * Default methods (very useful for maintaining compatibility when adding >>> methods to interfaces) >>> * java.util.stream (helpful for making collection transformations more >>> concise) >>> * Lots of improvements to java.util.concurrent (CompletableFuture, >>> DoubleAdder, DoubleAccumulator, StampedLock, LongAdder, LongAccumulator) >>> * Other nice things: SplittableRandom, Optional (and many others I have >> not >>> mentioned) >>> >>> The third argument is that it will simplify our testing matrix, we won't >>> have to test with Java 7 any longer (this is particularly useful for >> system >>> tests that take hours to run). It will also make it easier to support >> Scala >>> 2.12, which requires Java 8. >>> >>> The fourth argument is that many other open-source projects have taken >> the >>> leap already. Examples are Cassandra[4], Lucene[5], Akka[6], Hadoop 3[7], >>> Jetty[8], Eclipse[9], IntelliJ[10] and many others[11]. Even Android will >>> support Java 8 in the next version (although it will take a while before >>> most phones will use that version sadly). This reduces (but does not >>> eliminate) the chance that we would be the first project that would >> cause a >>> user to consider a Java upgrade. >>> >>> The main argument for not making the change is that a reasonable number >>
Re: RocksDBWindowStore.putInternal does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG?
Hi,

I am using confluent-3.0.0-2.11, with kafka and streams versions org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem seems to be with null keys, because the original messages are not produced with keys, and I am creating a key/value pair in the map function before aggregating to a KTable. RocksDBWindowStore.putInternal is expecting the timestamp to be appended to the key, which was not the case. It somehow corrected itself once I started producing messages with some non-null key.

The code is here:
https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java

Thanks,
Unmesh

On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang wrote:
> Hello Unmesh,
>
> Timestamp extractor is always applied at the beginning of the topology for each incoming record, and the extracted timestamp is carried throughout the topology.
>
> Could you share your stack trace upon failure with your source code? And what version of Kafka Streams are you using? Some old version of the library requires the key (as for your case, "String parsedKey = parsedRecord.get("region").toString() + parsedRecord.get("location").toString();") to be not null, but it has been resolved in the recent fixes.
>
> Guozhang
>
> On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi wrote:
> > Hi,
> >
> > I was trying to experiment with Kafka streams, and had the following code:
> >
> > KTable<Windowed<String>, Integer> aggregated = locationViews
> >         .map((key, value) -> {
> >             GenericRecord parsedRecord = parse(value);
> >             String parsedKey = parsedRecord.get("region").toString() + parsedRecord.get("location").toString();
> >             return new KeyValue<>(parsedKey, 1);
> >         }).reduceByKey((v1, v2) -> v1 + v2, TimeWindows.of("aggregated", 5000), Serdes.String(), Serdes.Integer());
> >
> > This code fails in RocksDBWindowStore.putInternal, where it's trying to get the timestamp from the message key.
> >
> > I am not producing messages with a key, so I tried putting TIMESTAMP_EXTRACTOR_CLASS_CONFIG:
> >
> > streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
> >
> > But RocksDBWindowStore still expects a timestamp in the message key and does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> >
> > Is it the case that Kafka messages should always have a key if we have to use windowing? Or is this an issue with RocksDBWindowStore?
> >
> > Thanks,
> > Unmesh
>
> --
> -- Guozhang
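[Editor's note: since the workaround that ended up mattering here was producing messages with a non-null key, a minimal producer sketch for that -- topic name, key/value and bootstrap address are placeholders:]

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // a non-null key (e.g. region + location) so downstream windowed stores never see null keys
            producer.send(new ProducerRecord<>("location-views", "region1location1", "view-event"));
        }
    }
}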