Re: RocksDBWindowStore.putInternal does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG?

2016-06-21 Thread Unmesh Joshi
I could reproduce it with the following steps. The stack trace is at the end.

1. Create a stream and consume it without Windowing.

KTable<String, Integer> aggregation = locationViews
        .map((key, value) -> {
            GenericRecord parsedRecord = parse(value);
            String parsedKey = parsedRecord.get("region").toString() +
                    parsedRecord.get("location").toString();
            return new KeyValue<>(parsedKey, 1);
        }).reduceByKey((v1, v2) -> v1 + v2, Serdes.String(),
                Serdes.Integer(), "aggregated");

aggregation.foreach((k, v) -> System.out.println(k + ", " + v));


2. Stop the consumer and then change the code to add windowing.

KTable<Windowed<String>, Integer> aggregation = locationViews
        .map((key, value) -> {
            GenericRecord parsedRecord = parse(value);
            String parsedKey = parsedRecord.get("region").toString() +
                    parsedRecord.get("location").toString();
            return new KeyValue<>(parsedKey, 1);
        }).reduceByKey((v1, v2) -> v1 + v2,
                TimeWindows.of("aggregated", 10), Serdes.String(),
                Serdes.Integer());


aggregation.foreach((k, v) -> System.out.println(k + ", " + v));


If I have already run the consumer without windowing, the following exception
is thrown when the consumer is run as in step 2.


java.lang.IndexOutOfBoundsException
at java.nio.Buffer.checkIndex(Buffer.java:546)
at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:416)
at org.apache.kafka.streams.state.internals.WindowStoreUtils.timestampFromBinaryKey(WindowStoreUtils.java:63)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:295)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:44)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:190)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:184)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
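
A rough sketch of why the restore fails, assuming the 0.10.0.0 window-store key layout (serialized key bytes, then an 8-byte timestamp, then a 4-byte sequence number). The changelog written by the earlier non-windowed run contains keys without that 12-byte suffix, so the timestamp lookup computes a negative offset; this is an illustration of the failing call, not the actual Kafka source:

import java.nio.ByteBuffer;

public class WindowKeySketch {
    static final int TIMESTAMP_SIZE = 8;  // 8-byte window start timestamp
    static final int SEQNUM_SIZE = 4;     // 4-byte sequence number

    // Roughly what WindowStoreUtils.timestampFromBinaryKey does: the timestamp
    // sits 12 bytes from the end of the composite key.
    static long timestampFromBinaryKey(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey)
                .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(String[] args) {
        // A key written by the windowed store: payload + timestamp + seqnum.
        byte[] windowedKey = ByteBuffer.allocate(3 + 12)
                .put("abc".getBytes())
                .putLong(1466500000000L)
                .putInt(0)
                .array();
        System.out.println(timestampFromBinaryKey(windowedKey)); // 1466500000000

        // A key restored from the old, non-windowed changelog: no suffix, so the
        // computed index is negative and getLong throws IndexOutOfBoundsException.
        byte[] oldPlainKey = "abc".getBytes();
        timestampFromBinaryKey(oldPlainKey);
    }
}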

On Wed, Jun 22, 2016 at 1:12 AM, Guozhang Wang  wrote:

> Could you share your stack trace upon failure?
>
>
>
> On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi 
> wrote:
>
> > HI,
> >
> > I am using confluent-3.0.0-2.11, with kafka and streams versions
> > org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and
> > org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem
> seems
> > to be with null keys because the original messages are not produced with
> > keys, and I am creating a key value pair in the map function before
> > aggregating to KTable. The RocksDBWindowStore putInternal is expecting
> the
> > timestamp to be appended to the Key, which was not the case.
> > It somehow corrected itself, once I started producing messages with some
> > non null key.
> >
> > The code is here
> >
> >
> https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java
> >
> >
> > Thanks,
> > Unmesh
> >
> >
> >
> >
> >
> > On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang 
> > wrote:
> >
> > > Hello Unmesh,
> > >
> > > Timestamp extractor is always applied at the beginning of the topology
> > for
> > > each incoming record, and the 

Re: AdminUtils

2016-06-21 Thread Ismael Juma
On Wed, Jun 22, 2016 at 12:32 AM, Chris Barlock  wrote:
>
> What is KIP-4?


https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations


> Until this is available, I think I'm stuck with
> AdminUtils.  Is there any (java)doc available for it, or am I going to
> have to dig through the scala files and figure out what has changed?


You can generate the scaladoc yourself, but it's probably less work to just
look at the file:

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala

Ismael
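
For anyone who does go the AdminUtils route in the meantime, here is a rough, untested Java sketch of the 0.10.0.0 calls; the main change is that the methods now take a ZkUtils handle rather than a ZkClient. The connection string, timeouts, topic name and counts below are placeholder assumptions, not documented API usage:

import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;

public class AdminUtilsSketch {
    public static void main(String[] args) {
        // Placeholder ZooKeeper connection string and timeouts.
        ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, false);
        try {
            if (!AdminUtils.topicExists(zkUtils, "my-topic")) {
                // topic, partitions, replication factor, per-topic config, rack awareness
                AdminUtils.createTopic(zkUtils, "my-topic", 1, 1,
                        new Properties(), RackAwareMode.Enforced$.MODULE$);
            }
            // AdminUtils.deleteTopic(zkUtils, "my-topic");
        } finally {
            zkUtils.close();
        }
    }
}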


Re: AdminUtils

2016-06-21 Thread Chris Barlock
Ismael:

What is KIP-4?  Until this is available, I think I'm stuck with 
AdminUtils.  Is there any (java)doc available for it, or am I going to 
have to dig through the scala files and figure out what has changed?  The 
new REST interface is a possibility, but, if I recall, it does not 
support, at least, createTopic. 

Chris

IBM Middleware
Research Triangle Park, NC
(919) 224-2240
Internet:  barl...@us.ibm.com



From:   Ismael Juma 
To: users@kafka.apache.org
Date:   06/21/2016 05:47 PM
Subject:Re: AdminUtils
Sent by:isma...@gmail.com



Hi Chris,

Yes, `AdminUtils` is not public API. The plan is to introduce 
`AdminClient`
as part of KIP-4.

The Kafka protocol additions for `createTopic` and `deleteTopic` are
currently being discussed and it looks like they will be part of the next
Kafka release based on current progress.

The API of `AdminClient` hasn't been discussed yet, but it should be
possible to support the methods you mentioned. Maybe you can provide your
feedback as the KIP-4 discussions happen in kafka-dev?

Ismael

On Tue, Jun 21, 2016 at 10:39 PM, Chris Barlock  
wrote:

> While working on upgrading from 0.8.2.1 to 0.10.0.0, I found out that
> AdminUtils has changed -- and not in a backwards-compatible manner.  I
> gather this is not a public API since I can't find any Javadoc for it. 
So,
> in 0.10.0.0 are there public replacements for:
>
> AdminUtils.fetchTopicMetadataFromZk
> AdminUtils.topicExists
> AdminUtils.createTopic
> AdminUtils.deleteTopic
>
> ?
>
> Thanks,
>
> Chris
>
>
>






Re: AdminUtils

2016-06-21 Thread Ismael Juma
Hi Chris,

Yes, `AdminUtils` is not public API. The plan is to introduce `AdminClient`
as part of KIP-4.

The Kafka protocol additions for `createTopic` and `deleteTopic` are
currently being discussed and it looks like they will be part of the next
Kafka release based on current progress.

The API of `AdminClient` hasn't been discussed yet, but it should be
possible to support the methods you mentioned. Maybe you can provide your
feedback as the KIP-4 discussions happen in kafka-dev?

Ismael

On Tue, Jun 21, 2016 at 10:39 PM, Chris Barlock  wrote:

> While working on upgrading from 0.8.2.1 to 0.10.0.0, I found out that
> AdminUtils has changed -- and not in a backwards-compatible manner.  I
> gather this is not a public API since I can't find any Javadoc for it. So,
> in 0.10.0.0 are there public replacements for:
>
> AdminUtils.fetchTopicMetadataFromZk
> AdminUtils.topicExists
> AdminUtils.createTopic
> AdminUtils.deleteTopic
>
> ?
>
> Thanks,
>
> Chris
>
>
>


0.9 KafkaConsumer Memory Usage

2016-06-21 Thread noah
I'm using 0.9.0.1 consumers on 0.9.0.1 brokers. In a single Java service,
we have 4 producers and 5 consumers. They are all KafkaProducer and
KafkaConsumer instances (the new consumer.)

Since the 0.9 upgrade, this service is now OOMing after being up for a
few minutes. Heap dumps show >80MB of objects related to topic and
partition metadata (hundreds of thousands of org.apache.kafka.common.Node
objects.)

Digging through the heap, I see references to all sorts of topics in the
cluster that this service is not producing to or consuming from. We are not
using any pattern matching on topics.

Unusual things about this service and cluster:
* Smallish heap (was fine with 128mb on the 0.8 consumer, but even bumping
up to 256mb we still OOM on 0.9)
* We have a large number of dev/test topics on this cluster (several
hundred) and thousands of partitions as a result. The service is only
concerned with ~15 of these topics.

It seems pretty likely that our issue is that metadata for all partitions
is being cached locally. Is there a way to keep that from happening? Or are
there other known memory leak issues that could be causing us to OOM?


AdminUtils

2016-06-21 Thread Chris Barlock
While working on upgrading from 0.8.2.1 to 0.10.0.0, I found out that 
AdminUtils has changed -- and not in a backwards-compatible manner.  I 
gather this is not a public API since I can't find any Javadoc for it. So, 
in 0.10.0.0 are there public replacements for:

AdminUtils.fetchTopicMetadataFromZk
AdminUtils.topicExists
AdminUtils.createTopic
AdminUtils.deleteTopic

?

Thanks,

Chris




Re: RocksDBWindowStore.putInternal does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG?

2016-06-21 Thread Guozhang Wang
Could you share your stack trace upon failure?



On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi 
wrote:

> HI,
>
> I am using confluent-3.0.0-2.11, with kafka and streams versions
> org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and
> org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem seems
> to be with null keys because the original messages are not produced with
> keys, and I am creating a key value pair in the map function before
> aggregating to KTable. The RocksDBWindowStore putInternal is expecting the
> timestamp to be appended to the Key, which was not the case.
> It somehow corrected itself, once I started producing messages with some
> non null key.
>
> The code is here
>
> https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java
>
>
> Thanks,
> Unmesh
>
>
>
>
>
> On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang 
> wrote:
>
> > Hello Unmesh,
> >
> > Timestamp extractor is always applied at the beginning of the topology
> for
> > each incoming record, and the extracted timestamp is carried throughout
> the
> > topology.
> >
> > Could you share your stack trace upon failure with your source code? And
> > what version of Kafka Streams are you using? Some old version of the
> > library requires the key (as for your case, "String parsedKey =
> > parsedRecord.get("region").toString() + parsedRecord.get("location").
> > toString();") to be not null, but it has been resolved in the recent
> fixes.
> >
> > Guozhang
> >
> > On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi 
> > wrote:
> >
> > > Hi,
> > >
> > > I was trying to experiment with Kafka streams, and had following code
> > >
> > > KTable aggregated = locationViews
> > > .map((key, value) -> {
> > > GenericRecord parsedRecord = parse(value);
> > > String parsedKey = parsedRecord.get("region").toString() +
> > > parsedRecord.get("location").toString();
> > > return new KeyValue<>(parsedKey, 1);
> > > }).reduceByKey((v1, v2) -> v1 + v2,
> > > TimeWindows.of("aggregated", 5000), Serdes.String(),
> > > Serdes.Integer());
> > >
> > > This code fails in  RocksDBWindowStore.putInternal, where its trying
> > > to get timestamp from message key.
> > >
> > > I am not producing message with key, so I tried putting
> > > TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> > >
> > >
> streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > > WallclockTimestampExtractor.class.getName());
> > >
> > > But RocksDBWindowStore, still expects timestamp in the message key and
> > > does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> > >
> > > Is it the case that Kafka message should always have a key if we have
> > > to use Windowing? or this is an issue with RocksDBWindowStore?
> > >
> > >
> > > Thanks,
> > >
> > > Unmesh
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


RE: Error while sending message to Kafka broker on SSL with Kafka 0.10.0.0

2016-06-21 Thread Subhash Agrawal
Now I changed the test producer call like this.
C:\development\kafka\kafka_2.11-0.10.0.0\kafka_2.11-0.10.0.0\bin\windows>.\kafka-console-producer.bat --broker-list localhost:8393 --topic test --producer.config ..\..\config\producer.properties

and updated producer.properties like this
security.protocol=SSL
ssl.truststore.location=C:/test.jks
ssl.truststore.password=test

Now I see this error message.
C:\development\kafka\kafka_2.11-0.10.0.0\kafka_2.11-0.10.0.0\bin\windows>.\kafka-console-producer.bat --broker-list localhost:8393 --topic test --producer.config ..\..\config\producer.properties
[2016-06-21 10:23:34,738] WARN The configuration metadata.broker.list = localhost:8393 was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)

[2016-06-21 10:23:55,180] WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:51)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:89)
at sun.nio.ch.IOUtil.write(IOUtil.java:60)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:450)
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690)
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47)
at org.apache.kafka.common.network.Selector.close(Selector.java:471)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:348)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)

Please let me know what I am missing.

Thanks
Subhash Agrawal.
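
One possible cause, offered as a guess rather than a confirmed diagnosis: the broker config below sets ssl.client.auth=required, but producer.properties only supplies a truststore, so the handshake fails when the broker asks for a client certificate. In that case the producer would also need keystore settings along these lines (paths and passwords are placeholders):

security.protocol=SSL
ssl.truststore.location=C:/test.jks
ssl.truststore.password=test
ssl.keystore.location=C:/client.jks
ssl.keystore.password=client-password
ssl.key.password=client-password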

From: Subhash Agrawal
Sent: Monday, June 20, 2016 4:20 PM
To: d...@kafka.apache.org; 'users@kafka.apache.org'
Subject: Error while sending message to Kafka broker on SSL with Kafka 0.10.0.0

Hi All,
I am seeing an error while sending a message via the test producer on the SSL port. I am
able to successfully send messages to the non-SSL port.

Here is my broker configuration.

listeners=SSL://:9093
security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.keystore.location=C:/test.jks
ssl.keystore.password=test
ssl.key.password=test
ssl.truststore.location= C:/test.jks
ssl.truststore.password=test

and I see no errors while starting up the Kafka server, as per its console output.

[2016-06-20 15:59:32,627] INFO Registered broker 0 at path /brokers/ids/0 with addresses: SSL -> EndPoint(SAGRAWAL-PC.opentext.net,9093,SSL) (kafka.utils.ZkUtils)
[2016-06-20 15:59:32,635] INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2016-06-20 15:59:32,636] INFO Kafka commitId : b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser)
[2016-06-20 15:59:32,637] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

C:\development\kafka\kafka_2.11-0.10.0.0\kafka_2.11-0.10.0.0\bin\windows>.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

When I try to send a message using the test producer on the SSL port, I keep seeing
this warning message.

C:\development\kafka\kafka_2.11-0.10.0.0\kafka_2.11-0.10.0.0\bin\windows>.\kafka-console-producer.bat --broker-list localhost:9093 --topic test
aaa
[2016-06-20 16:13:38,565] WARN Bootstrap broker localhost:9093 disconnected (org.apache.kafka.clients.NetworkClient)
[2016-06-20 16:13:38,781] WARN Bootstrap broker localhost:9093 disconnected (org.apache.kafka.clients.NetworkClient)
[2016-06-20 16:13:39,010] WARN Bootstrap broker localhost:9093 disconnected (org.apache.kafka.clients.NetworkClient)
Terminate batch job (Y/N)? y

Any idea what I should look into?

Thanks
Subhash Agrawal


0.9 client persistently high CPU usage

2016-06-21 Thread Simon Cooper
Hi all,

We've got a problem with high CPU usage on a 0.9 client. We've got a monitoring 
system that polls kafka topics for metadata (to get the last message offset) 
every so often, and this has started using very high CPU continuously. We're 
seeing the following being spammed in the logs every 100ms:

2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | Initialize connection to node -1 for sending metadata request
2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | Initiating connection to node -1 at :9092.
2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | Error connecting to node -1 at :9092:
java.nio.channels.ClosedByInterruptException: null
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.8.0_60-ea]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659) ~[na:1.8.0_60-ea]
at org.apache.kafka.common.network.Selector.connect(Selector.java:153) ~[monitoring-collector.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:489) [monitoring-collector.jar:na]
at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:47) [monitoring-collector.jar:na]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:624) [monitoring-collector.jar:na]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:543) [monitoring-collector.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:254) [monitoring-collector.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) [monitoring-collector.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) [monitoring-collector.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) [monitoring-collector.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134) [monitoring-collector.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184) [monitoring-collector.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:290) [monitoring-collector.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:272) [monitoring-collector.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1299) [monitoring-collector.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1106) [monitoring-collector.jar:na]
...
2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | Give up sending metadata request since no node is available

This is a single-broker cluster (id: 1), all on a single machine. There is 
nothing being logged in the broker logs.

Can anyone help work out what is going wrong, and how we could fix it? In 
particular, the '-1' node id is suspicious, but we can't work out where this 
value is coming from.

Thanks,
SimonC


Re: [DISCUSS] Java 8 as a minimum requirement

2016-06-21 Thread Damian Guy
+1

On Tue, 21 Jun 2016 at 09:59 Marcus Gründler 
wrote:

> Hi Ismael,
>
> thanks for the pointer to the latest WebSphere documentation - I wasn’t
> aware
> of that release.
>
> We currently have customers that run our software frontend on an older
> WebSphere version that runs on Java 7 and push data to kafka brokers in the
> backend. Replacing Kafka brokers wouldn’t be an issue here since we are in
> control of the backend part.
>
> The WebSphere server where our frontend part (and therefore our kafka
> client)
> is running is a kind of general infrastructure of that customer and upgrade
> cycles are slow and independent of our release cycles.
>
> Of course that could be solved by a kind of proxy in front of the kafka
> brokers,
> so maybe we shouldn’t pay too much tribute to legacy systems :-)
>
> Regards, Marcus
>
>
> > Am 17.06.2016 um 15:44 schrieb Ismael Juma :
> >
> > Hi Marcus,
> >
> > Thanks for your feedback.
> >
> > With regards to IBM WebSphere, the latest stable release (8.5.5) supports
> > Java 8 according to the documentation:
> >
> > http://www-01.ibm.com/support/docview.wss?uid=swg27005002
> >
> > Having said that, it is fair to discuss servers and clients separately.
> In
> > Kafka, you can't use newer clients with older brokers, but you can use
> > older clients with newer brokers. As such, the scenario we're talking
> about
> > is that of users who can upgrade their brokers and clients to the latest
> > Kafka version, but are stuck with an older version of WebSphere, right?
> Are
> > you aware of such users?
> >
> > Ismael
> > On Fri, Jun 17, 2016 at 10:34 AM, Marcus Gründler <
> > marcus.gruend...@aixigo.de> wrote:
> >
> >> -1
> >> Hi Ismael,
> >>
> >> Although I really like the Java 8 features and understand the advantages
> >> you
> >> mentioned about Java 8 migration, I would suggest to stay with Java 7 as
> >> a minimum requirement for a while.
> >>
> >> I think there are two aspects to consider - Kafka Server and Kafka
> >> clients. On
> >> the server part it would make sense to switch to Java 8 because you can
> run
> >> the broker independently from any enclosing runtime (no JEE server etc.)
> >>
> >> But if you change the requirement for Kafka clients, you would cut Kafka
> >> support for quite a lot of real world deployments that run for example
> on
> >> an IBM WebSphere JEE Server (*sigh*). Since up to today there is no
> >> WebSphere version that supports Java 8.
> >>
> >> And I think a split of Kafka server with Java8 and Kafka client JARs in
> >> Java7
> >> would be too complicated to maintain.
> >>
> >> So my conclusion is - stay with Java 7 for a while.
> >>
> >> Regards, Marcus
> >>
> >>
> >>> Am 16.06.2016 um 22:45 schrieb Ismael Juma :
> >>>
> >>> Hi all,
> >>>
> >>> I would like to start a discussion on making Java 8 a minimum
> requirement
> >>> for Kafka's next feature release (let's say Kafka 0.10.1.0 for now).
> This
> >>> is the first discussion on the topic so the idea is to understand how
> >>> people feel about it. If people feel it's too soon, then we can pick up
> >> the
> >>> conversation again after Kafka 0.10.1.0. If the feedback is mostly
> >>> positive, I will start a vote thread.
> >>>
> >>> Let's start with some dates. Java 7 hasn't received public updates
> since
> >>> April 2015[1], Java 8 was released in March 2014[2] and Java 9 is
> >> scheduled
> >>> to be released in March 2017[3].
> >>>
> >>> The first argument for dropping support for Java 7 is that the last
> >> public
> >>> release by Oracle contains a large number of known security
> >>> vulnerabilities. The effectiveness of Kafka's security features is
> >> reduced
> >>> if the underlying runtime is not itself secure.
> >>>
> >>> The second argument for moving to Java 8 is that it adds a number of
> >>> compelling features:
> >>>
> >>> * Lambda expressions and method references (particularly useful for the
> >>> Kafka Streams DSL)
> >>> * Default methods (very useful for maintaining compatibility when
> adding
> >>> methods to interfaces)
> >>> * java.util.stream (helpful for making collection transformations more
> >>> concise)
> >>> * Lots of improvements to java.util.concurrent (CompletableFuture,
> >>> DoubleAdder, DoubleAccumulator, StampedLock, LongAdder,
> LongAccumulator)
> >>> * Other nice things: SplittableRandom, Optional (and many others I have
> >> not
> >>> mentioned)
> >>>
> >>> The third argument is that it will simplify our testing matrix, we
> won't
> >>> have to test with Java 7 any longer (this is particularly useful for
> >> system
> >>> tests that take hours to run). It will also make it easier to support
> >> Scala
> >>> 2.12, which requires Java 8.
> >>>
> >>> The fourth argument is that many other open-source projects have taken
> >> the
> >>> leap already. Examples are Cassandra[4], Lucene[5], Akka[6], Hadoop
> 3[7],
> >>> Jetty[8], Eclipse[9], IntelliJ[10] and many others[11]. Even Android
> 

How to get earliest valid offset

2016-06-21 Thread Snehalata Nagaje


Hi All, 


We are using the new Kafka version (kafka 2.10_0.9), but for the consumer we are
still using the old high-level and low-level APIs.

I am trying to fetch the earliest valid offset for a topic, but it returns the
latest offset if the data (log) is deleted after a certain interval (which is
configured in the server properties).

In the previous version, even if data got deleted, we were able to get the original
earliest valid offset. I am using this code to get the offset:

Suppose I have added 2 messages at offsets 40 and 41, then the file is deleted and
a new message is inserted at offset 42, which creates a new log file. When I fetch
the earliest valid offset I get 42, but I was expecting 40.


I am not committing offsets anywhere, I do not want to do that. 

PartitionOffsetRequestInfo partitionOffsetRequestInfo =
        new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 10);
Map<TopicAndPartition, PartitionOffsetRequestInfo> map =
        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
map.put(topicAndPartition, partitionOffsetRequestInfo);
OffsetRequest request = new OffsetRequest(map, (short) 0, CLIENT_ID);
OffsetResponse startOffsets = simpleConsumer.getOffsetsBefore(request);

if (startOffsets.offsets(topicName, partition).length > 0) {
    long validOffset = startOffsets.offsets(topicName, partition)[0];
    simpleConsumer.close();
    return validOffset;
}
simpleConsumer.close();


Can you help me on this? 


Thanks, 
snehalata 
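
For comparison, the new consumer that ships with the 0.9 jars can report the same earliest valid offset without the SimpleConsumer boilerplate. A sketch under those assumptions (topic, partition and bootstrap address are placeholders; the varargs seekToBeginning signature shown is the 0.9.x/0.10.0.x one):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class EarliestOffsetSketch {
    public static long earliestOffset(String bootstrap, String topic, int partition) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("group.id", "offset-probe");    // never commits, just needs a group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        try {
            TopicPartition tp = new TopicPartition(topic, partition);
            consumer.assign(Arrays.asList(tp));
            consumer.seekToBeginning(tp);    // reset to the earliest retained offset
            return consumer.position(tp);    // resolves to the earliest valid offset
        } finally {
            consumer.close();
        }
    }
}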


Kafka cluster

2016-06-21 Thread Snehalata Nagaje

Hi ,


I want to create a Kafka cluster for HA.

Do we need to create 3 brokers, or is it okay to create only 2? We are using
only 1 partition for every topic, so there is no parallelism while fetching data.

Please suggest.

Thanks,
Snehalata


Re: [DISCUSS] Java 8 as a minimum requirement

2016-06-21 Thread Marcus Gründler
Hi Ismael,

thanks for the pointer to the latest WebSphere documentation - I wasn’t aware 
of that release.

We currently have customers that run our software frontend on an older 
WebSphere version that runs on Java 7 and push data to kafka brokers in the 
backend. Replacing Kafka brokers wouldn’t be an issue here since we are in 
control of the backend part.

The WebSphere server where our frontend part (and therefore our kafka client) 
is running is a kind of general infrastructure of that customer and upgrade 
cycles are slow and independent of our release cycles.

Of course that could be solved by a kind of proxy in front of the kafka 
brokers, 
so maybe we shouldn’t pay too much tribute to legacy systems :-)

Regards, Marcus


> Am 17.06.2016 um 15:44 schrieb Ismael Juma :
> 
> Hi Marcus,
> 
> Thanks for your feedback.
> 
> With regards to IBM WebSphere, the latest stable release (8.5.5) supports
> Java 8 according to the documentation:
> 
> http://www-01.ibm.com/support/docview.wss?uid=swg27005002
> 
> Having said that, it is fair to discuss servers and clients separately. In
> Kafka, you can't use newer clients with older brokers, but you can use
> older clients with newer brokers. As such, the scenario we're talking about
> is that of users who can upgrade their brokers and clients to the latest
> Kafka version, but are stuck with an older version of WebSphere, right? Are
> you aware of such users?
> 
> Ismael
> On Fri, Jun 17, 2016 at 10:34 AM, Marcus Gründler <
> marcus.gruend...@aixigo.de> wrote:
> 
>> -1
>> Hi Ismael,
>> 
>> Although I really like the Java 8 features and understand the advantages
>> you
>> mentioned about Java 8 migration, I would suggest to stay with Java 7 as
>> a minimum requirement for a while.
>> 
>> I think there are two aspects to consider - Kafka Server and Kafka
>> clients. On
>> the server part it would make sense to switch to Java 8 because you can run
>> the broker independently from any enclosing runtime (no JEE server etc.)
>> 
>> But if you change the requirement for Kafka clients, you would cut Kafka
>> support for quite a lot of real world deployments that run for example on
>> an IBM WebSphere JEE Server (*sigh*). Since up to today there is no
>> WebSphere version that supports Java 8.
>> 
>> And I think a split of Kafka server with Java8 and Kafka client JARs in
>> Java7
>> would be too complicated to maintain.
>> 
>> So my conclusion is - stay with Java 7 for a while.
>> 
>> Regards, Marcus
>> 
>> 
>>> Am 16.06.2016 um 22:45 schrieb Ismael Juma :
>>> 
>>> Hi all,
>>> 
>>> I would like to start a discussion on making Java 8 a minimum requirement
>>> for Kafka's next feature release (let's say Kafka 0.10.1.0 for now). This
>>> is the first discussion on the topic so the idea is to understand how
>>> people feel about it. If people feel it's too soon, then we can pick up
>> the
>>> conversation again after Kafka 0.10.1.0. If the feedback is mostly
>>> positive, I will start a vote thread.
>>> 
>>> Let's start with some dates. Java 7 hasn't received public updates since
>>> April 2015[1], Java 8 was released in March 2014[2] and Java 9 is
>> scheduled
>>> to be released in March 2017[3].
>>> 
>>> The first argument for dropping support for Java 7 is that the last
>> public
>>> release by Oracle contains a large number of known security
>>> vulnerabilities. The effectiveness of Kafka's security features is
>> reduced
>>> if the underlying runtime is not itself secure.
>>> 
>>> The second argument for moving to Java 8 is that it adds a number of
>>> compelling features:
>>> 
>>> * Lambda expressions and method references (particularly useful for the
>>> Kafka Streams DSL)
>>> * Default methods (very useful for maintaining compatibility when adding
>>> methods to interfaces)
>>> * java.util.stream (helpful for making collection transformations more
>>> concise)
>>> * Lots of improvements to java.util.concurrent (CompletableFuture,
>>> DoubleAdder, DoubleAccumulator, StampedLock, LongAdder, LongAccumulator)
>>> * Other nice things: SplittableRandom, Optional (and many others I have
>> not
>>> mentioned)
>>> 
>>> The third argument is that it will simplify our testing matrix, we won't
>>> have to test with Java 7 any longer (this is particularly useful for
>> system
>>> tests that take hours to run). It will also make it easier to support
>> Scala
>>> 2.12, which requires Java 8.
>>> 
>>> The fourth argument is that many other open-source projects have taken
>> the
>>> leap already. Examples are Cassandra[4], Lucene[5], Akka[6], Hadoop 3[7],
>>> Jetty[8], Eclipse[9], IntelliJ[10] and many others[11]. Even Android will
>>> support Java 8 in the next version (although it will take a while before
>>> most phones will use that version sadly). This reduces (but does not
>>> eliminate) the chance that we would be the first project that would
>> cause a
>>> user to consider a Java upgrade.
>>> 
>>> The main argument for not making the 

Re: RocksDBWindowStore.putInternal does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG?

2016-06-21 Thread Unmesh Joshi
HI,

I am using confluent-3.0.0-2.11, with kafka and streams versions
org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and
org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem seems
to be with null keys because the original messages are not produced with
keys, and I am creating a key value pair in the map function before
aggregating to KTable. The RocksDBWindowStore putInternal is expecting the
timestamp to be appended to the Key, which was not the case.
It somehow corrected itself once I started producing messages with a non-null key.

The code is here
https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java


Thanks,
Unmesh





On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang  wrote:

> Hello Unmesh,
>
> Timestamp extractor is always applied at the beginning of the topology for
> each incoming record, and the extracted timestamp is carried throughout the
> topology.
>
> Could you share your stack trace upon failure with your source code? And
> what version of Kafka Streams are you using? Some old version of the
> library requires the key (as for your case, "String parsedKey =
> parsedRecord.get("region").toString() + parsedRecord.get("location").
> toString();") to be not null, but it has been resolved in the recent fixes.
>
> Guozhang
>
> On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi 
> wrote:
>
> > Hi,
> >
> > I was trying to experiment with Kafka streams, and had following code
> >
> > KTable aggregated = locationViews
> > .map((key, value) -> {
> > GenericRecord parsedRecord = parse(value);
> > String parsedKey = parsedRecord.get("region").toString() +
> > parsedRecord.get("location").toString();
> > return new KeyValue<>(parsedKey, 1);
> > }).reduceByKey((v1, v2) -> v1 + v2,
> > TimeWindows.of("aggregated", 5000), Serdes.String(),
> > Serdes.Integer());
> >
> > This code fails in  RocksDBWindowStore.putInternal, where its trying
> > to get timestamp from message key.
> >
> > I am not producing message with key, so I tried putting
> > TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> >
> > streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > WallclockTimestampExtractor.class.getName());
> >
> > But RocksDBWindowStore, still expects timestamp in the message key and
> > does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> >
> > Is it the case that Kafka message should always have a key if we have
> > to use Windowing? or this is an issue with RocksDBWindowStore?
> >
> >
> > Thanks,
> >
> > Unmesh
> >
>
>
>
> --
> -- Guozhang
>


Re: Zookeeper offsets in new consumer

2016-06-21 Thread Gerard Klijs
No, why would you want to store the offsets in zookeeper? One of the
improvements is to not depend on zookeeper for the offsets. And there is
tooling to get the offsets (although the consumer group must exist).
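
For reference, the tooling in question is the consumer-groups command that ships with 0.9/0.10; roughly, with the group name and broker address as placeholders:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group my-group

For a group that has committed offsets to Kafka, this prints the current offset, log end offset and lag per partition.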

On Mon, Jun 20, 2016 at 10:57 PM Bryan Baugher  wrote:

> Hi everyone,
>
> With the new Kafka consumer[1] is it possible to use zookeeper based offset
> storage?
>
> Bryan
>
> [1] -
>
> http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>