Re: Fast Unit Tests

2018-05-01 Thread Geoff Von Allmen
I am pretty new to Spark/Scala myself, but I recently implemented unit
tests for my transformations and aggregations.

I’m using the mrpowers spark-fast-tests and spark-daria libraries.

I am also using a JDBC sink in the ForeachWriter. I’ve mocked the sink so that
the generated MySQL statements go into a global object, and then I compare
what ends up there against an expected set of MySQL statements.
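
As a rough sketch of that mocking approach (the names and the statement
format here are made up for illustration, not my actual code):

```scala
import org.apache.spark.sql.{ForeachWriter, Row}
import scala.collection.mutable.ArrayBuffer

// Global object the mocked sink writes into; the test compares its contents
// against the expected MySQL statements.
object CapturedStatements {
  val statements: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

// Mocked JDBC sink: instead of executing the statement, just record it.
class MockJdbcSink extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(row: Row): Unit = {
    // Stand-in for however the real sink renders a row into SQL.
    val sql = s"INSERT INTO my_table VALUES (${row.mkString(", ")})"
    CapturedStatements.statements.synchronized {
      CapturedStatements.statements += sql
    }
  }

  override def close(errorOrNull: Throwable): Unit = ()
}
```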

I’m running this with ScalaTest FlatSpec suites, where my Spark inputs are
manually generated fixtures for each test case. Everything seems to be
running well, and it’s nice and quick.
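
As a rough example of the shape of those tests (the transformation and
columns are made up; the assertion comes from spark-fast-tests'
DataFrameComparer):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.scalatest.FlatSpec
import com.github.mrpowers.spark.fast.tests.DataFrameComparer

class TransformationsSpec extends FlatSpec with DataFrameComparer {

  // One local SparkSession shared by the suite keeps the tests quick.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("unit-tests")
    .getOrCreate()

  import spark.implicits._

  // Hypothetical transformation under test.
  def adultsOnly(df: DataFrame): DataFrame = df.filter(col("age") >= 21)

  "adultsOnly" should "drop rows under 21" in {
    val input    = Seq(("alice", 30), ("bob", 18)).toDF("name", "age")
    val expected = Seq(("alice", 30)).toDF("name", "age")

    assertSmallDataFrameEquality(adultsOnly(input), expected)
  }
}
```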

On Tue, May 1, 2018 at 8:25 AM, marcos rebelo  wrote:

> Hey all,
>
> We are using Scala and SQL heavily, but I have a problem with VERY SLOW
> Unit Tests.
>
> Is there a way to do fast Unit Tests on Spark?
>
> How are you guys going around it?
>
> Best Regards
> Marcos Rebelo
>


Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Geoff Von Allmen
Ok - `LOCAL` makes sense now.

Do you have the option to still use `spark-submit` in this scenario, but with
the following options:

```bash
--master "local[*]" \
--deploy-mode "client" \
...
```

I know in the past, I have set up some options using `.config("Option",
"value")` when creating the Spark session, and then other runtime options
as you describe above with `spark.conf.set`. At this point, though, I've just
moved everything out into a `spark-submit` script.
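
For example (a minimal sketch; the option shown is just a placeholder):

```scala
import org.apache.spark.sql.SparkSession

// Options that need to be in place when the session is created go on the
// builder...
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("local-test")
  .config("spark.sql.shuffle.partitions", "4")
  .getOrCreate()

// ...while options that can change at runtime can be set afterwards.
spark.conf.set("spark.sql.shuffle.partitions", "8")
```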

On Fri, Apr 13, 2018 at 8:18 AM, Jason Boorn  wrote:

> Hi Geoff -
>
> Appreciate the help here - I do understand what you’re saying below.  And
> I am able to get this working when I submit a job to a local cluster.
>
> I think part of the issue here is that there’s ambiguity in the
> terminology.  When I say “LOCAL” spark, I mean an instance of spark that is
> created by my driver program, and is not a cluster itself.  It means that
> my master node is “local”, and this mode is primarily used for testing.
>
> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
>
> While I am able to get alluxio working with spark-submit, I am unable to
> get it working when using local mode.  The mechanisms for setting class
> paths during spark-submit are not available in local mode.  My
> understanding is that all one is able to use is:
>
> spark.conf.set(“”)
>
> To set any runtime properties of the local instance.  Note that it is
> possible (and I am more convinced of this as time goes on) that alluxio
> simply does not work in spark local mode as described above.
>
>
> On Apr 13, 2018, at 11:09 AM, Geoff Von Allmen 
> wrote:
>
> I fought with a ClassNotFoundException for quite some time, but it was
> for kafka.
>
> The final configuration that got everything working was running
> spark-submit with the following options:
>
> --jars "/path/to/.ivy2/jars/package.jar" \
> --driver-class-path "/path/to/.ivy2/jars/package.jar" \
> --conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
> --packages org.some.package:package_name:version
>
> While this was needed for me to run in cluster mode, it works equally
> well in client mode.
>
> One other note when you need to supply multiple items to these args:
> --jars and --packages should be comma-separated, while --driver-class-path
> and extraClassPath should be colon-separated.
>
> HTH
>
> On Fri, Apr 13, 2018 at 4:28 AM, jb44  wrote:
>
>> Haoyuan -
>>
>> As I mentioned below, I've been through the documentation already.  It has
>> not helped me to resolve the issue.
>>
>> Here is what I have tried so far:
>>
>> - setting extraClassPath as explained below
>> - adding fs.alluxio.impl through sparkconf
>> - adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
>> this matters in my case)
>> - compiling the client from source
>>
>> Do you have any other suggestions on how to get this working?
>>
>> Thanks
>>
>>
>>
>>
>
>


Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Geoff Von Allmen
I fought with a ClassNotFoundException for quite some time, but it was for
kafka.

The final configuration that got everything working was running spark-submit
with the following options:

--jars "/path/to/.ivy2/jars/package.jar" \
--driver-class-path "/path/to/.ivy2/jars/package.jar" \
--conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
--packages org.some.package:package_name:version

While this was needed for me to run in cluster mode, it works equally well
in client mode.

One other note when you need to supply multiple items to these args:
--jars and --packages should be comma-separated, while --driver-class-path
and extraClassPath should be colon-separated.

HTH

On Fri, Apr 13, 2018 at 4:28 AM, jb44  wrote:

> Haoyuan -
>
> As I mentioned below, I've been through the documentation already.  It has
> not helped me to resolve the issue.
>
> Here is what I have tried so far:
>
> - setting extraClassPath as explained below
> - adding fs.alluxio.impl through sparkconf
> - adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
> this matters in my case)
> - compiling the client from source
>
> Do you have any other suggestions on how to get this working?
>
> Thanks
>
>
>
>


Re: select count * doesnt seem to respect update mode in Kafka Structured Streaming?

2018-03-20 Thread Geoff Von Allmen
The following settings
<http://spark.apache.org/docs/latest/configuration.html#spark-streaming>
may be what you’re looking for:

   - spark.streaming.backpressure.enabled
   - spark.streaming.backpressure.initialRate
   - spark.streaming.receiver.maxRate
   - spark.streaming.kafka.maxRatePerPartition
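
If it helps, here's a minimal sketch of one place to set them (the values are
placeholders; they can equally be passed to spark-submit as --conf key=value):

```scala
import org.apache.spark.sql.SparkSession

// Placeholder values -- tune for your own workload.
val spark = SparkSession.builder()
  .appName("rate-limited-stream")
  .config("spark.streaming.backpressure.enabled", "true")
  .config("spark.streaming.backpressure.initialRate", "1000")
  .config("spark.streaming.receiver.maxRate", "10000")
  .config("spark.streaming.kafka.maxRatePerPartition", "1000")
  .getOrCreate()
```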


On Mon, Mar 19, 2018 at 5:27 PM, kant kodali  wrote:

> Yes it indeed makes sense! Is there a way to get incremental counts when I
> start from 0 and go through 10M records? perhaps count for every micro
> batch or something?
>
> On Mon, Mar 19, 2018 at 1:57 PM, Geoff Von Allmen 
> wrote:
>
>> Trigger does not mean 'report the current result every trigger interval'.
>> It means Spark will attempt to fetch new data and process it no more often
>> than the trigger interval.
>>
>> If you're reading from the beginning and you've got 10M entries in kafka,
>> it's likely pulling everything down then processing it completely and
>> giving you an initial output. From here on out, it will check kafka every 1
>> second for new data and process it, showing you only the updated rows. So
>> the initial read will give you the entire output since there is nothing to
>> be 'updating' from. If you add data to kafka now that the streaming job has
>> completed its first batch (and leave it running), it will then show you
>> the new/updated rows since the last batch every 1 second (assuming it can
>> fetch + process in that time span).
>>
>> If the combined fetch + processing time is > the trigger time, you will
>> notice warnings that it is 'falling behind' (I forget the exact verbiage,
>> but something to the effect of the calculation took XX time and is falling
>> behind). In that case, it will immediately check kafka for new messages and
>> begin processing the next batch (if new messages exist).
>>
>> Hope that makes sense -
>>
>>
>> On Mon, Mar 19, 2018 at 13:36 kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I have 10 million records in my Kafka and I am just trying to
>>> spark.sql(select count(*) from kafka_view). I am reading from kafka and
>>> writing to kafka.
>>>
>>> My writeStream is set to "update" mode and trigger interval of one
>>> second (Trigger.ProcessingTime(1000)). I expect the counts to be
>>> printed every second but looks like it would print after going through all
>>> 10M. why?
>>>
>>> Also, it seems to take forever whereas Linux wc of 10M rows would take
>>> 30 seconds.
>>>
>>> Thanks!
>>>
>>
>


Re: select count * doesnt seem to respect update mode in Kafka Structured Streaming?

2018-03-19 Thread Geoff Von Allmen
Trigger does not mean 'report the current result every trigger interval'.
It means Spark will attempt to fetch new data and process it no more often
than the trigger interval.

If you're reading from the beginning and you've got 10M entries in kafka,
it's likely pulling everything down then processing it completely and
giving you an initial output. From here on out, it will check kafka every 1
second for new data and process it, showing you only the updated rows. So
the initial read will give you the entire output since there is nothing to
be 'updating' from. If you add data to kafka now that the streaming job has
completed its first batch (and leave it running), it will then show you
the new/updated rows since the last batch every 1 second (assuming it can
fetch + process in that time span).

If the combined fetch + processing time is > the trigger time, you will
notice warnings that it is 'falling behind' (I forget the exact verbiage,
but something to the effect of the calculation took XX time and is falling
behind). In that case, it will immediately check kafka for new messages and
begin processing the next batch (if new messages exist).
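
To make that concrete, here's a minimal sketch of the kind of query you're
describing (servers/topic are placeholders, and I've used the console sink
here just for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val spark = SparkSession.builder().appName("count-stream").getOrCreate()

// Bootstrap servers and topic are placeholders.
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "my_topic")
  .option("startingOffsets", "earliest")
  .load()
  .createOrReplaceTempView("kafka_view")

// Update mode + a 1 second trigger: each micro-batch checks kafka no faster
// than once per second and emits only the rows that changed.
val query = spark.sql("select count(*) from kafka_view").writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .trigger(Trigger.ProcessingTime(1000))
  .start()

query.awaitTermination()
```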

Hope that makes sense -


On Mon, Mar 19, 2018 at 13:36 kant kodali  wrote:

> Hi All,
>
> I have 10 million records in my Kafka and I am just trying to
> spark.sql(select count(*) from kafka_view). I am reading from kafka and
> writing to kafka.
>
> My writeStream is set to "update" mode and trigger interval of one second (
> Trigger.ProcessingTime(1000)). I expect the counts to be printed every
> second but looks like it would print after going through all 10M. why?
>
> Also, it seems to take forever whereas Linux wc of 10M rows would take 30
> seconds.
>
> Thanks!
>


Structured Streaming: distinct (Spark 2.2)

2018-03-19 Thread Geoff Von Allmen
I see in the documentation that the distinct operation is not supported in
Structured Streaming. That being said, I have noticed that you are able to
successfully call distinct() on a DataFrame, and it seems to perform the
desired operation and doesn’t fail with the expected AnalysisException. If I
call it with a column name specified, then it does fail with an
AnalysisException.

I am using Structured Streaming to read from a Kafka stream and my question
(and concern) is that:

   - The distinct operation is properly applied across the *current* batch
   as read from Kafka; however, the distinct operation would not apply
   across batches (see the sketch below).
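
For reference, here's a minimal sketch of the kind of setup I'm describing
(servers, topic, and value handling are placeholders):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder().appName("distinct-test").getOrCreate()

// Read from Kafka and call distinct() on the streaming DataFrame.
val deduped = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .distinct()

// Output mode "update": only changed rows are emitted each micro-batch.
val query = deduped.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()

query.awaitTermination()
```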

I have tried the following:

   - Started the streaming job to see my baseline data and left the job
   streaming
   - Created events in kafka that would increment my counts if distinct was
   not performing as expected
   - Results:
      - Distinct still seems to be working over the entire data set even as
        I add new data.
      - As I add new data, I see spark process the data (I’m doing output
        mode = update) but there are no new results indicating the distinct
        function is in fact still working across batches as spark pulls in
        the new data from kafka.

Does anyone know more about the intended behavior of distinct in Structured
Streaming?

If this is working as intended, does this mean I could have a dataset that
grows without bound being held in memory/on disk, or something to that effect
(so that it has some way to apply the distinct operation against previous
data)?


Re: Standalone Cluster: ClassNotFound org.apache.kafka.common.serialization.ByteArrayDeserializer

2017-12-27 Thread Geoff Von Allmen
I’ve tried it both ways.

The uber jar gives me the following:

   - Caused by: java.lang.ClassNotFoundException: Failed to find data
   source: kafka. Please find packages at
   http://spark.apache.org/third-party-projects.html

If I only do minimal packaging, add
org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar as a --package, and
add it to the --driver-class-path, I get past that error, but I then hit
the error I showed in the original post.

I agree it seems it’s missing the kafka-clients jar file as that is where
the ByteArrayDeserializer is, though it looks like it’s present as far as I
can tell.

I can see the following two packages in the classpath entries on the
history server (though the source shows: **(redacted) — not sure
why?)

   - spark://:/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar
   -
   spark://:/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar

As a side note, I'm running both a master and a worker on the same system
just to test out running in cluster mode. Not sure if that would have
anything to do with it. I would think it would make things easier since it's
got access to all the same file system... but I'm pretty new to Spark.

I have also read through and followed those instructions as well as many
others at this point.

Thanks!

On Wed, Dec 27, 2017 at 12:56 AM, Eyal Zituny 
wrote:

> Hi,
> it seems that you're missing the kafka-clients jar (and probably some
> other dependencies as well).
> How did you package your application jar? Does it include all the
> required dependencies (as an uber jar)?
> If it's not an uber jar, you need to pass via the driver-class-path and the
> executor-class-path all the files/dirs where your dependencies can be found
> (note that those must be accessible from each node in the cluster).
> I suggest going over the manual
> <https://spark.apache.org/docs/latest/submitting-applications.html>
>
> Eyal
>
>
> On Wed, Dec 27, 2017 at 1:08 AM, Geoff Von Allmen 
> wrote:
>
>> I am trying to deploy a standalone cluster but running into ClassNotFound
>> errors.
>>
>> I have tried a whole myriad of different approaches, ranging from
>> packaging all dependencies into a single JAR to using the --packages
>> and --driver-class-path options.
>>
>> I’ve got a master node started, a slave node running on the same system,
>> and am using spark submit to get the streaming job kicked off.
>>
>> Here is the error I’m getting:
>>
>> Exception in thread "main" java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at 
>> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
>> at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
>> Caused by: java.lang.NoClassDefFoundError: 
>> org/apache/kafka/common/serialization/ByteArrayDeserializer
>> at 
>> org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:376)
>> at 
>> org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)
>> at 
>> org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:323)
>> at 
>> org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:60)
>> at 
>> org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
>> at 
>> org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:88)
>> at 
>> org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:88)
>> at 
>> org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
>> at 
>> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
>> at com.Customer.start(Customer.scala:47)
>> at com.Main$.main(Main.scala:23)
>> at com.Main.main(Main.scala)
>> ... 6 more
>> Caused by: java.lang.ClassNotFoundException: 
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> ... 18 mo

Standalone Cluster: ClassNotFound org.apache.kafka.common.serialization.ByteArrayDeserializer

2017-12-26 Thread Geoff Von Allmen
I am trying to deploy a standalone cluster but running into ClassNotFound
errors.

I have tried a whole myriad of different approaches, ranging from packaging
all dependencies into a single JAR to using the --packages and
--driver-class-path options.

I’ve got a master node started, a slave node running on the same system,
and am using spark submit to get the streaming job kicked off.

Here is the error I’m getting:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:376)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:323)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:60)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:88)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:88)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at com.Customer.start(Customer.scala:47)
at com.Main$.main(Main.scala:23)
at com.Main.main(Main.scala)
... 6 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 18 more

Here is the spark-submit command I’m using:

./spark-submit \
--master spark://: \
--files jaas.conf \
--deploy-mode cluster \
--driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11 \
--driver-class-path ~/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.1.jar \
--class  \
--verbose \
my_jar.jar

I’ve tried all sorts of combinations of including different packages and
driver-class-path jar files. As far as I can find, the serializer should be
in the kafka-clients jar file, which I’ve tried including with no success.

Pom dependencies are as follows:

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.8-dmr</version>
    </dependency>
    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.9.9</version>
    </dependency>
</dependencies>

If I remove --deploy-mode and run it as client … it works just fine.

Thanks Everyone -

Geoff V.