Heap Settings for History Server

2017-07-31 Thread N Sa
Hi folks,

I couldn't find much literature on this so I figured I could ask here.

Does anyone have experience in tuning the memory settings and interval
times of the Spark History Server?
Let's say I have 500 applications at 0.5 G each with a
*spark.history.fs.update.interval*  of 400s.
Is there a direct memory correlation that can help me set an optimum value?

Looking for some advice if anyone has tuned the History Server to render
large amounts of applications.

Thanks.
-- 
Regards,
Neelesh S. Salian
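There doesn't seem to be a published formula for this, but a back-of-the-envelope starting point: the History Server keeps lightweight listing data for every application plus fully reconstructed UIs for up to `spark.history.retainedApplications` (default 50) apps, and its heap is set through `SPARK_DAEMON_MEMORY` in `spark-env.sh`. The sketch below is only an illustration; `replay_factor` and `base_gb` are guessed constants, not Spark-documented values:

```python
def estimate_heap_gb(retained_apps, avg_log_size_gb,
                     replay_factor=1.5, base_gb=1.0):
    """Back-of-the-envelope History Server heap estimate.

    replay_factor (transient overhead while replaying an event log) and
    base_gb (listing data + JVM baseline) are illustrative guesses, not
    Spark-documented constants.
    """
    return base_gb + retained_apps * avg_log_size_gb * replay_factor

# e.g. the default spark.history.retainedApplications of 50 UIs,
# each rebuilt from a 0.5 GB event log:
print(estimate_heap_gb(50, 0.5))
```

With a guess like that one would round up when setting SPARK_DAEMON_MEMORY, and tune `spark.history.retainedApplications` down if heap is tight.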


RE: SPARK Issue in Standalone cluster

2017-07-31 Thread Mahesh Sawaiker
Gourav,
Riccardo’s answer is spot on.
What is happening is that one node of spark is writing to its own directory and 
telling a slave to read the data from there; when the slave goes to read it, 
the part file is not found.

Check the folder 
Users/gouravsengupta/Development/spark/sparkdata/test1/part-1-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet
 on the slave.
The reason it ran on spark 1.5 may have been because the executor ran on the 
driver itself. There is not much use to a setup where you don’t have some kind 
of distributed file system, so I would encourage you to use HDFS, or a mounted 
file system shared by all nodes.

Regards,
Mahesh
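As a plain-Python illustration of this point (not a Spark API; the scheme list below is an assumption for the sketch), an output path can be screened before submitting a job: a bare local path is only safe when every node mounts the same directory.

```python
# Schemes assumed here to denote storage reachable from every node.
SHARED_SCHEMES = {"hdfs", "s3", "s3a", "s3n", "viewfs"}

def is_safe_cluster_path(path):
    """Heuristic: True if the path is scheme-qualified shared storage."""
    scheme = path.split("://", 1)[0] if "://" in path else ""
    return scheme in SHARED_SCHEMES

print(is_safe_cluster_path("/Users/gouravsengupta/Development/spark/sparkdata/test1"))
print(is_safe_cluster_path("hdfs://namenode:8020/sparkdata/test1"))
```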


From: Gourav Sengupta [mailto:gourav.sengu...@gmail.com]
Sent: Monday, July 31, 2017 9:54 PM
To: Riccardo Ferrari
Cc: user
Subject: Re: SPARK Issue in Standalone cluster

Hi Riccardo,

I am grateful for your kind response.

Also I am sure that your answer is completely wrong and erroneous. SPARK must 
be having a method so that different executors do not pick up the same files to 
process. You also did not answer the question of why the processing was successful 
in SPARK 1.5 and not in SPARK 2.2.

Also the exact same directory is present on both the nodes.

I feel quite fascinated when individuals respond before even understanding the 
issue, or trying out the code.

It will be of great help if someone could kindly read my email and help me 
figure out the issue.


Regards,
Gourav Sengupta



On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari 
> wrote:
Hi Gourav,

The issue here is the location where you're trying to write/read from:
/Users/gouravsengupta/Development/spark/sparkdata/test1/p...
When dealing with clusters, all the paths and resources should be available to 
all executors (and the driver), and that is the reason why you generally use HDFS, S3, 
NFS or any shared file system.

Spark assumes your data is generally available to all nodes and does not try 
to pick up the data from a selected node; it rather tries to write/read in 
parallel from the executor nodes. Also, given its control logic, there is no way 
(read: you should not care) to know which executor is doing which task.

Hope it helps,
Riccardo

On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta 
> wrote:
Hi,

I am working by creating a native SPARK standalone cluster 
(https://spark.apache.org/docs/2.2.0/spark-standalone.html)

Therefore I  do not have a HDFS.


EXERCISE:
It's the most fundamental and simple exercise. Create a sample SPARK dataframe 
and then write it to a location and then read it back.

SETTINGS:
So after I have installed SPARK in two physical systems with the same:
1. SPARK version,
2. JAVA version,
3. PYTHON_PATH
4. SPARK_HOME
5. PYSPARK_PYTHON
the user in both the systems is the root user therefore there are no permission 
issues anywhere.

I am able to start:
1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate computers)

After that I can see in the spark UI (at port 8080) two workers.


CODE:
Then I run the following code:

==
import findspark
import os
os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.master("spark://mastersystem.local:7077")
.appName("gouravtest")
.enableHiveSupport()
.getOrCreate())
import pandas, numpy
testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(1, 4), 
columns=list('ABCD')))
testdf.cache()
testdf.count()
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test2").count()
==


ERROR I (in above code):
ERROR in line: 
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
This line does not fail or report any error. But when I look at the stage 
in the Spark application UI, the error reported for one of the slave nodes, which is 
not on the same system as the master node, is mentioned below. The writing on 
the slave node which is in the same physical system as the master happens 
correctly. (NOTE: the slave node is basically the worker and the master node the driver.)
--

0 (TID 41). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
'attempt_20170731001928_0002_m_06_0' to 
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_06

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
attempt_20170731001928_0002_m_06_0: Committed

17/07/31 00:19:29 INFO Executor: 

Re: ClassNotFoundException for Workers

2017-07-31 Thread Noppanit Charassinvichai
I've included that in my build file for the fat jar already.


libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.11.155"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.155"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-core" % "1.11.155"

Not sure if I need special configuration?
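Note that `NoClassDefFoundError: Could not initialize class` (as opposed to `ClassNotFoundException`) can mean the class was found but its static initializer failed, often from a dependency conflict such as mismatched jackson versions. Still, a first sanity check is whether the class made it into the fat jar at all; since a jar is just a zip archive, a quick sketch (plain Python, the jar path is hypothetical):

```python
import zipfile

def jar_contains_class(jar_path, fqcn):
    """True if the fully-qualified class name has a .class entry in the jar."""
    entry = fqcn.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()

# e.g. jar_contains_class("target/scala-2.11/sparrow-to-orc-assembly.jar",
#                         "com.amazonaws.services.s3.AmazonS3ClientBuilder")
```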

On Tue, 25 Jul 2017 at 04:17 周康  wrote:

> Ensure com.amazonaws.services.s3.AmazonS3ClientBuilder is in your classpath,
> which includes your application jar and the attached executor jars.
>
> 2017-07-20 6:12 GMT+08:00 Noppanit Charassinvichai :
>
>> I have this spark job which is using S3 client in mapPartition. And I get
>> this error
>>
>> Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times,
>> most recent failure: Lost task 0.3 in stage 3.0 (TID 74,
>> ip-10-90-78-177.ec2.internal, executor 11): java.lang.NoClassDefFoundError:
>> Could not initialize class com.amazonaws.services.s3.AmazonS3ClientBuilder
>> +details
>> Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times,
>> most recent failure: Lost task 0.3 in stage 3.0 (TID 74,
>> ip-10-90-78-177.ec2.internal, executor 11): java.lang.NoClassDefFoundError:
>> Could not initialize class com.amazonaws.services.s3.AmazonS3ClientBuilder
>> at SparrowOrc$$anonfun$1.apply(sparrowOrc.scala:49)
>> at SparrowOrc$$anonfun$1.apply(sparrowOrc.scala:46)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> This is my code
>> val jsonRows = sqs.mapPartitions(partitions => {
>>   val s3Client = AmazonS3ClientBuilder.standard().withCredentials(new
>> DefaultCredentialsProvider).build()
>>
>>   val txfm = new LogLine2Json
>>   val log = Logger.getLogger("parseLog")
>>
>>   partitions.flatMap(messages => {
>> val sqsMsg = Json.parse(messages)
>> val bucketName =
>> Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"",
>> "")
>> val key =
>> Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"",
>> "")
>> val obj = s3Client.getObject(new GetObjectRequest(bucketName,
>> key))
>> val stream = obj.getObjectContent()
>>
>> scala.io.Source.fromInputStream(stream).getLines().map(line => {
>>   try {
>> txfm.parseLine(line)
>>   }
>>   catch {
>> case e: Throwable => {
>>   log.info(line); "{}";
>> }
>>   }
>> }).filter(line => line != "{}")
>>   })
>> })
>>
>> This is my build.sbt
>>
>> name := "sparrow-to-orc"
>>
>> version := "0.1"
>>
>> scalaVersion := "2.11.8"
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0" %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.1.0" %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
>> % "provided"
>>
>> libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3" %
>> "provided"
>> libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.3" %
>> "provided"
>> libraryDependencies += "com.cn" %% "sparrow-clf-parser" % "1.1-SNAPSHOT"
>>
>> libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.11.155"
>> libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.155"
>> libraryDependencies += "com.amazonaws" % "aws-java-sdk-core" % "1.11.155"
>>
>> libraryDependencies += "com.github.seratch" %% "awscala" % "0.6.+"
>> libraryDependencies += "com.typesafe.play" %% "play-json" % "2.6.0"
>> dependencyOverrides ++= Set("com.fasterxml.jackson.core" %
>> "jackson-databind" % "2.6.0")
>>
>>
>>
>> 

Re: SPARK Issue in Standalone cluster

2017-07-31 Thread Gourav Sengupta
Hi Riccardo,

I am grateful for your kind response.

Also I am sure that your answer is completely wrong and erroneous. SPARK
must be having a method so that different executors do not pick up the same
files to process. You also did not answer the question of why the
processing was successful in SPARK 1.5 and not in SPARK 2.2.

Also the exact same directory is present on both the nodes.

I feel quite fascinated when individuals respond before even understanding
the issue, or trying out the code.

It will be of great help if someone could kindly read my email and help me
figure out the issue.


Regards,
Gourav Sengupta



On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari 
wrote:

> Hi Gourav,
>
> The issue here is the location where you're trying to write/read from:
> /Users/gouravsengupta/Development/spark/sparkdata/test1/p...
> When dealing with clusters, all the paths and resources should be available
> to all executors (and the driver), and that is the reason why you generally use
> HDFS, S3, NFS or any shared file system.
>
> Spark assumes your data is generally available to all nodes and does not
> try to pick up the data from a selected node; it rather tries to
> write/read in parallel from the executor nodes. Also, given its control
> logic, there is no way (read: you should not care) to know which executor is
> doing which task.
>
> Hope it helps,
> Riccardo
>
> On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> I am working by creating a native SPARK standalone cluster (
>> https://spark.apache.org/docs/2.2.0/spark-standalone.html)
>>
>> Therefore I  do not have a HDFS.
>>
>>
>> EXERCISE:
>> It's the most fundamental and simple exercise. Create a sample SPARK
>> dataframe and then write it to a location and then read it back.
>>
>> SETTINGS:
>> So after I have installed SPARK in two physical systems with the same:
>> 1. SPARK version,
>> 2. JAVA version,
>> 3. PYTHON_PATH
>> 4. SPARK_HOME
>> 5. PYSPARK_PYTHON
>> the user in both the systems is the root user therefore there are no
>> permission issues anywhere.
>>
>> I am able to start:
>> 1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
>> 2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate
>> computers)
>>
>> After that I can see in the spark UI (at port 8080) two workers.
>>
>>
>> CODE:
>> Then I run the following code:
>>
>> ==
>> import findspark
>> import os
>> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Develop
>> ment/spark/spark/'
>> findspark.init()
>> import pyspark
>> from pyspark.sql import SparkSession
>> spark = (SparkSession.builder
>> .master("spark://mastersystem.local:7077")
>> .appName("gouravtest")
>> .enableHiveSupport()
>> .getOrCreate())
>> import pandas, numpy
>> testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(1,
>> 4), columns=list('ABCD')))
>> testdf.cache()
>> testdf.count()
>> testdf.write.save("/Users/gouravsengupta/Development/spark/
>> sparkdata/test2")
>> spark.read.load("/Users/gouravsengupta/Development/spark/
>> sparkdata/test2").count()
>> ==
>>
>>
>> ERROR I (in above code):
>> ERROR in line: testdf.write.save("/Users/gour
>> avsengupta/Development/spark/sparkdata/test2")
>> This line does not fail or report any error. But when I look at the
>> stage in the Spark application UI, the error reported for one of the slave nodes,
>> which is not on the same system as the master node, is mentioned below. The
>> writing on the slave node which is in the same physical system as the
>> master happens correctly. (NOTE: the slave node is basically the worker and the master
>> node the driver.)
>> 
>> --
>>
>> 0 (TID 41). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
>> 'attempt_20170731001928_0002_m_06_0' to 
>> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_06
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
>> attempt_20170731001928_0002_m_06_0: Committed
>> 17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID 64). 
>> 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
>> 'attempt_20170731001928_0002_m_28_0' to 
>> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_28
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
>> attempt_20170731001928_0002_m_28_0: Committed
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
>> 'attempt_20170731001928_0002_m_21_0' to 
>> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_21
>> 17/07/31 00:19:29 

Re: ALSModel.load not working on pyspark 2.1.0

2017-07-31 Thread Cristian Garcia
Thanks Irving,

The problem was that I was using spark in cluster mode and had to resort to
HDFS to properly save/load the model.

On Mon, Jul 31, 2017 at 9:09 AM Irving Duran  wrote:

> I think the problem is because you are calling "model2 =
> ALSModel.load("/models/als")" instead of "model2 = 
> *model*.load("/models/als")".
> See my working sample below.
>
> >>> model.save('/models/als.test')
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> >>> model2 = model.load('/models/als.test')
> >>> model
> ALS_4324a1082d889dd1f0e4
> >>> model2
> ALS_4324a1082d889dd1f0e4
>
>
> Thank You,
>
> Irving Duran
>
> On Sat, Jul 29, 2017 at 2:57 PM, Cristian Garcia 
> wrote:
>
>> This code is not working:
>>
>> 
>> from pyspark.ml.evaluation import RegressionEvaluator
>> from pyspark.ml.recommendation import ALS, ALSModel
>> from pyspark.sql import Row
>>
>> als = ALS(maxIter=10, regParam=0.01, userCol="user_id",
>> itemCol="movie_id", ratingCol="rating")
>> model = als.fit(training)
>>
>> model.save("/models/als")
>>
>> model2 = ALSModel.load("/models/als") # <-- error here
>> =
>>
>>
>>
>> Gives rise to this error:
>> =
>>
>> ---------------------------------------------------------------------------
>> Py4JJavaError                             Traceback (most recent call last)
>> <ipython-input> in <module>()
>> ----> 1 m2 = ALSModel.load("/models/als")
>>
>> /usr/local/spark/python/pyspark/ml/util.py in load(cls, path)
>>     251     def load(cls, path):
>>     252         """Reads an ML instance from the input path, a shortcut of `read().load(path)`."""
>> --> 253         return cls.read().load(path)
>>     254
>>     255
>>
>> /usr/local/spark/python/pyspark/ml/util.py in load(self, path)
>>     192         if not isinstance(path, basestring):
>>     193             raise TypeError("path should be a basestring, got type %s" % type(path))
>> --> 194         java_obj = self._jread.load(path)
>>     195         if not hasattr(self._clazz, "_from_java"):
>>     196             raise NotImplementedError("This Java ML type cannot be loaded into Python currently: %r"
>>
>> /usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>    1131         answer = self.gateway_client.send_command(command)
>>    1132         return_value = get_return_value(
>> -> 1133             answer, self.gateway_client, self.target_id, self.name)
>>    1134
>>    1135         for temp_arg in temp_args:
>>
>> /usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>>      61     def deco(*a, **kw):
>>      62         try:
>> ---> 63             return f(*a, **kw)
>>      64         except py4j.protocol.Py4JJavaError as e:
>>      65             s = e.java_exception.toString()
>>
>> /usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>>     317             raise Py4JJavaError(
>>     318                 "An error occurred while calling {0}{1}{2}.\n".
>> --> 319                 format(target_id, ".", name), value)
>>     320         else:
>>     321             raise Py4JError(
>> Py4JJavaError: An error occurred while calling o337.load.
>> : java.lang.UnsupportedOperationException: empty collection
>>  at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1370)
>>  at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>  at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>  at org.apache.spark.rdd.RDD.first(RDD.scala:1367)
>>  at 
>> org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:379)
>>  at 
>> org.apache.spark.ml.recommendation.ALSModel$ALSModelReader.load(ALS.scala:317)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>  at py4j.Gateway.invoke(Gateway.java:280)
>>  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>  at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>  at py4j.GatewayConnection.run(GatewayConnection.java:214)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>> =
>>
>
>


Re: ALSModel.load not working on pyspark 2.1.0

2017-07-31 Thread Irving Duran
I think the problem is because you are calling "model2 =
ALSModel.load("/models/als")" instead of "model2 =
*model*.load("/models/als")".
See my working sample below.

>>> model.save('/models/als.test')
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
>>> model2 = model.load('/models/als.test')
>>> model
ALS_4324a1082d889dd1f0e4
>>> model2
ALS_4324a1082d889dd1f0e4


Thank You,

Irving Duran

On Sat, Jul 29, 2017 at 2:57 PM, Cristian Garcia 
wrote:

> This code is not working:
>
> 
> from pyspark.ml.evaluation import RegressionEvaluator
> from pyspark.ml.recommendation import ALS, ALSModel
> from pyspark.sql import Row
>
> als = ALS(maxIter=10, regParam=0.01, userCol="user_id",
> itemCol="movie_id", ratingCol="rating")
> model = als.fit(training)
>
> model.save("/models/als")
>
> model2 = ALSModel.load("/models/als") # <-- error here
> =
>
>
>
> Gives rise to this error:
> =
>
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input> in <module>()
> ----> 1 m2 = ALSModel.load("/models/als")
>
> /usr/local/spark/python/pyspark/ml/util.py in load(cls, path)
>     251     def load(cls, path):
>     252         """Reads an ML instance from the input path, a shortcut of `read().load(path)`."""
> --> 253         return cls.read().load(path)
>     254
>     255
>
> /usr/local/spark/python/pyspark/ml/util.py in load(self, path)
>     192         if not isinstance(path, basestring):
>     193             raise TypeError("path should be a basestring, got type %s" % type(path))
> --> 194         java_obj = self._jread.load(path)
>     195         if not hasattr(self._clazz, "_from_java"):
>     196             raise NotImplementedError("This Java ML type cannot be loaded into Python currently: %r"
>
> /usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
>    1131         answer = self.gateway_client.send_command(command)
>    1132         return_value = get_return_value(
> -> 1133             answer, self.gateway_client, self.target_id, self.name)
>    1134
>    1135         for temp_arg in temp_args:
>
> /usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>      61     def deco(*a, **kw):
>      62         try:
> ---> 63             return f(*a, **kw)
>      64         except py4j.protocol.Py4JJavaError as e:
>      65             s = e.java_exception.toString()
>
> /usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     317             raise Py4JJavaError(
>     318                 "An error occurred while calling {0}{1}{2}.\n".
> --> 319                 format(target_id, ".", name), value)
>     320         else:
>     321             raise Py4JError(
> Py4JJavaError: An error occurred while calling o337.load.
> : java.lang.UnsupportedOperationException: empty collection
>   at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1370)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at org.apache.spark.rdd.RDD.first(RDD.scala:1367)
>   at 
> org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:379)
>   at 
> org.apache.spark.ml.recommendation.ALSModel$ALSModelReader.load(ALS.scala:317)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at py4j.Gateway.invoke(Gateway.java:280)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.GatewayConnection.run(GatewayConnection.java:214)
>   at java.lang.Thread.run(Thread.java:748)
>
> =
>


transactional data in sparksql

2017-07-31 Thread luohui20001
hello guys: I have some transactional data as attached file 1.txt. A 
sequence of a single operation 1 followed by a few operations 0 is a transaction 
here. The transactions whose sum(Amount) of operation 0 is less than the 
sum(Amount) of operation 1 need to be found out.
 There are several questions here:
1. To deal with this kind of transaction, what is the most sensible way? Does UDAF 
help? Or does sparksql provide transactional support? I remember that hive has some 
kind of support for transactions, like 
https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions#HiveTransactions-GrammarChanges.
2. The data has been sorted by timestamp. How about getting those transactions 
within a time period, like 24 hours?
 thank you.



 

Thanks
Best regards!
San.Luo
|Account|Operation| Timestamp| Amount|
+---+-+--+---+
| 13|1|1400017208| 674.33|
| 13|0|1400102650|  73.86|
| 13|1|1400130576|1155.48|
| 13|1|1400165378|  96.04|
| 13|0|1400245724| 173.84|
| 13|0|1400258007| 852.29|
| 13|1|1400265065|2085.32|
| 13|0|1400329127|  429.3|
| 13|0|1400383007|  611.2|
| 13|1|1400428342|1629.76|
| 13|0|1400457645| 490.55|
| 13|1|1400516552| 369.54|
| 13|1|1400618678|1316.05|
| 13|0|1400655615| 573.71|
| 13|0|1400696930| 877.16|
| 13|0|1400732011| 105.51|
| 13|0|1400751612|1512.23|
| 13|0|1400761888| 414.36|
| 13|0|1400814042|  36.52|
| 13|0|1400831895| 611.15|
+---+-+--+---+
only showing top 20 rows

SQL: select * from r where Account=13 limit 20
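The grouping rule described above (an operation-1 row opens a transaction; the operation-0 rows that follow belong to it, until the next operation-1 row) can be sketched in plain Python. This is only the single-machine logic, not Spark code; in Spark SQL a similar effect could come from a window function that carries the last operation-1 timestamp forward as a transaction key.

```python
def find_deficit_transactions(rows):
    """rows: (account, operation, timestamp, amount) tuples, sorted by timestamp.

    Returns the transactions whose sum of operation-0 amounts is less
    than the opening operation-1 amount.
    """
    txns, current = [], None
    for account, op, ts, amount in rows:
        if op == 1:                       # a new transaction opens
            if current is not None:
                txns.append(current)
            current = {"account": account, "start": ts,
                       "in": amount, "out": 0.0}
        elif current is not None:         # operation 0 joins the open transaction
            current["out"] += amount
    if current is not None:
        txns.append(current)
    return [t for t in txns if t["out"] < t["in"]]
```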
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: can I do spark-submit --jars [s3://bucket/folder/jar_file]? or --jars

2017-07-31 Thread 周康
When using spark-submit, the application jar along with any jars included
with the --jars option will be automatically transferred to the cluster.
URLs supplied after --jars must be separated by commas. That list is
included on the driver and executor classpaths. Directory expansion does
not work with --jars.

Spark uses the following URL scheme to allow different strategies for
disseminating jars:

   - *file:* - Absolute paths and file:/ URIs are served by the driver’s
   HTTP file server, and every executor pulls the file from the driver HTTP
   server.
   - *hdfs:*, *http:*, *https:*, *ftp:* - these pull down files and JARs
   from the URI as expected
   - *local:* - a URI starting with local:/ is expected to exist as a local
   file on each worker node. This means that no network IO will be incurred,
   and works well for large files/JARs that are pushed to each worker, or
   shared via NFS, GlusterFS, etc.

From the documentation, I wonder whether the s3:// URL format is supported.
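The scheme rules above can be written down as a small dispatch sketch (plain Python, illustrative only); note that s3:// falls into none of the documented buckets, which matches the suspicion that it may not be supported directly by --jars:

```python
from urllib.parse import urlparse

def jar_distribution_strategy(url):
    """Classify a --jars URL by the documented scheme buckets."""
    scheme = urlparse(url).scheme or "file"
    if scheme == "file":
        return "served by the driver's HTTP file server"
    if scheme in ("hdfs", "http", "https", "ftp"):
        return "pulled down from the URI by each executor"
    if scheme == "local":
        return "expected to already exist on every worker node"
    return "undocumented scheme: " + scheme

print(jar_distribution_strategy("s3://bucket/folder/jar_file"))
```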

2017-07-29 4:52 GMT+08:00 Richard Xin :

> Can we add extra library (jars on S3) to spark-submit?
> if yes, how? such as --jars, extraClassPath, extraLibPath
> Thanks,
> Richard
>


Re: Spark parquet file read problem !

2017-07-31 Thread serkan taş
Thank you very much.

Schema merge fixed the structure problem, but the fields with the same name but 
different types are still an issue I should work on.
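To pinpoint such fields before fixing them, the two schemas can be diffed. A minimal sketch over schemas represented as name-to-type dicts (in PySpark these could be built from `df.schema`, but the code below is deliberately plain Python):

```python
def conflicting_fields(schema_a, schema_b):
    """Fields present in both schemas but with different types,
    the case schema merging cannot reconcile automatically."""
    return {name: (schema_a[name], schema_b[name])
            for name in schema_a.keys() & schema_b.keys()
            if schema_a[name] != schema_b[name]}

print(conflicting_fields({"id": "long", "value": "double"},
                         {"id": "string", "value": "double"}))
```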

Get Outlook for Android



From: 萝卜丝炒饭
Sent: Monday, July 31, 11:16
Subject: Re: Spark parquet file read problem !
To: serkan taş, pandees waran
Cc: user@spark.apache.org


please add the mergeSchema option.

---Original---
From: "serkan taş"
Date: 2017/7/31 13:54:14
To: "pandees waran";
Cc: "user@spark.apache.org";
Subject: Re: Spark parquet file read problem !

I checked and realised that the schemas of the files differ, with some missing 
fields and some fields with the same name but different types.

How may I overcome the issue?

Get Outlook for Android

From: pandees waran 
Sent: Sunday, July 30, 2017 7:12:55 PM
To: serkan taş
Cc: user@spark.apache.org
Subject: Re: Spark parquet file read problem !

I have encountered a similar error when the schemas / datatypes conflict 
between those 2 parquet files. Are you sure that the 2 individual files 
have the same structure with the same datatypes? If not, you have to fix this 
by enforcing default values for the missing fields to make the structure 
and data types identical.

Sent from my iPhone

On Jul 30, 2017, at 8:11 AM, serkan taş 
> wrote:

Hi,

I have a problem while reading parquet files located in hdfs.


If I read the files individually, nothing is wrong and I can get the file content.

parquetFile = 
spark.read.parquet(“hdfs://xxx/20170719/part-0-3a9c226f-4fef-44b8-996b-115a2408c746.snappy.parquet")

and

parquetFile = 
spark.read.parquet(“hdfs://xxx/20170719/part-0-17262890-a56c-42e2-b299-2b10354da184.snappy.parquet”)


But when I try to read the folder,

This is how I read the folder:

parquetFile = spark.read.parquet(“hdfs://xxx/20170719/“)


then I get the below exception:

Note : Only these two files are in the folder. Please find the parquet files 
attached.


Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 
5, localhost, executor driver): org.apache.parquet.io.ParquetDecodingException: 
Can not read value at 1 in block 0 in file 
hdfs://xxx/20170719/part-0-3a9c226f-4fef-44b8-996b-115a2408c746.snappy.parquet
   at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
   at 
org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
   at 
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
   at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
   at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:166)
   at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
   at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
   at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
   at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
   at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
   at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
   at 
org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
   at 
org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.MutableAny cannot be cast to 
org.apache.spark.sql.catalyst.expressions.MutableLong
   at 
org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.setLong(SpecificInternalRow.scala:295)
   at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.setLong(ParquetRowConverter.scala:164)
   at 
org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addLong(ParquetRowConverter.scala:86)
   at 
org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:274)
   at 

Re: SPARK Issue in Standalone cluster

2017-07-31 Thread Riccardo Ferrari
Hi Gourav,

The issue here is the location where you're trying to write/read from:
/Users/gouravsengupta/Development/spark/sparkdata/test1/p...
When dealing with clusters, all the paths and resources should be available
to all executors (and the driver), and that is the reason why you generally use
HDFS, S3, NFS or any shared file system.

Spark assumes your data is generally available to all nodes and does not
try to pick up the data from a selected node; it rather tries to
write/read in parallel from the executor nodes. Also, given its control
logic, there is no way (read: you should not care) to know which executor is
doing which task.

Hope it helps,
Riccardo

On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta 
wrote:

> Hi,
>
> I am working by creating a native SPARK standalone cluster (
> https://spark.apache.org/docs/2.2.0/spark-standalone.html)
>
> Therefore I  do not have a HDFS.
>
>
> EXERCISE:
> It's the most fundamental and simple exercise. Create a sample SPARK
> dataframe and then write it to a location and then read it back.
>
> SETTINGS:
> So after I have installed SPARK in two physical systems with the same:
> 1. SPARK version,
> 2. JAVA version,
> 3. PYTHON_PATH
> 4. SPARK_HOME
> 5. PYSPARK_PYTHON
> The user on both systems is the root user, therefore there are no
> permission issues anywhere.
>
> I am able to start:
> 1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
> 2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate
> computers)
>
> After that I can see in the spark UI (at port 8080) two workers.
>
>
> CODE:
> Then I run the following code:
>
> ==
> import findspark
> import os
> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/
> Development/spark/spark/'
> findspark.init()
> import pyspark
> from pyspark.sql import SparkSession
> spark = (SparkSession.builder
> .master("spark://mastersystem.local:7077")
> .appName("gouravtest")
> .enableHiveSupport()
> .getOrCreate())
> import pandas, numpy
> testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(1,
> 4), columns=list('ABCD')))
> testdf.cache()
> testdf.count()
> testdf.write.save("/Users/gouravsengupta/Development/
> spark/sparkdata/test2")
> spark.read.load("/Users/gouravsengupta/Development/
> spark/sparkdata/test2").count()
> ==
>
>
> ERROR I (in above code):
> ERROR in line: testdf.write.save("/Users/gouravsengupta/Development/
> spark/sparkdata/test2")
> This line does not fail or report any error. But when I look at the
> stage in the Spark application UI, the error reported for the slave node
> that is not on the same system as the master node is shown below. The
> write on the slave node that is on the same physical system as the
> master happens correctly. (NOTE: the slave node is basically the worker,
> and the master node the driver.)
> 
> --
>
> 0 (TID 41). 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_06_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_06
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_06_0: Committed
> 17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID 64). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_28_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_28
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_28_0: Committed
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_21_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_21
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_21_0: Committed
> 17/07/31 00:19:29 INFO Executor: Finished task 12.0 in stage 2.0 (TID 45). 
> 2103 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 4.0 in stage 2.0 (TID 37). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 6.0 in stage 2.0 (TID 39). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_18_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_18
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_18_0: Committed
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 

Re: Spark parquet file read problem !

2017-07-31 Thread ??????????
Please set the mergeSchema option when reading the folder.
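For reference, the Parquet option is named mergeSchema and is enabled per read with spark.read.option("mergeSchema", "true").parquet(path). Conceptually, merging unions the field sets of all files, but it cannot reconcile two fields that share a name with incompatible types. A toy stdlib sketch (not Spark code; the field names are invented) of that rule:

```python
def merge_schemas(a, b):
    """Union two {field: type} schemas; fail on a hard type conflict."""
    merged = dict(a)
    for field, typ in b.items():
        if field in merged and merged[field] != typ:
            raise TypeError(
                f"cannot merge field '{field}': {merged[field]} vs {typ}")
        merged[field] = typ
    return merged

file1 = {"id": "long", "name": "string"}   # missing "ts"
file2 = {"id": "long", "ts": "timestamp"}  # missing "name"
print(merge_schemas(file1, file2))  # union of both field sets

try:
    merge_schemas(file1, {"id": "string"})  # same name, different type
except TypeError as e:
    print(e)
```

Missing fields are the easy case (the merged schema simply carries them, and Spark fills nulls for files that lack them); a genuine type conflict, like the long vs. string pair above, has to be fixed in the data itself, which is consistent with the ClassCastException in the stack trace.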


 
---Original---
From: "serkan taş"
Date: 2017/7/31 13:54:14
To: "pandees waran";
Cc: "user@spark.apache.org";
Subject: Re: Spark parquet file read problem !


  I checked and realised that the schemas of the files differ, with some 
missing fields and some fields with the same name but a different type.
 
 
  How may I overcome the issue?
 
 
   Get Outlook for Android
 
 
 
 From: pandees waran 
 Sent: Sunday, July 30, 2017 7:12:55 PM
 To: serkan taş
 Cc: user@spark.apache.org
 Subject: Re: Spark parquet file read problem !  
 
  I have encountered a similar error when the schemas/datatypes conflict 
between the 2 parquet files. Are you sure that the 2 individual files 
have the same structure with matching datatypes? If not, you have to fix this 
by enforcing default values for the missing fields to make the structure 
and data types identical.
 
 Sent from my iPhone
 
 On Jul 30, 2017, at 8:11 AM, serkan taş  wrote:
 
 
   Hi, 
 
 I have a problem while reading parquet files located in HDFS.
 
 
 
 
 If I read the files individually, nothing is wrong and I can get the file content. 
 
 
   parquetFile = 
spark.read.parquet("hdfs://xxx/20170719/part-0-3a9c226f-4fef-44b8-996b-115a2408c746.snappy.parquet")
  
 
  and
  
 
   parquetFile = 
spark.read.parquet("hdfs://xxx/20170719/part-0-17262890-a56c-42e2-b299-2b10354da184.snappy.parquet")
 
  
 
  
 
 
 But when I try to read the whole folder, 
 
 
 This is how I read the folder: 
 
 
 parquetFile = spark.read.parquet("hdfs://xxx/20170719/")
 
 
 
 
 then I get the below exception:
 
 
 Note : Only these two files are in the folder. Please find the parquet files 
attached.
 
 
 
 
   Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.runJob.
  : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 
(TID 5, localhost, executor driver):  
org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in 
block 0 in file 
hdfs://xxx/20170719/part-0-3a9c226f-4fef-44b8-996b-115a2408c746.snappy.parquet
 at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
 at 
org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
 at 
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:166)
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
 at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
 at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893)
 at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
 at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
 at 
org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
 at 
org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
  Caused by: java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.MutableAny cannot be cast to 
org.apache.spark.sql.catalyst.expressions.MutableLong
 at 
org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.setLong(SpecificInternalRow.scala:295)
 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.setLong(ParquetRowConverter.scala:164)
 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addLong(ParquetRowConverter.scala:86)
 at 
org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:274)
 at 
org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:371)
 at 
org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
 at 

Running several spark actions in parallel

2017-07-31 Thread Guy Harmach
Hi,

I need to run a batch job written in Java that executes several SQL statements 
on different Hive tables, and then processes each partition's result set in a 
foreachPartition() operator.
I'd like to run these actions in parallel.
I saw there are two approaches for achieving this:

1. Using the java.util.concurrent package, e.g. Future/ForkJoinPool

2. Transforming my Dataset to a JavaRDD and using 
foreachPartitionAsync() on the RDD.

Can you please recommend the best way to achieve this using one of these 
options, or suggest a better approach?
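As a sketch of option 1 (only the shape of the pattern; run_query below is a hypothetical stand-in for the spark.sql(...).foreachPartition(...) work): submit each action from its own thread via a plain thread pool. The same pattern carries over to Java with an ExecutorService, since Spark's scheduler accepts jobs from multiple threads of one SparkContext.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_query(table):
    # Hypothetical stand-in for:
    #   spark.sql(f"SELECT ... FROM {table}").foreachPartition(handler)
    return f"processed {table}"

tables = ["hive_table_a", "hive_table_b", "hive_table_c"]

# Submit all actions at once; each call blocks its own thread while the
# cluster scheduler runs the resulting jobs concurrently.
with ThreadPoolExecutor(max_workers=len(tables)) as pool:
    futures = [pool.submit(run_query, t) for t in tables]
    for fut in as_completed(futures):
        print(fut.result())
```

Option 2, foreachPartitionAsync(), achieves a similar effect by returning a future instead of blocking, at the cost of converting the Dataset to an RDD first.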

Thanks, Guy