Kafka Streaming Join for range of gps coordinates

2016-08-27 Thread Farhon Zaharia
Hello friends,

I am designing a new streaming component and am looking at how to use Kafka
Streams.

I need some guidance with the appropriate flow.

*Problem to solve:*
The problem I am working on: I have a large group of riders and drivers, and
I would like to match available riders to nearby drivers. (For simplicity,
think of Uber or Lyft.)

The goal is to have the drivers within a certain delta of gps coordinates
be notified of a potential rider.

For example, a rider requests to be picked up at location:
37.788517, -122.406812

I would like to select the nearby drivers and send them notifications of an
available match, i.e. the drivers within a range such as:

latitude < 37.798517 && latitude > 37.778517 && longitude < -122.396812 &&
longitude > -122.416812

*Note: this is a POC. I would prefer to first select the nearest drivers,
then look up their addresses and use my own street graph to calculate the
shortest path myself.

I would like to have 3 initial topics:  riders, drivers, and paired-onride

What is the best way to do this with Kafka Streams?

*What I have tried or considered:*
I was considering storing drivers in a KTable, keeping riders in a KStream,
and joining them. But I don't think this will work, because the join depends
on the key, whereas what I need is more like a select over a range of GPS
coordinates, as noted above (rough sketch of the keyed join below). The
drivers' locations will be updated periodically.
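
To make the limitation concrete, here is roughly the keyed join I had in mind
(names, types, and serdes are purely illustrative, not working code):

import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable, ValueJoiner}

case class Rider(id: String, lat: Double, lon: Double)
case class Driver(id: String, lat: Double, lon: Double)

val builder = new KStreamBuilder()
val riders: KStream[String, Rider]  = builder.stream[String, Rider]("riders")
val drivers: KTable[String, Driver] = builder.table[String, Driver]("drivers")

// A KStream-KTable join pairs a rider only with the driver that has the
// *same* key, not with every driver whose coordinates fall inside a
// bounding box, which is what I actually need.
val paired: KStream[String, String] = riders.leftJoin(drivers,
  new ValueJoiner[Rider, Driver, String] {
    override def apply(r: Rider, d: Driver): String =
      if (d == null) s"${r.id} -> no driver with matching key" else s"${r.id} -> ${d.id}"
  })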

I was also thinking of filtering the KStream based on the GPS range to get a
smaller subselection of the available drivers within a certain distance of a
rider, along the lines of the sketch below.
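
A minimal sketch of that filtering idea, reusing the Driver class and builder
from above (again hypothetical; in a real topology the rider's position would
come from the riders topic rather than being hard-coded):

import org.apache.kafka.streams.kstream.Predicate

// Bounding-box check: is the driver within +/- delta degrees of the rider?
def withinDelta(riderLat: Double, riderLon: Double, d: Driver, delta: Double): Boolean =
  math.abs(d.lat - riderLat) <= delta && math.abs(d.lon - riderLon) <= delta

// Subselect the drivers close to one requested pickup location.
val driverUpdates: KStream[String, Driver] = builder.stream[String, Driver]("drivers")
val nearby: KStream[String, Driver] =
  driverUpdates.filter(new Predicate[String, Driver] {
    override def test(driverId: String, d: Driver): Boolean =
      withinDelta(37.788517, -122.406812, d, 0.01)
  })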

At this point I am seeking some guidance; if this is not an ideal use case,
that is also fine to know. Thanks for any information or direction you can
provide.


-Farhon


Kafka startup exception while recovering log file

2016-08-27 Thread Gaurav Agarwal
Hi All,

We are facing a weird problem where the Kafka broker fails to start due to an
unhandled exception while 'recovering' a log segment. I have been able to
isolate the problem to a single record; the details are below:

During Kafka restart, if the index files are corrupted or missing, the broker
tries to 'recover' a LogSegment and rebuild the indexes in
LogSegment.recover(). In the main while loop there, which iterates over the
entries in the log (while(iter.hasNext) { val entry = iter.next }), I get an
entry whose complete underlying byte buffer is as follows:

[82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10,
39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108,
46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114,
101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11, 8,
34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50,
34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56,
48, 72]

A toString() on this entry yields:

*MessageAndOffset(Message(magic = 0, attributes = 2, crc = 1377740251, key
= null, payload = java.nio.HeapByteBuffer[pos=0 lim=197 cap=197]),4449011)*

It appears that this record is corrupt, and deserializing/decompressing it
throws exceptions that are not handled. Specifically, in version 0.10.0 the
following call fails with a NoSuchElementException:

ByteBufferMessageSet.deepIterator(entry).next().offset

Note: this message was written to disk by a *Kafka 0.10.0 broker running
snappy jar version 1.1.1.7* (which is known to have some read-time bugs). The
log file itself is 512 MB, and this message appears around 4 MB into the file.

We have since upgraded snappy, but shouldn't this condition be handled
gracefully by the broker? What is the correct behavior here? Should the
exception be caught and the log file truncated (see the sketch after the
repro test below for the kind of guard I would expect)? At the moment this
crashes Kafka completely, with no recovery path other than manually deleting
the bad data file and then starting Kafka again.

--

cheers,

gaurav


A test case to repro the crash:

import java.nio.ByteBuffer

import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset}
import org.junit.Test

@Test
def testCorruptLog() {
  // Raw bytes of the corrupt wrapper message as read back from the log segment.
  val buf = Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0,
    -59, -126, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79,
    -58, 1, 0, 0, 25, 1, 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49,
    48, 48, 48, 48, 58, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48,
    0, 0, 0, -102, 10, 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109,
    111, 100, 101, 108, 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46,
    97, 100, 100, 46, 114, 101, 102, 101, 114, 101, 110, 99, 101, 16, -120,
    -115, -16, -64, -22, 42, 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97,
    109, 115, 46, 115, 116, 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99,
    101, 110, 116, 101, 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115,
    116, 77, 5, 11, 8, 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50,
    53, 52, 57, 50, 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52,
    52, 54, 48, 56, 48, 72)

  val msg = new Message(ByteBuffer.wrap(buf), None, None)
  val entry = new MessageAndOffset(msg, 4449011L)

  // Fails with NoSuchElementException on 0.10.0 while deep-iterating the corrupt entry.
  val deepIterator: Iterator[MessageAndOffset] =
    ByteBufferMessageSet.deepIterator(entry)
  deepIterator.next().offset
}
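
For what it's worth, here is a sketch of the kind of guard I would have
expected during recovery (purely illustrative, not the actual LogSegment
code): treat the first entry that cannot be deep-iterated as the corruption
point, so the caller can truncate the segment there instead of crashing.

import scala.util.control.NonFatal

import kafka.message.{ByteBufferMessageSet, MessageAndOffset}

// Returns the offset of the first inner message, or None if the wrapper entry
// is corrupt and cannot be decompressed/iterated; a recovery loop could stop
// and truncate at the first None instead of letting the exception escape.
def safeDeepOffset(entry: MessageAndOffset): Option[Long] =
  try Some(ByteBufferMessageSet.deepIterator(entry).next().offset)
  catch { case NonFatal(e) => None }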


kafka-streams project compiles using maven but failed using sbt

2016-08-27 Thread Tommy Go
Hi

I am playing with kafka-streams using Scala, but ran into a strange issue:
the following project compiles using mvn but fails using sbt:

https://github.com/deeplambda/kstream-debug

[error] 
/Users/tommy/tmp/kstream-debug/src/main/scala/kafka/streams/WordCountDemo.scala:49:
missing parameter type for expanded function ((x$1) =>
x$1.toUpperCase())
[error] val uppercasedWithMapValues: KStream[Array[Byte], String]
= textLines.mapValues(_.toUpperCase())
[error]
 ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed



Any ideas why the demo fails to compile with sbt?
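
In case it helps narrow things down, here is line 49 rewritten with an
explicit ValueMapper instead of a Scala lambda, so no Scala-to-Java SAM
conversion is needed (I have not verified that this is the actual root cause):

import org.apache.kafka.streams.kstream.{KStream, ValueMapper}

// Same transformation as the failing line, but with an explicit anonymous
// class implementing the Java functional interface rather than a lambda.
val uppercasedWithMapValues: KStream[Array[Byte], String] =
  textLines.mapValues(new ValueMapper[String, String] {
    override def apply(value: String): String = value.toUpperCase()
  })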

Thanks,
Tommy

