Re: how to set up pyspark eclipse, pyDev, virtualenv? syntaxError: yield from walk(

2018-04-05 Thread Andy Davidson
Hi Hyukjin

Thanks for the links.

At this point I sort of got my eclipse, pyDev, spark, unitTests working. In
my unit test I can run from the cmd line or from with in eclipse a simple
unit test. The test creates a data frame from a text file and calls
df.show()

The last challenge is that it appears pyspark.sql.functions defines some
functions at run time. Examples are lit() and col(). The causes problem with
my IDE

https://issues.apache.org/jira/browse/SPARK-23878?page=com.atlassian.jira.pl
ugin.system.issuetabpanels%3Acomment-tabpanel=16427812#comm
ent-16427812

Andy

P.s. I original started my project using jupyter notebooks. The code base
got to big to manage using notebooks. I am in the process of refactoring
common code into python modules using a standard python IDE. In the IDE I
need to be import all the spark functions and be able to write and run unit
tests.

I choose eclipse because I have a lot of spark code written in java. Its
easier for me to have one IDE for all my java and python code.

From:  Hyukjin Kwon <gurwls...@gmail.com>
Date:  Thursday, April 5, 2018 at 6:09 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: how to set up pyspark eclipse, pyDev, virtualenv? syntaxError:
yield from walk(

> FYI, there is a PR and JIRA for virtualEnv support in PySpark
> 
> https://issues.apache.org/jira/browse/SPARK-13587
> https://github.com/apache/spark/pull/13599
> 
> 
> 2018-04-06 7:48 GMT+08:00 Andy Davidson <a...@santacruzintegration.com>:
>> FYI
>> 
>> http://www.learn4master.com/algorithms/pyspark-unit-test-set-up-sparkcontext
>> 
>> From:  Andrew Davidson <a...@santacruzintegration.com>
>> Date:  Wednesday, April 4, 2018 at 5:36 PM
>> To:  "user @spark" <user@spark.apache.org>
>> Subject:  how to set up pyspark eclipse, pyDev, virtualenv? syntaxError:
>> yield from walk(
>> 
>>> I am having a heck of a time setting up my development environment. I used
>>> pip to install pyspark. I also downloaded spark from apache.
>>> 
>>> My eclipse pyDev intereperter is configured as a python3 virtualenv
>>> 
>>> I have a simple unit test that loads a small dataframe. Df.show() generates
>>> the following error
>>> 
>>> 
>>> 2018-04-04 17:13:56 ERROR Executor:91 - Exception in task 0.0 in stage 0.0
>>> (TID 0)
>>> 
>>> org.apache.spark.SparkException:
>>> 
>>> Error from python worker:
>>> 
>>>   Traceback (most recent call last):
>>> 
>>> File "/Users/a/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/site.py",
>>> line 67, in 
>>> 
>>>   import os
>>> 
>>> File "/Users/a/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/os.py",
>>> line 409
>>> 
>>>   yield from walk(new_path, topdown, onerror, followlinks)
>>> 
>>>^
>>> 
>>>   SyntaxError: invalid syntax
>>> 
>>> 
>>> 
>>> 
>>> 
>>> My unittest classs is dervied from.
>>> 
>>> 
>>> 
>>> class PySparkTestCase(unittest.TestCase):
>>> 
>>> 
>>> 
>>> @classmethod
>>> 
>>> def setUpClass(cls):
>>> 
>>> conf = SparkConf().setMaster("local[2]") \
>>> 
>>> .setAppName(cls.__name__) #\
>>> 
>>> # .set("spark.authenticate.secret", "11")
>>> 
>>> cls.sparkContext = SparkContext(conf=conf)
>>> 
>>> sc_values[cls.__name__] = cls.sparkContext
>>> 
>>> cls.sqlContext = SQLContext(cls.sparkContext)
>>> 
>>> print("aedwip:", SparkContext)
>>> 
>>> 
>>> 
>>> @classmethod
>>> 
>>> def tearDownClass(cls):
>>> 
>>> print("calling stop tearDownClas, the content of sc_values=",
>>> sc_values)
>>> 
>>> sc_values.clear()
>>> 
>>> cls.sparkContext.stop()
>>> 
>>> 
>>> 
>>> This looks similar to Class  PySparkTestCase in
>>> https://github.com/apache/spark/blob/master/python/pyspark/tests.py
>>> 
>>> 
>>> 
>>> Any suggestions would be greatly appreciated.
>>> 
>>> 
>>> 
>>> Andy
>>> 
>>> 
>>> 
>>> My downloaed version is spark-2.3.0-bin-hadoop2.7
>>> 
>>> 
>>> 
>>> My virtual env version is
>>> 
>>> (spark-2.3.0) $ pip show pySpark
>>> 
>>> Name: pyspark
>>> 
>>> Version: 2.3.0
>>> 
>>> Summary: Apache Spark Python API
>>> 
>>> Home-page: https://github.com/apache/spark/tree/master/python
>>> 
>>> Author: Spark Developers
>>> 
>>> Author-email: d...@spark.apache.org
>>> 
>>> License: http://www.apache.org/licenses/LICENSE-2.0
>>> 
>>> Location: 
>>> /Users/a/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/site-packages
>>> 
>>> Requires: py4j
>>> 
>>> (spark-2.3.0) $
>>> 
>>> 
>>> 
>>> (spark-2.3.0) $ python --version
>>> 
>>> Python 3.6.1
>>> 
>>> (spark-2.3.0) $
>>> 
>>> 
> 




Re: how to set up pyspark eclipse, pyDev, virtualenv? syntaxError: yield from walk(

2018-04-05 Thread Andy Davidson
FYI

http://www.learn4master.com/algorithms/pyspark-unit-test-set-up-sparkcontext

From:  Andrew Davidson 
Date:  Wednesday, April 4, 2018 at 5:36 PM
To:  "user @spark" 
Subject:  how to set up pyspark eclipse, pyDev, virtualenv? syntaxError:
yield from walk(

> I am having a heck of a time setting up my development environment. I used pip
> to install pyspark. I also downloaded spark from apache.
> 
> My eclipse pyDev intereperter is configured as a python3 virtualenv
> 
> I have a simple unit test that loads a small dataframe. Df.show() generates
> the following error
> 
> 
> 2018-04-04 17:13:56 ERROR Executor:91 - Exception in task 0.0 in stage 0.0
> (TID 0)
> 
> org.apache.spark.SparkException:
> 
> Error from python worker:
> 
>   Traceback (most recent call last):
> 
> File "/Users/a/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/site.py",
> line 67, in 
> 
>   import os
> 
> File "/Users/a/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/os.py", line
> 409
> 
>   yield from walk(new_path, topdown, onerror, followlinks)
> 
>^
> 
>   SyntaxError: invalid syntax
> 
> 
> 
> 
> 
> My unittest classs is dervied from.
> 
> 
> 
> class PySparkTestCase(unittest.TestCase):
> 
> 
> 
> @classmethod
> 
> def setUpClass(cls):
> 
> conf = SparkConf().setMaster("local[2]") \
> 
> .setAppName(cls.__name__) #\
> 
> # .set("spark.authenticate.secret", "11")
> 
> cls.sparkContext = SparkContext(conf=conf)
> 
> sc_values[cls.__name__] = cls.sparkContext
> 
> cls.sqlContext = SQLContext(cls.sparkContext)
> 
> print("aedwip:", SparkContext)
> 
> 
> 
> @classmethod
> 
> def tearDownClass(cls):
> 
> print("calling stop tearDownClas, the content of sc_values=",
> sc_values)
> 
> sc_values.clear()
> 
> cls.sparkContext.stop()
> 
> 
> 
> This looks similar to Class  PySparkTestCase in
> https://github.com/apache/spark/blob/master/python/pyspark/tests.py
> 
> 
> 
> Any suggestions would be greatly appreciated.
> 
> 
> 
> Andy
> 
> 
> 
> My downloaed version is spark-2.3.0-bin-hadoop2.7
> 
> 
> 
> My virtual env version is
> 
> (spark-2.3.0) $ pip show pySpark
> 
> Name: pyspark
> 
> Version: 2.3.0
> 
> Summary: Apache Spark Python API
> 
> Home-page: https://github.com/apache/spark/tree/master/python
> 
> Author: Spark Developers
> 
> Author-email: d...@spark.apache.org
> 
> License: http://www.apache.org/licenses/LICENSE-2.0
> 
> Location: /Users/a/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/site-packages
> 
> Requires: py4j
> 
> (spark-2.3.0) $ 
> 
> 
> 
> (spark-2.3.0) $ python --version
> 
> Python 3.6.1
> 
> (spark-2.3.0) $ 
> 
> 




Re: Union of multiple data frames

2018-04-05 Thread Andy Davidson

Hi Ceasar

I have used Brandson approach in the past with out any problem

Andy
From:  Brandon Geise 
Date:  Thursday, April 5, 2018 at 11:23 AM
To:  Cesar , "user @spark" 
Subject:  Re: Union of multiple data frames

> Maybe something like
>  
> var finalDF = spark.sqlContext.emptyDataFrame
> for (df <- dfs){
> finalDF = finalDF.union(df)
> }
>  
>  
> Where dfs is a Seq of dataframes.
>  
> 
> From: Cesar 
> Date: Thursday, April 5, 2018 at 2:17 PM
> To: user 
> Subject: Union of multiple data frames
> 
>  
> 
>  
> 
> The following code works for small n, but not for large n (>20):
> 
>  
> 
> val dfUnion = Seq(df1,df2,df3,...dfn).reduce(_ union _)
> 
> dfUnion.show()
> 
>  
> 
> By not working, I mean that Spark takes a lot of time to create the execution
> plan.
> 
>  
> 
> Is there a more optimal way to perform a union of multiple data frames?
> 
>  
> 
> thanks
> -- 
> 
> Cesar Flores




how to set up pyspark eclipse, pyDev, virtualenv? syntaxError: yield from walk(

2018-04-04 Thread Andy Davidson
I am having a heck of a time setting up my development environment. I used
pip to install pyspark. I also downloaded spark from apache.

My eclipse pyDev intereperter is configured as a python3 virtualenv

I have a simple unit test that loads a small dataframe. Df.show() generates
the following error


2018-04-04 17:13:56 ERROR Executor:91 - Exception in task 0.0 in stage 0.0
(TID 0)

org.apache.spark.SparkException:

Error from python worker:

  Traceback (most recent call last):

File "/Users/a/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/site.py",
line 67, in 

  import os

File "/Users/a/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/os.py",
line 409

  yield from walk(new_path, topdown, onerror, followlinks)

   ^

  SyntaxError: invalid syntax





My unittest classs is dervied from.



class PySparkTestCase(unittest.TestCase):



@classmethod

def setUpClass(cls):

conf = SparkConf().setMaster("local[2]") \

.setAppName(cls.__name__) #\

# .set("spark.authenticate.secret", "11")

cls.sparkContext = SparkContext(conf=conf)

sc_values[cls.__name__] = cls.sparkContext

cls.sqlContext = SQLContext(cls.sparkContext)

print("aedwip:", SparkContext)



@classmethod

def tearDownClass(cls):

print("calling stop tearDownClas, the content of sc_values=",
sc_values)

sc_values.clear()

cls.sparkContext.stop()



This looks similar to Class  PySparkTestCase in
https://github.com/apache/spark/blob/master/python/pyspark/tests.py



Any suggestions would be greatly appreciated.



Andy



My downloaed version is spark-2.3.0-bin-hadoop2.7



My virtual env version is

(spark-2.3.0) $ pip show pySpark

Name: pyspark

Version: 2.3.0

Summary: Apache Spark Python API

Home-page: https://github.com/apache/spark/tree/master/python

Author: Spark Developers

Author-email: d...@spark.apache.org

License: http://www.apache.org/licenses/LICENSE-2.0

Location: 
/Users/a/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/site-packages

Requires: py4j

(spark-2.3.0) $ 



(spark-2.3.0) $ python --version

Python 3.6.1

(spark-2.3.0) $ 






trouble with 'pip pyspark' pyspark.sql.functions. ³unresolved import² for col() and lit()

2018-04-04 Thread Andy Davidson
I am having trouble setting up my python3 virtualenv.

I created a virtualenv Œspark-2.3.0¹ Installed pyspark using pip how ever I
am not able to import pyspark.sql.functions. I get ³unresolved import² when
I try to import col() and lit()

from pyspark.sql.functions import *


I found if I download spark from apache and set SPARK_ROOT I can get my
juypter notebook to work. This is a very error prone work around. I am
having simiilar problem with my eclipse pyDev virtualenv

Any suggestions would be greatly appreciated

Andy


# pip show in virtualenv

(spark-2.3.0) $ pip show pyspark

Name: pyspark

Version: 2.3.0

Summary: Apache Spark Python API

Home-page: https://github.com/apache/spark/tree/master/python

Author: Spark Developers

Author-email: d...@spark.apache.org

License: http://www.apache.org/licenses/LICENSE-2.0

Location: 
/Users/foo/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/site-packages

Requires: py4j

(spark-2.3.0) $ 


(spark-2.3.0) $ ls 
~/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/site-packages/pyspark/sql/fu
nctions.py 

~/workSpace/pythonEnv/spark-2.3.0/lib/python3.6/site-packages/pyspark/sql/fu
nctions.py


# Jupyter Notebook
Export SPARK_ROOT=~/workSpace/spark/spark-2.3.0-bin-hadoop2.7




Eclipse pyDev virtual ENV






-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: how to create all possible combinations from an array? how to join and explode row array?

2018-03-30 Thread Andy Davidson
NICE!

Thanks Brandon


Andy.

From:  Brandon Geise <brandonge...@gmail.com>
Date:  Friday, March 30, 2018 at 6:15 PM
To:  Andrew Davidson <a...@santacruzintegration.com>, "user @spark"
<user@spark.apache.org>
Subject:  Re: how to create all possible combinations from an array? how to
join and explode row array?

> Possibly instead of doing the initial grouping, just do a full outer join on
> zyzy.  This is in scala but should be easily convertible to python.
>  
> val data = Array(("john", "red"), ("john", "blue"), ("john", "red"), ("bill",
> "blue"), ("bill", "red"), ("sam", "green"))
> val distData: DataFrame = spark.sparkContext.parallelize(data).toDF("a",
> "b")
> distData.show()
> ++-+
> | a| b|
> ++-+
> |john| red|
> |john| blue|
> |john| red|
> |bill| blue|
> |bill| red|
> | sam|green|
> ++-+
>  
>  
> distData.as("tbl1").join(distData.as("tbl2"), Seq("a"),
> "fullouter").select("tbl1.b", "tbl2.b").distinct.show()
>  
> +-+-+
> | b| b|
> +-+-+
> | blue| red|
> | red| blue|
> | red| red|
> | blue| blue|
> |green|green|
> +-+-+
>  
>  
> 
> From: Andy Davidson <a...@santacruzintegration.com>
> Date: Friday, March 30, 2018 at 8:58 PM
> To: Andy Davidson <a...@santacruzintegration.com>, user
> <user@spark.apache.org>
> Subject: Re: how to create all possible combinations from an array? how to
> join and explode row array?
> 
>  
> 
> I was a little sloppy when I created the sample output. Its missing a few
> pairs
> 
>  
> 
> Assume for a given row I have [a, b, c] I want to create something like the
> cartesian join
> 
>  
> 
> From: Andrew Davidson <a...@santacruzintegration.com
> <mailto:a...@santacruzintegration.com> >
> Date: Friday, March 30, 2018 at 5:54 PM
> To: "user @spark" <user@spark.apache.org <mailto:user@spark.apache.org> >
> Subject: how to create all possible combinations from an array? how to join
> and explode row array?
> 
>  
>> 
>> I have a dataframe and execute  df.groupBy(³xyzy²).agg( collect_list(³abc²)
>> 
>>  
>> 
>> This produces a column of type array. Now for each row I want to create a
>> multiple pairs/tuples from the array so that I can create a contingency
>> table.  Any idea how I can transform my data so that call crosstab() ? The
>> join transformation operate on the entire dataframe. I need something at the
>> row array level?
>> 
>> 
>> 
>> 
>> 
>> Bellow is some sample python and describes what I would like my results to
>> be?
>> 
>> 
>> 
>> Kind regards
>> 
>> 
>> 
>> Andy
>> 
>>  
>> 
>>  
>> 
>> c1 = ["john", "bill", "sam"]
>> 
>> c2 = [['red', 'blue', 'red'], ['blue', 'red'], ['green']]
>> 
>> p = pd.DataFrame({"a":c1, "b":c2})
>> 
>>  
>> 
>> df = sqlContext.createDataFrame(p)
>> 
>> df.printSchema()
>> 
>> df.show()
>> 
>>  
>> 
>> root
>> 
>>  |-- a: string (nullable = true)
>> 
>>  |-- b: array (nullable = true)
>> 
>>  ||-- element: string (containsNull = true)
>> 
>>  
>> 
>> +++
>> 
>> |   a|   b|
>> 
>> +++
>> 
>> |john|[red, blue, red]|
>> 
>> |bill   | [blue, red]|
>> 
>> | sam| [green]|
>> 
>> +++
>> 
>>  
>> 
>>  
>> 
>> The output I am trying to create is. I could live with a crossJoin (cartesian
>> join) and add my own filtering if it makes the problem easier?
>> 
>>  
>> 
>>  
>> 
>> +++
>> 
>> |  x1|x2|
>> 
>> +++
>> 
>> red  | blue
>> 
>> red  | red
>> 
>> blue | red
>> 
>> +++
>> 
>>  
>> 
>>  




Re: how to create all possible combinations from an array? how to join and explode row array?

2018-03-30 Thread Andy Davidson
I was a little sloppy when I created the sample output. Its missing a few
pairs

Assume for a given row I have [a, b, c] I want to create something like the
cartesian join

From:  Andrew Davidson 
Date:  Friday, March 30, 2018 at 5:54 PM
To:  "user @spark" 
Subject:  how to create all possible combinations from an array? how to join
and explode row array?

> I have a dataframe and execute  df.groupBy(³xyzy²).agg( collect_list(³abc²)
> 
> This produces a column of type array. Now for each row I want to create a
> multiple pairs/tuples from the array so that I can create a contingency table.
> Any idea how I can transform my data so that call crosstab() ? The join
> transformation operate on the entire dataframe. I need something at the row
> array level?
> 
> 
> Bellow is some sample python and describes what I would like my results to be?
> 
> Kind regards
> 
> Andy
> 
> 
> c1 = ["john", "bill", "sam"]
> c2 = [['red', 'blue', 'red'], ['blue', 'red'], ['green']]
> p = pd.DataFrame({"a":c1, "b":c2})
> 
> df = sqlContext.createDataFrame(p)
> df.printSchema()
> df.show()
> 
> root
>  |-- a: string (nullable = true)
>  |-- b: array (nullable = true)
>  ||-- element: string (containsNull = true)
> 
> +++
> |   a|   b|
> +++
> |john|[red, blue, red]|
> |bill   | [blue, red]|
> | sam| [green]|
> +++
> 
> 
> The output I am trying to create is. I could live with a crossJoin (cartesian
> join) and add my own filtering if it makes the problem easier?
> 
> 
> +++
> |  x1|x2|
> +++
> red  | blue
> red  | red
> blue | red
> +++
> 
> 




how to create all possible combinations from an array? how to join and explode row array?

2018-03-30 Thread Andy Davidson
I have a dataframe and execute  df.groupBy(³xyzy²).agg( collect_list(³abc²)

This produces a column of type array. Now for each row I want to create a
multiple pairs/tuples from the array so that I can create a contingency
table.  Any idea how I can transform my data so that call crosstab() ? The
join transformation operate on the entire dataframe. I need something at the
row array level?


Bellow is some sample python and describes what I would like my results to
be?

Kind regards

Andy


c1 = ["john", "bill", "sam"]
c2 = [['red', 'blue', 'red'], ['blue', 'red'], ['green']]
p = pd.DataFrame({"a":c1, "b":c2})

df = sqlContext.createDataFrame(p)
df.printSchema()
df.show()

root
 |-- a: string (nullable = true)
 |-- b: array (nullable = true)
 ||-- element: string (containsNull = true)

+++
|   a|   b|
+++
|john|[red, blue, red]|
|bill   | [blue, red]|
| sam| [green]|
+++


The output I am trying to create is. I could live with a crossJoin
(cartesian join) and add my own filtering if it makes the problem easier?


+++
|  x1|x2|
+++
red  | blue
red  | red
blue | red
+++






newbie: how to partition data on file system. What are best practices?

2017-11-22 Thread Andy Davidson
I am working on a deep learning project. Currently we do everything on a
single machine. I am trying to figure out how we might be able to move to a
clustered spark environment.

Clearly its possible a machine or job on the cluster might fail so I assume
that the data needs to be replicated to some degree.

Eventually I expect to I will need to process multi petabyte files and will
need to come up with some sort of sharding. Communication costs could be a
problem. Does spark have any knowledge of how the data distributed,
replicated across the machine in my cluster?

Let say my data source is S3. I should I copy the data to my ec2 cluster or
try to read directly from S3?

If our pilot is successful we expect to need to process multi petabyte file.

What are best practices?

Kind regards

Andy

P.s. We expect to use AWS or some other cloud solution.




does "Deep Learning Pipelines" scale out linearly?

2017-11-22 Thread Andy Davidson
I am starting a new deep learning project currently we do all of our work on
a single machine using a combination of Keras and Tensor flow.
https://databricks.github.io/spark-deep-learning/site/index.html looks very
promising. Any idea how performance is likely to improve as I add machines
to my my cluster?

Kind regards

Andy


P.s. Is user@spark.apache.org the best place to ask questions about this
package?






anyone know what the status of spark-ec2 is?

2016-09-06 Thread Andy Davidson
Spark-ec2 used to be part of the spark distribution. It now seems to be
split into a separate repo https://github.com/amplab/spark-ec2

It does not seem to be listed on https://spark-packages.org/

Does anyone know what the status is? There is a readme.md how ever I am
unable to find any release notes. Is there a spark-ec2 mail list?

Kind regards

Andy




Re: pyspark unable to create UDF: java.lang.RuntimeException: org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a directory: /tmp tmp

2016-08-18 Thread Andy Davidson
NICE CATCH!!! Many thanks.



I spent all day on this bug



The error msg report /tmp. I did not think to look on hdfs.



[ec2-user@ip-172-31-22-140 notebooks]$ hadoop fs -ls hdfs:///tmp/

Found 1 items

-rw-r--r--   3 ec2-user supergroup418 2016-04-13 22:49 hdfs:///tmp

[ec2-user@ip-172-31-22-140 notebooks]$



I have no idea how hdfs:///tmp got created. I deleted it.

This causes a bunch of exceptions. These exceptions has useful message. I
was able to fix the problem as follows

$ hadoop fs -rmr hdfs:///tmp

Now I run the notebook. It creates hdfs:///tmp/hive but the permission are
wrong

$ hadoop fs -chmod 777 hdfs:///tmp/hive


From:  Felix Cheung <felixcheun...@hotmail.com>
Date:  Thursday, August 18, 2016 at 3:37 PM
To:  Andrew Davidson <a...@santacruzintegration.com>, "user @spark"
<user@spark.apache.org>
Subject:  Re: pyspark unable to create UDF: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp

> Do you have a file called tmp at / on HDFS?
> 
> 
> 
> 
> 
> On Thu, Aug 18, 2016 at 2:57 PM -0700, "Andy Davidson"
> <a...@santacruzintegration.com> wrote:
> 
> For unknown reason I can not create UDF when I run the attached notebook on my
> cluster. I get the following error
> 
> Py4JJavaError: An error occurred while calling
> None.org.apache.spark.sql.hive.HiveContext.
> : java.lang.RuntimeException: org.apache.hadoop.fs.FileAlreadyExistsException:
> Parent path is not a directory: /tmp tmp
> 
> The notebook runs fine on my Mac
> 
> In general I am able to run non UDF spark code with out any trouble
> 
> I start the notebook server as the user “ec2-user" and uses master URL
> spark://ec2-51-215-120-63.us-west-1.compute.amazonaws.com:6066
> 
> 
> I found the following message in the notebook server log file. I have log
> level set to warn
> 
> 16/08/18 21:38:45 WARN ObjectStore: Version information not found in
> metastore. hive.metastore.schema.verification is not enabled so recording the
> schema version 1.2.0
> 
> 16/08/18 21:38:45 WARN ObjectStore: Failed to get database default, returning
> NoSuchObjectException
> 
> 
> 
> The cluster was originally created using
> spark-1.6.1-bin-hadoop2.6/ec2/spark-ec2
> 
> 
> #from pyspark.sql import SQLContext, HiveContext
> #sqlContext = SQLContext(sc)
> ​
> #from pyspark.sql import DataFrame
> #from pyspark.sql import functions
> ​
> from pyspark.sql.types import StringType
> from pyspark.sql.functions import udf
> ​
> print("spark version: {}".format(sc.version))
> ​
> import sys
> print("python version: {}".format(sys.version))
> spark version: 1.6.1
> python version: 3.4.3 (default, Apr  1 2015, 18:10:40)
> [GCC 4.8.2 20140120 (Red Hat 4.8.2-16)]
> 
> 
> 
> # functions.lower() raises # py4j.Py4JException: Method lower([class
> java.lang.String]) does not exist# work around define a UDFtoLowerUDFRetType =
> StringType()#toLowerUDF = udf(lambda s : s.lower(),
> toLowerUDFRetType)toLowerUDF = udf(lambda s : s.lower(), StringType())
> You must build Spark with Hive. Export 'SPARK_HIVE=true' and run build/sbt
> assembly
> Py4JJavaErrorTraceback (most recent call last)
>  in ()  4 toLowerUDFRetType =
> StringType()  5 #toLowerUDF = udf(lambda s : s.lower(),
> toLowerUDFRetType)> 6 toLowerUDF = udf(lambda s : s.lower(),
> StringType())/root/spark/python/pyspark/sql/functions.py in udf(f, returnType)
> 1595 [Row(slen=5), Row(slen=3)]   1596 """
> -> 1597 return UserDefinedFunction(f, returnType)   1598
>1599 blacklist = ['map', 'since',
> 'ignore_unicode_prefix']/root/spark/python/pyspark/sql/functions.py in
> __init__(self, func, returnType, name)   1556 self.returnType =
> returnType
>1557 self._broadcast = None-> 1558 self._judf =
> self._create_judf(name)   1559
>1560 def _create_judf(self,
> name):/root/spark/python/pyspark/sql/functions.py in _create_judf(self, name)
> 1567 pickled_command, broadcast_vars, env, includes =
> _prepare_for_python_RDD(sc, command, self)   1568 ctx =
> SQLContext.getOrCreate(sc)-> 1569 jdt =
> ctx._ssql_ctx.parseDataType(self.returnType.json())   1570 if name is
> None:   1571 name = f.__name__ if hasattr(f, '__name__') else
> f.__class__.__name__
> 
> /root/spark/python/pyspark/sql/context.py in _ssql_ctx(self)681
> try:682 if not hasattr(self, '_scala_HiveContext'):--> 683
> self._scala_HiveContext = self._get_hive_ctx()684 return
> self._scala_HiveContext
> 685 except Py4JError as
> e:/root/spark/python/pyspark/sql/co

pyspark unable to create UDF: java.lang.RuntimeException: org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a directory: /tmp tmp

2016-08-18 Thread Andy Davidson
For unknown reason I can not create UDF when I run the attached notebook on
my cluster. I get the following error

Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp

The notebook runs fine on my Mac

In general I am able to run non UDF spark code with out any trouble

I start the notebook server as the user “ec2-user" and uses master URL
spark://ec2-51-215-120-63.us-west-1.compute.amazonaws.com:6066


I found the following message in the notebook server log file. I have log
level set to warn

16/08/18 21:38:45 WARN ObjectStore: Version information not found in
metastore. hive.metastore.schema.verification is not enabled so recording
the schema version 1.2.0

16/08/18 21:38:45 WARN ObjectStore: Failed to get database default,
returning NoSuchObjectException



The cluster was originally created using
spark-1.6.1-bin-hadoop2.6/ec2/spark-ec2


#from pyspark.sql import SQLContext, HiveContext
#sqlContext = SQLContext(sc)
​
#from pyspark.sql import DataFrame
#from pyspark.sql import functions
​
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
​
print("spark version: {}".format(sc.version))
​
import sys
print("python version: {}".format(sys.version))
spark version: 1.6.1
python version: 3.4.3 (default, Apr  1 2015, 18:10:40)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)]



# functions.lower() raises
# py4j.Py4JException: Method lower([class java.lang.String]) does not exist
# work around define a UDF
toLowerUDFRetType = StringType()
#toLowerUDF = udf(lambda s : s.lower(), toLowerUDFRetType)
toLowerUDF = udf(lambda s : s.lower(), StringType())
You must build Spark with Hive. Export 'SPARK_HIVE=true' and run build/sbt
assembly
Py4JJavaErrorTraceback (most recent call last)
 in ()
  4 toLowerUDFRetType = StringType()
  5 #toLowerUDF = udf(lambda s : s.lower(), toLowerUDFRetType)
> 6 toLowerUDF = udf(lambda s : s.lower(), StringType())

/root/spark/python/pyspark/sql/functions.py in udf(f, returnType)
   1595 [Row(slen=5), Row(slen=3)]
   1596 """
-> 1597 return UserDefinedFunction(f, returnType)
   1598 
   1599 blacklist = ['map', 'since', 'ignore_unicode_prefix']

/root/spark/python/pyspark/sql/functions.py in __init__(self, func,
returnType, name)
   1556 self.returnType = returnType
   1557 self._broadcast = None
-> 1558 self._judf = self._create_judf(name)
   1559 
   1560 def _create_judf(self, name):

/root/spark/python/pyspark/sql/functions.py in _create_judf(self, name)
   1567 pickled_command, broadcast_vars, env, includes =
_prepare_for_python_RDD(sc, command, self)
   1568 ctx = SQLContext.getOrCreate(sc)
-> 1569 jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
   1570 if name is None:
   1571 name = f.__name__ if hasattr(f, '__name__') else
f.__class__.__name__

/root/spark/python/pyspark/sql/context.py in _ssql_ctx(self)
681 try:
682 if not hasattr(self, '_scala_HiveContext'):
--> 683 self._scala_HiveContext = self._get_hive_ctx()
684 return self._scala_HiveContext
685 except Py4JError as e:

/root/spark/python/pyspark/sql/context.py in _get_hive_ctx(self)
690 
691 def _get_hive_ctx(self):
--> 692 return self._jvm.HiveContext(self._jsc.sc())
693 
694 def refreshTable(self, tableName):

/root/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in
__call__(self, *args)
   1062 answer = self._gateway_client.send_command(command)
   1063 return_value = get_return_value(
-> 1064 answer, self._gateway_client, None, self._fqn)
   1065 
   1066 for temp_arg in temp_args:

/root/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/root/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.mkdirs(FSDirectory.java:1
489)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesys
tem.java:2979)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.j

Py4JJavaError: An error occurred while calling None.org.apache.spark.sql.hive.HiveContext.

2016-08-18 Thread Andy Davidson
Hi I am using python3, Java8 and spark-1.6.1. I am running my code in
Jupyter notebook

The following code runs fine on my mac

udfRetType = ArrayType(StringType(), True)
findEmojiUDF = udf(lambda s : re.findall(emojiPattern2, s), udfRetType)

retDF = (emojiSpecialDF
# convert into a list of emojis
.select("body",
findEmojiUDF(emojiSpecialDF.body).alias("listEmojis"))
# explode , convert list of emojis into separate rows
.select("*", functions.explode("listEmojis").alias("emoji"))
   )

retDF.printSchema()
retDF.show(40, truncate=False)

When I run it on my cluster I get

Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp

I check the files permissions. I start my notebook server as user ec2-user


[ec2-user exploration]$ ls -ld /tmp

drwxrwxrwt 5 root root 4096 Aug 18 18:14 /tmp





In the cluster I use masterURL
spark://ec2-54-215-230-73.us-west-1.compute.amazonaws.com:6066 (all my other
spark code seems to work fine)


Bellow is the complete stack trace

Any idea what the problem is?

Thanks

Andy

You must build Spark with Hive. Export 'SPARK_HIVE=true' and run build/sbt
assembly

Py4JJavaErrorTraceback (most recent call last)
 in ()
  1 udfRetType = ArrayType(StringType(), True)
> 2 findEmojiUDF = udf(lambda s : re.findall(emojiPattern2, s),
udfRetType)
  3 
  4 retDF = (emojiSpecialDF
  5 # convert into a list of emojis

/root/spark/python/pyspark/sql/functions.py in udf(f, returnType)
   1595 [Row(slen=5), Row(slen=3)]
   1596 """
-> 1597 return UserDefinedFunction(f, returnType)
   1598 
   1599 blacklist = ['map', 'since', 'ignore_unicode_prefix']

/root/spark/python/pyspark/sql/functions.py in __init__(self, func,
returnType, name)
   1556 self.returnType = returnType
   1557 self._broadcast = None
-> 1558 self._judf = self._create_judf(name)
   1559 
   1560 def _create_judf(self, name):

/root/spark/python/pyspark/sql/functions.py in _create_judf(self, name)
   1567 pickled_command, broadcast_vars, env, includes =
_prepare_for_python_RDD(sc, command, self)
   1568 ctx = SQLContext.getOrCreate(sc)
-> 1569 jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
   1570 if name is None:
   1571 name = f.__name__ if hasattr(f, '__name__') else
f.__class__.__name__

/root/spark/python/pyspark/sql/context.py in _ssql_ctx(self)
681 try:
682 if not hasattr(self, '_scala_HiveContext'):
--> 683 self._scala_HiveContext = self._get_hive_ctx()
684 return self._scala_HiveContext
685 except Py4JError as e:

/root/spark/python/pyspark/sql/context.py in _get_hive_ctx(self)
690 
691 def _get_hive_ctx(self):
--> 692 return self._jvm.HiveContext(self._jsc.sc())
693 
694 def refreshTable(self, tableName):

/root/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in
__call__(self, *args)
   1062 answer = self._gateway_client.send_command(command)
   1063 return_value = get_return_value(
-> 1064 answer, self._gateway_client, None, self._fqn)
   1065 
   1066 for temp_arg in temp_args:

/root/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/root/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.mkdirs(FSDirectory.java:1
489)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesys
tem.java:2979)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.j
ava:2932)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java
:2911)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcS
erver.java:649)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslator
PB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:417)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNam

py4j.Py4JException: Method lower([class java.lang.String]) does not exist

2016-08-18 Thread Andy Davidson
I am still working on spark-1.6.1. I am using java8.

Any idea why

(df.select("*", functions.lower("rawTag").alias("tag²))

Would raise py4j.Py4JException: Method lower([class java.lang.String]) 
does
not exist

Thanks in advance

Andy

https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html?highlight=lo
wer#pyspark.sql.functions.lower


pyspark.sql.functions.lower(col)


Converts a string column to lower case.

New in version 1.5.

#import logging
import json

from pyspark.sql import SQLContext, HiveContext
sqlContext = SQLContext(sc)
#hiveContext = HiveContext(sc)

from pyspark.sql import DataFrame
from pyspark.sql import functions


Py4JErrorTraceback (most recent call last)
 in ()
 28 df = df.select("user_loc",
functions.explode("hashTags").alias("rawTag"))
 29 df.printSchema()
---> 30 (df.select("*", functions.lower("rawTag").alias("tag²))


/root/spark/python/pyspark/sql/functions.py in _(col)
 37 def _(col):
 38 sc = SparkContext._active_spark_context
---> 39 jc = getattr(sc._jvm.functions, name)(col._jc if
isinstance(col, Column) else col)
 40 return Column(jc)
 41 _.__name__ = name

/root/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in
__call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814 
815 for temp_arg in temp_args:

/root/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/root/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client, target_id, name)
310 raise Py4JError(
311 "An error occurred while calling {0}{1}{2}.
Trace:\n{3}\n".
--> 312 format(target_id, ".", name, value))
313 else:
314 raise Py4JError(

Py4JError: An error occurred while calling
z:org.apache.spark.sql.functions.lower. Trace:
py4j.Py4JException: Method lower([class java.lang.String]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:360)
at py4j.Gateway.invoke(Gateway.java:254)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)





FW: [jupyter] newbie. apache spark python3 'Jupyter' data frame problem with auto completion and accessing documentation

2016-08-02 Thread Andy Davidson
FYI

From:  <jupy...@googlegroups.com> on behalf of Thomas Kluyver
<tak...@gmail.com>
Reply-To:  <jupy...@googlegroups.com>
Date:  Tuesday, August 2, 2016 at 3:26 AM
To:  Project Jupyter <jupy...@googlegroups.com>
Subject:  Re: [jupyter] newbie. apache spark python3 'Jupyter' data frame
problem with auto completion and accessing documentation

> Hi Andy,
> 
> On 1 August 2016 at 22:46, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I wonder if the auto completion problem as todo with the way code is
>> typically written in spark. I.E. By creating chains of function alls? Each
>> function returns a data frame.
>> 
>> hashTagsDF.groupBy("tag").agg({"tag": "count"}).orderBy("count(tag)",
>> ascending=False).show()
> 
> At the moment, tab completion isn't smart enough to know what the result of
> calling a function or method is, so it won't complete anything after the first
> groupBy there. This is something we hope to improve for IPython 6.
> 
> Thomas
> 
> -- 
> You received this message because you are subscribed to the Google Groups
> "Project Jupyter" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to jupyter+unsubscr...@googlegroups.com.
> To post to this group, send email to jupy...@googlegroups.com.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/jupyter/CAOvn4qiGbH_m-TqsSKA6ou-DenwJXJ9Z%2B
> xNXSQzV2u7Q5-6HhA%40mail.gmail.com
> <https://groups.google.com/d/msgid/jupyter/CAOvn4qiGbH_m-TqsSKA6ou-DenwJXJ9Z%2
> BxNXSQzV2u7Q5-6HhA%40mail.gmail.com?utm_medium=email_source=footer> .
> For more options, visit https://groups.google.com/d/optout.




Re: spark 1.6.0 read s3 files error.

2016-08-02 Thread Andy Davidson
Hi Freedafeng

I have been reading and writing to s3 using spark-1.6.x with out any
problems. Can you post a little code example and any error messages?

Andy

From:  freedafeng 
Date:  Tuesday, August 2, 2016 at 9:26 AM
To:  "user @spark" 
Subject:  Re: spark 1.6.0 read s3 files error.

> Any one, please? I believe many of us are using spark 1.6 or higher with
> s3... 
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-6-0-read-s3-files-
> error-tp27417p27451.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 




python 'Jupyter' data frame problem with autocompletion

2016-08-01 Thread Andy Davidson
I started using python3 and jupyter in a chrome browser. I seem to be having
trouble with data frame code completion. Regular python functions seems to
work correctly.

I wonder if I need to import something so the notebook knows about data
frames?

Kind regards


Andy 




Re: how to copy local files to hdfs quickly?

2016-07-30 Thread Andy Davidson
For lack of a better solution I am using ŒAWS s3 copy¹ to copy my files
locally and Œhadoop fs ­put ./tmp/* Œ to transfer them. In general put works
much better with a smaller number of big files compared to a large number of
small files

Your milage may vary

Andy

From:  Andrew Davidson 
Date:  Wednesday, July 27, 2016 at 4:25 PM
To:  "user @spark" 
Subject:  how to copy local files to hdfs quickly?

> I have a spark streaming app that saves JSON files to s3:// . It works fine
> 
> Now I need to calculate some basic summary stats and am running into horrible
> performance problems.
> 
> I want to run a test to see if reading from hdfs instead of s3 makes
> difference. I am able to quickly copy my the data from s3 to a machine in my
> cluster how ever hadoop fs ­put is pain fully slow. Is there a better way to
> copy large data to hdfs?
> 
> I should mention I am not using EMR . I.E. According to AWS support there is
> no way to have Œ$aws s3¹ copy directory to hdfs://
> 
> Hadoop distcp can not copy files from the local files system
> 
> Thanks in advance
> 
> Andy
> 
> 
> 
> 




Re: use big files and read from HDFS was: performance problem when reading lots of small files created by spark streaming.

2016-07-30 Thread Andy Davidson
a, executor); // concurrent writes to S3
>> 
>> }
>> 
>> private void saveImpl(List saveData, ExecutorService executor) {
>> 
>> List<Future> runningThreads = new ArrayList<Future>(POOL_SIZE);
>> 
>> for(SaveData data : saveData) {
>> 
>> SaveWorker worker = new SaveWorker(data);
>> 
>> Future f = executor.submit(worker);
>> 
>> runningThreads.add(f);
>> 
>> }
>> 
>> // wait for all the workers to complete
>> 
>> for (Future worker : runningThreads) {
>> 
>> try {
>> 
>> worker.get();
>> 
>> logger.debug("worker completed");
>> 
>> } catch (InterruptedException e) {
>> 
>> logger.error("", e);
>> 
>> } catch (ExecutionException e) {
>> 
>> logger.error("", e);
>> 
>> }
>> 
>> } 
>> 
>> }
>> 
>> 
>> 
>> static class SaveData {
>> 
>> private DataFrame df;
>> 
>> private String path;
>> 
>> 
>> 
>> SaveData(DataFrame df, String path) {
>> 
>> this.df = df;
>> 
>> this.path = path;
>> 
>> }
>> 
>> }
>> 
>> static class SaveWorker implements Runnable {
>> 
>> SaveData data;
>> 
>> 
>> 
>> public SaveWorker(SaveData data) {
>> 
>> this.data = data;
>> 
>> }
>> 
>> 
>> 
>> @Override
>> 
>> public void run() {
>> 
>> if (data.df.count() >= 1) {
>> 
>> data.df.write().json(data.path);
>> 
>> }
>> 
>> }
>> 
>> }
>> 
>> }
>> 
>> 
>> 
>> From:  Pedro Rodriguez <ski.rodrig...@gmail.com>
>> Date:  Wednesday, July 27, 2016 at 8:40 PM
>> To:  Andrew Davidson <a...@santacruzintegration.com>
>> Cc:  "user @spark" <user@spark.apache.org>
>> Subject:  Re: performance problem when reading lots of small files created by
>> spark streaming.
>> 
>>> There are a few blog posts that detail one possible/likely issue for
>>> example: 
>>> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>>> 
>>> TLDR: The hadoop libraries spark uses assumes that its input comes from a
>>> file system (works with HDFS) however S3 is a key value store, not a file
>>> system. Somewhere along the line, this makes things very slow. Below I
>>> describe their approach and a library I am working on to solve this problem.
>>> 
>>> (Much) Longer Version (with a shiny new library in development):
>>> So far in my reading of source code, Hadoop attempts to actually read from
>>> S3 which can be expensive particularly since it does so from a single driver
>>> core (different from listing files, actually reading them, I can find the
>>> source code and link it later if you would like). The concept explained
>>> above is to instead use the AWS sdk to list files then distribute the files
>>> names as a collection with sc.parallelize, then read them in parallel. I
>>> found this worked, but lacking in a few ways so I started this project:
>>> https://github.com/EntilZha/spark-s3
>>> 
>>> This takes that idea further by:
>>> 1. Rather than sc.parallelize, implement the RDD interface where each
>>> partition is defined by the files it needs to read (haven't gotten to
>>> DataFrames yet)
>>> 2. At the driver node, use the AWS SDK to list all the files with their size
>>> (listing is fast), then run the Least Processing Time Algorithm to sift the
>>> files into roughly balanced partitions by size
>>> 3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
>>> "folder2").regularRDDOperationsHere or import implicits and do
>>> sc.s3.textFileByPrefix
>>> 
>>> At present, I am battle testing and benchmarking it at my current job and
>>> results are promising with significant improvements to jobs dealing with
>>> many files especially many small files and to jobs whose input is unbalanced
>>> to start with. Jobs perform better because: 1) there isn't a long stall at
>>> the driver when hadoop decides how to split S3 files 2) the partitions end
>>> up nearly perfectly balanced because of LPT algorithm.
>>> 
>>> Since I hadn't intended to advertise this quite yet the documentation is not
>>> super polished but exists here:
>>> http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context
>>>

use big files and read from HDFS was: performance problem when reading lots of small files created by spark streaming.

2016-07-29 Thread Andy Davidson
; data.df.write().json(data.path);
> 
> }
> 
> }
> 
> }
> 
> }
> 
> 
> 
> From:  Pedro Rodriguez <ski.rodrig...@gmail.com>
> Date:  Wednesday, July 27, 2016 at 8:40 PM
> To:  Andrew Davidson <a...@santacruzintegration.com>
> Cc:  "user @spark" <user@spark.apache.org>
> Subject:  Re: performance problem when reading lots of small files created by
> spark streaming.
> 
>> There are a few blog posts that detail one possible/likely issue for example:
>> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>> 
>> TLDR: The hadoop libraries spark uses assumes that its input comes from a
>> file system (works with HDFS) however S3 is a key value store, not a file
>> system. Somewhere along the line, this makes things very slow. Below I
>> describe their approach and a library I am working on to solve this problem.
>> 
>> (Much) Longer Version (with a shiny new library in development):
>> So far in my reading of source code, Hadoop attempts to actually read from S3
>> which can be expensive particularly since it does so from a single driver
>> core (different from listing files, actually reading them, I can find the
>> source code and link it later if you would like). The concept explained above
>> is to instead use the AWS sdk to list files then distribute the files names
>> as a collection with sc.parallelize, then read them in parallel. I found this
>> worked, but lacking in a few ways so I started this project:
>> https://github.com/EntilZha/spark-s3
>> 
>> This takes that idea further by:
>> 1. Rather than sc.parallelize, implement the RDD interface where each
>> partition is defined by the files it needs to read (haven't gotten to
>> DataFrames yet)
>> 2. At the driver node, use the AWS SDK to list all the files with their size
>> (listing is fast), then run the Least Processing Time Algorithm to sift the
>> files into roughly balanced partitions by size
>> 3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
>> "folder2").regularRDDOperationsHere or import implicits and do
>> sc.s3.textFileByPrefix
>> 
>> At present, I am battle testing and benchmarking it at my current job and
>> results are promising with significant improvements to jobs dealing with many
>> files especially many small files and to jobs whose input is unbalanced to
>> start with. Jobs perform better because: 1) there isn't a long stall at the
>> driver when hadoop decides how to split S3 files 2) the partitions end up
>> nearly perfectly balanced because of LPT algorithm.
>> 
>> Since I hadn't intended to advertise this quite yet the documentation is not
>> super polished but exists here:
>> http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context
>> 
>> I am completing the sonatype process for publishing artifacts on maven
>> central (this should be done by tomorrow so referencing
>> "io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to
>> hear if this library solution works, otherwise I hope the blog post above is
>> illuminating.
>> 
>> Pedro
>> 
>> On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson
>> <a...@santacruzintegration.com> wrote:
>>> I have a relatively small data set however it is split into many small JSON
>>> files. Each file is between maybe 4K and 400K
>>> This is probably a very common issue for anyone using spark streaming. My
>>> streaming app works fine, how ever my batch application takes several hours
>>> to run. 
>>> 
>>> All I am doing is calling count(). Currently I am trying to read the files
>>> from s3. When I look at the app UI it looks like spark is blocked probably
>>> on IO? Adding additional workers and memory does not improve performance.
>>> 
>>> I am able to copy the files from s3 to a worker relatively quickly. So I do
>>> not think s3 read time is the problem.
>>> 
>>> In the past when I had similar data sets stored on HDFS I was able to use
>>> coalesce() to reduce the number of partition from 200K to 30. This made a
>>> big improvement in processing time. How ever when I read from s3 coalesce()
>>> does not improve performance.
>>> 
>>> I tried copying the files to a normal file system and then using Œhadoop fs
>>> put¹ to copy the files to hdfs how ever this takes several hours and is no
>>> where near completion. It appears hdfs does not deal with small files well.
>>> 
>>> I am considering copying the files from s3 to a normal file system on one of
>>> my workers and then concatenating the files into a few much large files,
>>> then using Œhadoop fs put¹ to move them to hdfs. Do you think this would
>>> improve the spark count() performance issue?
>>> 
>>> Does anyone know of heuristics for determining the number or size of the
>>> concatenated files?
>>> 
>>> Thanks in advance
>>> 
>>> Andy
>> 
>> 
>> 
>> -- 
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>> 
>> ski.rodrig...@gmail.com | pedrorodriguez.io <http://pedrorodriguez.io>  |
>> 909-353-4423
>> Github: github.com/EntilZha <http://github.com/EntilZha>  | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>> 




Re: spark 1.6.0 read s3 files error.

2016-07-28 Thread Andy Davidson
Hi Freedafeng

Can you tells a little more? I.E. Can you paste your code and error message?

Andy

From:  freedafeng 
Date:  Thursday, July 28, 2016 at 9:21 AM
To:  "user @spark" 
Subject:  Re: spark 1.6.0 read s3 files error.

> The question is, what is the cause of the problem? and how to fix it? Thanks.
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-6-0-read-s3-files-
> error-tp27417p27424.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 




Re: performance problem when reading lots of small files created by spark streaming.

2016-07-28 Thread Andy Davidson
; files into roughly balanced partitions by size
> 3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
> "folder2").regularRDDOperationsHere or import implicits and do
> sc.s3.textFileByPrefix
> 
> At present, I am battle testing and benchmarking it at my current job and
> results are promising with significant improvements to jobs dealing with many
> files especially many small files and to jobs whose input is unbalanced to
> start with. Jobs perform better because: 1) there isn't a long stall at the
> driver when hadoop decides how to split S3 files 2) the partitions end up
> nearly perfectly balanced because of LPT algorithm.
> 
> Since I hadn't intended to advertise this quite yet the documentation is not
> super polished but exists here:
> http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context
> 
> I am completing the sonatype process for publishing artifacts on maven central
> (this should be done by tomorrow so referencing
> "io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to hear
> if this library solution works, otherwise I hope the blog post above is
> illuminating.
> 
> Pedro
> 
> On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I have a relatively small data set however it is split into many small JSON
>> files. Each file is between maybe 4K and 400K
>> This is probably a very common issue for anyone using spark streaming. My
>> streaming app works fine, how ever my batch application takes several hours
>> to run. 
>> 
>> All I am doing is calling count(). Currently I am trying to read the files
>> from s3. When I look at the app UI it looks like spark is blocked probably on
>> IO? Adding additional workers and memory does not improve performance.
>> 
>> I am able to copy the files from s3 to a worker relatively quickly. So I do
>> not think s3 read time is the problem.
>> 
>> In the past when I had similar data sets stored on HDFS I was able to use
>> coalesce() to reduce the number of partition from 200K to 30. This made a big
>> improvement in processing time. How ever when I read from s3 coalesce() does
>> not improve performance.
>> 
>> I tried copying the files to a normal file system and then using Œhadoop fs
>> put¹ to copy the files to hdfs how ever this takes several hours and is no
>> where near completion. It appears hdfs does not deal with small files well.
>> 
>> I am considering copying the files from s3 to a normal file system on one of
>> my workers and then concatenating the files into a few much large files, then
>> using Œhadoop fs put¹ to move them to hdfs. Do you think this would improve
>> the spark count() performance issue?
>> 
>> Does anyone know of heuristics for determining the number or size of the
>> concatenated files?
>> 
>> Thanks in advance
>> 
>> Andy
> 
> 
> 
> -- 
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
> 
> ski.rodrig...@gmail.com | pedrorodriguez.io <http://pedrorodriguez.io>  |
> 909-353-4423
> Github: github.com/EntilZha <http://github.com/EntilZha>  | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
> 




performance problem when reading lots of small files created by spark streaming.

2016-07-27 Thread Andy Davidson
I have a relatively small data set however it is split into many small JSON
files. Each file is between maybe 4K and 400K
This is probably a very common issue for anyone using spark streaming. My
streaming app works fine, how ever my batch application takes several hours
to run. 

All I am doing is calling count(). Currently I am trying to read the files
from s3. When I look at the app UI it looks like spark is blocked probably
on IO? Adding additional workers and memory does not improve performance.

I am able to copy the files from s3 to a worker relatively quickly. So I do
not think s3 read time is the problem.

In the past when I had similar data sets stored on HDFS I was able to use
coalesce() to reduce the number of partition from 200K to 30. This made a
big improvement in processing time. How ever when I read from s3 coalesce()
does not improve performance.

I tried copying the files to a normal file system and then using Œhadoop fs
put¹ to copy the files to hdfs how ever this takes several hours and is no
where near completion. It appears hdfs does not deal with small files well.

I am considering copying the files from s3 to a normal file system on one of
my workers and then concatenating the files into a few much large files,
then using Œhadoop fs put¹ to move them to hdfs. Do you think this would
improve the spark count() performance issue?

Does anyone know of heuristics for determining the number or size of the
concatenated files?

Thanks in advance

Andy




Re: A question about Spark Cluster vs Local Mode

2016-07-27 Thread Andy Davidson
Hi Ascot

When you run in cluster mode it means your cluster manager will cause your
driver to execute on one of the works in your cluster.

The advantage of this is you can log on to a machine in your cluster and
submit your application and then log out. The application will continue to
run.

Here is part of shell script I use to start a streaming app in cluster mode.
This app has been running for several months now

numCores=2 # must be at least 2 else steaming app will not get any data.
over all we are using 3 cores



# --executor-memory=1G # default is supposed to be 1G. If we did not set we
are seeing 6G

executorMem=1G



$SPARK_ROOT/bin/spark-submit \

--class "com.pws.sparkStreaming.collector.StreamingKafkaCollector" \

--master $MASTER_URL \

--deploy-mode cluster \

--total-executor-cores $numCores \

--executor-memory $executorMem \

$jarPath --clusterMode $*



From:  Ascot Moss 
Date:  Wednesday, July 27, 2016 at 6:48 PM
To:  "user @spark" 
Subject:  A question about Spark Cluster vs Local Mode

> Hi,
> 
> If I submit the same job to spark in cluster mode, does it mean in cluster
> mode it will be run in cluster memory pool and it will fail if it runs out of
> cluster's memory?
> 
> 
> --driver-memory 64g \
> 
> --executor-memory 16g \
> 
> Regards




how to copy local files to hdfs quickly?

2016-07-27 Thread Andy Davidson
I have a spark streaming app that saves JSON files to s3:// . It works fine

Now I need to calculate some basic summary stats and am running into
horrible performance problems.

I want to run a test to see if reading from hdfs instead of s3 makes
difference. I am able to quickly copy my the data from s3 to a machine in my
cluster how ever hadoop fs ­put is pain fully slow. Is there a better way to
copy large data to hdfs?

I should mention I am not using EMR . I.E. According to AWS support there is
no way to have Œ$aws s3¹ copy directory to hdfs://

Hadoop distcp can not copy files from the local files system

Thanks in advance

Andy








spark-2.x what is the default version of java ?

2016-07-27 Thread Andy Davidson
I currently have to configure spark-1.x to use Java 8 and python 3.x. I
noticed that 
http://spark.apache.org/releases/spark-release-2-0-0.html#removals mentions
java 7 is deprecated.

Is the default now Java 8 ?

Thanks 

Andy

Deprecations
The following features have been deprecated in Spark 2.0, and might be
removed in future versions of Spark 2.x:
* Fine-grained mode in Apache Mesos
* Support for Java 7
* Support for Python 2.6




Re: spark 1.6.0 read s3 files error.

2016-07-27 Thread Andy Davidson
Hi Freedafeng

The following works for me df will be a data frame. fullPath is lists list
of various part files stored in s3.

fullPath = 
['s3n:///json/StreamingKafkaCollector/s1/2016-07-10/146817304/part-r
-0-a2121800-fa5b-44b1-a994-67795' ]

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('json').load(fullPath).select(³key1") #.cache()


My file is raw JSON. I you might have to tweak the above statement to work
with gz files

Andy


From:  freedafeng 
Date:  Wednesday, July 27, 2016 at 10:36 AM
To:  "user @spark" 
Subject:  spark 1.6.0 read s3 files error.

> cdh 5.7.1. pyspark.
> 
> codes: ===
> from pyspark import SparkContext, SparkConf
> 
> conf = SparkConf().setAppName('s3 ---')
> sc = SparkContext(conf=conf)
> 
> myRdd =
> sc.textFile("s3n:///y=2016/m=5/d=26/h=20/2016.5.26.21.9.52.6d53180a-28b9-4
> e65-b749-b4a2694b9199.json.gz")
> 
> count = myRdd.count()
> print "The count is", count
> 
> ===
> standalone mode: command line:
> 
> AWS_ACCESS_KEY_ID=??? AWS_SECRET_ACCESS_KEY=??? ./bin/spark-submit
> --driver-memory 4G  --master  spark://master:7077 --conf
> "spark.default.parallelism=70"  /root/workspace/test/s3.py
> 
> Error:
> )
> 16/07/27 17:27:26 INFO spark.SparkContext: Created broadcast 0 from textFile
> at NativeMethodAccessorImpl.java:-2
> Traceback (most recent call last):
>   File "/root/workspace/test/s3.py", line 12, in 
> count = myRdd.count()
>   File
> "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/pyspark
> .zip/pyspark/rdd.py",
> line 1004, in count
>   File
> "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/pyspark
> .zip/pyspark/rdd.py",
> line 995, in sum
>   File
> "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/pyspark
> .zip/pyspark/rdd.py",
> line 869, in fold
>   File
> "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/pyspark
> .zip/pyspark/rdd.py",
> line 771, in collect
>   File
> "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/py4j-0.
> 9-src.zip/py4j/java_gateway.py",
> line 813, in __call__
>   File
> "/opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/lib/spark/python/lib/py4j-0.
> 9-src.zip/py4j/protocol.py",
> line 308, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : java.lang.VerifyError: Bad type on operand stack
> Exception Details:
>   Location:
>
> org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/Stri
> ng;Ljava/lang/String;)V
> @155: invokevirtual
>   Reason:
> Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is
> not assignable to 'org/jets3t/service/model/StorageObject'
>   Current Frame:
> bci: @155
> flags: { }
> locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore',
> 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object'
> }
> stack: { 'org/jets3t/service/S3Service', 'java/lang/String',
> 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object',
> integer }
>   Bytecode:
> 000: b200 fcb9 0190 0100 9900 39b2 00fc bb01
> 010: 5659 b701 5713 0192 b601 5b2b b601 5b13
> 020: 0194 b601 5b2c b601 5b13 0196 b601 5b2a
> 030: b400 7db6 00e7 b601 5bb6 015e b901 9802
> 040: 002a b400 5799 0030 2ab4 0047 2ab4 007d
> 050: 2b01 0101 01b6 019b 4e2a b400 6b09 949e
> 060: 0016 2db6 019c 2ab4 006b 949e 000a 2a2d
> 070: 2cb6 01a0 b1bb 00a0 592c b700 a14e 2d2a
> 080: b400 73b6 00b0 2ab4 0047 2ab4 007d b600
> 090: e72b 2ab4 007d b600 e72d 03b6 01a4 57a7
> 0a0: 000a 4e2a 2d2b b700 c7b1
>   Exception Handler Table:
> bci [0, 116] => handler: 162
> bci [117, 159] => handler: 162
>   Stackmap Table:
> same_frame_extended(@65)
> same_frame(@117)
> same_locals_1_stack_item_frame(@162,Object[#139])
> same_frame(@169)
> 
> at
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3Fi
> leSystem.java:338)
> at
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem
> .java:328)
> 
> .
> 
> TIA
> 
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-6-0-read-s3-files-
> error-tp27417.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 




spark-2.0 support for spark-ec2 ?

2016-07-27 Thread Andy Davidson
Congratulations on releasing 2.0!



spark-2.0.0-bin-hadoop2.7 no longer includes the spark-ec2 script How ever
http://spark.apache.org/docs/latest/index.html  has a link to the spark-ec2
github repo https://github.com/amplab/spark-ec2



Is this the right group to discuss spark-ec2?

Any idea how stable spark-ec2 is on spark-2.0?

Should we use master or branch-2.0? It looks like the default might be the
branch-1.6 ?

Thanks

Andy


P.s. The new stand alone documentation is a big improvement. I have a much
better idea of what spark-ec2 does and how to upgrade my system.















getting more concurrency best practices

2016-07-26 Thread Andy Davidson
Bellow is a very simple application. It runs very slowly. It does not look
like I am getting a lot of parallel execution. I image this is a very common
work flow. Periodically I want to runs some standard summary statistics
across several different data sets.

Any suggestions would be greatly appreciated.

Andy

Overview 
All the sets use the same data format. The data is twitter tweet stored in
JSON. The JSON is very complicated. Each record could be as large as 4k. The
data is collected using spark streaming. Every mini batch is stored in S3 as
separate object. E.G. s3n://buckName/date/timestampMS/parts*. I only select
one col. From the data frame. The column is “top” level key in the JSON
structure

The program is simple

For each data set
1. Find all the part files
2. Load them into a data frame
3. Calculate the summary stat and print
4. Free memory
In my example bellow the data sets are not very big.

# fullPath is list of part files.
sqlContext.read.format('json').load(fullPath).select("body") #.cache()


1
%%timeit -n 1 -r 1
2
# %timeit # line magic
3
# %%timeit # cell magic
4
# -n 1 -r 1 # run cell once
5
​
6
for prefix in districtDataSets:
7
dataSet = [name for name in constituentDataSets if
name.startswith(prefix)]
8
# print(dataSets)
9
# would be nice if we could have this loop run in parallel
10
constituentDFS = getDataFrames(dataSet) # returns a dictionary
11
# we could union but would probably be slower
12
total = 0
13
for name in constituentDFS:
14
c = constituentDFS[name].count();
15
total = total + c;
16
print("{} {:15,}".format(prefix, total))
17
# free memory
18
del constituentDFS
19

ne-2 110169
fl-8 12
mi-1 2552
ny-19 27657
ny-24 59739
pa-8 42867
wi-8 7352
ny-3 51136
ny-1 105296
ny-22 5671287
mn-2 34834
tx-8 5246
il-6 12772
co-6 24700
1 loop, best of 1: 2h 41min 8s per loop
Environment

I am using spark-1.6.1

My app is using
10 cores, 
6GB per node
5 executors
1 driver

Each executor has at most 2 active tasks


Over all the resources do not seem to be utilized well. I do not think
adding machines would improve performance.

I launch the notebook server as follows

#

# allow notebooks to use all avalible resources

# 

export PYSPARK_PYTHON=python3.4

export PYSPARK_DRIVER_PYTHON=python3.4

export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"

$SPARK_ROOT/bin/pyspark \

--master $MASTER_URL \

--driver-memory 2G \

--executor-memory 6G \

$extraPkgs \

$*



All my data is being read from s3
- Is there an easy way to figure out how much time I am spending reading?
- I am guessing S3 is really slow. I have lot of objects to read.
- I image copying the data to HDFS would run faster how ever I have not
found an easy way to copy the data. I am using ec2. Looks like I would have
to copy from s3 to a file partition in my cluster and then copy to HDFS



Looking at the stages It does not look like shuffle is a major problem







Re: Spark Web UI port 4040 not working

2016-07-26 Thread Andy Davidson
Yup in cluster mode you need to figure out what machine the driver is
running on. That is the machine the UI will run on

https://issues.apache.org/jira/browse/SPARK-15829

You may also have fire wall issues

Here are some notes I made about how to figure out what machine the driver
is running on when using cluster mode

Application UI Bug work around
If our application is running in cluster mode the Master Console UI will
incorrectly assume the Application UI¹s are running on the master machine.
The UI is actually running on the same machine as the driver
To find the URL to the apps running in cluster mode
1. find the worker id for the app you are interested in.
* on the master console UI go to the Running App¹s section
* you should see a column ŒWorker¹
* a worker is something like worker-20160322041632-172.31.23.201-34909
2. got to the AWS ec2 console
3. from the private ip find the public DNS name
* the private ip in our example is 172.31.23.201

From:  Jacek Laskowski 
Date:  Tuesday, July 26, 2016 at 6:38 AM
To:  Jestin Ma 
Cc:  Chanh Le , "user @spark" 
Subject:  Re: Spark Web UI port 4040 not working

> Hi,
> 
> Do you perhaps deploy using cluster mode? Is this EC2? You'd need to
> figure out where the driver runs and use the machine's IP.
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
> On Tue, Jul 26, 2016 at 3:36 PM, Jestin Ma  wrote:
>>  I tried doing that on my master node.
>>  I got nothing.
>>  However, I grep'd port 8080 and I got the standalone UI.
>> 
>>  On Tue, Jul 26, 2016 at 12:39 AM, Chanh Le  wrote:
>>> 
>>>  You¹re running in StandAlone Mode?
>>>  Usually inside active task it will show the address of current job.
>>>  or you can check in master node by using netstat -apn | grep 4040
>>> 
>>> 
>>> 
  > On Jul 26, 2016, at 8:21 AM, Jestin Ma 
  > wrote:
  >
  > Hello, when running spark jobs, I can access the master UI (port 8080
  > one) no problem. However, I'm confused as to how to access the web UI to
 see
  > jobs/tasks/stages/etc.
  >
  > I can access the master UI at http://:8080. But port 4040
  > gives me a -connection cannot be reached-.
  >
  > Is the web UI http:// with a port of 4040?
  >
  > I'm running my Spark job on a cluster machine and submitting it to a
  > master node part of the cluster. I heard of ssh tunneling; is that
 relevant
  > here?
  >
  > Thank you!
>>> 
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 




Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread Andy Davidson
Hi Kevin

Just a heads up at the recent spark summit in S.F. There was a presentation
about streaming in 2.0. They said that streaming was not going to production
ready in 2.0.

I am not sure if the older 1.6.x version will be supported. My project will
not be able to upgrade with streaming support. We also use kafka

Andy

From:  Marco Mistroni 
Date:  Monday, July 25, 2016 at 2:33 AM
To:  kevin 
Cc:  "user @spark" , "dev.spark"

Subject:  Re: where I can find spark-streaming-kafka for spark2.0

> Hi Kevin
>   you should not need to rebuild everything.
> Instead, i believe you should launch spark-submit by specifying the kafka jar
> file in your --packages... i had to follow same when integrating spark
> streaming with flume
> 
>   have you checked this link ?
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html
> 
> 
> hth
> 
>   
> 
> On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:
>> I have compile it from source code
>> 
>> 2016-07-25 12:05 GMT+08:00 kevin :
>>> hi,all :
>>> I try to run example org.apache.spark.examples.streaming.KafkaWordCount , I
>>> got error :
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>> at 
>>> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scal
>>> a:57)
>>> at 
>>> 
org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala>>>
)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at 
>>> 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62>>>
)
>>> at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
>>> .java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at 
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$ru
>>> nMain(SparkSubmit.scala:724)
>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> Caused by: java.lang.ClassNotFoundException:
>>> org.apache.spark.streaming.kafka.KafkaUtils$
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> ... 11 more
>>> 
>>> so where I can find spark-streaming-kafka for spark2.0
>> 
> 




Re: Exception in thread "dispatcher-event-loop-1" java.lang.OutOfMemoryError: Java heap space

2016-07-22 Thread Andy Davidson
Hi Ted

In general I want this application to use all available resources. I just
bumped the driver memory to 2G. I also bumped the executor memory up to 2G.

It will take a couple of hours before I know if this made a difference or
not

I am not sure if setting executor memory is a good idea. I am concerned that
this will reduce concurrency

Thanks

Andy

From:  Ted Yu <yuzhih...@gmail.com>
Date:  Friday, July 22, 2016 at 2:54 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: Exception in thread "dispatcher-event-loop-1"
java.lang.OutOfMemoryError: Java heap space

> How much heap memory do you give the driver ?
> 
> On Fri, Jul 22, 2016 at 2:17 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> Given I get a stack trace in my python notebook I am guessing the driver is
>> running out of memory?
>> 
>> My app is simple it creates a list of dataFrames from s3://, and counts each
>> one. I would not think this would take a lot of driver memory
>> 
>> I am not running my code locally. Its using 12 cores. Each node has 6G.
>> 
>> Any suggestions would be greatly appreciated
>> 
>> Andy
>> 
>> def work():
>> 
>> constituentDFS = getDataFrames(constituentDataSets)
>> 
>> results = ["{} {}".format(name, constituentDFS[name].count()) for name in
>> constituentDFS]
>> 
>> print(results)
>> 
>> return results
>> 
>> 
>> 
>> %timeit -n 1 -r 1 results = work()
>> 
>> 
>>  in (.0)  1 def work():  2
>> constituentDFS = getDataFrames(constituentDataSets)> 3 results = ["{}
>> {}".format(name, constituentDFS[name].count()) for name in constituentDFS]
>> 4 print(results)  5 return results
>> 
>> 16/07/22 17:54:38 WARN TaskSetManager: Stage 146 contains a task of very
>> large size (145 KB). The maximum recommended task size is 100 KB.
>> 
>> 16/07/22 18:39:47 WARN HeartbeatReceiver: Removing executor 2 with no recent
>> heartbeats: 153037 ms exceeds timeout 12 ms
>> 
>> Exception in thread "dispatcher-event-loop-1" java.lang.OutOfMemoryError:
>> Java heap space
>> 
>> at java.util.jar.Manifest$FastInputStream.(Manifest.java:332)
>> 
>> at java.util.jar.Manifest$FastInputStream.(Manifest.java:327)
>> 
>> at java.util.jar.Manifest.read(Manifest.java:195)
>> 
>> at java.util.jar.Manifest.(Manifest.java:69)
>> 
>> at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
>> 
>> at java.util.jar.JarFile.getManifest(JarFile.java:180)
>> 
>> at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
>> 
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
>> 
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>> 
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>> 
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>> 
>> at java.security.AccessController.doPrivileged(Native Method)
>> 
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>> 
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> 
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> 
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> 
>> at 
>> org.apache.spark.scheduler.TaskSchedulerImpl.logExecutorLoss(TaskSchedulerImp
>> l.scala:510)
>> 
>> at 
>> org.apache.spark.scheduler.TaskSchedulerImpl.executorLost(TaskSchedulerImpl.s
>> cala:473)
>> 
>> at 
>> org.apache.spark.HeartbeatReceiver$$anonfun$org$apache$spark$HeartbeatReceive
>> r$$expireDeadHosts$3.apply(HeartbeatReceiver.scala:199)
>> 
>> at 
>> org.apache.spark.HeartbeatReceiver$$anonfun$org$apache$spark$HeartbeatReceive
>> r$$expireDeadHosts$3.apply(HeartbeatReceiver.scala:195)
>> 
>> at 
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(Traversa
>> bleLike.scala:772)
>> 
>> at 
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>> 
>> at 
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>> 
>> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>> 
>> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>> 
>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>> 
>> at 
>> 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771>>
)
>> 
>> at org.apache.spark.HeartbeatReceiver.org
>> <http://org.apache.spark.HeartbeatReceiver.org>
>> $apache$spark$HeartbeatReceiver$$expireDeadHosts(HeartbeatReceiver.scala:195)
>> 
>> at 
>> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(Hea
>> rtbeatReceiver.scala:118)
>> 
>> at 
>> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:
>> 104)
>> 
>> at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
>> 
>> at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>> 
>> 16/07/22 19:08:29 WARN NettyRpcEnv: Ignored message: true
>> 
>> 
> 




Exception in thread "dispatcher-event-loop-1" java.lang.OutOfMemoryError: Java heap space

2016-07-22 Thread Andy Davidson
Given I get a stack trace in my python notebook I am guessing the driver is
running out of memory?

My app is simple it creates a list of dataFrames from s3://, and counts each
one. I would not think this would take a lot of driver memory

I am not running my code locally. Its using 12 cores. Each node has 6G.

Any suggestions would be greatly appreciated

Andy

def work():

constituentDFS = getDataFrames(constituentDataSets)

results = ["{} {}".format(name, constituentDFS[name].count()) for name
in constituentDFS]

print(results)

return results



%timeit -n 1 -r 1 results = work()


 in (.0)  1 def work():  2
constituentDFS = getDataFrames(constituentDataSets)> 3 results =
["{} {}".format(name, constituentDFS[name].count()) for name in
constituentDFS]  4 print(results)  5 return results

16/07/22 17:54:38 WARN TaskSetManager: Stage 146 contains a task of very
large size (145 KB). The maximum recommended task size is 100 KB.

16/07/22 18:39:47 WARN HeartbeatReceiver: Removing executor 2 with no recent
heartbeats: 153037 ms exceeds timeout 12 ms

Exception in thread "dispatcher-event-loop-1" java.lang.OutOfMemoryError:
Java heap space

at java.util.jar.Manifest$FastInputStream.(Manifest.java:332)

at java.util.jar.Manifest$FastInputStream.(Manifest.java:327)

at java.util.jar.Manifest.read(Manifest.java:195)

at java.util.jar.Manifest.(Manifest.java:69)

at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)

at java.util.jar.JarFile.getManifest(JarFile.java:180)

at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)

at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)

at java.net.URLClassLoader.access$100(URLClassLoader.java:73)

at java.net.URLClassLoader$1.run(URLClassLoader.java:368)

at java.net.URLClassLoader$1.run(URLClassLoader.java:362)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:361)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

at 
org.apache.spark.scheduler.TaskSchedulerImpl.logExecutorLoss(TaskSchedulerIm
pl.scala:510)

at 
org.apache.spark.scheduler.TaskSchedulerImpl.executorLost(TaskSchedulerImpl.
scala:473)

at 
org.apache.spark.HeartbeatReceiver$$anonfun$org$apache$spark$HeartbeatReceiv
er$$expireDeadHosts$3.apply(HeartbeatReceiver.scala:199)

at 
org.apache.spark.HeartbeatReceiver$$anonfun$org$apache$spark$HeartbeatReceiv
er$$expireDeadHosts$3.apply(HeartbeatReceiver.scala:195)

at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(Travers
ableLike.scala:772)

at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)

at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)

at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)

at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)

at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)

at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:77
1)

at 
org.apache.spark.HeartbeatReceiver.org$apache$spark$HeartbeatReceiver$$expir
eDeadHosts(HeartbeatReceiver.scala:195)

at 
org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(He
artbeatReceiver.scala:118)

at 
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala
:104)

at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)

at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)

16/07/22 19:08:29 WARN NettyRpcEnv: Ignored message: true






running jupyter notebook server Re: spark and plot data

2016-07-22 Thread Andy Davidson
Hi Pseudo

I do not know much about zeppelin . What languages are you using?

I have been doing my data exploration and graphing using python mostly
because early on spark had good support for python. Its easy to collect()
data as a local PANDAS object. I think at this point R should work well. You
should be able to easily collect() your data as a R dataframe. I have not
tried to Rstudio.

I typically run the Jupiter notebook server in my data center. I find the
notebooks really nice. I typically use matplotlib to generates my graph.
There are a lot of graphing packages.

Attached is the script I use to start the notebook server. This script and
process works but is a little hacky You call it as follows


#
# on a machine in your cluster
#
$ cd dirWithNotebooks

# all the logs will be in startIPythonNotebook.sh.out
# nohup allows you to log in start your notebook server and log out.
$ nohup startIPythonNotebook.sh > startIPythonNotebook.sh.out &

#
# on you local machine
#

# because of firewalls I need to open an ssh tunnel
$ ssh -o ServerAliveInterval=120 -N -f -L localhost:8889:localhost:7000
myCluster

# connect to the notebook server using the browser of you choice
http://localhost:8889




#
# If you need to stop your notebooks server you may need to kill the server
# there is probably a cleaner way to do this
# $ ps -el | head -1; ps -efl | grep python
#

 <http://jupyter.org/> http://jupyter.org/


P.S. Jupiter is in the process of being released. The new Juypter lab alpha
was just announced it looks really sweet.



From:  pseudo oduesp <pseudo20...@gmail.com>
Date:  Friday, July 22, 2016 at 2:08 AM
To:  Andrew Davidson <a...@santacruzintegration.com>
Subject:  Re: spark and plot data

> HI andy  ,
> thanks for reply ,
> i tell it just hard to each time switch  between local concept and destributed
> concept , for example zepplin give easy way to interact with data ok , but
> it's hard to configure on huge cluster with lot of node in my case i have
> cluster with 69 nodes and i process huge volume of data with pyspark and it
> cool but when  i want to plot some chart  i get hard job to make it .
> 
> i sampling my result or aggregate  , take for example if i user randomforest
> algorithme in machine learning  i want to retrive  most importante features
> with my version alerady installed in our cluster (1.5.0) i can't get this.
> 
> do you have any solution.
> 
> Thanks 
> 
> 2016-07-21 18:44 GMT+02:00 Andy Davidson <a...@santacruzintegration.com>:
>> Hi Pseudo
>> 
>> Plotting, graphing, data visualization, report generation are common needs in
>> scientific and enterprise computing.
>> 
>> Can you tell me more about your use case? What is it about the current
>> process / workflow do you think could be improved by pushing plotting (I
>> assume you mean plotting and graphing) into spark.
>> 
>> 
>> In my personal work all the graphing is done in the driver on summary stats
>> calculated using spark. So for me using standard python libs has not been a
>> problem.
>> 
>> Andy
>> 
>> From:  pseudo oduesp <pseudo20...@gmail.com>
>> Date:  Thursday, July 21, 2016 at 8:30 AM
>> To:  "user @spark" <user@spark.apache.org>
>> Subject:  spark and plot data
>> 
>>> Hi , 
>>> i know spark  it s engine  to compute large data set but for me i work with
>>> pyspark and it s very wonderful machine
>>> 
>>> my question  we  don't have tools for ploting data each time we have to
>>> switch and go back to python for using plot.
>>> but when you have large result scatter plot or roc curve  you cant use
>>> collect to take data .
>>> 
>>> somone have propostion for plot .
>>> 
>>> thanks 
> 




startIPythonNotebook.sh
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: How to submit app in cluster mode? port 7077 or 6066

2016-07-21 Thread Andy Davidson
Thanks

Andy

From:  Saisai Shao <sai.sai.s...@gmail.com>
Date:  Thursday, July 21, 2016 at 6:11 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: How to submit app in cluster mode? port 7077 or 6066

> I think both 6066 and 7077 can be worked. 6066 is using the REST way to submit
> application, while 7077 is the legacy way. From user's aspect, it should be
> transparent and no need to worry about the difference.
> 
> * URL: spark://hw12100.local:7077
> * REST URL: spark://hw12100.local:6066 (cluster mode)
> 
> Thanks
> Saisai
> 
> On Fri, Jul 22, 2016 at 6:44 AM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I have some very long lived streaming apps. They have been running for
>> several months. I wonder if something has changed recently? I first started
>> working with spark-1.3 . I am using the stand alone cluster manager. The way
>> I would submit my app to run in cluster mode was port 6066
>> 
>> 
>> Looking at the spark-1.6 it seems like we are supposed to use port 7077 and
>> the  new argument
>> 
>> http://spark.apache.org/docs/latest/submitting-applications.html#launching-ap
>> plications-with-spark-submit
>> * --deploy-mode: Whether to deploy your driver on the worker nodes (cluster)
>> or locally as an external client (client) (default: client) †
>> 
>> Can anyone confirm this. It took me a very long time to figure out how to get
>> things to run cluster mode.
>> 
>> Thanks
>> 
>> Andy
> 




How to submit app in cluster mode? port 7077 or 6066

2016-07-21 Thread Andy Davidson
I have some very long lived streaming apps. They have been running for
several months. I wonder if something has changed recently? I first started
working with spark-1.3 . I am using the stand alone cluster manager. The way
I would submit my app to run in cluster mode was port 6066


Looking at the spark-1.6 it seems like we are supposed to use port 7077 and
the  new argument 

http://spark.apache.org/docs/latest/submitting-applications.html#launching-a
pplications-with-spark-submit
* --deploy-mode: Whether to deploy your driver on the worker nodes (cluster)
or locally as an external client (client) (default: client) †

Can anyone confirm this. It took me a very long time to figure out how to
get things to run cluster mode.

Thanks

Andy




Re: spark and plot data

2016-07-21 Thread Andy Davidson
Hi Pseudo

Plotting, graphing, data visualization, report generation are common needs
in scientific and enterprise computing.

Can you tell me more about your use case? What is it about the current
process / workflow do you think could be improved by pushing plotting (I
assume you mean plotting and graphing) into spark.


In my personal work all the graphing is done in the driver on summary stats
calculated using spark. So for me using standard python libs has not been a
problem.

Andy

From:  pseudo oduesp 
Date:  Thursday, July 21, 2016 at 8:30 AM
To:  "user @spark" 
Subject:  spark and plot data

> Hi , 
> i know spark  it s engine  to compute large data set but for me i work with
> pyspark and it s very wonderful machine
> 
> my question  we  don't have tools for ploting data each time we have to switch
> and go back to python for using plot.
> but when you have large result scatter plot or roc curve  you cant use collect
> to take data .
> 
> somone have propostion for plot .
> 
> thanks 




Re: write and call UDF in spark dataframe

2016-07-20 Thread Andy Davidson
Hi Divya

In general you will get better performance if you can minimize your use of
UDFs. Spark 2.0/ tungsten does a lot of code generation. It will have to
treat your UDF as a block box.

Andy

From:  Rishabh Bhardwaj 
Date:  Wednesday, July 20, 2016 at 4:22 AM
To:  Rabin Banerjee 
Cc:  Divya Gehlot , "user @spark"

Subject:  Re: write and call UDF in spark dataframe

> Hi Divya,
> 
> There is already "from_unixtime" exists in org.apache.spark.sql.frunctions,
> Rabin has used that in the sql query,if you want to use it in dataframe DSL
> you can try like this,
> 
>> val new_df = df.select(from_unixtime($"time").as("newtime"))
> 
> Thanks,
> Rishabh.
> 
> On Wed, Jul 20, 2016 at 4:21 PM, Rabin Banerjee 
> wrote:
>> Hi Divya ,
>> 
>> Try,
>> 
>> val df = sqlContext.sql("select from_unixtime(ts,'-MM-dd') as `ts` from
>> mr")
>> Regards,
>> Rabin
>> 
>> On Wed, Jul 20, 2016 at 12:44 PM, Divya Gehlot 
>> wrote:
>>> Hi,
>>> Could somebody share example of writing and calling udf which converts unix
>>> tme stamp to date tiime .
>>> 
>>> 
>>> Thanks,
>>> Divya 
>> 
> 




Re: Role-based S3 access outside of EMR

2016-07-19 Thread Andy Davidson
Hi Everett

I always do my initial data exploration and all our product development in
my local dev env. I typically select a small data set and copy it to my
local machine

My main() has an optional command line argument Œ- - runLocal¹ Normally I
load data from either hdfs:/// or S3n:// . If the arg is set I read from
file:///

Sometime I use a CLI arg Œ- -dataFileURL¹

So in your case I would log into my data cluster and use ³AWS s3 cp" to copy
the data into my cluster and then use ³SCP² to copy the data from the data
center back to my local env.

Andy

From:  Everett Anderson 
Date:  Tuesday, July 19, 2016 at 2:30 PM
To:  "user @spark" 
Subject:  Role-based S3 access outside of EMR

> Hi,
> 
> When running on EMR, AWS configures Hadoop to use their EMRFS Hadoop
> FileSystem implementation for s3:// URLs and seems to install the necessary S3
> credentials properties, as well.
> 
> Often, it's nice during development to run outside of a cluster even with the
> "local" Spark master, though, which I've found to be more troublesome. I'm
> curious if I'm doing this the right way.
> 
> There are two issues -- AWS credentials and finding the right combination of
> compatible AWS SDK and Hadoop S3 FileSystem dependencies.
> 
> Credentials and Hadoop Configuration
> 
> For credentials, some guides recommend setting AWS_SECRET_ACCESS_KEY and
> AWS_ACCESS_KEY_ID environment variables or putting the corresponding
> properties in Hadoop XML config files, but it seems better practice to rely on
> machine roles and not expose these.
> 
> What I end up doing is, in code, when not running on EMR, creating a
> DefaultAWSCredentialsProviderChain
>  ultAWSCredentialsProviderChain.html>  and then installing the following
> properties in the Hadoop Configuration using it:
> 
> fs.s3.awsAccessKeyId
> fs.s3n.awsAccessKeyId
> fs.s3a.awsAccessKeyId
> fs.s3.awsSecretAccessKey
> fs.s3n.awsSecretAccessKey
> fs.s3a.awsSecretAccessKey
> 
> I also set the fs.s3.impl and fs.s3n.impl properties to
> org.apache.hadoop.fs.s3a.S3AFileSystem to force them to use the S3A
> implementation since people usually use "s3://" URIs.
> 
> SDK and File System Dependencies
> 
> Some special combination 
> of the Hadoop version, AWS SDK version, and hadoop-aws is necessary.
> 
> One working S3A combination with Spark 1.6.1 + Hadoop 2.7.x for me seems to be
> with
> 
> --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2
> 
> Is this generally what people do? Is there a better way?
> 
> I realize this isn't entirely a Spark-specific problem, but as so many people
> seem to be using S3 with Spark, I imagine this community's faced the problem a
> lot.
> 
> Thanks!
> 
> - Everett
> 




Re: Trouble while running spark at ec2 cluster

2016-07-18 Thread Andy Davidson
Hi Hassan

Typically I log on to my master to submit my app.

[ec2-user@ip-172-31-11-222 bin]$ echo $SPARK_ROOT

/root/spark



[ec2-user@ip-172-31-11-222 bin]$echo $MASTER_URL

spark://ec2-54-215-11-222.us-west-1.compute.amazonaws.com:7077



[ec2-user@ip-172-31-11-222 bin]$ $SPARK_ROOT/bin/spark-submit \

--class "com.pws.sparkStreaming.collector.StreamingKafkaCollector" \

--master $MASTER_URL







I think you might be trying to launch your application from a  machine
outside of your ec2 cluster. I do not think that is going to work when you
submit to port 7077 because the driver is going to be on your local machine.
Also you probably have a file wall issue



Andy


From:  Hassaan Chaudhry 
Date:  Friday, July 15, 2016 at 9:32 PM
To:  "user @spark" 
Subject:  Trouble while running spark at ec2 cluster

> 
> Hi 
> 
> I have launched my cluster and I am trying to submit my application to run on
> cluster but its not allowing me to connect . It prompts  the following error
> "Master endpoint 
> spark://ec2-54-187-59-117.us-west-2.compute.amazonaws.com:7077
>   was not a
> REST server." The command I use to run my application on cluster is
> 
> " /spark-1.6.1/bin/spark-submit --master spark://ec2-54-200-193-107.us-west-
> 2.compute.amazonaws.com:7077 
> --deploy-mode cluster --class BFS target/scala-
> 2.10/scalaexample_2.10-1.0.jar "
> 
> Am i missing something ? Your help will be highly appreciated .
> 
> P.S  I have even tried adding inbound rule to my master node but still no
> success.
> 
> Thanks 




/spark-ec2 script: trouble using ganglia web ui spark 1.6.1

2016-07-11 Thread Andy Davidson
I created a cluster using spark-1.6.1-bin-hadoop2.6/ec2/spark-ec2 script.
The shows ganglia started how ever I am not able to access
http://ec2-54-215-230-73.us-west-1.compute.amazonaws.com:5080/ganglia. I
have tried using the private ip from with in my data center.



I d not see anything listing on port 5080.



Is there some additional step or configuration? (my AWS firewall knowledge
is limited)



Thanks



Andy





$ grep ganglia src/main/resources/scripts/launchCluster.sh.out

Initializing ganglia

[timing] ganglia init:  00h 00m 01s

Configuring /etc/ganglia/gmond.conf

Configuring /etc/ganglia/gmetad.conf

Configuring /etc/httpd/conf.d/ganglia.conf

Setting up ganglia

RSYNC'ing /etc/ganglia to slaves...

[timing] ganglia setup:  00h 00m 03s

Ganglia started at 
http://ec2-xxx.us-west-1.compute.amazonaws.com:5080/ganglia

$ 


bash-4.2# netstat -tulpn

Active Internet connections (only servers)

Proto Recv-Q Send-Q Local Address   Foreign Address
State   PID/Program name

tcp0  0 0.0.0.0:86520.0.0.0:*
LISTEN  3832/gmetad

tcp0  0 0.0.0.0:87870.0.0.0:*
LISTEN  2584/rserver

tcp0  0 0.0.0.0:36757   0.0.0.0:*
LISTEN  2905/java

tcp0  0 0.0.0.0:50070   0.0.0.0:*
LISTEN  2905/java

tcp0  0 0.0.0.0:22  0.0.0.0:*
LISTEN  2144/sshd

tcp0  0 127.0.0.1:631   0.0.0.0:*
LISTEN  2095/cupsd

tcp0  0 127.0.0.1:7000  0.0.0.0:*
LISTEN  6512/python3.4

tcp0  0 127.0.0.1:250.0.0.0:*
LISTEN  2183/sendmail

tcp0  0 0.0.0.0:43813   0.0.0.0:*
LISTEN  3093/java

tcp0  0 172.31.22.140:9000  0.0.0.0:*
LISTEN  2905/java

tcp0  0 0.0.0.0:86490.0.0.0:*
LISTEN  3810/gmond

tcp0  0 0.0.0.0:50090   0.0.0.0:*
LISTEN  3093/java

tcp0  0 0.0.0.0:86510.0.0.0:*
LISTEN  3832/gmetad

tcp0  0 :::8080 :::*
LISTEN  23719/java

tcp0  0 :::8081 :::*
LISTEN  5588/java

tcp0  0 :::172.31.22.140:6066   :::*
LISTEN  23719/java

tcp0  0 :::172.31.22.140:6067   :::*
LISTEN  5588/java

tcp0  0 :::22   :::*
LISTEN  2144/sshd

tcp0  0 ::1:631 :::*
LISTEN  2095/cupsd

tcp0  0 :::19998:::*
LISTEN  3709/java

tcp0  0 :::1:::*
LISTEN  3709/java

tcp0  0 :::172.31.22.140:7077   :::*
LISTEN  23719/java

tcp0  0 :::172.31.22.140:7078   :::*
LISTEN  5588/java

udp0  0 0.0.0.0:86490.0.0.0:*
3810/gmond 

udp0  0 0.0.0.0:631 0.0.0.0:*
2095/cupsd 

udp0  0 0.0.0.0:38546   0.0.0.0:*
2905/java  

udp0  0 0.0.0.0:68  0.0.0.0:*
1142/dhclient  

udp0  0 172.31.22.140:123   0.0.0.0:*
2168/ntpd  

udp0  0 127.0.0.1:123   0.0.0.0:*
2168/ntpd  

udp0  0 0.0.0.0:123 0.0.0.0:*
2168/ntpd   




Re: Spark Streaming - Direct Approach

2016-07-11 Thread Andy Davidson
Hi Pradeep

I can not comment about experimental or production, how ever I recently
started a POC using direct approach.

Its been running off and on for about 2 weeks. In general it seems to work
really well. One thing that is not clear to me is how the cursor is manage.
E.G. I have my topic set to delete after 1 hr. I have had some problem that
caused huge delaying. I.E. They are running after the data is deleted. After
lots of jobs fail it seems to recover. I.E. The cursor advance to valid
position.


I am running into problems how ever I do not think it has to do with the
direct approach. I think it has to do with writing to s3.

Andy

From:  "Mail.com" 
Date:  Monday, July 11, 2016 at 1:43 PM
To:  "user @spark" 
Subject:  Spark Streaming - Direct Approach

> Hi All,
> 
> Can someone please confirm if streaming direct approach for reading Kafka is
> still experimental or can it be used for production use.
> 
> I see the documentation and talk from TD suggesting the advantages of the
> approach but docs state it is an "experimental" feature.
> 
> Please suggest
> 
> Thanks,
> Pradeep
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 




trouble accessing driver log files using rest-api

2016-07-11 Thread Andy Davidson
I am running spark-1.6.1 and the stand alone cluster manager. I am running
into performance problems with spark streaming and added some extra metrics
to my log files. I submit my app in cluster mode. (I.e. The driver runs on a
slave not master)


I am not able to get the driver log files while the app is running using the
documented rest api
 
http://spark.apache.org/docs/latest/monitoring.html#rest-api

I think the issue is the rest-api give you access to the app log files. I
need the driver log file?


$ curl  http://$host/api/v1/applications/

[ {

  "id" : "app-20160711185337-0049",

  "name" : "gnip1",

  "attempts" : [ {

"startTime" : "2016-07-11T18:53:35.318GMT",

"endTime" : "1969-12-31T23:59:59.999GMT",

"sparkUser" : "",

"completed" : false

  } ]

} ][ec2-user@ip-172-31-22-140 tmp]$



$ curl -o$outputFile http://$host/api/v1/applications/$appID/logs



$outputFile will always be an empty zip file



If I use executors/. I get info about the drivers and executors how ever no
way to Œget' the log files. The driver output does not have any executorLogs
and the workers executorLogs are version of the log files rendered in HTML
not the actual log file.




$ curl http://$host/api/v1/applications/$appID/executors [ { "id" :
"driver", "hostPort" : "172.31.23.203:33303", "rddBlocks" : 0, "memoryUsed"
: 0, "diskUsed" : 0, "activeTasks" : 0, "failedTasks" : 0, "completedTasks"
: 0, "totalTasks" : 0, "totalDuration" : 0, "totalInputBytes" : 0,
"totalShuffleRead" : 0, "totalShuffleWrite" : 0, "maxMemory" : 535953408,
"executorLogs" : { } }, { "id" : "1", "hostPort" :
"ip-172-31-23-200.us-west-1.compute.internal:51560", "rddBlocks" : 218,
"memoryUsed" : 452224280, "diskUsed" : 0, "activeTasks" : 1, "failedTasks" :
0, "completedTasks" : 27756, "totalTasks" : 27757, "totalDuration" :
1650935, "totalInputBytes" : 9619224986, "totalShuffleRead" : 0,
"totalShuffleWrite" : 507615, "maxMemory" : 535953408, "executorLogs" : {
"stdout" : 
"http://ec2-xxx.compute.amazonaws.com:8081/logPage/?appId=app-20160711185337
-0049=1=stdout", "stderr" :
"http://ec2-xxx.us-west-1.compute.amazonaws.com:8081/logPage/?appId=app-2016
0711185337-0049=1=stderr" }

Any suggestions would be greatly appreciated

Andy





WARN FileOutputCommitter: Failed to delete the temporary output directory of task: attempt_201607111453_128606_m_000000_0 - s3n://

2016-07-11 Thread Andy Davidson
I am running into serious performance problems with my spark 1.6 streaming
app. As it runs it gets slower and slower.

My app is simple. 

* It receives fairly large and complex JSON files. (twitter data)
* Converts the RDD to DataFrame
* Splits the data frame in to maybe 20 different data sets
* Writes each data set as JSON to s3
* Writing to S3 is really slow. I use an executorService to get the writes
to run in parallel

I found a lot of error log messages like the following error in my spark
streaming executor log files

Any suggestions?

Thanks

Andy

16/07/11 14:53:49 WARN FileOutputCommitter: Failed to delete the temporary
output directory of task: attempt_201607111453_128606_m_00_0 -
s3n://com.xxx/json/yyy/2016-07-11/146824482/_temporary/_attempt_20160711
1453_128606_m_00_0




spark UI what does storage memory x/y mean

2016-07-11 Thread Andy Davidson
My stream app is running into problems It seems to slow down over time. How
can I interpret the storage memory column. I wonder if I have a GC problem?
Any idea how I can get GC stats?

Thanks

Andy

Executors (3)
* Memory: 9.4 GB Used (1533.4 MB Total)
* Disk: 0.0 B Used
Executor IDAddressRDD BlocksStorage MemoryDisk UsedActive TasksFailed
TasksComplete TasksTotal TasksTask TimeInputShuffle ReadShuffle
WriteLogsThread Dump
0ip-172-31-23-202.us-west-1.compute.internal:5245628604.7 GB / 511.1 MB0.0
B04013495793499805.37 h72.9 GB84.0 B5.9 MBstdout

stderr 

Thread Dump 

1ip-172-31-23-200.us-west-1.compute.internal:5160928544.6 GB / 511.1 MB0.0
B04113493653497765.42 h72.6 GB142.0 B5.9 MBstdout

stderr 

Thread Dump 

driver172.31.23.203:4801800.0 B / 511.1 MB0.0 B0 ms0.0 B0.0 B0.0 BThread
Dump 





can I use ExectorService in my driver? was: is dataframe.write() async? Streaming performance problem

2016-07-08 Thread Andy Davidson
Hi Ewan

Currently I split my dataframe into n smaller dataframes can call
write.().json(³S3://³)

Each data frame becomes a single S3 object.

I assume for your solution to work I would need to reparation(1) each of the
smaller sets so that they are written as a single s3 object.


I am also considering using a java executorService and thread pool. Its easy
to do. Each thread would call df.write.json(³s3²://); One advantage of this
is that I do not need to make any assumptions about how spark is
implemented.

I assume the thread pool is running on the driver so the slaves do not incur
any extra overhead.

Thanks

Andy

From:  Ewan Leith <ewan.le...@realitymine.com>
Date:  Friday, July 8, 2016 at 8:52 AM
To:  Cody Koeninger <c...@koeninger.org>, Andrew Davidson
<a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  RE: is dataframe.write() async? Streaming performance problem

> Writing (or reading) small files from spark to s3 can be seriously slow.
> 
> You'll get much higher throughput by doing a df.foreachPartition(partition =>
> ...) and inside each partition, creating an aws s3 client then doing a
> partition.foreach and uploading the files using that s3 client with its own
> threadpool.
> 
> As long as you create the s3 client inside the foreachPartition, and close it
> after the partition.foreach(...) is done, you shouldn't have any issues.
> 
> Something roughly like this from the DStream docs:
> 
>   df.foreachPartition { partitionOfRecords =>
> val connection = createNewConnection()
> partitionOfRecords.foreach(record => connection.send(record))
> connection.close()
>   }
> 
> Hope this helps,
> Ewan
> 
> -----Original Message-
> From: Cody Koeninger [mailto:c...@koeninger.org]
> Sent: 08 July 2016 15:31
> To: Andy Davidson <a...@santacruzintegration.com>
> Cc: user @spark <user@spark.apache.org>
> Subject: Re: is dataframe.write() async? Streaming performance problem
> 
> Maybe obvious, but what happens when you change the s3 write to a println of
> all the data?  That should identify whether it's the issue.
> 
> count() and read.json() will involve additional tasks (run through the items
> in the rdd to count them, likewise to infer the schema) but for
> 300 records that shouldn't be much of an issue.
> 
> On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>>  I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using
>>  kafka direct stream approach. I am running into performance problems.
>>  My processing time is > than my window size. Changing window sizes,
>>  adding cores and executor memory does not change performance. I am
>>  having a lot of trouble identifying the problem by at the metrics
>>  provided for streaming apps in the spark application web UI.
>> 
>>  I think my performance problem has to with writing the data to S3.
>> 
>>  My app receives very complicated JSON. My program is simple, It sorts
>>  the data into a small set of sets and writes each set as a separate S3
>> object.
>>  The mini batch data has at most 300 events so I do not think shuffle
>>  is an issue.
>> 
>>  DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>> 
>>  Š Explode tagCol Š
>> 
>> 
>>  DataFrame rulesDF = activityDF.select(tagCol).distinct();
>> 
>>  Row[] rows = rulesDF.select(tagCol).collect();
>> 
>>  List tags = new ArrayList(100);
>> 
>>  for (Row row : rows) {
>> 
>>  Object tag = row.get(0);
>> 
>>  tags.add(tag.toString());
>> 
>>  }
>> 
>> 
>>  I think the for loop bellow is where the bottle neck is. Is write async() ?
>> 
>> 
>>  If not is there an easy to to vectorize/parallelize this for loop or
>>  do I have to create the threads my self?
>> 
>> 
>>  Is creating threads in spark a bad idea?
>> 
>> 
>> 
>>  for(String tag : tags) {
>> 
>>  DataFrame saveDF =
>>  activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>> 
>>  if (saveDF.count() >= 1) { // I do not think count() is an issue
>>  performance is about 34 ms
>> 
>>  String dirPath = ³s3n://myBucket" + File.separator + date +
>>  File.separator + tag + File.separator +  milliSeconds;
>> 
>>  saveDF.write().json(dirPath);
>> 
>>  }
>> 
>>  }
>> 
>> 
>>  Any suggestions would be greatly appreciated
>> 
>> 
>>  Andy
>> 
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 




Re: Multiple aggregations over streaming dataframes

2016-07-07 Thread Andy Davidson
Kafka has an interesting model that might be applicable.

You can think of kafka as enabling a queue system. Writes are called
producers, and readers are called consumers. The server is called a broker.
A ³topic² is like a named queue.

Producer are independent. They can write to a ³topic² at will. Consumers
(I.e. You nested aggregates) need to be independent of each other and the
broker. The broker receives data from produces stores it using memory and
disk. Consumer read from broker and maintain the cursor. Because the client
maintains the cursor one consumer can not impact other produces and
consumers.

I would think the tricky part for spark would to know when the data can be
deleted. In the Kakfa world each topic is allowed to define a TTL SLA. I.e.
The consumer must read the data with in a limited of window of time.

Andy

From:  Michael Armbrust 
Date:  Thursday, July 7, 2016 at 2:31 PM
To:  Arnaud Bailly 
Cc:  Sivakumaran S , "user @spark"

Subject:  Re: Multiple aggregations over streaming dataframes

> We are planning to address this issue in the future.
> 
> At a high level, we'll have to add a delta mode so that updates can be
> communicated from one operator to the next.
> 
> On Thu, Jul 7, 2016 at 8:59 AM, Arnaud Bailly  wrote:
>> Indeed. But nested aggregation does not work with Structured Streaming,
>> that's the point. I would like to know if there is workaround, or what's the
>> plan regarding this feature which seems to me quite useful. If the
>> implementation is not overtly complex and it is just a matter of manpower, I
>> am fine with devoting some time to it.
>> 
>> 
>> 
>> -- 
>> Arnaud Bailly
>> 
>> twitter: abailly
>> skype: arnaud-bailly
>> linkedin: http://fr.linkedin.com/in/arnaudbailly/
>> 
>> On Thu, Jul 7, 2016 at 2:17 PM, Sivakumaran S  wrote:
>>> Arnauld,
>>> 
>>> You could aggregate the first table and then merge it with the second table
>>> (assuming that they are similarly structured) and then carry out the second
>>> aggregation. Unless the data is very large, I don¹t see why you should
>>> persist it to disk. IMO, nested aggregation is more elegant and readable
>>> than a complex single stage.
>>> 
>>> Regards,
>>> 
>>> Sivakumaran
>>> 
>>> 
>>> 
 On 07-Jul-2016, at 1:06 PM, Arnaud Bailly  wrote:
 
 It's aggregation at multiple levels in a query: first do some aggregation
 on one tavle, then join with another table and do a second aggregation. I
 could probably rewrite the query in such a way that it does aggregation in
 one pass but that would obfuscate the purpose of the various stages.
 
 Le 7 juil. 2016 12:55, "Sivakumaran S"  a écrit :
> Hi Arnauld,
> 
> Sorry for the doubt, but what exactly is multiple aggregation? What is the
> use case?
> 
> Regards,
> 
> Sivakumaran
> 
> 
>> On 07-Jul-2016, at 11:18 AM, Arnaud Bailly 
>> wrote:
>> 
>> Hello,
>> 
>> I understand multiple aggregations over streaming dataframes is not
>> currently supported in Spark 2.0. Is there a workaround? Out of the top
>> of my head I could think of having a two stage approach:
>>  - first query writes output to disk/memory using "complete" mode
>>  - second query reads from this output
>> 
>> Does this makes sense?
>> 
>> Furthermore, I would like to understand what are the technical hurdles
>> that are preventing Spark SQL from implementing multiple aggregation
>> right now? 
>> 
>> Thanks,
>> -- 
>> Arnaud Bailly
>> 
>> twitter: abailly
>> skype: arnaud-bailly
>> linkedin: http://fr.linkedin.com/in/arnaudbailly/
> 
>>> 
>> 
> 




is dataframe.write() async? Streaming performance problem

2016-07-07 Thread Andy Davidson
I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using kafka
direct stream approach. I am running into performance problems. My
processing time is > than my window size. Changing window sizes, adding
cores and executor memory does not change performance. I am having a lot of
trouble identifying the problem by at the metrics provided for streaming
apps in the spark application web UI.

I think my performance problem has to with writing the data to S3.

My app receives very complicated JSON. My program is simple, It sorts the
data into a small set of sets and writes each set as a separate S3 object.
The mini batch data has at most 300 events so I do not think shuffle is an
issue.

DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();

Š Explode tagCol Š



DataFrame rulesDF = activityDF.select(tagCol).distinct();

Row[] rows = rulesDF.select(tagCol).collect();

List tags = new ArrayList(100);

for (Row row : rows) {

Object tag = row.get(0);

tags.add(tag.toString());

}



I think the for loop bellow is where the bottle neck is. Is write async() ?



If not is there an easy to to vectorize/parallelize this for loop or do I
have to create the threads my self?



Is creating threads in spark a bad idea?





for(String tag : tags) {

DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));

if (saveDF.count() >= 1) { // I do not think count() is an issue performance
is about 34 ms

String dirPath = ³s3n://myBucket" + File.separator + date + File.separator +
tag + File.separator +  milliSeconds;

saveDF.write().json(dirPath);

}

}



Any suggestions would be greatly appreciated



Andy
> 
> 




Re: strange behavior when I chain data frame transformations

2016-05-13 Thread Andy Davidson
Hi Ted


Its seems really strange. Its seems like in the version were I used 2 data
frames spark added ³as(tag)². (Which is really nice. )

Odd that I got different behavior

Is this a bug?

Kind regards

Andy



From:  Ted Yu <yuzhih...@gmail.com>
Date:  Friday, May 13, 2016 at 12:38 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: strange behavior when I chain data frame transformations

> In the structure shown, tag is under element.
> 
> I wonder if that was a factor.
> 
> On Fri, May 13, 2016 at 11:49 AM, Andy Davidson
> <a...@santacruzintegration.com> wrote:
>> I am using spark-1.6.1.
>> 
>> I create a data frame from a very complicated JSON file. I would assume that
>> query planer would treat both version of my transformation chains the same
>> way.
>> 
>> 
>> // org.apache.spark.sql.AnalysisException: Cannot resolve column name "tag"
>> among (actor, body, generator, pip, id, inReplyTo, link, object, objectType,
>> postedTime, provider, retweetCount, twitter_entities, verb);
>> 
>> // DataFrame emptyDF = rawDF.selectExpr("*", ³pip.rules.tag")
>> 
>> // .filter(rawDF.col(tagCol).isNull());
>> 
>> DataFrame emptyDF1 = rawDF.selectExpr("*", ³pip.rules.tag");
>> 
>> DataFrame emptyDF =  emptyDF1.filter(emptyDF1.col(³tag").isNull());
>> 
>> 
>> 
>> Here is the schema for the gnip structure
>> 
>>  |-- pip: struct (nullable = true)
>> 
>>  ||-- _profile: struct (nullable = true)
>> 
>>  |||-- topics: array (nullable = true)
>> 
>>  ||||-- element: string (containsNull = true)
>> 
>>  ||-- rules: array (nullable = true)
>> 
>>  |||-- element: struct (containsNull = true)
>> 
>>  ||||-- tag: string (nullable = true)
>> 
>> 
>> 
>> Is this a bug ?
>> 
>> 
>> 
>> Andy
>> 
>> 
> 




strange behavior when I chain data frame transformations

2016-05-13 Thread Andy Davidson
I am using spark-1.6.1.

I create a data frame from a very complicated JSON file. I would assume that
query planer would treat both version of my transformation chains the same
way.


// org.apache.spark.sql.AnalysisException: Cannot resolve column name "tag"
among (actor, body, generator, pip, id, inReplyTo, link, object, objectType,
postedTime, provider, retweetCount, twitter_entities, verb);

// DataFrame emptyDF = rawDF.selectExpr("*", ³pip.rules.tag")

// .filter(rawDF.col(tagCol).isNull());

DataFrame emptyDF1 = rawDF.selectExpr("*", ³pip.rules.tag");

DataFrame emptyDF =  emptyDF1.filter(emptyDF1.col(³tag").isNull());



Here is the schema for the gnip structure

 |-- pip: struct (nullable = true)

 ||-- _profile: struct (nullable = true)

 |||-- topics: array (nullable = true)

 ||||-- element: string (containsNull = true)

 ||-- rules: array (nullable = true)

 |||-- element: struct (containsNull = true)

 ||||-- tag: string (nullable = true)



Is this a bug ?



Andy






How to transform a JSON string into a Java HashMap<> java.io.NotSerializableException

2016-05-11 Thread Andy Davidson
I have a streaming app that receives very complicated JSON (twitter status).
I would like to work with it as a hash map. It would be very difficult to
define a pojo for this JSON. (I can not use twitter4j)
// map json string to map

JavaRDD> jsonMapRDD = powerTrackRDD.map(new
Function>(){

private static final long serialVersionUID = 1L;



@Override

public Hashtable call(String line) throws Exception {

  //HashMap hm = JsonUtils.jsonToHashMap(line);

  //Hashtable ret = new Hashtable(hm);

  Hashtable  ret = null;

  return ret;

}});



Using the sqlContext works how ever I assume that this is going to be slow
and error prone given it likely many key/value pairs are missing



DataFrame df = sqlContext.read().json(getFilePath().toString());

df.printSchema();



Any suggestions would be greatly appriciated



Andy



org.apache.spark.SparkException: Task not serializable

at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scal
a:304)

at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$
clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:15
0)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:11
1)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.map(RDD.scala:323)

at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)

at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)

at 
com.pws.sparkStreaming.collector.SavePowerTrackActivityStream.test(SavePower
TrackActivityStream.java:34)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.
java:50)

at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.j
ava:12)

at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.ja
va:47)

at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.jav
a:17)

at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)

at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.jav
a:78)

at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.jav
a:57)

at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)

at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)

at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)

at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)

at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)

at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26
)

at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)

at org.junit.runners.ParentRunner.run(ParentRunner.java:363)

at 
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestRef
erence.java:86)

at 
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:3
8)

at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRu
nner.java:459)

at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRu
nner.java:675)

at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.
java:382)

at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner
.java:192)

Caused by: java.io.NotSerializableException:
com.pws.sparkStreaming.collector.SavePowerTrackActivityStream

Serialization stack:

- object not serializable (class:
com.pws.sparkStreaming.collector.SavePowerTrackActivityStream, value:
com.pws.sparkStreaming.collector.SavePowerTrackActivityStream@f438904)

- field (class: 
com.pws.sparkStreaming.collector.SavePowerTrackActivityStream$1, name:
this$0, type: class
com.pws.sparkStreaming.collector.SavePowerTrackActivityStream)

- object (class 
com.pws.sparkStreaming.collector.SavePowerTrackActivityStream$1,
com.pws.sparkStreaming.collector.SavePowerTrackActivityStream$1@3fa7df1)

- field (class: 
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name:
fun$1, type: interface org.apache.spark.api.java.function.Function)

- object (class 
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1,
)

at 

Re: Multiple Spark Applications that use Cassandra, how to share resources/nodes

2016-05-03 Thread Andy Davidson
Hi Tobias

I am very interested implemented rest based api on top of spark. My rest
based system would make predictions from data provided in the request using
models trained in batch. My SLA is 250 ms.

Would you mind sharing how you implemented your rest server?

I am using spark-1.6.1. I have several unit tests that create spark context,
master is set to Œlocal[4]¹. I do not think the unit test frame is going to
scale. Can each rest server have a pool of sparks contexts?


The system would like to replacing is set up as follows

Layer of dumb load balancers: l1, l2, l3
Layer of proxy servers:   p1, p2, p3, p4, p5, Š.. Pn
Layer of containers:  c1, c2, c3, Š.. Cn

Where Cn is much larger than Pn


Kind regards

Andy

P.s. There is a talk on 5/5 about spark 2.0 Hoping there is something in the
near future.
https://www.brighttalk.com/webcast/12891/202021?utm_campaign=google-calendar
_content=_source=brighttalk-portal_medium=calendar_term=

From:  Tobias Eriksson 
Date:  Tuesday, May 3, 2016 at 7:34 AM
To:  "user @spark" 
Subject:  Multiple Spark Applications that use Cassandra, how to share
resources/nodes

> Hi 
>  We are using Spark for a long running job, in fact it is a REST-server that
> does some joins with some tables in Casandra and returns the result.
> Now we need to have multiple applications running in the same Spark cluster,
> and from what I understand this is not possible, or should I say somewhat
> complicated
> 1. A Spark application takes all the resources / nodes in the cluster (we have
> 4 nodes one for each Cassandra Node)
> 2. A Spark application returns it¹s resources when it is done (exits or the
> context is closed/returned)
> 3. Sharing resources using Mesos only allows scaling down and then scaling up
> by a step-by-step policy, i.e. 2 nodes, 3 nodes, 4 nodes, Š And increases as
> the need increases
> But if this is true, I can not have several applications running in parallell,
> is that true ?
> If I use Mesos then the whole idea with one Spark Worker per Cassandra Node
> fails, as it talks directly to a node, and that is how it is so efficient.
> In this case I need all nodes, not 3 out of 4.
> 
> Any mistakes in my thinking ?
> Any ideas on how to solve this ? Should be a common problem I think
> 
> -Tobias
> 
> 




java.io.NotSerializableException: org.apache.spark.sql.types.LongType

2016-04-21 Thread Andy Davidson
I started using 
http://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#fp-gr
owth in python. It was really easy to get the frequent items set.
Unfortunately associations is not implemented in python.

Here is my python code It works great

rawJsonRDD = jsonToPythonDictionaries(sc, inputURL,
coalesceInputToNumPartions)

idsRDD = (rawJsonRDD
 # fetch the list of ids, the items are of type int
 .map(lambda ids : r[Œids'])
 # make sure ids are unique
 .map(lambda ids : list(set(ids)))
)



My Java Code generates java.io.NotSerializableException:
org.apache.spark.sql.types.LongType . It has something to do with the UDF I
wrote to make sure the ids are unique

Any idea what my bug is? I guess instead of data frames I could try to
implement this using RDD¹s I expect I¹ll run into a similar problem

Thanks in advance

Andy

 df.printSchema();


root

 |-- ids: array (nullable = true)

 ||-- element: long (containsNull = true)

 |-- updated: long (nullable = true)

 |-- userId: long (nullable = true)



6/04/21 16:26:50 Info FrequentItems: expr: UniqIdsUDF(Ids) as uniqueIds



UniqIdsUDF.register(sqlContext);



 DataFrame df2 = df.selectExpr(inputColName, expr);



/**
 * this is based on some test code I wrote that
 * that takes in a list of strings and returns a list of strings
 */
public class UniqIdsUDF implements UDF1,
Serializable {

private static final long serialVersionUID = 1L;

public static final String udfName = "UniqIdsUDF";



public static void register(SQLContext ssc) {

// TODO probably need to be careful about registering multiple times

UniqIdsUDF udf = new UniqIdsUDF();

DataType elementType = new LongType();

DataType returnType = DataTypes.createArrayType(elementType);

ssc.udf().register(udfName, udf, returnType);

}



@Override

public Long[] call(WrappedArray idsArg) throws Exception {

List ids = JavaConversions.asJavaList(idsArg);

HashSet hs = new HashSet(ids);

Iterator it = hs.iterator();

int size = hs.size();

Long[] ret = new Long[size];

for (int i = 0; i < size; i++) {

ret[i] = it.next();

}

}



Exception in thread "main" org.apache.spark.SparkException: Task not
serializable

at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scal
a:304)

at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$
clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)

at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:15
0)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:11
1)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)

at 
org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.s
cala:56)

at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.
scala:132)

at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.
scala:130)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:15
0)

at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)

at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)

at 
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165
)

at 
org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scal
a:174)

at 
org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$exec
ute$1$1.apply(DataFrame.scala:1499)

at 
org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$exec
ute$1$1.apply(DataFrame.scala:1499)

at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution
.scala:56)

at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)

at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(Dat
aFrame.scala:1498)

at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataF
rame.scala:1505)

at 
org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)

at 
org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)

at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)

at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)

at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)

at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)

at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)

at 

custom transformer pipeline sample code

2016-04-20 Thread Andy Davidson

Someone recently asked me for a code example of how to to write a custom
pipeline transformer in Java

Enjoy, Share

Andy



https://github.com/AEDWIP/Spark-Naive-Bayes-text-classification/blob/260a6b9
b67d7da42c1d0f767417627da342c8a49/src/test/java/com/santacruzintegration/spa
rk/SparseVectorToLogicalTranformer.java#L52

https://github.com/AEDWIP/Spark-Naive-Bayes-text-classification/blob/260a6b9
b67d7da42c1d0f767417627da342c8a49/src/test/java/com/santacruzintegration/spa
rk/NaiveBayesStanfordExampleTest.java#L207




Re: Spark replacing Hadoop

2016-04-14 Thread Andy Davidson
Hi Ashok

In general if I was starting a new project and had not invested heavily in
hadoop (i.e. Had a large staff that was trained on hadoop, had a lot of
existing projects implemented on hadoop, Š) I would probably start using
spark. Its faster and easier to use

Your mileage may vary

Andy

From:  Ashok Kumar 
Reply-To:  Ashok Kumar 
Date:  Thursday, April 14, 2016 at 12:13 PM
To:  "user @spark" 
Subject:  Spark replacing Hadoop

> Hi,
> 
> I hear that some saying that Hadoop is getting old and out of date and will be
> replaced by Spark!
> 
> Does this make sense and if so how accurate is it?
> 
> Best




Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-06 Thread Andy Davidson
+1

From:  Matei Zaharia 
Date:  Tuesday, April 5, 2016 at 4:58 PM
To:  Xiangrui Meng 
Cc:  Shivaram Venkataraman , Sean Owen
, Xiangrui Meng , dev
, "user @spark" , DB Tsai

Subject:  Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

> This sounds good to me as well. The one thing we should pay attention to is
> how we update the docs so that people know to start with the spark.ml classes.
> Right now the docs list spark.mllib first and also seem more comprehensive in
> that area than in spark.ml, so maybe people naturally move towards that.
> 
> Matei
> 
>> On Apr 5, 2016, at 4:44 PM, Xiangrui Meng  wrote:
>> 
>> Yes, DB (cc'ed) is working on porting the local linear algebra library over
>> (SPARK-13944). There are also frequent pattern mining algorithms we need to
>> port over in order to reach feature parity. -Xiangrui
>> 
>> On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman
>>  wrote:
>>> Overall this sounds good to me. One question I have is that in
>>> addition to the ML algorithms we have a number of linear algebra
>>> (various distributed matrices) and statistical methods in the
>>> spark.mllib package. Is the plan to port or move these to the spark.ml
>>> 
>>> namespace in the 2.x series ?
>>> 
>>> Thanks
>>> Shivaram
>>> 
>>> On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  wrote:
 > FWIW, all of that sounds like a good plan to me. Developing one API is
 > certainly better than two.
 >
 > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng  wrote:
> >> Hi all,
> >>
> >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API
> built
> >> on top of Spark SQL¹s DataFrames. Since then the new DataFrame-based
> API has
> >> been developed under the spark.ml   package, while
> the old RDD-based API has
> >> been developed in parallel under the spark.mllib package. While it was
> >> easier to implement and experiment with new APIs under a new package,
it
> >> became harder and harder to maintain as both packages grew bigger and
> >> bigger. And new users are often confused by having two sets of APIs
> with
> >> overlapped functions.
> >>
> >> We started to recommend the DataFrame-based API over the RDD-based API
in
> >> Spark 1.5 for its versatility and flexibility, and we saw the
> development
> >> and the usage gradually shifting to the DataFrame-based API. Just
> counting
> >> the lines of Scala code, from 1.5 to the current master we added ~1
> >> lines to the DataFrame-based API while ~700 to the RDD-based API. So,
to
> >> gather more resources on the development of the DataFrame-based API and
to
> >> help users migrate over sooner, I want to propose switching RDD-based
> MLlib
> >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
> >>
> >> * We do not accept new features in the RDD-based spark.mllib package,
> unless
> >> they block implementing new features in the DataFrame-based spark.ml
> 
> >> package.
> >> * We still accept bug fixes in the RDD-based API.
> >> * We will add more features to the DataFrame-based API in the 2.x
> series to
> >> reach feature parity with the RDD-based API.
> >> * Once we reach feature parity (possibly in Spark 2.2), we will
> deprecate
> >> the RDD-based API.
> >> * We will remove the RDD-based API from the main Spark repo in Spark
> 3.0.
> >>
> >> Though the RDD-based API is already in de facto maintenance mode, this
> >> announcement will make it clear and hence important to both MLlib
> developers
> >> and users. So we¹d greatly appreciate your feedback!
> >>
> >> (As a side note, people sometimes use ³Spark ML² to refer to the
> >> DataFrame-based API or even the entire MLlib component. This also
> causes
> >> confusion. To be clear, ³Spark ML² is not an official name and there
> are no
> >> plans to rename MLlib to ³Spark ML² at this time.)
> >>
> >> Best,
> >> Xiangrui
 >
 > -
 > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 > For additional commands, e-mail: user-h...@spark.apache.org
 >
> 




Re: Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS

2016-04-05 Thread Andy Davidson
Based on my experience


if you use ³simple² streaming. (I.E. You do not use windows) after every
mini batch you will ³save² This will cause a dir in hdfs with the timestamp
as part of the path. With in the dir, a separate part file will be created
for each partition. If you used windowing you could probably write several
mini batches at one time. (I have not used windows so I am speculating what
the behavior will be)

If you are running batch processing you could easily merge many small files
if needed.



From:  Mich Talebzadeh <mich.talebza...@gmail.com>
Date:  Tuesday, April 5, 2016 at 4:17 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: Saving Spark streaming RDD with saveAsTextFiles ends up
creating empty files on HDFS

> I agree every time an OS file is created, it requires a context switch plus a
> file descriptor. It is probably more time consuming to open and close these
> files than actually doing the work.
> 
> I always wondered about performance implication of Spark streaming and
> although there are some early days results. I have yet to see any concrete P
> on this. 
> 
> My issue is that I want to use Spark streaming with Complex Event Processing
> by developing adaptors (a combination of filters and mappers) to distinguish
> signal from pedestal in real terms and only Save data to persistent storage
> (HDFS) if they are of value.
> 
> I am using Kafka upstream and that does a good job. Now I am trying to
> experiment with saving data to HDFS in one form or shape. Basically this is
> just immutable data so the lesser partition the better. I am happy to store
> the data in text, parquet or (ORC format in Hive) as long as it works.
> 
> Regards
> 
> 
> Dr Mich Talebzadeh
> 
>  
> 
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8
> Pw 
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV
> 8Pw> 
> 
>  
> 
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
> 
>  
> 
> On 5 April 2016 at 23:59, Andy Davidson <a...@santacruzintegration.com> wrote:
>> In my experience my streaming I was getting tens of thousands of empty files
>> created in HDFS. This was crushing my systems performance when my batch jobs
>> ran over the data sets. There is a lot of over head open and closing empty
>> files.
>> 
>> I think creating empty files or keeping empty partitions around is probably a
>> bug how ever I never filed a bug report. Please file a bug report. Please
>> copy me on the Jira
>> 
>> There is also a related performance issue. I use reparation() to ensure CSV
>> files have a max number of rows. (it an product requirement to make csv files
>> more user friendly). In my experience if I do not reparation a partitions
>> with a single row of data would cause a separate part-* file to be created. I
>> wound out with large number of very small files. I have always wonder how to
>> configure partitions to get better performance. I would think we are better
>> off with a few very large partitions in most cases. I.E. Keep more stuff in
>> memory with less overhead. I was really hoping Spark would automatically
>> handle this for me
>> 
>> Andy
>> 
>> From:  Mich Talebzadeh <mich.talebza...@gmail.com>
>> Date:  Tuesday, April 5, 2016 at 3:49 PM
>> To:  Andrew Davidson <a...@santacruzintegration.com>
>> Cc:  "user @spark" <user@spark.apache.org>
>> Subject:  Re: Saving Spark streaming RDD with saveAsTextFiles ends up
>> creating empty files on HDFS
>> 
>>> Thanks Andy.
>>> 
>>> Do we know if this is a known bug or simply a feature that on the face of it
>>> Spark cannot save RDD output to a text file?
>>> 
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>> 
>>>  
>>> 
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUr
>>> V8Pw 
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABU
>>> rV8Pw> 
>>> 
>>>  
>>> 
>>> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>>> 
>>>  
>>> 
>>> On 5 April 2016 at 23:35, Andy Davidson <a...@santacruzintegration.com>
>>> wrote:
>>>> Hi Mich
>>>> 
>>>> Yup I was surprised to find empty files. Its easy to work around. Note I
>>>> should probably use coalesce() and not repartition()
>>>>

Re: Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS

2016-04-05 Thread Andy Davidson
In my experience my streaming I was getting tens of thousands of empty files
created in HDFS. This was crushing my systems performance when my batch jobs
ran over the data sets. There is a lot of over head open and closing empty
files.

I think creating empty files or keeping empty partitions around is probably
a bug how ever I never filed a bug report. Please file a bug report. Please
copy me on the Jira

There is also a related performance issue. I use reparation() to ensure CSV
files have a max number of rows. (it an product requirement to make csv
files more user friendly). In my experience if I do not reparation a
partitions with a single row of data would cause a separate part-* file to
be created. I wound out with large number of very small files. I have always
wonder how to configure partitions to get better performance. I would think
we are better off with a few very large partitions in most cases. I.E. Keep
more stuff in memory with less overhead. I was really hoping Spark would
automatically handle this for me

Andy

From:  Mich Talebzadeh <mich.talebza...@gmail.com>
Date:  Tuesday, April 5, 2016 at 3:49 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: Saving Spark streaming RDD with saveAsTextFiles ends up
creating empty files on HDFS

> Thanks Andy.
> 
> Do we know if this is a known bug or simply a feature that on the face of it
> Spark cannot save RDD output to a text file?
> 
> 
> 
> Dr Mich Talebzadeh
> 
>  
> 
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8
> Pw 
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV
> 8Pw> 
> 
>  
> 
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
> 
>  
> 
> On 5 April 2016 at 23:35, Andy Davidson <a...@santacruzintegration.com> wrote:
>> Hi Mich
>> 
>> Yup I was surprised to find empty files. Its easy to work around. Note I
>> should probably use coalesce() and not repartition()
>> 
>> In general I found I almost always need to reparation. I was getting
>> thousands of empty partitions. It was really slowing my system down.
>> 
>>private static void save(JavaDStream json, String outputURIBase) {
>> 
>> /*
>> 
>> using saveAsTestFiles will cause lots of empty directories to be
>> created.
>> 
>> DStream data = json.dstream();
>> 
>> data.saveAsTextFiles(outputURI, null);
>> 
>> */  
>> 
>> 
>> 
>> jsonTweets.foreachRDD(new VoidFunction2<JavaRDD, Time>() {
>> 
>> private static final long serialVersionUID = 1L;
>> 
>> @Override
>> 
>> public void call(JavaRDD rdd, Time time) throws Exception
>> {
>> 
>> Long count = rdd.count();
>> 
>> //if(!rdd.isEmpty()) {
>> 
>> if(count > 0) {
>> 
>> rdd = repartition(rdd, count.intValue());
>> 
>> long milliSeconds = time.milliseconds();
>> 
>> String date =
>> Utils.convertMillisecondsToDateStr(milliSeconds);
>> 
>> String dirPath = outputURIBase
>> 
>> + File.separator +  date
>> 
>> + File.separator + "tweet-" +
>> time.milliseconds();
>> 
>> rdd.saveAsTextFile(dirPath);
>> 
>> }
>> 
>> }
>> 
>> 
>> 
>> final int maxNumRowsPerFile = 200;
>> 
>> JavaRDD repartition(JavaRDD rdd, int count) {
>> 
>> long numPartisions = count / maxNumRowsPerFile + 1;
>> 
>> Long tmp = numPartisions;
>> 
>> rdd = rdd.repartition(tmp.intValue());
>> 
>> return rdd;
>> 
>> }
>> 
>> });
>> 
>>  
>> 
>> }
>> 
>> 
>> 
>> 
>> From:  Mich Talebzadeh <mich.talebza...@gmail.com>
>> Date:  Tuesday, April 5, 2016 at 3:06 PM
>> To:  "user @spark" <user@spark.apache.org>
>> Subject:  Saving Spark streaming RDD with saveAsTextFiles ends up creating
>> empty files on HDFS
>> 
>>> Spark 1.6.1
>>> 
>>> The following creates empty files. It prints lines OK with println
>>> 
>>> val result = lines

Re: Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS

2016-04-05 Thread Andy Davidson
Hi Mich

Yup I was surprised to find empty files. Its easy to work around. Note I
should probably use coalesce() and not repartition()

In general I found I almost always need to reparation. I was getting
thousands of empty partitions. It was really slowing my system down.

   private static void save(JavaDStream json, String outputURIBase)
{

/*

using saveAsTestFiles will cause lots of empty directories to be
created.

DStream data = json.dstream();

data.saveAsTextFiles(outputURI, null);

*/



jsonTweets.foreachRDD(new VoidFunction2() {

private static final long serialVersionUID = 1L;

@Override

public void call(JavaRDD rdd, Time time) throws
Exception {

Long count = rdd.count();

//if(!rdd.isEmpty()) {

if(count > 0) {

rdd = repartition(rdd, count.intValue());

long milliSeconds = time.milliseconds();

String date =
Utils.convertMillisecondsToDateStr(milliSeconds);

String dirPath = outputURIBase

+ File.separator +  date

+ File.separator + "tweet-" +
time.milliseconds();

rdd.saveAsTextFile(dirPath);

}  

}



final int maxNumRowsPerFile = 200;

JavaRDD repartition(JavaRDD rdd, int count) {

long numPartisions = count / maxNumRowsPerFile + 1;

Long tmp = numPartisions;

rdd = rdd.repartition(tmp.intValue());

return rdd;

}

});

 

}




From:  Mich Talebzadeh 
Date:  Tuesday, April 5, 2016 at 3:06 PM
To:  "user @spark" 
Subject:  Saving Spark streaming RDD with saveAsTextFiles ends up creating
empty files on HDFS

> Spark 1.6.1
> 
> The following creates empty files. It prints lines OK with println
> 
> val result = lines.filter(_.contains("ASE 15")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> result.saveAsTextFiles("/tmp/rdd_stuff")
> 
> I am getting zero length files
> 
> drwxr-xr-x   - hduser supergroup  0 2016-04-05 23:19
> /tmp/rdd_stuff-1459894755000
> drwxr-xr-x   - hduser supergroup  0 2016-04-05 23:20
> /tmp/rdd_stuff-145989481
> 
> Any ideas?
> 
> Thanks,
> 
> Dr Mich Talebzadeh
> 
>  
> 
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8
> Pw 
>  8Pw> 
> 
>  
> 
> http://talebzadehmich.wordpress.com 
> 
>  




Re: Can spark somehow help with this usecase?

2016-04-05 Thread Andy Davidson
Hi Marco

You might consider setting up some sort of ELT pipe line. One of your stages
might be to create a file of all the FTP URL.  You could then write a spark
app that just fetches the urls and stores the data in some sort of data base
or on the file system (hdfs?)

My guess would be to maybe use the map() transform to make the FTP call. If
you are using Java or scala take a look at the apache commons FTP Client

I assume that each ftp get is independent. Maybe some one know more about
how to control the amount of concurrency. I think it will be based on the
number of partitions, works, and cores?

Andy

From:  Marco Mistroni 
Date:  Tuesday, April 5, 2016 at 9:13 AM
To:  "user @spark" 
Subject:  Can spark somehow help with this usecase?

> 
> Hi 
>  I m currently using spark to process a file containing a million of
> rows(edgar quarterly filings files)
> Each row contains some infos plus a location of a remote file which I need to
> retrieve using FTP and then process it's content.
> I want to do all 3 operations ( process filing file, fetch remote files and
> process them in ) in one go.
> I want to avoid doing the first step (processing the million row file) in
> spark and the rest (_fetching FTP and process files) offline.
> Does spark has anything that can help with the FTP fetch?
> 
> Thanks in advance and rgds
>  Marco




Re: pyspark unable to convert dataframe column to a vector: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

2016-04-04 Thread Andy Davidson
)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most
recent call last):
  File 
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/p
yspark/worker.py", line 111, in main
process()
  File 
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/p
yspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File 
"/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/
pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File 
"/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pysp
ark/sql/functions.py", line 1563, in 
func = lambda _, it: map(lambda x: returnType.toInternal(f(*x)), it)
  File "", line 28, in toSparseVector
  File 
"/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/
pyspark.zip/pyspark/mllib/linalg/__init__.py", line 827, in sparse
return SparseVector(size, *args)
  File 
"/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/
pyspark.zip/pyspark/mllib/linalg/__init__.py", line 531, in __init__
raise TypeError("indices array must be sorted")
TypeError: indices array must be sorted

at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at 
org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.ap
ply(python.scala:405)
at 
org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.ap
ply(python.scala:370)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RD
D.scala:710)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RD
D.scala:710)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:11
42)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:6
17)
... 1 more


From:  Jeff Zhang <zjf...@gmail.com>
Date:  Tuesday, March 29, 2016 at 10:34 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: pyspark unable to convert dataframe column to a vector: Unable
to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

> According the stack trace, it seems the HiveContext is not initialized
> correctly. Do you have any more error message ?
> 
> On Tue, Mar 29, 2016 at 9:29 AM, Andy Davidson <a...@santacruzintegratio

data frame problem preserving sort order with repartition() and coalesce()

2016-03-29 Thread Andy Davidson
I have a requirement to write my results out into a series of CSV files. No
file may have more than 100 rows of data. In the past my data was not
sorted, and I was able to use reparation() or coalesce() to ensure the file
length requirement.

I realize that reparation() cause the data to be shuffled. It appears that
changes the data ordering. So I sort the repartioned data again.

What is really strange is I no longer get the number of output files I am
expecting, and the number of lines constraint is not violated

I am using spark-1.6.1

Andy

$ for i in topTags_CSV/*.csv; do wc -l $i; done

  19 topTags_CSV/part-0.csv

  19 topTags_CSV/part-1.csv

  20 topTags_CSV/part-2.csv

  19 topTags_CSV/part-3.csv

  22 topTags_CSV/part-4.csv

  19 topTags_CSV/part-5.csv

  26 topTags_CSV/part-6.csv

  18 topTags_CSV/part-7.csv

  12 topTags_CSV/part-8.csv

  25 topTags_CSV/part-9.csv

  32 topTags_CSV/part-00010.csv

  53 topTags_CSV/part-00011.csv

  89 topTags_CSV/part-00012.csv

 146 topTags_CSV/part-00013.csv

 387 topTags_CSV/part-00014.csv

2708 topTags_CSV/part-00015.csv

   1 topTags_CSV/part-00016.csv

$ 


numRowsPerCSVFile = 100
numRows = resultDF.count()
quotient, remander = divmod(numRows, numRowsPerCSVFile)
numPartitions = (quotient + 1) if remander > 0 else quotient
​
debugStr = ("numRows:{0} quotient:{1} remander:{2} repartition({3})"
.format(numRows, quotient, remander, numPartitions))
print(debugStr)
​
csvDF = resultDF.coalesce(numPartitions)
​
orderByColName = "count"
csvDF = csvDF.sort(orderByColName, ascending=False)
headerArg = 'true'# if headers else 'false'
csvDF.write.save(outputDir, 'com.databricks.spark.csv', header=headerArg)
renamePartFiles(outputDir)
numRows:3598 quotient:35 remander:98 repartition(36)







Vectors.sparse exception: TypeError: indices array must be sorted

2016-03-29 Thread Andy Davidson
I am using pyspark 1.6.1 and python3

Any idea what my bug is? Clearly the indices are being sorted?

Could it be the numDimensions = 713912692155621377 and my indices are longs
not ints?

import numpy as np
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import  VectorUDT

#sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# = 3 = size
# [0,1] int indices
#[1.0, 3.0] values



def toSparseVector(pojoList) :
indices = []
for pojo in pojoList :
indices.append(pojo.id)

sortedIndices = sorted(indices)
logical = np.ones(len(sortedIndices))
vec = Vectors.sparse(numDimensions, sortedIndices,  logical)
return vec


newColName = "features"
myUDF = udf(toSparseVector, VectorUDT())
featuresDF = df.withColumn(newColName, myUDF(df["follows"]))

featuresDF.printSchema()
featuresDF.show()

root
 |-- id: string (nullable = true)
 |-- follows: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- id: long (nullable = false)
 |||-- screenName: string (nullable = false)
 |-- features: vector (nullable = true)

---
Py4JJavaError Traceback (most recent call last)
 in ()
  1 featuresDF.printSchema()
> 2 featuresDF.show()

/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspa
rk/sql/dataframe.py in show(self, n, truncate)
255 +---+-+
256 """
--> 257 print(self._jdf.showString(n, truncate))
258 
259 def __repr__(self):

/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/p
y4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814 
815 for temp_arg in temp_args:

/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspa
rk/sql/utils.py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/p
y4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client,
target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling o104.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0
(TID 212, localhost): org.apache.spark.api.python.PythonException: Traceback
(most recent call last):
  File 
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/p
yspark/worker.py", line 111, in main
process()
  File 
"/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/
pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File 
"/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/
pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File 
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/funct
ions.py", line 1563, in 
func = lambda _, it: map(lambda x: returnType.toInternal(f(*x)), it)
  File "", line 28, in toSparseVector
  File 
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/p
yspark/mllib/linalg/__init__.py", line 827, in sparse
return SparseVector(size, *args)
  File 
"/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/p
yspark/mllib/linalg/__init__.py", line 531, in __init__
raise TypeError("indices array must be sorted")
TypeError: indices array must be sorted




Re: looking for an easy to to find the max value of a column in a data frame

2016-03-29 Thread Andy Davidson
Nice

From:  Alexander Krasnukhin <the.malk...@gmail.com>
Date:  Tuesday, March 29, 2016 at 10:42 AM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: looking for an easy to to find the max value of a column in a
data frame

> You can even use the fact that pyspark has dynamic properties
> 
> rows = idDF2.select(max("col[id]").alias("max")).collect()
> firstRow = rows[0]
> max = firstRow.max
> 
> On Tue, Mar 29, 2016 at 7:14 PM, Alexander Krasnukhin <the.malk...@gmail.com>
> wrote:
>> You should be able to index columns directly either by index or column name
>> i.e.
>> 
>> from pyspark.sql.functions import max
>> 
>> rows = idDF2.select(max("col[id]")).collect()
>> firstRow = rows[0]
>> 
>> # by index
>> max = firstRow[0]
>> 
>> # by column name
>> max = firstRow["max(col[id])"]
>> 
>> On Tue, Mar 29, 2016 at 6:58 PM, Andy Davidson
>> <a...@santacruzintegration.com> wrote:
>>> Hi Alexander
>>> 
>>> Many thanks. I think the key was I needed to import that max function. Turns
>>> out you do not need to use col
>>> Df.select(max(³foo²)).show()
>>> 
>>> To get the actual value of max you still need to write more code than I
>>> would expect. I wonder if there is a easier way to work with Rows?
>>> 
>>> In [19]:
>>> from pyspark.sql.functions import max
>>> maxRow = idDF2.select(max("col[id]")).collect()
>>> max = maxRow[0].asDict()['max(col[id])']
>>> max
>>> Out[19]:
>>> 713912692155621376
>>> 
>>> From:  Alexander Krasnukhin <the.malk...@gmail.com>
>>> Date:  Monday, March 28, 2016 at 5:55 PM
>>> To:  Andrew Davidson <a...@santacruzintegration.com>
>>> Cc:  "user @spark" <user@spark.apache.org>
>>> Subject:  Re: looking for an easy to to find the max value of a column in a
>>> data frame
>>> 
>>>> e.g. select max value for column "foo":
>>>> 
>>>> from pyspark.sql.functions import max, col
>>>> df.select(max(col("foo"))).show()
>>>> 
>>>> On Tue, Mar 29, 2016 at 2:15 AM, Andy Davidson
>>>> <a...@santacruzintegration.com> wrote:
>>>>> I am using pyspark 1.6.1 and python3.
>>>>> 
>>>>> 
>>>>> Given:
>>>>> 
>>>>> idDF2 = idDF.select(idDF.id, idDF.col.id <http://idDF.col.id>  )
>>>>> idDF2.printSchema()
>>>>> idDF2.show()
>>>>> root
>>>>>  |-- id: string (nullable = true)
>>>>>  |-- col[id]: long (nullable = true)
>>>>> 
>>>>> +--+--+
>>>>> |id|   col[id]|
>>>>> +--+--+
>>>>> |1008930924| 534494917|
>>>>> |1008930924| 442237496|
>>>>> |1008930924|  98069752|
>>>>> |1008930924|2790311425|
>>>>> |1008930924|3300869821|
>>>>> 
>>>>> 
>>>>> I have to do a lot of work to get the max value
>>>>> 
>>>>> rows = idDF2.select("col[id]").describe().collect()
>>>>> hack = [s for s in rows if s.summary == 'max']
>>>>> print(hack)
>>>>> print(hack[0].summary)
>>>>> print(type(hack[0]))
>>>>> print(hack[0].asDict()['col[id]'])
>>>>> maxStr = hack[0].asDict()['col[id]']
>>>>> ttt = int(maxStr)
>>>>> numDimensions = 1 + ttt
>>>>> print(numDimensions)
>>>>> 
>>>>> Is there an easier way?
>>>>> 
>>>>> Kind regards
>>>>> 
>>>>> Andy
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> Regards,
>>>> Alexander
>> 
>> 
>> 
>> -- 
>> Regards,
>> Alexander
> 
> 
> 
> -- 
> Regards,
> Alexander




Re: looking for an easy to to find the max value of a column in a data frame

2016-03-29 Thread Andy Davidson
Hi Alexander

Many thanks. I think the key was I needed to import that max function. Turns
out you do not need to use col
Df.select(max(³foo²)).show()

To get the actual value of max you still need to write more code than I
would expect. I wonder if there is a easier way to work with Rows?

In [19]:
from pyspark.sql.functions import max
maxRow = idDF2.select(max("col[id]")).collect()
max = maxRow[0].asDict()['max(col[id])']
max
Out[19]:
713912692155621376

From:  Alexander Krasnukhin <the.malk...@gmail.com>
Date:  Monday, March 28, 2016 at 5:55 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: looking for an easy to to find the max value of a column in a
data frame

> e.g. select max value for column "foo":
> 
> from pyspark.sql.functions import max, col
> df.select(max(col("foo"))).show()
> 
> On Tue, Mar 29, 2016 at 2:15 AM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I am using pyspark 1.6.1 and python3.
>> 
>> 
>> Given:
>> 
>> idDF2 = idDF.select(idDF.id, idDF.col.id <http://idDF.col.id>  )
>> idDF2.printSchema()
>> idDF2.show()
>> root
>>  |-- id: string (nullable = true)
>>  |-- col[id]: long (nullable = true)
>> 
>> +--+--+
>> |id|   col[id]|
>> +--+--+
>> |1008930924| 534494917|
>> |1008930924| 442237496|
>> |1008930924|  98069752|
>> |1008930924|2790311425|
>> |1008930924|3300869821|
>> 
>> 
>> I have to do a lot of work to get the max value
>> 
>> rows = idDF2.select("col[id]").describe().collect()
>> hack = [s for s in rows if s.summary == 'max']
>> print(hack)
>> print(hack[0].summary)
>> print(type(hack[0]))
>> print(hack[0].asDict()['col[id]'])
>> maxStr = hack[0].asDict()['col[id]']
>> ttt = int(maxStr)
>> numDimensions = 1 + ttt
>> print(numDimensions)
>> 
>> Is there an easier way?
>> 
>> Kind regards
>> 
>> Andy
> 
> 
> 
> -- 
> Regards,
> Alexander




Re: Sending events to Kafka from spark job

2016-03-29 Thread Andy Davidson
Hi Fanoos

I would be careful about using collect(). You need to make sure you local
computer has enough memory to hold your entire data set.

Eventually I will need to do something similar. I have to written the code
yet. My plan is to load the data into a data frame and then write a UDF that
actually publishes the Kafka

If you are using RDD¹s you could use map() or some other transform to cause
the data to be published

Andy

From:  fanooos 
Date:  Tuesday, March 29, 2016 at 4:26 AM
To:  "user @spark" 
Subject:  Re: Sending events to Kafka from spark job

> I think I find a solution but I have no idea how this affects the execution
> of the application.
> 
> At the end of the script I added  a sleep statement.
> 
> import time
> time.sleep(1)
> 
> 
> This solved the problem.
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-events-to-Kafka-fr
> om-spark-job-tp26622p26624.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 




pyspark unable to convert dataframe column to a vector: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

2016-03-28 Thread Andy Davidson
I am using pyspark spark-1.6.1-bin-hadoop2.6 and python3. I have a data
frame with a column I need to convert to a sparse vector. I get an exception

Any idea what my bug is?

Kind regards

Andy


Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException: java.lang.RuntimeException: Unable to
instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at 
org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:20
4)

Here is my python code fragment with a more complete stack trace

# load data set
from pyspark.sql import HiveContext #,SQLContext, Row

# window functions require HiveContext (spark 2.x will not require hive)
#sqlContext = SQLContext(sc)
hiveSqlContext = HiveContext(sc)

Š

import numpy as np
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import  VectorUDT

#sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# = 3 = size
# [0,1] int indices
#[1.0, 3.0] values


"""
root
 |-- id: string (nullable = true)
 |-- samples: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- id: long (nullable = false)
 |||-- rateStr: string (nullable = false)

"""

def toSparseVector(pojoList) :
indicies = []
for pojo in pojoList :
indicies.append(pojo.id)

l = np.ones(len(indicies))
v = Vectors.spark(numDimensions, indicies,  l)
return v

myUDF = udf(toSparseVector, VectorUDT()))
features = df.withColumn(newColName, myUDF(df[³samples"]))


Py4JJavaError Traceback (most recent call last)
 in ()
 30 #myUDF = udf(lambda pojoList: labelStr if (labelStr == "noise") else
"injury", StringType())
 31 
---> 32 myUDF = udf(toSparseVector, VectorUDT()) #
 33 features = df.withColumn(newColName, myUDF(df["follows"]))

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/functi
ons.py in udf(f, returnType)
   1595 [Row(slen=5), Row(slen=3)]
   1596 """
-> 1597 return UserDefinedFunction(f, returnType)
   1598 
   1599 blacklist = ['map', 'since', 'ignore_unicode_prefix']

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/functi
ons.py in __init__(self, func, returnType, name)
   1556 self.returnType = returnType
   1557 self._broadcast = None
-> 1558 self._judf = self._create_judf(name)
   1559 
   1560 def _create_judf(self, name):

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/functi
ons.py in _create_judf(self, name)
   1567 pickled_command, broadcast_vars, env, includes =
_prepare_for_python_RDD(sc, command, self)
   1568 ctx = SQLContext.getOrCreate(sc)
-> 1569 jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
   1570 if name is None:
   1571 name = f.__name__ if hasattr(f, '__name__') else
f.__class__.__name__

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/contex
t.py in _ssql_ctx(self)
681 try:
682 if not hasattr(self, '_scala_HiveContext'):
--> 683 self._scala_HiveContext = self._get_hive_ctx()
684 return self._scala_HiveContext
685 except Py4JError as e:

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/contex
t.py in _get_hive_ctx(self)
690 
691 def _get_hive_ctx(self):
--> 692 return self._jvm.HiveContext(self._jsc.sc())
693 
694 def refreshTable(self, tableName):

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.z
ip/py4j/java_gateway.py in __call__(self, *args)
   1062 answer = self._gateway_client.send_command(command)
   1063 return_value = get_return_value(
-> 1064 answer, self._gateway_client, None, self._fqn)
   1065 
   1066 for temp_arg in temp_args:

/Users/f/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.
py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/Users/andrewdavidson/workSpace/spark/spark-1.6.1-bin-hadoop2.6/python/lib/p
y4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client,
target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException: java.lang.RuntimeException: Unable to
instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
at 

looking for an easy to to find the max value of a column in a data frame

2016-03-28 Thread Andy Davidson
I am using pyspark 1.6.1 and python3.


Given:

idDF2 = idDF.select(idDF.id, idDF.col.id )
idDF2.printSchema()
idDF2.show()
root
 |-- id: string (nullable = true)
 |-- col[id]: long (nullable = true)

+--+--+
|id|   col[id]|
+--+--+
|1008930924| 534494917|
|1008930924| 442237496|
|1008930924|  98069752|
|1008930924|2790311425|
|1008930924|3300869821|


I have to do a lot of work to get the max value

rows = idDF2.select("col[id]").describe().collect()
hack = [s for s in rows if s.summary == 'max']
print(hack)
print(hack[0].summary)
print(type(hack[0]))
print(hack[0].asDict()['col[id]'])
maxStr = hack[0].asDict()['col[id]']
ttt = int(maxStr)
numDimensions = 1 + ttt
print(numDimensions)

Is there an easier way?

Kind regards

Andy




Re: --packages configuration equivalent item name?

2016-03-28 Thread Andy Davidson
Hi Russell

I use Jupyter python notebooks a lot. Here is how I start the server

set -x # turn debugging on

#set +x # turn debugging off



# https://github.com/databricks/spark-csv

# http://spark-packages.org/package/datastax/spark-cassandra-connector

#https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_py
thon.md

# 
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_pyt
hon.md#pyspark-with-data-frames



# packages are ',' seperate with no white space

extraPkgs="--packages
com.databricks:spark-csv_2.11:1.3.0,datastax:spark-cassandra-connector:1.6.0
-M1-s_2.10"



export PYSPARK_PYTHON=python3

export PYSPARK_DRIVER_PYTHON=python3

IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark $extraPkgs --conf
spark.cassandra.connection.host=ec2-54-153-102-232.us-west-1.compute.amazona
ws.com $*



From:  Russell Jurney 
Date:  Sunday, March 27, 2016 at 7:22 PM
To:  "user @spark" 
Subject:  --packages configuration equivalent item name?

> I run PySpark with CSV support like so: IPYTHON=1 pyspark --packages
> com.databricks:spark-csv_2.10:1.4.0
> 
> I don't want to type this --packages argument each time. Is there a config
> item for --packages? I can't find one in the reference at
> http://spark.apache.org/docs/latest/configuration.html
> 
> If there is no way to do this, please let me know so I can make a JIRA for
> this feature.
> 
> Thanks!
> -- 
> Russell Jurney twitter.com/rjurney 
> russell.jur...@gmail.com relato.io 




Re: pyspark sql convert long to timestamp?

2016-03-22 Thread Andy Davidson
Thanks

createdAt is a long

from_unixtime(createdAt / 1000, '-MM-dd HH:mm:ss z') as fromUnix,

Worked


From:  Akhil Das <ak...@sigmoidanalytics.com>
Date:  Monday, March 21, 2016 at 11:56 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: pyspark sql convert long to timestamp?

> Have a look at the from_unixtime() functions.
> https://spark.apache.org/docs/1.5.0/api/python/_modules/pyspark/sql/functions.
> html#from_unixtime
> 
> Thanks
> Best Regards
> 
> On Tue, Mar 22, 2016 at 4:49 AM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> Any idea how I have a col in a data frame that is of type long any idea how I
>> create a column who¹s type is time stamp?
>> 
>> The long is unix epoch in ms
>> 
>> Thanks
>> 
>> Andy
> 




pyspark sql convert long to timestamp?

2016-03-21 Thread Andy Davidson
Any idea how I have a col in a data frame that is of type long any idea how
I create a column who¹s type is time stamp?

The long is unix epoch in ms

Thanks

Andy




bug spark should not use java.sql.timestamp was: sql timestamp timezone bug

2016-03-20 Thread Andy Davidson
Here is a nice analysis of the issue from the Cassandra mail list. (Datastax
is the Databricks for Cassandra)

Should I fill a bug?

Kind regards

Andy

http://stackoverflow.com/questions/2305973/java-util-date-vs-java-sql-date
and this one 

On Fri, Mar 18, 2016 at 11:35 AM Russell Spitzer 
wrote:
> Unfortunately part of Spark SQL. They have based their type on
> java.sql.timestamp (and date) which adjust to the client timezone when
> displaying and storing.
> See discussions
> http://stackoverflow.com/questions/9202857/timezones-in-sql-date-vs-java-sql-d
> ate
> And Code
> https://github.com/apache/spark/blob/bb3b3627ac3fcd18be7fb07b6d0ba5eae0342fc3/
> sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.s
> cala#L81-L93
> 

From:  Andrew Davidson 
Date:  Thursday, March 17, 2016 at 3:25 PM
To:  Andrew Davidson , "user @spark"

Subject:  Re: sql timestamp timezone bug

> 
> For completeness. Clearly spark sql returned a different data set
> 
> In [4]:
> rawDF.selectExpr("count(row_key) as num_samples",
> "sum(count) as total",
> "max(count) as max ").show()
> +---++-+
> |num_samples|total|max|
> +---++-+
> |   2037| 3867| 67|
> +---++-+
> 
> 
> From:  Andrew Davidson 
> Date:  Thursday, March 17, 2016 at 3:02 PM
> To:  "user @spark" 
> Subject:  sql timestamp timezone bug
> 
>> I am using pyspark 1.6.0 and
>> datastax:spark-cassandra-connector:1.6.0-M1-s_2.10 to analyze time series
>> data
>> 
>> The data is originally captured by a spark streaming app and written to
>> Cassandra. The value of the timestamp comes from
>> 
>> Rdd.foreachRDD(new VoidFunction2()
>> �});
>> 
>> I am confident the time stamp is stored correctly in cassandra and that
>> the clocks on the machines in my cluster are set correctly
>> 
>> I noticed that if I used Cassandra CQLSH to select a data set between two
>> points in time the row count did not match the row count I got when I did
>> the same select in spark using SQL, It appears the spark sql assumes all
>> timestamp strings are in the local time zone.
>> 
>> 
>> Here is what I expect. (this is what is returned by CQLSH)
>> cqlsh> select
>>... count(row_key) as num_samples, sum(count) as total, max(count)
>> as max
>>... from
>>... notification.json_timeseries
>>... where
>>... row_key in (똱ed', 똟lue')
>>... and created > '2016-03-12 00:30:00+'
>>... and created <= '2016-03-12 04:30:00+'
>>... allow filtering;
>> 
>>  num_samples | total| max
>> -+--+---
>> 3242 |11277 |  17
>> 
>> 
>> Here is  my pyspark select statement. Notice the 똠reated column encodes
>> the timezone¹. I am running this on my local mac (in PST timezone) and
>> connecting to my data center (which runs on UTC) over a VPN.
>> 
>> rawDF = sqlContext.read\
>> .format("org.apache.spark.sql.cassandra")\
>> .options(table="json_timeseries", keyspace="notification")\
>> .load() 
>> 
>> 
>> rawDF.registerTempTable(tmpTableName)
>> 
>> 
>> 
>> stmnt = "select \
>> row_key, created, count, unix_timestamp(created) as unixTimeStamp, \
>> unix_timestamp(created, '-MM-dd HH:mm:ss.z') as hack, \
>> to_utc_timestamp(created, 'gmt') as gmt \
>> from \
>> rawTable \
>> where \
>> (created > '{0}') and (created <= '{1}') \
>> and \
>> (row_key = 똱ed' or row_key = 똟lue¹) \
>> )".format('2016-03-12 00:30:00+', '2016-03-12 04:30:00+')
>> 
>> rawDF = sqlCtx.sql(stmnt).cache()
>> 
>> 
>> 
>> 
>> I get a different values for row count, max, �
>> 
>> If I convert the UTC time stamp string to my local timezone the row count
>> matches the count returned by  cqlsh
>> 
>> # pst works, matches cassandra cqlsh
>> # .format('2016-03-11 16:30:00+', '2016-03-11 20:30:00+')
>> 
>> Am I doing something wrong in my pyspark code?
>> 
>> 
>> Kind regards
>> 
>> Andy
>> 
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
>> 




Re: sql timestamp timezone bug

2016-03-19 Thread Andy Davidson
Hi Davies


> 
> What's the type of `created`? TimestampType?


The Œcreated¹ column in cassandra is a timestamp
https://docs.datastax.com/en/cql/3.0/cql/cql_reference/timestamp_type_r.html

In the spark data frame it is a a timestamp

> 
> If yes, when created is compared to a string, it will be casted into
> string, then compared as string, it become
> 
> cast(created, as string) > '2016-03-12 00:30:00+'
> 
> Could you try this
> 
> sqlCtx.sql("select created, cast(created as string) from rawTable").show()


I am note sure I under stand your suggestion. In my where clause the date
range is specified using string literals. I need the value of created to be
a time stamps

# http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html
stmnt = "select \
row_key, created,  cast(created as string), count,
unix_timestamp(created) as unixTimeStamp, \
unix_timestamp(created, '-MM-dd HH:mm:ss.zz') as aedwip, \
to_utc_timestamp(created, 'gmt') as gmt \
 from \
rawTable \
 where \
 (created > '{0}') and (created <= '{1}') \
 and \
 (row_key = Œred' \
or row_key = Œblue' )".format('2016-03-12
00:30:00+', '2016-03-12 04:30:00+')

rawDF = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table="json_timeseries", keyspace="notification")\
.load() 
rawDF.registerTempTable(tmpTableName)
rawDF = sqlCtx.sql(stmnt).cache()


The time stamps are still not UTC they are in PST

root
 |-- row_key: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- created: string (nullable = true)
 |-- count: long (nullable = true)
 |-- unixTimeStamp: long (nullable = true)
 |-- aedwip: long (nullable = true)
 |-- gmt: timestamp (nullable = true)

+-+-+---+-+-
+--+-+
|row_key  |created  |created
|count|unixTimeStamp|aedwip|gmt  |
+-+-+---+-+-
+--+-+
|blue |2016-03-12 00:30:30.0|2016-03-12 00:30:30|2|1457771430
|1457771430|2016-03-12 00:30:30.0|
|blue |2016-03-12 00:30:45.0|2016-03-12 00:30:45|1|1457771445
|1457771445|2016-03-12 00:30:45.0|
|blue |2016-03-12 00:31:00.0|2016-03-12 00:31:00|1|1457771460
|1457771460|2016-03-12 00:31:00.0|
|

Kind regards

Andy




Re: sql timestamp timezone bug

2016-03-19 Thread Andy Davidson

For completeness. Clearly spark sql returned a different data set

In [4]:
rawDF.selectExpr("count(row_key) as num_samples",
"sum(count) as total",
"max(count) as max ").show()
+---++-+
|num_samples|total|max|
+---++-+
|   2037| 3867| 67|
+---++-+


From:  Andrew Davidson 
Date:  Thursday, March 17, 2016 at 3:02 PM
To:  "user @spark" 
Subject:  sql timestamp timezone bug

> I am using pyspark 1.6.0 and
> datastax:spark-cassandra-connector:1.6.0-M1-s_2.10 to analyze time series
> data
> 
> The data is originally captured by a spark streaming app and written to
> Cassandra. The value of the timestamp comes from
> 
> Rdd.foreachRDD(new VoidFunction2()
> �});
> 
> I am confident the time stamp is stored correctly in cassandra and that
> the clocks on the machines in my cluster are set correctly
> 
> I noticed that if I used Cassandra CQLSH to select a data set between two
> points in time the row count did not match the row count I got when I did
> the same select in spark using SQL, It appears the spark sql assumes all
> timestamp strings are in the local time zone.
> 
> 
> Here is what I expect. (this is what is returned by CQLSH)
> cqlsh> select
>... count(row_key) as num_samples, sum(count) as total, max(count)
> as max
>... from
>... notification.json_timeseries
>... where
>... row_key in (똱ed', 똟lue')
>... and created > '2016-03-12 00:30:00+'
>... and created <= '2016-03-12 04:30:00+'
>... allow filtering;
> 
>  num_samples | total| max
> -+--+---
> 3242 |11277 |  17
> 
> 
> Here is  my pyspark select statement. Notice the 똠reated column encodes
> the timezone¹. I am running this on my local mac (in PST timezone) and
> connecting to my data center (which runs on UTC) over a VPN.
> 
> rawDF = sqlContext.read\
> .format("org.apache.spark.sql.cassandra")\
> .options(table="json_timeseries", keyspace="notification")\
> .load() 
> 
> 
> rawDF.registerTempTable(tmpTableName)
> 
> 
> 
> stmnt = "select \
> row_key, created, count, unix_timestamp(created) as unixTimeStamp, \
> unix_timestamp(created, '-MM-dd HH:mm:ss.z') as hack, \
> to_utc_timestamp(created, 'gmt') as gmt \
> from \
> rawTable \
> where \
> (created > '{0}') and (created <= '{1}') \
> and \
> (row_key = 똱ed' or row_key = 똟lue¹) \
> )".format('2016-03-12 00:30:00+', '2016-03-12 04:30:00+')
> 
> rawDF = sqlCtx.sql(stmnt).cache()
> 
> 
> 
> 
> I get a different values for row count, max, �
> 
> If I convert the UTC time stamp string to my local timezone the row count
> matches the count returned by  cqlsh
> 
> # pst works, matches cassandra cqlsh
> # .format('2016-03-11 16:30:00+', '2016-03-11 20:30:00+')
> 
> Am I doing something wrong in my pyspark code?
> 
> 
> Kind regards
> 
> Andy
> 
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 




sql timestamp timezone bug

2016-03-19 Thread Andy Davidson
I am using pyspark 1.6.0 and
datastax:spark-cassandra-connector:1.6.0-M1-s_2.10 to analyze time series
data

The data is originally captured by a spark streaming app and written to
Cassandra. The value of the timestamp comes from

Rdd.foreachRDD(new VoidFunction2()
Š});

I am confident the time stamp is stored correctly in cassandra and that
the clocks on the machines in my cluster are set correctly

I noticed that if I used Cassandra CQLSH to select a data set between two
points in time the row count did not match the row count I got when I did
the same select in spark using SQL, It appears the spark sql assumes all
timestamp strings are in the local time zone.


Here is what I expect. (this is what is returned by CQLSH)
cqlsh> select
   ... count(row_key) as num_samples, sum(count) as total, max(count)
as max
   ... from
   ... notification.json_timeseries
   ... where
   ... row_key in (Œred', Œblue')
   ... and created > '2016-03-12 00:30:00+'
   ... and created <= '2016-03-12 04:30:00+'
   ... allow filtering;

 num_samples | total| max
-+--+---
3242 |11277 |  17


Here is  my pyspark select statement. Notice the Œcreated column encodes
the timezone¹. I am running this on my local mac (in PST timezone) and
connecting to my data center (which runs on UTC) over a VPN.

rawDF = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table="json_timeseries", keyspace="notification")\
.load() 


rawDF.registerTempTable(tmpTableName)



stmnt = "select \
row_key, created, count, unix_timestamp(created) as unixTimeStamp, \
unix_timestamp(created, '-MM-dd HH:mm:ss.z') as hack, \
to_utc_timestamp(created, 'gmt') as gmt \
from \
rawTable \
where \
(created > '{0}') and (created <= '{1}') \
and \
(row_key = Œred' or row_key = Œblue¹) \
)".format('2016-03-12 00:30:00+', '2016-03-12 04:30:00+')

rawDF = sqlCtx.sql(stmnt).cache()




I get a different values for row count, max, Š

If I convert the UTC time stamp string to my local timezone the row count
matches the count returned by  cqlsh

# pst works, matches cassandra cqlsh
# .format('2016-03-11 16:30:00+', '2016-03-11 20:30:00+')

Am I doing something wrong in my pyspark code?


Kind regards

Andy



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



best practices: running multi user jupyter notebook server

2016-03-19 Thread Andy Davidson
We are considering deploying a notebook server for use by two kinds of users

1. interactive dashboard.
> 1. I.e. Forms allow users to select data sets and visualizations
> 2. Review real time graphs of data captured by our spark streams
2. General notebooks for Data Scientists

My concern is interactive spark jobs can can consume a lot of cluster
resource and many users may be sloppy/lazy. I.E. Just kill their browsers
instead of shutting down their notebooks cleanly

What are best practices?


Kind regards

Andy




unix_timestamp() time zone problem

2016-03-19 Thread Andy Davidson
I am using python spark 1.6 and the --packages
datastax:spark-cassandra-connector:1.6.0-M1-s_2.10

I need to convert a time stamp string into a unix epoch time stamp. The
function unix_timestamp() function assume current time zone. How ever my
string data is UTC and encodes the time zone as zero. I have not been able
to find a way to get the unix time calculated correctly. simpleDateFormat
does not have good time zone support. Any suggestions?

I could write a UDF and to adjust for time zones how ever this seems like  a
hack

I tried using to_utc_timestamp(created, 'gmt¹) how ever this creates a
timestamp. I have not been able to figure out how to convert this to a unix
time sample I.e a long representing epoch

Any suggestions?

stmnt = "select \
row_key, created, count, unix_timestamp(created) as
unixTimeStamp, \
unix_timestamp(created, '-MM-dd HH:mm:ss.z') as etc \
 from \
rawTable \
 where \
 (created > '{0}') and (created <= '{1}') \
 and \
 (row_key = Œblue' \
or row_key = Œred' \
)".format('2016-03-12 00:30:00+', '2016-03-12
04:30:00+¹)


Sample out put

root
 |-- row_key: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- count: long (nullable = true)
 |-- unixTimeStamp: long (nullable = true)
 |-- etc: long (nullable = true)

2016-03-12 00:30:30.0 should be 1457742630 not 1457771430

+-+-+-+-+--+
|row_key  |created|count|unixTimeStamp|utc|
+-+-+-+-+--+
|red|2016-03-12 00:30:30.0|2|1457771430   |1457771430|
|red|2016-03-12 00:30:45.0|1|1457771445   |1457771445|


static Column 
 unix_timestamp 
 (Column
  s)Converts time string in format -MM-dd HH:mm:ss to Unix timestamp
(in seconds), using the default timezone and the default locale, return null
if fail.
static Column 
 unix_timestamp 
 (Column
  s, java.lang.String p)Convert time string with given pattern (see
[http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html])
to Unix time stamp (in seconds), return null if fail.




what is the pyspark inverse of registerTempTable()?

2016-03-15 Thread Andy Davidson
Thanks

Andy




Re: newbie HDFS S3 best practices

2016-03-15 Thread Andy Davidson
Hi Frank

We have thousands of small files . Each file is between 6K to maybe 100k.

Conductor looks interesting

Andy

From:  Frank Austin Nothaft <fnoth...@berkeley.edu>
Date:  Tuesday, March 15, 2016 at 11:59 AM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: newbie HDFS S3 best practices

> Hard to say with #1 without knowing your application¹s characteristics; for
> #2, we use conductor <https://github.com/BD2KGenomics/conductor>  with IAM
> roles, .boto/.aws/credentials files.
> 
> Frank Austin Nothaft
> fnoth...@berkeley.edu
> fnoth...@eecs.berkeley.edu
> 202-340-0466
> 
>> On Mar 15, 2016, at 11:45 AM, Andy Davidson <a...@santacruzintegration.com>
>> wrote:
>> 
>> We use the spark-ec2 script to create AWS clusters as needed (we do not use
>> AWS EMR)
>> 1. will we get better performance if we copy data to HDFS before we run
>> instead of reading directly from S3?
>>  2. What is a good way to move results from HDFS to S3?
>> 
>> 
>> It seems like there are many ways to bulk copy to s3. Many of them require we
>> explicitly use the AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@
>> <mailto:AWS_SECRET_ACCESS_KEY@/yasemindeneme/deneme.txt> . This seems like a
>> bad idea? 
>> 
>> What would you recommend?
>> 
>> Thanks
>> 
>> Andy
>> 
>> 
> 




newbie HDFS S3 best practices

2016-03-15 Thread Andy Davidson
We use the spark-ec2 script to create AWS clusters as needed (we do not use
AWS EMR)
1. will we get better performance if we copy data to HDFS before we run
instead of reading directly from S3?
 2. What is a good way to move results from HDFS to S3?


It seems like there are many ways to bulk copy to s3. Many of them require
we explicitly use the AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@
 . This seems like a
bad idea? 

What would you recommend?

Thanks

Andy






Re: trouble with NUMPY constructor in UDF

2016-03-10 Thread Andy Davidson


In [7]:
pdf = gdf1.toPandas()
pdf['date'] = epoch2num(pdf['ms'] )
print(pdf.dtypes)
pdf
count   int64
row_keyobject
createddatetime64[ns]
ms  int64
date  float64
dtype: object
Out[7]:
countrow_keycreatedmsdate
02realDonaldTrump2016-03-09 11:44:151457552655736032.822396
11realDonaldTrump2016-03-09 11:44:301457552670736032.822569
21realDonaldTrump2016-03-09 11:44:451457552685736032.822743
33realDonaldTrump2016-03-09 11:45:001457552700736032.822917
41HillaryClinton2016-03-09 11:44:151457552655736032.822396
52HillaryClinton2016-03-09 11:44:301457552670736032.822569
61HillaryClinton2016-03-09 11:44:451457552685736032.822743

From:  Andrew Davidson <a...@santacruzintegration.com>
Date:  Thursday, March 10, 2016 at 2:52 PM
To:  Ted Yu <yuzhih...@gmail.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: trouble with NUMPY constructor in UDF

> Hi Ted
> 
> In python the data type is Œfloat64¹. I have tried using both sql FloatType
> and DoubleType how ever I get the same error
> 
> Strange
> 
> andy
> 
> From:  Ted Yu <yuzhih...@gmail.com>
> Date:  Wednesday, March 9, 2016 at 3:28 PM
> To:  Andrew Davidson <a...@santacruzintegration.com>
> Cc:  "user @spark" <user@spark.apache.org>
> Subject:  Re: trouble with NUMPY constructor in UDF
> 
>> bq. epoch2numUDF = udf(foo, FloatType())
>> 
>> Is it possible that return value from foo is not FloatType ?
>> 
>> On Wed, Mar 9, 2016 at 3:09 PM, Andy Davidson <a...@santacruzintegration.com>
>> wrote:
>>> I need to convert time stamps into a format I can use with matplotlib
>>> plot_date(). epoch2num() works fine if I use it in my driver how ever I get
>>> a numpy constructor error if use it in a UDF
>>> 
>>> Any idea what the problem is?
>>> 
>>> Thanks
>>> 
>>> Andy
>>> 
>>> P.s I am using python3 and spark-1.6
>>> 
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import FloatType, DoubleType, DecimalType
>>> 
>>> 
>>> import pandas as pd
>>> import numpy as np
>>> 
>>> from matplotlib.dates import epoch2num
>>> 
>>> gdf1 = cdf1.selectExpr("count", "row_key", "created",
>>> "unix_timestamp(created) as ms")
>>> gdf1.printSchema()
>>> gdf1.show(10, truncate=False)
>>> root
>>>  |-- count: long (nullable = true)
>>>  |-- row_key: string (nullable = true)
>>>  |-- created: timestamp (nullable = true)
>>>  |-- ms: long (nullable = true)
>>> 
>>> +-+---+-+--+
>>> |count|row_key|created  |ms|
>>> +-+---+-+--+
>>> |1|HillaryClinton |2016-03-09 11:44:15.0|1457552655|
>>> |2|HillaryClinton |2016-03-09 11:44:30.0|1457552670|
>>> |1|HillaryClinton |2016-03-09 11:44:45.0|1457552685|
>>> |2|realDonaldTrump|2016-03-09 11:44:15.0|1457552655|
>>> |1|realDonaldTrump|2016-03-09 11:44:30.0|1457552670|
>>> |1|realDonaldTrump|2016-03-09 11:44:45.0|1457552685|
>>> |3|realDonaldTrump|2016-03-09 11:45:00.0|1457552700|
>>> +-+---+-+--+
>>> 
>>> 
>>> def foo(e):
>>> return epoch2num(e)
>>> 
>>> epoch2numUDF = udf(foo, FloatType())
>>> #epoch2numUDF = udf(lambda e: epoch2num(e), FloatType())
>>> #epoch2numUDF = udf(lambda e: e + 500.5, FloatType())
>>> 
>>> gdf2 = gdf1.withColumn("date", epoch2numUDF(gdf1.ms <http://gdf1.ms> ))
>>> gdf2.printSchema()
>>> gdf2.show(truncate=False)
>>> 
>>> 
>>> Py4JJavaError: An error occurred while calling o925.showString.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>>> in stage 32.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>>> 32.0 (TID 91, localhost): net.razorvine.pickle.PickleException: expected
>>> zero arguments for construction of ClassDict (for numpy.dtype)
>>> at 
>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstru
>>> ctor.java:23)
>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>>> 
>>> Works fine if I use PANDAS
>>> 
>>> pdf = gdf1.toPandas()
>>> pdf['date'] = epoch2num(pdf['ms'] )
>>> 
>>> 
>> 




Re: trouble with NUMPY constructor in UDF

2016-03-10 Thread Andy Davidson
Hi Ted

In python the data type is Œfloat64¹. I have tried using both sql FloatType
and DoubleType how ever I get the same error

Strange

andy

From:  Ted Yu <yuzhih...@gmail.com>
Date:  Wednesday, March 9, 2016 at 3:28 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: trouble with NUMPY constructor in UDF

> bq. epoch2numUDF = udf(foo, FloatType())
> 
> Is it possible that return value from foo is not FloatType ?
> 
> On Wed, Mar 9, 2016 at 3:09 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I need to convert time stamps into a format I can use with matplotlib
>> plot_date(). epoch2num() works fine if I use it in my driver how ever I get a
>> numpy constructor error if use it in a UDF
>> 
>> Any idea what the problem is?
>> 
>> Thanks
>> 
>> Andy
>> 
>> P.s I am using python3 and spark-1.6
>> 
>> from pyspark.sql.functions import udf
>> from pyspark.sql.types import FloatType, DoubleType, DecimalType
>> 
>> 
>> import pandas as pd
>> import numpy as np
>> 
>> from matplotlib.dates import epoch2num
>> 
>> gdf1 = cdf1.selectExpr("count", "row_key", "created",
>> "unix_timestamp(created) as ms")
>> gdf1.printSchema()
>> gdf1.show(10, truncate=False)
>> root
>>  |-- count: long (nullable = true)
>>  |-- row_key: string (nullable = true)
>>  |-- created: timestamp (nullable = true)
>>  |-- ms: long (nullable = true)
>> 
>> +-+---+-+--+
>> |count|row_key|created  |ms|
>> +-+---+-+--+
>> |1|HillaryClinton |2016-03-09 11:44:15.0|1457552655|
>> |2|HillaryClinton |2016-03-09 11:44:30.0|1457552670|
>> |1|HillaryClinton |2016-03-09 11:44:45.0|1457552685|
>> |2|realDonaldTrump|2016-03-09 11:44:15.0|1457552655|
>> |1|realDonaldTrump|2016-03-09 11:44:30.0|1457552670|
>> |1|realDonaldTrump|2016-03-09 11:44:45.0|1457552685|
>> |3|realDonaldTrump|2016-03-09 11:45:00.0|1457552700|
>> +-+---+-+--+
>> 
>> 
>> def foo(e):
>> return epoch2num(e)
>> 
>> epoch2numUDF = udf(foo, FloatType())
>> #epoch2numUDF = udf(lambda e: epoch2num(e), FloatType())
>> #epoch2numUDF = udf(lambda e: e + 500.5, FloatType())
>> 
>> gdf2 = gdf1.withColumn("date", epoch2numUDF(gdf1.ms <http://gdf1.ms> ))
>> gdf2.printSchema()
>> gdf2.show(truncate=False)
>> 
>> 
>> Py4JJavaError: An error occurred while calling o925.showString.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in stage 32.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>> 32.0 (TID 91, localhost): net.razorvine.pickle.PickleException: expected zero
>> arguments for construction of ClassDict (for numpy.dtype)
>>  at 
>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstruc
>> tor.java:23)
>>  at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>  at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>  at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>  at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>> 
>> Works fine if I use PANDAS
>> 
>> pdf = gdf1.toPandas()
>> pdf['date'] = epoch2num(pdf['ms'] )
>> 
>> 
> 




Re: Spark Streaming, very slow processing and increasing scheduling delay of kafka input stream

2016-03-10 Thread Andy Davidson
In my experience I would try the following


I use the standalone cluster manager. Each app gets it own performance web
page . The streaming tab is really helpful. If processing time is greater
than then your mini batch length you are going to run into performance
problems

Use the ³stages² tab to figure out where the bottleneck is

In our experience we where working with large # of empty partitions.
Reparation solved our s3 output problem

Your milage may vary

andy

From:  Todd Nist 
Date:  Thursday, March 10, 2016 at 6:03 AM
To:  Vinti Maheshwari 
Cc:  "user @spark" 
Subject:  Re: Spark Streaming, very slow processing and increasing
scheduling delay of kafka input stream

> Hi Vinti,
> 
> All of your tasks are failing based on the screen shots provided.
> 
> I think a few more details would be helpful.  Is this YARN or a Standalone
> cluster?  How much overall memory is on your cluster?  On each machine where
> workers and executors are running?  Are you using the Direct
> (KafkaUtils.createDirectStream) or Receiver (KafkaUtils.createStream)?
> 
> You may find this discussion of value on SO:
> http://stackoverflow.com/questions/28901123/org-apache-spark-shuffle-metadataf
> etchfailedexception-missing-an-output-locatio
> 
> -Todd
> 
> On Mon, Mar 7, 2016 at 5:52 PM, Vinti Maheshwari  wrote:
>> Hi,
>> 
>> My spark-streaming program seems very slow. I am using Ambari for cluster
>> setup and i am using Kafka for data input.
>> I tried to use batch size 2 secs and check pointing duration 10 secs. But as
>> i was seeing scheduling delay was keep increasing so i tried increasing the
>> batch size to 5 and then 10 secs. But it seems noting changed in respect of
>> performance.
>> 
>> My program is doing two tasks:
>> 
>> 1) Data aggregation
>> 
>> 2) Data insertion into Hbase
>> 
>> Action which took maximum time, when i called foreachRDD on Dstream object
>> (state). 
>> state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>> 
>> 
>> Program sample input coming from kafka:
>> test_id, file1, 1,1,1,1,1
>> 
>> Code snippets:
>> val parsedStream = inputStream
>>   .map(line => {
>> val splitLines = line.split(",")
>> (splitLines(1), splitLines.slice(2,
>> splitLines.length).map((_.trim.toLong)))
>>   })  
>> val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>> (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>>   prev.map(_ +: current).orElse(Some(current))
>> .flatMap(as => Try(as.map(BDV(_)).reduce(_ +
>> _).toArray).toOption)
>> })
>> state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>> 
>> 
>> 
>> object Blaher {
>>   def blah(tup: (String, Array[Long])) {
>> val hConf = HBaseConfiguration.create()
>> --
>> val hTable = new HTable(hConf, tableName)
>> val thePut = new Put(Bytes.toBytes("file_data"))
>> thePut.add(Bytes.toBytes("file_counts"), Bytes.toBytes(tup._1),
>> Bytes.toBytes(tup._2.toList.toString))
>> new ImmutableBytesWritable(Bytes.toBytes("file_data"))
>> 
>> hTable.put(thePut)
>>   }
>> }
>> 
>> 
>> My Cluster Specifications:
>> 16 executors ( 1 core each and 2g memory)
>> I have attached some screenshots of running execution.
>> 
>> Anyone has idea what changes should i do to speedup the processing?
>> 
>> Thanks & Regards,
>> Vinti
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 




Re: pyspark spark-cassandra-connector java.io.IOException: Failed to open native connection to Cassandra at {192.168.1.126}:9042

2016-03-09 Thread Andy Davidson
Hi Ted and Saurahb

If I use —conf arguments with pyspark I am able to connect. Any idea how I
can set these values programmatically? (I work on a notebook server and can
not easily reconfigure the server

This works

extraPkgs="--packages com.databricks:spark-csv_2.11:1.3.0 \
--packages datastax:spark-cassandra-connector:1.6.0-M1-s_2.10"

export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=python3
IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark $extraPkgs --conf
spark.cassandra.connection.host=localhost --conf
spark.cassandra.connection.port=9043 $*


df = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table="json_timeseries", keyspace="notification")\
.load()
df.printSchema()
df.show(truncate=False)


I have tried using setContext.setConf() but it does not work. It does  not
seem to have any effect

#sqlContext.setConf("spark.cassandra.connection.host","localhost")
#sqlContext.setConf("spark.cassandra.connection.port","9043")

#sqlContext.setConf("connection.host","localhost")
#sqlContext.setConf("connection.port","9043")

sqlContext.setConf("host","localhost")
sqlContext.setConf("port","9043”)

Thanks

Andy

From:  Saurabh Bajaj <bajaj.onl...@gmail.com>
Date:  Tuesday, March 8, 2016 at 9:13 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  Ted Yu <yuzhih...@gmail.com>, "user @spark" <user@spark.apache.org>
Subject:  Re: pyspark spark-cassandra-connector java.io.IOException: Failed
to open native connection to Cassandra at {192.168.1.126}:9042

> Hi Andy, 
> 
> I believe you need to set the host and port settings separately
> spark.cassandra.connection.host
> spark.cassandra.connection.port
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/referenc
> e.md#cassandra-connection-parameters
> 
> Looking at the logs, it seems your port config is not being set and it's
> falling back to default.
> Let me know if that helps.
> 
> Saurabh Bajaj
> 
> On Tue, Mar 8, 2016 at 6:25 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> Hi Ted
>> 
>> I believe by default cassandra listens on 9042
>> 
>> From:  Ted Yu <yuzhih...@gmail.com>
>> Date:  Tuesday, March 8, 2016 at 6:11 PM
>> To:  Andrew Davidson <a...@santacruzintegration.com>
>> Cc:  "user @spark" <user@spark.apache.org>
>> Subject:  Re: pyspark spark-cassandra-connector java.io.IOException: Failed
>> to open native connection to Cassandra at {192.168.1.126}:9042
>> 
>>> Have you contacted spark-cassandra-connector related mailing list ?
>>> 
>>> I wonder where the port 9042 came from.
>>> 
>>> Cheers
>>> 
>>> On Tue, Mar 8, 2016 at 6:02 PM, Andy Davidson
>>> <a...@santacruzintegration.com> wrote:
>>>> 
>>>> I am using spark-1.6.0-bin-hadoop2.6. I am trying to write a python
>>>> notebook that reads a data frame from Cassandra.
>>>> 
>>>> I connect to cassadra using an ssh tunnel running on port 9043. CQLSH works
>>>> how ever I can not figure out how to configure my notebook. I have tried
>>>> various hacks any idea what I am doing wrong
>>>> 
>>>> : java.io.IOException: Failed to open native connection to Cassandra at
>>>> {192.168.1.126}:9042
>>>> 
>>>> 
>>>> 
>>>> Thanks in advance
>>>> 
>>>> Andy
>>>> 
>>>> 
>>>> 
>>>> $ extraPkgs="--packages com.databricks:spark-csv_2.11:1.3.0 \
>>>> --packages datastax:spark-cassandra-connector:1.6.0-M1-s_2.11"
>>>> 
>>>> $ export PYSPARK_PYTHON=python3
>>>> $ export PYSPARK_DRIVER_PYTHON=python3
>>>> $ IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark $extraPkgs $*
>>>> 
>>>> 
>>>> 
>>>> In [15]:
>>>> 1
>>>> sqlContext.setConf("spark.cassandra.connection.host”,”127.0.0.1:9043
>>>> <http://127.0.0.1:9043> ")
>>>> 2
>>>> df = sqlContext.read\
>>>> 3
>>>> .format("org.apache.spark.sql.cassandra")\
>>>> 4
>>>> .options(table=“time_series", keyspace="notification")\
>>>> 5
>>>> .load()
>>>> 6
>>>> ​
>>>> 7
>>>> df.printSchema()
>>>> 8
>>>> df.show()
>>>> 

Re: pyspark spark-cassandra-connector java.io.IOException: Failed to open native connection to Cassandra at {192.168.1.126}:9042

2016-03-08 Thread Andy Davidson
Hi Ted

I believe by default cassandra listens on 9042

From:  Ted Yu <yuzhih...@gmail.com>
Date:  Tuesday, March 8, 2016 at 6:11 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: pyspark spark-cassandra-connector java.io.IOException: Failed
to open native connection to Cassandra at {192.168.1.126}:9042

> Have you contacted spark-cassandra-connector related mailing list ?
> 
> I wonder where the port 9042 came from.
> 
> Cheers
> 
> On Tue, Mar 8, 2016 at 6:02 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> 
>> I am using spark-1.6.0-bin-hadoop2.6. I am trying to write a python notebook
>> that reads a data frame from Cassandra.
>> 
>> I connect to cassadra using an ssh tunnel running on port 9043. CQLSH works
>> how ever I can not figure out how to configure my notebook. I have tried
>> various hacks any idea what I am doing wrong
>> 
>> : java.io.IOException: Failed to open native connection to Cassandra at
>> {192.168.1.126}:9042
>> 
>> 
>> 
>> Thanks in advance
>> 
>> Andy
>> 
>> 
>> 
>> $ extraPkgs="--packages com.databricks:spark-csv_2.11:1.3.0 \
>> --packages datastax:spark-cassandra-connector:1.6.0-M1-s_2.11"
>> 
>> $ export PYSPARK_PYTHON=python3
>> $ export PYSPARK_DRIVER_PYTHON=python3
>> $ IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark $extraPkgs $*
>> 
>> 
>> 
>> In [15]:
>> 1
>> sqlContext.setConf("spark.cassandra.connection.host”,”127.0.0.1:9043
>> <http://127.0.0.1:9043> ")
>> 2
>> df = sqlContext.read\
>> 3
>> .format("org.apache.spark.sql.cassandra")\
>> 4
>> .options(table=“time_series", keyspace="notification")\
>> 5
>> .load()
>> 6
>> ​
>> 7
>> df.printSchema()
>> 8
>> df.show()
>> ---Py
>> 4JJavaError Traceback (most recent call last)
>>  in ()  1
>> sqlContext.setConf("spark.cassandra.connection.host","localhost:9043")> 2
>> df = sqlContext.read.format("org.apache.spark.sql.cassandra")
>> .options(table="kv", keyspace="notification").load()  3   4
>> df.printSchema()  5
>> df.show()/Users/andrewdavidson/workSpace/spark/spark-1.6.0-bin-hadoop2.6/pyth
>> on/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
>> 137 return self._df(self._jreader.load(path))138
>> else:--> 139 return self._df(self._jreader.load())140 141
>> @since(1.4)/Users/andrewdavidson/workSpace/spark/spark-1.6.0-bin-hadoop2.6/py
>> thon/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>> 811 answer = self.gateway_client.send_command(command)812
>> return_value = get_return_value(
>> --> 813 answer, self.gateway_client, self.target_id, self.name
>> <http://self.name> )
>> 814 815 for temp_arg in
>> temp_args:/Users/andrewdavidson/workSpace/spark/spark-1.6.0-bin-hadoop2.6/pyt
>> hon/pyspark/sql/utils.py in deco(*a, **kw) 43 def deco(*a, **kw):
>> 44 try:---> 45 return f(*a, **kw) 46 except
>> py4j.protocol.Py4JJavaError as e: 47 s =
>> e.java_exception.toString()/Users/andrewdavidson/workSpace/spark/spark-1.6.0-
>> bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in
>> get_return_value(answer, gateway_client, target_id, name)306
>> raise Py4JJavaError(
>> 307 "An error occurred while calling
>> {0}{1}{2}.\n".--> 308 format(target_id, ".", name),
>> value)
>> 309 else:310 raise Py4JError(
>> 
>> Py4JJavaError: An error occurred while calling o280.load.
>> : java.io.IOException: Failed to open native connection to Cassandra at
>> {192.168.1.126}:9042
>>  at 
>> com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$conne
>> ctor$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
>>  at 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(Cassandr
>> aConnector.scala:148)
>>  at 
>> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(Cassandr
>> aConnector.scala:148)
>>  at 
>> com.datastax.spark.connector.cql

pyspark spark-cassandra-connector java.io.IOException: Failed to open native connection to Cassandra at {192.168.1.126}:9042

2016-03-08 Thread Andy Davidson

I am using spark-1.6.0-bin-hadoop2.6. I am trying to write a python notebook
that reads a data frame from Cassandra.

I connect to cassadra using an ssh tunnel running on port 9043. CQLSH works
how ever I can not figure out how to configure my notebook. I have tried
various hacks any idea what I am doing wrong

: java.io.IOException: Failed to open native connection to Cassandra at
{192.168.1.126}:9042



Thanks in advance

Andy



$ extraPkgs="--packages com.databricks:spark-csv_2.11:1.3.0 \
--packages datastax:spark-cassandra-connector:1.6.0-M1-s_2.11"

$ export PYSPARK_PYTHON=python3
$ export PYSPARK_DRIVER_PYTHON=python3
$ IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark $extraPkgs $*



In [15]:
1
sqlContext.setConf("spark.cassandra.connection.host”,”127.0.0.1:9043")
2
df = sqlContext.read\
3
.format("org.apache.spark.sql.cassandra")\
4
.options(table=“time_series", keyspace="notification")\
5
.load()
6
​
7
df.printSchema()
8
df.show()
---
Py4JJavaError Traceback (most recent call last)
 in ()
  1 
sqlContext.setConf("spark.cassandra.connection.host","localhost:9043")
> 2 df = sqlContext.read.format("org.apache.spark.sql.cassandra")
.options(table="kv", keyspace="notification").load()
  3 
  4 df.printSchema()
  5 df.show()

/Users/andrewdavidson/workSpace/spark/spark-1.6.0-bin-hadoop2.6/python/pyspa
rk/sql/readwriter.py in load(self, path, format, schema, **options)
137 return self._df(self._jreader.load(path))
138 else:
--> 139 return self._df(self._jreader.load())
140 
141 @since(1.4)

/Users/andrewdavidson/workSpace/spark/spark-1.6.0-bin-hadoop2.6/python/lib/p
y4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814 
815 for temp_arg in temp_args:

/Users/andrewdavidson/workSpace/spark/spark-1.6.0-bin-hadoop2.6/python/pyspa
rk/sql/utils.py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/Users/andrewdavidson/workSpace/spark/spark-1.6.0-bin-hadoop2.6/python/lib/p
y4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client,
target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling o280.load.
: java.io.IOException: Failed to open native connection to Cassandra at
{192.168.1.126}:9042
at 
com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$conn
ector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
at 
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(Cassand
raConnector.scala:148)
at 
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(Cassand
raConnector.scala:148)
at 
com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCo
untedCache.scala:31)
at 
com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.sca
la:56)
at 
com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraCon
nector.scala:81)
at 
com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraC
onnector.scala:109)
at 
com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$.getTok
enFactory(CassandraRDDPartitioner.scala:184)
at 
org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourc
eRelation.scala:267)
at 
org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.sc
ala:57)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(Resolve
dDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at 

streaming will I loose data if spark.streaming.backpressure.enabled=true

2016-03-07 Thread Andy Davidson

http://spark.apache.org/docs/latest/streaming-programming-guide.html#deployi
ng-applications

Gives a brief discussion about max rate and back pressure

Its not clear to me what will happen. I use an unreliable reciever. Let say
me app is running and process time is less then window length. Happy happy

After a while there is a burst of data and processing time now exceeds
windows length What happens? Am I going to loose data.(I have implemented
checkpoints)

Okay time goes by data rate returns to normal processing time is < window
length again. Does the system catch up? Or is data lost?

Kind regards

Andy





how to implement and deploy robust streaming apps

2016-03-07 Thread Andy Davidson
One of the challenges we need to prepare for with streaming apps is bursty
data. Typically we need to estimate our worst case data load and make sure
we have enough capacity


It not obvious what best practices are with spark streaming.

* we have implemented check pointing as described in the prog guide
* Use stand alone cluster manager and spark-submit
* We use the mgmt console to kill drives when needed
* we plan to configure write ahead spark.streaming.backpressure.enabled to
true.
* our application runs a single unreliable receive
> * We run multiple implementation configured to partition the input

As long as our processing time is < our windowing time everything is fine

In the streaming systems I have worked on in the past we scaled out by using
load balancers and proxy farms to create buffering capacity. Its not clear
how to scale out spark

In our limited testing it seems like we have a single app configure to
receive a predefined portion of the data. Once it is stated we can not add
additional resources. Adding cores and memory does not seem increase our
capacity 


Kind regards

Andy






streaming app performance when would increasing execution size or adding more cores

2016-03-07 Thread Andy Davidson
We just deployed our first streaming apps. The next step is running them so
they run reliably

We have spend a lot of time reading the various prog guides looking at the
standalone cluster manager app performance web pages.

Looking at the streaming tab and the stages tab have been the most helpful
in tuning our app. However we do not understand the connection between
memory  and # cores will effect throughput and performance. Usually adding
memory is the cheapest way to improve performance.

When we have a single receiver call spark-submit --total-executor-cores 2.
Changing the value does not seem to change throughput. our bottle neck was
s3 write time, saveAsTextFile(). Reducing the number of partitions
dramatically reduces s3 write times.

Adding memory also does not improve performance

I would think that adding more cores would allow more concurrent tasks run.
That is to say reducing num partions would slow things down

What are best practices?

Kind regards

Andy










Re: newbie unable to write to S3 403 forbidden error

2016-02-24 Thread Andy Davidson
Hi Sabarish

We finally got S3 working. I think the real problem was that by default
spark-ec2 uses an old version of hadoop (1.0.4). The we passed
--copy-aws-credentials --hadoop-major-version=2  it started working

Kind regards

Andy


From:  Sabarish Sasidharan <sabarish.sasidha...@manthan.com>
Date:  Sunday, February 14, 2016 at 7:05 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: newbie unable to write to S3 403 forbidden error

> 
> Make sure you are using s3 bucket in same region. Also I would access my
> bucket this way s3n://bucketname/foldername.
> 
> You can test privileges using the s3 cmd line client.
> 
> Also, if you are using instance profiles you don't need to specify access and
> secret keys. No harm in specifying though.
> 
> Regards
> Sab
> On 12-Feb-2016 2:46 am, "Andy Davidson" <a...@santacruzintegration.com> wrote:
>> I am using spark 1.6.0 in a cluster created using the spark-ec2 script. I am
>> using the standalone cluster manager
>> 
>> My java streaming app is not able to write to s3. It appears to be some for
>> of permission problem.
>> 
>> Any idea what the problem might be?
>> 
>> I tried use the IAM simulator to test the policy. Everything seems okay. Any
>> idea how I can debug this problem?
>> 
>> Thanks in advance
>> 
>> Andy
>> 
>> JavaSparkContext jsc = new JavaSparkContext(conf);
>> 
>> 
>> // I did not include the full key in my email
>>// the keys do not contain Œ\¹
>>// these are the keys used to create the cluster. They belong to the
>> IAM user andy
>> jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAJREX");
>> 
>> jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
>> "uBh9v1hdUctI23uvq9qR");
>> 
>> 
>> 
>> 
>>   private static void saveTweets(JavaDStream jsonTweets, String
>> outputURI) {
>> 
>> jsonTweets.foreachRDD(new VoidFunction2<JavaRDD, Time>() {
>> 
>> private static final long serialVersionUID = 1L;
>> 
>> 
>> 
>> @Override
>> 
>> public void call(JavaRDD rdd, Time time) throws Exception
>> {
>> 
>> if(!rdd.isEmpty()) {
>> 
>> // bucket name is Œcom.pws.twitter¹ it has a folder Œjson'
>> 
>> String dirPath =
>> "s3n://s3-us-west-1.amazonaws.com/com.pws.twitter/
>> <http://s3-us-west-1.amazonaws.com/com.pws.twitter/> json² + "-" +
>> time.milliseconds();
>> 
>> rdd.saveAsTextFile(dirPath);
>> 
>> }
>> 
>> }
>> 
>> });
>> 
>> 
>> 
>> 
>> Bucket name : com.pws.titter
>> Bucket policy (I replaced the account id)
>> 
>> {
>> "Version": "2012-10-17",
>> "Id": "Policy1455148808376",
>> "Statement": [
>> {
>> "Sid": "Stmt1455148797805",
>> "Effect": "Allow",
>> "Principal": {
>> "AWS": "arn:aws:iam::123456789012:user/andy"
>> },
>> "Action": "s3:*",
>> "Resource": "arn:aws:s3:::com.pws.twitter/*"
>> }
>> ]
>> }
>> 
>> 




streaming spark is writing results to S3 a good idea?

2016-02-23 Thread Andy Davidson
Currently our stream apps write results to hdfs. We are running into
problems with HDFS becoming corrupted and running out of space. It seems
like a better solution might be to write directly to S3. Is this a good
idea?

We plan to continue to write our checkpoints to hdfs

Are there any issues to be aware of? Maybe performance or something else to
watch out for?

This is our first S3 project. Does storage just grow on on demand?

Kind regards

Andy


P.s. Turns out we are using an old version of hadoop (v 1.0.4)







spark-1.6.0-bin-hadoop2.6/ec2/spark-ec2 uses old version of hadoop

2016-02-23 Thread Andy Davidson
I do not have any hadoop legacy code. My goal is to run spark on top of
HDFS.

Recently I have been have hdfs corruption problem. I was also never able to
access S3 even though I used --copy-aws-credentials. I noticed that by
default the spark-ec2 script uses hadoop 1.0.4. I ran help and discovered
you can specify the hadoop-major version. It seems like this is still using
an old version of hadoop. I assume there are a lot of bug fixes between
hadoop 2.0.0 cdh4.2 and apache hadoop 2.6.x or 2.7.x

Any idea what I would need to do to move to a new version of hadoop hdfs?

Kind regards

Andy

[ec2-user@ip-172-31-18-23 ~]$  /root/ephemeral-hdfs/bin/hadoop version

Hadoop 2.0.0-cdh4.2.0

Subversion 
file:///var/lib/jenkins/workspace/CDH4.2.0-Packaging-Hadoop/build/cdh4/hadoo
p/2.0.0-cdh4.2.0/source/hadoop-common-project/hadoop-common -r
8bce4bd28a464e0a92950c50ba01a9deb1d85686

Compiled by jenkins on Fri Feb 15 10:42:32 PST 2013

>From source with checksum 3eefc211a14ac7b6e764d6ded2eeeb26

[ec2-user@ip-172-31-19-24 ~]$






Re: GroupedDataset needs a mapValues

2016-02-14 Thread Andy Davidson
Hi Michael

From:  Michael Armbrust 
Date:  Saturday, February 13, 2016 at 9:31 PM
To:  Koert Kuipers 
Cc:  "user @spark" 
Subject:  Re: GroupedDataset needs a mapValues

> Instead of grouping with a lambda function, you can do it with a column
> expression to avoid materializing an unnecessary tuple:
> 
> df.groupBy($"_1")


I am unfamiliar with this notation. Is there something similar for Java and
python?

Kind regards

Andy


> 
> Regarding the mapValues, you can do something similar using an Aggregator
>  20Aggregator.html> , but I agree that we should consider something lighter
> weight like the mapValues you propose.
> 
> On Sat, Feb 13, 2016 at 1:35 PM, Koert Kuipers  wrote:
>> i have a Dataset[(K, V)]
>> i would like to group by k and then reduce V using a function (V, V) => V
>> how do i do this?
>> 
>> i would expect something like:
>> val ds = Dataset[(K, V)] ds.groupBy(_._1).mapValues(_._2).reduce(f)
>> or better:
>> ds.grouped.reduce(f)  # grouped only works on Dataset[(_, _)] and i dont care
>> about java api
>> 
>> but there is no mapValues or grouped. ds.groupBy(_._1) gives me a
>> GroupedDataset[(K, (K, V))] which is inconvenient. i could carry the key
>> through the reduce operation but that seems ugly and inefficient.
>> 
>> any thoughts?
>> 
>> 
> 




Re: best practices? spark streaming writing output detecting disk full error

2016-02-12 Thread Andy Davidson
Hi Arkadiusz

Do you have any suggestions?

As an engineer I think when I get disk full errors I want the application to
terminate. Its a lot easier for ops to really there is a problem.


Andy


From:  Arkadiusz Bicz <arkadiusz.b...@gmail.com>
Date:  Friday, February 12, 2016 at 1:57 AM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: best practices? spark streaming writing output detecting disk
full error

> Hi,
> 
> You need good monitoring tools to send you alarms about disk, network
> or  applications errors, but I think it is general dev ops work not
> very specific to spark or hadoop.
> 
> BR,
> 
> Arkadiusz Bicz
> https://www.linkedin.com/in/arkadiuszbicz
> 
> On Thu, Feb 11, 2016 at 7:09 PM, Andy Davidson
> <a...@santacruzintegration.com> wrote:
>>  We recently started a Spark/Spark Streaming POC. We wrote a simple streaming
>>  app in java to collect tweets. We choose twitter because we new we get a lot
>>  of data and probably lots of burst. Good for stress testing
>> 
>>  We spun up  a couple of small clusters using the spark-ec2 script. In one
>>  cluster we wrote all the tweets to HDFS in a second cluster we write all the
>>  tweets to S3
>> 
>>  We were surprised that our HDFS file system reached 100 % of capacity in a
>>  few days. This resulted with ³all data nodes dead². We where surprised
>>  because the actually stream app continued to run. We had no idea we had a
>>  problem until a day or two after the disk became full when we noticed we
>>  where missing a lot of data.
>> 
>>  We ran into a similar problem with our s3 cluster. We had a permission
>>  problem and where un able to write any data yet our stream app continued to
>>  run
>> 
>> 
>>  Spark generated mountains of logs,We are using the stand alone cluster
>>  manager. All the log levels wind up in the ³error² log. Making it hard to
>>  find real errors and warnings using the web UI. Our app is written in Java
>>  so my guess is the write errors must be unable. I.E. We did not know in
>>  advance that they could occur . They are basically undocumented.
>> 
>> 
>> 
>>  We are a small shop. Running something like splunk would add a lot of
>>  expense and complexity for us at this stage of our growth.
>> 
>>  What are best practices
>> 
>>  Kind Regards
>> 
>>  Andy
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 




Re: newbie unable to write to S3 403 forbidden error

2016-02-12 Thread Andy Davidson
Hi Igor

So I assume you are able to use s3 from spark?

Do you use rdd.saveAsTextFile() ?

How did you create your cluster? I.E. Did you use the spark-1.6.0/spark-ec2
script, EMR, or something else?


I tried several version of the url including no luck :-(

The bucket name is Œcom.ps.twitter¹. It has a folder Œson'

We have a developer support contract with amazon how ever our case has been
unassigned for several days now

Thanks

Andy

P.s. In general debugging permission problems is always difficult from the
client side. Secure servers do not want to make it easy for hackers

From:  Igor Berman <igor.ber...@gmail.com>
Date:  Friday, February 12, 2016 at 4:53 AM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: newbie unable to write to S3 403 forbidden error

>  String dirPath = "s3n://s3-us-west-1.amazonaws.com/com.pws.twitter/
> <http://s3-us-west-1.amazonaws.com/com.pws.twitter/> json²
> 
> not sure, but 
> can you try to remove s3-us-west-1.amazonaws.com
> <http://s3-us-west-1.amazonaws.com/com.pws.twitter/>  from path ?
> 
> On 11 February 2016 at 23:15, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I am using spark 1.6.0 in a cluster created using the spark-ec2 script. I am
>> using the standalone cluster manager
>> 
>> My java streaming app is not able to write to s3. It appears to be some for
>> of permission problem.
>> 
>> Any idea what the problem might be?
>> 
>> I tried use the IAM simulator to test the policy. Everything seems okay. Any
>> idea how I can debug this problem?
>> 
>> Thanks in advance
>> 
>> Andy
>> 
>> JavaSparkContext jsc = new JavaSparkContext(conf);
>> 
>> 
>> // I did not include the full key in my email
>>// the keys do not contain Œ\¹
>>// these are the keys used to create the cluster. They belong to the
>> IAM user andy
>> jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAJREX");
>> 
>> jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
>> "uBh9v1hdUctI23uvq9qR");
>> 
>> 
>> 
>> 
>>   private static void saveTweets(JavaDStream jsonTweets, String
>> outputURI) {
>> 
>> jsonTweets.foreachRDD(new VoidFunction2<JavaRDD, Time>() {
>> 
>> private static final long serialVersionUID = 1L;
>> 
>> 
>> 
>> @Override
>> 
>> public void call(JavaRDD rdd, Time time) throws Exception
>> {
>> 
>> if(!rdd.isEmpty()) {
>> 
>> // bucket name is Œcom.pws.twitter¹ it has a folder Œjson'
>> 
>> String dirPath =
>> "s3n://s3-us-west-1.amazonaws.com/com.pws.twitter/
>> <http://s3-us-west-1.amazonaws.com/com.pws.twitter/> json² + "-" +
>> time.milliseconds();
>> 
>> rdd.saveAsTextFile(dirPath);
>> 
>> }
>> 
>> }
>> 
>> });
>> 
>> 
>> 
>> 
>> Bucket name : com.pws.titter
>> Bucket policy (I replaced the account id)
>> 
>> {
>> "Version": "2012-10-17",
>> "Id": "Policy1455148808376",
>> "Statement": [
>> {
>> "Sid": "Stmt1455148797805",
>> "Effect": "Allow",
>> "Principal": {
>> "AWS": "arn:aws:iam::123456789012:user/andy"
>> },
>> "Action": "s3:*",
>> "Resource": "arn:aws:s3:::com.pws.twitter/*"
>> }
>> ]
>> }
>> 
>> 
> 




Re: Question on Spark architecture and DAG

2016-02-12 Thread Andy Davidson


From:  Mich Talebzadeh 
Date:  Thursday, February 11, 2016 at 2:30 PM
To:  "user @spark" 
Subject:  Question on Spark architecture and DAG

> Hi,
> 
> I have used Hive on Spark engine and of course Hive tables and its pretty
> impressive comparing Hive using MR engine.
> 
>  
> 
> Let us assume that I use spark shell. Spark shell is a client that connects to
> spark master running on a host and port like below
> 
> spark-shell --master spark://50.140.197.217:7077:
> 
> Ok once I connect I create an RDD to read a text file:
> 
> val oralog = sc.textFile("/test/alert_mydb.log")
> 
> I then search for word Errors in that file
> 
> oralog.filter(line => line.contains("Errors")).collect().foreach(line =>
> println(line))
> 
>  
> 
> Questions:
> 
>  
> 1. In order to display the lines (the result set) containing word "Errors",
> the content of the file (i.e. the blocks on HDFS) need to be read into memory.
> Is my understanding correct that as per RDD notes those blocks from the file
> will be partitioned across the cluster and each node will have its share of
> blocks in memory?


Typically results are written to disk. For example look at
rdd.saveAsTextFile(). You can also use ³collect² to copy the RDD data into
the drivers local memory. You need to be careful that all the data will fit
in memory.

> 1. 
> 2. Once the result is returned back they need to be sent to the client that
> has made the connection to master. I guess this is a simple TCP operation much
> like any relational database sending the result back?


I run several spark streaming apps. One collects data, does some clean up
and publishes the results to down stream systems using activeMQ. Some of our
other apps just write on a socket

> 1. 
> 2. Once the results are returned if no request has been made to keep the data
> in memory, those blocks in memory will be discarded?

There are couple of thing to consider, for example if your batch job
completes all memory is returned. Programaticaly you make RDD persistent or
cause them to be cached in memory

> 1. 
> 2. Regardless of the storage block size on disk (128MB, 256MB etc), the memory
> pages are 2K in relational databases? Is this the case in Spark as well?
> Thanks,
> 
>  Mich Talebzadeh
> 
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8
> Pw 
>  8Pw> 
>  
> http://talebzadehmich.wordpress.com 
>  
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this message
> shall not be understood as given or endorsed by Peridale Technology Ltd, its
> subsidiaries or their employees, unless expressly so stated. It is the
> responsibility of the recipient to ensure that this email is virus free,
> therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>  
>  




org.apache.spark.sql.AnalysisException: undefined function lit;

2016-02-12 Thread Andy Davidson
I am trying to add a column with a constant value to my data frame. Any idea
what I am doing wrong?

Kind regards

Andy


 DataFrame result = Š
 String exprStr = "lit(" + time.milliseconds()+ ") as ms";

 logger.warn("AEDWIP expr: {}", exprStr);

  result.selectExpr("*", exprStr).show(false);


WARN  02:06:17 streaming-job-executor-0 c.p.f.s.s.CalculateAggregates$1 call
line:96 AEDWIP expr: lit(1455329175000) as ms

ERROR 02:06:17 JobScheduler o.a.s.Logging$class logError line:95 Error
running job streaming job 1455329175000 ms.0

org.apache.spark.sql.AnalysisException: undefined function lit;








best practices? spark streaming writing output detecting disk full error

2016-02-11 Thread Andy Davidson
We recently started a Spark/Spark Streaming POC. We wrote a simple streaming
app in java to collect tweets. We choose twitter because we new we get a lot
of data and probably lots of burst. Good for stress testing

We spun up  a couple of small clusters using the spark-ec2 script. In one
cluster we wrote all the tweets to HDFS in a second cluster we write all the
tweets to S3

We were surprised that our HDFS file system reached 100 % of capacity in a
few days. This resulted with ³all data nodes dead². We where surprised
because the actually stream app continued to run. We had no idea we had a
problem until a day or two after the disk became full when we noticed we
where missing a lot of data.

We ran into a similar problem with our s3 cluster. We had a permission
problem and where un able to write any data yet our stream app continued to
run


Spark generated mountains of logs,We are using the stand alone cluster
manager. All the log levels wind up in the ³error² log. Making it hard to
find real errors and warnings using the web UI. Our app is written in Java
so my guess is the write errors must be unable. I.E. We did not know in
advance that they could occur . They are basically undocumented.



We are a small shop. Running something like splunk would add a lot of
expense and complexity for us at this stage of our growth.

What are best practices

Kind Regards

Andy




newbie unable to write to S3 403 forbidden error

2016-02-11 Thread Andy Davidson
I am using spark 1.6.0 in a cluster created using the spark-ec2 script. I am
using the standalone cluster manager

My java streaming app is not able to write to s3. It appears to be some for
of permission problem.

Any idea what the problem might be?

I tried use the IAM simulator to test the policy. Everything seems okay. Any
idea how I can debug this problem?

Thanks in advance

Andy

JavaSparkContext jsc = new JavaSparkContext(conf);


// I did not include the full key in my email
   // the keys do not contain Œ\¹
   // these are the keys used to create the cluster. They belong to the
IAM user andy
jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAJREX");

jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
"uBh9v1hdUctI23uvq9qR");




  private static void saveTweets(JavaDStream jsonTweets, String
outputURI) {

jsonTweets.foreachRDD(new VoidFunction2() {

private static final long serialVersionUID = 1L;



@Override

public void call(JavaRDD rdd, Time time) throws
Exception {

if(!rdd.isEmpty()) {

// bucket name is Œcom.pws.twitter¹ it has a folder Œjson'

String dirPath =
"s3n://s3-us-west-1.amazonaws.com/com.pws.twitter/json² + "-" +
time.milliseconds();

rdd.saveAsTextFile(dirPath);

}  

}

});




Bucket name : com.pws.titter
Bucket policy (I replaced the account id)

{
"Version": "2012-10-17",
"Id": "Policy1455148808376",
"Statement": [
{
"Sid": "Stmt1455148797805",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:user/andy"
},
"Action": "s3:*",
"Resource": "arn:aws:s3:::com.pws.twitter/*"
}
]
}






  1   2   3   >