RocksDB error

2017-08-13 Thread Sameer Kumar
Exception while doing the join; I can't decipher more from this. Has anyone
faced it? The complete exception trace is attached.

2017-08-14 11:15:55 ERROR ConsumerCoordinator:269 - User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
for group c-7-a34 failed on partition assignment
org.apache.kafka.streams.errors.ProcessorStateException: Error opening
store KSTREAM-JOINTHIS-18-store-201708140520 at location
/data/streampoc/c-7-a34/0_4/KSTREAM-JOINTHIS-18-store/KSTREAM-JOINTHIS-18-store-201708140520
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:198)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:165)
    at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40)
    at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:86)
    at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81)
    at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore$1.restore(RocksDBSegmentedBytesStore.java:113)
    at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:55)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:216)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:186)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:151)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.rocksdb.RocksDBException:
    at org.rocksdb.RocksDB.open(Native Method)
    at org.rocksdb.RocksDB.open(RocksDB.java:231)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:191)
    ... 19 more

-sameer.

Synchronized methods in RocksDB key store

2017-08-13 Thread Sameer Kumar
Hi,

I was inspecting RocksDBStore and I saw that the get and put methods are
synchronized.

Are there any issues in RocksDB that required making these methods
synchronized? Could someone please shed more light on this.

    @Override
    public synchronized V get(K key) {
        validateStoreOpen();
        byte[] byteValue = getInternal(serdes.rawKey(key));
        if (byteValue == null) {
            return null;
        } else {
            return serdes.valueFrom(byteValue);
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public synchronized void put(K key, V value) {
        validateStoreOpen();
        byte[] rawKey = serdes.rawKey(key);
        byte[] rawValue = serdes.rawValue(value);
        putInternal(rawKey, rawValue);
    }

-Sameer.


Kafka 0.11.0.0

2017-08-13 Thread Srikanth Sampath
Hi,
We are currently on 0.8.2.2 in production and are planning to move to the
latest stable release, which currently is 0.11.0.0.  I have a few questions:

* For folks who are on 0.11.0.0 in production: can you comment on how stable
it is?  Any problems?

* Usually I look for a maintenance release; is one planned for the 0.11.x
line?  If so, is 0.11.0.1 or similar planned, and when?

* How is 0.10.2.1 in comparison?  On a practical note, why would you
recommend (or not recommend) moving to 0.11.0.0?

Thanks a bunch in advance.
-Srikanth


Re: Shooting for microsecond latency between a Kafka producer and a Kafka consumer

2017-08-13 Thread Chao Wang

Hi Viktor,

Thanks! I didn't know that. In my case, I was just sending messages
carrying byte arrays of 1-512 bytes.


Chao

On 08/11/2017 10:03 AM, Viktor Somogyi wrote:

Hi Chao,

Apart from the actual request, the consumer sends metadata requests as
well. What is actually going through the network when you get these
latencies?
Try increasing the metadata max age with metadata.max.age.ms.

(Disclaimer: I myself never ran any benchmarking, just trying to help :). )

Viktor

On Mon, Aug 7, 2017 at 7:37 PM, Chao Wang  wrote:


Thanks, David. I was trying to do Kafka pub/sub on a local, closed
network. In my case, I observed microsecond latency with bare-bone sockets,
and I would like to know how to configure Kafka to achieve a similar result;
if that turns out to be infeasible, what might be the cause of the
additional latency?

Thanks,

Chao



On 08/07/2017 12:10 PM, David Garcia wrote:


You are not going to get that kind of latency (i.e. less than 100
microseconds).  In my experience, consumer->producer latency averages
around 20 milliseconds (cluster is in AWS with enhanced networking).

On 8/3/17, 2:32 PM, "Chao Wang"  wrote:

  Hi,

  I observed that it took 2-6 milliseconds for a topic to be received by a
  Kafka consumer from a Kafka producer, and I wonder what I might be missing
  or have got wrong in configuring Kafka for low latency (targeting
  < 100 microseconds). I did the following:

  1. On the broker, I tried to prevent frequent flushes of data to disk
  (log.flush.interval.messages=10).
  2. On the producer, I tried to reduce the delay by setting batch.size=0,
  linger.ms=0, acks=0, and I invoked flush() right after send().
  3. On the consumer, I called poll(0) (i.e., fetch data as soon as it is
  available).

  I got similar observations (millisecond latency) when varying the value
  size from 1 to 512 bytes, and also similar results when either colocating
  producer/consumer or putting them on separate PCs (connected by a switch).
  As a verification, I implemented simple C/C++ sockets for transmission and
  observed latencies of no more than 100 microseconds.

  Thanks,
  Chao
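
A minimal sketch of the producer and consumer settings described in steps 2
and 3 above, with the metadata.max.age.ms knob Viktor mentioned included for
completeness. The broker address, topic name, group id, and payload size are
placeholders, and the values mirror the thread rather than recommended
production settings:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LowLatencyPipe {
    public static void main(String[] args) {
        // Producer tuned as in step 2: no batching, no lingering, fire-and-forget acks.
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        p.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        p.put("batch.size", "0");
        p.put("linger.ms", "0");
        p.put("acks", "0");
        p.put("metadata.max.age.ms", "600000");         // larger metadata max age, as Viktor suggests (placeholder value)

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("latency-test", new byte[512]));
            producer.flush();                           // flush right after send, as in the thread
        }

        // Consumer polling with a 0 ms timeout, as in step 3.
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "latency-test-group");        // placeholder group id
        c.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(Collections.singletonList("latency-test"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(0);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.println("received " + record.value().length + " bytes");
                }
            }
        }
    }
}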








Re: [kafka streams] discuss: dynamically update subscription pattern

2017-08-13 Thread Guozhang Wang
Hello Bart,

Before we talk about dynamic subscription in Streams I'd like to ask some
more questions about your usage.

Let's say the currently running application is reading from topic A and has
just reached offset 100; now from the global state there is a notification
saying "stop on topic A, start on topic B". How would you start fetching
from topic B? Would you start at the earliest or latest offset (so it is
completely independent of the point where you stopped on topic A), or would
you start at offset 100? In addition, if your application maintains some local
state, how would that state be affected by switching topics, since at that
point it actually reflects "the state of the application up to topic A's
offset 100"? Could you still reuse that state with topic B at the
specified offset?

To make things a bit more complicated, if later another notification from
the global state arrives saying "now switch back to topic A again", would
you restart where you stopped, i.e. at offset 100, or would you start at
the earliest or latest offset of topic A? And are the application states
still reusable?

I think if your answers to these questions are "no", then you'd better treat
them as different applications, i.e. one app reading from A and one app
reading from B but with a similar topology. It's just that, based on the meta
topic, the apps may be stopped / restarted dynamically. Currently Kafka
Streams does not support dynamic subscriptions yet, since many such
scenarios turned out not to be a good fit for collapsing into a single
application. If there is indeed a common need, I think one way to do it is
to make the PartitionAssignor customizable by users as you suggested, so
that only the selected partitions are used to re-form the tasks; but one
would still need some way to trigger a rebalance so that the
PartitionAssignor can be called.
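
For reference, the plain Java consumer does allow changing its subscription
at runtime, which is roughly the re-subscribe-and-rebalance behaviour
discussed above. A hedged sketch follows; the broker address, group id,
topic patterns, and the listener bodies are placeholders, and Kafka Streams
itself does not expose any of this today:

import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DynamicSubscriptionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "dynamic-subscription-demo"); // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // flush or commit any local state tied to the old topics here
            }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // decide where to resume on the new topics here (earliest / latest / stored offset)
            }
        };

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Initial subscription: the "A" topics.
            consumer.subscribe(Pattern.compile("topic-a-.*"), listener);
            consumer.poll(100);

            // Later, when the meta-data topic says to switch, re-subscribe with a new
            // pattern; the next poll() triggers the rebalance that applies the change.
            consumer.subscribe(Pattern.compile("topic-b-.*"), listener);
            consumer.poll(100);
        }
    }
}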


Guozhang

On Fri, Aug 11, 2017 at 1:42 AM, Bart Vercammen  wrote:

> Hi,
>
> I have a question about the best way to implement something within Kafka
> Streams. The thing I would like to do: "dynamically update the subscription
> pattern of the source topics".
>
> The reasoning behind this (in my project):
> metadata about the source topics is published as events on another Kafka
> topic that should be tracked by the Kafka Streams topology, and depending on
> that metadata, specific source topics should be added to or removed from the
> Kafka Streams topology.
>
> Currently I track the "meta data topic" as "global state", so that every
> processor can actually access it to fetch the meta data (this meta data for
> instance also describes whether or not a specific topic pattern should be
> tracked by the stream processor) - so consider this as some kind of
> "configuration" stream about the source topics.
>
> So here is my question:
> Is there any way I could (from a running topology) update the Kafka
> consumer subscriptions, so that I'm able to replace the source topic
> pattern while the topology is running?
>
> I don't think there currently is a way to do this, but since under the hood
> it is just a Kafka consumer, my belief is that it should be possible somehow
> ...
>
> I was thinking about the PartitionAssignor ... if I could get my hands on
> that one, maybe I could dynamically configure it to only allow specific
> topic patterns?
> Or directly alter the subscription on the underlying consumer?
>
> I don't know all the nifty details of the Kafka Streams internals, so it
> would be nice if someone could point me in the right direction to achieve
> this ...
>
> Thanks,
> Bart
>



-- 
-- Guozhang


Re: Streams: Fetch Offset 0 is out of range for partition foo-0, resetting offset

2017-08-13 Thread Guozhang Wang
Hi Garrett,

Since your error message says "offset X" is out of range, it means that the
offset was reset because there was no longer any data at that offset on
topic partition "foo-0". I suspect that all the log segments got truncated
and the topic partition is now empty. It is less likely caused by
KAFKA-5510, and hence offsets.retention.minutes may not help here.

Since you mentioned that setting log.retention.hours=48 does not help, and
that the input sample data may be loaded a day or two before the new build
goes out, I suspect there may be some messages with timestamps older than 48
hours published to the log, causing it to roll new segments that get deleted
immediately: note that the Kafka brokers use the current system time to
compute the difference from the message timestamps. If that is the case it
is not a Streams issue, not even a general consumer issue, but a broker-side
log retention operation.

What I'm not clear about is that in your error message "X" is actually 0: it
is quite odd for a consumer to auto-reset its position to 0. Did you run
some tool periodically that resets the offset to 0?


Guozhang
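
One way to verify the first point above, namely that partition foo-0 really
has no data left at offset 0, is to ask the broker for its earliest and
latest available offsets with the plain Java consumer. A minimal sketch,
assuming a local broker (the address and topic are placeholders):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CheckLogStart {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("foo", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, Long> start = consumer.beginningOffsets(Collections.singletonList(tp));
            Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singletonList(tp));
            // If the beginning offset is greater than 0, the old segments have been deleted
            // and any consumer positioned at offset 0 will hit "offset out of range".
            System.out.println("foo-0 starts at " + start.get(tp) + ", ends at " + end.get(tp));
        }
    }
}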





On Wed, Aug 9, 2017 at 7:16 AM, Garrett Barton 
wrote:

> I have a small test setup with a local zk/kafka server and a streams app
> that loads sample data.  The test setup is usually up for a day or two
> before a new build goes out and it's blown away and loaded from scratch.
>
> Lately I've seen that after a few hours the stream app will stop processing
> and start spamming the logs with:
>
> org.apache.kafka.clients.consumer.internals.Fetcher: Fetch Offset 0 is out
> of range for partition foo-0, resetting offset
> org.apache.kafka.clients.consumer.internals.Fetcher: Fetch Offset 0 is out
> of range for partition foo-0, resetting offset
> org.apache.kafka.clients.consumer.internals.Fetcher: Fetch Offset 0 is out
> of range for partition foo-0, resetting offset
>
> Pretty much sinks a core into spamming the logs.
>
> Restarting the application puts it right back in that broken state.
>
> I thought it was because of this:
> https://issues.apache.org/jira/browse/KAFKA-5510
> So I set my log.retention.hours=48, and offsets.retention.minutes=10081,
> which is huge compared to the total data retention time.  Yet the same
> error occurred.
>
> Any ideas?
>



-- 
-- Guozhang


Re: Forwarding consumer with kafka streams

2017-08-13 Thread Guozhang Wang
Hi Ricardo,

What you described sounds very similar to the demo example code here:
https://github.com/apache/kafka/blob/trunk/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java

If you start the program it should just pipe all data, starting from the
earliest offset, to the target topic, no matter how much data the source
topic has already stored.
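
A minimal sketch along the lines of that Pipe example, with the offset-reset
setting Eno mentions further down made explicit. The application id, broker
address, and topic names are placeholders, and the peek() step stands in for
the logging processor Ricardo describes:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ForwardingPipe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "forwarding-pipe");   // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Start from the earliest offset so messages already buffered on the source
        // topic are forwarded too (Eno notes this is the Streams default; shown explicitly).
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("source-topic")
               .peek((key, value) -> System.out.println("forwarding " + key + " -> " + value))
               .to("target-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}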


Guozhang



On Sat, Aug 12, 2017 at 2:35 PM, Eno Thereska 
wrote:

> Hi Ricardo,
>
> Kafka Streams should handle that case as well. What streams config are you
> using, could you share it? There is one parameter that is called
> “ConsumerConfig.AUTO_OFFSET_RESET_CONFIG” and by default it’s set to
> “earliest”. Any chance your app has changed it to “latest”?
>
> Thanks
> Eno
>
> > On Aug 12, 2017, at 5:13 PM, Ricardo Costa  wrote:
> >
> > Hi,
> >
> > I've implemented a forwarding consumer which literally just consumes the
> > messages from a source topic, logs them and then publishes them to a
> target
> > topic.
> >
> > I wanted to keep the implementation simple with very little code so I
> went
> > with kafka-streams. I have a really simple topology with a source for the
> > source topic, a sink for the target topic and a logging processor
> > in-between.
> >
> > I'm quite happy with the solution, really simple and elegant, I ran some
> > basic tests and everything seemed to be working. As I went on to build
> more
> > test cases, I found that the stream only does its thing if I push
> messages
> > to the source topic *after* creating the stream and waiting until it is
> > fully initialized. Is this the expected behaviour? I need the stream to
> be
> > started at any point in time and forward the messages that were buffered
> on
> > the source topic until then. Are kafka-streams not fit for this use case?
> > Or am I missing something?
> >
> > Thanks in advance!
> >
> > --
> > Ricardo
>
>


-- 
-- Guozhang


Re: How to clear a particular partition?

2017-08-13 Thread Hans Jespersen
This is an area that is being worked on. See KIP-107 for details.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
 


-hans
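
For reference, the broker-side purge from KIP-107 later surfaced in the Java
AdminClient as deleteRecords(). A hedged sketch, assuming a client version
that ships this API; the broker address, topic, partition, and target offset
are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class PurgePartition {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (AdminClient admin = AdminClient.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 3);               // placeholder partition
            long purgeBeforeOffset = 42L;                                        // placeholder offset
            // Delete everything before the given offset on just this partition;
            // the other partitions of the topic are untouched.
            admin.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(purgeBeforeOffset)))
                 .all()
                 .get();
        }
    }
}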


> On Aug 10, 2017, at 10:52 AM, Sven Ludwig  wrote:
> 
> Hello,
>  
> assume that all producers and consumers regarding a topic-partition have been
> shut down.
>  
> Is it possible in this situation to empty that topic-partition, while the 
> other topic-partitions keep working?
>  
> Like for example, is it possible to trigger a log truncation to 0 on the 
> leader for that partition using some admin tool?
>  
> Kind Regards,
> Sven
>  



Re: How to clear a particular partition?

2017-08-13 Thread jan
I can't help you here, but maybe I can focus the question - why would you want to?

jan

On 10/08/2017, Sven Ludwig  wrote:
> Hello,
>
> assume that all producers and consumers regarding a topic-partition have
> been shut down.
>
> Is it possible in this situation to empty that topic-partition, while the
> other topic-partitions keep working?
>
> Like for example, is it possible to trigger a log truncation to 0 on the
> leader for that partition using some admin tool?
>
> Kind Regards,
> Sven
>
>


Re: Create Topic Error: Create Topic Error and cannot write to console producer

2017-08-13 Thread Ascot Moss
Hi,


Without changing any configuration, I got the error again just now:

[2017-08-13 20:09:52,727] ERROR Error when sending message to topic test02
with key: null, value: 5 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
test02-1: 1542 ms has passed since batch creation plus linger time

[2017-08-13 20:09:53,835] ERROR Error when sending message to topic test02
with key: null, value: 5 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
test02-0: 1532 ms has passed since batch creation plus linger time


Producer:

kafka-console-producer.sh \
--broker-list n1:9093  \
--producer.config /homey/kafka/config/producer.n1.properties \
--sync --topic test02


Consumer:

kafka-console-consumer.sh \
--bootstrap-server n1:9093  \
--consumer.config /home/kafka/config/consumer.n1.properties \
--topic test02 --from-beginning