RE: Spark and Kafka integration

2017-01-12 Thread Phadnis, Varun
Cool! Thanks for your inputs Jacek and Mark!

From: Mark Hamstra [mailto:m...@clearstorydata.com]
Sent: 13 January 2017 12:59
To: Phadnis, Varun 
Cc: user@spark.apache.org
Subject: Re: Spark and Kafka integration

See "API compatibility" in http://spark.apache.org/versioning-policy.html

While code that is annotated as Experimental is still a good faith effort to 
provide a stable and useful API, the fact is that we're not yet confident 
enough that we've got the public API in exactly the form that we want to commit 
to maintaining until at least the next major release.  That means that the API 
may change in the next minor/feature-level release (but it shouldn't in a 
patch/bugfix-level release), which would require that your source code be 
rewritten to use the new API.  In the most extreme case, we may decide that the 
experimental code didn't work out the way we wanted, so it could be withdrawn 
entirely.  Complete withdrawal of the Kafka code is unlikely, but it may well 
change in incompatible way with future releases even before Spark 3.0.0.

On Thu, Jan 12, 2017 at 5:57 AM, Phadnis, Varun 
> wrote:
Hello,

We are using  Spark 2.0 with Kafka 0.10.

As I understand, much of the API packaged in the following dependency we are 
targeting is marked as “@Experimental”


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.0.0</version>
</dependency>


What are the implications of this being marked as experimental? Are they stable 
enough for production?

Thanks,
Varun




Re: Re: Re: how to change datatype by using StructType

2017-01-12 Thread lk_spark
a better way to answer my question:  use GenericRow instead of Row

val rows: RDD[Row] = spark.sparkContext.textFile("/sourcedata/test/test1").map {
  line =>
{
  val attributes: Array[String] = line.split(",")
  val ab = ArrayBuffer[Any]()
  
  for (i <- 0 until schemaType.length) {
if (schemaType(i).equalsIgnoreCase("int")) {
  ab += attributes(i).toInt
} else if (schemaType(i).equalsIgnoreCase("long")) {
  ab += attributes(i).toLong
} else {
  ab += attributes(i)
}
  }

  new GenericRow(ab.toArray)
}
}
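
A minimal alternative sketch (assuming the same schemaType and schema values used
elsewhere in this thread): Row.fromSeq spreads the buffer into one field per
column, so the internal Catalyst GenericRow class is not needed. The original
failure came from Row(ab.toArray), which builds a Row with a single Array[Any]
column instead of one column per value.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val rows: RDD[Row] = spark.sparkContext.textFile("/sourcedata/test/test1").map { line =>
  val attributes = line.split(",")
  val ab = ArrayBuffer[Any]()
  for (i <- 0 until schemaType.length) {
    if (schemaType(i).equalsIgnoreCase("int")) ab += attributes(i).toInt        // int column
    else if (schemaType(i).equalsIgnoreCase("long")) ab += attributes(i).toLong // long column
    else ab += attributes(i)                                                    // keep as string
  }
  Row.fromSeq(ab)   // one field per element, matching the StructType
}
val peopleDF = spark.createDataFrame(rows, schema)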

2017-01-13 

lk_spark 



From: "lk_spark"
Sent: 2017-01-13 09:49
Subject: Re: Re: Re: how to change datatype by using StructType
To: "Nicholas Hakobian"
Cc: "user.spark"

Thank you Nicholas. If the source data is in CSV format, the CSV reader works well.

2017-01-13 

lk_spark 



From: Nicholas Hakobian
Sent: 2017-01-13 08:35
Subject: Re: Re: Re: how to change datatype by using StructType
To: "lk_spark"
Cc: "ayan guha","user.spark"

Have you tried the native CSV reader (in Spark 2) or the Databricks CSV reader 
(in 1.6)?


If your data is in a CSV-like format it'll load it directly into a DataFrame. 
It's possible you have some rows where types are inconsistent.


Nicholas Szandor Hakobian, Ph.D. 
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com




On Thu, Jan 12, 2017 at 1:52 AM, lk_spark  wrote:

I have tried this:
  
  val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
  val rowRDD = peopleRDD.map(_.split(",")).map(attributes => {
  val ab = ArrayBuffer[Any]()
  for (i <- 0 until schemaType.length) {
if (schemaType(i).equalsIgnoreCase("int")) {
  ab += attributes(i).toInt
} else if (schemaType(i).equalsIgnoreCase("long")) {
  ab += attributes(i).toLong
} else {
  ab += attributes(i)
}
  }
  Row(ab.toArray)
})

val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF .show

I got error:
 Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a valid 
external type for schema of string
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)

All the fields are Any; what should I do?



2017-01-12 

lk_spark 



From: "lk_spark"
Sent: 2017-01-12 14:38
Subject: Re: Re: how to change datatype by using StructType
To: "ayan guha","user.spark"
Cc:

yes, field year is in my data:

data:
  kevin,30,2016
  shen,30,2016
  kai,33,2016
  wei,30,2016

this will not work:
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2)))
but I need to read the data in a configurable way.
2017-01-12 

lk_spark 



From: ayan guha
Sent: 2017-01-12 14:34
Subject: Re: how to change datatype by using StructType
To: "lk_spark","user.spark"
Cc:

Do you have year in your data?

On Thu, 12 Jan 2017 at 5:24 pm, lk_spark  wrote:

hi,all

I have a txt file, and I want to process it as a dataframe:

data like this:
   name1,30
   name2,18

val schemaString = "name age year"
val xMap = new scala.collection.mutable.HashMap[String,DataType]()
xMap.put("name", StringType)
xMap.put("age", IntegerType)
xMap.put("year", IntegerType)

val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, xMap.get(fieldName).get, nullable = true))
val schema = StructType(fields)

val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
//spark.read.schema(schema).text("/sourcedata/test/test*")

val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

but when I write it to a table or show it I get this error:

   Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0,


Re: Sporadic ClassNotFoundException with Kryo

2017-01-12 Thread Nirmal Fernando
I faced a similar issue and had to do two things:

1. Submit the Kryo jar with spark-submit
2. Set spark.executor.userClassPathFirst=true in the Spark conf
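
A hedged sketch of how those two items might look on the command line; the jar
and class names are placeholders, not the poster's actual artifacts:

spark-submit \
  --class com.example.MyStreamingJob \
  --jars /path/to/jar-with-kryo-registered-classes.jar \
  --conf spark.executor.userClassPathFirst=true \
  --conf spark.kryo.registrator=com.example.MyKryoRegistrator \
  my-streaming-assembly.jar

The spark.kryo.registrator line applies only if a custom registrator is used;
spark.kryo.classesToRegister works the same way for plain class registration.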

On Fri, Nov 18, 2016 at 7:39 PM, chrism 
wrote:

> Regardless of the different ways we have tried deploying a jar together
> with
> Spark, when running a Spark Streaming job with Kryo as serializer on top of
> Mesos, we sporadically get the following error (I have truncated a bit):
>
> /16/11/18 08:39:10 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
> java.lang.RuntimeException: org.apache.spark.SparkException: Failed to
> register classes with Kryo
>   at
> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSeria
> lizer.scala:129)
>   at
> org.apache.spark.serializer.KryoSerializerInstance.borrowKry
> o(KryoSerializer.scala:274)
> ...
>   at
> org.apache.spark.serializer.SerializerManager.dataSerializeS
> tream(SerializerManager.scala:125)
>   at
> org.apache.spark.storage.BlockManager$$anonfun$dropFromMemor
> y$3.apply(BlockManager.scala:1265)
>   at
> org.apache.spark.storage.BlockManager$$anonfun$dropFromMemor
> y$3.apply(BlockManager.scala:1261)
> ...
> Caused by: java.lang.ClassNotFoundException: cics.udr.compound_ran_udr
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)/
>
> where "cics.udr.compound_ran_udr" is a class provided by us in a jar.
>
> We know that the jar containing "cics.udr.compound_ran_udr" is being
> deployed and works because it is listed in the "Environment" tab in the
> GUI,
> and calculations using this class succeed.
>
> We have tried the following methods of deploying the jar containing the
> class
>  * Through --jars in spark-submit
>  * Through SparkConf.setJar
>  * Through spark.driver.extraClassPath and spark.executor.extraClassPath
>  * By having it as the main jar used by spark-submit
> with no luck. The logs (see attached) recognize that the jar is being added
> to the classloader.
>
> We have tried registering the class using
>  * SparkConf.registerKryoClasses.
>  * spark.kryo.classesToRegister
> with no luck.
>
> We are running on Mesos and the jar has been deployed on every machine on
> the local file system in the same location.
>
> I would be very grateful for any help or ideas :)
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Sporadic-ClassNotFoundException-with-K
> ryo-tp28104.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 

Thanks & regards,
Nirmal

Associate Technical Lead - Data Technologies Team, WSO2 Inc.
Mobile: +94715779733 <+94%2071%20577%209733>
Blog: http://nirmalfdo.blogspot.com/


Re: Data frame writing

2017-01-12 Thread Rajendra Bhat
Initially there is no directory; the directory is created by the Spark job, and
it should be empty while the job executes. It seems the df write itself creates
the first file and then tries to overwrite it.



On Fri, Jan 13, 2017 at 11:42 AM, Amrit Jangid 
wrote:

> Hi Rajendra,
>
> It says your directory is not empty: *s3n://buccketName/cip/daily_date*
>
> Try to use a save *mode*, e.g.:
>
> df.write.mode(SaveMode.Overwrite).partitionBy("date").f
> ormat("com.databricks.spark.csv").option("delimiter",
> "#").option("codec", "org.apache.hadoop.io.compress
> .GzipCodec").save("s3n://buccketName/cip/daily_date" )
>
>  Hope it helps.
>
> Regards
> Amrit
>
>
>
> On Fri, Jan 13, 2017 at 11:32 AM, Rajendra Bhat 
> wrote:
>
>> Hi team,
>>
>> I am reading N CSV files and writing files partitioned by date. date is one
>> column; it has an integer value (e.g. 20170101).
>>
>>
>>  val df = spark.read
>> .format("com.databricks.spark.csv")
>> .schema(schema)
>> .option("delimiter","#")
>> .option("nullValue","")
>> .option("treatEmptyValuesAsNulls","true")
>> .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
>>
>> .load(filename)
>> 
>> df.write.partitionBy("date").format("com.databricks.spark.csv").option("delimiter",
>> "#").option("codec", "org.apache.hadoop.io.compress
>> .GzipCodec").save("s3n://buccketName/cip/daily_date" )
>>
>> The above code throws the below error in the middle of execution.
>> s3n://buccketName/cip/daily_date was an empty location when the job was initialized.
>>
>> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already 
>> exists: 
>> s3n:///cip/daily_date/date=20110418/part-r-00082-912033b1-a278-46a8-bf8d-0f97f493e3d8.csv.gz
>>  at 
>> org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:405)
>>  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
>>  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:894)
>>  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:791)
>>  at 
>> org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
>>  at 
>> org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.(CSVRelation.scala:191)
>>  at 
>> org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:169)
>>  at 
>> org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131)
>>
>>  ... 14 more
>>
>> Please suggest why this error occurs and how to resolve it.
>>
>> Thanks and
>> Regards
>>
>> --
>> Thanks and
>> Regards
>>
>> Rajendra Bhat
>>
>
>
>
> --
>
> Regards,
> Amrit
> Data Team
>



-- 
Thanks and
Regards

Rajendra Bhat


Re: Data frame writing

2017-01-12 Thread Amrit Jangid
Hi Rajendra,

It says your directory is not empty: *s3n://buccketName/cip/daily_date*

Try to use a save *mode*, e.g.:

df.write.mode(SaveMode.Overwrite).partitionBy("date").f
ormat("com.databricks.spark.csv").option("delimiter", "#").option("codec", "
org.apache.hadoop.io.compress.GzipCodec").save("s3n://buccketName/cip/daily_date"
)

 Hope it helps.

Regards
Amrit



On Fri, Jan 13, 2017 at 11:32 AM, Rajendra Bhat 
wrote:

> Hi team,
>
> I am reading N CSV files and writing files partitioned by date. date is one
> column; it has an integer value (e.g. 20170101).
>
>
>  val df = spark.read
> .format("com.databricks.spark.csv")
> .schema(schema)
> .option("delimiter","#")
> .option("nullValue","")
> .option("treatEmptyValuesAsNulls","true")
> .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
>
> .load(filename)
> 
> df.write.partitionBy("date").format("com.databricks.spark.csv").option("delimiter",
> "#").option("codec", "org.apache.hadoop.io.compress
> .GzipCodec").save("s3n://buccketName/cip/daily_date" )
>
> The above code throws the below error in the middle of execution.
> s3n://buccketName/cip/daily_date was an empty location when the job was initialized.
>
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already 
> exists: 
> s3n:///cip/daily_date/date=20110418/part-r-00082-912033b1-a278-46a8-bf8d-0f97f493e3d8.csv.gz
>   at 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:405)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:894)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:791)
>   at 
> org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
>   at 
> org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.(CSVRelation.scala:191)
>   at 
> org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:169)
>   at 
> org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131)
>
>  ... 14 more
>
> Please suggest why this error occurs and how to resolve it.
>
> Thanks and
> Regards
>
> --
> Thanks and
> Regards
>
> Rajendra Bhat
>



-- 

Regards,
Amrit
Data Team


Data frame writing

2017-01-12 Thread Rajendra Bhat
Hi team,

I am reading N CSV files and writing files partitioned by date. date is one
column; it has an integer value (e.g. 20170101).


 val df = spark.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("delimiter","#")
.option("nullValue","")
.option("treatEmptyValuesAsNulls","true")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")

.load(filename)

df.write.partitionBy("date").format("com.databricks.spark.csv").option("delimiter",
"#").option("codec", "org.apache.hadoop.io.compress.GzipCodec").save("
s3n://buccketName/cip/daily_date" )

The above code throws the below error in the middle of execution.
s3n://buccketName/cip/daily_date was an empty location when the job was initialized.

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File
already exists:
s3n:///cip/daily_date/date=20110418/part-r-00082-912033b1-a278-46a8-bf8d-0f97f493e3d8.csv.gz
at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:405)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:894)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:791)
at 
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
at 
org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.(CSVRelation.scala:191)
at 
org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:169)
at 
org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131)

 ... 14 more

Please suggest why this error occurs and how to resolve it.

Thanks and
Regards

-- 
Thanks and
Regards

Rajendra Bhat


Re: Can't load a RandomForestClassificationModel in Spark job

2017-01-12 Thread Sumona Routh
Yes, I save it to S3 in a different process. It is actually the
RandomForestClassificationModel.load method (passed an s3 path) where I run
into problems.
When you say you load it during map stages, do you mean that you are able
to directly load a model from inside of a transformation? When I try this,
it passes the function to a worker, and the load method itself appears to
attempt to create a new SparkContext, which causes an NPE downstream
(because creating a SparkContext on the worker is not an appropriate thing
to do, according to various threads I've found).

Maybe there is a different load function I should be using?

Thanks!
Sumona
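
One commonly suggested pattern, sketched below under assumptions (a "profile"
column selects the model; modelPaths and inputDF are placeholders): load every
model once on the driver, then score each slice of the data with transform()
instead of calling load() inside a map function on the executors.

import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.sql.functions.col

val modelPaths = Map(
  "profileA" -> "s3a://my-bucket/models/profileA",   // placeholder paths
  "profileB" -> "s3a://my-bucket/models/profileB")

// driver-side loads, one per profile
val models = modelPaths.map { case (profile, path) =>
  profile -> RandomForestClassificationModel.load(path)
}

// score each profile's records with the matching model and union the results
val scored = models
  .map { case (profile, model) => model.transform(inputDF.filter(col("profile") === profile)) }
  .reduce(_ union _)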

On Thu, Jan 12, 2017 at 6:26 PM ayan guha  wrote:

> Hi
>
> Given training and predictions are two different applications, I typically
> save model objects to hdfs and load it back during prediction map stages.
>
> Best
> Ayan
>
> On Fri, 13 Jan 2017 at 5:39 am, Sumona Routh  wrote:
>
> Hi all,
> I've been working with Spark mllib 2.0.2 RandomForestClassificationModel.
>
> I encountered two frustrating issues and would really appreciate some
> advice:
>
> 1)  RandomForestClassificationModel is effectively not serializable (I
> assume it's referencing something that can't be serialized, since it itself
> extends serializable), so I ended up with the well-known exception:
> org.apache.spark.SparkException: Task not serializable.
> Basically, my original intention was to pass the model as a parameter
>
> because which model we use is dynamic based on what record we are
>
> predicting on.
>
> Has anyone else encountered this? Is this currently being addressed? I
> would expect objects from Spark's own libraries to be able to be used
> seamlessly in their applications without these types of exceptions.
>
> 2) The RandomForestClassificationModel.load method appears to hang
> indefinitely when executed from inside a map function (which I assume is
> passed to the executor). So, I basically cannot load a model from a worker.
> We have multiple "profiles" that use differently trained models, which are
> accessed from within a map function to run predictions on different sets of
> data.
> The thread that is hanging has this as the latest (most pertinent) code:
>
> org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:391)
> Looking at the code in github, it appears that it is calling sc.textFile.
> I could not find anything stating that this particular function would not
> work from within a map function.
>
> Are there any suggestions as to how I can get this model to work on a real
> production job (either by allowing it to be serializable and passed around
> or loaded from a worker)?
>
> I've extensively POCed this model (saving, loading, transforming,
> training, etc.), however this is the first time I'm attempting to use it
> from within a real application.
>
> Sumona
>
>


Re: Re: Re: how to change datatype by using StructType

2017-01-12 Thread lk_spark
Thank you Nicholas. If the source data is in CSV format, the CSV reader works well.

2017-01-13 

lk_spark 



From: Nicholas Hakobian
Sent: 2017-01-13 08:35
Subject: Re: Re: Re: how to change datatype by using StructType
To: "lk_spark"
Cc: "ayan guha","user.spark"

Have you tried the native CSV reader (in Spark 2) or the Databricks CSV reader 
(in 1.6)?


If your data is in a CSV-like format it'll load it directly into a DataFrame. 
It's possible you have some rows where types are inconsistent.


Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com




On Thu, Jan 12, 2017 at 1:52 AM, lk_spark  wrote:

I have tried this:
  
  val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
  val rowRDD = peopleRDD.map(_.split(",")).map(attributes => {
  val ab = ArrayBuffer[Any]()
  for (i <- 0 until schemaType.length) {
if (schemaType(i).equalsIgnoreCase("int")) {
  ab += attributes(i).toInt
} else if (schemaType(i).equalsIgnoreCase("long")) {
  ab += attributes(i).toLong
} else {
  ab += attributes(i)
}
  }
  Row(ab.toArray)
})

val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF .show

I got error:
 Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a valid 
external type for schema of string
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)

All the fields are Any; what should I do?



2017-01-12 

lk_spark 



From: "lk_spark"
Sent: 2017-01-12 14:38
Subject: Re: Re: how to change datatype by using StructType
To: "ayan guha","user.spark"
Cc:

yes, field year is in my data:

data:
  kevin,30,2016
  shen,30,2016
  kai,33,2016
  wei,30,2016

this will not work:
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2)))
but I need to read the data in a configurable way.
2017-01-12 

lk_spark 



From: ayan guha
Sent: 2017-01-12 14:34
Subject: Re: how to change datatype by using StructType
To: "lk_spark","user.spark"
Cc:

Do you have year in your data?

On Thu, 12 Jan 2017 at 5:24 pm, lk_spark  wrote:

hi,all

I have a txt file, and I want to process it as a dataframe:

data like this:
   name1,30
   name2,18

val schemaString = "name age year"
val xMap = new scala.collection.mutable.HashMap[String,DataType]()
xMap.put("name", StringType)
xMap.put("age", IntegerType)
xMap.put("year", IntegerType)

val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, xMap.get(fieldName).get, nullable = true))
val schema = StructType(fields)

val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
//spark.read.schema(schema).text("/sourcedata/test/test*")

val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

but when I write it to a table or show it I get this error:

   Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#1
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)

   if I change my code it will work:

   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).toInt)

   but this is not a good idea.

2017-01-12

lk_spark

Re: Can't load a RandomForestClassificationModel in Spark job

2017-01-12 Thread ayan guha
Hi

Given training and predictions are two different applications, I typically
save model objects to hdfs and load it back during prediction map stages.

Best
Ayan
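
A minimal sketch of that save/load round trip with the Spark 2.x ML API (the
paths and the testDF input are placeholders):

import org.apache.spark.ml.classification.RandomForestClassificationModel

// training application: persist the fitted model
model.write.overwrite().save("hdfs:///models/rf")

// prediction application: load it back on the driver, then score a DataFrame
val rf = RandomForestClassificationModel.load("hdfs:///models/rf")
val predictions = rf.transform(testDF)   // testDF is assumed to have a "features" column
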
On Fri, 13 Jan 2017 at 5:39 am, Sumona Routh  wrote:

> Hi all,
> I've been working with Spark mllib 2.0.2 RandomForestClassificationModel.
>
> I encountered two frustrating issues and would really appreciate some
> advice:
>
> 1)  RandomForestClassificationModel is effectively not serializable (I
> assume it's referencing something that can't be serialized, since it itself
> extends serializable), so I ended up with the well-known exception:
> org.apache.spark.SparkException: Task not serializable.
> Basically, my original intention was to pass the model as a parameter
>
> because which model we use is dynamic based on what record we are
>
> predicting on.
>
> Has anyone else encountered this? Is this currently being addressed? I
> would expect objects from Spark's own libraries to be able to be used
> seamlessly in their applications without these types of exceptions.
>
> 2) The RandomForestClassificationModel.load method appears to hang
> indefinitely when executed from inside a map function (which I assume is
> passed to the executor). So, I basically cannot load a model from a worker.
> We have multiple "profiles" that use differently trained models, which are
> accessed from within a map function to run predictions on different sets of
> data.
> The thread that is hanging has this as the latest (most pertinent) code:
>
> org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:391)
> Looking at the code in github, it appears that it is calling sc.textFile.
> I could not find anything stating that this particular function would not
> work from within a map function.
>
> Are there any suggestions as to how I can get this model to work on a real
> production job (either by allowing it to be serializable and passed around
> or loaded from a worker)?
>
> I've extensively POCed this model (saving, loading, transforming,
> training, etc.), however this is the first time I'm attempting to use it
> from within a real application.
>
> Sumona
>


Any suggestions for dbScan

2017-01-12 Thread shobhit gupta
Hi Everyone,

Is there any suggestion for a DBSCAN Scala implementation?
My application code is running on Spark 2.0, but any suggestion is fine.

-- 
Regards ,

Shobhit G


problem detecting FAILED state with SparkLauncher and SparkAppHandle

2017-01-12 Thread adam kramer
Hi All -

I'm having an issue with detecting a failed Spark application state when
using the startApplication method and SparkAppHandle with the SparkLauncher
in Spark 2.0.1.

Previously I used a Java Process and waitFor to detect a non-zero exit code,
which worked. But when the same app fails with a non-zero exit code, the
SparkAppHandle's final State is FINISHED, not FAILED, which is the same result
as when the process/app exits with exit code 0.

Has anyone else experienced or solved this issue?

Thanks,
Adam


Re: Re: Re: how to change datatype by using StructType

2017-01-12 Thread Nicholas Hakobian
Have you tried the native CSV reader (in Spark 2) or the Databricks CSV
reader (in 1.6)?

If your data is in a CSV-like format it'll load it directly into a
DataFrame. It's possible you have some rows where types are inconsistent.
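
Hedged sketches of the two readers mentioned above; the path and schema are
placeholders matching the thread:

// Spark 2.x native CSV reader, applying an explicit schema
val df2 = spark.read.schema(schema).option("header", "false").csv("/sourcedata/test/test*")

// Spark 1.6 with the Databricks spark-csv package
val df1 = sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema)
  .option("header", "false")
  .load("/sourcedata/test/test*")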

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Thu, Jan 12, 2017 at 1:52 AM, lk_spark  wrote:

> I have tried this:
>
>   val peopleRDD = spark.sparkContext.textFile("/
> sourcedata/test/test*")
>   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => {
>   val ab = ArrayBuffer[Any]()
>   for (i <- 0 until schemaType.length) {
> if (schemaType(i).equalsIgnoreCase("int")) {
>   ab += attributes(i).toInt
> } else if (schemaType(i).equalsIgnoreCase("long")) {
>   ab += attributes(i).toLong
> } else {
>   ab += attributes(i)
> }
>   }
>   Row(ab.toArray)
> })
>
> val peopleDF = spark.createDataFrame(rowRDD, schema)
> peopleDF .show
>
> I got error:
>  Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a
> valid external type for schema of string
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> SpecificUnsafeProjection.apply_0$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> SpecificUnsafeProjection.apply(Unknown Source)
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.
> toRow(ExpressionEncoder.scala:290)
> All the fields are Any; what should I do?
>
>
>
> 2017-01-12
> --
> lk_spark
> --
>
> *From:* "lk_spark"
> *Sent:* 2017-01-12 14:38
> *Subject:* Re: Re: how to change datatype by using StructType
> *To:* "ayan guha","user.spark"
> *Cc:*
>
> yes, field year is in my data:
>
> data:
>   kevin,30,2016
>   shen,30,2016
>   kai,33,2016
>   wei,30,2016
>
> this will not work:
>    val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2)))
> but I need to read the data in a configurable way.
> 2017-01-12
> --
> lk_spark
> --
>
> *From:* ayan guha
> *Sent:* 2017-01-12 14:34
> *Subject:* Re: how to change datatype by using StructType
> *To:* "lk_spark","user.spark"
> *Cc:*
>
> Do you have year in your data?
> On Thu, 12 Jan 2017 at 5:24 pm, lk_spark  wrote:
>
>> hi,all
>>
>> I have a txt file, and I want to process it as a dataframe:
>>
>> data like this:
>>    name1,30
>>    name2,18
>>
>> val schemaString = "name age year"
>> val xMap = new scala.collection.mutable.HashMap[String,DataType]()
>> xMap.put("name", StringType)
>> xMap.put("age", IntegerType)
>> xMap.put("year", IntegerType)
>>
>> val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, xMap.get(fieldName).get, nullable = true))
>> val schema = StructType(fields)
>>
>> val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
>> //spark.read.schema(schema).text("/sourcedata/test/test*")
>>
>> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1))
>>
>> // Apply the schema to the RDD
>> val peopleDF = spark.createDataFrame(rowRDD, schema)
>>
>> but when I write it to a table or show it I get this error:
>>
>>    Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#1
>> +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
>>
>>    if I change my code it will work:
>>
>>    val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).toInt)
>>
>>    but this is not a good idea.
>>
>> 2017-01-12
>>
>> lk_spark
>


Re: spark locality

2017-01-12 Thread Michael Gummelt
If the executor reports a different hostname inside the CNI container, then
no, I don't think so.

On Thu, Jan 12, 2017 at 2:28 PM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> So even if I make the Spark executors run on the same node as Casssandra
> nodes, I am not sure each worker will connect to c* nodes on the same mesos
> agent ?
>
> 2017-01-12 21:13 GMT+01:00 Michael Gummelt :
>
>> The code in there w/ docs that reference CNI doesn't actually run when
>> CNI is in effect, and doesn't have anything to do with locality.  It's just
>> making Spark work in a no-DNS environment
>>
>> On Thu, Jan 12, 2017 at 12:04 PM, vincent gromakowski <
>> vincent.gromakow...@gmail.com> wrote:
>>
>>> I have found this but I am not sure how it can help...
>>> https://github.com/mesosphere/spark-build/blob/a9efef8850976
>>> f787956660262f3b77cd636f3f5/conf/spark-env.sh
>>>
>>>
>>> 2017-01-12 20:16 GMT+01:00 Michael Gummelt :
>>>
 That's a good point. I hadn't considered the locality implications of
 CNI yet.  I think tasks are placed based on the hostname reported by the
 executor, which in a CNI container will be different than the
 HDFS/Cassandra hostname.  I'm not aware of anyone running Spark+CNI in prod
 yet, either.

 However, locality in Mesos isn't great right now anyway.  Executors are
 placed w/o regard to locality.  Locality is only taken into account when
 tasks are assigned to executors.  So if you get a locality-poor executor
 placement, you'll also have locality poor task placement.  It could be
 better.

 On Thu, Jan 12, 2017 at 7:55 AM, vincent gromakowski <
 vincent.gromakow...@gmail.com> wrote:

> Hi all,
> Does anyone have experience running Spark on Mesos with CNI (ip per
> container) ?
> How would Spark use IP or hostname for data locality with backend
> framework like HDFS or Cassandra ?
>
> V
>



 --
 Michael Gummelt
 Software Engineer
 Mesosphere

>>>
>>>
>>
>>
>> --
>> Michael Gummelt
>> Software Engineer
>> Mesosphere
>>
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: spark locality

2017-01-12 Thread vincent gromakowski
So even if I make the Spark executors run on the same node as Casssandra
nodes, I am not sure each worker will connect to c* nodes on the same mesos
agent ?

2017-01-12 21:13 GMT+01:00 Michael Gummelt :

> The code in there w/ docs that reference CNI doesn't actually run when CNI
> is in effect, and doesn't have anything to do with locality.  It's just
> making Spark work in a no-DNS environment
>
> On Thu, Jan 12, 2017 at 12:04 PM, vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> I have found this but I am not sure how it can help...
>> https://github.com/mesosphere/spark-build/blob/a9efef8850976
>> f787956660262f3b77cd636f3f5/conf/spark-env.sh
>>
>>
>> 2017-01-12 20:16 GMT+01:00 Michael Gummelt :
>>
>>> That's a good point. I hadn't considered the locality implications of
>>> CNI yet.  I think tasks are placed based on the hostname reported by the
>>> executor, which in a CNI container will be different than the
>>> HDFS/Cassandra hostname.  I'm not aware of anyone running Spark+CNI in prod
>>> yet, either.
>>>
>>> However, locality in Mesos isn't great right now anyway.  Executors are
>>> placed w/o regard to locality.  Locality is only taken into account when
>>> tasks are assigned to executors.  So if you get a locality-poor executor
>>> placement, you'll also have locality poor task placement.  It could be
>>> better.
>>>
>>> On Thu, Jan 12, 2017 at 7:55 AM, vincent gromakowski <
>>> vincent.gromakow...@gmail.com> wrote:
>>>
 Hi all,
 Does anyone have experience running Spark on Mesos with CNI (ip per
 container) ?
 How would Spark use IP or hostname for data locality with backend
 framework like HDFS or Cassandra ?

 V

>>>
>>>
>>>
>>> --
>>> Michael Gummelt
>>> Software Engineer
>>> Mesosphere
>>>
>>
>>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


Is RAND() in SparkSQL deterministic when used on MySql data sources?

2017-01-12 Thread Gabriele Del Prete
Hi all,

We need to use the rand() function in Scala Spark SQL in our
application, but we discovered that its behavior was not deterministic, that
is, different invocations with the same  would result in different
values. This is documented in some bugs, for example:
https://issues.apache.org/jira/browse/SPARK-1 and it has to do with
partitioning.

So we refactored it by moving the rand() function from a query using Parquet
files on S3 as a datasource to another query that we run on MySQL (still
using the Spark SQL Scala API), assuming that MySQL queries do not get
parallelized. Can we indeed safely assume that rand() will now be
deterministic, or does the source of non-deterministic behavior lie in the
Spark SQL engine rather than the specific datasource?

Gabriele



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-RAND-in-SparkSQL-deterministic-when-used-on-MySql-data-sources-tp28302.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark locality

2017-01-12 Thread Michael Gummelt
The code in there w/ docs that reference CNI doesn't actually run when CNI
is in effect, and doesn't have anything to do with locality.  It's just
making Spark work in a no-DNS environment

On Thu, Jan 12, 2017 at 12:04 PM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> I have found this but I am not sure how it can help...
> https://github.com/mesosphere/spark-build/blob/
> a9efef8850976f787956660262f3b77cd636f3f5/conf/spark-env.sh
>
>
> 2017-01-12 20:16 GMT+01:00 Michael Gummelt :
>
>> That's a good point. I hadn't considered the locality implications of CNI
>> yet.  I think tasks are placed based on the hostname reported by the
>> executor, which in a CNI container will be different than the
>> HDFS/Cassandra hostname.  I'm not aware of anyone running Spark+CNI in prod
>> yet, either.
>>
>> However, locality in Mesos isn't great right now anyway.  Executors are
>> placed w/o regard to locality.  Locality is only taken into account when
>> tasks are assigned to executors.  So if you get a locality-poor executor
>> placement, you'll also have locality poor task placement.  It could be
>> better.
>>
>> On Thu, Jan 12, 2017 at 7:55 AM, vincent gromakowski <
>> vincent.gromakow...@gmail.com> wrote:
>>
>>> Hi all,
>>> Does anyone have experience running Spark on Mesos with CNI (ip per
>>> container) ?
>>> How would Spark use IP or hostname for data locality with backend
>>> framework like HDFS or Cassandra ?
>>>
>>> V
>>>
>>
>>
>>
>> --
>> Michael Gummelt
>> Software Engineer
>> Mesosphere
>>
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: spark locality

2017-01-12 Thread vincent gromakowski
I have found this but I am not sure how it can help...
https://github.com/mesosphere/spark-build/blob/a9efef8850976f787956660262f3b77cd636f3f5/conf/spark-env.sh


2017-01-12 20:16 GMT+01:00 Michael Gummelt :

> That's a good point. I hadn't considered the locality implications of CNI
> yet.  I think tasks are placed based on the hostname reported by the
> executor, which in a CNI container will be different than the
> HDFS/Cassandra hostname.  I'm not aware of anyone running Spark+CNI in prod
> yet, either.
>
> However, locality in Mesos isn't great right now anyway.  Executors are
> placed w/o regard to locality.  Locality is only taken into account when
> tasks are assigned to executors.  So if you get a locality-poor executor
> placement, you'll also have locality poor task placement.  It could be
> better.
>
> On Thu, Jan 12, 2017 at 7:55 AM, vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> Hi all,
>> Does anyone have experience running Spark on Mesos with CNI (ip per
>> container) ?
>> How would Spark use IP or hostname for data locality with backend
>> framework like HDFS or Cassandra ?
>>
>> V
>>
>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


Re: Spark and Kafka integration

2017-01-12 Thread Mark Hamstra
See "API compatibility" in http://spark.apache.org/versioning-policy.html

While code that is annotated as Experimental is still a good faith effort
to provide a stable and useful API, the fact is that we're not yet
confident enough that we've got the public API in exactly the form that we
want to commit to maintaining until at least the next major release.  That
means that the API may change in the next minor/feature-level release (but
it shouldn't in a patch/bugfix-level release), which would require that
your source code be rewritten to use the new API.  In the most extreme
case, we may decide that the experimental code didn't work out the way we
wanted, so it could be withdrawn entirely.  Complete withdrawal of the
Kafka code is unlikely, but it may well change in incompatible way with
future releases even before Spark 3.0.0.

On Thu, Jan 12, 2017 at 5:57 AM, Phadnis, Varun 
wrote:

> Hello,
>
>
>
> We are using  Spark 2.0 with Kafka 0.10.
>
>
>
> As I understand, much of the API packaged in the following dependency we
> are targeting is marked as “@Experimental”
>
>
>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
>     <version>2.0.0</version>
> </dependency>
>
>
>
> What are the implications of this being marked as experimental? Are they
> stable enough for production?
>
>
>
> Thanks,
>
> Varun
>
>
>


Re: spark locality

2017-01-12 Thread Michael Gummelt
That's a good point. I hadn't considered the locality implications of CNI
yet.  I think tasks are placed based on the hostname reported by the
executor, which in a CNI container will be different than the
HDFS/Cassandra hostname.  I'm not aware of anyone running Spark+CNI in prod
yet, either.

However, locality in Mesos isn't great right now anyway.  Executors are
placed w/o regard to locality.  Locality is only taken into account when
tasks are assigned to executors.  So if you get a locality-poor executor
placement, you'll also have locality poor task placement.  It could be
better.

On Thu, Jan 12, 2017 at 7:55 AM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Hi all,
> Does anyone have experience running Spark on Mesos with CNI (ip per
> container) ?
> How would Spark use IP or hostname for data locality with backend
> framework like HDFS or Cassandra ?
>
> V
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


Can't load a RandomForestClassificationModel in Spark job

2017-01-12 Thread Sumona Routh
Hi all,
I've been working with Spark mllib 2.0.2 RandomForestClassificationModel.

I encountered two frustrating issues and would really appreciate some
advice:

1)  RandomForestClassificationModel is effectively not serializable (I
assume it's referencing something that can't be serialized, since it itself
extends serializable), so I ended up with the well-known exception:
org.apache.spark.SparkException: Task not serializable.
Basically, my original intention was to pass the model as a parameter
because which model we use is dynamic based on what record we are
predicting on.

Has anyone else encountered this? Is this currently being addressed? I
would expect objects from Spark's own libraries to be able to be used
seamlessly in their applications without these types of exceptions.

2) The RandomForestClassificationModel.load method appears to hang
indefinitely when executed from inside a map function (which I assume is
passed to the executor). So, I basically cannot load a model from a worker.
We have multiple "profiles" that use differently trained models, which are
accessed from within a map function to run predictions on different sets of
data.
The thread that is hanging has this as the latest (most pertinent) code:
org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:391)
Looking at the code in github, it appears that it is calling sc.textFile. I
could not find anything stating that this particular function would not
work from within a map function.

Are there any suggestions as to how I can get this model to work on a real
production job (either by allowing it to be serializable and passed around
or loaded from a worker)?

I've extensively POCed this model (saving, loading, transforming, training,
etc.), however this is the first time I'm attempting to use it from within
a real application.

Sumona


Re: H2O DataFrame to Spark RDD/DataFrame

2017-01-12 Thread Nicholas Sharkey
Page 33 of the Sparkling Water Booklet:

http://docs.h2o.ai/h2o/latest-stable/h2o-docs/booklets/SparklingWaterBooklet.pdf

df = sqlContext.read.format("h2o").option("key",frame.frame_id).load()

df = sqlContext.read.format("h2o").load(frame.frame_id)

On Thu, Jan 12, 2017 at 1:17 PM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi there,
>
> Is there any way to convert an H2O DataFrame to equivalent Spark RDD or
> DataFrame? I found a good documentation on "*Machine Learning with
> Sparkling Water: H2O + Spark*" here at.
> 
>
> However, it discusses how to convert a Spark RDD or DataFrame to an H2O
> DataFrame but not vice versa.
>
>
>
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>


H2O DataFrame to Spark RDD/DataFrame

2017-01-12 Thread Md. Rezaul Karim
Hi there,

Is there any way to convert an H2O DataFrame to equivalent Spark RDD or
DataFrame? I found a good documentation on "*Machine Learning with
Sparkling Water: H2O + Spark*" here at.


However, it discusses how to convert a Spark RDD or DataFrame to an H2O
DataFrame but not vice versa.




Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html



Re: mysql and Spark jdbc

2017-01-12 Thread Jorge Machado
Nice it worked !! 

thx 

Jorge Machado
www.jmachado.me





> On 12 Jan 2017, at 17:46, Asher Krim  wrote:
> 
> Have you tried using an alias? You should be able to replace 
> ("dbtable”,"sometable") with ("dbtable”,"SELECT utc_timestamp AS my_timestamp 
> FROM sometable")
> 
> -- 
> Asher Krim
> Senior Software Engineer
> 
> 
> On Thu, Jan 12, 2017 at 10:49 AM, Jorge Machado  > wrote:
> Hi Guys, 
> 
> I’m having an issue loading data with a JDBC connector
> My line of code is : 
> 
> val df = 
> sqlContext.read.format("jdbc").option("url","jdbc:mysql://localhost:3306 
> <>").option("driver","com.mysql.jdbc.Driver").option("dbtable”,"sometable").option("user”,"superuser").option("password”,"supersafe").option("partitionColumn","id").option("lowerBound","1325376000").option("upperBound","1483228800").option("numPartitions","20").load()
> 
> when I do : df.filter("last_value IS NOT NULL ").filter("utc_timestamp <=  
> 1347369839").count 
> or df.filter("last_value IS NOT NULL ").filter(“ `utc_timestamp` <=  
> 1347369839").count
> 
> on the mysql logs I see : SELECT `last_value`,`utc_timestamp` FROM sometable 
> WHERE utc_timestamp <= 1347369839 AND id >= 1451658240 AND id < 145955088
> 
> The problem is that utc_timestamp is a function in MySQL and it gets 
> executed. How can I force Spark not to remove the backticks (``) in the WHERE clause? 
> 
> 
> 
> 
> Jorge Machado
> www.jmachado.me 
> 
> Jorge Machado
> www.jmachado.me 
> 
> 
> 
> 
> 
> 



Re: [Spark Streaming] NoClassDefFoundError : StateSpec

2017-01-12 Thread Shixiong(Ryan) Zhu
You can find the Spark version of spark-submit in the log. Could you check
if it's not consistent?
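
One quick way to check which Spark a given launcher belongs to, independent of
the application log:

./bin/spark-submit --version
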
On Thu, Jan 12, 2017 at 7:35 AM Ramkumar Venkataraman <
ram.the.m...@gmail.com> wrote:

> Spark: 1.6.1
>
> I am trying to use the new mapWithState API and I am getting the following
> error:
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/StateSpec$
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.streaming.StateSpec$
>
> Build.sbt
> 
> scalaVersion := "2.10.6"
> typelevelDefaultSettings
> val sparkVersion = "1.6.1"
>
> resolvers ++= Seq(
>   "Sonatype OSS Snapshots" at
> "https://oss.sonatype.org/content/repositories/snapshots;
> )
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>   "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
>   "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
>   "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3" // Needed by
> spark-core
> )
> ==
>
> This is how my spark-submit looks like:
>
> ./bin/spark-submit --verbose --master yarn-client  --num-executors 50
> --driver-memory=4G --executor-memory=8G   --conf
> "spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC"
> --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"  --class MY_DRIVER
> ~/project-assembly-0.0.1-SNAPSHOT.jar
>
> ==
>
> Is there anything I am missing here? I understand that NoClassDefFoundError
> means the required Jars aren't present in the classpath, I am just not able
> to understand why this class alone is missing, when the others related to
> window, etc. are found. Do I have to pass in additional jars to make this
> API work?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-NoClassDefFoundError-StateSpec-tp28301.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Running a spark code using submit job in google cloud platform

2017-01-12 Thread A Shaikh
You may have tested this code against the Spark version on your local machine,
which may be different from the Spark version running in Google Cloud.
You need to select the appropriate Spark version when you submit your job.
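
A hedged sketch with the gcloud CLI (flag names, the image-to-Spark mapping, and
the MainClass name are assumptions to verify against the Dataproc docs): the
Spark version is fixed by the image chosen at cluster creation, so pick an image
whose Spark matches the version the jar was built against, then submit against
that cluster.

gcloud dataproc clusters create my-cluster --image-version=1.1   # image assumed to bundle Spark 2.0.x
gcloud dataproc jobs submit spark --cluster=my-cluster \
    --class=MainClass --jars=gs://Anahita/test.jar \
    -- --lambda=.001 --eta=1.0 \
       --trainFile=gs://Anahita/small_train.dat --testFile=gs://Anahita/small_test.dat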

On 12 January 2017 at 15:51, Anahita Talebi 
wrote:

> Dear all,
>
> I am trying to run a .jar file as a job using submit job in google cloud
> console.
> https://cloud.google.com/dataproc/docs/guides/submit-job
>
> I actually ran the spark code on my local computer to generate a .jar
> file. Then in the Argument folder, I give the value of the arguments that I
> used in the spark code. One of the argument is training data set that I put
> in the same bucket that I save my .jar file. In the bucket, I put only the
> .jar file, training dataset and testing dataset.
>
> Main class or jar
> gs://Anahita/test.jar
>
> Arguments
>
> --lambda=.001
> --eta=1.0
> --trainFile=gs://Anahita/small_train.dat
> --testFile=gs://Anahita/small_test.dat
>
> The problem is that when I run the job I get the following error and
> actually it cannot read  my training and testing data sets.
>
> Exception in thread "main" java.lang.NoSuchMethodError: 
> org.apache.spark.rdd.RDD.coalesce(IZLscala/math/Ordering;)Lorg/apache/spark/rdd/RDD;
>
> Can anyone help me how I can solve this problem?
>
> Thanks,
>
> Anahita
>
>
>


spark rdd map error: too many arguments for unapply pattern, maximum = 22

2017-01-12 Thread Anton Kravchenko
Hi there,

When I do rdd map with more than 22 columns - I get "error: too many
arguments for unapply pattern, maximum = 22".
scala> val rddRes=rows.map{case Row(col1,..col23) => Row(...)}

Is there a known way to get around this issue?

p.s. Here is a full traceback:
C:\spark-2.0.1-bin-hadoop2.7>bin\spark-shell.cmd
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_102)

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val hive_extract =
spark.read.parquet("hdfs:/user/akravchenko/extract_week49_ip.parquet")
hive_extract: org.apache.spark.sql.DataFrame = [rec_lngth_cnt: int,
nch_near_line_rec_vrsn_cd: string ... 3593 more fields]

scala> hive_extract.createOrReplaceTempView("hive_extract_table")

scala> val df0=spark.sql("SELECT
rec_lngth_cnt,nch_edit_trlr_ind_cd_oc1,nch_edit_trlr_ind_cd_oc2,nch_edit_trlr_ind_cd_oc3,nch_edit_trlr_ind_cd_oc4,nch_edit_trlr_ind_cd_oc5,nch_edit_trlr_ind_cd_oc6,nch_edit_trlr_ind_cd_oc7,nch_edit_trlr_ind_cd_oc8,nch_edit_trlr_ind_cd_oc9,nch_edit_trlr_ind_cd_oc10,nch_edit_trlr_ind_cd_oc11,nch_edit_trlr_ind_cd_oc12,nch_edit_trlr_ind_cd_oc13,nch_edit_cd_oc1,nch_edit_cd_oc2,nch_edit_cd_oc3,nch_edit_cd_oc4,nch_edit_cd_oc5,nch_edit_cd_oc6,nch_edit_cd_oc7,nch_edit_cd_oc8,nch_edit_cd_oc9,nch_edit_cd_oc10,nch_edit_cd_oc11,nch_edit_cd_oc12,nch_edit_cd_oc13
FROM hive_extract_table limit 10")
df0: org.apache.spark.sql.DataFrame = [rec_lngth_cnt: int,
nch_edit_trlr_ind_cd_oc1: string ... 25 more fields]

scala> val rows: RDD[Row]=df0.rdd
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] =
MapPartitionsRDD[82] at rdd at :35

scala> val rddRes=rows.map{case
Row(rec_lngth_cnt,nch_edit_trlr_ind_cd_oc1,nch_edit_trlr_ind_cd_oc2,nch_edit_trlr_ind_cd_oc3,nch_edit_trlr_ind_cd_oc4,nch_edit_trlr_ind_cd_oc5,nch_edit_trlr_ind_cd_oc6,nch_edit_trlr_ind_cd_oc7,nch_edit_trlr_ind_cd_oc8,nch_edit_trlr_ind_cd_oc9,nch_edit_trlr_ind_cd_oc10,nch_edit_trlr_ind_cd_oc11,nch_edit_trlr_ind_cd_oc12,nch_edit_trlr_ind_cd_oc13,nch_edit_cd_oc1,nch_edit_cd_oc2,nch_edit_cd_oc3,nch_edit_cd_oc4,nch_edit_cd_oc5,nch_edit_cd_oc6,nch_edit_cd_oc7,nch_edit_cd_oc8,nch_edit_cd_oc9,nch_edit_cd_oc10,nch_edit_cd_oc11,nch_edit_cd_oc12,nch_edit_cd_oc13)
 | =>
Row(rec_lngth_cnt,Row(nch_edit_trlr_ind_cd_oc1,nch_edit_trlr_ind_cd_oc2,nch_edit_trlr_ind_cd_oc3,nch_edit_trlr_ind_cd_oc4,nch_edit_trlr_ind_cd_oc5,nch_edit_trlr_ind_cd_oc6,nch_edit_trlr_ind_cd_oc7,nch_edit_trlr_ind_cd_oc8,nch_edit_trlr_ind_cd_oc9,nch_edit_trlr_ind_cd_oc10,nch_edit_trlr_ind_cd_oc11,nch_edit_trlr_ind_cd_oc12,nch_edit_trlr_ind_cd_oc13),Row(nch_edit_cd_oc1,nch_edit_cd_oc2,nch_edit_cd_oc3,nch_edit_cd_oc4,nch_edit_cd_oc5,nch_edit_cd_oc6,nch_edit_cd_oc7,nch_edit_cd_oc8,nch_edit_cd_oc9,nch_edit_cd_oc10,nch_edit_cd_oc11,nch_edit_cd_oc12,nch_edit_cd_oc13))}
:37: error: too many arguments for unapply pattern, maximum = 22
   val rddRes=rows.map{case
Row(rec_lngth_cnt,nch_edit_trlr_ind_cd_oc1,nch_edit_trlr_ind_cd_oc2,nch_edit_trlr_ind_cd_oc3,nch_edit_trlr_ind_cd_oc4,nch_edit_trlr_ind_cd_oc5,nch_edit_trlr_ind_cd_oc6,nch_edit_trlr_ind_cd_oc7,nch_edit_trlr_ind_cd_oc8,nch_edit_trlr_ind_cd_oc9,nch_edit_trlr_ind_cd_oc10,nch_edit_trlr_ind_cd_oc11,nch_edit_trlr_ind_cd_oc12,nch_edit_trlr_ind_cd_oc13,nch_edit_cd_oc1,nch_edit_cd_oc2,nch_edit_cd_oc3,nch_edit_cd_oc4,nch_edit_cd_oc5,nch_edit_cd_oc6,nch_edit_cd_oc7,nch_edit_cd_oc8,nch_edit_cd_oc9,nch_edit_cd_oc10,nch_edit_cd_oc11,nch_edit_cd_oc12,nch_edit_cd_oc13)
^

Thank you, Anton
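
A hedged workaround sketch: skip the Row(...) extractor (Scala's unapply is
limited to 22 fields) and use positional access instead. The indices assume the
column order of the SELECT above: 0 = rec_lngth_cnt, 1-13 = the trailer
indicator columns, 14-26 = the edit code columns.

val rddRes = rows.map { r =>
  Row(
    r.get(0),
    Row.fromSeq((1 to 13).map(r.get)),    // nch_edit_trlr_ind_cd_oc1..13
    Row.fromSeq((14 to 26).map(r.get)))   // nch_edit_cd_oc1..13
}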


UNSUBSCRIBE

2017-01-12 Thread williamtellme123

From: Harjit Singh [mailto:harjit.si...@deciphernow.com] 
Sent: Tuesday, April 26, 2016 3:11 PM
To: user@spark.apache.org
Subject: test


Re: mysql and Spark jdbc

2017-01-12 Thread Asher Krim
Have you tried using an alias? You should be able to replace
("dbtable”,"sometable")
with ("dbtable”,"SELECT utc_timestamp AS my_timestamp FROM sometable")

-- 
Asher Krim
Senior Software Engineer

On Thu, Jan 12, 2017 at 10:49 AM, Jorge Machado  wrote:

> Hi Guys,
>
> I’m having an issue loading data with a JDBC connector
> My line of code is :
>
> val df = sqlContext.read.format("jdbc").option("url","jdbc:mysql://
> localhost:3306").option("driver","com.mysql.jdbc.
> Driver").option("dbtable”,"sometable").option("user”,"
> superuser").option("password”,"supersafe").option("
> partitionColumn","id").option("lowerBound","1325376000").
> option("upperBound","1483228800").option("numPartitions","20").load()
>
> when I do : df.filter("last_value IS NOT NULL ").filter("utc_timestamp <=
>  1347369839").count
> or df.filter("last_value IS NOT NULL ").filter(“ `utc_timestamp` <=
>  1347369839").count
>
> on the mysql logs I see : SELECT `last_value`,`utc_timestamp` FROM
> sometable WHERE utc_timestamp <= 1347369839 AND id >= 1451658240 AND id <
> 145955088
>
> The problem is that utc_timestamp is a function in MySQL and it gets
> executed. How can I force Spark not to remove the backticks (``) in the WHERE
> clause?
>
>
>
>
> Jorge Machado
> www.jmachado.me
>
> Jorge Machado
> www.jmachado.me
>
>
>
>
>
>


Re: [Spark Core] Re-using dataframes with limit() produces unexpected results

2017-01-12 Thread Ant Super
I get the correct result only occasionally. Try it more times, or increase the
range size, and it will break down eventually.

I use Spark 2.0.0.2.5 in yarn-client mode.
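
One commonly suggested mitigation, sketched below in PySpark under the
assumption that the cache is not evicted (it is a workaround, not a guarantee):
persist and materialize the limited DataFrame before reusing it, so both sides
of the self-join read the same cached rows instead of re-evaluating limit().

from pyspark import StorageLevel
from pyspark.sql import Row

rdd = sc.parallelize([Row(i=i) for i in range(100)], 200)
df1 = rdd.toDF().limit(12345).persist(StorageLevel.MEMORY_AND_DISK)
df1.count()                       # force materialization once
rdd2 = df1.rdd.map(lambda x: (x, x))
print(rdd2.join(rdd2).count())    # expected 12345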

On 12.01.2017 15:13, "Takeshi Yamamuro" wrote:

Hi,

I got the correct answer. Did I miss something?

// maropu

---
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
  /_/

Using Python version 2.7.10 (v2.7.10:15c95b7d81dc, May 23 2015 09:33:12)
SparkSession available as 'spark'.
>>>
>>> from pyspark.sql import Row
>>> rdd=sc.parallelize([Row(i=i) for i in range(100)],200)
>>> rdd1=rdd.toDF().limit(12345).rdd
>>> rdd2=rdd1.map(lambda x:(x,x))
>>> rdd2.join(rdd2).count()
12345


On Thu, Jan 12, 2017 at 4:18 PM, Ant  wrote:

> Hello,
> it seems using a Spark DataFrame, which had limit() applied on it, in
> further calculations produces very unexpected results. Some people use it
> as poor man's sampling and end up debugging for hours to see what's going
> on. Here is an example
>
> from pyspark.sql import Row
> rdd=sc.parallelize([Row(i=i) for i in range(100)],200)
> rdd1=rdd.toDF().limit(12345).rdd
> rdd2=rdd1.map(lambda x:(x,x))
> rdd2.join(rdd2).count()
> # result is 10240 despite doing a self-join; expected 12345
>
> in Pyspark/Spark 2.0.0
>
> I understand that actions on limit may yield different rows every time,
> but re-calculating the same rdd2 within a single DAG is highly unexpected.
>
> Maybe a comment in the documentation may be helpful if there is no easy
> fix of limit. I do see that the intend for limit may be such that no two
> limit paths should occur in a single DAG.
>
> What do you think? What is the correct explanation?
>
> Anton
>



-- 
---
Takeshi Yamamuro


spark locality

2017-01-12 Thread vincent gromakowski
Hi all,
Does anyone have experience running Spark on Mesos with CNI (IP per
container)?
How would Spark use the IP or hostname for data locality with backend frameworks
like HDFS or Cassandra?

V


Running a spark code using submit job in google cloud platform

2017-01-12 Thread Anahita Talebi
Dear all,

I am trying to run a .jar file as a job using submit job in google cloud
console.
https://cloud.google.com/dataproc/docs/guides/submit-job

I actually ran the Spark code on my local computer to generate a .jar file.
Then, in the Arguments field, I give the values of the arguments that I used
in the Spark code. One of the arguments is the training data set, which I put in
the same bucket where I saved my .jar file. In the bucket, I put only the
.jar file, the training dataset and the testing dataset.

Main class or jar
gs://Anahita/test.jar

Arguments

--lambda=.001
--eta=1.0
--trainFile=gs://Anahita/small_train.dat
--testFile=gs://Anahita/small_test.dat

The problem is that when I run the job I get the following error; it cannot
read my training and testing data sets.

Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.spark.rdd.RDD.coalesce(IZLscala/math/Ordering;)Lorg/apache/spark/rdd/RDD;

Can anyone help me how I can solve this problem?

Thanks,

Anahita


mysql and Spark jdbc

2017-01-12 Thread Jorge Machado
Hi Guys, 

I'm having an issue loading data with the JDBC connector.
My line of code is:

val df = sqlContext.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "sometable")
  .option("user", "superuser")
  .option("password", "supersafe")
  .option("partitionColumn", "id")
  .option("lowerBound", "1325376000")
  .option("upperBound", "1483228800")
  .option("numPartitions", "20")
  .load()

when I do: df.filter("last_value IS NOT NULL").filter("utc_timestamp <= 1347369839").count
or df.filter("last_value IS NOT NULL").filter("`utc_timestamp` <= 1347369839").count

In the MySQL logs I see: SELECT `last_value`,`utc_timestamp` FROM sometable
WHERE utc_timestamp <= 1347369839 AND id >= 1451658240 AND id < 145955088

The problem is that utc_timestamp is also a MySQL function, so it gets executed as a function call.
How can I force Spark not to remove the backticks in the WHERE clause?




Jorge Machado
www.jmachado.me 

Jorge Machado
www.jmachado.me







Re: How to save spark-ML model in Java?

2017-01-12 Thread Asher Krim
What version of Spark are you on?
Although it's cut off, I think your error is with RandomForestClassifier,
is that correct? If so, you should upgrade to Spark 2, since I think this
class only became writable/readable in Spark 2
(https://github.com/apache/spark/pull/12118)
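
For reference, a minimal Spark 2.x sketch of the save/load round trip; the tiny training DataFrame and the path are purely illustrative, and `spark` is assumed to be a SparkSession (e.g. in spark-shell).

```scala
// hedged sketch: in Spark 2.x the RandomForest models implement MLWritable/MLReadable
import org.apache.spark.ml.classification.{RandomForestClassifier, RandomForestClassificationModel}
import org.apache.spark.ml.linalg.Vectors

val training = spark.createDataFrame(Seq(
  (0.0, Vectors.dense(0.0, 1.0)),
  (1.0, Vectors.dense(1.0, 0.0))
)).toDF("label", "features")

val rfModel = new RandomForestClassifier().fit(training)
rfModel.write.overwrite().save("/tmp/rf-model")                       // save the fitted model
val restored = RandomForestClassificationModel.load("/tmp/rf-model")  // and read it back
```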

On Thu, Jan 12, 2017 at 8:43 AM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi Malshan,
>
> The error says that one (or more) of the estimators/stages is either not
> writable or compatible that supports overwrite/model write operation.
>
> Suppose you want to configure an ML pipeline consisting of three stages
> (i.e. estimator): tokenizer, hashingTF, and nb:
> val nb = new NaiveBayes().setSmoothing(0.1)
> val tokenizer = new Tokenizer().setInputCol("
> label").setOutputCol("label")
> val hashingTF = new 
> HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")
>
> val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF,
> nb))
>
>
> Now check if all the stages are writable. And to make it ease try saving
> stages individually:  -e.g. tokenizer.write.save("path")
>
>
> hashingTF.write.save("path")
> After that suppose you want to perform a 10-fold cross-validation as
> follows:
> val cv = new CrossValidator()
>   .setEstimator(pipeline)
>   .setEvaluator(new BinaryClassificationEvaluator)
>   .setEstimatorParamMaps(paramGrid)
>   .setNumFolds(10)
>
> Where:
> val paramGrid = new ParamGridBuilder()
> .addGrid(hashingTF.numFeatures, Array(10,
> 100, 1000))
> .addGrid(nb.smoothing, Array(0.001, 0.0001))
> .build()
>
> Now the model that you trained using the training set should be writable
> if all of the stages are okay:
> val model = cv.fit(trainingData)
> model.write.overwrite().save("output/NBModel")
>
>
>
> Hope that helps.
>
>
>
>
>
>
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>
> On 12 January 2017 at 09:09, Minudika Malshan 
> wrote:
>
>> Hi,
>>
>> When I try to save a pipeline model using spark ML (Java) , the following
>> exception is thrown.
>>
>>
>> java.lang.UnsupportedOperationException: Pipeline write will fail on
>> this Pipeline because it contains a stage which does not implement
>> Writable. Non-Writable stage: rfc_98f8c9e0bd04 of type class
>> org.apache.spark.ml.classification.Rand
>>
>>
>> Here is my code segment.
>>
>>
>> model.write().overwrite().save("mypath");
>>
>>
>> How to resolve this?
>>
>> Thanks and regards!
>>
>> Minudika
>>
>>
>


-- 
Asher Krim
Senior Software Engineer


[Spark Streaming] NoClassDefFoundError : StateSpec

2017-01-12 Thread Ramkumar Venkataraman
Spark: 1.6.1

I am trying to use the new mapWithState API and I am getting the following
error:

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/StateSpec$
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.streaming.StateSpec$

Build.sbt

scalaVersion := "2.10.6"
typelevelDefaultSettings
val sparkVersion = "1.6.1"

resolvers ++= Seq(
  "Sonatype OSS Snapshots" at
"https://oss.sonatype.org/content/repositories/snapshots;
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3" // Needed by
spark-core
)
==

This is how my spark-submit looks like:

./bin/spark-submit --verbose --master yarn-client  --num-executors 50
--driver-memory=4G --executor-memory=8G   --conf
"spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC"
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -verbose:gc
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"  --class MY_DRIVER
~/project-assembly-0.0.1-SNAPSHOT.jar

==

Is there anything I am missing here? I understand that NoClassDefFoundError
means the required Jars aren't present in the classpath, I am just not able
to understand why this class alone is missing, when the others related to
window, etc. are found. Do I have to pass in additional jars to make this
API work?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-NoClassDefFoundError-StateSpec-tp28301.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Support of Theta Join

2017-01-12 Thread Mahender Sarangam
Hi All,

Is there any support for theta joins in Spark? We want to identify the
country based on a range of IP addresses (which we have in our DB).
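
If it helps, Spark SQL does accept non-equi (theta) join conditions; they are planned as broadcast nested loop or cartesian joins, so the range table should be small. A rough sketch with purely hypothetical tables and columns, assuming a spark-shell session (Spark 2.x):

```scala
// hedged sketch: enrich events with a country by joining on an IP-range condition
import spark.implicits._

val ipRanges = Seq((16777216L, 16777471L, "AU"), (16777472L, 16778239L, "CN"))
  .toDF("range_start", "range_end", "country")
val events = Seq((16777300L, "req-1"), (16777500L, "req-2")).toDF("ip_num", "request_id")

val withCountry = events.join(ipRanges,
  events("ip_num") >= ipRanges("range_start") && events("ip_num") <= ipRanges("range_end"),
  "left_outer")
withCountry.show()
```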



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



failed to launch org.apache.spark.deploy.master.Master

2017-01-12 Thread Soheila S.
Hi,
I have executed my Spark job using spark-submit on my local machine and on a
cluster.
Now I want to try using HDFS: put the data (a text file) on HDFS, read it from
there, execute the jar file, and finally write the output to HDFS.
I got this error after running the job:

failed to launch org.apache.spark.deploy.master.Master:
log is following:
Spark Command: /scratch/p_corpus/tools/jdk1.8.0_112/bin/java -cp
$/home/user-folder/cluster-conf-1369394/spark/:/scratch/p_corpus/tools/spark-2.0.1-bin-hadoop2.6/jars/*:/home/user-folder/cluster-conf-1369394/hadoop/:/home/user-folder/cluster-conf-1369394/hadoop/
-Xmx1g org.apache.spark.deploy.master.Master --host
taurusi5551.taurus.hrsk.tu-dresden.de --port 7077 --webui-port
8080 /home/user-folder/cluster-conf-1369394/spark

17/01/12 14:49:32 INFO master.Master: Started daemon with process name: 8524@taurusi5551
17/01/12 14:49:32 INFO util.SignalUtils: Registered signal handler for TERM
17/01/12 14:49:32 INFO util.SignalUtils: Registered signal handler for HUP
17/01/12 14:49:32 INFO util.SignalUtils: Registered signal handler for INT
Usage: Master [options]

Options:
  -i HOST, --ip HOST      Hostname to listen on (deprecated, please use --host or -h)
  -h HOST, --host HOST    Hostname to listen on
  -p PORT, --port PORT    Port to listen on (default: 7077)
  --webui-port PORT       Port for web UI (default: 8080)
  --properties-file FILE  Path to a custom Spark properties file.
                          Default is conf/spark-defaults.conf.

Any help would be really appreciated.

Best,
Soheila


Re: [Spark Core] Re-using dataframes with limit() produces unexpected results

2017-01-12 Thread Takeshi Yamamuro
Hi,

I got the correct answer. Did I miss something?

// maropu

---
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
  /_/

Using Python version 2.7.10 (v2.7.10:15c95b7d81dc, May 23 2015 09:33:12)
SparkSession available as 'spark'.
>>>
>>> from pyspark.sql import Row
>>> rdd=sc.parallelize([Row(i=i) for i in range(100)],200)
>>> rdd1=rdd.toDF().limit(12345).rdd
>>> rdd2=rdd1.map(lambda x:(x,x))
>>> rdd2.join(rdd2).count()
12345


On Thu, Jan 12, 2017 at 4:18 PM, Ant  wrote:

> Hello,
> it seems using a Spark DataFrame, which had limit() applied on it, in
> further calculations produces very unexpected results. Some people use it
> as poor man's sampling and end up debugging for hours to see what's going
> on. Here is an example
>
> from pyspark.sql import Row
> rdd=sc.parallelize([Row(i=i) for i in range(100)],200)
> rdd1=rdd.toDF().limit(12345).rdd
> rdd2=rdd1.map(lambda x:(x,x))
> rdd2.join(rdd2).count()
> # result is 10240 despite doing a self-join; expected 12345
>
> in Pyspark/Spark 2.0.0
>
> I understand that actions on limit may yield different rows every time,
> but re-calculating the same rdd2 within a single DAG is highly unexpected.
>
> Maybe a comment in the documentation would be helpful if there is no easy
> fix for limit. I do see that the intent for limit may be that no two
> limit paths should occur in a single DAG.
>
> What do you think? What is the correct explanation?
>
> Anton
>



-- 
---
Takeshi Yamamuro


Spark with oozie #Not implemented by the TFS FileSystem# issue

2017-01-12 Thread Rohit Mishra
Hello,

I am new to Spark.
I need to run a Spark job within Oozie.
Individually I am able to run the Spark job, but with Oozie, after the job is
launched I am getting the following error:

017-01-12 13:51:57,696 INFO [main] org.apache.hadoop.service.AbstractService: 
Service org.apache.hadoop.mapreduce.v2.app.MRAppMaster failed in state INITED; 
cause: java.lang.UnsupportedOperationException: Not implemented by the TFS 
FileSystem implementation
java.lang.UnsupportedOperationException: Not implemented by the TFS FileSystem 
implementation
at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:216)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2564)
at 
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
at 
org.apache.hadoop.mapreduce.v2.app.MRAppMaster.getFileSystem(MRAppMaster.java:497)
at 
org.apache.hadoop.mapreduce.v2.app.MRAppMaster.serviceInit(MRAppMaster.java:281)
at 
org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.mapreduce.v2.app.MRAppMaster$4.run(MRAppMaster.java:1499)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at 
org.apache.hadoop.mapreduce.v2.app.MRAppMaster.initAndStartAppMaster(MRAppMaster.java:1496)
at 
org.apache.hadoop.mapreduce.v2.app.MRAppMaster.main(MRAppMaster.java:1429)

Spark version: spark-1.5.2-bin-hadoop2.6
Hadoop: hadoop-2.6.2
Hbase : hbase-1.1.5
Oozie: oozie-4.2.0

snapshot of my pom.xml is:


<dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
   <version>3.4.8</version>
   <type>pom</type>
</dependency>

<dependency>
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase-common</artifactId>
   <version>1.1.5</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase-client</artifactId>
   <version>1.1.5</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase-server</artifactId>
   <version>1.1.5</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase-testing-util</artifactId>
   <version>1.1.5</version>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.11</artifactId>
   <version>1.5.2</version>
   <exclusions>
      <exclusion>
         <artifactId>javax.servlet</artifactId>
         <groupId>org.eclipse.jetty.orbit</groupId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.11</artifactId>
   <version>1.5.2</version>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-yarn_2.11</artifactId>
   <version>1.5.2</version>
</dependency>

<dependency>
   <groupId>org.mongodb.mongo-hadoop</groupId>
   <artifactId>mongo-hadoop-core</artifactId>
   <version>1.5.2</version>
</dependency>

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-common</artifactId>
   <version>2.6.2</version>
   <exclusions>
      <exclusion>
         <artifactId>servlet-api</artifactId>
         <groupId>javax.servlet</groupId>
      </exclusion>
      <exclusion>
         <artifactId>jetty-util</artifactId>
         <groupId>org.mortbay.jetty</groupId>
      </exclusion>
      <exclusion>
         <artifactId>jsp-api</artifactId>
         <groupId>javax.servlet.jsp</groupId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.6.2</version>
   <exclusions>
      <exclusion>
         <artifactId>jetty-util</artifactId>
         <groupId>org.mortbay.jetty</groupId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
   <version>2.6.2</version>
</dependency>

<dependency>
   <groupId>org.mongodb</groupId>
   <artifactId>mongo-java-driver</artifactId>
   <version>3.2.1</version>
</dependency>

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-core</artifactId>
   <version>1.2.1</version>
   <exclusions>
      <exclusion>
         <artifactId>jetty-util</artifactId>
         <groupId>org.mortbay.jetty</groupId>
      </exclusion>
   </exclusions>
</dependency>


So far I have searched several blogs. What I understand from reading them is
that there is some issue with the Tachyon jar which is embedded in
spark-assembly-1.5.2-hadoop2.6.0.jar.
I tried removing tachyon-0.5.0.jar and tachyon-client-0.5.0.jar from the Oozie
shared library (they were present under the spark library), but then I started getting this error:

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], 
main() threw exception, org.apache.spark.util.Utils$.DEFAULT_DRIVER_MEM_MB()I
java.lang.NoSuchMethodError: 
org.apache.spark.util.Utils$.DEFAULT_DRIVER_MEM_MB()I

Please help me debug and solve it.

Thanks,
Rohit

Re: Spark and Kafka integration

2017-01-12 Thread Jacek Laskowski
Hi Phadnis,

I found this in
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html:

> This version of the integration is marked as experimental, so the API is 
> potentially subject to change.
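
For context, the experimental 0-10 API in question looks roughly like the sketch below, based on the integration guide linked above. The broker address, group id and topic name are placeholders, and `ssc` is assumed to be an existing StreamingContext.

```scala
// hedged sketch of the spark-streaming-kafka-0-10 direct stream API
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array("some-topic"), kafkaParams)
)

stream.map(record => (record.key, record.value)).print()
```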

Regards,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Jan 12, 2017 at 2:57 PM, Phadnis, Varun
 wrote:
> Hello,
>
>
>
> We are using  Spark 2.0 with Kafka 0.10.
>
>
>
> As I understand, much of the API packaged in the following dependency we are
> targeting is marked as “@Experimental”
>
>
>
> <dependency>
>    <groupId>org.apache.spark</groupId>
>    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
>    <version>2.0.0</version>
> </dependency>
>
>
>
> What are implications of this being marked as experimental? Are they stable
> enough for production?
>
>
>
> Thanks,
>
> Varun
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark and Kafka integration

2017-01-12 Thread Phadnis, Varun
Hello,

We are using  Spark 2.0 with Kafka 0.10.

As I understand, much of the API packaged in the following dependency we are 
targeting is marked as "@Experimental"

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
   <version>2.0.0</version>
</dependency>

What are implications of this being marked as experimental? Are they stable 
enough for production?

Thanks,
Varun



Re: How to save spark-ML model in Java?

2017-01-12 Thread Md. Rezaul Karim
Hi Malshan,

The error says that one (or more) of the estimators/stages is not writable,
i.e. it does not support the overwrite/model-write operation.

Suppose you want to configure an ML pipeline consisting of three stages
(i.e. estimator): tokenizer, hashingTF, and nb:
val nb = new NaiveBayes().setSmoothing(0.1)
val tokenizer = new
Tokenizer().setInputCol("label").setOutputCol("label")
val hashingTF = new
HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF,
nb))


Now check whether all the stages are writable. To make this easier, try saving
the stages individually, e.g.:

tokenizer.write.save("path")
hashingTF.write.save("path")
After that suppose you want to perform a 10-fold cross-validation as
follows:
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(10)

Where:
val paramGrid = new ParamGridBuilder()
.addGrid(hashingTF.numFeatures, Array(10, 100,
1000))
.addGrid(nb.smoothing, Array(0.001, 0.0001))
.build()

Now the model that you trained using the training set should be writable if
all of the stages are okay:
val model = cv.fit(trainingData)
model.write.overwrite().save("output/NBModel")
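
As a hedged follow-up sketch, assuming Spark 2.x where CrossValidatorModel is also readable, the model saved above can be loaded back in a later job and the winning pipeline pulled out of it:

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.tuning.CrossValidatorModel

// reload the model saved above and extract the best pipeline for scoring
val restored = CrossValidatorModel.load("output/NBModel")
val bestPipeline = restored.bestModel.asInstanceOf[PipelineModel]
// bestPipeline.transform(testData) can then be used to score new data
```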



Hope that helps.







Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html


On 12 January 2017 at 09:09, Minudika Malshan  wrote:

> Hi,
>
> When I try to save a pipeline model using spark ML (Java) , the following
> exception is thrown.
>
>
> java.lang.UnsupportedOperationException: Pipeline write will fail on this
> Pipeline because it contains a stage which does not implement Writable.
> Non-Writable stage: rfc_98f8c9e0bd04 of type class org.apache.spark.ml.
> classification.Rand
>
>
> Here is my code segment.
>
>
> model.write().overwrite().save("mypath");
>
>
> How to resolve this?
>
> Thanks and regards!
>
> Minudika
>
>


Re: Add row IDs column to data frame

2017-01-12 Thread ayan guha
Just in case you are more comfortable with SQL,

row_number over ()

should also generate a unique id.
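
A rough sketch of that variant follows. Note that Spark requires an ORDER BY inside the OVER clause for row_number(), and without a PARTITION BY all rows pass through a single partition, so it is best suited to modest data sizes; the table and column names are illustrative, and `spark` is assumed to be a spark-shell SparkSession.

```scala
// hedged sketch: SQL row_number() as a generated id column
import spark.implicits._

val df = Seq("a", "b", "c").toDF("value")
df.createOrReplaceTempView("t")
spark.sql("SELECT row_number() OVER (ORDER BY value) AS id, value FROM t").show()
```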

On Thu, Jan 12, 2017 at 7:00 PM, akbar501  wrote:

> The following are 2 different approaches to adding an id/index to RDDs and
> 1
> approach to adding an index to a DataFrame.
>
> Add an index column to an RDD
>
>
> ```scala
> // RDD
> val dataRDD = sc.textFile("./README.md")
> // Add index then set index as key in map() transformation
> // Results in RDD[(Long, String)]
> val indexedRDD = dataRDD.zipWithIndex().map(pair => (pair._2, pair._1))
> ```
>
> Add a unique id column to an RDD
>
>
> ```scala
> // RDD
> val dataRDD = sc.textFile("./README.md")
> // Add unique id then set id as key in map() transformation
> // Results in RDD[(Long, String)]
> val indexedRDD = dataRDD.zipWithUniqueId().map(pair => (pair._2, pair._1))
> indexedRDD.collect
> ```
>
> Add an index column to a DataFrame
>
>
> Note: You could use a similar approach with a Dataset.
>
> ```scala
> import spark.implicits._
> import org.apache.spark.sql.functions.monotonically_increasing_id
>
> val dataDF = spark.read.textFile("./README.md")
> val indexedDF = dataDF.withColumn("id", monotonically_increasing_id)
> indexedDF.select($"id", $"value").show
> ```
>
>
>
> -
> Delixus.com - Spark Consulting
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Append-column-to-Data-Frame-or-RDD-
> tp22385p28300.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Re: Re: how to change datatype by useing StructType

2017-01-12 Thread lk_spark
I have tried like this:
  
  val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
  val rowRDD = peopleRDD.map(_.split(",")).map(attributes => {
  val ab = ArrayBuffer[Any]()
  for (i <- 0 until schemaType.length) {
if (schemaType(i).equalsIgnoreCase("int")) {
  ab += attributes(i).toInt
} else if (schemaType(i).equalsIgnoreCase("long")) {
  ab += attributes(i).toLong
} else {
  ab += attributes(i)
}
  }
  Row(ab.toArray)
})

val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF .show

I got error:
 Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a valid 
external type for schema of string
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)

all the fields are Any; what should I do?
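
One possible fix, sketched below (not the only approach): Row(ab.toArray) creates a row with a single Array[Any] column, so the values never line up with the schema; spreading the buffer with Row.fromSeq gives one field per schema column. This reuses peopleRDD and schemaType from the snippet above.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row

// hedged sketch: build the row from the buffer's elements, not from the buffer itself
val rowRDD = peopleRDD.map(_.split(",")).map { attributes =>
  val ab = ArrayBuffer[Any]()
  for (i <- 0 until schemaType.length) {
    if (schemaType(i).equalsIgnoreCase("int")) ab += attributes(i).toInt
    else if (schemaType(i).equalsIgnoreCase("long")) ab += attributes(i).toLong
    else ab += attributes(i)
  }
  Row.fromSeq(ab)   // one field per schema column instead of a single Array field
}
```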



2017-01-12 

lk_spark 



From: "lk_spark"
Sent: 2017-01-12 14:38
Subject: Re: Re: how to change datatype by useing StructType
To: "ayan guha", "user.spark"
Cc:

yes, field year is in my data:

data:
  kevin,30,2016
  shen,30,2016
  kai,33,2016
  wei,30,2016

this will not work 
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1), attributes(2)))
but I need to read the data in a configurable way.
2017-01-12 

lk_spark 



From: ayan guha 
Sent: 2017-01-12 14:34
Subject: Re: how to change datatype by useing StructType
To: "lk_spark", "user.spark"
Cc:

Do you have year in your data?

On Thu, 12 Jan 2017 at 5:24 pm, lk_spark  wrote:

hi, all

I have a txt file, and I want to process it as a dataframe:

data like this:
   name1,30
   name2,18

val schemaString = "name age year"
val xMap = new scala.collection.mutable.HashMap[String,DataType]()
xMap.put("name", StringType)
xMap.put("age", IntegerType)
xMap.put("year", IntegerType)

val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, xMap.get(fieldName).get, nullable = true))
val schema = StructType(fields)

val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
//spark.read.schema(schema).text("/sourcedata/test/test*")

val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1)))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

but when I write it to a table or show it I will get an error:

   Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#1
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)

   if I change my code it will work:
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).toInt))

   but this is not a good idea.

2017-01-12

lk_spark

How to save a pipeline model in spark-ML(JAVA) ?

2017-01-12 Thread Minudika Malshan
Hi,

When I try to save a pipeline model using spark ML (Java) , the following
exception is thrown.

java.lang.UnsupportedOperationException: Pipeline write will fail on this
Pipeline because it contains a stage which does not implement Writable.
Non-Writable stage: rfc_98f8c9e0bd04 of type class org.apache.spark.ml.
classification.Rand


Here is my code.


CrossValidatorModel cvModel = cv.fit(trainingData);
cvModel.write().overwrite().save("path");

PipelineModel pipelineModel = pipeline.fit(trainingData);
pipelineModel.write().overwrite().save("path");


How to resolve this?

Thank you!

Minudika


How to save spark-ML model in Java?

2017-01-12 Thread Minudika Malshan
Hi,

When I try to save a pipeline model using spark ML (Java) , the following
exception is thrown.


java.lang.UnsupportedOperationException: Pipeline write will fail on this
Pipeline because it contains a stage which does not implement Writable.
Non-Writable stage: rfc_98f8c9e0bd04 of type class
org.apache.spark.ml.classification.Rand


Here is my code segment.


model.write().overwrite().save("mypath");


How to resolve this?

Thanks and regards!

Minudika


Re: Add row IDs column to data frame

2017-01-12 Thread akbar501
The following are 2 different approaches to adding an id/index to RDDs and 1
approach to adding an index to a DataFrame.

Add an index column to an RDD


```scala
// RDD
val dataRDD = sc.textFile("./README.md")
// Add index then set index as key in map() transformation
// Results in RDD[(Long, String)]
val indexedRDD = dataRDD.zipWithIndex().map(pair => (pair._2, pair._1))
```

Add a unique id column to an RDD


```scala
// RDD
val dataRDD = sc.textFile("./README.md")
// Add unique id then set id as key in map() transformation
// Results in RDD[(Long, String)]
val indexedRDD = dataRDD.zipWithUniqueId().map(pair => (pair._2, pair._1))
indexedRDD.collect
```

Add an index column to a DataFrame


Note: You could use a similar approach with a Dataset.

```scala
import spark.implicits._
import org.apache.spark.sql.functions.monotonically_increasing_id

val dataDF = spark.read.textFile("./README.md")
val indexedDF = dataDF.withColumn("id", monotonically_increasing_id)
indexedDF.select($"id", $"value").show
```



-
Delixus.com - Spark Consulting
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Append-column-to-Data-Frame-or-RDD-tp22385p28300.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org