What is the minimum value allowed for StreamingContext's Seconds parameter?

2016-05-23 Thread YaoPau
Just wondering how small the microbatches can be, and any best practices on
the smallest value that should be used in production.  For example, any
issue with running it at 0.01 seconds?
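
For reference, the batch interval in the Python API is just a number of
seconds (fractional values are accepted and converted to milliseconds
internally; the Scala API has explicit Seconds()/Milliseconds() helpers).
A minimal sketch of what a 10 ms batch would look like, leaving aside
whether scheduling overhead makes it practical, with a hypothetical socket
source:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="tiny-batch-test")

# batchDuration is expressed in seconds; 0.01 corresponds to 10 ms batches.
# Whether such a small interval can keep up depends on per-batch scheduling
# overhead, which is exactly the open question here.
ssc = StreamingContext(sc, 0.01)

lines = ssc.socketTextStream("localhost", 9999)  # hypothetical source
lines.count().pprint()

ssc.start()
ssc.awaitTermination()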






A number of issues when running spark-ec2

2016-04-16 Thread YaoPau
I launched a cluster with: "./spark-ec2 --key-pair my_pem --identity-file
../../ssh/my_pem.pem launch jg_spark2" and I got the "Spark standalone
cluster started at http://ec2-54-88-249-255.compute-1.amazonaws.com:8080"
and "Done!" success confirmations at the end.  I confirmed on EC2 that 1
Master and 1 Slave were both launched and passed their status checks.

But none of the Spark commands seem to work (spark-shell, pyspark, etc), and
port 8080 isn't being used.  The full output from launching the cluster is
below.  Any ideas what the issue is?

>>
launch jg_spark2
/Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/plugin.py:40:
PendingDeprecationWarning: the imp module is deprecated in favour of
importlib; see the module's documentation for alternative uses
  import imp
/Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/provider.py:197:
ResourceWarning: unclosed file <_io.TextIOWrapper
name='/Users/jg/.aws/credentials' mode='r' encoding='UTF-8'>
  self.shared_credentials.load_from_path(shared_path)
Setting up security groups...
Creating security group jg_spark2-master
Creating security group jg_spark2-slaves
Searching for existing cluster jg_spark2 in region us-east-1...
Spark AMI: ami-5bb18832
Launching instances...
Launched 1 slave in us-east-1a, regid = r-e7d97944
Launched master in us-east-1a, regid = r-d3d87870
Waiting for AWS to propagate instance metadata...
Waiting for cluster to enter 'ssh-ready' state

Warning: SSH connection error. (This could be temporary.)
Host: ec2-54-88-249-255.compute-1.amazonaws.com
SSH return code: 255
SSH output: b'ssh: connect to host ec2-54-88-249-255.compute-1.amazonaws.com
port 22: Connection refused'

./Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/connection.py:190:
ResourceWarning: unclosed 
  self.queue.pop(0)


Warning: SSH connection error. (This could be temporary.)
Host: ec2-54-88-249-255.compute-1.amazonaws.com
SSH return code: 255
SSH output: b'ssh: connect to host ec2-54-88-249-255.compute-1.amazonaws.com
port 22: Connection refused'

./Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/connection.py:190:
ResourceWarning: unclosed 
  self.queue.pop(0)


Warning: SSH connection error. (This could be temporary.)
Host: ec2-54-88-249-255.compute-1.amazonaws.com
SSH return code: 255
SSH output: b'ssh: connect to host ec2-54-88-249-255.compute-1.amazonaws.com
port 22: Connection refused'

./Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/connection.py:190:
ResourceWarning: unclosed 
  self.queue.pop(0)


Warning: SSH connection error. (This could be temporary.)
Host: ec2-54-88-249-255.compute-1.amazonaws.com
SSH return code: 255
SSH output: b'ssh: connect to host ec2-54-88-249-255.compute-1.amazonaws.com
port 22: Connection refused'

./Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/connection.py:190:
ResourceWarning: unclosed 
  self.queue.pop(0)

Cluster is now in 'ssh-ready' state. Waited 612 seconds.
Generating cluster's SSH key on master...
Warning: Permanently added
'ec2-54-88-249-255.compute-1.amazonaws.com,54.88.249.255' (ECDSA) to the
list of known hosts.
Connection to ec2-54-88-249-255.compute-1.amazonaws.com closed.
Warning: Permanently added
'ec2-54-88-249-255.compute-1.amazonaws.com,54.88.249.255' (ECDSA) to the
list of known hosts.
Transferring cluster's SSH key to slaves...
ec2-54-209-124-74.compute-1.amazonaws.com
Warning: Permanently added
'ec2-54-209-124-74.compute-1.amazonaws.com,54.209.124.74' (ECDSA) to the
list of known hosts.
Cloning spark-ec2 scripts from
https://github.com/amplab/spark-ec2/tree/branch-1.5 on master...
Warning: Permanently added
'ec2-54-88-249-255.compute-1.amazonaws.com,54.88.249.255' (ECDSA) to the
list of known hosts.
Cloning into 'spark-ec2'...
remote: Counting objects: 2072, done.
remote: Total 2072 (delta 0), reused 0 (delta 0), pack-reused 2072
Receiving objects: 100% (2072/2072), 355.67 KiB | 0 bytes/s, done.
Resolving deltas: 100% (792/792), done.
Checking connectivity... done.
Connection to ec2-54-88-249-255.compute-1.amazonaws.com closed.
Deploying files to master...
Warning: Permanently added
'ec2-54-88-249-255.compute-1.amazonaws.com,54.88.249.255' (ECDSA) to the
list of known hosts.
building file list ... done
root/spark-ec2/ec2-variables.sh

sent 1658 bytes  received 42 bytes  1133.33 bytes/sec
total size is 1517  speedup is 0.89
Running setup on master...
Warning: Permanently added
'ec2-54-88-249-255.compute-1.amazonaws.com,54.88.249.255' (ECDSA) to the
list of known hosts.
Connection to ec2-54-88-249-255.compute-1.amazonaws.com closed.
Warning: Permanently added
'ec2-54-88-249-255.compute-1.amazonaws.com,54.88.249.255' (ECDSA) to the
list of known hosts.
Setting up Spark on ip-172-31-55-237.ec2.internal...
Setting executable permissions on scripts...
RSYNC'ing /root/spark-ec2 to other cluster nodes...
ec2-54-209-124-74.compute-1.amazonaws.com
Warning: Permanently added

Why do I need to handle dependencies on EMR but not on-prem Hadoop?

2016-04-08 Thread YaoPau
On-prem I'm running PySpark on Cloudera's distribution, and I've never had to
worry about dependency issues.  I import my libraries on my driver node only
using pip or conda, run my jobs in yarn-client mode, and everything works (I
just assumed the relevant libraries are copied temporarily to each executor
node during execution).

But on EMR, I installed a library called fuzzywuzzy on the driver using pip,
then tried running this basic script in "pyspark --master yarn-client" mode:

>>>
mydata = sc.textFile("s3n://my_bucket/rum_20160331/*")
sample = mydata.take(3)
new_rdd = sc.parallelize(sample)
import random
import fuzzywuzzy

choices = ['hello', 'xylophone', 'zebra']
mapped_rdd = new_rdd.map(lambda row: str(fuzzywuzzy.process.extract(row,
choices, limit=2)))
mapped_rdd.collect()
>>>

and I'm getting the error:

ImportError: ('No module named fuzzywuzzy', , ('fuzzywuzzy',)) 

which makes me think I have to use py-files for the first time ever, and
resolve dependencies manually.

Why does this happen?  How is it that, on the on-prem Cloudera version,
Spark executor nodes are able to access all the libraries I've only
installed on my driver, but on EMR they can't?
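
For what it's worth, a sketch of shipping a pure-Python package to the
executors explicitly (the site-packages path is hypothetical, and this
assumes the package has no compiled extensions); --py-files on spark-submit
achieves the same thing:

import shutil

# Zip the installed package directory on the driver...
shutil.make_archive("/tmp/fuzzywuzzy", "zip",
                    root_dir="/usr/lib/python2.7/site-packages",
                    base_dir="fuzzywuzzy")

# ...and add it to every executor's PYTHONPATH for this SparkContext.
sc.addPyFile("/tmp/fuzzywuzzy.zip")

from fuzzywuzzy import process  # now importable inside map functions too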






Spark Streaming, PySpark 1.3, randomly losing connection

2015-12-18 Thread YaoPau
Hi - I'm running Spark Streaming using PySpark 1.3 in yarn-client mode on CDH
5.4.4.  The job sometimes runs a full 24hrs, but more often it fails
sometime during the day.

I'm getting several vague errors that I don't see much about when searching
online:

- py4j.Py4JException: Error while obtaining a new communication channel
- java.net.ConnectException: Connection timed out
- py4j.Py4JException: An exception was raised by the Python Proxy. Return
Message: null

What could be happening here, and is there a workaround for keeping this
process up a full 24hrs at a time?




15/12/16 23:17:53 INFO TaskSetManager: Finished task 0.0 in stage 948.0 (TID
986) in 79670 ms on phd40010043.xyz.com (1/1)
15/12/16 23:17:53 INFO YarnScheduler: Removed TaskSet 948.0, whose tasks
have all completed, from pool 
15/12/16 23:17:53 INFO DAGScheduler: Stage 948 (repartition at
DataFrame.scala:835) finished in 79.673 s
15/12/16 23:17:53 INFO DAGScheduler: looking for newly runnable stages
15/12/16 23:17:53 INFO DAGScheduler: running: Set()
15/12/16 23:17:53 INFO DAGScheduler: waiting: Set(Stage 949)
15/12/16 23:17:53 INFO DAGScheduler: failed: Set()
15/12/16 23:17:53 INFO DAGScheduler: Missing parents for Stage 949: List()
15/12/16 23:17:53 INFO DAGScheduler: Submitting Stage 949
(MapPartitionsRDD[3944] at repartition at DataFrame.scala:835), which is now
runnable
15/12/16 23:17:53 INFO MemoryStore: ensureFreeSpace(77816) called with
curMem=51550693, maxMem=2778778828
15/12/16 23:17:53 INFO MemoryStore: Block broadcast_1156 stored as values in
memory (estimated size 76.0 KB, free 2.5 GB)
15/12/16 23:17:53 INFO MemoryStore: ensureFreeSpace(27738) called with
curMem=51628509, maxMem=2778778828
15/12/16 23:17:53 INFO MemoryStore: Block broadcast_1156_piece0 stored as
bytes in memory (estimated size 27.1 KB, free 2.5 GB)
15/12/16 23:17:53 INFO BlockManagerInfo: Added broadcast_1156_piece0 in
memory on phe40010004.xyz.com:36705 (size: 27.1 KB, free: 2.6 GB)
15/12/16 23:17:53 INFO BlockManagerMaster: Updated info of block
broadcast_1156_piece0
15/12/16 23:17:53 INFO SparkContext: Created broadcast 1156 from broadcast
at DAGScheduler.scala:839
15/12/16 23:17:53 INFO DAGScheduler: Submitting 1 missing tasks from Stage
949 (MapPartitionsRDD[3944] at repartition at DataFrame.scala:835)
15/12/16 23:17:53 INFO YarnScheduler: Adding task set 949.0 with 1 tasks
15/12/16 23:17:53 INFO TaskSetManager: Starting task 0.0 in stage 949.0 (TID
987, phd40010020.xyz.com, PROCESS_LOCAL, 1345 bytes)
15/12/16 23:17:53 INFO BlockManagerInfo: Added broadcast_1156_piece0 in
memory on phd40010020.xyz.com:40560 (size: 27.1 KB, free: 2.6 GB)
15/12/16 23:17:53 INFO MapOutputTrackerMasterActor: Asked to send map output
locations for shuffle 314 to sparkexecu...@phd40010020.xyz.com:17913
15/12/16 23:17:53 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 314 is 154 bytes
15/12/16 23:17:56 INFO TaskSetManager: Finished task 0.0 in stage 949.0 (TID
987) in 2824 ms on phd40010020.xyz.com (1/1)
15/12/16 23:17:56 INFO DAGScheduler: Stage 949 (runJob at
newParquet.scala:646) finished in 2.827 s
15/12/16 23:17:56 INFO YarnScheduler: Removed TaskSet 949.0, whose tasks
have all completed, from pool 
15/12/16 23:17:56 INFO DAGScheduler: Job 636 finished: runJob at
newParquet.scala:646, took 82.548844 s
15/12/16 23:17:56 INFO ParquetFileReader: Initiating action with
parallelism: 5
15/12/16 23:17:56 INFO SparkContext: Starting job: collect at
SparkPlan.scala:83
15/12/16 23:17:56 INFO DAGScheduler: Registering RDD 3948 (mapPartitions at
Exchange.scala:100)
15/12/16 23:17:56 INFO DAGScheduler: Got job 637 (collect at
SparkPlan.scala:83) with 1 output partitions (allowLocal=false)
15/12/16 23:17:56 INFO DAGScheduler: Final stage: Stage 951(collect at
SparkPlan.scala:83)
15/12/16 23:17:56 INFO DAGScheduler: Parents of final stage: List(Stage 950)
15/12/16 23:17:56 INFO DAGScheduler: Missing parents: List(Stage 950)
15/12/16 23:17:56 INFO DAGScheduler: Submitting Stage 950
(MapPartitionsRDD[3948] at mapPartitions at Exchange.scala:100), which has
no missing parents
15/12/16 23:17:56 INFO MemoryStore: ensureFreeSpace(56016) called with
curMem=51656247, maxMem=2778778828
15/12/16 23:17:56 INFO MemoryStore: Block broadcast_1157 stored as values in
memory (estimated size 54.7 KB, free 2.5 GB)
15/12/16 23:17:56 INFO MemoryStore: ensureFreeSpace(35457) called with
curMem=51712263, maxMem=2778778828
15/12/16 23:17:56 INFO MemoryStore: Block broadcast_1157_piece0 stored as
bytes in memory (estimated size 34.6 KB, free 2.5 GB)
15/12/16 23:17:56 INFO BlockManagerInfo: Added broadcast_1157_piece0 in
memory on phe40010004.xyz.com:36705 (size: 34.6 KB, free: 2.6 GB)
15/12/16 23:17:56 INFO BlockManagerMaster: Updated info of block
broadcast_1157_piece0
15/12/16 23:17:56 INFO SparkContext: Created broadcast 1157 from broadcast
at DAGScheduler.scala:839
15/12/16 23:17:56 INFO DAGScheduler: Submitting 1 missing tasks from Stage
950 (MapPartitionsRDD[3948] at mapPartitions at 

Python 3.x support

2015-12-17 Thread YaoPau
I found the JIRA for Python 3 support, but it looks like support for 3.4 was
still unresolved.

Which Python 3 versions are supported by Spark 1.5?






sortByKey not spilling to disk? (PySpark 1.3)

2015-12-09 Thread YaoPau
I'm running sortByKey on a dataset that's nearly the amount of memory I've
provided to executors (I'd like to keep the amount of used memory low so
other jobs can run), and I'm getting the vague "filesystem closed" error. 
When I re-run with more memory it runs fine.

By default shouldn't sortByKey be spilling to disk?  I'm fine with that,
this is a scheduled job where runtime isn't a big issue, and preserving
memory for other jobs is more important.  What can I do to ensure that
sortByKey spills to disk and doesn't result in that error?
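
For context, a sketch of the shuffle settings that appear to govern spilling
in this version (configuration names assumed from the 1.3-era docs; spilling
should already be on by default):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("nightly-sort")
        # Allow shuffle data to spill to disk (the assumed default)...
        .set("spark.shuffle.spill", "true")
        # ...and cap the fraction of executor memory used for shuffle
        # buffers, trading speed for a smaller memory footprint.
        .set("spark.shuffle.memoryFraction", "0.2"))

sc = SparkContext(conf=conf)
pairs = sc.textFile("input").map(lambda line: (line, 1))
result = pairs.sortByKey()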






Spark SQL 1.3 not finding attribute in DF

2015-12-06 Thread YaoPau
When I run df.printSchema() I get:

root
 |-- durable_key: string (nullable = true)
 |-- code: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- county: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- market: string (nullable = true)
 |-- tim_zone_desc: string (nullable = true)

A series-like statement by itself seems to work:

>>> df.state_code == "NY"
Column<(state_code = NY)>

But all of these return the error below:

>>> df.where(df.state_code == "NY")
>>> df[df.state_code == "NY"]
>>> df.filter(df['state_code'] == "NY")

I'm guessing this is a bug.  Is there a workaround in 1.3?


Py4JJavaError Traceback (most recent call last)
 in ()
> 1 df.filter(df['state_code'] == "NY")

/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/sql/dataframe.py in
filter(self, condition)
627 jdf = self._jdf.filter(condition)
628 elif isinstance(condition, Column):
--> 629 jdf = self._jdf.filter(condition._jc)
630 else:
631 raise TypeError("condition should be string or Column")

/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539 
540 for temp_arg in temp_args:

/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o375.filter.
: org.apache.spark.sql.AnalysisException: resolved attributes state_code
missing from
latitude,country_code,tim_zone_desc,longitude,dma_durable_key,submarket,dma_code,dma_desc,county,city,zip_code,state_code;
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3.apply(CheckAnalysis.scala:93)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3.apply(CheckAnalysis.scala:43)
at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:88)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.apply(CheckAnalysis.scala:43)
at
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1069)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:133)
at
org.apache.spark.sql.DataFrame.logicalPlanToDataFrame(DataFrame.scala:157)
at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:508)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)









Re: Spark SQL 1.3 not finding attribute in DF

2015-12-06 Thread YaoPau
If anyone runs into the same issue, I found a workaround:

>>> df.where('state_code = "NY"')

works for me.

>>> df.where(df.state_code == "NY").collect()

fails with the error from the first post.









Re: sparkavro for PySpark 1.3

2015-12-05 Thread YaoPau
Here's what I'm currently trying:

--
I'm including --packages com.databricks:spark-avro_2.10:1.0.0 in my pyspark
call.  This seems to work:

Ivy Default Cache set to: /home/jrgregg/.ivy2/cache
The jars for the packages stored in: /home/jrgregg/.ivy2/jars
:: loading settings :: url =
jar:file:/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-avro_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found com.databricks#spark-avro_2.10;1.0.0 in central
found org.apache.avro#avro;1.7.6 in central
found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in central
found com.thoughtworks.paranamer#paranamer;2.3 in central
found org.xerial.snappy#snappy-java;1.0.5 in central
found org.apache.commons#commons-compress;1.4.1 in central
found org.tukaani#xz;1.0 in central
found org.slf4j#slf4j-api;1.6.4 in central
:: resolution report :: resolve 629ms :: artifacts dl 22ms
:: modules in use:
com.databricks#spark-avro_2.10;1.0.0 from central in [default]
com.thoughtworks.paranamer#paranamer;2.3 from central in [default]
org.apache.avro#avro;1.7.6 from central in [default]
org.apache.commons#commons-compress;1.4.1 from central in [default]
org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
org.codehaus.jackson#jackson-mapper-asl;1.9.13 from central in [default]
org.slf4j#slf4j-api;1.6.4 from central in [default]
org.tukaani#xz;1.0 from central in [default]
org.xerial.snappy#snappy-java;1.0.5 from central in [default]
-
|  |modules||   artifacts   |
|   conf   | number| search|dwnlded|evicted|| number|dwnlded|
-
|  default |   9   |   0   |   0   |   0   ||   9   |   0   |
-
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 9 already retrieved (0kB/12ms)

--

Then in my code I have:

df.repartition(partitions).save(save_avro, "com.databricks.spark.avro")

This results in:

---
Py4JJavaError Traceback (most recent call last)
 in ()
> 1 schemaBirf.repartition(partitions).save(save_avro,
"com.databricks.spark.avro")

/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/sql/dataframe.py in
save(self, path, source, mode, **options)
215 joptions = MapConverter().convert(options,
216  
self._sc._gateway._gateway_client)
--> 217 self._jdf.save(source, jmode, joptions)
218 
219 @property

/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539 
540 for temp_arg in temp_args:

/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o127.save.
: java.lang.NoClassDefFoundError:
org/apache/spark/sql/sources/HadoopFsRelationProvider
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at

sparkavro for PySpark 1.3

2015-12-03 Thread YaoPau
How can I read from and write to Avro using PySpark in 1.3?  

I can only find the 1.4 documentation, which uses a sqlContext.read method
that isn't available to me in 1.3.
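
For the record, the 1.3-era equivalent appears to be the generic load/save
API rather than sqlContext.read. A sketch, assuming the spark-avro package is
on the classpath (e.g. via --packages) and sqlContext is the shell's context;
the paths are hypothetical:

# Read Avro files into a DataFrame.
df = sqlContext.load("/data/input_avro", source="com.databricks.spark.avro")

# Write a DataFrame back out as Avro.
df.save("/data/output_avro", source="com.databricks.spark.avro")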








Getting different DESCRIBE results between SparkSQL and Hive

2015-11-23 Thread YaoPau
Example below.  The partition columns show up as regular columns.

I'll note that SHOW PARTITIONS works correctly in Spark SQL, so it's aware
of the partitions but it does not show them in DESCRIBE.



In Hive: "DESCRIBE pub.inventory_daily"

[['effective_date', 'string', ''],
 ['listing_skey', 'int', ''],
 ['car_durable_key', 'int', ''],
 ['car_id', 'int', ''],
 ['# Partition Information', 'NULL', 'NULL'],
 ['# col_name', 'data_type   ', 'comment '],
 ['', 'NULL', 'NULL'],
 ['year', 'smallint', ''],
 ['month', 'smallint', ''],
 ['day', 'smallint', '']]

In SparkSQL: hc.sql("DESCRIBE pub.inventory_daily").collect()

[Row(col_name=u'effective_date', data_type=u'string', comment=u''),
 Row(col_name=u'listing_skey', data_type=u'int', comment=u''),
 Row(col_name=u'car_durable_key', data_type=u'int', comment=u''),
 Row(col_name=u'car_id', data_type=u'int', comment=u''),
 Row(col_name=u'year', data_type=u'smallint', comment=u''),
 Row(col_name=u'month', data_type=u'smallint', comment=u''),
 Row(col_name=u'day', data_type=u'smallint', comment=u'')]








Spark SQL: filter if column substring does not contain a string

2015-11-14 Thread YaoPau
I'm using pyspark 1.3.0, and struggling with what should be simple. 
Basically, I'd like to run this:

site_logs.filter(lambda r: 'page_row' in r.request[:20])

meaning that I want to keep rows that have 'page_row' in the first 20
characters of the request column.  The following is the closest I've come up
with:

pages = site_logs.filter("request like '%page_row%'")

but that's missing the [:20] part.  If I instead try the .like function from
the Column API:

birf.filter(birf.request.like('bi_page')).take(5)

I get... Py4JJavaError: An error occurred while calling o71.filter.
: org.apache.spark.sql.AnalysisException: resolved attributes request
missing from
user_agent,status_code,log_year,bytes,log_month,request,referrer


What is the code to run this filter, and what are some recommended ways to
learn the Spark SQL syntax?
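
For completeness, a sketch of expressing the [:20] part with the 1.3 Column
API (DataFrame and column names taken from the question; whether it trips the
same attribute-resolution error as .like above is a separate issue):

# Keep rows whose first 20 characters of `request` contain 'page_row'.
pages = site_logs.filter(site_logs.request.substr(1, 20).like('%page_row%'))
pages.take(5)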






sqlCtx.sql('some_hive_table') works in pyspark but not spark-submit

2015-11-07 Thread YaoPau
Within a pyspark shell, both of these work for me:

print hc.sql("SELECT * from raw.location_tbl LIMIT 10").collect()
print sqlCtx.sql("SELECT * from raw.location_tbl LIMIT 10").collect()

But when I submit both of those in batch mode (hc and sqlCtx both exist), I
get the following error.  Why is this happening?  I'll note that I'm running
on YARN (CDH) and connecting to the Hive Metastore by setting an environment
variable with export HADOOP_CONF_DIR=/etc/hive/conf/

An error occurred while calling o39.sql.
: java.lang.RuntimeException: Table Not Found: raw.location_tbl
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.catalyst.analysis.SimpleCatalog$$anonfun$1.apply(Catalog.scala:111)
at
org.apache.spark.sql.catalyst.analysis.SimpleCatalog$$anonfun$1.apply(Catalog.scala:111)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:58)
at
org.apache.spark.sql.catalyst.analysis.SimpleCatalog.lookupRelation(Catalog.scala:111)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:175)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:187)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:182)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:187)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:187)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:50)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:186)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:207)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:236)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:192)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:207)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:236)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:192)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:177)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:182)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:172)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at
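
The SimpleCatalog frames in this trace are what a plain SQLContext uses,
which suggests the batch job isn't going through a HiveContext at all. A
sketch of what the batch script could construct explicitly, assuming the goal
is simply to reach the Hive metastore:

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="hive-batch-query")
hc = HiveContext(sc)  # metastore-aware, unlike a plain SQLContext

print hc.sql("SELECT * FROM raw.location_tbl LIMIT 10").collect()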

Vague Spark SQL error message with saveAsParquetFile

2015-11-03 Thread YaoPau
I'm using Spark SQL to query one partition at a time of Hive external table
that sits atop .gzip data, and then I'm saving that partition to a new HDFS
location as a set of parquet snappy files using .saveAsParquetFile()

The query completes successfully, but then I get a vague error message that I
think is saying one of the temporary files is missing.  Any idea what the
issue is here?

--

---
Py4JJavaError Traceback (most recent call last)
 in ()
  8 status, output = commands.getstatusoutput("hdfs dfs -rmr " +
save_path)
  9 partitions = int((df.count() // 75)+1)
---> 10 df.repartition(partitions).saveAsParquetFile(save_path + '/' +
str(dt1)[:10])
 11 
 12 dt1 += timedelta(days=1)

/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/sql/dataframe.py in
saveAsParquetFile(self, path)
119 True
120 """
--> 121 self._jdf.saveAsParquetFile(path)
122 
123 def registerTempTable(self, name):

/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539 
540 for temp_arg in temp_args:

/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o206.saveAsParquetFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 72
in stage 28.1 failed 4 times, most recent failure: Lost task 72.3 in stage
28.1 (TID 603, phd4.xxx.com): java.io.FileNotFoundException:
/hdata/1/yarn/nm/usercache/me/appcache/application_1446009923448_20507/blockmgr-20140dd7-7f36-4971-afe7-0278241f4439/22/temp_shuffle_c274830e-f626-4ae6-8923-ad72c561a84e
(No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.(FileOutputStream.java:221)
at
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:130)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:360)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$spillToPartitionFiles$1.apply(ExternalSorter.scala:355)
at scala.Array$.fill(Array.scala:267)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:355)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)




Spark SQL Exception: Conf non-local session path expected to be non-null

2015-10-19 Thread YaoPau
I've connected Spark SQL to the Hive Metastore and currently I'm running SQL
code via pyspark.  Typically everything works fine, but sometimes after a
long-running Spark SQL job I get the error below, and from then on I can no
longer run Spark SQL commands.  I still do have both my sc and my sqlCtx.

Any idea what this could mean?

An error occurred while calling o36.sql.
: org.apache.spark.sql.AnalysisException: Conf non-local session path
expected to be non-null;
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:260)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:235)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:92)
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 

Any plans to support Spark Streaming within an interactive shell?

2015-10-13 Thread YaoPau
I'm seeing products that allow you to interact with a stream in realtime
(write code, and see the streaming output automatically change), which I
think makes it easier to test streaming code, although running it on batch
then turning streaming on certainly is a good way as well.

I played around with trying to get Spark Streaming working within the spark
shell, but it looks like as soon as I stop my ssc I can't restart it again. 
Are there any open tickets to add this functionality?  Just looking to see
if it's on the roadmap at all.
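
For reference, one commonly suggested pattern inside a shell is to stop only
the streaming context and then build a fresh one, since a stopped
StreamingContext cannot be started again. A sketch (parameter names assumed
from the 1.x Python API; sc and ssc are the shell's existing contexts):

from pyspark.streaming import StreamingContext

# Stop streaming but keep the underlying SparkContext alive...
ssc.stop(stopSparkContext=False, stopGraceFully=True)

# ...then define a brand-new StreamingContext and DStream graph to "restart".
ssc = StreamingContext(sc, 1)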






Does Spark use more memory than MapReduce?

2015-10-12 Thread YaoPau
I had this question come up and I'm not sure how to answer it.  A user said
that, for a big job, he thought it would be better to use MapReduce since it
writes to disk between iterations instead of keeping the data in memory the
entire time like Spark generally does.

I mentioned that Spark can cache to disk as well, but I'm not sure about the
overarching question (which I realize is vague): for a typical job, would
Spark use more memory than a MapReduce job?  Are there any memory usage
inefficiencies from either?
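
On the "Spark can cache to disk as well" point, a minimal sketch of what that
looks like, for comparison with MapReduce's write-between-stages model:

from pyspark import SparkContext, StorageLevel

sc = SparkContext(appName="disk-cache-example")

# Keep the intermediate result on local disk instead of executor memory.
squares = sc.parallelize(range(1000000)).map(lambda x: x * x)
squares.persist(StorageLevel.DISK_ONLY)

print squares.count()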






Spark SQL "SELECT ... LIMIT" scans the entire Hive table?

2015-10-05 Thread YaoPau
I'm using SqlCtx connected to Hive in CDH 5.4.4.  When I run "SELECT * FROM
my_db.my_tbl LIMIT 5", it scans the entire table like Hive would instead of
doing a .take(5) on it and returning results immediately.

Is there a way to get Spark SQL to use .take(5) instead of the Hive logic of
scanning the full table when running a SELECT?
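
For reference, a sketch of the same query through the DataFrame API (table
name from the question; whether this actually short-circuits the scan depends
on how the LIMIT is planned in this version):

# Equivalent of SELECT * ... LIMIT 5 via the DataFrame API.
rows = sqlCtx.table("my_db.my_tbl").limit(5).collect()

# Or ask for the first five rows directly.
rows = sqlCtx.table("my_db.my_tbl").take(5)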






Spark SQL with Hive error: "Conf non-local session path expected to be non-null;"

2015-10-04 Thread YaoPau
I've been experimenting with using PySpark SQL to query Hive tables for the
last week and all has been smooth, but on a command I've run hundreds of
times successfully (a basic SELECT * ...), suddenly this error started
popping up every time I ran a sqlCtx command until I restarted my session. 
It's now working again after the restart.

Any idea what this could mean?  I'd like to start rolling this out to other
users and would like to be able to help if they run into the same.

An error occurred while calling o33.sql.
: org.apache.spark.sql.AnalysisException: Conf non-local session path
expected to be non-null;
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:260)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:235)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:92)
at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
at

Pyspark: "Error: No main class set in JAR; please specify one with --class"

2015-10-01 Thread YaoPau
I'm trying to add multiple SerDe jars to my pyspark session.

I got the first one working by changing my PYSPARK_SUBMIT_ARGS to:

"--master yarn-client --executor-cores 5 --num-executors 5 --driver-memory
3g --executor-memory 5g --jars /home/me/jars/csv-serde-1.1.2.jar"

But when I tried to add a second, using: 

"--master yarn-client --executor-cores 5 --num-executors 5 --driver-memory
3g --executor-memory 5g --jars /home/me/jars/csv-serde-1.1.2.jar,
/home/me/jars/json-serde-1.3-jar-with-dependencies.jar"

I got the error "Error: No main class set in JAR; please specify one with
--class".

How do I specify the class for just the second JAR?
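
One thing worth noting: spark-submit splits its arguments on whitespace, so
the space after the comma likely makes the second path look like an
application JAR (hence the request for --class). A sketch of the same value
with the jar list joined by a bare comma (set from Python here only for
illustration; an export in the shell works the same way):

import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--master yarn-client --executor-cores 5 --num-executors 5 "
    "--driver-memory 3g --executor-memory 5g "
    "--jars /home/me/jars/csv-serde-1.1.2.jar,"
    "/home/me/jars/json-serde-1.3-jar-with-dependencies.jar"
)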






Spark SQL deprecating Hive? How will I access Hive metadata in the future?

2015-09-29 Thread YaoPau
I've heard that Spark SQL will be or has already started deprecating HQL.  We
have Spark SQL + Python jobs that currently read from the Hive metastore to
get things like table location and partition values.  

Will we have to re-code these functions in future releases of Spark (maybe
by connecting to Hive directly), or will fetching Hive metastore data be
supported in future releases via regular SQL?

Jon






Run Spark job from within iPython+Spark?

2015-08-24 Thread YaoPau
I set up iPython Notebook to work with the pyspark shell, and now I'd like to
use %run to basically 'spark-submit' another Python Spark file, and leave
the objects accessible within the Notebook.

I tried this, but got a ValueError: Cannot run multiple SparkContexts at
once error.  I then tried taking out the 'sc = SparkContext()' line from
the .py file, but then it couldn't access sc.

How can I %run another Python Spark file within iPython Notebook?
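
A sketch of one way this could be structured, assuming the file is run with
%run -i so it shares the notebook's namespace: guard the SparkContext
creation so the same script works both under spark-submit and inside the
notebook (the job body below is hypothetical).

from pyspark import SparkContext

try:
    sc  # reuse the notebook's SparkContext if one already exists
except NameError:
    sc = SparkContext(appName="my-batch-job")

line_count = sc.textFile("input.txt").count()
print line_count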






Re: collect() works, take() returns ImportError: No module named iter

2015-08-13 Thread YaoPau
In case anyone runs into this issue in the future, we got it working: the
following variable must be set on the edge node:

export
PYSPARK_PYTHON=/your/path/to/whatever/python/you/want/to/run/bin/python

I didn't realize that variable gets passed to every worker node.  All I saw
when searching for this issue was documentation for an older version of
Spark which mentions using SPARK_YARN_USER_ENV to set PYSPARK_PYTHON within
spark-env.sh, which didn't work for us on Spark 1.3.






Spark 1.3 + Parquet: Skipping data using statistics

2015-08-12 Thread YaoPau
I've seen this feature referenced in a couple of places: first in this forum
post (https://forums.databricks.com/questions/951/why-should-i-use-parquet.html)
and in this talk by Michael Armbrust
(https://www.youtube.com/watch?v=6axUqHCu__Y) during the 42nd minute.

As I understand it, if you create a Parquet file using Spark, Spark will
then have access to min/max vals for each column.  If a query asks for a
value outside that range (like a timestamp), Spark will know to skip that
file entirely.

Michael says this feature is turned off by default in 1.3.  How can I turn
this on?

I don't see much about this feature online.  A couple other questions:

- Does this only work for Parquet files that were created in Spark?  For
example, if I create the Parquet file using Hive + MapReduce, or Impala,
would Spark still have access to min/max values?

- Does this feature work at the row chunk level, or just at the file level?
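
As for turning it on: a sketch, assuming the switch in question is the
Parquet filter-pushdown flag (which appears to default to false in 1.3), run
against the shell's SQL context:

# Enable predicate pushdown into Parquet before running the query.
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

df = sqlContext.parquetFile("/path/to/parquet_dir")  # hypothetical path
df.filter(df.event_ts > "2015-08-01").count()        # hypothetical column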






collect() works, take() returns ImportError: No module named iter

2015-08-10 Thread YaoPau
I'm running Spark 1.3 on CDH 5.4.4, and trying to set up Spark to run via
iPython Notebook.  I'm getting collect() to work just fine, but take()
errors.  (I'm having issues with collect() on other datasets ... but take()
seems to break every time I run it.)

My code is below.  Any thoughts?

>>> sc
<pyspark.context.SparkContext at 0x7ffbfa310f10>
>>> sys.version
'2.7.10 |Anaconda 2.3.0 (64-bit)| (default, May 28 2015, 17:02:03) \n[GCC
4.4.7 20120313 (Red Hat 4.4.7-1)]'
>>> hourly = sc.textFile('tester')
>>> hourly.collect()
[u'a man',
 u'a plan',
 u'a canal',
 u'panama']
>>> hourly = sc.textFile('tester')
>>> hourly.take(2)
---
Py4JJavaError Traceback (most recent call last)
<ipython-input-15-1feecba5868b> in <module>()
      1 hourly = sc.textFile('tester')
----> 2 hourly.take(2)

/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py in take(self, num)
   1223 
   1224 p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
-> 1225 res = self.context.runJob(self, takeUpToNumLeft, p,
True)
   1226 
   1227 items += res

/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py in
runJob(self, rdd, partitionFunc, partitions, allowLocal)
841 # SparkContext#runJob.
842 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 843 it = self._jvm.PythonRDD.runJob(self._jsc.sc(),
mappedRDD._jrdd, javaPartitions, allowLocal)
844 return list(mappedRDD._collect_iterator_through_file(it))
845 

/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539 
540 for temp_arg in temp_args:

/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage
10.0 (TID 47, dhd490101.autotrader.com):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File
/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py,
line 101, in main
process()
  File
/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py,
line 96, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File
/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/serializers.py,
line 236, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py", line
1220, in takeUpToNumLeft
    while taken < left:
ImportError: No module named iter

at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
at
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at

Error when running pyspark/shell.py to set up iPython notebook

2015-08-09 Thread YaoPau
I'm trying to set up iPython notebook on an edge node with port forwarding so
I can run pyspark off my laptop's browser.  I've mostly been following the
Cloudera guide here:
http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/

I got this working on one cluster running Spark 1.0.  But now on Spark 1.3
(with Python 2.7 and Java 7), I'm getting the error below when I run
/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/shell.py at the line: sc
= SparkContext(appName="PySparkShell", pyFiles=add_files)

Before showing the error, I'll note that running pyspark --master
yarn-client DOES work, so I can run pyspark fine atop YARN, but it looks
like ipython notebook is calling Spark via a different method and producing
an error.  Any ideas?

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py", line
111, in __init__
conf, jsc, profiler_cls)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py", line
159, in _do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File "/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py", line
212, in _initialize_context
return self._jvm.JavaSparkContext(jconf)
  File
"/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 701, in __call__
  File
"/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.io.FileNotFoundException:
/user/spark/applicationHistory/application_1438611042507_0055.inprogress (No
such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at java.io.FileOutputStream.<init>(FileOutputStream.java:110)
at
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:117)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:399)
at
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-running-pyspark-shell-py-to-set-up-iPython-notebook-tp24188.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



What does Spark is not just MapReduce mean? Isn't every Spark job a form of MapReduce?

2015-06-28 Thread YaoPau
I've heard Spark is not just MapReduce mentioned during Spark talks, but it
seems like every method that Spark has is really doing something like (Map
- Reduce) or (Map - Map - Map - Reduce) etc behind the scenes, with the
performance benefit of keeping RDDs in memory between stages.

Am I wrong about that?  Is Spark doing anything more efficiently than a
series of Maps followed by a Reduce in memory?  What methods does Spark have
that can't easily be mapped (with somewhat similar efficiency) to Map and
Reduce in memory?
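
For illustration only (a sketch, not from the original thread), two things
that don't reduce to a single map-then-reduce pass: one cached dataset
feeding several jobs, and a broadcast ("map-side") join with no shuffle at
all.  The paths and field layout below are assumptions.

  val events = sc.textFile("hdfs:///logs/events").cache()   // read and deserialized once

  val countsByUser = events.map(l => (l.split("\t")(0), 1)).reduceByKey(_ + _)  // job 1
  val errorCount   = events.filter(_.contains("ERROR")).count()                 // job 2 reuses the cache

  val regions = sc.broadcast(Map("US" -> "NorthAmerica", "DE" -> "Europe"))     // small side table
  val tagged  = events.map(l => (l, regions.value.getOrElse(l.split("\t")(1), "Other")))  // join, no shuffle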



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-does-Spark-is-not-just-MapReduce-mean-Isn-t-every-Spark-job-a-form-of-MapReduce-tp23518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



What is needed to integrate Spark with Pandas and scikit-learn?

2015-06-19 Thread YaoPau
I'm running Spark on YARN and will be upgrading to 1.3 soon.

For the integration, will I need to install Pandas and scikit-learn on every
node in my cluster, or is the integration just something that takes place on
the edge node after a collect in yarn-client mode?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-needed-to-integrate-Spark-with-Pandas-and-scikit-learn-tp23410.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Are there ways to restrict what parameters users can set for a Spark job?

2015-06-12 Thread YaoPau
For example, Hive lets you set a whole bunch of parameters (# of reducers, #
of mappers, size of reducers, cache size, max memory to use for a join),
while Impala gives users a much smaller subset of parameters to work with,
which makes it nice to give to a BI team.

Is there a way to restrict which parameters a user can set for a Spark job? 
Maybe to cap the # of executors, or cap the memory for each executor, or to
enforce a default setting no matter what parameters are used.
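
Spark itself mostly offers defaults rather than hard caps.  A sketch of the
defaults-based piece, assuming a cluster-wide spark-defaults.conf on the
gateway node (users can still override these per job, so true limits are
usually enforced at the YARN queue level):

  # conf/spark-defaults.conf -- values are illustrative only
  spark.executor.memory                 4g
  spark.executor.cores                  2
  spark.dynamicAllocation.maxExecutors  20
  spark.yarn.queue                      bi_team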



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Are-there-ways-to-restrict-what-parameters-users-can-set-for-a-Spark-job-tp23301.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-05 Thread YaoPau
I have a file badFullIPs.csv of bad IP addresses used for filtering.  In
yarn-client mode, I simply read it off the edge node, transform it, and then
broadcast it:

  val badIPs = fromFile(edgeDir + "badfullIPs.csv")
  val badIPsLines = badIPs.getLines
  val badIpSet = badIPsLines.toSet
  val badIPsBC = sc.broadcast(badIpSet)
  badIPs.close

How can I accomplish this in yarn-cluster mode?
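
A sketch of one common approach, assuming the csv is shipped with the job
through spark-submit's --files flag so it lands in the driver container's
working directory wherever YARN starts it:

  // spark-submit --master yarn --deploy-mode cluster --files /edge/path/badfullIPs.csv ...
  import scala.io.Source

  val src      = Source.fromFile("badfullIPs.csv")   // bare name: YARN localized it next to the driver
  val badIpSet = src.getLines.toSet
  src.close
  val badIPsBC = sc.broadcast(badIpSet)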

Jon



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Read from file and broadcast before every Spark Streaming bucket?

2015-01-29 Thread YaoPau
I'm creating a real-time visualization of counts of ads shown on my website,
using that data pushed through by Spark Streaming.

To avoid clutter, it only looks good to show 4 or 5 lines on my
visualization at once (corresponding to 4 or 5 different ads), but there are
50+ different ads that show on my site.

What I'd like to do is quickly change which ads to pump through Spark
Streaming, without having to rebuild the .jar and push it to my edge node. 
Ideally I'd have a .csv file on my edge node with a list of 4 ad names, and
every time a StreamRDD is created it reads from that tiny file, creates a
broadcast variable, and uses that variable as a filter.  That way I could
just open up the .csv file, save it, and the stream filters correctly
automatically.

I keep getting errors when I try this.  Has anyone had success with a
broadcast variable that updates with each new streamRDD?
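
A sketch of the pattern described above; it leans on the fact that the
function given to transform runs on the driver once per batch, so the tiny
csv on the edge node can be re-read (and re-broadcast) every interval.
adStream, the csv path, and the ad-name field are assumptions.

  import scala.io.Source

  val filtered = adStream.transform { rdd =>
    // driver-side, once per batch: reload the current list of ads to show
    val wanted   = Source.fromFile("/home/me/ads_to_show.csv").getLines.map(_.trim).toSet
    val wantedBC = rdd.sparkContext.broadcast(wanted)
    rdd.filter(ad => wantedBC.value.contains(ad))
  }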



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Read-from-file-and-broadcast-before-every-Spark-Streaming-bucket-tp21433.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



reduceByKeyAndWindow, but using log timestamps instead of clock seconds

2015-01-28 Thread YaoPau
The TwitterPopularTags example works great: the Twitter firehose keeps
messages pretty well in order by timestamp, and so to get the most popular
hashtags over the last 60 seconds, reduceByKeyAndWindow works well.

My stream pulls Apache weblogs from Kafka, and so it's not as simple:
messages can pass through out-of-order, and if I take down my streaming
process and start it up again, the Kafka index stays in place and now I
might be consuming 10x of what I was consuming before in order to catch up
to the current time.  In this case, reduceByKeyAndWindow won't work.

I'd like my bucket size to be 5 seconds, and I'd like to do the same thing
TwitterPopularTags is doing, except instead of hashtags I have row types,
and instead of aggregating by 60 seconds of clock time I'd like to aggregate
over all rows of that row type with a timestamp within 60 seconds of the
current time.

My thinking is to maintain state in an RDD and update it an persist it with
each 2-second pass, but this also seems like it could get messy.  Any
thoughts or examples that might help me?
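
A sketch of the keep-state idea, assuming each record can be parsed upstream
into (rowType, eventTimeMillis) and that ssc.checkpoint(...) has been set
(updateStateByKey requires it); the state per row type is just the event
timestamps still inside the 60-second window:

  // parsed: DStream[(String, Long)] of (rowType, eventTimeMillis) -- hypothetical upstream parsing
  val windowMs = 60 * 1000L

  val recentByType = parsed.updateStateByKey[Seq[Long]] { (newTimes: Seq[Long], state: Option[Seq[Long]]) =>
    val cutoff = System.currentTimeMillis() - windowMs
    val kept   = (state.getOrElse(Seq.empty) ++ newTimes).filter(_ >= cutoff)
    if (kept.isEmpty) None else Some(kept)           // drop the key once nothing is recent
  }

  val countsByType = recentByType.mapValues(_.size)  // rows of each type seen in the last 60 seconds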



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKeyAndWindow-but-using-log-timestamps-instead-of-clock-seconds-tp21405.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Connect to Hive metastore (on YARN) from Spark Shell?

2015-01-21 Thread YaoPau
Is this possible, and if so what steps do I need to take to make this happen?  
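
A sketch of what typically makes this work, assuming a Spark build with Hive
support and that hive-site.xml (with the metastore URIs) is visible in the
conf directory of the node running the shell:

  // e.g. copy /etc/hive/conf/hive-site.xml into $SPARK_HOME/conf, then: spark-shell --master yarn-client
  val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
  hiveCtx.sql("show tables").collect().foreach(println)
  val sample  = hiveCtx.sql("select * from my_db.my_table limit 10")   // hypothetical table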



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Connect-to-Hive-metastore-on-YARN-from-Spark-Shell-tp21300.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to set UI port #?

2015-01-11 Thread YaoPau
I have multiple Spark Streaming jobs running all day, and so when I run my
hourly batch job, I always get a "java.net.BindException: Address already in
use" which starts at 4040, then goes to 4041, 4042, and 4043 before settling
at 4044.

That slows down my hourly job, and time is critical.  Is there a way I can
set it to 4044 by default, or prevent the UI from launching altogether?
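
A sketch of both options, using Spark's standard UI settings (the same keys
can also be passed as --conf on spark-submit):

  import org.apache.spark.{SparkConf, SparkContext}

  // either pin the port so it doesn't walk up from 4040...
  val conf = new SparkConf().setAppName("HourlyBatch").set("spark.ui.port", "4044")
  // ...or drop the UI for this batch job entirely:
  // conf.set("spark.ui.enabled", "false")
  val sc = new SparkContext(conf)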

Jon



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-UI-port-tp21083.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does Spark automatically run different stages concurrently when possible?

2015-01-10 Thread YaoPau
I'm looking for ways to reduce the runtime of my Spark job.  My code is a
single file of scala code and is written in this order:

(1) val lines = Import full dataset using sc.textFile
(2) val ABonly = Parse out all rows that are not of type A or B
(3) val processA = Process only the A rows from ABonly
(4) val processB = Process only the B rows from ABonly

Is Spark doing (1) then (2) then (3) then (4) ... or is it by default doing
(1) then (2) then branching to both (3) and (4) simultaneously and running
both in parallel?  If not, how can I make that happen?
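
Each action becomes its own job, and jobs submitted from a single driver
thread run one after another.  A sketch of overlapping the two branches by
firing their actions from separate threads, with ABonly cached so it isn't
recomputed (isTypeAOrB, processA, processB and the output paths are
hypothetical):

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration.Duration

  val ABonly = lines.filter(isTypeAOrB).cache()

  val jobA = Future { processA(ABonly).saveAsTextFile("out/typeA") }
  val jobB = Future { processB(ABonly).saveAsTextFile("out/typeB") }
  Await.result(Future.sequence(Seq(jobA, jobB)), Duration.Inf)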

Jon





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-automatically-run-different-stages-concurrently-when-possible-tp21075.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to convert RDD to JSON?

2014-12-08 Thread YaoPau
Pretty straightforward: Using Scala, I have an RDD that represents a table
with four columns.  What is the recommended way to convert the entire RDD to
one JSON object?
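
A minimal sketch, assuming the rows are 4-tuples of strings and that "one
JSON object" means a single JSON array assembled on the driver (only sensible
if the collected result fits in driver memory; values are not escaped here):

  // rows: RDD[(String, String, String, String)] -- hypothetical shape
  val jsonRows = rows.map { case (a, b, c, d) =>
    s"""{"col1":"$a","col2":"$b","col3":"$c","col4":"$d"}"""
  }
  val jsonArray = "[" + jsonRows.collect().mkString(",") + "]"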



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-convert-RDD-to-JSON-tp20585.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Appending with saveAsTextFile?

2014-11-29 Thread YaoPau
I am using Spark to aggregate logs that land in HDFS throughout the day.  The
job kicks off 15min after the hour and processes anything that landed the
previous hour.  

For example, the 2:15pm job will process anything that came in from
1:00pm-2:00pm.  99.9% of that data will consist of logs actually from the
1:00pm-2:00pm timespan.  But 0.1% will be data that, for one of several
reasons, trickled in from the 12:00pm hour or even earlier.

What I'd like to do is split by RDD by timestamp into several RDDs, then use
saveAsTextFile() to write each RDD to disk to its proper location.  So 99.9%
of the example data will go to /user/me/output/2014-11-29/13, while a small
portion will go to /user/me/output/2014-11-29/12, and maybe a couple rows
trickle in from the 10pm hour, and that aggregation goes to
/user/me/output/2014-11-29/10.

But when I run the job, I get an error for the trickled-in /12 and /10 data
saying those directories already exist.  Is there a way I can do something
like an "INSERT INTO" with saveAsTextFile to append?
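
saveAsTextFile refuses to write into an existing directory, so a sketch of
one workaround is to give every run its own subdirectory under the hour it
writes to (hourOf and the run-id naming below are assumptions):

  val runId  = System.currentTimeMillis()                      // unique per hourly run

  val byHour = logs.map(line => (hourOf(line), line)).cache()  // hourOf: hypothetical timestamp parser
  val hours  = byHour.keys.distinct().collect()

  hours.foreach { hour =>
    byHour.filter(_._1 == hour)
          .values
          .saveAsTextFile(s"/user/me/output/$hour/run-$runId") // e.g. .../2014-11-29/12/run-1417273200000
  }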



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Appending-with-saveAsTextFile-tp20031.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Missing parents for stage (Spark Streaming)

2014-11-21 Thread YaoPau
When I submit a Spark Streaming job, I see these INFO logs printing
frequently:

14/11/21 18:53:17 INFO DAGScheduler: waiting: Set(Stage 216)
14/11/21 18:53:17 INFO DAGScheduler: failed: Set()
14/11/21 18:53:17 INFO DAGScheduler: Missing parents for Stage 216: List()
14/11/21 18:53:17 INFO DAGScheduler: Submitting Stage 216 (MappedRDD[1733]
at map at MappedDStream.scala:35), which is now runnable

I have a feeling this means there is some error with a Map I created as a
broadcast variable, but I'm not sure.  How can I figure out what this is
referring to?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Missing-parents-for-stage-Spark-Streaming-tp19530.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Cannot access data after a join (error: value _1 is not a member of Product with Serializable)

2014-11-19 Thread YaoPau
I joined two datasets together, and my resulting logs look like this:

(975894369,((72364,20141112T170627,web,MEMPHIS,AR,US,Central),(Male,John,Smith)))
(253142991,((30058,20141112T171246,web,ATLANTA16,GA,US,Southeast),(Male,Bob,Jones)))
(295305425,((28110,20141112T170454,iph,CHARLOTTE2,NC,US,Southeast),(Female,Mary,Williams)))

When I try to access the newly-joined data with JoinedInv.map(line =>
line._2._2._1) I get the following error:

[ERROR] 
error: value _1 is not a member of Product with Serializable
[INFO]   val getOne = JoinedInv.map(line => line._2._2._1)
[INFO] ^
[ERROR] error: value foreach is not a member of Array[Nothing]
[INFO]   getOne.take(10).foreach(println)
[INFO]^

It looks like there are some rows where a JOIN did not occur (no key match
in the joined dataset), but because I can't access line._2._2._1 I don't
know of a way to check for that.  I can access line._2._2 but line._2._2
does not have the length attribute.
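
The message means the compiler only knows line._2._2 by the type "Product
with Serializable", which usually happens when one side's values were built
from branches returning differently-shaped tuples.  A sketch of the usual
fix, pinning that side to a single concrete type before the join (all names
below are hypothetical):

  case class Person(gender: String, first: String, last: String)

  // rebuild the right-hand RDD with one concrete value type instead of mixed tuple shapes
  val people  = rawPeople.map(f => (f(0), Person(f(1), f(2), f(3))))
  val joined  = visits.join(people)
  val genders = joined.map { case (_, (_, person)) => person.gender }  // compiles: the type is now Person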



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-access-data-after-a-join-error-value-1-is-not-a-member-of-Product-with-Serializable-tp19272.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Joining DStream with static file

2014-11-19 Thread YaoPau
Here is my attempt:

val sparkConf = new SparkConf().setAppName("LogCounter")
val ssc =  new StreamingContext(sparkConf, Seconds(2))

val sc = new SparkContext()
val geoData = sc.textFile("data/geoRegion.csv")
.map(_.split(','))
.map(line => (line(0), (line(1),line(2),line(3),line(4))))

val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
topicMap).map(_._2)

val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details
removed for brevity
val vdpJoinedGeo = goodIPsFltrBI.transform(rdd => rdd.join(geoData))

This is very wrong.  I have a feeling I should be broadcasting geoData
instead of reading it in with each task (it's a 100MB file), but I'm not
sure where to put the code that maps from the .csv to the final geoData rdd.

Also I'm not sure if geoData is even defined correctly (maybe it should use
ssc instead of sc?).  Please advise.
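
A sketch of the broadcast-based version hinted at above: parse the csv once
on the driver, broadcast the resulting Map, and do the lookup inside a plain
map instead of an RDD join (reusing ssc.sparkContext rather than building a
second SparkContext); this assumes goodIPsFltrBI is keyed by IP as in the
join above:

  import scala.io.Source

  val geoMap = Source.fromFile("data/geoRegion.csv").getLines
    .map(_.split(','))
    .map(f => f(0) -> (f(1), f(2), f(3), f(4)))
    .toMap
  val geoBC = ssc.sparkContext.broadcast(geoMap)   // shipped once, reused by every batch

  val vdpJoinedGeo = goodIPsFltrBI.map { case (ip, rest) =>
    (ip, (rest, geoBC.value.get(ip)))              // None when the IP has no geo entry
  }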





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-DStream-with-static-file-tp19329.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to broadcast a textFile?

2014-11-17 Thread YaoPau
I have a 1 million row file that I'd like to read from my edge node, and then
send a copy of it to each Hadoop machine's memory in order to run JOINs in
my spark streaming code.

I see examples in the docs of how to use broadcast() for a simple array,
but how about when the data is in a textFile?
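
A minimal sketch of the usual pattern, assuming the million rows fit
comfortably in driver memory: materialize the file on the driver, then
broadcast the resulting collection (the path and key layout are assumptions):

  val lookupBC = sc.broadcast(
    sc.textFile("hdfs:///user/me/lookup.txt")
      .map(_.split('\t'))
      .map(f => f(0) -> f(1))
      .collect()       // pulled to the driver; the broadcast ships it once per executor
      .toMap)

  // later, a map-side join inside the streaming code:
  val joined = events.map { case (k, v) => (k, (v, lookupBC.value.get(k))) }   // events is hypothetical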



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-textFile-tp19083.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to broadcast a textFile?

2014-11-17 Thread YaoPau
OK then I'd still need to write the code (within my spark streaming code I'm
guessing) to convert my text file into an object like a HashMap before
broadcasting.  

How can I make sure only the HashMap is being broadcast while all the
pre-processing to create the HashMap is only performed once?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-textFile-tp19083p19094.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Given multiple .filter()'s, is there a way to set the order?

2014-11-14 Thread YaoPau
I have an RDD x of millions of STRINGs, each of which I want to pass
through a set of filters.  My filtering code looks like this:

x.filter(filter#1, which will filter out 40% of data).
   filter(filter#2, which will filter out 20% of data).
   filter(filter#3, which will filter out 2% of data).
   filter(filter#4, which will filter out 1% of data)

There is no ordering requirement (filter #2 does not depend on filter #1,
etc), but the filters are drastically different in the % of rows they should
eliminate.  What I'd like is an ordering similar to a || statement, where
if it fails on filter#1 the row automatically gets filtered out before the
other three filters run.

But when I play around with the ordering of the filters, the runtime doesn't
seem to change.  Is Spark somehow intelligently guessing how effective each
filter will be and ordering it correctly regardless of how I order them?  If
not, is there a way I can set the filter order?
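
Spark does not reorder RDD filters by selectivity; they are pipelined and
applied per row in the order written, and each row already stops at the
first filter it fails, which is why shuffling them around barely moves the
runtime.  A sketch of making the order explicit with one short-circuiting
predicate (the predicate names are hypothetical):

  val kept = x.filter(row =>
    dropsFortyPercent(row) &&    // cheapest / most selective first
    dropsTwentyPercent(row) &&
    dropsTwoPercent(row) &&
    dropsOnePercent(row))        // && short-circuits, so later predicates never see rejected rows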



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Given-multiple-filter-s-is-there-a-way-to-set-the-order-tp18957.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Building a hash table from a csv file using yarn-cluster, and giving it to each executor

2014-11-13 Thread YaoPau
I built my Spark Streaming app on my local machine, and an initial step in
log processing is filtering out rows with spam IPs.  I use the following
code which works locally:

// Creates a HashSet for badIPs read in from file
val badIpSource = scala.io.Source.fromFile("wrongIPlist.csv")
val ipLines = badIpSource.getLines()


val set = new HashSet[String]()
val badIpSet = set ++ ipLines
badIpSource.close()

def isGoodIp(ip: String): Boolean = !badIpSet.contains(ip)

But when I try this using --master yarn-cluster I get Exception in thread
Thread-4 java.lang.reflect.InvocationTargetException ... Caused by:
java.io.FileNotFoundException: wrongIPlist.csv (No such file or directory). 
The file is there (I wasn't sure which directory it was accessing so it's in
both my current client directory and my HDFS home directory), so now I'm
wondering if reading a file in parallel is just not allowed in general and
that's why I'm getting the error.

I'd like each executor to have access to this HashSet (not a huge file,
about 3000 IPs) instead of having to do a more expensive JOIN.  Any
recommendations on a better way to handle this?  
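
A sketch of one way around the local-file problem, assuming the list is
first copied to HDFS (hdfs dfs -put wrongIPlist.csv /user/me/) so the driver
can read it no matter which node YARN starts it on, and is then broadcast to
every executor:

  val badIpSetBC = sc.broadcast(
    sc.textFile("hdfs:///user/me/wrongIPlist.csv")
      .map(_.trim)
      .collect()       // ~3000 entries, cheap to pull to the driver
      .toSet)

  def isGoodIp(ip: String): Boolean = !badIpSetBC.value.contains(ip)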



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Building-a-hash-table-from-a-csv-file-using-yarn-cluster-and-giving-it-to-each-executor-tp18850.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Converting Apache log string into map using delimiter

2014-11-11 Thread YaoPau
I have an RDD of logs that look like this:

/no_cache/bi_event?Log=0pg_inst=517638988975678942pg=fow_mwever=c.2.1.8site=xyz.compid=156431807121222351rid=156431666543211500srch_id=156431666581865115row=6seq=1tot=1tsp=1cmp=thmb_12co_txt_url=Viewinget=clickthmb_type=pct=uc=579855lnx=SPGOOGBRANDCAMPref_url=http%3A%2F%2Fwww.abcd.com

The pairs are separated by "&", and the keys/values of each pair are
separated by "=".   Hive has a str_to_map function
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-StringFunctions
  
that will convert this String to a map that will make the following work:

mappedString["site"] will return "xyz.com"

What's the most efficient way to do this in Scala + Spark?
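
A sketch of a str_to_map equivalent over an RDD of raw lines (called logs
here), assuming the pair delimiter is "&" (it did not survive the mail
archive, but the row is a query string) and the key/value delimiter is "=":

  val mapped = logs.map { line =>
    line.split("&")
        .map(_.split("=", 2))                     // limit 2 keeps '=' inside values intact
        .collect { case Array(k, v) => k -> v }   // skip fragments without an '='
        .toMap
  }
  // mapped.first()("site")  // "xyz.com" for the example row above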



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Converting-Apache-log-string-into-map-using-delimiter-tp18641.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Converting Apache log string into map using delimiter

2014-11-11 Thread YaoPau
OK I got it working with:

z.map(row => (row.map(element => element.split("=")(0)) zip row.map(element
=> element.split("=")(1))).toMap)

But I'm guessing there is a more efficient way than to create two separate
lists and then zip them together and then convert the result into a map.
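
A sketch of a single-pass version, assuming (as above) each row is already a
collection of key=value strings and every element contains an "=":

  val mapped = z.map(row =>
    row.map { kv =>
      val Array(k, v) = kv.split("=", 2)   // one split per element instead of two passes plus a zip
      k -> v
    }.toMap)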



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Converting-Apache-log-string-into-map-using-delimiter-tp18641p18643.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkPi endlessly in yarnAppState: ACCEPTED

2014-11-07 Thread YaoPau
I'm using Cloudera 5.1.3, and I'm repeatedly getting the following output
after submitting the SparkPi example in yarn cluster mode
(http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/cdh_ig_running_spark_apps.html)
using:

spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster
--master yarn
$SPARK_HOME/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.3.jar 10

Output (repeated):

14/11/07 19:33:05 INFO Client: Application report from ASM: 
 application identifier: application_1415303569855_1100
 appId: 1100
 clientToAMToken: null
 appDiagnostics: 
 appMasterHost: N/A
 appQueue: root.yp
 appMasterRpcPort: -1
 appStartTime: 1415406486231
 yarnAppState: ACCEPTED
 distributedFinalState: UNDEFINED

I'll note that spark-submit is working correctly when running with --master
local on the edge node.

Any ideas how to solve this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkPi-endlessly-in-yarnAppState-ACCEPTED-tp18391.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org