Re: Can't get all stored values via range iterator

2015-11-18 Thread Yi Pan
Hi, Alexander,

Very glad that you figured it out! Thanks!

-Yi



Re: Can't get all stored values via range iterator

2015-11-17 Thread Alexander Filipchik
Just want to update you on this one. After some time spent debugging, I
found that the actual problem was a piece of our code that was calling
next() on a range iterator twice :(.
After removing the duplicate call, everything works as expected.
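
A minimal sketch of how this kind of bug drops entries, using a plain
java.util.Iterator instead of the Samza store. The class and method names
are hypothetical, and the code that actually caused the problem is not
shown in this thread:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class DoubleNextDemo {
    // Buggy variant: next() is called twice per loop pass, so every
    // second entry is consumed but never collected.
    static List<String> collectBuggy(Iterator<String> it) {
        List<String> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
            if (it.hasNext()) {
                it.next(); // stray second call: this entry is silently lost
            }
        }
        return out;
    }

    // Fixed variant: exactly one next() per hasNext() check.
    static List<String> collectFixed(Iterator<String> it) {
        List<String> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("n1.a", "n1.b", "n1.c", "n1.d", "n1.e");
        System.out.println(collectBuggy(keys.iterator())); // [n1.a, n1.c, n1.e]
        System.out.println(collectFixed(keys.iterator())); // all five keys
    }
}
```

A loop shaped like the buggy variant returns roughly half of the stored
entries, which is in the same range as the shortfall reported earlier in
the thread.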

Thank you!

Alex



Re: Can't get all stored values via range iterator

2015-11-16 Thread Yi Pan
Hi, Alexander,

Sorry for the late reply on this one. My questions and comments are inline
below:

On Sun, Nov 15, 2015 at 7:07 PM, Alexander Filipchik 
wrote:

>
> nodeIterator = store.range(
> String.join(".", nodeId, String.valueOf(Character.MIN_VALUE)),
> String.join(".", nodeId, String.valueOf(Character.MAX_VALUE)));
>
>
What you want here is a prefix scan: the start key should be nodeId + '.'
and the end key should be nodeId + '.' + maxId, where every character of
maxId is Character.MAX_VALUE and its length is equal to or greater than the
longest possible nodeId.
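
A minimal sketch of those scan bounds, using a java.util.TreeMap as a
stand-in for the sorted store. MAX_ID_LENGTH, the class name, and the helper
names are hypothetical, and the sketch assumes the real store orders its
serialized keys the same way String.compareTo orders them here:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class PrefixScanSketch {
    // Hypothetical upper bound on the length of a node id.
    static final int MAX_ID_LENGTH = 8;

    // Inclusive lower bound: the prefix itself.
    static String startKey(String nodeId) {
        return nodeId + ".";
    }

    // Exclusive upper bound: the prefix followed by MAX_ID_LENGTH copies
    // of Character.MAX_VALUE.
    static String endKey(String nodeId) {
        StringBuilder sb = new StringBuilder(nodeId).append('.');
        for (int i = 0; i < MAX_ID_LENGTH; i++) {
            sb.append(Character.MAX_VALUE);
        }
        return sb.toString();
    }

    // Returns the neighbour ids stored under keys of the form nodeId.neighbourId.
    static List<String> neighbours(NavigableMap<String, String> store, String nodeId) {
        String prefix = startKey(nodeId);
        List<String> out = new ArrayList<>();
        for (String key : store.subMap(prefix, true, endKey(nodeId), false).keySet()) {
            out.add(key.substring(prefix.length()));
        }
        return out;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        for (String key : new String[] {"n1.a", "n1.b", "n10.a", "n2.a"}) {
            store.put(key, "");
        }
        System.out.println(neighbours(store, "n1"));  // [a, b]
        System.out.println(neighbours(store, "n10")); // [a]
    }
}
```

Because the prefix ends with the separator, keys of node "n10" do not leak
into the scan for node "n1".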

> I restreamed the RocksDB changelog topic and I can see all these edges stored
> there, but the query still returns only 4.3M nodes.
>

Could you clarify what you did to "see all these edges", and how you ran the
query that "still returns only 4.3M nodes"?


> 1) Has anyone seen such behaviour before?
>

Not that I am aware of.


> 2) What is the best way to debug it on a remote machine? Any particular
> logs to look for? Any RocksDB config params that should be enabled?
>

You can try adding a JMX debug port option to task.opts. With Samza 0.10
(latest from trunk), the JMX server port is reported by the AppMaster's
web API. As for the state store config, you can try disabling the
CachedStore to rule out any potential issues with cache management.


> 3) Is it a good idea to store a graph in such a format?
>

As long as you can partition the data based on nodeId, it should be fine.


>
> Thank you,
> Alex
>

Please let us know if you find any issues with your use case.

-Yi


Can't get all stored values via range iterator

2015-11-15 Thread Alexander Filipchik
Hello Samza community!

I'm currently building a graph processing POC with Samza as the engine and
have run into an interesting problem.

What I'm trying to do is cache a graph in KV storage (RocksDB) using a
simple notation:
key: node1_id.node2_id (i.e. node IDs separated by a dot)
value: empty string for now.
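
A minimal sketch of this key notation. The class and helper names are
hypothetical, and the sketch assumes node IDs never contain the dot
separator, since otherwise a key could not be split back unambiguously:

```java
class EdgeKeys {
    static final char SEPARATOR = '.';

    // Builds the key for the edge fromId -> toId, e.g. "n1.n2".
    static String edgeKey(String fromId, String toId) {
        if (fromId.indexOf(SEPARATOR) >= 0 || toId.indexOf(SEPARATOR) >= 0) {
            throw new IllegalArgumentException("node IDs must not contain '" + SEPARATOR + "'");
        }
        return fromId + SEPARATOR + toId;
    }

    // Extracts the source node ID (the part before the separator).
    static String sourceOf(String key) {
        return key.substring(0, key.indexOf(SEPARATOR));
    }

    // Extracts the adjacent node ID (the part after the separator).
    static String targetOf(String key) {
        return key.substring(key.indexOf(SEPARATOR) + 1);
    }

    public static void main(String[] args) {
        String key = edgeKey("n1", "n2");
        System.out.println(key);           // n1.n2
        System.out.println(sourceOf(key)); // n1
        System.out.println(targetOf(key)); // n2
    }
}
```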

The first operation I was trying to implement is getting all adjacent nodes
of a particular node.
In code it is:

nodeIterator = store.range(
String.join(".", nodeId, String.valueOf(Character.MIN_VALUE)),
String.join(".", nodeId, String.valueOf(Character.MAX_VALUE)));

It worked pretty well locally on a small graph, but I got weird results
when I tried to increase the graph size.

Experiment:
Number of edges: 5 billion
Number of nodes: around 90M
Kafka partitions: 20
Kafka brokers: 3 with 4TB of space.
Yarn workers: 2 with 256GB of RAM and 2TB of disk space each.

So, I pumped this whole graph into Samza and it swallowed it just fine. But
when I tried to query one of the nodes that has a lot of neighbours, it
returned only 55% of them.
Number of actual neighbours: 7.7M
Number of returned neighbours: 4.4M

I restreamed the RocksDB changelog topic and I can see all these edges stored
there, but the query still returns only 4.3M nodes.

So, currently I'm trying to figure out the best way to debug it. Here are
some questions:
1) Has anyone seen such behaviour before?
2) What is the best way to debug it on a remote machine? Any particular
logs to look for? Any RocksDB config params that should be enabled?
3) Is it a good idea to store a graph in such a format?

Thank you,
Alex