Re: use case reading files split per id

2016-11-14 Thread ayan guha
How about the following approach (a rough sketch follows the list)?

- get the list of IDs
- get one RDD for each of them using wholeTextFiles
- map and flatMap to generate a pair RDD with the ID as key and the list as value
- union all the RDDs together
- group by key
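
A minimal sketch of that approach, assuming spark-shell (so `sc` is in scope); the ids, the
per-id path layout and the newline record delimiter are placeholders:

val ids = Seq("id1", "id2", "id3")

val perId = ids.map { id =>
  sc.wholeTextFiles(s"hdfs:///data/$id/*")                  // one (path, fileContent) RDD per id
    .flatMap { case (_, content) => content.split("\n") }   // break each file into records
    .map(record => (id, record))                            // pair RDD keyed by the id
}

val grouped = sc.union(perId).groupByKey()                  // union them all, then group by key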
On 15 Nov 2016 16:43, "Mo Tao"  wrote:

> Hi ruben,
>
> You may try sc.binaryFiles which is designed for lots of small files and it
> can map paths into inputstreams.
> Each inputstream will keep only the path and some configuration, so it
> would
> be cheap to shuffle them.
> However, I'm not sure whether spark take the data locality into account
> while dealing with these inputstreams.
>
> Hope this helps
>
>
>


RE: How to read a Multi Line json object via Spark

2016-11-14 Thread Kappaganthu, Sivaram (ES)
Hello,

Please find attached the old mail on this subject

Thanks,
Sivaram
From: Sree Eedupuganti [mailto:s...@inndata.in]
Sent: Tuesday, November 15, 2016 12:51 PM
To: user
Subject: How to read a Multi Line json object via Spark

I tried from the Spark shell and I am getting the following error:

Here is the test.json file:


{
  "colorsArray": [{
    "red": "#f00",
    "green": "#0f0",
    "blue": "#00f",
    "cyan": "#0ff",
    "magenta": "#f0f",
    "yellow": "#ff0",
    "black": "#000"
  }]
}


scala> val jtex = sqlContext.read.format("json").option("samplingRatio","1.0").load("/user/spark/test.json")

jtex: org.apache.spark.sql.DataFrame = [_corrupt_record: string]

Any suggestions please. Thanks.
--
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited

--- Begin Message ---
Hi,

Thank you so much for the reply. The link is very good. I am new to both Spark and Python,
and I am having a little difficulty understanding the code below. Could you please provide
the Scala equivalent of the Python code below?



In [100]: import json

          multiline_rdd = sc.wholeTextFiles(inputFile)
          type(multiline_rdd)

          import re
          json_rdd = multiline_rdd.map(lambda x: x[1]) \
                                  .map(lambda x: re.sub(r"\s+", "", x, flags=re.UNICODE))
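
A rough Scala equivalent of the Python snippet above (a sketch only; "inputFile" is a
placeholder path, and collapsing all whitespace has the same caveats as the re.sub call):

// assuming spark-shell, where `sc` is already defined
val inputFile = "/path/to/multiline/json"            // placeholder

val multilineRdd = sc.wholeTextFiles(inputFile)      // (path, wholeFileContent) pairs

val jsonRdd = multilineRdd
  .map(_._2)                                         // keep the file content, drop the path
  .map(_.replaceAll("\\s+", ""))                     // strip whitespace, mirroring re.sub(r"\s+", "", x)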



Thanks,

SIvaram

From: Hyukjin Kwon [mailto:gurwls...@gmail.com]
Sent: Wednesday, October 12, 2016 11:45 AM
To: Kappaganthu, Sivaram (ES)
Cc: Luciano Resende; Jean Georges Perrin; user @spark
Subject: Re: JSON Arrays and Spark



No, I meant it should be in a single line but it supports array type too as a 
root wrapper of JSON objects.



If you need to parse multiple lines, I have a reference here.



http://searchdatascience.com/spark-adventures-1-processing-multi-line-json-files/



2016-10-12 15:04 GMT+09:00 Kappaganthu, Sivaram (ES) 
>:

Hi,



Does this mean that handling any JSON with the kind of schema below is not a good fit for
Spark? I have a requirement to parse JSON like the one below, which spans multiple lines.
What's the best way to parse JSONs of this kind? Please suggest.



root
 |-- maindate: struct (nullable = true)
 |    |-- mainidnId: string (nullable = true)
 |-- Entity: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Profile: struct (nullable = true)
 |    |    |    |-- Kind: string (nullable = true)
 |    |    |-- Identifier: string (nullable = true)
 |    |    |-- Group: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Period: struct (nullable = true)
 |    |    |    |    |    |-- pid: string (nullable = true)
 |    |    |    |    |    |-- pDate: string (nullable = true)
 |    |    |    |    |    |-- quarter: long (nullable = true)
 |    |    |    |    |    |-- labour: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- category: string (nullable = true)
 |    |    |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |    |    |-- person: struct (nullable = true)
 |    |    |    |    |    |    |    |    |-- address: array (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |    |-- city: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |-- line1: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |-- line2: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |-- postalCode: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |-- state: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |    |    |    |    |-- familyName: string (nullable = true)
 |    |    |    |    |    |    |    |-- tax: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- qwage: double (nullable = true)
 |    ... (schema truncated in the original message)

How to read a Multi Line json object via Spark

2016-11-14 Thread Sree Eedupuganti
I tried from Spark-Shell and i am getting the following error:

Here is the test.json file:

{
"colorsArray": [{
"red": "#f00",
"green": "#0f0",
"blue": "#00f",
"cyan": "#0ff",
"magenta": "#f0f",
"yellow": "#ff0",
"black": "#000"
}]}


scala> val jtex =
sqlContext.read.format("json").option("samplingRatio","1.0").load("/user/spark/test.json")

   jtex: org.apache.spark.sql.DataFrame = [_corrupt_record: string]


Any suggestions please. Thanks.
-- 
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited


HiveContext.getOrCreate not accessible

2016-11-14 Thread Praseetha
Hi All,


I have a streaming app and when i try invoking the HiveContext.getOrCreate,
it errors out with the following stmt. 'object HiveContext in package hive
cannot be accessed in package org.apache.spark.sql.hive'

I would require HiveContext instead of SQLContext for my application and
creating new HiveContext everytime would not be a feasible solution.

Here is my code snippet:
object sampleStreamingApp {

  def createStreamingContext(checkpointDirectory: String): StreamingContext = {
    val conf = new SparkConf().setAppName("sampleStreaming")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Milliseconds(5000))
    ssc.checkpoint(checkpointDirectory)
    val smDStream = ssc.textFileStream("/user/hdpuser/data")
    val smSplitted = smDStream.map(x => x.split(";")).map(x => Row.fromSeq(x))
    smSplitted.foreachRDD { rdd =>
      val sqlContext = HiveContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      // ...
    }
    ssc
  }

  def main(args: Array[String]) {
  val checkpointDirectory =
"hdfs://localhost:8020/user/dfml/checkpointing/AAA"
  val ssc = StreamingContext.getActiveOrCreate(checkpointDirectory, () =>
createStreamingContext(checkpointDirectory))

  ssc.start()
  ssc.awaitTermination()
  }
}

Any help would be appreciated.

Regds,
--Praseetha
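
If this is Spark 2.x, one possible workaround (a sketch, not a tested fix) is to obtain a
Hive-enabled SparkSession lazily inside foreachRDD instead of the no-longer-accessible
HiveContext companion object; SparkSession.builder().getOrCreate() reuses an existing
session rather than creating a new one per batch:

import org.apache.spark.sql.SparkSession

smSplitted.foreachRDD { rdd =>
  val spark = SparkSession.builder()
    .config(rdd.sparkContext.getConf)
    .enableHiveSupport()           // gives access to the Hive metastore, like HiveContext did
    .getOrCreate()                 // returns the existing session if one is already active
  import spark.implicits._
  // ... use spark.sql(...) here
}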


mapWithState job slows down & exceeds yarn's memory limits

2016-11-14 Thread Daniel Haviv
Hi,
I have a fairly simple stateful streaming job that suffers from high GC, and
its executors are killed as they exceed the size of the requested container.
My current executor memory is 10G, the Spark overhead is 2G, and it's running
with one core.

At first the job begins running at a rate that is below the batch time
(45s) and after a few batches it starts running at much slower rates (2.1
minutes at times).

This is the relevant code:

import sparkSession.implicits._

val x: RDD[(String, Record)] = ss.sql(
    s"""select * from data_table
        where partition_ts >= ${DateTime.now.minusHours(10).toString("MMddHH")}""")
  .as[Record].map(x => (x.iid, x)).rdd

val stateSpec = StateSpec.function(trackStateFunc _).numPartitions(16)
  .timeout(Durations.minutes(60 * 48)).initialState(x)

val ssc = new StreamingContext(sc, Seconds(45))
val sqlContext = ss.sqlContext
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicSet)

val stateStream = kafkaStream
  .transform(x => x.map(_._2))
  .transform(x => {
    val sparkSession = ss; import sparkSession.implicits._
    sqlContext.read.schema(schema).json(x).as[Record].map(r => Tuple2(r.iid, r))
  }.rdd)
  .mapWithState(stateSpec)

stateStream.foreachRDD(x =>
  x.coalesce(16).toDF().write.mode(SaveMode.Append).insertInto("joineddata"))


Right now, after playing with the parameters a bit, I'm running with
spark.memory.storageFraction=0 and spark.memory.fraction=0.2  .

Any help would be appreciated.

Thank you,

Daniel


Re: use case reading files split per id

2016-11-14 Thread Mo Tao
Hi ruben,

You may try sc.binaryFiles, which is designed for lots of small files and can
map paths into input streams.
Each input stream keeps only the path and some configuration, so it would
be cheap to shuffle them.
However, I'm not sure whether Spark takes data locality into account
when dealing with these input streams.

Hope this helps
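
A minimal sketch of that suggestion, assuming the per-id files live under one directory;
the path and the id-from-filename rule are placeholders. sc.binaryFiles returns
(path, PortableDataStream) pairs, and the stream is only opened when it is read:

val files = sc.binaryFiles("hdfs:///data/per-id-files/*")

val parsed = files.map { case (path, stream) =>
  val id = path.split("/").last.stripSuffix(".bin")   // derive the id from the file name (assumption)
  val bytes = stream.toArray()                        // opens and reads the stream here
  (id, bytes.length)                                  // replace with real parsing of the payload
}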






Re: Cannot find Native Library in "cluster" deploy-mode

2016-11-14 Thread Mo Tao
Hi jtgenesis,

UnsatisfiedLinkError could be caused by the missing library that your .so
files require, so you may have a look at the exception message.

You can also try setExecutorEnv("LD_LIBRARY_PATH", ".:" +
sys.env("LD_LIBRARY_PATH")) and then submit your job with your .so files
using the --files option. Spark would put these .so files in the working
directory of each executor.

Best
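
A sketch of that suggestion (the library names and paths below are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("NativeLibJob")
  // "." is the executor working directory, which is where files shipped with --files end up
  .setExecutorEnv("LD_LIBRARY_PATH", ".:" + sys.env.getOrElse("LD_LIBRARY_PATH", ""))
val sc = new SparkContext(conf)

submitted with the .so files shipped alongside the job, e.g.:

  spark-submit --deploy-mode cluster --files /opt/native/libfoo.so,/opt/native/libbar.so ...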






[ANNOUNCE] Apache Spark 2.0.2

2016-11-14 Thread Reynold Xin
We are happy to announce the availability of Spark 2.0.2!

Apache Spark 2.0.2 is a maintenance release containing 90 bug fixes along
with Kafka 0.10 support and runtime metrics for Structured Streaming. This
release is based on the branch-2.0 maintenance branch of Spark. We strongly
recommend that all 2.0.x users upgrade to this stable release.

To download Apache Spark 2.0.2, visit http://spark.apache.org/downloads.html

We would like to acknowledge all community members for contributing patches
to this release.


Re: AVRO File size when caching in-memory

2016-11-14 Thread Prithish
I am using 2.0.1 and databricks avro library 3.0.1. I am running this on
the latest AWS EMR release.

On Mon, Nov 14, 2016 at 3:06 PM, Jörn Franke  wrote:

> spark version? Are you using tungsten?
>
> > On 14 Nov 2016, at 10:05, Prithish  wrote:
> >
> > Can someone please explain why this happens?
> >
> > When I read a 600kb AVRO file and cache this in memory (using
> cacheTable), it shows up as 11mb (storage tab in Spark UI). I have tried
> this with different file sizes, and the size in-memory is always
> proportionate. I thought Spark compresses when using cacheTable.
>
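
For reference, a sketch of the settings that govern the in-memory columnar cache (Spark 2.x
names; the table name is a placeholder). Some growth versus the on-disk size is expected,
since Avro is a compact binary encoding while the cache stores decoded columnar batches:

// both settings already default to these values; shown only to make them explicit
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")   // compress cached columns
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "10000")   // rows per column batch
spark.catalog.cacheTable("my_avro_table")                                // placeholder table name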


Re: spark streaming with kinesis

2016-11-14 Thread Takeshi Yamamuro
It seems it is not a good design to frequently restart workers within a minute,
because their initialization and shutdown take much time, as you said
(e.g., interconnection overheads with DynamoDB and graceful shutdown).

Anyway, since this is a kind of question about the AWS Kinesis library,
you'd better ask the AWS guys in their forum or something.

// maropu


On Mon, Nov 14, 2016 at 11:20 PM, Shushant Arora 
wrote:

> 1.No, I want to implement low level consumer on kinesis stream.
> so need to stop the worker once it read the latest sequence number sent by
> driver.
>
> 2.What is the cost of frequent register and deregister of worker node. Is
> that when worker's shutdown is called it will terminate run method but
> leasecoordinator will wait for 2seconds before releasing the lease. So I
> cannot deregister a worker in less than 2 seconds ?
>
> Thanks!
>
>
>
> On Mon, Nov 14, 2016 at 7:36 PM, Takeshi Yamamuro 
> wrote:
>
>> Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not
>> enough for your usecase?
>>
>> On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Thanks!
>>> Is there a way to get the latest sequence number of all shards of a
>>> kinesis stream?
>>>
>>>
>>>
>>> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro >> > wrote:
>>>
 Hi,

 The time interval can be controlled by `IdleTimeBetweenReadsInMillis`
 in KinesisClientLibConfiguration though,
 it is not configurable in the current implementation.

 The detail can be found in;
 https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L152

 // maropu


 On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Hi,
>
> Is spark.streaming.blockInterval for a Kinesis input stream hardcoded to 1 sec,
> or is it configurable? That is the time interval at which the receiver fetches
> data from Kinesis.
>
> That means the streaming batch interval cannot be less than
> spark.streaming.blockInterval, and this should be configurable. Also, is there
> any minimum value for the streaming batch interval?
>
> Thanks
>
>


 --
 ---
 Takeshi Yamamuro

>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Spark SQL UDF - passing map as a UDF parameter

2016-11-14 Thread Nirav Patel
I am trying to use following API from Functions to convert a map into
column so I can pass it to UDF.

map(cols: Column*): Column

"Creates a new map column. The input columns must be grouped as key-value
pairs, e.g. (key1, value1, key2, value2, ...). The key columns must all
have the same data type, and can't be null. The value columns must all have
the same data type."


final val idxMapArr = idxMapRdd.collectAsMap
val colmap = map(idxMapArr.map(lit _): _*)

But getting following error:

:139: error: type mismatch;
 found   : Iterable[org.apache.spark.sql.Column]
 required: Seq[org.apache.spark.sql.Column]
   val colmap =  map(idxMapArr.map(lit _): _*)


If I try:
val colmap =  map(idxMapArr.map(lit _).toSeq: _*)

It says:

java.lang.RuntimeException: Unsupported literal type class scala.Tuple2
(17.0,MBO)
at
org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at $anonfun$1.apply(:153)



What is the correct usage of a `map` api to convert hashmap into column?
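
One way that works (a sketch, assuming idxMapArr is a Map[Double, String], which is what the
Tuple2 error above suggests): functions.map expects the key and value columns interleaved,
so each (k, v) pair has to be flattened into two lit(...) columns rather than passed as a Tuple2.

import org.apache.spark.sql.functions.{lit, map}

val colmap = map(
  idxMapArr.toSeq.flatMap { case (k, v) => Seq(lit(k), lit(v)) }: _*
)
// colmap is a single map-typed Column that can now be passed to a UDF,
// e.g. df.withColumn("out", myUdf(colmap, $"someIdx"))  // myUdf is a hypothetical UDF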



Re: sbt shenanigans for a Spark-based project

2016-11-14 Thread Don Drake
I would remove your entire local Maven repo (~/.m2/repository on Linux) and try
again. I'm able to compile sample code with your build.sbt and sbt
v0.13.12.

-Don

On Mon, Nov 14, 2016 at 3:11 PM, Marco Mistroni  wrote:

> uhm.sorry.. still same issues. this is hte new version
>
> name := "SparkExamples"
> version := "1.0"
> scalaVersion := "2.11.8"
> val sparkVersion = "2.0.1"
>
> // Add a single dependency
> libraryDependencies += "junit" % "junit" % "4.8" % "test"
> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
> "org.slf4j" % "slf4j-simple" % "1.7.5",
> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
> libraryDependencies += "org.apache.spark"%%"spark-core"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
> sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-mllib"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming-flume-sink" %
> "2.0.1"
> libraryDependencies += "org.apache.spark"%%"spark-sql"   % sparkVersion
>
>
> resolvers += "softprops-maven" at "http://dl.bintray.com/
> content/softprops/maven"
>
> Still seeing these kinds of errors  which seems to lead to the fact that
> somehow sbt is getting confused..
>
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:2:
> object mllib is not a member of package org.apache.spark
> [error] import org.apache.spark.mllib.linalg.{ Vector, Vectors }
> [error] ^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:3:
> object mllib is not a member of package org.apache.spark
> [error] import org.apache.spark.mllib.regression.LabeledPoint
> [error] ^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:4:
> object classification is not a member of package org.apache.spark.ml
> [error] import org.apache.spark.ml.classification.{
> RandomForestClassifier, RandomForestClassificationModel }
> [error]^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:6:
> object feature is not a member of package org.apache.spark.ml
> [error] import org.apache.spark.ml.feature.{ StringIndexer, IndexToString,
> VectorIndexer, VectorAssembler }
> [error]^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:7:
> object evaluation is not a member of package org.apache.spark.ml
> [error] import org.apache.spark.ml.evaluation.{ RegressionEvaluator,
> MulticlassClassificationEvaluator }
> [error]^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:8:
> object classification is not a member of package org.apache.spark.ml
> [error] import org.apache.spark.ml.classification._
> [error]^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:9:
> object tuning is not a member of package org.apache.spark.ml
> [error] import org.apache.spark.ml.tuning.{ CrossValidator,
> ParamGridBuilder }
> [error]^
> [error] C:\Users\marco\SparkExamples\src\main\scala\
> DecisionTreeExampleML.scala:10: object tuning is not a member of package
> org.apache.spark.ml
> [error] import org.apache.spark.ml.tuning.{ ParamGridBuilder,
> TrainValidationSplit }
> [error]^
> [error] C:\Users\marco\SparkExamples\src\main\scala\
> DecisionTreeExampleML.scala:16: object Pipeline is not a member of
> package org.apache.spark.ml
> [error] import org.apache.spark.ml.{ Pipeline, PipelineModel }
>
> any other hints?
>
> thanks and regarsd
>  marco
>
>
>
>
> On Sun, Nov 13, 2016 at 10:52 PM, Don Drake  wrote:
>
>> I would upgrade your Scala version to 2.11.8 as Spark 2.0 uses Scala 2.11
>> by default.
>>
>> On Sun, Nov 13, 2016 at 3:01 PM, Marco Mistroni 
>> wrote:
>>
>>> HI all
>>>  i have a small Spark-based project which at the moment depends on jar
>>> from Spark 1.6.0
>>> The project has few Spark examples plus one which depends on Flume
>>> libraries
>>>
>>>
>>> I am attempting to move to Spark 2.0, but i am having issues with
>>> my dependencies
>>> The stetup below works fine when compiled against 1.6.0 dependencies
>>>
>>> name := "SparkExamples"
>>> version := "1.0"
>>> scalaVersion := "2.10.5"
>>> val sparkVersion = "1.6.0"
>>>
>>>
>>> // Add a single dependency
>>> libraryDependencies += "junit" % "junit" % "4.8" % "test"
>>> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
>>> "org.slf4j" % "slf4j-simple" % "1.7.5",
>>> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
>>> libraryDependencies += "org.apache.spark"%%"spark-core"   %
>>> sparkVersion
>>> libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
>>> 

Cannot find Native Library in "cluster" deploy-mode

2016-11-14 Thread jtgenesis
hey guys,

I'm hoping someone could provide some assistance. I'm having issues
(UnsatisfiedLinkError) calling some native libraries when submitting the
application in "cluster" mode. When running the application in local mode,
the application runs fine. Here's what my setup looks like.

The .so files are stored in the same location across all cluster nodes. The
node that submits the application has its LD_LIBRARY_PATH pointing to the
native libraries. Within the application i set the environment variables
like so:

val sc = new SparkContext(new SparkConf()
  .setAppName("SparkDriver")
  .setExecutorEnv("LD_LIBRARY_PATH", sys.env("LD_LIBRARY_PATH")))

I thought the .setExecutorEnv would give each node the correct path to the
native libraries, but I guess not. 

I'm not really sure what I'm missing in my setup, any help is greatly
appreciated.

Thanks!








Re: sbt shenanigans for a Spark-based project

2016-11-14 Thread Marco Mistroni
Uhm, sorry... still the same issues. This is the new version:

name := "SparkExamples"
version := "1.0"
scalaVersion := "2.11.8"
val sparkVersion = "2.0.1"

// Add a single dependency
libraryDependencies += "junit" % "junit" % "4.8" % "test"
libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
"org.slf4j" % "slf4j-simple" % "1.7.5",
"org.clapper" %% "grizzled-slf4j" % "1.0.2")
libraryDependencies += "org.apache.spark"%%"spark-core"   % sparkVersion
libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
sparkVersion
libraryDependencies += "org.apache.spark"%%"spark-mllib"   % sparkVersion
libraryDependencies += "org.apache.spark"%%"spark-streaming-flume-sink" %
"2.0.1"
libraryDependencies += "org.apache.spark"%%"spark-sql"   % sparkVersion


resolvers += "softprops-maven" at "
http://dl.bintray.com/content/softprops/maven;

Still seeing these kinds of errors, which seem to indicate that somehow sbt
is getting confused...

C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:2:
object mllib is not a member of package org.apache.spark
[error] import org.apache.spark.mllib.linalg.{ Vector, Vectors }
[error] ^
[error]
C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:3:
object mllib is not a member of package org.apache.spark
[error] import org.apache.spark.mllib.regression.LabeledPoint
[error] ^
[error]
C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:4:
object classification is not a member of package org.apache.spark.ml
[error] import org.apache.spark.ml.classification.{ RandomForestClassifier,
RandomForestClassificationModel }
[error]^
[error]
C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:6:
object feature is not a member of package org.apache.spark.ml
[error] import org.apache.spark.ml.feature.{ StringIndexer, IndexToString,
VectorIndexer, VectorAssembler }
[error]^
[error]
C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:7:
object evaluation is not a member of package org.apache.spark.ml
[error] import org.apache.spark.ml.evaluation.{ RegressionEvaluator,
MulticlassClassificationEvaluator }
[error]^
[error]
C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:8:
object classification is not a member of package org.apache.spark.ml
[error] import org.apache.spark.ml.classification._
[error]^
[error]
C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:9:
object tuning is not a member of package org.apache.spark.ml
[error] import org.apache.spark.ml.tuning.{ CrossValidator,
ParamGridBuilder }
[error]^
[error]
C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:10:
object tuning is not a member of package org.apache.spark.ml
[error] import org.apache.spark.ml.tuning.{ ParamGridBuilder,
TrainValidationSplit }
[error]^
[error]
C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:16:
object Pipeline is not a member of package org.apache.spark.ml
[error] import org.apache.spark.ml.{ Pipeline, PipelineModel }

any other hints?

thanks and regards
 marco




On Sun, Nov 13, 2016 at 10:52 PM, Don Drake  wrote:

> I would upgrade your Scala version to 2.11.8 as Spark 2.0 uses Scala 2.11
> by default.
>
> On Sun, Nov 13, 2016 at 3:01 PM, Marco Mistroni 
> wrote:
>
>> HI all
>>  i have a small Spark-based project which at the moment depends on jar
>> from Spark 1.6.0
>> The project has few Spark examples plus one which depends on Flume
>> libraries
>>
>>
>> I am attempting to move to Spark 2.0, but i am having issues with
>> my dependencies
>> The stetup below works fine when compiled against 1.6.0 dependencies
>>
>> name := "SparkExamples"
>> version := "1.0"
>> scalaVersion := "2.10.5"
>> val sparkVersion = "1.6.0"
>>
>>
>> // Add a single dependency
>> libraryDependencies += "junit" % "junit" % "4.8" % "test"
>> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
>> "org.slf4j" % "slf4j-simple" % "1.7.5",
>> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
>> libraryDependencies += "org.apache.spark"%%"spark-core"   % sparkVersion
>> libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
>> sparkVersion
>> libraryDependencies += "org.apache.spark"%%"spark-mllib"   %
>> sparkVersion
>> libraryDependencies += "org.apache.spark"%%"spark-streaming-flume" %
>> "1.3.0"
>> libraryDependencies += "org.apache.spark"%%"spark-sql"   % sparkVersion
>>
>>
>> resolvers += "softprops-maven" at "http://dl.bintray.com/content
>> /softprops/maven"
>>
>>
>>
>> This is the build.sbt version for using Spark 2 dependencies
>>
>> name := 

Re: scala.MatchError while doing BinaryClassificationMetrics

2016-11-14 Thread Nick Pentreath
Typically you pass in the result of a model transform to the evaluator.

So:
val model = estimator.fit(data)
val auc = evaluator.evaluate(model.transform(testData))

Check Scala API docs for some details:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
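
Given the columns shown earlier in this thread (testResults has "Label", "rawPrediction"
and "ModelProbability"), the evaluator can be pointed at them directly; a sketch:

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("Label")                   // default is "label"
  .setRawPredictionCol("rawPrediction")   // a 2-element probability vector column also works
  .setMetricName("areaUnderROC")

val auROC = evaluator.evaluate(testResults)   // pass the transformed DataFrame, not an RDD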

On Mon, 14 Nov 2016 at 20:02 Bhaarat Sharma  wrote:

Can you please suggest how I can use BinaryClassificationEvaluator? I tried:

scala> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

scala>  val evaluator = new BinaryClassificationEvaluator()
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator =
binEval_0d57372b7579

Try 1:

scala> evaluator.evaluate(testScoreAndLabel.rdd)
:105: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(Double, Double)]
 required: org.apache.spark.sql.Dataset[_]
   evaluator.evaluate(testScoreAndLabel.rdd)

Try 2:

scala> evaluator.evaluate(testScoreAndLabel)
java.lang.IllegalArgumentException: Field "rawPrediction" does not exist.
  at
org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:228)

Try 3:

scala>
evaluator.evaluate(testScoreAndLabel.select("Label","ModelProbability"))
org.apache.spark.sql.AnalysisException: cannot resolve '`Label`' given
input columns: [_1, _2];
  at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)


On Mon, Nov 14, 2016 at 1:44 PM, Nick Pentreath 
wrote:

DataFrame.rdd returns an RDD[Row]. You'll need to use map to extract the
doubles from the test score and label DF.

But you may prefer to just use spark.ml evaluators, which work with
DataFrames. Try BinaryClassificationEvaluator.

On Mon, 14 Nov 2016 at 19:30, Bhaarat Sharma  wrote:

I am getting scala.MatchError in the code below. I'm not able to see why
this would be happening. I am using Spark 2.0.1

scala> testResults.columns
res538: Array[String] = Array(TopicVector, subject_id, hadm_id,
isElective, isNewborn, isUrgent, isEmergency, isMale, isFemale,
oasis_score, sapsii_score, sofa_score, age, hosp_death, test,
ModelFeatures, Label, rawPrediction, ModelProbability,
ModelPrediction)

scala> testResults.select("Label","ModelProbability").take(1)
res542: Array[org.apache.spark.sql.Row] =
Array([0.0,[0.737304818744076,0.262695181255924]])

scala> val testScoreAndLabel = testResults.
 | select("Label","ModelProbability").
 | map { case Row(l:Double, p:Vector) => (p(1), l) }
testScoreAndLabel: org.apache.spark.sql.Dataset[(Double, Double)] =
[_1: double, _2: double]

scala> testScoreAndLabel
res539: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double,
_2: double]

scala> testScoreAndLabel.columns
res540: Array[String] = Array(_1, _2)

scala> val testMetrics = new BinaryClassificationMetrics(testScoreAndLabel.rdd)
testMetrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
= org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@36e780d1

The code below gives the error

val auROC = testMetrics.areaUnderROC() //this line gives the error

Caused by: scala.MatchError:
[0.0,[0.7316583497453766,0.2683416502546234]] (of class
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)


Re: Grouping Set

2016-11-14 Thread ayan guha
And, run the same SQL in hive and post any difference.
On 15 Nov 2016 07:48, "ayan guha"  wrote:

> It should be A,yes. Can you please reproduce this with small data and
> exact SQL?
> On 15 Nov 2016 02:21, "Andrés Ivaldi"  wrote:
>
>> Hello, I'm tryin to use Grouping Set, but I dont know if it is a bug or
>> the correct behavior.
>>
>> Givven the above example
>> Select a,b,sum(c) from table group by a,b grouping set ( (a), (a,b) )
>>
>> What shound be the expected result
>> A:
>>
>> A  | B| sum(c)
>> xx | null | 
>> xx | yy   | 
>> xx | zz   | 
>>
>>
>> B
>> A   | B| sum(c)
>> xx  | null | 
>> xx  | yy   | 
>> xx  | zz   | 
>> null| yy   | 
>> null| zz   | 
>> null| null | 
>>
>>
>> I believe is A, but i'm getting B
>> thanks
>>
>> --
>> Ing. Ivaldi Andres
>>
>


Re: Grouping Set

2016-11-14 Thread ayan guha
It should be A,yes. Can you please reproduce this with small data and exact
SQL?
On 15 Nov 2016 02:21, "Andrés Ivaldi"  wrote:

> Hello, I'm tryin to use Grouping Set, but I dont know if it is a bug or
> the correct behavior.
>
> Givven the above example
> Select a,b,sum(c) from table group by a,b grouping set ( (a), (a,b) )
>
> What shound be the expected result
> A:
>
> A  | B| sum(c)
> xx | null | 
> xx | yy   | 
> xx | zz   | 
>
>
> B
> A   | B| sum(c)
> xx  | null | 
> xx  | yy   | 
> xx  | zz   | 
> null| yy   | 
> null| zz   | 
> null| null | 
>
>
> I believe is A, but i'm getting B
> thanks
>
> --
> Ing. Ivaldi Andres
>


Re: Newbie question - Best way to bootstrap with Spark

2016-11-14 Thread Jon Gregg
Piggybacking off this - how are you guys teaching DataFrames and Datasets
to new users?  I haven't taken the edx courses but I don't see Spark SQL
covered heavily in the syllabus.  I've dug through the Databricks
documentation but it's a lot of information for a new user I think - hoping
there is a video or course option instead.

On Mon, Nov 14, 2016 at 11:13 AM, Rishikesh Teke 
wrote:

> Integrate spark with apache zeppelin  https://zeppelin.apache.org/
>    its again a very handy way to bootstrap
> with spark.
>
>
>


Re: Spark Streaming: question on sticky session across batches ?

2016-11-14 Thread Manish Malhotra
sending again.
any help is appreciated !

thanks in advance.

On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra <
manish.malhotra.w...@gmail.com> wrote:

> Hello Spark Devs/Users,
>
> Im trying to solve the use case with Spark Streaming 1.6.2 where for every
> batch ( say 2 mins) data needs to go to the same reducer node after
> grouping by key.
> The underlying storage is Cassandra and not HDFS.
>
> This is a map-reduce job, where also trying to use the partitions of the
> Cassandra table to batch the data for the same partition.
>
> The requirement of sticky session/partition across batches is because the
> operations which we need to do, needs to read data for every key and then
> merge this with the current batch aggregate values. So, currently when
> there is no stickyness across batches, we have to read for every key, merge
> and then write back. and reads are very expensive. So, if we have sticky
> session, we can avoid read in every batch and have a cache of till last
> batch aggregates across batches.
>
> So, there are few options, can think of:
>
> 1. to change the TaskSchedulerImpl, as its using Random to identify the
> node for mapper/reducer before starting the batch/phase.
> Not sure if there is a custom scheduler way of achieving it?
>
> 2. Can a custom RDD help map each key to a node?
> There is a getPreferredLocations() method.
> But I am not sure whether this will be persistent or can vary for some edge
> cases?
>
> Thanks in advance for you help and time !
>
> Regards,
> Manish
>
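
Regarding option 2 above: a minimal sketch of overriding getPreferredLocations on a custom
RDD, assuming a caller-supplied partition-to-host mapping; note that the scheduler treats
these as locality hints, not guarantees:

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// a hypothetical wrapper that pins each partition to hosts chosen by the caller
class PinnedLocationRDD[T: scala.reflect.ClassTag](
    prev: RDD[T],
    partitionToHosts: Int => Seq[String]) extends RDD[T](prev) {

  override def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    prev.iterator(split, context)

  // the scheduler consults this when placing the task for each partition
  override def getPreferredLocations(split: Partition): Seq[String] =
    partitionToHosts(split.index)
}

// usage: new PinnedLocationRDD(pairRdd, i => Seq(hostForPartition(i)))
// where hostForPartition is a hypothetical lookup you maintain yourself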


Pasting oddity with Spark 2.0 (scala)

2016-11-14 Thread jggg777
This one has stumped the group here, hoping to get some insight into why this
error is happening.

I'm going through the Databricks DataFrames Scala docs. Halfway down is the
"Flattening" code, which I've copied below:

*The original code*

>>>
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

implicit class DataFrameFlattener(df: DataFrame) {
  def flattenSchema: DataFrame = {
df.select(flatten(Nil, df.schema): _*)
  }
  
  protected def flatten(path: Seq[String], schema: DataType): Seq[Column] =
schema match {
case s: StructType => s.fields.flatMap(f => flatten(path :+ f.name,
f.dataType))
case other => col(path.map(n =>
s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
  }
}
>>>

*Pasting into spark-shell with right-click (or pasting into Zeppelin)*


On EMR using Spark 2.0.0 (this also happens on Spark 2.0.1), running
"spark-shell", I right click to paste in the code above.  Here are the
errors I get.  Note that I get the same errors when I paste into Zeppelin on
EMR.

>>>
scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala>

scala> implicit class DataFrameFlattener(df: DataFrame) {
 |   def flattenSchema: DataFrame = {
 | df.select(flatten(Nil, df.schema): _*)
 |   }
 |
 |   protected def flatten(path: Seq[String], schema: DataType):
Seq[Column] = schema match {
 | case s: StructType => s.fields.flatMap(f => flatten(path :+
f.name, f.dataType))
 | case other => col(path.map(n =>
s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
 |   }
 | }
:11: error: not found: type DataFrame
   implicit class DataFrameFlattener(df: DataFrame) {
 ^
:12: error: not found: type DataFrame
 def flattenSchema: DataFrame = {
^
:16: error: not found: type Column
 protected def flatten(path: Seq[String], schema: DataType):
Seq[Column] = schema match {
 ^
:16: error: not found: type DataType
 protected def flatten(path: Seq[String], schema: DataType):
Seq[Column] = schema match {
  ^
:17: error: not found: type StructType
   case s: StructType => s.fields.flatMap(f => flatten(path :+
f.name, f.dataType))
   ^
:18: error: not found: value col
   case other => col(path.map(n =>
s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
>>>

*Pasting using :paste in spark-shell*


However when I paste the same code into spark-shell using :paste, the code
succeeds.

>>>
scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

implicit class DataFrameFlattener(df: DataFrame) {
  def flattenSchema: DataFrame = {
df.select(flatten(Nil, df.schema): _*)
  }

  protected def flatten(path: Seq[String], schema: DataType): Seq[Column] =
schema match {
case s: StructType => s.fields.flatMap(f => flatten(path :+ f.name,
f.dataType))
case other => col(path.map(n =>
s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
  }
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
defined class DataFrameFlattener
>>>


Any idea what's going on here, and how to get this code working in Zeppelin? 
One thing we've found is that providing the full paths for DataFrame,
StructType, etc (for example org.apache.spark.sql.DataFrame) does work, but
it's a painful workaround and we don't know why the imports don't seem to be
working as usual.






Re: scala.MatchError while doing BinaryClassificationMetrics

2016-11-14 Thread Bhaarat Sharma
Can you please suggest how I can use BinaryClassificationEvaluator? I tried:

scala> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

scala>  val evaluator = new BinaryClassificationEvaluator()
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator =
binEval_0d57372b7579

Try 1:

scala> evaluator.evaluate(testScoreAndLabel.rdd)
:105: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(Double, Double)]
 required: org.apache.spark.sql.Dataset[_]
   evaluator.evaluate(testScoreAndLabel.rdd)

Try 2:

scala> evaluator.evaluate(testScoreAndLabel)
java.lang.IllegalArgumentException: Field "rawPrediction" does not exist.
  at
org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:228)

Try 3:

scala>
evaluator.evaluate(testScoreAndLabel.select("Label","ModelProbability"))
org.apache.spark.sql.AnalysisException: cannot resolve '`Label`' given
input columns: [_1, _2];
  at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)


On Mon, Nov 14, 2016 at 1:44 PM, Nick Pentreath 
wrote:

> DataFrame.rdd returns an RDD[Row]. You'll need to use map to extract the
> doubles from the test score and label DF.
>
> But you may prefer to just use spark.ml evaluators, which work with
> DataFrames. Try BinaryClassificationEvaluator.
>
> On Mon, 14 Nov 2016 at 19:30, Bhaarat Sharma  wrote:
>
>> I am getting scala.MatchError in the code below. I'm not able to see why
>> this would be happening. I am using Spark 2.0.1
>>
>> scala> testResults.columns
>> res538: Array[String] = Array(TopicVector, subject_id, hadm_id, isElective, 
>> isNewborn, isUrgent, isEmergency, isMale, isFemale, oasis_score, 
>> sapsii_score, sofa_score, age, hosp_death, test, ModelFeatures, Label, 
>> rawPrediction, ModelProbability, ModelPrediction)
>>
>> scala> testResults.select("Label","ModelProbability").take(1)
>> res542: Array[org.apache.spark.sql.Row] = 
>> Array([0.0,[0.737304818744076,0.262695181255924]])
>>
>> scala> val testScoreAndLabel = testResults.
>>  | select("Label","ModelProbability").
>>  | map { case Row(l:Double, p:Vector) => (p(1), l) }
>> testScoreAndLabel: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: 
>> double, _2: double]
>>
>> scala> testScoreAndLabel
>> res539: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double, _2: 
>> double]
>>
>> scala> testScoreAndLabel.columns
>> res540: Array[String] = Array(_1, _2)
>>
>> scala> val testMetrics = new 
>> BinaryClassificationMetrics(testScoreAndLabel.rdd)
>> testMetrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = 
>> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@36e780d1
>>
>> The code below gives the error
>>
>> val auROC = testMetrics.areaUnderROC() //this line gives the error
>>
>> Caused by: scala.MatchError: [0.0,[0.7316583497453766,0.2683416502546234]] 
>> (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
>>
>>


Re: scala.MatchError while doing BinaryClassificationMetrics

2016-11-14 Thread Nick Pentreath
DataFrame.rdd returns an RDD[Row]. You'll need to use map to extract the
doubles from the test score and label DF.

But you may prefer to just use spark.ml evaluators, which work with
DataFrames. Try BinaryClassificationEvaluator.

On Mon, 14 Nov 2016 at 19:30, Bhaarat Sharma  wrote:

> I am getting scala.MatchError in the code below. I'm not able to see why
> this would be happening. I am using Spark 2.0.1
>
> scala> testResults.columns
> res538: Array[String] = Array(TopicVector, subject_id, hadm_id, isElective, 
> isNewborn, isUrgent, isEmergency, isMale, isFemale, oasis_score, 
> sapsii_score, sofa_score, age, hosp_death, test, ModelFeatures, Label, 
> rawPrediction, ModelProbability, ModelPrediction)
>
> scala> testResults.select("Label","ModelProbability").take(1)
> res542: Array[org.apache.spark.sql.Row] = 
> Array([0.0,[0.737304818744076,0.262695181255924]])
>
> scala> val testScoreAndLabel = testResults.
>  | select("Label","ModelProbability").
>  | map { case Row(l:Double, p:Vector) => (p(1), l) }
> testScoreAndLabel: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: 
> double, _2: double]
>
> scala> testScoreAndLabel
> res539: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double, _2: 
> double]
>
> scala> testScoreAndLabel.columns
> res540: Array[String] = Array(_1, _2)
>
> scala> val testMetrics = new 
> BinaryClassificationMetrics(testScoreAndLabel.rdd)
> testMetrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = 
> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@36e780d1
>
> The code below gives the error
>
> val auROC = testMetrics.areaUnderROC() //this line gives the error
>
> Caused by: scala.MatchError: [0.0,[0.7316583497453766,0.2683416502546234]] 
> (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
>
>


Re: Two questions about running spark on mesos

2016-11-14 Thread Michael Gummelt
1. I had never even heard of conf/slaves until this email, and I only see
it referenced in the docs next to Spark Standalone, so I doubt that works.

2. Yes.  See the --kill option in spark-submit.

Also, we're considering dropping the Spark dispatcher in DC/OS in favor of
Metronome, which will be our consolidated method of running any one-off
jobs.  The dispatcher is really just a lesser maintained and more
feature-sparse metronome.  If I were you, I would look into running
Metronome rather than the dispatcher (or just run DC/OS).
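
For reference, a sketch of killing a driver that was submitted through the Mesos cluster
dispatcher; the dispatcher URL and the submission id below are placeholders:

  # the submission id is printed by spark-submit when the driver is launched in cluster mode
  spark-submit --master mesos://dispatcher-host:7077 --kill driver-20161114162600-0001

  # there is also a matching --status flag to poll it
  spark-submit --master mesos://dispatcher-host:7077 --status driver-20161114162600-0001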

On Mon, Nov 14, 2016 at 3:10 AM, Yu Wei  wrote:

> Hi Guys,
>
>
> Two questions about running spark on mesos.
>
> 1, Does spark configuration of conf/slaves still work when running spark
> on mesos?
>
> According to my observations, it seemed that conf/slaves still took
> effect when running spark-shell.
>
> However, it doesn't take effect when deploying in cluster mode.
>
> Is this expected behavior?
>
>Or did I miss anything?
>
>
> 2, Could I kill submitted jobs when running spark on mesos in cluster mode?
>
> I launched spark on mesos in cluster mode. Then submitted a long
> running job succeeded.
>
> Then I want to kill the job.
> How could I do that? Is there any similar commands as launching spark
> on yarn?
>
>
> Thanks,
>
> Jared, (韦煜)
> Software developer
> Interested in open source software, big data, Linux
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


scala.MatchError while doing BinaryClassificationMetrics

2016-11-14 Thread Bhaarat Sharma
I am getting scala.MatchError in the code below. I'm not able to see why
this would be happening. I am using Spark 2.0.1

scala> testResults.columns
res538: Array[String] = Array(TopicVector, subject_id, hadm_id,
isElective, isNewborn, isUrgent, isEmergency, isMale, isFemale,
oasis_score, sapsii_score, sofa_score, age, hosp_death, test,
ModelFeatures, Label, rawPrediction, ModelProbability,
ModelPrediction)

scala> testResults.select("Label","ModelProbability").take(1)
res542: Array[org.apache.spark.sql.Row] =
Array([0.0,[0.737304818744076,0.262695181255924]])

scala> val testScoreAndLabel = testResults.
 | select("Label","ModelProbability").
 | map { case Row(l:Double, p:Vector) => (p(1), l) }
testScoreAndLabel: org.apache.spark.sql.Dataset[(Double, Double)] =
[_1: double, _2: double]

scala> testScoreAndLabel
res539: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double,
_2: double]

scala> testScoreAndLabel.columns
res540: Array[String] = Array(_1, _2)

scala> val testMetrics = new BinaryClassificationMetrics(testScoreAndLabel.rdd)
testMetrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
= org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@36e780d1

The code below gives the error

val auROC = testMetrics.areaUnderROC() //this line gives the error

Caused by: scala.MatchError:
[0.0,[0.7316583497453766,0.2683416502546234]] (of class
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)


Spark streaming data loss due to timeout in writing BlockAdditionEvent to WAL by the driver

2016-11-14 Thread Arijit
Hi,


We are seeing another case of data loss/drop when the following exception
happens. This particular exception, treated as a WARN, resulted in 2095 events
being dropped from processing.


16/10/26 19:24:08 WARN ReceivedBlockTracker: Exception thrown while writing 
record: 
BlockAdditionEvent(ReceivedBlockInfo(12,Some(2095),None,WriteAheadLogBasedStoreResult(input-12-1477508431881,Some(2095),FileBasedWriteAheadLogSegment(hdfs://mycluster/commerce/streamingContextCheckpointDir/receivedData/12/log-1477509840005-147750995,0,2097551
 to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 
milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87)
at 
org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321)
at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

We tried increasing the timeout to 60 seconds but could not eliminate the
issue completely. Requesting suggestions on what the recourse would be to stop
this data bleeding.
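
For reference, the 5000 ms in the trace is the driver-side WAL batching timeout; a sketch of
the knobs involved (config names as of Spark 1.6/2.0, worth verifying against your version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // how long BatchedWriteAheadLog waits for a queued record to be written (the 5000 ms above)
  .set("spark.streaming.driver.writeAheadLog.batchingTimeout", "60000")
  // batching of driver WAL writes can also be turned off entirely
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "false")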

Thanks, Arijit



Re: Convert SparseVector column to Densevector column

2016-11-14 Thread janardhan shetty
This worked thanks Maropu.

On Sun, Nov 13, 2016 at 9:34 PM, Takeshi Yamamuro 
wrote:

> Hi,
>
> How about this?
>
> import org.apache.spark.ml.linalg._
> val toSV = udf((v: Vector) => v.toDense)
> val df = Seq((0.1, Vectors.sparse(16, Array(0, 3), Array(0.1, 0.3))),
> (0.2, Vectors.sparse(16, Array(0, 3), Array(0.1, 0.3)))).toDF("a", "b")
> df.select(toSV($"b"))
>
> // maropu
>
>
> On Mon, Nov 14, 2016 at 1:20 PM, janardhan shetty 
> wrote:
>
>> Hi,
>>
>> Is there any easy way of converting a dataframe column from SparseVector
>> to DenseVector  using
>>
>> import org.apache.spark.ml.linalg.DenseVector API ?
>>
>> Spark ML 2.0
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: took more time to get data from spark dataset to driver program

2016-11-14 Thread Rishikesh Teke
Again, if you run the Spark cluster in standalone mode with an optimum number of
executors and a balanced cores and memory configuration, it will run faster,
as more operations take place in parallel.






Re: Newbie question - Best way to bootstrap with Spark

2016-11-14 Thread Rishikesh Teke
Integrate Spark with Apache Zeppelin (https://zeppelin.apache.org/); it's again
a very handy way to bootstrap with Spark.






Re: Nearest neighbour search

2016-11-14 Thread Nick Pentreath
LSH-based NN search and similarity join should be out in Spark 2.1 -
there's a little work being done still to clear up the APIs and some
functionality.

Check out https://issues.apache.org/jira/browse/SPARK-5992
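
For what it's worth, a rough sketch against the LSH API that was being added for 2.1 (the
names were still settling at the time, so treat this as an assumption); it reuses the idfs
DataFrame and "tf-idf" column from the code further down the thread:

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vector

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)          // tuning knob: larger buckets mean more candidates per bucket
  .setNumHashTables(3)
  .setInputCol("tf-idf")
  .setOutputCol("hashes")

val model = brp.fit(idfs)

// approximate top-5 nearest neighbours of one query vector (here, just the first row)
val key = idfs.select("tf-idf").head.getAs[Vector](0)
model.approxNearestNeighbors(idfs, key, 5).show()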

On Mon, 14 Nov 2016 at 16:12, Kevin Mellott 
wrote:

> You may be able to benefit from Soundcloud's open source implementation,
> either as a solution or as a reference implementation.
>
> https://github.com/soundcloud/cosine-lsh-join-spark
>
> Thanks,
> Kevin
>
> On Sun, Nov 13, 2016 at 2:07 PM, Meeraj Kunnumpurath <
> mee...@servicesymphony.com> wrote:
>
> That was a bit of a brute force search, so I changed the code to use a UDF
> to create the dot product between the two IDF vectors, and do a sort on the
> new column.
>
> package com.ss.ml.clustering
>
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import org.apache.spark.sql.functions._
> import org.apache.spark.ml.feature.{IDF, Tokenizer, HashingTF}
> import org.apache.spark.ml.linalg.Vector
>
> object ClusteringBasics extends App {
>
>   val spark = SparkSession.builder().appName("Clustering 
> Basics").master("local").getOrCreate()
>   import spark.implicits._
>
>   val df = spark.read.option("header", "false").csv("data")
>
>   val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
>   val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
>   val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")
>
>   val df1 = tf.transform(tk.transform(df))
>   val idfs = idf.fit(df1).transform(df1)
>
>   val nn = nearestNeighbour("", 
> idfs)
>   println(nn)
>
>   def nearestNeighbour(uri: String, ds: DataFrame) : String = {
> val tfIdfSrc = ds.filter(s"_c0 == 
> '$uri'").take(1)(0).getAs[Vector]("tf-idf")
> def dorProduct(vectorA: Vector) = {
>   var dp = 0.0
>   var index = vectorA.size - 1
>   for (i <- 0 to index) {
> dp += vectorA(i) * tfIdfSrc(i)
>   }
>   dp
> }
> val dpUdf = udf((v1: Vector, v2: Vector) => dorProduct(v1))
> ds.filter(s"_c0 != '$uri'").withColumn("dp", 
> dpUdf('tf-idf)).sort("dp").take(1)(0).getString(1)
>   }
>
> }
>
>
> However, that is generating the exception below,
>
> Exception in thread "main" java.lang.RuntimeException: Unsupported literal
> type class org.apache.spark.ml.feature.IDF idf_e49381a285dd
> at
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
> at org.apache.spark.sql.functions$.lit(functions.scala:101)
> at org.apache.spark.sql.Column.$minus(Column.scala:672)
> at
> com.ss.ml.clustering.ClusteringBasics$.nearestNeighbour(ClusteringBasics.scala:36)
> at
> com.ss.ml.clustering.ClusteringBasics$.delayedEndpoint$com$ss$ml$clustering$ClusteringBasics$1(ClusteringBasics.scala:22)
> at
> com.ss.ml.clustering.ClusteringBasics$delayedInit$body.apply(ClusteringBasics.scala:8)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at com.ss.ml.clustering.ClusteringBasics$.main(ClusteringBasics.scala:8)
> at com.ss.ml.clustering.ClusteringBasics.main(ClusteringBasics.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> On Sun, Nov 13, 2016 at 10:56 PM, Meeraj Kunnumpurath <
> mee...@servicesymphony.com> wrote:
>
> This is what I have done, is there a better way of doing this?
>
>   val df = spark.read.option("header", "false").csv("data")
>
>
>   val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
>
>   val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
>
>   val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")
>
>
>   val df1 = tf.transform(tk.transform(df))
>
>   val idfs = idf.fit(df1).transform(df1)
>
>
>   println(nearestNeighbour("http://dbpedia.org/resource/Barack_Obama",
> idfs))
>
>
>   def nearestNeighbour(uri: String, ds: DataFrame) : String = {
>
> var res : Row = null
>
> var metric : Double = 0
>
> val tfIdfSrc = ds.filter(s"_c0 ==
> '$uri'").take(1)(0).getAs[Vector]("tf-idf")
>
> ds.filter("_c0 != '" + uri + "'").foreach { r =>
>
>   val tfIdfDst = r.getAs[Vector]("tf-idf")
>
>   val dp = dorProduct(tfIdfSrc, tfIdfDst)
>
>   if (dp > metric) {
>
> res = r
>
> metric = dp
>
>   }
>
> }
>
> return res.getAs[String]("_c1")
>
>   }
>

Grouping Set

2016-11-14 Thread Andrés Ivaldi
Hello, I'm trying to use Grouping Set, but I don't know if it is a bug or the
correct behavior.

Given the example below
Select a,b,sum(c) from table group by a,b grouping set ( (a), (a,b) )

What should be the expected result?
A:

A  | B| sum(c)
xx | null | 
xx | yy   | 
xx | zz   | 


B
A   | B| sum(c)
xx  | null | 
xx  | yy   | 
xx  | zz   | 
null| yy   | 
null| zz   | 
null| null | 


I believe it is A, but I'm getting B.
thanks

-- 
Ing. Ivaldi Andres
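
A small self-contained reproduction of the query above, per the request in the replies
(spark-shell style; the values are made up). Under standard GROUPING SETS semantics the
output should match result A:

import spark.implicits._

Seq(("xx", "yy", 1), ("xx", "zz", 2)).toDF("a", "b", "c").createOrReplaceTempView("t")

spark.sql("""
  SELECT a, b, SUM(c)
  FROM t
  GROUP BY a, b GROUPING SETS ((a), (a, b))
""").show()
// expected (result A): (xx, null, 3), (xx, yy, 1), (xx, zz, 2)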


Re: Nearest neighbour search

2016-11-14 Thread Kevin Mellott
You may be able to benefit from Soundcloud's open source implementation,
either as a solution or as a reference implementation.

https://github.com/soundcloud/cosine-lsh-join-spark

Thanks,
Kevin

On Sun, Nov 13, 2016 at 2:07 PM, Meeraj Kunnumpurath <
mee...@servicesymphony.com> wrote:

> That was a bit of a brute force search, so I changed the code to use a UDF
> to create the dot product between the two IDF vectors, and do a sort on the
> new column.
>
> package com.ss.ml.clustering
>
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import org.apache.spark.sql.functions._
> import org.apache.spark.ml.feature.{IDF, Tokenizer, HashingTF}
> import org.apache.spark.ml.linalg.Vector
>
> object ClusteringBasics extends App {
>
>   val spark = SparkSession.builder().appName("Clustering 
> Basics").master("local").getOrCreate()
>   import spark.implicits._
>
>   val df = spark.read.option("header", "false").csv("data")
>
>   val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
>   val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
>   val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")
>
>   val df1 = tf.transform(tk.transform(df))
>   val idfs = idf.fit(df1).transform(df1)
>
>   val nn = nearestNeighbour("", 
> idfs)
>   println(nn)
>
>   def nearestNeighbour(uri: String, ds: DataFrame) : String = {
> val tfIdfSrc = ds.filter(s"_c0 == 
> '$uri'").take(1)(0).getAs[Vector]("tf-idf")
> def dorProduct(vectorA: Vector) = {
>   var dp = 0.0
>   var index = vectorA.size - 1
>   for (i <- 0 to index) {
> dp += vectorA(i) * tfIdfSrc(i)
>   }
>   dp
> }
> val dpUdf = udf((v1: Vector, v2: Vector) => dorProduct(v1))
> ds.filter(s"_c0 != '$uri'").withColumn("dp", 
> dpUdf('tf-idf)).sort("dp").take(1)(0).getString(1)
>   }
>
> }
>
>
> However, that is generating the exception below,
>
> Exception in thread "main" java.lang.RuntimeException: Unsupported literal
> type class org.apache.spark.ml.feature.IDF idf_e49381a285dd
> at org.apache.spark.sql.catalyst.expressions.Literal$.apply(
> literals.scala:57)
> at org.apache.spark.sql.functions$.lit(functions.scala:101)
> at org.apache.spark.sql.Column.$minus(Column.scala:672)
> at com.ss.ml.clustering.ClusteringBasics$.nearestNeighbour(
> ClusteringBasics.scala:36)
> at com.ss.ml.clustering.ClusteringBasics$.delayedEndpoint$com$ss$ml$
> clustering$ClusteringBasics$1(ClusteringBasics.scala:22)
> at com.ss.ml.clustering.ClusteringBasics$delayedInit$
> body.apply(ClusteringBasics.scala:8)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> at scala.runtime.AbstractFunction0.apply$mcV$
> sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.generic.TraversableForwarder$class.
> foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at com.ss.ml.clustering.ClusteringBasics$.main(ClusteringBasics.scala:8)
> at com.ss.ml.clustering.ClusteringBasics.main(ClusteringBasics.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> On Sun, Nov 13, 2016 at 10:56 PM, Meeraj Kunnumpurath <
> mee...@servicesymphony.com> wrote:
>
>> This is what I have done, is there a better way of doing this?
>>
>>   val df = spark.read.option("header", "false").csv("data")
>>
>>
>>   val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
>>
>>   val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
>>
>>   val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")
>>
>>
>>   val df1 = tf.transform(tk.transform(df))
>>
>>   val idfs = idf.fit(df1).transform(df1)
>>
>>
>>   println(nearestNeighbour("http://dbpedia.org/resource/Barack_Obama",
>> idfs))
>>
>>
>>   def nearestNeighbour(uri: String, ds: DataFrame) : String = {
>>
>> var res : Row = null
>>
>> var metric : Double = 0
>>
>> val tfIdfSrc = ds.filter(s"_c0 == '$uri'").take(1)(0).getAs[Vect
>> or]("tf-idf")
>>
>> ds.filter("_c0 != '" + uri + "'").foreach { r =>
>>
>>   val tfIdfDst = r.getAs[Vector]("tf-idf")
>>
>>   val dp = dorProduct(tfIdfSrc, tfIdfDst)
>>
>>   if (dp > metric) {
>>
>> res = r
>>
>> metric = dp
>>
>>   }
>>
>> }
>>
>> return res.getAs[String]("_c1")
>>
>>   }
>>
>>
>>   def cosineSimilarity(vectorA: Vector, vectorB: Vector) = {
>>
>> var dotProduct = 0.0
>>
>> var normA = 0.0
>>
>> var normB = 0.0
>>
>> var index = vectorA.size - 1
>>
>> for (i <- 0 to index) {
>>
>>   dotProduct += 
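
One likely cause of the exception above, offered as a sketch rather than a confirmed diagnosis: 'tf-idf is parsed by Scala as the symbol 'tf minus the identifier idf (the IDF instance in scope), which matches the Column.$minus call and the "Unsupported literal type ... IDF" message in the stack trace. Referring to the column by name with col("tf-idf") avoids the symbol syntax; note also that an ascending sort followed by take(1) returns the least similar row, so the sort below is descending. Column names follow the thread; the rest is illustrative.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, desc, udf}

def nearestNeighbour(uri: String, ds: DataFrame): String = {
  val tfIdfSrc = ds.filter(s"_c0 == '$uri'").head.getAs[Vector]("tf-idf")

  // One-argument UDF that captures the source vector, so only one column is passed in.
  val dp = udf { (v: Vector) => (0 until v.size).map(i => v(i) * tfIdfSrc(i)).sum }

  ds.filter(s"_c0 != '$uri'")
    .withColumn("dp", dp(col("tf-idf")))   // col("tf-idf") instead of 'tf-idf
    .sort(desc("dp"))                      // most similar first
    .head
    .getString(1)
}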

Re: spark streaming with kinesis

2016-11-14 Thread Shushant Arora
1. No, I want to implement a low-level consumer on the Kinesis stream,
so I need to stop the worker once it has read the latest sequence number
sent by the driver.

2. What is the cost of frequently registering and deregistering a worker node?
Is it that when the worker's shutdown is called it will terminate the run
method, but the lease coordinator will wait 2 seconds before releasing the
lease, so I cannot deregister a worker in less than 2 seconds?

Thanks!



On Mon, Nov 14, 2016 at 7:36 PM, Takeshi Yamamuro 
wrote:

> Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not
> enough for your usecase?
>
> On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Thanks!
>> Is there a way to get the latest sequence number of all shards of a
>> kinesis stream?
>>
>>
>>
>> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro 
>> wrote:
>>
>>> Hi,
>>>
>>> The time interval can be controlled by `IdleTimeBetweenReadsInMillis`
>>> in KinesisClientLibConfiguration though,
>>> it is not configurable in the current implementation.
>>>
>>> The detail can be found in;
>>> https://github.com/apache/spark/blob/master/external/kinesis
>>> -asl/src/main/scala/org/apache/spark/streaming/kinesis/
>>> KinesisReceiver.scala#L152
>>>
>>> // maropu
>>>
>>>
>>> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 *Hi *

 *is **spark.streaming.blockInterval* for kinesis input stream is
 hardcoded to 1 sec or is it configurable ? Time interval at which receiver
 fetched data from kinesis .

 Means stream batch interval cannot be less than 
 *spark.streaming.blockInterval
 and this should be configrable , Also is there any minimum value for
 streaming batch interval ?*

 *Thanks*


>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: spark streaming with kinesis

2016-11-14 Thread Takeshi Yamamuro
Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not enough
for your usecase?

On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora 
wrote:

> Thanks!
> Is there a way to get the latest sequence number of all shards of a
> kinesis stream?
>
>
>
> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> The time interval can be controlled by `IdleTimeBetweenReadsInMillis`
>> in KinesisClientLibConfiguration though,
>> it is not configurable in the current implementation.
>>
>> The detail can be found in;
>> https://github.com/apache/spark/blob/master/external/kinesis
>> -asl/src/main/scala/org/apache/spark/streaming/kinesis
>> /KinesisReceiver.scala#L152
>>
>> // maropu
>>
>>
>> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> *Hi *
>>>
>>> *is **spark.streaming.blockInterval* for kinesis input stream is
>>> hardcoded to 1 sec or is it configurable ? Time interval at which receiver
>>> fetched data from kinesis .
>>>
>>> Means stream batch interval cannot be less than 
>>> *spark.streaming.blockInterval
>>> and this should be configrable , Also is there any minimum value for
>>> streaming batch interval ?*
>>>
>>> *Thanks*
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro
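
A rough sketch of the same idea via the AWS Java SDK (v1) from Scala, for anyone who wants this programmatically rather than through the CLI. Kinesis exposes per-shard sequence-number ranges and shard iterators; a LATEST iterator marks the current tip of a shard, so the "latest sequence number" is effectively carried by the next record read from that iterator. The stream name and client setup below are placeholders.

import com.amazonaws.services.kinesis.AmazonKinesisClient
import scala.collection.JavaConverters._

object LatestShardIterators extends App {
  val streamName = "my-stream"              // placeholder
  val kinesis = new AmazonKinesisClient()   // default credential/region provider chain

  val shards = kinesis.describeStream(streamName).getStreamDescription.getShards.asScala

  shards.foreach { shard =>
    val range = shard.getSequenceNumberRange
    val latest = kinesis.getShardIterator(streamName, shard.getShardId, "LATEST").getShardIterator
    println(s"${shard.getShardId}: starting sequence number = ${range.getStartingSequenceNumber}, " +
      s"LATEST iterator = $latest")
  }
}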


Re: spark streaming with kinesis

2016-11-14 Thread Shushant Arora
Thanks!
Is there a way to get the latest sequence number of all shards of a kinesis
stream?



On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro 
wrote:

> Hi,
>
> The time interval can be controlled by `IdleTimeBetweenReadsInMillis` in 
> KinesisClientLibConfiguration
> though,
> it is not configurable in the current implementation.
>
> The detail can be found in;
> https://github.com/apache/spark/blob/master/external/
> kinesis-asl/src/main/scala/org/apache/spark/streaming/
> kinesis/KinesisReceiver.scala#L152
>
> // maropu
>
>
> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> *Hi *
>>
>> *is **spark.streaming.blockInterval* for kinesis input stream is
>> hardcoded to 1 sec or is it configurable ? Time interval at which receiver
>> fetched data from kinesis .
>>
>> Means stream batch interval cannot be less than 
>> *spark.streaming.blockInterval
>> and this should be configrable , Also is there any minimum value for
>> streaming batch interval ?*
>>
>> *Thanks*
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: spark streaming with kinesis

2016-11-14 Thread Takeshi Yamamuro
Hi,

The time interval is controlled by `IdleTimeBetweenReadsInMillis`
in KinesisClientLibConfiguration; however, it is not configurable in the
current implementation.

The details can be found at:
https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L152

// maropu


On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora 
wrote:

> *Hi *
>
> *is **spark.streaming.blockInterval* for kinesis input stream is
> hardcoded to 1 sec or is it configurable ? Time interval at which receiver
> fetched data from kinesis .
>
> Means stream batch interval cannot be less than *spark.streaming.blockInterval
> and this should be configrable , Also is there any minimum value for
> streaming batch interval ?*
>
> *Thanks*
>
>


-- 
---
Takeshi Yamamuro


Spark hash function

2016-11-14 Thread Rohit Verma
Hi All,

One of the miscellaneous functions in Spark SQL is the hash
expression [Murmur3Hash] ("hash").

I was wondering which variant of MurmurHash3 it is:

murmurhash3_x64_128
murmurhash3_x86_32 (this is also part of the Spark unsafe package).

Also, what is the seed for the hash function?

I am running a query such as

dataset = spark.sql("select hash(name) from employee");

What would be the correct way to check equality with this, like

dataset.first.equals(#MyHashFunction(val)) is true.

Thanks all.

Regards
Rohit
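
For what it's worth, the catalyst Murmur3Hash expression is backed by the 32-bit x86 variant in the unsafe package, and its default seed is 42 in the versions I have looked at (worth double-checking against your Spark build). Rather than re-implementing the hash outside Spark, one way to sanity-check equality is to compute both sides with Spark's own built-in, as in this sketch (the employee data is made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.hash

object HashCheck extends App {
  val spark = SparkSession.builder().appName("hash-check").master("local[*]").getOrCreate()
  import spark.implicits._

  Seq("alice", "bob").toDF("name").createOrReplaceTempView("employee")

  // Both expressions go through the same Murmur3Hash code path,
  // so the results should match row for row.
  val viaSql = spark.sql("SELECT name, hash(name) AS h FROM employee")
  val viaApi = spark.table("employee").select($"name", hash($"name").as("h"))

  viaSql.show()
  viaApi.show()
}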

Two questions about running spark on mesos

2016-11-14 Thread Yu Wei
Hi Guys,


Two questions about running Spark on Mesos.

1. Does the Spark configuration in conf/slaves still work when running Spark on
Mesos?

According to my observations, conf/slaves still seemed to take effect
when running spark-shell.

However, it doesn't take effect when deploying in cluster mode.

Is this expected behavior, or did I miss anything?


2. Can I kill submitted jobs when running Spark on Mesos in cluster mode?

I launched Spark on Mesos in cluster mode and then successfully submitted a
long-running job.

Now I want to kill the job.

How can I do that? Is there a command similar to what is available when
launching Spark on YARN?



Thanks,

Jared
Software developer
Interested in open source software, big data, Linux


SparkSQL: intra-SparkSQL-application table registration

2016-11-14 Thread Mohamed Nadjib Mami

Hello,

I asked the following question [1] on Stack Overflow but didn't get an
answer yet. I'm now using this channel to give it more visibility and
hopefully find someone who can help.


"*Context.* I have tens of SQL queries stored in separate files. For 
benchmarking purposes, I created an application that iterates through 
each of those query files and passes it to a standalone Spark 
application. This latter /first/ parses the query, extracts the used 
tables, registers them (using: registerTempTable() in Spark < 2 and 
createOrReplaceTempView() in Spark 2), and executes effectively the 
query (spark.sql()).


*Challenge.* Since registering the tables can sometimes be time 
consuming, I would like to register the tables only once when they are 
first used, and keep that in form of metadata that can readily be used 
in the subsequent queries without the need to re-register the tables 
again. It's a sort of intra-job caching but not any of the caching Spark 
offers (table caching), as far as I know.


Is that possible? if not can anyone suggest another approach to 
accomplish the same goal (i.e., iterating through separate query files 
and run a querying Spark application without registering the tables that 
have already been registered before)."


[1]: 
http://stackoverflow.com/questions/40549924/sparksql-intra-sparksql-application-table-registration


Cheers,
Mohamed
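
One way to get the "register once, reuse afterwards" behaviour, sketched under the assumption that all query files are processed by a single long-lived SparkSession (temp views live for the lifetime of the session, so nothing needs to be re-registered within it). The table names, paths, and loading logic below are placeholders for whatever the benchmark actually uses:

import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable

object QueryRunner extends App {
  val spark = SparkSession.builder().appName("query-bench").master("local[*]").getOrCreate()

  // Names of views already registered in this session.
  val registered = mutable.Set.empty[String]

  def ensureRegistered(table: String): Unit =
    if (!registered.contains(table)) {
      // Placeholder loading logic; replace with however each table is actually built.
      spark.read.parquet(s"/data/$table").createOrReplaceTempView(table)
      registered += table
    }

  def runQuery(sql: String, tables: Seq[String]): DataFrame = {
    tables.foreach(ensureRegistered)   // only the first use of a table pays the registration cost
    spark.sql(sql)
  }
}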



Re: AVRO File size when caching in-memory

2016-11-14 Thread Jörn Franke
Spark version? Are you using Tungsten?

> On 14 Nov 2016, at 10:05, Prithish  wrote:
> 
> Can someone please explain why this happens?
> 
> When I read a 600kb AVRO file and cache this in memory (using cacheTable), it 
> shows up as 11mb (storage tab in Spark UI). I have tried this with different 
> file sizes, and the size in-memory is always proportionate. I thought Spark 
> compresses when using cacheTable. 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



AVRO File size when caching in-memory

2016-11-14 Thread Prithish
Can someone please explain why this happens?

When I read a 600 KB AVRO file and cache it in memory (using cacheTable),
it shows up as 11 MB (Storage tab in the Spark UI). I have tried this with
different file sizes, and the in-memory size is always proportionally larger. I
thought Spark compressed the data when using cacheTable.
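
A possible explanation, hedged: Avro files are block-compressed on disk, while cacheTable materialises the data into Spark's in-memory columnar format, which is decompressed and only re-encoded with lightweight per-column schemes (dictionary/run-length), so a large blow-up relative to the file size is not unusual. Below is a small sketch for checking the settings that influence the cached size; the path, view name, and the spark-avro package are assumptions.

import org.apache.spark.sql.SparkSession

object AvroCacheSize extends App {
  val spark = SparkSession.builder().appName("avro-cache-size").master("local[*]").getOrCreate()

  // In-memory columnar cache settings (these are the documented defaults; adjust to experiment).
  spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
  spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "10000")

  // Assumes the Databricks spark-avro package is on the classpath.
  val df = spark.read.format("com.databricks.spark.avro").load("/path/to/file.avro")
  df.createOrReplaceTempView("avro_table")

  spark.catalog.cacheTable("avro_table")
  df.count()   // force materialisation so the Storage tab reflects the cached size
}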