Re: java.lang.UnsupportedOperationException: Cannot evaluate expression: fun_nm(input[0, string, true])

2016-08-18 Thread trsell
Hi,

The stack trace suggests you're also doing a join, and that you're using Python.

I wonder if you're hitting this issue:
https://issues.apache.org/jira/browse/SPARK-17100

Are you using spark 2.0.0?

Tim


On Tue, 16 Aug 2016 at 16:58 Sumit Khanna  wrote:

> This is just the stack trace, but where are you calling the UDF?
>
> Regards,
> Sumit
>
> On 16-Aug-2016 2:20 pm, "pseudo oduesp"  wrote:
>
>> Hi,
>> I create new columns with a UDF, and when I then try to filter on these columns
>> I get this error. Why?
>>
>> : java.lang.UnsupportedOperationException: Cannot evaluate expression: fun_nm(input[0, string, true])
>> at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:221)
>> at org.apache.spark.sql.execution.python.PythonUDF.eval(PythonUDF.scala:27)
>> at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:408)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(Optimizer.scala:1234)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$55.apply(Optimizer.scala:1248)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$55.apply(Optimizer.scala:1248)
>> at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>> at scala.collection.immutable.List.exists(List.scala:84)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(Optimizer.scala:1248)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$30.applyOrElse(Optimizer.scala:1264)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$30.applyOrElse(Optimizer.scala:1262)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
>> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:268)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(Optimizer.scala:1262)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(Optimizer.scala:1225)
>> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>> at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
>> at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
>> at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
>> at
>> 

Spark streaming

2016-08-18 Thread Diwakar Dhanuskodi
Hi,

Is there a way to tell createDirectStream to receive only the last 'n'
offsets of a specific topic and partition? I don't want to filter them out in
foreachRDD.
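
One possible approach, sketched here rather than confirmed anywhere in this thread: in the Kafka 0.8 integration, createDirectStream accepts a fromOffsets argument that gives the starting offset for each topic and partition, so a stream could be started 'n' messages before the current end of each partition. The helper below only does that arithmetic in plain Python. The name starting_offsets and the sample offset dictionaries are hypothetical, and fetching the earliest and latest offsets from Kafka is assumed to happen elsewhere.

```python
def starting_offsets(earliest, latest, n):
    """Compute a start offset per (topic, partition), n messages before the end.

    earliest, latest: dicts mapping (topic, partition) -> offset.
    The result never goes below the earliest offset still retained.
    """
    if n < 0:
        raise ValueError("n must be non-negative")
    return {
        tp: max(earliest.get(tp, 0), end - n)
        for tp, end in latest.items()
    }


# Hypothetical offsets for two partitions of a topic named "events".
earliest = {("events", 0): 0, ("events", 1): 25}
latest = {("events", 0): 1000, ("events", 1): 40}

offsets = starting_offsets(earliest, latest, 100)
print(offsets)
```

A stream started from such offsets keeps consuming new messages afterwards; the offsets only control where reading begins.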



Re: [Spark2] Error writing "complex" type to CSV

2016-08-18 Thread Hyukjin Kwon
Ah, BTW, there is an issue, SPARK-16216, about how dates and timestamps
are printed here, so please ignore the integer values shown for the dates.
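
For reference, the integer 7651 in the quoted output is consistent with the date being written as a count of days since 1970-01-01, which is an assumption about how the value was produced rather than something stated in this thread. A quick check in plain Python, with hypothetical helper names:

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def days_to_date(days):
    """Convert a day count since the Unix epoch to a calendar date."""
    return EPOCH + timedelta(days=days)


def date_to_days(d):
    """Convert a calendar date to a day count since the Unix epoch."""
    return (d - EPOCH).days


print(days_to_date(7651))                # 1990-12-13
print(date_to_days(date(1990, 12, 13)))  # 7651
```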

2016-08-19 9:54 GMT+09:00 Hyukjin Kwon :

> Ah, sorry, I should have read this more carefully. Would you mind sharing your
> code so I can test it?
>
> I would like to reproduce this.
>
>
> I just tested this myself but couldn't reproduce the error with the code below
> (is this what you're doing?):
>
> import java.sql.Date
> import org.apache.spark.sql.Dataset
> // Assumes `spark` is the active SparkSession; spark-shell imports this automatically.
> import spark.implicits._
>
> case class ClassData(a: String, b: Date)
>
> val ds: Dataset[ClassData] = Seq(
>   ("a", Date.valueOf("1990-12-13")),
>   ("a", Date.valueOf("1990-12-13")),
>   ("a", Date.valueOf("1990-12-13"))
> ).toDF("a", "b").as[ClassData]
> ds.write.csv("/tmp/data.csv")
> spark.read.csv("/tmp/data.csv").show()
>
> prints as below:
>
> +---+----+
> |_c0| _c1|
> +---+----+
> |  a|7651|
> |  a|7651|
> |  a|7651|
> +---+----+
>
>
> 2016-08-19 9:27 GMT+09:00 Efe Selcuk :
>
>> Thanks for the response. The problem with that thought is that I don't
>> think I'm dealing with a complex nested type. It's just a dataset where
>> every record is a case class with only simple types as fields, strings and
>> dates. There's no nesting.
>>
>> That's what confuses me about how it's interpreting the schema. The
>> schema seems to be one complex field rather than a bunch of simple fields.
>>
>> On Thu, Aug 18, 2016, 5:07 PM Hyukjin Kwon  wrote:
>>
>>> Hi Efe,
>>>
>>> If my understanding is correct, supporting to write/read complex types
>>> is not supported because CSV format can't represent the nested types in its
>>> own format.
>>>
>>> I guess supporting them in writing in external CSV is rather a bug.
>>>
>>> I think it'd be great if we can write and read back CSV in its own
>>> format but I guess we can't.
>>>
>>> Thanks!
>>>
>>> On 19 Aug 2016 6:33 a.m., "Efe Selcuk"  wrote:
>>>
>>>> We have an application working in Spark 1.6. It uses the databricks csv
>>>> library for the output format when writing out.
>>>>
>>>> I'm attempting an upgrade to Spark 2. When writing with both the native
>>>> DataFrameWriter#csv() method and with first specifying the
>>>> "com.databricks.spark.csv" format (I suspect underlying format is the same
>>>> but I don't know how to verify), I get the following error:
>>>>
>>>> java.lang.UnsupportedOperationException: CSV data source does not
>>>> support struct<[bunch of field names and types]> data type
>>>>
>>>> There are 20 fields, mostly plain strings with a couple of dates. The
>>>> source object is a Dataset[T] where T is a case class with various fields
>>>> The line just looks like: someDataset.write.csv(outputPath)
>>>>
>>>> Googling returned this fairly recent pull request:
>>>> https://mail-archives.apache.org/mod_mbox/spark-commits/201605.mbox/%3C65d35a72bd05483392857098a2635cc2@git.apache.org%3E
>>>>
>>>> If I'm reading that correctly, the schema shows that each record has
>>>> one field of this complex struct type? And the validation thinks it's
>>>> something that it can't serialize. I would expect the schema to have a
>>>> bunch of fields in it matching the case class, so maybe there's something
>>>> I'm misunderstanding.
>>>>
>>>> Efe

>>>
>


Re: [Spark2] Error writing "complex" type to CSV

2016-08-18 Thread Hyukjin Kwon
Ah, sorry, I should have read this more carefully. Would you mind sharing your
code so I can test it?

I would like to reproduce this.


I just tested this myself but couldn't reproduce the error with the code below
(is this what you're doing?):

import java.sql.Date
import org.apache.spark.sql.Dataset
// Assumes `spark` is the active SparkSession; spark-shell imports this automatically.
import spark.implicits._

case class ClassData(a: String, b: Date)

val ds: Dataset[ClassData] = Seq(
  ("a", Date.valueOf("1990-12-13")),
  ("a", Date.valueOf("1990-12-13")),
  ("a", Date.valueOf("1990-12-13"))
).toDF("a", "b").as[ClassData]
ds.write.csv("/tmp/data.csv")
spark.read.csv("/tmp/data.csv").show()

prints as below:

+---+----+
|_c0| _c1|
+---+----+
|  a|7651|
|  a|7651|
|  a|7651|
+---+----+


2016-08-19 9:27 GMT+09:00 Efe Selcuk :

> Thanks for the response. The problem with that thought is that I don't
> think I'm dealing with a complex nested type. It's just a dataset where
> every record is a case class with only simple types as fields, strings and
> dates. There's no nesting.
>
> That's what confuses me about how it's interpreting the schema. The schema
> seems to be one complex field rather than a bunch of simple fields.
>
> On Thu, Aug 18, 2016, 5:07 PM Hyukjin Kwon  wrote:
>
>> Hi Efe,
>>
>> If my understanding is correct, supporting to write/read complex types is
>> not supported because CSV format can't represent the nested types in its
>> own format.
>>
>> I guess supporting them in writing in external CSV is rather a bug.
>>
>> I think it'd be great if we can write and read back CSV in its own format
>> but I guess we can't.
>>
>> Thanks!
>>
>> On 19 Aug 2016 6:33 a.m., "Efe Selcuk"  wrote:
>>
>>> We have an application working in Spark 1.6. It uses the databricks csv
>>> library for the output format when writing out.
>>>
>>> I'm attempting an upgrade to Spark 2. When writing with both the native
>>> DataFrameWriter#csv() method and with first specifying the
>>> "com.databricks.spark.csv" format (I suspect underlying format is the same
>>> but I don't know how to verify), I get the following error:
>>>
>>> java.lang.UnsupportedOperationException: CSV data source does not
>>> support struct<[bunch of field names and types]> data type
>>>
>>> There are 20 fields, mostly plain strings with a couple of dates. The
>>> source object is a Dataset[T] where T is a case class with various fields
>>> The line just looks like: someDataset.write.csv(outputPath)
>>>
>>> Googling returned this fairly recent pull request:
>>> https://mail-archives.apache.org/mod_mbox/spark-commits/201605.mbox/%3c65d35a72bd05483392857098a2635...@git.apache.org%3E
>>>
>>> If I'm reading that correctly, the schema shows that each record has one
>>> field of this complex struct type? And the validation thinks it's something
>>> that it can't serialize. I would expect the schema to have a bunch of
>>> fields in it matching the case class, so maybe there's something I'm
>>> misunderstanding.
>>>
>>> Efe
>>>
>>


Re: [Spark2] Error writing "complex" type to CSV

2016-08-18 Thread Efe Selcuk
Thanks for the response. The problem with that thought is that I don't
think I'm dealing with a complex nested type. It's just a dataset where
every record is a case class with only simple types as fields, strings and
dates. There's no nesting.

That's what confuses me about how it's interpreting the schema. The schema
seems to be one complex field rather than a bunch of simple fields.

On Thu, Aug 18, 2016, 5:07 PM Hyukjin Kwon  wrote:

> Hi Efe,
>
> If my understanding is correct, supporting to write/read complex types is
> not supported because CSV format can't represent the nested types in its
> own format.
>
> I guess supporting them in writing in external CSV is rather a bug.
>
> I think it'd be great if we can write and read back CSV in its own format
> but I guess we can't.
>
> Thanks!
>
> On 19 Aug 2016 6:33 a.m., "Efe Selcuk"  wrote:
>
>> We have an application working in Spark 1.6. It uses the databricks csv
>> library for the output format when writing out.
>>
>> I'm attempting an upgrade to Spark 2. When writing with both the native
>> DataFrameWriter#csv() method and with first specifying the
>> "com.databricks.spark.csv" format (I suspect underlying format is the same
>> but I don't know how to verify), I get the following error:
>>
>> java.lang.UnsupportedOperationException: CSV data source does not support
>> struct<[bunch of field names and types]> data type
>>
>> There are 20 fields, mostly plain strings with a couple of dates. The
>> source object is a Dataset[T] where T is a case class with various fields
>> The line just looks like: someDataset.write.csv(outputPath)
>>
>> Googling returned this fairly recent pull request:
>> https://mail-archives.apache.org/mod_mbox/spark-commits/201605.mbox/%3c65d35a72bd05483392857098a2635...@git.apache.org%3E
>>
>> If I'm reading that correctly, the schema shows that each record has one
>> field of this complex struct type? And the validation thinks it's something
>> that it can't serialize. I would expect the schema to have a bunch of
>> fields in it matching the case class, so maybe there's something I'm
>> misunderstanding.
>>
>> Efe
>>
>


Re: Spark 2.0.0 OOM error at beginning of RDD map on AWS

2016-08-18 Thread Arun Luthra
This might be caused by a few large Map objects that Spark is trying to
serialize. These are not broadcast variables or anything, they're just
regular objects.

Would it help if I further indexed these maps into a two-level Map i.e.
Map[String, Map[String, Int]] ? Or would this still count against me?

What if I manually split them up into numerous Map variables?

On Mon, Aug 15, 2016 at 2:12 PM, Arun Luthra  wrote:

> I got this OOM error in Spark local mode. The error seems to have occurred at
> the start of a stage (all of the stages on the UI showed as complete; there
> were more stages to do, but they had not shown up on the UI yet).
>
> There appears to be ~100G of free memory at the time of the error.
>
> Spark 2.0.0
> 200G driver memory
> local[30]
> 8 /mntX/tmp directories for spark.local.dir
> "spark.sql.shuffle.partitions", "500"
> "spark.driver.maxResultSize","500"
> "spark.default.parallelism", "1000"
>
> The line number for the error points to an RDD map operation where there are
> some potentially large Map objects that are going to be accessed by each
> record. Does it matter if they are broadcast variables or not? I imagine
> not: because it's in local mode, they should be available in memory to every
> executor/core.
>
> Possibly related:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ClosureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html
>
> Exception in thread "main" java.lang.OutOfMemoryError
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> at org.apache.spark.rdd.RDD.map(RDD.scala:365)
> at abc.Abc$.main(abc.scala:395)
> at abc.Abc.main(abc.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>


Re: pyspark unable to create UDF: java.lang.RuntimeException: org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a directory: /tmp tmp

2016-08-18 Thread Felix Cheung
I think lots of components expect to have read/write permission to a /tmp 
directory on HDFS.

Glad it works out!


_
From: Andy Davidson
Sent: Thursday, August 18, 2016 5:12 PM
Subject: Re: pyspark unable to create UDF: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp
To: Felix Cheung, user @spark



NICE CATCH!!! Many thanks.

I spent all day on this bug.

The error message reports /tmp. I did not think to look on HDFS.

[ec2-user@ip-172-31-22-140 notebooks]$ hadoop fs -ls hdfs:///tmp/
Found 1 items
-rw-r--r--   3 ec2-user supergroup        418 2016-04-13 22:49 hdfs:///tmp
[ec2-user@ip-172-31-22-140 notebooks]$

I have no idea how hdfs:///tmp got created. I deleted it.

This caused a bunch of exceptions, but these exceptions had useful messages. I
was able to fix the problem as follows:

$ hadoop fs -rmr hdfs:///tmp

Now I run the notebook. It creates hdfs:///tmp/hive, but the permissions are wrong:

$ hadoop fs -chmod 777 hdfs:///tmp/hive


From: Felix Cheung
Date: Thursday, August 18, 2016 at 3:37 PM
To: Andrew Davidson, "user @spark"
Subject: Re: pyspark unable to create UDF: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp

Do you have a file called tmp at / on HDFS?





On Thu, Aug 18, 2016 at 2:57 PM -0700, "Andy Davidson" wrote:

For some unknown reason I cannot create a UDF when I run the attached notebook
on my cluster. I get the following error:


Py4JJavaError: An error occurred while calling 
None.org.apache.spark.sql.hive.HiveContext.: java.lang.RuntimeException: 
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a 
directory: /tmp tmp

The notebook runs fine on my Mac.

In general I am able to run non-UDF Spark code without any trouble.

I start the notebook server as the user "ec2-user" and use the master URL
spark://ec2-51-215-120-63.us-west-1.compute.amazonaws.com:6066



I found the following message in the notebook server log file. I have log level 
set to warn


16/08/18 21:38:45 WARN ObjectStore: Version information not found in metastore. 
hive.metastore.schema.verification is not enabled so recording the schema 
version 1.2.0

16/08/18 21:38:45 WARN ObjectStore: Failed to get database default, returning 
NoSuchObjectException


The cluster was originally created using spark-1.6.1-bin-hadoop2.6/ec2/spark-ec2



#from pyspark.sql import SQLContext, HiveContext
#sqlContext = SQLContext(sc)

#from pyspark.sql import DataFrame
#from pyspark.sql import functions

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

print("spark version: {}".format(sc.version))

import sys
print("python version: {}".format(sys.version))

spark version: 1.6.1
python version: 3.4.3 (default, Apr  1 2015, 18:10:40)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)]

# functions.lower() raises
# py4j.Py4JException: Method lower([class java.lang.String]) does not exist
# work around define a UDF
toLowerUDFRetType = StringType()
#toLowerUDF = udf(lambda s : s.lower(), toLowerUDFRetType)
toLowerUDF = udf(lambda s : s.lower(), StringType())

You must build Spark with Hive. Export 'SPARK_HIVE=true' and run build/sbt 
assembly

Py4JJavaError Traceback (most recent call last)
 in ()
      4 toLowerUDFRetType = StringType()
      5 #toLowerUDF = udf(lambda s : s.lower(), toLowerUDFRetType)
----> 6 toLowerUDF = udf(lambda s : s.lower(), StringType())

/root/spark/python/pyspark/sql/functions.py in udf(f, returnType)
   1595 [Row(slen=5), Row(slen=3)]
   1596 """
-> 1597 return UserDefinedFunction(f, returnType)
   1598
   1599 blacklist = ['map', 'since', 'ignore_unicode_prefix']

/root/spark/python/pyspark/sql/functions.py in __init__(self, func, returnType, name)
   1556 self.returnType = returnType
   1557 self._broadcast = None
-> 1558 self._judf = self._create_judf(name)
   1559
   1560 def _create_judf(self, name):

/root/spark/python/pyspark/sql/functions.py in _create_judf(self, name)
   1567 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
   1568 ctx = SQLContext.getOrCreate(sc)
-> 1569 jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
   1570 if name is None:
   1571 name = f.__name__ if hasattr(f, '__name__') else f.__class__.__name__

/root/spark/python/pyspark/sql/context.py in _ssql_ctx(self)
    681 try:
    682 if not

Re: pyspark unable to create UDF: java.lang.RuntimeException: org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a directory: /tmp tmp

2016-08-18 Thread Andy Davidson
NICE CATCH!!! Many thanks.

I spent all day on this bug.

The error message reports /tmp. I did not think to look on HDFS.

[ec2-user@ip-172-31-22-140 notebooks]$ hadoop fs -ls hdfs:///tmp/
Found 1 items
-rw-r--r--   3 ec2-user supergroup        418 2016-04-13 22:49 hdfs:///tmp
[ec2-user@ip-172-31-22-140 notebooks]$

I have no idea how hdfs:///tmp got created. I deleted it.

This caused a bunch of exceptions, but these exceptions had useful messages. I
was able to fix the problem as follows:

$ hadoop fs -rmr hdfs:///tmp

Now I run the notebook. It creates hdfs:///tmp/hive, but the permissions are
wrong:

$ hadoop fs -chmod 777 hdfs:///tmp/hive


From: Felix Cheung
Date: Thursday, August 18, 2016 at 3:37 PM
To: Andrew Davidson, "user @spark"
Subject: Re: pyspark unable to create UDF: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp

> Do you have a file called tmp at / on HDFS?
> 
> 
> 
> 
> 
> On Thu, Aug 18, 2016 at 2:57 PM -0700, "Andy Davidson"
>  wrote:
> 
> For some unknown reason I cannot create a UDF when I run the attached notebook
> on my cluster. I get the following error:
> 
> Py4JJavaError: An error occurred while calling
> None.org.apache.spark.sql.hive.HiveContext.
> : java.lang.RuntimeException: org.apache.hadoop.fs.FileAlreadyExistsException:
> Parent path is not a directory: /tmp tmp
> 
> The notebook runs fine on my Mac.
>
> In general I am able to run non-UDF Spark code without any trouble.
>
> I start the notebook server as the user "ec2-user" and use the master URL
> spark://ec2-51-215-120-63.us-west-1.compute.amazonaws.com:6066
> 
> 
> I found the following message in the notebook server log file. I have log
> level set to warn
> 
> 16/08/18 21:38:45 WARN ObjectStore: Version information not found in
> metastore. hive.metastore.schema.verification is not enabled so recording the
> schema version 1.2.0
> 
> 16/08/18 21:38:45 WARN ObjectStore: Failed to get database default, returning
> NoSuchObjectException
> 
> 
> 
> The cluster was originally created using
> spark-1.6.1-bin-hadoop2.6/ec2/spark-ec2
> 
> 
> #from pyspark.sql import SQLContext, HiveContext
> #sqlContext = SQLContext(sc)
>
> #from pyspark.sql import DataFrame
> #from pyspark.sql import functions
>
> from pyspark.sql.types import StringType
> from pyspark.sql.functions import udf
>
> print("spark version: {}".format(sc.version))
>
> import sys
> print("python version: {}".format(sys.version))
> spark version: 1.6.1
> python version: 3.4.3 (default, Apr  1 2015, 18:10:40)
> [GCC 4.8.2 20140120 (Red Hat 4.8.2-16)]
>
> # functions.lower() raises
> # py4j.Py4JException: Method lower([class java.lang.String]) does not exist
> # work around define a UDF
> toLowerUDFRetType = StringType()
> #toLowerUDF = udf(lambda s : s.lower(), toLowerUDFRetType)
> toLowerUDF = udf(lambda s : s.lower(), StringType())
>
> You must build Spark with Hive. Export 'SPARK_HIVE=true' and run build/sbt
> assembly
>
> Py4JJavaError Traceback (most recent call last)
>  in ()
>       4 toLowerUDFRetType = StringType()
>       5 #toLowerUDF = udf(lambda s : s.lower(), toLowerUDFRetType)
> ----> 6 toLowerUDF = udf(lambda s : s.lower(), StringType())
>
> /root/spark/python/pyspark/sql/functions.py in udf(f, returnType)
>    1595 [Row(slen=5), Row(slen=3)]
>    1596 """
> -> 1597 return UserDefinedFunction(f, returnType)
>    1598
>    1599 blacklist = ['map', 'since', 'ignore_unicode_prefix']
>
> /root/spark/python/pyspark/sql/functions.py in __init__(self, func, returnType, name)
>    1556 self.returnType = returnType
>    1557 self._broadcast = None
> -> 1558 self._judf = self._create_judf(name)
>    1559
>    1560 def _create_judf(self, name):
>
> /root/spark/python/pyspark/sql/functions.py in _create_judf(self, name)
>    1567 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
>    1568 ctx = SQLContext.getOrCreate(sc)
> -> 1569 jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
>    1570 if name is None:
>    1571 name = f.__name__ if hasattr(f, '__name__') else f.__class__.__name__
>
> /root/spark/python/pyspark/sql/context.py in _ssql_ctx(self)
>     681 try:
>     682 if not hasattr(self, '_scala_HiveContext'):
> --> 683 self._scala_HiveContext = self._get_hive_ctx()
>     684 return self._scala_HiveContext
>     685 except Py4JError as e:
>
> /root/spark/python/pyspark/sql/context.py in _get_hive_ctx(self)
>     690
>     691 def _get_hive_ctx(self):
> --> 692 return self._jvm.HiveContext(self._jsc.sc())
>     693
>     694 def refreshTable(self, tableName):
>
> /root/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>    1062 answer = self._gateway_client.send_command(command)
>    1063 return_value

Re: [Spark2] Error writing "complex" type to CSV

2016-08-18 Thread Hyukjin Kwon
Hi Efe,

If my understanding is correct, writing and reading complex types is not
supported because the CSV format can't represent nested types.

I guess the fact that the external CSV library supports writing them is
rather a bug.

I think it'd be great if we could write such data to CSV and read it back,
but I guess we can't.
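
To illustrate why a nested field has no natural CSV representation, here is a minimal plain-Python sketch (not Spark code). The flatten helper, the dotted column names, and the sample record are all hypothetical: each nested field has to be promoted to its own top-level column before a CSV row can be written.

```python
import csv
import io


def flatten(record, prefix=""):
    """Flatten nested dicts into one level, joining names with dots."""
    flat = {}
    for key, value in record.items():
        name = prefix + key
        if isinstance(value, dict):
            flat.update(flatten(value, name + "."))
        else:
            flat[name] = value
    return flat


# A record with one nested ("struct"-like) field.
nested = {"id": 1, "person": {"name": "a", "born": "1990-12-13"}}
row = flatten(nested)

out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=list(row))
writer.writeheader()
writer.writerow(row)
print(out.getvalue())
```

Reading such a file back yields only flat columns; rebuilding the nesting would need a naming convention that CSV itself does not carry.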

Thanks!

On 19 Aug 2016 6:33 a.m., "Efe Selcuk"  wrote:

> We have an application working in Spark 1.6. It uses the databricks csv
> library for the output format when writing out.
>
> I'm attempting an upgrade to Spark 2. When writing with both the native
> DataFrameWriter#csv() method and with first specifying the
> "com.databricks.spark.csv" format (I suspect underlying format is the same
> but I don't know how to verify), I get the following error:
>
> java.lang.UnsupportedOperationException: CSV data source does not support
> struct<[bunch of field names and types]> data type
>
> There are 20 fields, mostly plain strings with a couple of dates. The
> source object is a Dataset[T] where T is a case class with various fields
> The line just looks like: someDataset.write.csv(outputPath)
>
> Googling returned this fairly recent pull request:
> https://mail-archives.apache.org/mod_mbox/spark-commits/201605.mbox/%3C65d35a72bd05483392857098a2635...@git.apache.org%3E
>
> If I'm reading that correctly, the schema shows that each record has one
> field of this complex struct type? And the validation thinks it's something
> that it can't serialize. I would expect the schema to have a bunch of
> fields in it matching the case class, so maybe there's something I'm
> misunderstanding.
>
> Efe
>


Re: Unable to see external table that is created from Hive Context in the list of hive tables

2016-08-18 Thread Mich Talebzadeh
Which Hive database did you specify in your CREATE EXTERNAL TABLE statement?

How did you specify the location of the file?

This one creates an external table testme in database test, at a location
under that database's directory:

scala>  sqltext = """
 |  CREATE EXTERNAL TABLE test.testme (
 | col1 int
 | )
 | STORED AS ORC
 | LOCATION '/user/hive/warehouse/test.db/testme'
 | """
sqltext: String =
 CREATE EXTERNAL TABLE test.testme (
col1 int
)
STORED AS ORC
LOCATION '/user/hive/warehouse/test.db/testme'
scala> sql(sqltext)
res18: org.apache.spark.sql.DataFrame = []

scala> sql("select count(1) from test.testme").show
+--------+
|count(1)|
+--------+
|       0|
+--------+

And you can see it in Hive as well

hive> desc test.testme;
OK
col1                    int
Time taken: 0.102 seconds, Fetched: 1 row(s)

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 18 August 2016 at 23:08, SRK  wrote:

> Hi,
>
> I created an external table in Spark sql using hiveContext ...something
> like
> CREATE EXTERNAL TABLE IF NOT EXISTS sampleTable stored as ORC LOCATION ...
>
> I can see the files getting created under the location I specified and able
> to query it as well... but, I don't see the table in Hive when I do show
> tables in Hive. Any idea as to why this is happening?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-see-external-table-that-is-created-from-Hive-Context-in-the-list-of-hive-tables-tp27562.html
>
>


Re: pyspark unable to create UDF: java.lang.RuntimeException: org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a directory: /tmp tmp

2016-08-18 Thread Felix Cheung
Do you have a file called tmp at / on HDFS?
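
As a local-filesystem analogy (an illustration only, not HDFS code), the sketch below shows how a plain file sitting where a directory is expected blocks the creation of anything beneath it. The helper name blocking_ancestor is hypothetical.

```python
import os
import tempfile


def blocking_ancestor(path):
    """Return the nearest ancestor of `path` that exists but is not a
    directory, or None if no such ancestor exists."""
    parent = os.path.dirname(os.path.abspath(path))
    while True:
        if os.path.exists(parent) and not os.path.isdir(parent):
            return parent
        up = os.path.dirname(parent)
        if up == parent:  # reached the filesystem root
            return None
        parent = up


with tempfile.TemporaryDirectory() as root:
    tmp_file = os.path.join(root, "tmp")
    with open(tmp_file, "w") as f:
        f.write("oops")  # a plain file named tmp, where a directory is expected

    target = os.path.join(tmp_file, "hive")
    print("blocked by:", blocking_ancestor(target))
    try:
        os.makedirs(target)
    except OSError as exc:
        # Creating a child under a plain file fails with an OSError subclass.
        print("makedirs failed:", type(exc).__name__)
```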





On Thu, Aug 18, 2016 at 2:57 PM -0700, "Andy Davidson" wrote:

For some unknown reason I cannot create a UDF when I run the attached notebook
on my cluster. I get the following error:


Py4JJavaError: An error occurred while calling 
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException: org.apache.hadoop.fs.FileAlreadyExistsException: 
Parent path is not a directory: /tmp tmp

The notebook runs fine on my Mac.

In general I am able to run non-UDF Spark code without any trouble.

I start the notebook server as the user "ec2-user" and use the master URL
spark://ec2-51-215-120-63.us-west-1.compute.amazonaws.com:6066



I found the following message in the notebook server log file. I have log level 
set to warn


16/08/18 21:38:45 WARN ObjectStore: Version information not found in metastore. 
hive.metastore.schema.verification is not enabled so recording the schema 
version 1.2.0

16/08/18 21:38:45 WARN ObjectStore: Failed to get database default, returning 
NoSuchObjectException


The cluster was originally created using spark-1.6.1-bin-hadoop2.6/ec2/spark-ec2



#from pyspark.sql import SQLContext, HiveContext
#sqlContext = SQLContext(sc)

#from pyspark.sql import DataFrame
#from pyspark.sql import functions

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

print("spark version: {}".format(sc.version))

import sys
print("python version: {}".format(sys.version))

spark version: 1.6.1
python version: 3.4.3 (default, Apr  1 2015, 18:10:40)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)]



# functions.lower() raises
# py4j.Py4JException: Method lower([class java.lang.String]) does not exist
# work around define a UDF
toLowerUDFRetType = StringType()
#toLowerUDF = udf(lambda s : s.lower(), toLowerUDFRetType)
toLowerUDF = udf(lambda s : s.lower(), StringType())


You must build Spark with Hive. Export 'SPARK_HIVE=true' and run build/sbt 
assembly


Py4JJavaErrorTraceback (most recent call last)
 in ()
  4 toLowerUDFRetType = StringType()
  5 #toLowerUDF = udf(lambda s : s.lower(), toLowerUDFRetType)
> 6 toLowerUDF = udf(lambda s : s.lower(), StringType())

/root/spark/python/pyspark/sql/functions.py in udf(f, returnType)
   1595 [Row(slen=5), Row(slen=3)]
   1596 """
-> 1597 return UserDefinedFunction(f, returnType)
   1598
   1599 blacklist = ['map', 'since', 'ignore_unicode_prefix']

/root/spark/python/pyspark/sql/functions.py in __init__(self, func, returnType, 
name)
   1556 self.returnType = returnType
   1557 self._broadcast = None
-> 1558 self._judf = self._create_judf(name)
   1559
   1560 def _create_judf(self, name):

/root/spark/python/pyspark/sql/functions.py in _create_judf(self, name)
   1567 pickled_command, broadcast_vars, env, includes = 
_prepare_for_python_RDD(sc, command, self)
   1568 ctx = SQLContext.getOrCreate(sc)
-> 1569 jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
   1570 if name is None:
   1571 name = f.__name__ if hasattr(f, '__name__') else 
f.__class__.__name__

/root/spark/python/pyspark/sql/context.py in _ssql_ctx(self)
681 try:
682 if not hasattr(self, '_scala_HiveContext'):
--> 683 self._scala_HiveContext = self._get_hive_ctx()
684 return self._scala_HiveContext
685 except Py4JError as e:

/root/spark/python/pyspark/sql/context.py in _get_hive_ctx(self)
690
691 def _get_hive_ctx(self):
--> 692 return self._jvm.HiveContext(self._jsc.sc())
693
694 def refreshTable(self, tableName):

/root/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, 
*args)
   1062 answer = self._gateway_client.send_command(command)
   1063 return_value = get_return_value(
-> 1064 answer, self._gateway_client, None, self._fqn)
   1065
   1066 for temp_arg in temp_args:

/root/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/root/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling 
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException: org.apache.hadoop.fs.FileAlreadyExistsException: 
Parent path is not a directory: /tmp tmp
at 

Unable to see external table that is created from Hive Context in the list of hive tables

2016-08-18 Thread SRK
Hi,

I created an external table in Spark sql using hiveContext ...something like
CREATE EXTERNAL TABLE IF NOT EXISTS sampleTable stored as ORC LOCATION ...

I can see the files getting created under the location I specified and am able
to query it as well... but I don't see the table in Hive when I do show
tables in Hive. Any idea as to why this is happening?

Thanks! 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-see-external-table-that-is-created-from-Hive-Context-in-the-list-of-hive-tables-tp27562.html



pyspark unable to create UDF: java.lang.RuntimeException: org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a directory: /tmp tmp

2016-08-18 Thread Andy Davidson
For some unknown reason I cannot create a UDF when I run the attached notebook
on my cluster. I get the following error:

Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp

The notebook runs fine on my Mac

In general I am able to run non-UDF Spark code without any trouble.

I start the notebook server as the user "ec2-user" and use the master URL
spark://ec2-51-215-120-63.us-west-1.compute.amazonaws.com:6066


I found the following message in the notebook server log file. I have log
level set to warn

16/08/18 21:38:45 WARN ObjectStore: Version information not found in
metastore. hive.metastore.schema.verification is not enabled so recording
the schema version 1.2.0

16/08/18 21:38:45 WARN ObjectStore: Failed to get database default,
returning NoSuchObjectException



The cluster was originally created using
spark-1.6.1-bin-hadoop2.6/ec2/spark-ec2


#from pyspark.sql import SQLContext, HiveContext
#sqlContext = SQLContext(sc)

#from pyspark.sql import DataFrame
#from pyspark.sql import functions

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

print("spark version: {}".format(sc.version))

import sys
print("python version: {}".format(sys.version))
spark version: 1.6.1
python version: 3.4.3 (default, Apr  1 2015, 18:10:40)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)]



# functions.lower() raises
# py4j.Py4JException: Method lower([class java.lang.String]) does not exist
# work around define a UDF
toLowerUDFRetType = StringType()
#toLowerUDF = udf(lambda s : s.lower(), toLowerUDFRetType)
toLowerUDF = udf(lambda s : s.lower(), StringType())
You must build Spark with Hive. Export 'SPARK_HIVE=true' and run build/sbt
assembly
Py4JJavaErrorTraceback (most recent call last)
 in ()
  4 toLowerUDFRetType = StringType()
  5 #toLowerUDF = udf(lambda s : s.lower(), toLowerUDFRetType)
> 6 toLowerUDF = udf(lambda s : s.lower(), StringType())

/root/spark/python/pyspark/sql/functions.py in udf(f, returnType)
   1595 [Row(slen=5), Row(slen=3)]
   1596 """
-> 1597 return UserDefinedFunction(f, returnType)
   1598 
   1599 blacklist = ['map', 'since', 'ignore_unicode_prefix']

/root/spark/python/pyspark/sql/functions.py in __init__(self, func,
returnType, name)
   1556 self.returnType = returnType
   1557 self._broadcast = None
-> 1558 self._judf = self._create_judf(name)
   1559 
   1560 def _create_judf(self, name):

/root/spark/python/pyspark/sql/functions.py in _create_judf(self, name)
   1567 pickled_command, broadcast_vars, env, includes =
_prepare_for_python_RDD(sc, command, self)
   1568 ctx = SQLContext.getOrCreate(sc)
-> 1569 jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
   1570 if name is None:
   1571 name = f.__name__ if hasattr(f, '__name__') else
f.__class__.__name__

/root/spark/python/pyspark/sql/context.py in _ssql_ctx(self)
681 try:
682 if not hasattr(self, '_scala_HiveContext'):
--> 683 self._scala_HiveContext = self._get_hive_ctx()
684 return self._scala_HiveContext
685 except Py4JError as e:

/root/spark/python/pyspark/sql/context.py in _get_hive_ctx(self)
690 
691 def _get_hive_ctx(self):
--> 692 return self._jvm.HiveContext(self._jsc.sc())
693 
694 def refreshTable(self, tableName):

/root/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in
__call__(self, *args)
   1062 answer = self._gateway_client.send_command(command)
   1063 return_value = get_return_value(
-> 1064 answer, self._gateway_client, None, self._fqn)
   1065 
   1066 for temp_arg in temp_args:

/root/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/root/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.mkdirs(FSDirectory.java:1
489)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesys
tem.java:2979)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.j

[Spark2] Error writing "complex" type to CSV

2016-08-18 Thread Efe Selcuk
We have an application working in Spark 1.6. It uses the databricks csv
library for the output format when writing out.

I'm attempting an upgrade to Spark 2. When writing with both the native
DataFrameWriter#csv() method and with first specifying the
"com.databricks.spark.csv" format (I suspect underlying format is the same
but I don't know how to verify), I get the following error:

java.lang.UnsupportedOperationException: CSV data source does not support
struct<[bunch of field names and types]> data type

There are 20 fields, mostly plain strings with a couple of dates. The
source object is a Dataset[T] where T is a case class with various fields.
The line just looks like: someDataset.write.csv(outputPath)

Googling returned this fairly recent pull request:
https://mail-archives.apache.org/mod_mbox/spark-commits/201605.mbox/%3c65d35a72bd05483392857098a2635...@git.apache.org%3E

If I'm reading that correctly, the schema shows that each record has one
field of this complex struct type? And the validation thinks it's something
that it can't serialize. I would expect the schema to have a bunch of
fields in it matching the case class, so maybe there's something I'm
misunderstanding.

Efe


Py4JJavaError: An error occurred while calling None.org.apache.spark.sql.hive.HiveContext.

2016-08-18 Thread Andy Davidson
Hi, I am using Python 3, Java 8 and spark-1.6.1. I am running my code in a
Jupyter notebook.

The following code runs fine on my Mac:

udfRetType = ArrayType(StringType(), True)
findEmojiUDF = udf(lambda s : re.findall(emojiPattern2, s), udfRetType)

retDF = (emojiSpecialDF
            # convert into a list of emojis
            .select("body",
                    findEmojiUDF(emojiSpecialDF.body).alias("listEmojis"))
            # explode, convert list of emojis into separate rows
            .select("*", functions.explode("listEmojis").alias("emoji"))
        )

retDF.printSchema()
retDF.show(40, truncate=False)

When I run it on my cluster I get

Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp

I checked the file permissions. I start my notebook server as user ec2-user.


[ec2-user exploration]$ ls -ld /tmp

drwxrwxrwt 5 root root 4096 Aug 18 18:14 /tmp





In the cluster I use masterURL
spark://ec2-54-215-230-73.us-west-1.compute.amazonaws.com:6066 (all my other
spark code seems to work fine)


Below is the complete stack trace.

Any idea what the problem is?

Thanks

Andy

You must build Spark with Hive. Export 'SPARK_HIVE=true' and run build/sbt
assembly

Py4JJavaErrorTraceback (most recent call last)
 in ()
  1 udfRetType = ArrayType(StringType(), True)
> 2 findEmojiUDF = udf(lambda s : re.findall(emojiPattern2, s),
udfRetType)
  3 
  4 retDF = (emojiSpecialDF
  5 # convert into a list of emojis

/root/spark/python/pyspark/sql/functions.py in udf(f, returnType)
   1595 [Row(slen=5), Row(slen=3)]
   1596 """
-> 1597 return UserDefinedFunction(f, returnType)
   1598 
   1599 blacklist = ['map', 'since', 'ignore_unicode_prefix']

/root/spark/python/pyspark/sql/functions.py in __init__(self, func,
returnType, name)
   1556 self.returnType = returnType
   1557 self._broadcast = None
-> 1558 self._judf = self._create_judf(name)
   1559 
   1560 def _create_judf(self, name):

/root/spark/python/pyspark/sql/functions.py in _create_judf(self, name)
   1567 pickled_command, broadcast_vars, env, includes =
_prepare_for_python_RDD(sc, command, self)
   1568 ctx = SQLContext.getOrCreate(sc)
-> 1569 jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
   1570 if name is None:
   1571 name = f.__name__ if hasattr(f, '__name__') else
f.__class__.__name__

/root/spark/python/pyspark/sql/context.py in _ssql_ctx(self)
681 try:
682 if not hasattr(self, '_scala_HiveContext'):
--> 683 self._scala_HiveContext = self._get_hive_ctx()
684 return self._scala_HiveContext
685 except Py4JError as e:

/root/spark/python/pyspark/sql/context.py in _get_hive_ctx(self)
690 
691 def _get_hive_ctx(self):
--> 692 return self._jvm.HiveContext(self._jsc.sc())
693 
694 def refreshTable(self, tableName):

/root/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in
__call__(self, *args)
   1062 answer = self._gateway_client.send_command(command)
   1063 return_value = get_return_value(
-> 1064 answer, self._gateway_client, None, self._fqn)
   1065 
   1066 for temp_arg in temp_args:

/root/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/root/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.
: java.lang.RuntimeException:
org.apache.hadoop.fs.FileAlreadyExistsException: Parent path is not a
directory: /tmp tmp
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.mkdirs(FSDirectory.java:1
489)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesys
tem.java:2979)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.j
ava:2932)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java
:2911)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcS
erver.java:649)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslator
PB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:417)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNam

Re: Model Persistence

2016-08-18 Thread Nick Pentreath
Model metadata (mostly parameter values) is usually tiny. The Parquet data
is most often for model coefficients. So this depends on the size of your
model, i.e. your feature dimension.

A high-dimensional linear model can be quite large, but is still typically
easy to fit into main memory on a single node. A high-dimensional
multi-layer perceptron with many layers could be quite a lot larger. An ALS
model with millions of users and items could be quite huge.

On Thu, 18 Aug 2016 at 18:00, Rich Tarro  wrote:

> The following Databricks blog on Model Persistence states "Internally, we
> save the model metadata and parameters as JSON and the data as Parquet."
>
>
> https://databricks.com/blog/2016/05/31/apache-spark-2-0-preview-machine-learning-model-persistence.html
>
>
> What data associated with a model or Pipeline is actually saved (in
> Parquet format)?
>
> What factors determine how large the saved model or pipeline will be?
>
> Thanks.
> Rich
>


py4j.Py4JException: Method lower([class java.lang.String]) does not exist

2016-08-18 Thread Andy Davidson
I am still working on spark-1.6.1. I am using java8.

Any idea why

(df.select("*", functions.lower("rawTag").alias("tag"))

would raise py4j.Py4JException: Method lower([class java.lang.String]) does
not exist

Thanks in advance

Andy

https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html?highlight=lower#pyspark.sql.functions.lower


pyspark.sql.functions.lower(col)


Converts a string column to lower case.

New in version 1.5.

#import logging
import json

from pyspark.sql import SQLContext, HiveContext
sqlContext = SQLContext(sc)
#hiveContext = HiveContext(sc)

from pyspark.sql import DataFrame
from pyspark.sql import functions


Py4JErrorTraceback (most recent call last)
 in ()
 28 df = df.select("user_loc",
functions.explode("hashTags").alias("rawTag"))
 29 df.printSchema()
---> 30 (df.select("*", functions.lower("rawTag").alias("tag"))


/root/spark/python/pyspark/sql/functions.py in _(col)
 37 def _(col):
 38 sc = SparkContext._active_spark_context
---> 39 jc = getattr(sc._jvm.functions, name)(col._jc if
isinstance(col, Column) else col)
 40 return Column(jc)
 41 _.__name__ = name

/root/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in
__call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814 
815 for temp_arg in temp_args:

/root/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/root/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client, target_id, name)
310 raise Py4JError(
311 "An error occurred while calling {0}{1}{2}.
Trace:\n{3}\n".
--> 312 format(target_id, ".", name, value))
313 else:
314 raise Py4JError(

Py4JError: An error occurred while calling
z:org.apache.spark.sql.functions.lower. Trace:
py4j.Py4JException: Method lower([class java.lang.String]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:360)
at py4j.Gateway.invoke(Gateway.java:254)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
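
A note for archive readers: the frame at functions.py line 39 in the trace above passes anything that is not a Column straight through to the JVM, so the string "rawTag" arrives as a java.lang.String, and the Scala side apparently has no lower(String) overload. Passing a Column instead, e.g. functions.lower(df["rawTag"]) or functions.lower(functions.col("rawTag")), should avoid the error. The sketch below uses pure-Python stand-ins only to illustrate that dispatch; Column, JavaColumn, jvm_argument and jvm_lower are hypothetical names, not PySpark classes.

```python
class JavaColumn:
    """Stand-in for the JVM-side column handle (hypothetical)."""
    def __init__(self, name):
        self.name = name


class Column:
    """Stand-in for pyspark.sql.Column, wrapping a JVM handle in _jc."""
    def __init__(self, name):
        self._jc = JavaColumn(name)


def jvm_argument(col):
    """Same expression as functions.py line 39 in the traceback."""
    return col._jc if isinstance(col, Column) else col


def jvm_lower(arg):
    """Stand-in for the JVM side, assumed to accept only a column handle."""
    if not isinstance(arg, JavaColumn):
        raise TypeError("Method lower([class java.lang.String]) does not exist")
    return "lower({})".format(arg.name)


# A bare string is passed through unchanged, so the method lookup fails.
try:
    jvm_lower(jvm_argument("rawTag"))
    string_call_failed = False
except TypeError:
    string_call_failed = True

# A Column is unwrapped to its JVM handle, so the lookup succeeds.
column_call = jvm_lower(jvm_argument(Column("rawTag")))
```

The point is only that the Python wrapper does not convert a column name into a Column for this function; the conversion has to happen on the caller's side.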





Extra string added to column name? (withColumn & expr)

2016-08-18 Thread rachmaninovquartet
Hi,

I'm trying to implement a custom one-hot encoder, since I want the output to
be in a specific format suitable for Theano. Basically, it will give a new column
for each distinct member of the original features and have it set to 1 if
the observation contains the specific member of the distinct feature subset.
Something like feature1.distinct1, feature1.distinct2...

Here is my attempt, which seems logically sound:

for (column <- featuresThatNeedEncoding) {

  for (j <- df.select(column).distinct().collect().toSeq) {

df = df.withColumn(column + "." + j.get(0).toString,  expr("CASE
WHEN " + column + " = '" + j.get(0).toString + "' THEN " + column + "." +
j.get(0).toString + " = '1' ELSE " + column + "." + j.get(0).toString + " =
'0' END"))   
  }
}

And some of the stack trace:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Can't
extract value from someFeature#295;
at
org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:72)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$4.apply(LogicalPlan.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$4.apply(LogicalPlan.scala:266)
at
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)

Any ideas how to resolve this or why there is a #295 after my column name?
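
One possible reading, offered as an assumption rather than a confirmed diagnosis: the #295 is only the internal expression ID that the analyzer prints next to an attribute name, and the failure comes from column.value inside the CASE expression being parsed as extracting a field from the column, which is not a struct. A simpler expression would return 1 or 0 directly and avoid the dot in the new column name. The sketch below builds such a name and expression string in Python for illustration; one_hot_spec and the example values are hypothetical.

```python
def one_hot_spec(column, value):
    """Return (new_column_name, sql_expression) for one distinct value."""
    # Use "_" instead of "." so the new name is not read as struct field access.
    name = "{}_{}".format(column, value)
    # Return the indicator directly instead of comparing a not-yet-existing column.
    expression = "CASE WHEN {} = '{}' THEN 1 ELSE 0 END".format(column, value)
    return name, expression


# Example with made-up values: one feature column and three distinct members.
specs = [one_hot_spec("someFeature", v) for v in ["red", "green", "blue"]]
first_name, first_expression = specs[0]
```

Each expression string could then be handed to expr() and each name to withColumn(), as in the original loop. Values that contain quotes or other special characters would need escaping, which this sketch does not attempt.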

Thanks,

Ian



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extra-string-added-to-column-name-withColumn-expr-tp27560.html



Re: Large where clause StackOverflow 1.5.2

2016-08-18 Thread rachmaninovquartet
I solved this by using a Window partitioned by 'id'. I used lead and lag to
create columns, which contained nulls in the places that I needed to delete,
in each fold. I then removed those rows with the nulls and my additional
columns. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Large-where-clause-StackOverflow-1-5-2-tp27544p27559.html



Model Persistence

2016-08-18 Thread Rich Tarro
The following Databricks blog on Model Persistence states "Internally, we
save the model metadata and parameters as JSON and the data as Parquet."

https://databricks.com/blog/2016/05/31/apache-spark-2-0-preview-machine-learning-model-persistence.html


What data associated with a model or Pipeline is actually saved (in Parquet
format)?

What factors determine how large the saved model or pipeline will be?

Thanks.
Rich


Structured Stream Behavior on failure

2016-08-18 Thread Cornelio
Hi 

I have a couple of questions.

1.- When Spark shuts down or fails, the doc states that "In case of a failure or
intentional shutdown, you can recover the previous progress and state of a
previous query, and continue where it left off."
- To achieve this, do I just need to set the checkpoint dir as an "option" in my
query?
- When recovery is done, will the "batchId" number be the same as before
(i.e. the same as before Spark shut down)?

2.- Is the "addBatch" method of a Sink executed in parallel? If not, can it
be implemented in a way to execute in parallel?
E.g. if batchId-1 comes in and, while it is being processed (addBatch), batchId-2
comes in, will they be executed in parallel?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Stream-Behavior-on-failure-tp27558.html



Re: Aggregations with scala pairs

2016-08-18 Thread Andrés Ivaldi
Thanks!!!

On Thu, Aug 18, 2016 at 3:35 AM, Jean-Baptiste Onofré 
wrote:

> Agreed.
>
> Regards
> JB
> On Aug 18, 2016, at 07:32, Olivier Girardot wrote:
>>
>> CC'ing dev list,
>> you should open a Jira and a PR related to it to discuss it c.f.
>> https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#
>> ContributingtoSpark-ContributingCodeChanges
>>
>>
>>
>> On Wed, Aug 17, 2016 4:01 PM, Andrés Ivaldi iaiva...@gmail.com wrote:
>>
>>> Hello, I'd like to report a wrong behavior of the Dataset API, but I don't
>>> know how to do that. My Jira account doesn't allow me to add an issue.
>>>
>>> I'm using Apache Spark 2.0.0, but the problem has existed since at least
>>> version 1.4 (given the doc since 1.3)
>>>
>>> The problem is simple to reproduce, and so is the workaround: if we apply
>>> agg over a Dataset with Scala pairs over the same column, only one agg over
>>> that column is actually used. This is because toMap reduces the pair
>>> values with the same key to one, overwriting the value.
>>> class
>>> https://github.com/apache/spark/blob/master/sql/core/
>>> src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
>>>
>>>
>>>  def agg(aggExpr: (String, String), aggExprs: (String, String)*):
>>> DataFrame = {
>>> agg((aggExpr +: aggExprs).toMap)
>>>   }
>>>
>>>
>>> Rewritten as something like this, it should work:
>>>  def agg(aggExpr: (String, String), aggExprs: (String, String)*):
>>> DataFrame = {
>>>toDF((aggExpr +: aggExprs).map { pairExpr =>
>>>   strToExpr(pairExpr._2)(df(pairExpr._1).expr)
>>> }.toSeq)
>>> }
>>>
>>>
>>> regards
>>> --
>>> Ing. Ivaldi Andres
>>>
>>
>>
>> *Olivier Girardot*   | Associé
>> o.girar...@lateral-thoughts.com
>> +33 6 24 09 17 94
>>
>


-- 
Ing. Ivaldi Andres


createDirectStream parallelism

2016-08-18 Thread Diwakar Dhanuskodi
We are using the createDirectStream API to receive messages from a
48-partition topic. I am setting --num-executors 48 and --executor-cores
1 in spark-submit.

All partitions were received in parallel, and the corresponding RDDs in
foreachRDD were executed in parallel. But when I join a transformed RDD
jsonDF (code below) with another RDD, I can see that the join is not
executed in parallel for each partition. There were more shuffle reads
and writes, and the number of executors running was less than the number of
partitions. I mean, the number of executors was not equal to the number of
partitions while the join was executing.

How can I make sure the join is executed on all executors? Can anyone provide
help?

kstream.foreachRDD { rdd =>

val  jsonDF = sqlContext.read.json(rdd).toDF
.
...
val metaDF = ssc.sparkContext.textFile("file").toDF

val join  = jsonDF.join(metaDF)

join.map ().count

}


Re: Standalone executor memory is fixed while executor cores are load balanced between workers

2016-08-18 Thread Mich Talebzadeh
Can you provide some info?

In your conf/spark-env.sh, what do you set these to?

# Options for the daemons used in the standalone deploy mode
SPARK_WORKER_CORES=?      ## total number of cores to be used by executors on each worker
SPARK_WORKER_MEMORY=?g    ## total memory workers have to give executors (e.g. 1000m, 2g)
SPARK_WORKER_INSTANCES=?  ## number of worker processes per node



Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 18 August 2016 at 15:06, Petr Novak  wrote:

> Hello,
> when I set spark.executor.cores to e.g. 8 cores and spark.executor.memory
> to 8GB, Spark can allocate more executors with fewer cores for my app, but
> each executor still gets 8GB RAM.
>
> This is a problem because I can allocate more memory across the cluster than
> expected. The worst case is 8x 1-core executors, each with 8GB => 64GB RAM,
> instead of the roughly 8GB I need for the app.
>
> If I set spark.executor.memory to some lower amount, then I can end
> up with fewer executors, even a single one (if other nodes are full), which
> wouldn't have enough memory. I don't know how to configure executor memory
> in a predictable way.
>
> The only predictable way we found is to set spark.executor.cores to 1
> and divide the memory required for the app by spark.cores.max. But having
> many JVMs for small executors doesn't look optimal to me.
>
> Is it a known issue or am I missing something?
>
> Many thanks,
> Petr
>
>


RE: pyspark.sql.functions.last not working as expected

2016-08-18 Thread Alexander Peletz
Is the issue that the default rangeBetween = rangeBetween(-sys.maxsize, 0)? 
That would explain the behavior below. Is this default documented somewhere?
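
To make the question concrete: if the frame really does default to "unbounded preceding up to the current row" once an ordering is given (my understanding of Spark SQL window specs, stated here as an assumption), then last() is a running value rather than a per-partition value. The pure-Python sketch below simulates both frames on the key "a" rows from the test case quoted further down; last_value and last_over are hypothetical helper names, and no Spark is involved.

```python
def last_value(frame, ignore_nulls):
    """Return the last element of a frame, optionally skipping None."""
    if ignore_nulls:
        frame = [v for v in frame if v is not None]
    return frame[-1] if frame else None


def last_over(rows, ignore_nulls, whole_partition):
    """Evaluate last() for each row of one partition, ordered by the first field."""
    values = [value for _, value in sorted(rows, key=lambda r: r[0])]
    results = []
    for i in range(len(values)):
        # Running frame: start of partition up to the current row.
        # Whole-partition frame: every row, regardless of position.
        frame = values if whole_partition else values[: i + 1]
        results.append(last_value(frame, ignore_nulls))
    return results


# (order, value) rows for key "a" in the quoted test case.
rows_a = [(0, None), (1, "x"), (2, "y"), (3, "z"), (4, None)]

running_keep = last_over(rows_a, ignore_nulls=False, whole_partition=False)
running_skip = last_over(rows_a, ignore_nulls=True, whole_partition=False)
full_keep = last_over(rows_a, ignore_nulls=False, whole_partition=True)
full_skip = last_over(rows_a, ignore_nulls=True, whole_partition=True)
```

The running frame reproduces the values in the committed test, and the whole-partition frame reproduces the values I expected instead. In Spark itself, an explicit frame on the window spec, such as rowsBetween(-sys.maxsize, sys.maxsize), would presumably give the whole-partition behaviour.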

From: Alexander Peletz [mailto:alexand...@slalom.com]
Sent: Wednesday, August 17, 2016 8:48 PM
To: user 
Subject: RE: pyspark.sql.functions.last not working as expected

So here is the test case from the commit adding the first/last methods here: 
https://github.com/apache/spark/pull/10957/commits/defcc02a8885e884d5140b11705b764a51753162



+  test("last/first with ignoreNulls") {
+    val nullStr: String = null
+    val df = Seq(
+      ("a", 0, nullStr),
+      ("a", 1, "x"),
+      ("a", 2, "y"),
+      ("a", 3, "z"),
+      ("a", 4, nullStr),
+      ("b", 1, nullStr),
+      ("b", 2, nullStr)).
+      toDF("key", "order", "value")
+    val window = Window.partitionBy($"key").orderBy($"order")
+    checkAnswer(
+      df.select(
+        $"key",
+        $"order",
+        first($"value").over(window),
+        first($"value", ignoreNulls = false).over(window),
+        first($"value", ignoreNulls = true).over(window),
+        last($"value").over(window),
+        last($"value", ignoreNulls = false).over(window),
+        last($"value", ignoreNulls = true).over(window)),
+      Seq(
+        Row("a", 0, null, null, null, null, null, null),
+        Row("a", 1, null, null, "x", "x", "x", "x"),
+        Row("a", 2, null, null, "x", "y", "y", "y"),
+        Row("a", 3, null, null, "x", "z", "z", "z"),
+        Row("a", 4, null, null, "x", null, null, "z"),
+        Row("b", 1, null, null, null, null, null, null),
+        Row("b", 2, null, null, null, null, null, null)))
+  }



I would expect the correct results to be as follows instead of what is used 
above. Shouldn't we always return the first or last value in the partition 
based on the ordering? It looks something else is going on... can someone 
explain?

+      Seq(
+        Row("a", 0, null, null, "x", null, null, "z"),
+        Row("a", 1, null, null, "x", null, null, "z"),
+        Row("a", 2, null, null, "x", null, null, "z"),
+        Row("a", 3, null, null, "x", null, null, "z"),
+        Row("a", 4, null, null, "x", null, null, "z"),
+        Row("b", 1, null, null, null, null, null, null),
+        Row("b", 2, null, null, null, null, null, null)))



From: Alexander Peletz [mailto:alexand...@slalom.com]
Sent: Wednesday, August 17, 2016 11:57 AM
To: user
Subject: pyspark.sql.functions.last not working as expected

Hi,

I am using Spark 2.0 and I am getting unexpected results using the last() 
method. Has anyone else experienced this? I get the sense that last() is 
working correctly within a given data partition but not across the entire RDD. 
First() seems to work as expected so I can work around this by having a window 
that is in reverse order and use first() instead of last() but it would be 
great if last() actually worked.


Thanks,
Alexander


Alexander Peletz
Consultant

slalom

Fortune 100 Best Companies to Work For 2016
Glassdoor Best Places to Work 2016
Consulting Magazine Best Firms to Work For 2015

316 Stuart Street, Suite 300
Boston, MA 02116
706.614.5033 cell | 617.316.5400 office
alexand...@slalom.com



Standalone executor memory is fixed while executor cores are load balanced between workers

2016-08-18 Thread Petr Novak
Hello,
when I set spark.executor.cores to e.g. 8 cores and spark.executor.memory
to 8GB, Spark can allocate more executors with fewer cores for my app, but
each executor still gets 8GB RAM.

This is a problem because I can allocate more memory across the cluster than
expected. The worst case is 8x 1-core executors, each with 8GB => 64GB RAM,
instead of the roughly 8GB I need for the app.

If I set spark.executor.memory to some lower amount, then I can end
up with fewer executors, even a single one (if other nodes are full), which
wouldn't have enough memory. I don't know how to configure executor memory
in a predictable way.

The only predictable way we found is to set spark.executor.cores to 1
and divide the memory required for the app by spark.cores.max. But having
many JVMs for small executors doesn't look optimal to me.

Is it a known issue or am I missing something?

Many thanks,
Petr
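
The arithmetic behind the numbers above, as a small sketch. It assumes spark.cores.max is 8 for this app and that every executor is given the full spark.executor.memory regardless of how many cores it received, which is the behaviour I am describing rather than something I have confirmed in the scheduler code. The function names are made up for illustration.

```python
def worst_case_memory_gb(cores_max, executor_memory_gb):
    """Total memory if the app's cores are spread over 1-core executors."""
    executors = cores_max  # one executor per core in the worst case
    return executors * executor_memory_gb


def memory_per_single_core_executor_gb(app_memory_gb, cores_max):
    """Workaround from the post: spark.executor.cores=1, memory split per core."""
    return app_memory_gb / cores_max


# Assumed values: 8 cores in total, 8 GB configured per executor,
# and about 8 GB needed by the whole app.
worst = worst_case_memory_gb(cores_max=8, executor_memory_gb=8)
per_executor = memory_per_single_core_executor_gb(app_memory_gb=8, cores_max=8)
```

With these inputs the worst case is 64 GB in total, while the 1-core workaround gives each executor 1 GB, at the cost of running eight separate JVMs.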


Re: SPARK MLLib - How to tie back Model.predict output to original data?

2016-08-18 Thread janardhan shetty
There is a spark-ts package developed by Sandy which has an RDD version.
Not sure about the DataFrame roadmap.

http://sryza.github.io/spark-timeseries/0.3.0/index.html
On Aug 18, 2016 12:42 AM, "ayan guha"  wrote:

> Thanks a lot. I resolved it using an UDF.
>
> Qs: does spark support any time series model? Is there any roadmap to know
> when a feature will be roughly available?
> On 18 Aug 2016 16:46, "Yanbo Liang"  wrote:
>
>> If you want to tie them with other data, I think the best way is to use
>> DataFrame join operation on condition that they share an identity column.
>>
>> Thanks
>> Yanbo
>>
>> 2016-08-16 20:39 GMT-07:00 ayan guha :
>>
>>> Hi
>>>
>>> Thank you for your reply. Yes, I can get prediction and original
>>> features together. My question is how to tie them back to other parts of
>>> the data, which was not in LP.
>>>
>>> For example, I have a bunch of other dimensions which are not part of
>>> features or label.
>>>
>>> Sorry if this is a stupid question.
>>>
>>> On Wed, Aug 17, 2016 at 12:57 PM, Yanbo Liang 
>>> wrote:
>>>
>>>> MLlib will keep the original dataset during transformation, it just
>>>> append new columns to existing DataFrame. That is you can get both
>>>> prediction value and original features from the output DataFrame of
>>>> model.transform.
>>>>
>>>> Thanks
>>>> Yanbo
>>>>
>>>> 2016-08-16 17:48 GMT-07:00 ayan guha :
>>>>
>>>>> Hi
>>>>>
>>>>> I have a dataset as follows:
>>>>>
>>>>> DF:
>>>>> amount:float
>>>>> date_read:date
>>>>> meter_number:string
>>>>>
>>>>> I am trying to predict future amount based on past 3 weeks consumption
>>>>> (and a heaps of weather data related to date).
>>>>>
>>>>> My Labelpoint looks like
>>>>>
>>>>> label (populated from DF.amount)
>>>>> features (populated from a bunch of other stuff)
>>>>>
>>>>> Model.predict output:
>>>>> label
>>>>> prediction
>>>>>
>>>>> Now, I am trying to put together this prediction value back to meter
>>>>> number and date_read from original DF?
>>>>>
>>>>> One way to assume order of records in DF and Model.predict will be
>>>>> exactly same and zip two RDDs. But any other (possibly better) solution?
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>


Reporting errors from spark sql

2016-08-18 Thread yael aharon
Hello,
I am working on an SQL editor which is powered by spark SQL. When the SQL
is not valid, I would like to provide the user with a line number and
column number where the first error occurred. I am having a hard time
finding a mechanism that will give me that information programmatically.

Most of the time, if an erroneous SQL statement is used, I am getting a
RuntimeException, where line number and column number are implicitly
embedded within the text of the message, but it is really error prone to
parse the message text and count the number of spaces prior to the '^'
symbol...
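
For what it is worth, the space counting described above can at least be
isolated in one small helper. The sketch below is plain Python, and the message
layout it expects (the offending SQL line followed by a marker line ending in
'^') is an assumption made for illustration, not a documented Spark format.
The sample text is invented.

```python
import re
from typing import Optional, Tuple


def caret_position(message: str) -> Optional[Tuple[int, int]]:
    """Return (line_index, column), both 0-based, for the first '^' marker.

    line_index is the index, within the message, of the line directly above
    the marker line; column is the offset of the first '^' on the marker line.
    Returns None when no marker line is found.
    """
    lines = message.splitlines()
    for i, line in enumerate(lines):
        # A marker line holds only spaces or dashes followed by carets.
        if i > 0 and re.fullmatch(r"[ \-]*\^+\s*", line):
            return i - 1, line.index("^")
    return None


# Hypothetical message text, made up for this sketch.
sample = "syntax error near 'form'\nSELECT * form t\n" + " " * 9 + "^"
print(caret_position(sample))  # (1, 9)
```

This still depends on the message text keeping its layout (tabs, for example,
would throw the column off), so it does not replace a structured line/position
field on the exception.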

Sometimes, AnalysisException is used, but when I try to extract the line
and startPosition from it, they are always empty.

Any help would be greatly appreciated.
thanks!


Re: spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-18 Thread chandan prakash
Yes,
I looked into the source code. The SparkConf is serialized and saved during
checkpointing and re-created from the checkpoint directory at restart. So any
SparkConf parameter that you load from application.config and set on the
SparkConf object in code cannot be changed and picked up when restarting
from a checkpoint.  :(

Is there any workaround for reading a changed SparkConf parameter value
while using checkpointing?
P.S. I am not adding a new parameter, I am just changing the values of some
existing SparkConf params.

This is a common case, and there must be some solution for it.

On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger  wrote:

> Checkpointing is not kafka-specific.  It encompasses metadata about the
> application.  You can't re-use a checkpoint if your application has changed.
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>
>
> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
> chandanbaran...@gmail.com> wrote:
>
>> Is it possible that i use checkpoint directory to restart streaming but
>> with modified parameter value in config file (e.g.  username/password for
>> db connection)  ?
>> Thanks in advance.
>>
>> Regards,
>> Chandan
>>
>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
>> chandanbaran...@gmail.com> wrote:
>>
>>> Hi,
>>> I am using direct kafka with checkpointing of offsets same as :
>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>>>
>>> I need to change some parameters like db connection params :
>>> username/password for db connection .
>>> I stopped streaming gracefully ,changed parameters in config file and
>>> restarted streaming.
>>> *Issue : changed parameters  username/password are not being considered.*
>>>
>>> *Question* :
>>> As per my understanding , Checkpointing should only save offsets of
>>> kafka partitions and not the credentials of the db connection.
>>> Why its picking old db connection params ?
>>>
>>> I am declaring params in main method and not in setUpSsc(0 method.
>>> My code is identical to that in the above program link  as below:
>>> val jdbcDriver = conf.getString("jdbc.driver")
>>> val jdbcUrl = conf.getString("jdbc.url")
>>> *val jdbcUser = conf.getString("jdbc.user")*
>>> * val jdbcPassword = conf.getString("jdbc.password")*
>>> // while the job doesn't strictly need checkpointing,
>>> // we'll checkpoint to avoid replaying the whole kafka log in case of
>>> failure
>>> val checkpointDir = conf.getString("checkpointDir")
>>> val ssc = StreamingContext.getOrCreate(
>>> checkpointDir,
>>> setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
>>> *jdbcPassword*, checkpointDir) _
>>> )
>>>
>>>
>>>
>>> --
>>> Chandan Prakash
>>>
>>>
>>
>>
>> --
>> Chandan Prakash
>>
>>
>


-- 
Chandan Prakash


Re: JavaRDD to DataFrame fails with null pointer exception in 1.6.0

2016-08-18 Thread Aditya

Check if schema is generated correctly.

On Wednesday 17 August 2016 10:15 AM, sudhir patil wrote:


Tested with java 7 & 8 , same issue on both versions.


On Aug 17, 2016 12:29 PM, "spats" wrote:


Cannot convert JavaRDD to DataFrame in Spark 1.6.0; it throws a null
pointer exception with no more details. Can't really figure out what is
happening.
Any pointers to fixes?

//convert JavaRDD to DataFrame
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);

// exception with no more details
Exception in thread "main" java.lang.NullPointerException



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/JavaRDD-to-DataFrame-fails-with-null-pointer-exception-in-1-6-0-tp27547.html


Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org








Re: SparkStreaming source code

2016-08-18 Thread Adonis Settouf
Might it be this: https://github.com/apache/spark/tree/master/streaming



2016-08-18 14:30 GMT+02:00 Aditya :

> Hi,
>
> I need to set up source code of Spark Streaming for exploring purpose.
> Can any one suggest the link for the Spark Streaming source code?
>
> Regards,
> Aditya Calangutkar
>


Re: spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-18 Thread Cody Koeninger
Checkpointing is not kafka-specific.  It encompasses metadata about the
application.  You can't re-use a checkpoint if your application has changed.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code


On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash 
wrote:

> Is it possible that i use checkpoint directory to restart streaming but
> with modified parameter value in config file (e.g.  username/password for
> db connection)  ?
> Thanks in advance.
>
> Regards,
> Chandan
>
> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
> chandanbaran...@gmail.com> wrote:
>
>> Hi,
>> I am using direct kafka with checkpointing of offsets same as :
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>>
>> I need to change some parameters like db connection params :
>> username/password for db connection .
>> I stopped streaming gracefully ,changed parameters in config file and
>> restarted streaming.
>> *Issue : changed parameters  username/password are not being considered.*
>>
>> *Question* :
>> As per my understanding , Checkpointing should only save offsets of kafka
>> partitions and not the credentials of the db connection.
>> Why its picking old db connection params ?
>>
>> I am declaring params in main method and not in setUpSsc(0 method.
>> My code is identical to that in the above program link  as below:
>> val jdbcDriver = conf.getString("jdbc.driver")
>> val jdbcUrl = conf.getString("jdbc.url")
>> *val jdbcUser = conf.getString("jdbc.user")*
>> * val jdbcPassword = conf.getString("jdbc.password")*
>> // while the job doesn't strictly need checkpointing,
>> // we'll checkpoint to avoid replaying the whole kafka log in case of
>> failure
>> val checkpointDir = conf.getString("checkpointDir")
>> val ssc = StreamingContext.getOrCreate(
>> checkpointDir,
>> setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
>> *jdbcPassword*, checkpointDir) _
>> )
>>
>>
>>
>> --
>> Chandan Prakash
>>
>>
>
>
> --
> Chandan Prakash
>
>


Re: [Spark 2.0] ClassNotFoundException is thrown when using Hive

2016-08-18 Thread Aditya

Try using --files /path/of/hive-site.xml  in spark-submit and run.

On Thursday 18 August 2016 05:26 PM, Diwakar Dhanuskodi wrote:

Hi

Can  you  cross check by providing same library path in --jars of 
spark-submit and run .



Sent from Samsung Mobile.


 Original message 
From: "颜发才(Yan Facai)" 
Date:18/08/2016 15:17 (GMT+05:30)
To: "user.spark" 
Cc:
Subject: [Spark 2.0] ClassNotFoundException is thrown when using Hive

Hi, all.

I copied hdfs-site.xml, core-site.xml and hive-site.xml to 
$SPARK_HOME/conf.
And spark-submit is used to submit task to yarn, and run as **client** 
mode.

However, ClassNotFoundException is thrown.

some details of logs are list below:
```
16/08/12 17:07:32 INFO hive.HiveUtils: Initializing 
HiveMetastoreConnection version 0.13.1 using 
file:/data0/facai/lib/hive-0.13.1/lib:file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
16/08/12 17:07:32 ERROR yarn.ApplicationMaster: User class threw 
exception: java.lang.ClassNotFoundException: 
java.lang.NoClassDefFoundError: 
org/apache/hadoop/hive/ql/session/SessionState when creating Hive 
client using classpath: file:/data0/facai/lib/hive-0.13.1/lib, 
file:/data0/facai/lib/hadoop-2.4.1/share/hadoop

```

In fact, all the jars needed by hive is  in the directory:
```Bash
[hadoop@h107713699 spark_test]$ ls /data0/facai/lib/hive-0.13.1/lib/ | 
grep hive

hive-ant-0.13.1.jar
hive-beeline-0.13.1.jar
hive-cli-0.13.1.jar
hive-common-0.13.1.jar
...
```

So, my question is:
why spark cannot find the jars needed?

Any help will be appreciate, thanks.







SparkStreaming source code

2016-08-18 Thread Aditya

Hi,

I need to set up source code of Spark Streaming for exploring purpose.
Can any one suggest the link for the Spark Streaming source code?

Regards,
Aditya Calangutkar







RE: [Spark 2.0] ClassNotFoundException is thrown when using Hive

2016-08-18 Thread Diwakar Dhanuskodi
Hi

Can you cross-check by providing the same library path in --jars of
spark-submit and running it?


Sent from Samsung Mobile.

Original message
From: "颜发才(Yan Facai)"
Date: 18/08/2016 15:17 (GMT+05:30)
To: "user.spark"
Cc:
Subject: [Spark 2.0] ClassNotFoundException is thrown when using Hive
Hi, all.

I copied hdfs-site.xml, core-site.xml and hive-site.xml to $SPARK_HOME/conf. 
And spark-submit is used to submit task to yarn, and run as **client** mode. 
However, ClassNotFoundException is thrown.

some details of logs are list below:
```
16/08/12 17:07:32 INFO hive.HiveUtils: Initializing HiveMetastoreConnection 
version 0.13.1 using 
file:/data0/facai/lib/hive-0.13.1/lib:file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
16/08/12 17:07:32 ERROR yarn.ApplicationMaster: User class threw exception: 
java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError: 
org/apache/hadoop/hive/ql/session/SessionState when creating Hive client using 
classpath: file:/data0/facai/lib/hive-0.13.1/lib, 
file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
```

In fact, all the jars needed by hive is  in the directory:
```Bash
[hadoop@h107713699 spark_test]$ ls /data0/facai/lib/hive-0.13.1/lib/ | grep hive
hive-ant-0.13.1.jar
hive-beeline-0.13.1.jar
hive-cli-0.13.1.jar
hive-common-0.13.1.jar
...
```

So, my question is:
why spark cannot find the jars needed? 

Any help will be appreciate, thanks.



Re: Converting Dataframe to resultSet in Spark Java

2016-08-18 Thread rakesh sharma
Hi Sree


I don't think what you are trying to do is correct. DataFrame and ResultSet are
two different types, and no strongly typed language will allow you to cast one
to the other.

If your intention is to traverse the DataFrame or get the individual rows and
columns, then you should try the map function and pass anonymous function
definitions with the required logic.


thanks

rakesh


From: Sree Eedupuganti 
Sent: Thursday, August 18, 2016 1:26:52 PM
To: user
Subject: Converting Dataframe to resultSet in Spark Java


Retrieved the data to DataFrame but i can't convert into ResultSet Is there 
any possible way how to convert...Any suggestions please...

Exception in thread "main" java.lang.ClassCastException: 
org.apache.spark.sql.DataFrame cannot be cast to 
com.datastax.driver.core.ResultSet

--
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited


Re: [Spark 2.0] ClassNotFoundException is thrown when using Hive

2016-08-18 Thread Mich Talebzadeh
When you start spark-shell, does it work, or is this issue only with
spark-submit?

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 18 August 2016 at 10:47, 颜发才(Yan Facai)  wrote:

> Hi, all.
>
> I copied hdfs-site.xml, core-site.xml and hive-site.xml to
> $SPARK_HOME/conf.
> And spark-submit is used to submit task to yarn, and run as **client**
> mode.
> However, ClassNotFoundException is thrown.
>
> some details of logs are list below:
> ```
> 16/08/12 17:07:32 INFO hive.HiveUtils: Initializing
> HiveMetastoreConnection version 0.13.1 using
> file:/data0/facai/lib/hive-0.13.1/lib:file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
> 16/08/12 17:07:32 ERROR yarn.ApplicationMaster: User class threw
> exception: java.lang.ClassNotFoundException:
> java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/session/SessionState
> when creating Hive client using classpath: 
> file:/data0/facai/lib/hive-0.13.1/lib,
> file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
> ```
>
> In fact, all the jars needed by hive is  in the directory:
> ```Bash
> [hadoop@h107713699 spark_test]$ ls /data0/facai/lib/hive-0.13.1/lib/ |
> grep hive
> hive-ant-0.13.1.jar
> hive-beeline-0.13.1.jar
> hive-cli-0.13.1.jar
> hive-common-0.13.1.jar
> ...
> ```
>
> So, my question is:
> why spark cannot find the jars needed?
>
> Any help will be appreciate, thanks.
>
>


Spark Streaming application failing with Token issue

2016-08-18 Thread Kamesh
Hi all,

I am running a Spark Streaming application that stores events in a
secure (Kerberized) HBase cluster. I launched the application by passing
--principal and --keytab. Despite this, the application fails after
*7 days* with a token issue. Can someone please suggest how to fix this?

*Error Message*

16/08/18 02:39:45 WARN ipc.AbstractRpcClient: Exception encountered while
connecting to the server :
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
Token has expired

16/08/18 02:39:45 WARN security.UserGroupInformation:
PriviledgedActionException as:sys_bio_replicator (auth:KERBEROS)
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
Token has expired

*Environment*

Spark Version : 1.6.1

HBase version : 1.0.0

Hadoop Version : 2.6.0


Thanks & Regards
Kamesh.


[Spark 2.0] ClassNotFoundException is thrown when using Hive

2016-08-18 Thread Yan Facai
Hi, all.

I copied hdfs-site.xml, core-site.xml and hive-site.xml to
$SPARK_HOME/conf, and used spark-submit to submit the task to YARN in
**client** mode.
However, a ClassNotFoundException is thrown.

Some details from the logs are listed below:
```
16/08/12 17:07:32 INFO hive.HiveUtils: Initializing HiveMetastoreConnection version 0.13.1 using file:/data0/facai/lib/hive-0.13.1/lib:file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
16/08/12 17:07:32 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/session/SessionState when creating Hive client using classpath: file:/data0/facai/lib/hive-0.13.1/lib, file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
```

In fact, all the jars needed by Hive are in the directory:
```Bash
[hadoop@h107713699 spark_test]$ ls /data0/facai/lib/hive-0.13.1/lib/ | grep hive
hive-ant-0.13.1.jar
hive-beeline-0.13.1.jar
hive-cli-0.13.1.jar
hive-common-0.13.1.jar
...
```

So, my question is:
why can't Spark find the jars it needs?

Any help will be appreciated, thanks.


Re: 2.0.1/2.1.x release dates

2016-08-18 Thread Sean Owen
Historically, minor releases happen every ~4 months, and maintenance
releases are a bit ad hoc but come about a month after the minor
release. It's up to the release manager to decide to do them but maybe
realistic to expect 2.0.1 in early September.

On Thu, Aug 18, 2016 at 10:35 AM, Adrian Bridgett  wrote:
> Just wondering if there were any rumoured release dates for either of the
> above.  I'm seeing some odd hangs with 2.0.0 and mesos (and I know that the
> mesos integration has had a bit of updating in 2.1.x).   Looking at JIRA,
> there's no suggested release date and issues seem to be added to a release
> version once resolved so the usual trick of looking at the
> resolved/unresolved ratio isn't helping :-)  The wiki only mentions 2.0.0 so
> no joy there either.
>
> Still doing testing but then I don't want to test with 2.1.x if it's going
> to be under heavy development for a while longer.
>
> Thanks for any info,
>
> Adrian
>
>




Re: spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-18 Thread chandan prakash
Is it possible to use the checkpoint directory to restart streaming but
with a modified parameter value in the config file (e.g. username/password
for the db connection)?
Thanks in advance.

Regards,
Chandan

On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash 
wrote:

> Hi,
> I am using direct kafka with checkpointing of offsets same as :
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>
> I need to change some parameters like db connection params :
> username/password for db connection .
> I stopped streaming gracefully ,changed parameters in config file and
> restarted streaming.
> *Issue : changed parameters  username/password are not being considered.*
>
> *Question* :
> As per my understanding , Checkpointing should only save offsets of kafka
> partitions and not the credentials of the db connection.
> Why its picking old db connection params ?
>
> I am declaring params in main method and not in setUpSsc(0 method.
> My code is identical to that in the above program link  as below:
> val jdbcDriver = conf.getString("jdbc.driver")
> val jdbcUrl = conf.getString("jdbc.url")
> *val jdbcUser = conf.getString("jdbc.user")*
> * val jdbcPassword = conf.getString("jdbc.password")*
> // while the job doesn't strictly need checkpointing,
> // we'll checkpoint to avoid replaying the whole kafka log in case of
> failure
> val checkpointDir = conf.getString("checkpointDir")
> val ssc = StreamingContext.getOrCreate(
> checkpointDir,
> setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
> *jdbcPassword*, checkpointDir) _
> )
>
>
>
> --
> Chandan Prakash
>
>


-- 
Chandan Prakash


2.0.1/2.1.x release dates

2016-08-18 Thread Adrian Bridgett
Just wondering if there were any rumoured release dates for either of 
the above.  I'm seeing some odd hangs with 2.0.0 and mesos (and I know 
that the mesos integration has had a bit of updating in 2.1.x).   
Looking at JIRA, there's no suggested release date and issues seem to be 
added to a release version once resolved so the usual trick of looking 
at the resolved/unresolved ratio isn't helping :-)  The wiki only 
mentions 2.0.0 so no joy there either.


Still doing testing but then I don't want to test with 2.1.x if it's 
going to be under heavy development for a while longer.


Thanks for any info,

Adrian




GraphX VerticesRDD issue - java.lang.ArrayStoreException: java.lang.Long

2016-08-18 Thread Gerard Casey
Dear all,

I am building a graph from two JSON files.

Spark version 1.6.1

Creating Edge and Vertex RDDs from JSON files.

The vertex JSON files looks like this:

{"toid": "osgb400031043205", "index": 1, "point": [508180.748, 195333.973]}
{"toid": "osgb400031043206", "index": 2, "point": [508163.122, 195316.627]}
{"toid": "osgb400031043207", "index": 3, "point": [508172.075, 195325.719]}
{"toid": "osgb400031043208", "index": 4, "point": [508513, 196023]}

val vertices_raw = sqlContext.read.json("vertices.json.gz")

val vertices = vertices_raw.rdd.map(row=> 
((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[String]("index")))

val verticesRDD: RDD[(VertexId, String)] = vertices

The edges JSON file looks like this:

{"index": 1, "term": "Private Road - Restricted Access", "nature": 
"Single Carriageway", "negativeNode": "osgb400023183407", "toid": 
"osgb400023296573", "length": 112.8275895775762, "polyline": [492019.481, 
156567.076, 492028, 156567, 492041.667, 156570.536, 492063.65, 156578.067, 
492126.5, 156602], "positiveNode": "osgb400023183409"}
{"index": 2, "term": "Private Road - Restricted Access", "nature": 
"Single Carriageway", "negativeNode": "osgb400023763485", "toid": 
"osgb400023296574", "length": 141.57731318733806, "polyline": [492144.493, 
156762.059, 492149.35, 156750, 492195.75, 156630], "positiveNode": 
"osgb400023183408"}

val edges_raw = sqlContext.read.json("edges.json.gz")

val edgesRDD = edges_raw.rdd.map(row =>
  Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong,
       row.getAs[String]("negativeNode").stripPrefix("osgb").toLong,
       row.getAs[Double]("length")))

I have an EdgesRDD that I can inspect

[IN] edgesRDD
res10: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = 
MapPartitionsRDD[19] at map at :38
[IN] edgesRDD.foreach(println)

Edge(505125036254,505125036231,42.26548472559799)
Edge(505125651333,505125651330,29.557979625165135)
Edge(505125651329,505125651330,81.9310872300414)

I have a verticesRDD

[IN] verticesRDD
res12: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[9] at 
map at :38

[IN] verticesRDD.foreach(println)
(505125651331,343722)
(505125651332,343723)
(505125651333,343724)

I then combine these to create a graph.

[IN] val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)
graph: org.apache.spark.graphx.Graph[String,Double] = 
org.apache.spark.graphx.impl.GraphImpl@303bbd02

I can inspect the edgesRDD within the graph object:

[IN] graph.edges.foreach(println)

Edge(505125774813,400029917080,72.9742898009203)
Edge(505125774814,505125774813,49.87951589790352)
Edge(505125775080,400029936370,69.62871049042008)

However, when I inspect the verticesRDD:

[IN] graph.vertices.foreach(println)
Is there an issue with my graph construction? 

ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 13)
java.lang.ArrayStoreException: java.lang.Long
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at

Re: How to Improve Random Forest classifier accuracy

2016-08-18 Thread Jörn Franke
Depends on your data...
How did you split the training and test sets?
How well does the model fit the data?

You could of course also try feeding more data into the model.
Have you considered alternative machine learning models?

I do not think this is a Spark problem; you should rather ask a machine
learning specialist familiar with your data and random forests.


> On 18 Aug 2016, at 10:31, 陈哲  wrote:
> 
> Hi All
>I using spark ml Random Forest classifier, I have only two label 
> categories (1, 0) ,about 30 features and data size over 100, 000. I run the 
> spark JavaRandomForestClassifierExample code, the model came out with the 
> results (I make some change, show more detail result):
> Test Error = 0.022321731460750338
> Prediction results label = 1 count:951
> Prediction results label = 0 count:13788
> Prediction results predictedLabel = 1 and label = 1 count:682
> Prediction results predictedLabel = 1 and label = 0 count:60
> Prediction results predictedLabel = 0 and label = 1 count:269
> Prediction Right = 0.7171398527865405
> Prediction Miss= 0.28286014721345953
> Prediction Wrong= 0.004351610095735422
> 
> I need to some advice about how to improve the accuracy , I tried to change 
> classifier attributes , some like maxdepth, maxbins but doesn't change much.
> do I have to give more features ? or there is other ways to improve this ?
> 
> Thanks
> 




How to Improve Random Forest classifier accuracy

2016-08-18 Thread 陈哲
Hi All
   I am using the Spark ML Random Forest classifier. I have only two label
categories (1, 0), about 30 features, and a data size of over 100,000. I ran the
Spark JavaRandomForestClassifierExample code, and the model came out with the
following results (I made some changes to show more detailed results):
Test Error = 0.022321731460750338
Prediction results label = 1 count:951
Prediction results label = 0 count:13788
Prediction results predictedLabel = 1 and label = 1 count:682
Prediction results predictedLabel = 1 and label = 0 count:60
Prediction results predictedLabel = 0 and label = 1 count:269
Prediction Right = 0.7171398527865405
Prediction Miss= 0.28286014721345953
Prediction Wrong= 0.004351610095735422

I need some advice on how to improve the accuracy. I tried changing
classifier attributes such as maxDepth and maxBins, but it doesn't change much.
Do I have to provide more features, or are there other ways to improve this?
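
As a reading aid, the counts above can be turned into the usual per-class
metrics. This is a plain-Python sketch using only the numbers in this post.
The variable names are mine, and the true-negative count is derived rather
than reported.

```python
# Counts copied from the results listed above.
actual_pos, actual_neg = 951, 13788   # rows with label = 1 and label = 0
tp, fp, fn = 682, 60, 269             # predicted 1/actual 1, 1/0, 0/1
tn = actual_neg - fp                  # derived, not reported in the post

total = actual_pos + actual_neg
test_error = (fp + fn) / total        # matches "Test Error"
recall = tp / actual_pos              # matches "Prediction Right"
miss_rate = fn / actual_pos           # matches "Prediction Miss"
false_positive_rate = fp / actual_neg # matches "Prediction Wrong"
precision = tp / (tp + fp)
f1 = 2 * precision * recall / (precision + recall)
positive_share = actual_pos / total

print(f"test error      = {test_error:.4f}")
print(f"recall (label 1) = {recall:.4f}")
print(f"precision        = {precision:.4f}")
print(f"F1               = {f1:.4f}")
print(f"share of label 1 = {positive_share:.4f}")
```

Read this way, the overall error is low largely because only about 6% of the
test rows have label 1, while roughly 28% of the actual positives are missed.
So recall on label 1 is probably a more useful number to track here than the
overall test error.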

Thanks


Converting Dataframe to resultSet in Spark Java

2016-08-18 Thread Sree Eedupuganti
I retrieved the data into a DataFrame, but I can't convert it into a ResultSet.
Is there any possible way to convert it? Any suggestions, please.

Exception in thread "main" java.lang.ClassCastException:
org.apache.spark.sql.DataFrame cannot be cast to
com.datastax.driver.core.ResultSet
-- 
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited


Re: SPARK MLLib - How to tie back Model.predict output to original data?

2016-08-18 Thread ayan guha
Thanks a lot. I resolved it using a UDF.

Qs: does spark support any time series model? Is there any roadmap to know
when a feature will be roughly available?
On 18 Aug 2016 16:46, "Yanbo Liang"  wrote:

> If you want to tie them with other data, I think the best way is to use
> DataFrame join operation on condition that they share an identity column.
>
> Thanks
> Yanbo
>
> 2016-08-16 20:39 GMT-07:00 ayan guha :
>
>> Hi
>>
>> Thank you for your reply. Yes, I can get prediction and original features
>> together. My question is how to tie them back to other parts of the data,
>> which was not in LP.
>>
>> For example, I have a bunch of other dimensions which are not part of
>> features or label.
>>
>> Sorry if this is a stupid question.
>>
>> On Wed, Aug 17, 2016 at 12:57 PM, Yanbo Liang  wrote:
>>
>>> MLlib will keep the original dataset during transformation, it just
>>> append new columns to existing DataFrame. That is you can get both
>>> prediction value and original features from the output DataFrame of
>>> model.transform.
>>>
>>> Thanks
>>> Yanbo
>>>
>>> 2016-08-16 17:48 GMT-07:00 ayan guha :
>>>
>>>> Hi
>>>>
>>>> I have a dataset as follows:
>>>>
>>>> DF:
>>>> amount:float
>>>> date_read:date
>>>> meter_number:string
>>>>
>>>> I am trying to predict future amount based on past 3 weeks consumption
>>>> (and a heaps of weather data related to date).
>>>>
>>>> My Labelpoint looks like
>>>>
>>>> label (populated from DF.amount)
>>>> features (populated from a bunch of other stuff)
>>>>
>>>> Model.predict output:
>>>> label
>>>> prediction
>>>>
>>>> Now, I am trying to put together this prediction value back to meter
>>>> number and date_read from original DF?
>>>>
>>>> One way to assume order of records in DF and Model.predict will be
>>>> exactly same and zip two RDDs. But any other (possibly better) solution?
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-18 Thread chandan prakash
Hi,
I am using direct Kafka with checkpointing of offsets, the same as:
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala

I need to change some parameters, such as the db connection params
(username/password).
I stopped streaming gracefully, changed the parameters in the config file and
restarted streaming.
*Issue: the changed username/password parameters are not being picked up.*

*Question*:
As per my understanding, checkpointing should only save the offsets of the
Kafka partitions and not the credentials of the db connection.
Why is it picking up the old db connection params?

I am declaring the params in the main method and not in the setupSsc() method.
My code is identical to that in the program linked above, as below:
val jdbcDriver = conf.getString("jdbc.driver")
val jdbcUrl = conf.getString("jdbc.url")
val jdbcUser = conf.getString("jdbc.user")
val jdbcPassword = conf.getString("jdbc.password")
// while the job doesn't strictly need checkpointing,
// we'll checkpoint to avoid replaying the whole kafka log in case of failure
val checkpointDir = conf.getString("checkpointDir")
val ssc = StreamingContext.getOrCreate(
  checkpointDir,
  setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, jdbcUser,
    jdbcPassword, checkpointDir) _
)



-- 
Chandan Prakash


Re: Spark MLlib question: load model failed with exception:org.json4s.package$MappingException: Did not find value which can be converted into java.lang.String

2016-08-18 Thread Yanbo Liang
It looks like you mixed the ALS implementations from the spark.ml and
spark.mllib packages.
You can train the model with either one, but you should then use the
corresponding save/load functions.
You cannot train/save the model with spark.mllib ALS and then use spark.ml
ALS to load it; that will throw exceptions.

I saw that you use a Pipeline to train ALS under the spark.ml package. In that
case you should use PipelineModel.load to read the model and get the
corresponding stage in the pipeline as the ALSModel.
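
A minimal sketch of that suggestion, assuming Spark 2.x with spark-mllib on the classpath and an active SparkSession; the object name, the helper name and the modelPath argument are hypothetical:

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.recommendation.ALSModel

object LoadAlsFromPipeline {
  // `modelPath` is a hypothetical location where a fitted PipelineModel was saved.
  // Must be called while a SparkSession/SparkContext is available.
  def loadAlsModel(modelPath: String): Option[ALSModel] = {
    val pipelineModel = PipelineModel.load(modelPath)
    // Pick the first stage that is an ALSModel, if the pipeline has one.
    pipelineModel.stages.collectFirst { case als: ALSModel => als }
  }
}
```

Returning an Option keeps the caller from assuming that the saved pipeline really contains an ALS stage.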

We strongly recommend that you use the spark.ml package, which is the primary
API of MLlib. The spark.mllib package is in maintenance mode. So do all
your work under the same API.

Thanks
Yanbo

2016-08-17 1:30 GMT-07:00 :

> Hello guys:
>  I have a problem loading a recommendation model. I have 2 models: one
> works (I am able to get recommendation results) and the other does not. I
> checked these 2 models, and both are MatrixFactorizationModel objects. But in the
> metadata, one is a PipelineModel and the other is a MatrixFactorizationModel.
> Is the exception below caused by this?
>
> Here is my stack trace:
> Exception in thread "main" org.json4s.package$MappingException: Did not find value which can be converted into java.lang.String
> at org.json4s.reflect.package$.fail(package.scala:96)
> at org.json4s.Extraction$.convert(Extraction.scala:554)
> at org.json4s.Extraction$.extract(Extraction.scala:331)
> at org.json4s.Extraction$.extract(Extraction.scala:42)
> at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
> at org.apache.spark.mllib.util.Loader$.loadMetadata(modelSaveLoad.scala:131)
> at org.apache.spark.mllib.recommendation.MatrixFactorizationModel$.load(MatrixFactorizationModel.scala:330)
> at org.brave.spark.ml.RecommandForMultiUsers$.main(RecommandForMultiUsers.scala:55)
> at org.brave.spark.ml.RecommandForMultiUsers.main(RecommandForMultiUsers.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> My code is attached, FYI.
> RecommandForMultiUsers.scala:55 is:
> val model = MatrixFactorizationModel.load(sc, modelpath)
>
> Thanks & best regards!
> San.Luo
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


Re: SPARK MLLib - How to tie back Model.predict output to original data?

2016-08-18 Thread Yanbo Liang
If you want to tie them to other data, I think the best way is to use a
DataFrame join, provided that they share an identity column.
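
As a rough sketch of such a join, assuming Spark 2.x and that both DataFrames carry a common key: the row_id column, the object name and the toy rows are hypothetical, while meter_number and date_read are taken from the question below.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object JoinPredictions {
  // Joins model output back to the original rows on a shared key column.
  // `idCol` is an assumed identity column present in both DataFrames.
  def attachPredictions(original: DataFrame, predictions: DataFrame, idCol: String): DataFrame =
    original.join(predictions, Seq(idCol))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join-predictions").getOrCreate()
    import spark.implicits._

    // Toy stand-ins for the meter readings and for the model output.
    val original = Seq((1L, "m-001", "2016-08-01"), (2L, "m-002", "2016-08-01"))
      .toDF("row_id", "meter_number", "date_read")
    val predictions = Seq((1L, 10.5), (2L, 7.25)).toDF("row_id", "prediction")

    attachPredictions(original, predictions, "row_id").show()
    spark.stop()
  }
}
```

Joining on a sequence of column names keeps a single copy of the key column in the result, which avoids ambiguous column references later.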

Thanks
Yanbo

2016-08-16 20:39 GMT-07:00 ayan guha :

> Hi
>
> Thank you for your reply. Yes, I can get the prediction and original features
> together. My question is how to tie them back to other parts of the data
> which were not in the LabeledPoint.
>
> For example, I have a bunch of other dimensions which are not part of
> features or label.
>
> Sorry if this is a stupid question.
>
> On Wed, Aug 17, 2016 at 12:57 PM, Yanbo Liang  wrote:
>
>> MLlib keeps the original dataset during transformation; it just
>> appends new columns to the existing DataFrame. That is, you can get both the
>> prediction value and the original features from the output DataFrame of
>> model.transform.
>>
>> Thanks
>> Yanbo
>>
>> 2016-08-16 17:48 GMT-07:00 ayan guha :
>>
>>> Hi
>>>
>>> I have a dataset as follows:
>>>
>>> DF:
>>> amount:float
>>> date_read:date
>>> meter_number:string
>>>
>>> I am trying to predict the future amount based on the past 3 weeks' consumption
>>> (and a heap of weather data related to the date).
>>>
>>> My LabeledPoint looks like:
>>>
>>> label (populated from DF.amount)
>>> features (populated from a bunch of other stuff)
>>>
>>> Model.predict output:
>>> label
>>> prediction
>>>
>>> Now, I am trying to tie this prediction value back to the meter
>>> number and date_read from the original DF.
>>>
>>> One way is to assume that the order of records in DF and Model.predict will be
>>> exactly the same and zip the two RDDs. But is there any other (possibly better) solution?
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Spark SQL 1.6.1 issue

2016-08-18 Thread Teik Hooi Beh
Hi, does the version have to be the same down to the minor version, e.g.
would 1.6.1 and 1.6.2 give this issue?

On Thu, Aug 18, 2016 at 6:27 PM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> Your executors/driver must not have multiple versions of Spark on the
> classpath. It may come from the Cassandra connector: check the pom
> dependencies of the version you fetched and whether it's compatible with your
> Spark version.
>
>
>
> On Thu, Aug 18, 2016 6:05 AM, thbeh th...@thbeh.com wrote:
>
>> Running the query below, I have been hitting a "local class incompatible"
>> exception. Does anyone know the cause?
>>
>> val rdd = csc.cassandraSql("""select *, concat('Q', d_qoy) as qoy from
>> store_sales join date_dim on ss_sold_date_sk = d_date_sk join item on
>> ss_item_sk = i_item_sk""").groupBy("i_category").pivot("qoy").agg(
>> round(sum("ss_sales_price")/100,2))
>>
>> The source data is from TPCDS test data and I am running in Zeppelin.
>>
>> INFO [2016-08-18 03:15:58,429] ({task-result-getter-2}
>> Logging.scala[logInfo]:58) - Lost task 3.0 in stage 3.0 (TID 52) on executor
>> ceph5.example.my: java.io.InvalidClassException
>> (org.apache.spark.sql.catalyst.expressions.Literal; local class
>> incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
>> class serialVersionUID = -4259705229845269663) [duplicate 1]
>> INFO [2016-08-18 03:15:58,429] ({task-result-getter-3}
>> Logging.scala[logInfo]:58) - Lost task 2.0 in stage 3.0 (TID 51) on executor
>> ceph5.example.my: java.io.InvalidClassException
>> (org.apache.spark.sql.catalyst.expressions.Literal; local class
>> incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
>> class serialVersionUID = -4259705229845269663) [duplicate 2]
>> INFO [2016-08-18 03:15:58,430] ({task-result-getter-3}
>> Logging.scala[logInfo]:58) - Lost task 6.0 in stage 3.0 (TID 55) on executor
>> ceph5.example.my: java.io.InvalidClassException
>> (org.apache.spark.sql.catalyst.expressions.Literal; local class
>> incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
>> class serialVersionUID = -4259705229845269663) [duplicate 3]
>>
>> Thanks
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-6-1-issue-tp27554.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
> *Olivier Girardot* | Associé
> o.girar...@lateral-thoughts.com
> +33 6 24 09 17 94
>


Re: error when running spark from oozie launcher

2016-08-18 Thread tkg_cangkul

You're right, Jean.

It was a mismatch between the library in the default Oozie sharelib and my
Spark version. I've replaced it and it works normally now.

Thanks for your help, Jean.

On 18/08/16 13:37, Jean-Baptiste Onofré wrote:


It sounds like a mismatch between the Spark version shipped in Oozie and the
runtime one.


Regards
JB

On Aug 18, 2016, at 07:36, tkg_cangkul wrote:


Hi Olivier, thanks for your reply.

This is the full stack trace:

Failing Oozie Launcher, Main class
[org.apache.oozie.action.hadoop.SparkMain], main() threw exception,
org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V
java.lang.NoSuchMethodError:
org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V

On 18/08/16 13:28, Olivier Girardot wrote:

This is not the full stack trace; please post the full stack trace
if you want some help.

On Wed, Aug 17, 2016 7:24 PM, tkg_cangkul yuza.ras...@gmail.com wrote:

Hi, I am trying to submit a Spark job with Oozie, but I have one
problem.
When I submit the same job, it sometimes succeeds and sometimes fails.

I get this error message when the job fails:

org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V

Can anyone help me solve this? I tried setting
-XX:MaxPermSize=512m -XX:PermSize=256m in the
spark.driver.extraJavaOptions property, but this did not help.

Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94


Re: VectorUDT with spark.ml.linalg.Vector

2016-08-18 Thread Yanbo Liang
@Michal

Yes, we have a public VectorUDT in the spark.mllib package in 1.6, and this class
still exists in 2.0.
From 2.0 on, we provide a new VectorUDT in the spark.ml package and have made it
private temporarily (it will be public in the near future).
Since the spark.mllib package is in maintenance mode from 2.0 on, we
strongly recommend that users use the DataFrame-based spark.ml API.
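
One workaround that, to my understanding, is available from Spark 2.0.0 on is to refer to the vector type through org.apache.spark.ml.linalg.SQLDataTypes instead of naming the private class. The sketch below is an assumption about how the UDAF input schema from this thread could be written that way; the object name is hypothetical.

```scala
import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.sql.types.StructType

object VectorSchema {
  // SQLDataTypes.VectorType exposes the spark.ml vector type as a plain DataType,
  // so the private VectorUDT class never has to be referenced directly.
  val inputSchema: StructType = new StructType().add("input", SQLDataTypes.VectorType)
}
```

This avoids placing user code inside the org.apache.spark.ml package just to reach a private class.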

Thanks
Yanbo

2016-08-17 11:46 GMT-07:00 Michał Zieliński :

> I'm using Spark 1.6.2 for Vector-based UDAF and this works:
>
> def inputSchema: StructType = new StructType().add("input", new
> VectorUDT())
>
> Maybe it was made private in 2.0
>
> On 17 August 2016 at 05:31, Alexey Svyatkovskiy 
> wrote:
>
>> Hi Yanbo,
>>
>> Thanks for your reply. I will keep an eye on that pull request.
>> For now, I decided to just put my code inside org.apache.spark.ml to be
>> able to access private classes.
>>
>> Thanks,
>> Alexey
>>
>> On Tue, Aug 16, 2016 at 11:13 PM, Yanbo Liang  wrote:
>>
>>> It seems that VectorUDT is private and cannot be accessed outside of Spark
>>> currently. It should be public, but we need to do some refactoring before
>>> making it public. You can refer to the discussion at
>>> https://github.com/apache/spark/pull/12259 .
>>>
>>> Thanks
>>> Yanbo
>>>
>>> 2016-08-16 9:48 GMT-07:00 alexeys :
>>>
>>>> I am writing a UDAF to be applied to a data frame column of type Vector
>>>> (spark.ml.linalg.Vector). I rely on spark/ml/linalg so that I do not have to
>>>> go back and forth between DataFrame and RDD.
>>>>
>>>> Inside the UDAF, I have to specify a data type for the input, buffer, and
>>>> output (as usual). VectorUDT is what I would use with
>>>> spark.mllib.linalg.Vector:
>>>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
>>>>
>>>> However, when I try to import it from spark.ml instead: import
>>>> org.apache.spark.ml.linalg.VectorUDT
>>>> I get a runtime error (no errors during the build):
>>>>
>>>> class VectorUDT in package linalg cannot be accessed in package
>>>> org.apache.spark.ml.linalg
>>>>
>>>> Is it expected/can you suggest a workaround?
>>>>
>>>> I am using Spark 2.0.0
>>>>
>>>> Thanks,
>>>> Alexey
>>>>
>>>> --
>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/VectorUDT-with-spark-ml-linalg-Vector-tp27542.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.


>>>
>>
>


Re: error when running spark from oozie launcher

2016-08-18 Thread Jean-Baptiste Onofré
It sounds like a mismatch between the Spark version shipped in Oozie and the
runtime one.
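
A quick way to test that hypothesis from inside the failing JVM is to ask, via reflection, whether the class that was actually loaded declares the missing method and which jar it came from. This is only a sketch using standard JDK reflection; the object and helper names are hypothetical, and the class and method names are the ones from the error message.

```scala
object MethodCheck {
  // Loads the class without running its static initializers.
  private def find(className: String): Option[Class[_]] =
    try Some(Class.forName(className, false, getClass.getClassLoader))
    catch { case _: ClassNotFoundException => None }

  // True if the class visible on the current classpath declares
  // a method with the given name (any signature).
  def declaresMethod(className: String, methodName: String): Boolean =
    find(className).exists(_.getDeclaredMethods.exists(_.getName == methodName))

  // Location (jar or directory) the class was loaded from, if known.
  def loadedFrom(className: String): Option[String] =
    for {
      cls      <- find(className)
      source   <- Option(cls.getProtectionDomain.getCodeSource)
      location <- Option(source.getLocation)
    } yield location.toString

  def main(args: Array[String]): Unit = {
    val cls = "org.apache.spark.launcher.CommandBuilderUtils"
    println(s"declares addPermGenSizeOpt: ${declaresMethod(cls, "addPermGenSizeOpt")}")
    println(s"loaded from: ${loadedFrom(cls).getOrElse("unknown")}")
  }
}
```

If the method is reported as missing, the printed location shows which jar supplied the older or newer class.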

Regards
JB



On Aug 18, 2016, at 07:36, tkg_cangkul wrote:
>Hi Olivier, thanks for your reply.
>
>This is the full stack trace:
>
>Failing Oozie Launcher, Main class
>[org.apache.oozie.action.hadoop.SparkMain], main() threw exception,
>org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V
>java.lang.NoSuchMethodError:
>org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V
>
>On 18/08/16 13:28, Olivier Girardot wrote:
>> This is not the full stack trace; please post the full stack trace if
>> you want some help.
>>
>> On Wed, Aug 17, 2016 7:24 PM, tkg_cangkul yuza.ras...@gmail.com wrote:
>>
>> Hi, I am trying to submit a Spark job with Oozie, but I have one problem.
>> When I submit the same job, it sometimes succeeds and sometimes fails.
>>
>> I get this error message when the job fails:
>>
>> org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V
>>
>> Can anyone help me solve this? I tried setting
>> -XX:MaxPermSize=512m -XX:PermSize=256m in the
>> spark.driver.extraJavaOptions property, but this did not help.
>>
>> Olivier Girardot | Associé
>> o.girar...@lateral-thoughts.com
>> +33 6 24 09 17 94


Re: error when running spark from oozie launcher

2016-08-18 Thread tkg_cangkul

Hi Olivier, thanks for your reply.

This is the full stack trace:

Failing Oozie Launcher, Main class
[org.apache.oozie.action.hadoop.SparkMain], main() threw exception,
org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V
java.lang.NoSuchMethodError:
org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V

On 18/08/16 13:28, Olivier Girardot wrote:
This is not the full stack trace; please post the full stack trace if
you want some help.

On Wed, Aug 17, 2016 7:24 PM, tkg_cangkul yuza.ras...@gmail.com wrote:

Hi, I am trying to submit a Spark job with Oozie, but I have one problem.
When I submit the same job, it sometimes succeeds and sometimes fails.

I get this error message when the job fails:

org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V

Can anyone help me solve this? I tried setting
-XX:MaxPermSize=512m -XX:PermSize=256m in the
spark.driver.extraJavaOptions property, but this did not help.

Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94




Re: Aggregations with scala pairs

2016-08-18 Thread Jean-Baptiste Onofré
Agreed.

Regards
JB



On Aug 18, 2016, at 07:32, Olivier Girardot wrote:
>CC'ing the dev list. You should open a Jira and a related PR to
>discuss it; cf.
>https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingCodeChanges
>
>On Wed, Aug 17, 2016 4:01 PM, Andrés Ivaldi iaiva...@gmail.com wrote:
>Hello, I'd like to report incorrect behavior in the Dataset API, but I don't
>know how to do that: my Jira account doesn't allow me to add an issue.
>I'm using Apache Spark 2.0.0, but the problem has existed since at least
>version 1.4 (given the docs, since 1.3).
>The problem is simple to reproduce, as is the workaround: if we apply agg over
>a Dataset with Scala pairs on the same column, only one agg over that column
>is actually used. This is because toMap reduces the pair values of the
>same key to one, overwriting the value.
>Class:
>https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
>
>def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
>  agg((aggExpr +: aggExprs).toMap)
>}
>
>Rewritten as something like this, it should work:
>
>def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
>  toDF((aggExpr +: aggExprs).map { pairExpr =>
>    strToExpr(pairExpr._2)(df(pairExpr._1).expr)
>  }.toSeq)
>}
>
>Regards,
>--
>Ing. Ivaldi Andres
>
>Olivier Girardot | Associé
>o.girar...@lateral-thoughts.com
>+33 6 24 09 17 94


Re: Aggregations with scala pairs

2016-08-18 Thread Olivier Girardot
CC'ing the dev list. You should open a Jira and a related PR to discuss it;
cf.
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingCodeChanges





On Wed, Aug 17, 2016 4:01 PM, Andrés Ivaldi iaiva...@gmail.com wrote:
Hello, I'd like to report incorrect behavior in the Dataset API, but I don't know
how to do that: my Jira account doesn't allow me to add an issue.
I'm using Apache Spark 2.0.0, but the problem has existed since at least version
1.4 (given the docs, since 1.3).
The problem is simple to reproduce, as is the workaround: if we apply agg over
a Dataset with Scala pairs on the same column, only one agg over that column
is actually used. This is because toMap reduces the pair values of the
same key to one, overwriting the value.
Class:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala

def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
  agg((aggExpr +: aggExprs).toMap)
}

Rewritten as something like this, it should work:

def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
  toDF((aggExpr +: aggExprs).map { pairExpr =>
    strToExpr(pairExpr._2)(df(pairExpr._1).expr)
  }.toSeq)
}
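
The collapsing effect of toMap described above can be seen without Spark at all. The following plain-Scala sketch uses made-up column and function names (amount, qty, min, max, sum) purely for illustration:

```scala
object ToMapCollapse {
  // Two aggregations requested on the same column, as (column, function) pairs.
  val pairs: Seq[(String, String)] = Seq("amount" -> "min", "amount" -> "max", "qty" -> "sum")

  // Converting to a Map keeps one entry per key; the later pair overwrites the earlier one.
  val asMap: Map[String, String] = pairs.toMap

  def main(args: Array[String]): Unit = {
    println(s"pairs requested: ${pairs.size}")  // 3
    println(s"entries kept:    ${asMap.size}")  // 2
    println(s"amount -> ${asMap("amount")}")    // amount -> max
  }
}
```

Until the pair-based overload changes, a possible way to get both aggregations is the Column-based form, for example agg(min("amount"), max("amount")) with the functions imported from org.apache.spark.sql.functions.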

Regards,
--
Ing. Ivaldi Andres


Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: Spark DF CacheTable method. Will it save data to disk?

2016-08-18 Thread Olivier Girardot
That's another "pipeline" step to add, whereas data stored with persist is only
relevant during the lifetime of your job and lives not in HDFS but on the local
disks of your executors.
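
For reference, a minimal sketch of the persist call discussed in this thread, assuming Spark 2.x (SparkSession) in local mode; the object name, helper name and toy data are hypothetical:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

object PersistToDisk {
  // Marks the DataFrame to be kept on the executors' local disks only.
  def persistOnDisk(df: DataFrame): DataFrame = df.persist(StorageLevel.DISK_ONLY)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("persist-disk").getOrCreate()
    import spark.implicits._

    // Toy data standing in for the result of an expensive DB read.
    val df = persistOnDisk(Seq((1, "a"), (2, "b")).toDF("id", "value"))

    df.count()      // the first action materialises the persisted data
    df.show()       // later actions read the persisted copy
    df.unpersist()  // release the disk space when done
    spark.stop()
  }
}
```

The persisted blocks disappear with the application, which is why writing to Parquet is the option to consider when the data must survive a job failure.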





On Wed, Aug 17, 2016 5:56 PM, neil90 neilp1...@icloud.com wrote:
From the Spark documentation
(http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence):
yes, you can use persist on a DataFrame instead of cache. cache is simply
shorthand for persist with the default storage level (MEMORY_ONLY for RDDs;
DataFrames default to MEMORY_AND_DISK). If you want to persist the DataFrame
to disk, you should do dataframe.persist(StorageLevel.DISK_ONLY).

IMO, if reads against the DB are expensive and you're afraid of failure, why not
just save the data as Parquet on your cluster in Hive and read from there?

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DF-CacheTable-method-Will-it-save-data-to-disk-tp27533p27551.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: error when running spark from oozie launcher

2016-08-18 Thread Olivier Girardot
This is not the full stack trace; please post the full stack trace if you want
some help.





On Wed, Aug 17, 2016 7:24 PM, tkg_cangkul yuza.ras...@gmail.com wrote:
Hi, I am trying to submit a Spark job with Oozie, but I have one problem.
When I submit the same job, it sometimes succeeds and sometimes fails.

I get this error message when the job fails:

org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V

Can anyone help me solve this? I tried setting -XX:MaxPermSize=512m
-XX:PermSize=256m in the spark.driver.extraJavaOptions property, but this did
not help.

Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: Spark SQL 1.6.1 issue

2016-08-18 Thread Olivier Girardot
Your executors/driver must not have multiple versions of Spark on the classpath.
It may come from the Cassandra connector: check the pom dependencies of the
version you fetched and whether it's compatible with your Spark version.
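
To see which build of a class a given JVM actually has on its classpath, one option is to print the serialVersionUID that the local class resolves to and compare it with the two values in the InvalidClassException message. The sketch below uses only the JDK's java.io.ObjectStreamClass; the object and helper names are hypothetical, and the class name is the one from the log in this thread.

```scala
import java.io.ObjectStreamClass

object SerialUid {
  // Returns the serialVersionUID of the class as seen on the local classpath,
  // or None if the class is missing or not serializable.
  def localSerialVersionUID(className: String): Option[Long] =
    try {
      val cls = Class.forName(className, false, getClass.getClassLoader)
      // lookup returns null for classes that are not Serializable.
      Option(ObjectStreamClass.lookup(cls)).map(_.getSerialVersionUID)
    } catch {
      case _: ClassNotFoundException => None
    }

  def main(args: Array[String]): Unit = {
    val name = "org.apache.spark.sql.catalyst.expressions.Literal"
    println(s"$name -> ${localSerialVersionUID(name)}")
  }
}
```

Running this on the driver side and on an executor host (each with its own classpath) and getting different numbers would point to two different Spark builds being mixed.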





On Thu, Aug 18, 2016 6:05 AM, thbeh th...@thbeh.com wrote:
Running the query below, I have been hitting a "local class incompatible"
exception. Does anyone know the cause?

val rdd = csc.cassandraSql("""select *, concat('Q', d_qoy) as qoy from
store_sales join date_dim on ss_sold_date_sk = d_date_sk join item on
ss_item_sk = i_item_sk""").groupBy("i_category").pivot("qoy").agg(
round(sum("ss_sales_price")/100,2))

The source data is from TPCDS test data and I am running in Zeppelin.

INFO [2016-08-18 03:15:58,429] ({task-result-getter-2}
Logging.scala[logInfo]:58) - Lost task 3.0 in stage 3.0 (TID 52) on executor
ceph5.example.my: java.io.InvalidClassException
(org.apache.spark.sql.catalyst.expressions.Literal; local class
incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
class serialVersionUID = -4259705229845269663) [duplicate 1]
INFO [2016-08-18 03:15:58,429] ({task-result-getter-3}
Logging.scala[logInfo]:58) - Lost task 2.0 in stage 3.0 (TID 51) on executor
ceph5.example.my: java.io.InvalidClassException
(org.apache.spark.sql.catalyst.expressions.Literal; local class
incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
class serialVersionUID = -4259705229845269663) [duplicate 2]
INFO [2016-08-18 03:15:58,430] ({task-result-getter-3}
Logging.scala[logInfo]:58) - Lost task 6.0 in stage 3.0 (TID 55) on executor
ceph5.example.my: java.io.InvalidClassException
(org.apache.spark.sql.catalyst.expressions.Literal; local class
incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
class serialVersionUID = -4259705229845269663) [duplicate 3]

Thanks

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-6-1-issue-tp27554.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94