StateStoreSaveExec / StateStoreRestoreExec

2017-01-02 Thread Jeremy Smith
I have a question about state tracking in Structured Streaming.

First let me briefly explain my use case: given a mutable data source
(e.g. an RDBMS) from which we assume we can retrieve the set of newly
created row versions (that is, rows that were created or updated between
two given `Offset`s, whatever those are), we can create a Structured
Streaming `Source` which retrieves the new row versions. Assuming further
that every logical row has some primary key, then as long as we can track
the current offset for each primary key, we can differentiate between new
and updated rows. Then, when a row is updated, we can record that the
previous version of that row expired at some particular time. That's
essentially what I'm trying to do. It would effectively give you an
"event-sourcing" style historical/immutable log of changes out of a
mutable data source.
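
Conceptually the state I need is tiny - just the last offset seen for each
primary key - and the per-record transition looks something like the rough
sketch below (plain Scala, nothing Spark-specific; all names are made up
for illustration):

  case class RowVersion(pk: String, offset: Long)
  case class Expiry(pk: String, lastOffset: Long, expiredAtMillis: Long)

  // State is a map from primary key to the last offset seen for that key.
  // A version arriving for a known key means the previous version expired.
  def track(state: Map[String, Long],
            rv: RowVersion): (Map[String, Long], Option[Expiry]) =
    state.get(rv.pk) match {
      case Some(prev) =>
        (state + (rv.pk -> rv.offset),
         Some(Expiry(rv.pk, prev, System.currentTimeMillis)))
      case None =>
        (state + (rv.pk -> rv.offset), None)
    }

  // Folding a batch of row versions through the state:
  val batch = Seq(RowVersion("a", 1L), RowVersion("b", 1L), RowVersion("a", 2L))
  val (finalState, expiries) =
    batch.foldLeft((Map.empty[String, Long], Vector.empty[Expiry])) {
      case ((st, out), rv) =>
        val (next, expired) = track(st, rv)
        (next, out ++ expired)
    }

The question is where that map should live so it survives across triggers
and restarts.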

I noticed that in Spark 2.0.1 there was a concept of a StateStore, which
seemed like it would allow me to do exactly the tracking that I needed, so
I decided to try and use that built-in functionality rather than some
external key/value store for storing the current "version number" of each
primary key. There were a lot of hard-coded hoops I had to jump through,
but I eventually made it work by implementing some custom LogicalPlans and
SparkPlans around StateStore[Save/Restore]Exec.

Now, in Spark 2.1.0 it seems to have gotten even further away from what I
was using it for - the keyExpressions of StateStoreSaveExec must include a
timestamp column, which means that those expressions are not really keys
(at least not for a logical row). So it appears I can't use it that way
anymore (I can't blame Spark for this, as I knew what I was getting into
when leveraging developer APIs). There are also several hard-coded checks
which now make it clear that StateStore functionality is only to be used
for streaming aggregates, which is not really what I'm doing.

My question is - is there a good way to accomplish the above use case
within Structured Streaming? Or is this the wrong use case for the state
tracking functionality (which increasingly seems to be targeted toward
aggregates only)? Is there a plan for any kind of generalized
`mapWithState`-type functionality for Structured Streaming, or should I
just give up on that and use an external key/value store for my state
tracking?

Thanks,
Jeremy


Re: Apache Arrow data in buffer to RDD/DataFrame/Dataset?

2016-08-05 Thread Jeremy Smith
If you had a persistent, off-heap buffer of Arrow data on each executor,
and you could get an iterator over that buffer from inside of a task, then
you could conceivably define an RDD over it by just extending RDD and
returning the iterator from the compute method.  If you want to make a
Dataset or DataFrame, though, it's going to be tough to avoid copying the
data.  You can't avoid Spark copying data into InternalRows unless your RDD
is an RDD[InternalRow] and you create a BaseRelation for it that specifies
needsConversion = false.  It might be possible to implement InternalRow
over your Arrow buffer, but I'm still fuzzy on whether or not that would
prevent copying/marshaling of the data.  Maybe one of the actual
contributors on Spark SQL will chime in with deeper knowledge.
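
A very rough sketch of the plain-RDD route (ArrowBufferRegistry here is a
made-up stand-in for however you expose the per-executor off-heap buffer;
the record type and chunk layout are likewise just placeholders):

  import org.apache.spark.{Partition, SparkContext, TaskContext}
  import org.apache.spark.rdd.RDD

  case class MyRecord(id: Long, value: Double)

  // Hypothetical executor-side hook that can hand back an iterator over
  // the locally held Arrow data for a given chunk id.
  object ArrowBufferRegistry {
    def iteratorFor(chunk: Int): Iterator[MyRecord] =
      Iterator.empty // stub; in reality this would walk the Arrow buffer
  }

  class ArrowChunkPartition(val index: Int) extends Partition

  class ArrowBufferRDD(sc: SparkContext, numChunks: Int)
      extends RDD[MyRecord](sc, Nil) {

    override protected def getPartitions: Array[Partition] =
      Array.tabulate[Partition](numChunks)(new ArrowChunkPartition(_))

    override def compute(split: Partition, context: TaskContext): Iterator[MyRecord] =
      ArrowBufferRegistry.iteratorFor(split.index)
  }

You'd probably also want to override getPreferredLocations so each chunk's
task gets scheduled on the executor that actually holds that chunk's buffer.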

Jeremy

On Fri, Aug 5, 2016 at 12:43 PM, Holden Karau  wrote:

> Spark does not currently support Apache Arrow - probably a good place to
> chat would be on the Arrow mailing list where they are making progress
> towards unified JVM & Python/R support which is sort of a precondition of a
> functioning Arrow interface between Spark and Python.
>
> On Fri, Aug 5, 2016 at 12:40 PM, jpivar...@gmail.com 
> wrote:
>
>> In a few earlier posts [ 1
>> <http://apache-spark-developers-list.1001551.n3.nabble.com/Tungsten-off-heap-memory-access-for-C-libraries-td13898.html>
>> ] [ 2
>> <http://apache-spark-developers-list.1001551.n3.nabble.com/How-to-access-the-off-heap-representation-of-cached-data-in-Spark-2-0-td17701.html>
>> ], I asked about moving data from C++ into a Spark data source (RDD,
>> DataFrame, or Dataset). The issue is that even the off-heap cache might
>> not have a stable representation: it might change from one version to
>> the next.
>>
>> I recently learned about Apache Arrow, a data layer that Spark currently
>> or will someday share with Pandas, Impala, etc. Suppose that I can fill
>> a buffer (such as a direct ByteBuffer) with Arrow-formatted data, is
>> there an easy --- or even zero-copy --- way to use that in Spark? Is
>> that an API that could be developed?
>>
>> I'll be at the KDD Spark 2.0 tutorial on August 15. Is that a good place
>> to ask this question?
>>
>> Thanks,
>> -- Jim
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Parquet partitioning / appends

2016-08-18 Thread Jeremy Smith
Hi,

I'm running into an issue wherein Spark (both 1.6.1 and 2.0.0) will fail
with a "GC overhead limit exceeded" error when creating a DataFrame from a
parquet-backed partitioned Hive table with a relatively large number of
parquet files (~175 partitions, each containing many parquet files). If I
then use Hive directly to create a new table from the partitioned table
with CREATE TABLE AS, Hive completes that with no problem and Spark then
has no problem reading the resulting table.

Part of the problem is that whenever we insert records to a parquet table,
it creates a new parquet file; this results in many small parquet files for
a streaming job. Since HDFS supports file appending, couldn't the records
be appended to the existing parquet file as a new row group? If I
understand correctly, this would be pretty straightforward - append the new
data pages and then write a copy of the existing footer with the new row
groups included.  It wouldn't be as optimal as creating a whole new parquet
file including all the data, but it would be much better than creating many
small files (for many different reasons, including the crash case above).
And I'm sure I can't be the only one struggling with streaming output to
parquet.

I know the typical solution to this is to periodically compact the small
files into larger files, but it seems like parquet ought to be appendable
as-is - which would obviate the need for that.
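
(For what it's worth, the compaction itself is simple enough to script.
Something like the sketch below, using Spark 2.0 syntax and made-up paths,
rewrites one partition's many small files as a handful of larger ones; it's
the extra job to schedule, and the rewrite window, that make appendable
files attractive.)

  // Compaction sketch: read one partition's small parquet files and
  // rewrite them as a few larger files (paths are illustrative; assumes
  // an existing SparkSession named `spark`).
  val part = spark.read.parquet("/warehouse/events/dt=2016-08-18")
  part.coalesce(4) // aim for a handful of output files
    .write
    .mode("overwrite")
    .parquet("/warehouse/events_compacted/dt=2016-08-18")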

Here's a partial trace of the error for reference:
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.io.ObjectStreamClass.getClassDataLayout0(ObjectStreamClass.java:1251)
    at java.io.ObjectStreamClass.getClassDataLayout(ObjectStreamClass.java:1195)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1885)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108)

Thanks,
Jeremy


Re: Not all KafkaReceivers processing the data Why?

2016-09-14 Thread Jeremy Smith
Take a look at how the messages are actually distributed across the
partitions. If the message keys have a low cardinality, you might get poor
distribution (i.e. all the messages are actually only in two of the five
partitions, leading to what you see in Spark).

If you take a look at the Kafka data directories, you can probably get an
idea of the distribution by just examining the sizes of each partition.
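
If it's easier to check from the Spark side and you can use the direct
(receiver-less) API, even just temporarily as a diagnostic, the offset
ranges of each batch tell you exactly how many messages came from each
Kafka partition. Roughly (Scala, Kafka 0.8 integration; the broker list,
topic name and the existing StreamingContext `ssc` are placeholders):

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("mytopic"))

  // Print how many messages each Kafka partition contributed per batch.
  stream.foreachRDD { rdd =>
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
      println(s"${r.topic}-${r.partition}: ${r.count} messages")
    }
  }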

Jeremy

On Wed, Sep 14, 2016 at 12:33 PM, Rachana Srivastava <
rachana.srivast...@markmonitor.com> wrote:

> Hello all,
>
>
>
> I have created a Kafka topic with 5 partitions.  And I am using
> createStream receiver API like the following. But somehow only one
> receiver is getting the input data. The rest of the receivers are not
> processing anything. Can you please help?
>
>
>
> JavaPairDStream<String, String> messages = null;
>
> if (sparkStreamCount > 0) {
>     // We create an input DStream for each partition of the topic,
>     // unify those streams, and then repartition the unified stream.
>     List<JavaPairDStream<String, String>> kafkaStreams =
>         new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
>     for (int i = 0; i < sparkStreamCount; i++) {
>         kafkaStreams.add(KafkaUtils.createStream(jssc,
>             contextVal.getString(KAFKA_ZOOKEEPER),
>             contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap));
>     }
>     messages = jssc.union(kafkaStreams.get(0),
>         kafkaStreams.subList(1, kafkaStreams.size()));
> } else {
>     messages = KafkaUtils.createStream(jssc,
>         contextVal.getString(KAFKA_ZOOKEEPER),
>         contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap);
> }
>


Re: Kafaka 0.8, 0.9 in Structured Streaming

2016-10-07 Thread Jeremy Smith
+1

We're on CDH, and it will probably be a while before they support Kafka
0.10. At the same time, we don't use their Spark and we're looking forward
to upgrading to 2.0.x and using structured streaming.

I was just going to write our own Kafka Source implementation which uses
the existing KafkaRDD but it would be much easier to get buy-in for an
official Spark module.
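
What I had in mind is roughly the shape below; this is only a sketch
against the internal (and still evolving) Source API as it looks in the
2.0.x line, with all of the actual Kafka 0.8 / KafkaRDD plumbing elided:

  import org.apache.spark.sql.{DataFrame, SQLContext}
  import org.apache.spark.sql.execution.streaming.{Offset, Source}
  import org.apache.spark.sql.types.StructType

  // Sketch only: offset bookkeeping and the KafkaRDD-backed DataFrame
  // construction are left as ??? placeholders.
  class Kafka08Source(sqlContext: SQLContext,
                      params: Map[String, String]) extends Source {

    // e.g. key / value / topic / partition / offset columns
    override def schema: StructType = ???

    // The latest offsets available across the subscribed partitions, if any.
    override def getOffset: Option[Offset] = ???

    // Build a DataFrame (e.g. from a KafkaRDD) covering (start, end].
    override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???

    // Shut down any consumers / fetchers.
    override def stop(): Unit = ()
  }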

Jeremy

On Fri, Oct 7, 2016 at 12:41 PM, Michael Armbrust 
wrote:

> We recently merged support for Kafka 0.10.0 in Structured Streaming, but
> I've been hearing a few people tell me that they are stuck on an older
> version of Kafka and cannot upgrade.  I'm considering revisiting
> SPARK-17344, but it
> would be good to have more information.
>
> Could people please vote or comment on the above ticket if a lack of
> support for older versions of kafka would block you from trying out
> structured streaming?
>
> Thanks!
>
> Michael
>


Spark has a compile dependency on scalatest

2016-10-28 Thread Jeremy Smith
Hey everybody,

Just a heads up that currently Spark 2.0.1 has a compile dependency on
Scalatest 2.2.6. It comes from spark-core's dependency on spark-launcher,
which has a transitive dependency on spark-tags, which has a compile
dependency on Scalatest.

This makes it impossible to use any other version of Scalatest for testing
your app if you declare a dependency on any Spark 2.0.1 module; you'll get
a bunch of runtime errors during testing (unless you figure out the reason
like I did and explicitly exclude Scalatest from the spark dependency).
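
In case anyone else hits this before it changes, the workaround is just an
exclusion on the Spark dependency. In sbt it looks roughly like this
(adjust the Spark version, Scala suffix and scalatest version to match
your build):

  // build.sbt: keep Spark's transitive scalatest out of our classpath so
  // the scalatest version we declare for testing wins.
  libraryDependencies += ("org.apache.spark" %% "spark-core" % "2.0.1")
    .excludeAll(ExclusionRule(organization = "org.scalatest"))

  libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.0" % "test"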

I think that dependency should probably be moved to a test dependency
instead.

Thanks,
Jeremy


Re: Spark has a compile dependency on scalatest

2016-10-28 Thread Jeremy Smith
spark-core depends on spark-launcher (compile)
spark-launcher depends on spark-tags (compile)
spark-tags depends on scalatest (compile)

To be honest I'm not all that familiar with the project structure - should
I just exclude spark-launcher if I'm not using it?

On Fri, Oct 28, 2016 at 12:27 PM, Sean Owen  wrote:

> It's required because the tags module uses it to define annotations for
> tests. I don't see it in compile scope for anything but the tags module,
> which is then in test scope for other modules. What are you seeing that
> makes you say it's in compile scope?
>
> On Fri, Oct 28, 2016 at 8:19 PM Jeremy Smith 
> wrote:
>
>> Hey everybody,
>>
>> Just a heads up that currently Spark 2.0.1 has a compile dependency on
>> Scalatest 2.2.6. It comes from spark-core's dependency on spark-launcher,
>> which has a transitive dependency on spark-tags, which has a compile
>> dependency on Scalatest.
>>
>> This makes it impossible to use any other version of Scalatest for
>> testing your app if you declare a dependency on any Spark 2.0.1 module;
>> you'll get a bunch of runtime errors during testing (unless you figure out
>> the reason like I did and explicitly exclude Scalatest from the spark
>> dependency).
>>
>> I think that dependency should probably be moved to a test dependency
>> instead.
>>
>> Thanks,
>> Jeremy
>>
>


Re: Spark has a compile dependency on scalatest

2016-10-30 Thread Jeremy Smith
>> [INFO]+- com.fasterxml.jackson.core:jackson-core:jar:2.6.5:compile
>> [INFO]+- com.fasterxml.jackson.module:jackson-module-scala_2.10:jar:
>> 2.6.5:compile
>> [INFO]|  +- org.scala-lang:scala-reflect:jar:2.10.6:compile
>> [INFO]|  \- com.fasterxml.jackson.module:
>> jackson-module-paranamer:jar:2.6.5:compile
>> [INFO]+- org.apache.ivy:ivy:jar:2.4.0:compile
>> [INFO]+- oro:oro:jar:2.0.8:compile
>> [INFO]+- net.razorvine:pyrolite:jar:4.9:compile
>> [INFO]+- net.sf.py4j:py4j:jar:0.10.3:compile
>> [INFO]+- org.apache.spark:spark-tags_2.10:jar:2.0.1:compile
>> [INFO]|  \- org.scalatest:scalatest_2.10:jar:2.2.6:compile
>> [INFO]\- org.spark-project.spark:unused:jar:1.0.0:compile
>>
>> On Fri, Oct 28, 2016 at 1:04 PM, Sean Owen  wrote:
>>
>> Yes, but scalatest doesn't end up in compile scope, says Maven?
>>
>> ...
>>
>> [INFO] +- org.apache.spark:spark-tags_2.11:jar:2.1.0-SNAPSHOT:compile
>>
>> [INFO] |  +- (org.scalatest:scalatest_2.11:jar:2.2.6:test - scope
>> managed from compile; omitted for duplicate)
>>
>> [INFO] |  \- (org.spark-project.spark:unused:jar:1.0.0:compile - omitted
>> for duplicate)
>>
>> [INFO] +- org.apache.commons:commons-crypto:jar:1.0.0:compile
>>
>> [INFO] +- org.spark-project.spark:unused:jar:1.0.0:compile
>>
>> [INFO] +- org.scalatest:scalatest_2.11:jar:2.2.6:test
>>
>> ...
>>
>> On Fri, Oct 28, 2016 at 8:52 PM Jeremy Smith 
>> wrote:
>>
>> spark-core depends on spark-launcher (compile)
>> spark-launcher depends on spark-tags (compile)
>> spark-tags depends on scalatest (compile)
>>
>> To be honest I'm not all that familiar with the project structure -
>> should I just exclude spark-launcher if I'm not using it?
>>
>> On Fri, Oct 28, 2016 at 12:27 PM, Sean Owen  wrote:
>>
>> It's required because the tags module uses it to define annotations for
>> tests. I don't see it in compile scope for anything but the tags module,
>> which is then in test scope for other modules. What are you seeing that
>> makes you say it's in compile scope?
>>
>> On Fri, Oct 28, 2016 at 8:19 PM Jeremy Smith 
>> wrote:
>>
>> Hey everybody,
>>
>> Just a heads up that currently Spark 2.0.1 has a compile dependency on
>> Scalatest 2.2.6. It comes from spark-core's dependency on spark-launcher,
>> which has a transitive dependency on spark-tags, which has a compile
>> dependency on Scalatest.
>>
>> This makes it impossible to use any other version of Scalatest for
>> testing your app if you declare a dependency on any Spark 2.0.1 module;
>> you'll get a bunch of runtime errors during testing (unless you figure out
>> the reason like I did and explicitly exclude Scalatest from the spark
>> dependency).
>>
>> I think that dependency should probably be moved to a test dependency
>> instead.
>>
>> Thanks,
>> Jeremy
>>