code generation memory issue

2016-11-21 Thread geoHeil
I am facing a strange issue when trying to correct some errors in my raw data.
The problem is reported here:
https://issues.apache.org/jira/browse/SPARK-18532






Re: newbie question about RDD

2016-11-21 Thread Raghav
Sorry, I forgot to ask: how can I use the Spark context here? I have the HDFS
directory path of the files, as well as the name node of the HDFS cluster.

Thanks for your help.

On Mon, Nov 21, 2016 at 9:45 PM, Raghav  wrote:

> Hi
>
> I am extremely new to Spark. I have to read a file from HDFS and get it
> in memory in RDD format.
>
> I have a Java class as follows:
>
> class Person {
> private long UUID;
> private String FirstName;
> private String LastName;
> private String zip;
>
>// public methods
> }
>
> The file in HDFS is as follows:
>
> UUID   FirstName   LastName   Zip
> 7462   John        Doll       06903
> 5231   Brad        Finley     32820
>
>
> Can someone point me to how to get a JavaRDD<Person> object by reading the
> file in HDFS?
>
> Thanks.
>
> --
> Raghav
>



-- 
Raghav
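
For illustration, a minimal Scala sketch of the pattern being asked about, assuming Spark 2.x, a hypothetical name node address and a whitespace-delimited file with a header row; the Java API (JavaSparkContext.textFile producing a JavaRDD, then mapped to Person) is analogous:

import org.apache.spark.sql.SparkSession

// Case class mirroring the Person bean from the question (field types are assumptions).
case class Person(uuid: Long, firstName: String, lastName: String, zip: String)

object ReadPeopleFromHdfs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("read-people").getOrCreate()
    val sc = spark.sparkContext

    // A full HDFS URI works, or a bare path if the default FS is already HDFS.
    val lines = sc.textFile("hdfs://namenode:8020/data/people.txt")

    // Drop the header row and split on whitespace (adjust to the real delimiter).
    val header = lines.first()
    val people = lines
      .filter(_ != header)
      .map(_.trim.split("\\s+"))
      .filter(_.length >= 4)
      .map(f => Person(f(0).toLong, f(1), f(2), f(3)))   // RDD[Person]

    people.take(2).foreach(println)
    spark.stop()
  }
}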


newbie question about RDD

2016-11-21 Thread Raghav
Hi

I am extremely new to Spark. I have to read a file from HDFS and get it in
memory in RDD format.

I have a Java class as follows:

class Person {
private long UUID;
private String FirstName;
private String LastName;
private String zip;

   // public methods
}

The file in HDFS is as follows:

UUID   FirstName   LastName   Zip
7462   John        Doll       06903
5231   Brad        Finley     32820


Can someone point me to how to get a JavaRDD<Person> object by reading the
file in HDFS?

Thanks.

-- 
Raghav


Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-21 Thread kant kodali
Hi Michael,

I only see spark 2.0.2 which is what I am using currently. Any idea on when
2.1 will be released?

Thanks,
kant

On Mon, Nov 21, 2016 at 5:12 PM, Michael Armbrust 
wrote:

> In Spark 2.1 we've added a from_json
> 
> function that I think will do what you want.
>
> On Fri, Nov 18, 2016 at 2:29 AM, kant kodali  wrote:
>
>> This seems to work
>>
>> import org.apache.spark.sql._
>> val rdd = df2.rdd.map { case Row(j: String) => j }
>> spark.read.json(rdd).show()
>>
>> However, I wonder if there is any inefficiency here, since I have to apply
>> this function to billions of rows.
>>
>>
>


Re: RDD Partitions not distributed evenly to executors

2016-11-21 Thread Thunder Stumpges
Has anyone figured this out yet? I have gone looking for this exact
problem (Spark 1.6.1) and I cannot get my partitions to be distributed
evenly across executors no matter what I've tried. It has been mentioned
several other times in the user group as well as the dev group (as
mentioned by Mike Hynes initially, as well as by a few others I found).

It seems this has been a known issue for the better part of 6 months. Hard
to believe it has had no real progress.

Has anyone got a workaround or something that can spread the cached RDD
partitions evenly across executors?

This is having a major performance impact on my Spark Streaming application,
which is extremely imbalanced currently.

Thanks in advance!
Thunder


On Wed, Apr 6, 2016 at 6:36 AM Mike Hynes <91m...@gmail.com> wrote:

> Hello All (and Devs in particular),
>
> Thank you again for your further responses. Please find a detailed
> email below which identifies the cause (I believe) of the partition
> imbalance problem, which occurs in spark 1.5, 1.6, and a 2.0-SNAPSHOT.
> This is followed by follow-up questions for the dev community with
> more intimate knowledge of the scheduler so that they may confirm my
> guess at the cause, and please provide insight at how best to avoid
> the problem.
>
> Attached to this email are Gantt-chart plots which show the task
> execution over elapsed time in a Spark program. This program was meant
> to investigate the simplest possible vector operation for block-vector
> data stored in RDDs of type RDD[(Int,Vector)]. In the Gantt plots,
> you'll see the tasks shown as horizontal lines along the x axis, which
> shows elapsed time. The shaded regions represent a single executor
> such that all tasks managed by a single executor lie in a contiguous
> shaded region. The executors all managed 16 cores on 4 different
> compute nodes, and the tasks have been sorted and fit into 16 slots
> for each executor according to their chronological order, as determined
> by the task information in the event log for the program, such that
> the y-axis corresponds to essentially the unique core id, ranging from
> 1 to 64. The numbers running horizontally at the top of these plots are
> the stage numbers, as determined by the DAG scheduler.
>
> In the program itself, two block vectors, v_1 and v_2, were created
> and copartitioned, cached, and then added together elementwise through
> a join operation on their block index keys. Stages 0 and 1 correspond
> to the map and count operations to create v_1; stages 2 and 3
> correspond to the same operations on v_2; and stages 6 through 15
> consist of identical count operations to materialize the vector v =
> v_1 + v_2, formed through a join on v_1 and v_2. The vectors v_1 and
> v_2 were initialized by first creating the keys using a
> sc.parallelize{0 to num_blocks - 1} operation, after which the keys
> were partitioned with a HashPartitioner (note that first a dummy map
> {k => (k,k)} on the keys was done so that the HashPartitioner could be
> used; the motivation for this was that, for large block vector RDDs,
> it would be better to hash partition the keys before generating the
> data). The size of the vectors is determined as a multiple of a fixed
> vector block size (size of each sub-block) times the number of
> partitions, which is itself an integer multiple of the number of
> cores. Furthermore, each partition has \gamma blocks; there are \alpha
> partitions per core, and each block has size 2^16.
>
> The first plot, 02_4node_imbalance_spark1.6_run2.pdf, shows a
> representative run of the block vector addition program for \alpha =
> 4, \gamma = 4. A well-balanced partitioning would correspond to 4
> partitions per core, such that each executor is managing 64 tasks.
> However, as you can see in stage 0, this does not occur: there is a
> large imbalance, where cores 46--64 have many more tasks to compute
> than the others.
>
> Observing the order of the task assignment, I believe that what is
> happening here is that, due to the initial random delay of the
> executors in responding/receiving master instructions, the driver is
> assigning more tasks to the executor whose initial wave of tasks
> finishes first. Since there is *no* data locality in stage 0 to factor
> into determining on which nodes the computation should occur, my
> understanding is that the driver will allocate the tasks
> greedily---hence the initial delay is crucial for allocating
> partitions evenly across the nodes. Furthermore, note that stage 2 (an
> identical vector initialization operation to stage 0) is
> well-balanced, since all of the executors completed tasks at
> approximately the same time, and hence without data locality being a
> factor, were assigned new tasks at the same rate. Also, note here that
> the individual task durations are *decreasing markedly* through stages
> 6--15 (again, all of which are identical), but that the stages are
> longer than need be due to the 

Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-21 Thread Michael Armbrust
In Spark 2.1 we've added a from_json

function that I think will do what you want.

On Fri, Nov 18, 2016 at 2:29 AM, kant kodali  wrote:

> This seems to work
>
> import org.apache.spark.sql._
> val rdd = df2.rdd.map { case Row(j: String) => j }
> spark.read.json(rdd).show()
>
> However, I wonder if there is any inefficiency here, since I have to apply
> this function to billions of rows.
>
>
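
For reference, a short sketch of how the new function can be used once 2.1 is available, assuming a SparkSession named spark, that df2 has a single string column named "value" holding the JSON blob, and a simple hypothetical schema; parsing inline like this avoids the extra RDD round trip over billions of rows:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

// Hypothetical schema for the JSON blobs; replace with the real fields.
val schema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

// Parse the JSON string column and flatten the resulting struct.
val parsed = df2
  .select(from_json($"value", schema).as("data"))
  .select("data.*")

parsed.show()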


Re: Stateful aggregations with Structured Streaming

2016-11-21 Thread Michael Armbrust
We are planning on adding mapWithState or something similar in a future
release.  In the mean time, standard Dataframe aggregations should work
(count, sum, etc).  If you are looking to do something custom, I'd suggest
looking at Aggregators

.
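
As a rough illustration of the Aggregators suggestion, a minimal typed Aggregator; the Event case class, the grouping key and the sum semantics are hypothetical, not taken from the thread. On a streaming Dataset the engine keeps the aggregation state across triggers:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical event type; replace with your own case class.
case class Event(user: String, amount: Double)

// A typed Aggregator that sums amounts per group.
object SumAmount extends Aggregator[Event, Double, Double] {
  def zero: Double = 0.0
  def reduce(acc: Double, e: Event): Double = acc + e.amount
  def merge(a: Double, b: Double): Double = a + b
  def finish(acc: Double): Double = acc
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Usage on a Dataset[Event] named events (batch or streaming):
//   events.groupByKey(_.user).agg(SumAmount.toColumn)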

On Sat, Nov 19, 2016 at 5:46 AM, Yuval.Itzchakov  wrote:

> I've been using `DStream.mapWithState` and was looking forward to trying
> out
> Structured Streaming. The thing I can't understand is: does Structured Streaming
> in its current state support stateful aggregations?
>
> Looking at the StateStore design document
> (https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wf
> Vp_hDM8ZL254/edit#heading=h.2h7zw4ru3nw7),
> and then doing a bit of digging around in the Spark codebase, I've seen
> `mapPartitionsWithStateStore` as the only viable way of doing something
> with
> a store, but the API requires an `UnsafeRow` for key and value, which makes
> me question whether this is a real public API one should be using.
>
> Does anyone know what the state of things are currently in regards to an
> equivalent to `mapWithState` in Structured Streaming?
>
> Thanks,
> Yuval.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Stateful-aggregations-with-
> Structured-Streaming-tp28108.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Create a Column expression from a String

2016-11-21 Thread Stuart White
Yes, that's what I was looking for.  Thanks!

On Mon, Nov 21, 2016 at 6:56 PM, Michael Armbrust
 wrote:
> You are looking for org.apache.spark.sql.functions.expr()
>
> On Sat, Nov 19, 2016 at 6:12 PM, Stuart White 
> wrote:
>>
>> I'd like to allow for runtime-configured Column expressions in my
>> Spark SQL application.  For example, if my application needs a 5-digit
>> zip code, but the file I'm processing contains a 9-digit zip code, I'd
>> like to be able to configure my application with the expression
>> "substring('zipCode, 0, 5)" to use for the zip code.
>>
>> So, I think I'm looking for something like this:
>>
>> def parseColumnExpression(colExpr: String) : Column
>>
>> I see that SparkSession's sql() method exists to take a string and
>> parse it into a DataFrame.  But that's not quite what I want.
>>
>> Does a mechanism exist that would allow me to take a string
>> representation of a column expression and parse it into an actual
>> column expression (something that could be use in a .select() call,
>> for example)?
>>
>> Thanks!
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>




Re: Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

2016-11-21 Thread Michael Armbrust
You could also do this with Datasets, which will probably be a little more
efficient (since you are telling us you only care about one column)

ds1.select($"value".as[Array[Byte]]).map(Student.parseFrom)

On Thu, Nov 17, 2016 at 1:05 PM, shyla deshpande 
wrote:

> Hello everyone,
>  The following code works ...
>
> def main(args : Array[String]) {
>
>   val spark = SparkSession.builder.
> master("local")
> .appName("spark session example")
> .getOrCreate()
>
>   import spark.implicits._
>
>   val ds1 = spark.readStream.format("kafka").
> option("kafka.bootstrap.servers","localhost:9092").
> option("subscribe","student").load()
>
>   val ds2 = ds1.map(row=> 
> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
>
>   val query = ds2.writeStream
> .outputMode("append")
> .format("console")
> .start()
>
>   query.awaitTermination()
>
> }
>
>
> On Thu, Nov 17, 2016 at 11:30 AM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> val spark = SparkSession.builder.
>>   master("local")
>>   .appName("spark session example")
>>   .getOrCreate()
>>
>> import spark.implicits._
>>
>> val dframe1 = spark.readStream.format("kafka").
>>   option("kafka.bootstrap.servers","localhost:9092").
>>   option("subscribe","student").load()
>>
>> How do I deserialize the value column from dframe1, which is Array[Byte],
>> to a Student object using Student.parseFrom?
>>
>> Please help.
>>
>> Thanks.
>>
>>
>>
>> // Stream of votes from Kafka as bytes
>> val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](
>>   ssc, LocationStrategies.PreferConsistent,
>>   ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))
>>
>> // Parse them into the Vote case class.
>> val votes: DStream[Vote] = votesAsBytes.map {
>>   (cr: ConsumerRecord[String, Array[Byte]]) =>
>>     Vote.parseFrom(cr.value())
>> }
>>
>>
>


Re: Create a Column expression from a String

2016-11-21 Thread Michael Armbrust
You are looking for org.apache.spark.sql.functions.expr()
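
A small sketch of the idea, with a hypothetical DataFrame df that has id and zipCode columns and the expression string coming from configuration at runtime:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, expr}

// The column expression arrives as a string at runtime (e.g. from config).
val zipExpr: String = "substring(zipCode, 0, 5)"

// expr() parses the string into a Column usable in select/withColumn/filter.
val zipCol: Column = expr(zipExpr)

val result = df.select(col("id"), zipCol.as("zip5"))
result.show()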

On Sat, Nov 19, 2016 at 6:12 PM, Stuart White 
wrote:

> I'd like to allow for runtime-configured Column expressions in my
> Spark SQL application.  For example, if my application needs a 5-digit
> zip code, but the file I'm processing contains a 9-digit zip code, I'd
> like to be able to configure my application with the expression
> "substring('zipCode, 0, 5)" to use for the zip code.
>
> So, I think I'm looking for something like this:
>
> def parseColumnExpression(colExpr: String) : Column
>
> I see that SparkSession's sql() method exists to take a string and
> parse it into a DataFrame.  But that's not quite what I want.
>
> Does a mechanism exist that would allow me to take a string
> representation of a column expression and parse it into an actual
> column expression (something that could be use in a .select() call,
> for example)?
>
> Thanks!
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: SparkILoop doesn't run

2016-11-21 Thread Jakob Odersky
The issue I was having had to do with missing classpath settings; in
sbt it can be solved by setting `fork:=true` to run tests in new jvms
with appropriate classpaths.

Mohit, from the looks of the error message, it also appears to be some
classpath issue. This typically happens when there are libraries of
multiple scala versions on the same classpath. You mention that it
worked before, can you recall what libraries you upgraded before it
broke?

--Jakob

On Mon, Nov 21, 2016 at 2:34 PM, Jakob Odersky  wrote:
> Trying it out locally gave me an NPE. I'll look into it in more
> detail, however the SparkILoop.run() method is dead code. It's used
> nowhere in spark and can be removed without any issues.
>
> On Thu, Nov 17, 2016 at 11:16 AM, Mohit Jaggi  wrote:
>> Thanks Holden. I did post to the user list but since this is not a common
>> case, I am trying the developer list as well. Yes there is a reason: I get
>> code from somewhere e.g. a notebook. This type of code did work for me
>> before.
>>
>> Mohit Jaggi
>> Founder,
>> Data Orchard LLC
>> www.dataorchardllc.com
>>
>>
>>
>>
>> On Nov 17, 2016, at 8:53 AM, Holden Karau  wrote:
>>
>> Moving to user list
>>
>> So this might be a better question for the user list - but is there a reason
>> you are trying to use the SparkILoop for tests?
>>
>> On Thu, Nov 17, 2016 at 5:47 PM Mohit Jaggi  wrote:
>>>
>>>
>>>
>>> I am trying to use SparkILoop to write some tests(shown below) but the
>>> test hangs with the following stack trace. Any idea what is going on?
>>>
>>>
>>> import org.apache.log4j.{Level, LogManager}
>>> import org.apache.spark.repl.SparkILoop
>>> import org.scalatest.{BeforeAndAfterAll, FunSuite}
>>>
>>> class SparkReplSpec extends FunSuite with BeforeAndAfterAll {
>>>
>>>   override def beforeAll(): Unit = {
>>>   }
>>>
>>>   override def afterAll(): Unit = {
>>>   }
>>>
>>>   test("yay!") {
>>> val rootLogger = LogManager.getRootLogger
>>> val logLevel = rootLogger.getLevel
>>> rootLogger.setLevel(Level.ERROR)
>>>
>>> val output = SparkILoop.run(
>>>   """
>>> |println("hello")
>>>   """.stripMargin)
>>>
>>> println(s" $output ")
>>>
>>>   }
>>> }
>>>
>>>
>>> /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/bin/java
>>> -Dspark.master=local[*] -Didea.launcher.port=7532
>>> "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA CE.app/Contents/bin"
>>> -Dfile.encoding=UTF-8 -classpath "/Users/mohit/Library/Application
>>> 

Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-21 Thread yeshwanth kumar
Thanks for your reply.

I can definitely change the underlying compression format,
but I am trying to understand the Locality Level:
why did the executor run on a different node, where the blocks are not present,
when the Locality Level is RACK_LOCAL?

Can you shed some light on this?


Thanks,
Yesh


-Yeshwanth
Can you Imagine what I would do if I could do all I can - Art of War

On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke  wrote:

> Use a format such as ORC, Parquet or Avro, because they support any compression
> type with parallel processing. Alternatively, split your file into several
> smaller ones. Another alternative would be bzip2 (but slower in general) or
> LZO (usually not included by default in many distributions).
>
> On 21 Nov 2016, at 23:17, yeshwanth kumar  wrote:
>
> Hi,
>
> we are running Hive on Spark, we have an external table over snappy
> compressed csv file of size 917.4 M
> HDFS block size is set to 256 MB
>
> as per my Understanding, if i run a query over that external table , it
> should launch 4 tasks. one for each block.
> but i am seeing one executor and one task processing all the file.
>
> trying to understand the reason behind,
>
> i went one step further to understand the block locality
> when i get the block locations for that file, i found
>
> [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-
> 48e1-4a8f-be48-b0953fdaad37,DISK],
>  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-
> ce0c-4eb8-8183-8d8ff5f24115,DISK],
>  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-
> b030-43f8-91c9-d8517e68414a,DISK]]
>
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-
> 4845-b043-8b91ae4017c0,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-
> 489b-8209-4307f3296211,DISK],
> DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-
> 45fd-ae0f-cc6eb268b0d2,DISK]]
>
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-
> 4601-8070-f6c5da840e09,DISK],
> DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-
> 494d-87ee-bcfff2182a96,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-
> 48d3-b858-a023b5c44e9c,DISK]
>
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-
> 498c-a487-5ce6aaa66f48,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-
> 4e20-a360-e7cdad5dacc3,DISK],
> DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-
> 4c8f-8a13-7be37ce769c9,DISK]]
>
> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that task
>
> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
> 10.11.0.228, because these 2 nodes has all the four blocks needed for
> computation
> but the executor is running in 10.11.0.225
>
> my theory is not applying anywhere.
>
> please help me in understanding how spark/yarn calculates number of
> executors/tasks.
>
> Thanks,
> -Yeshwanth
>
>


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-21 Thread Jörn Franke
Use a format such as ORC, Parquet or Avro, because they support any compression type
with parallel processing. Alternatively, split your file into several smaller
ones. Another alternative would be bzip2 (but slower in general) or LZO
(usually not included by default in many distributions).
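
One way to act on that advice, sketched for Spark 2.x with hypothetical paths: rewrite the single snappy-compressed CSV as Parquet (snappy inside Parquet is block-level and splittable), then point the external table at the new directory:

// Read the existing file (a single .snappy CSV is not splittable, so this
// stage may still run as one task) and rewrite it in a splittable format.
val csv = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs:///data/events_csv")          // hypothetical input path

csv.write
  .mode("overwrite")
  .option("compression", "snappy")         // block-level snappy inside Parquet
  .parquet("hdfs:///data/events_parquet")  // hypothetical output path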

> On 21 Nov 2016, at 23:17, yeshwanth kumar  wrote:
> 
> Hi,
> 
> we are running Hive on Spark, we have an external table over snappy 
> compressed csv file of size 917.4 M
> HDFS block size is set to 256 MB
> 
> as per my Understanding, if i run a query over that external table , it 
> should launch 4 tasks. one for each block.
> but i am seeing one executor and one task processing all the file.
> 
> trying to understand the reason behind,
> 
> i went one step further to understand the block locality 
> when i get the block locations for that file, i found
> 
> [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-4a8f-be48-b0953fdaad37,DISK],
>  
>  
> DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-4eb8-8183-8d8ff5f24115,DISK],
>  
>  
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-43f8-91c9-d8517e68414a,DISK]]
>  
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4845-b043-8b91ae4017c0,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-489b-8209-4307f3296211,DISK],
>  
> DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-45fd-ae0f-cc6eb268b0d2,DISK]]
> 
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4601-8070-f6c5da840e09,DISK],
> DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-494d-87ee-bcfff2182a96,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-48d3-b858-a023b5c44e9c,DISK]
> 
> DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-498c-a487-5ce6aaa66f48,DISK],
>  
> DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4e20-a360-e7cdad5dacc3,DISK],
>  
> DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4c8f-8a13-7be37ce769c9,DISK]]
> 
> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that task
> 
> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or 
> 10.11.0.228, because these 2 nodes has all the four blocks needed for 
> computation
> but the executor is running in 10.11.0.225
> 
> my theory is not applying anywhere.
> 
> please help me in understanding how spark/yarn calculates number of 
> executors/tasks.
> 
> Thanks,
> -Yeshwanth


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-21 Thread Aniket Bhatnagar
Try changing compression to bzip2 or lzo. For reference -
http://comphadoop.weebly.com

Thanks,
Aniket

On Mon, Nov 21, 2016, 10:18 PM yeshwanth kumar 
wrote:

> Hi,
>
> we are running Hive on Spark, we have an external table over snappy
> compressed csv file of size 917.4 M
> HDFS block size is set to 256 MB
>
> as per my Understanding, if i run a query over that external table , it
> should launch 4 tasks. one for each block.
> but i am seeing one executor and one task processing all the file.
>
> trying to understand the reason behind,
>
> i went one step further to understand the block locality
> when i get the block locations for that file, i found
>
> [DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-bf39d33d-48e1-4a8f-be48-b0953fdaad37,DISK],
>  DatanodeInfoWithStorage[10.11.0.227:50010
> ,DS-a760c1c8-ce0c-4eb8-8183-8d8ff5f24115,DISK],
>  DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-0e5427e2-b030-43f8-91c9-d8517e68414a,DISK]]
>
> DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-f50ddf2f-b827-4845-b043-8b91ae4017c0,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-e8c9785f-c352-489b-8209-4307f3296211,DISK],
> DatanodeInfoWithStorage[10.11.0.225:50010
> ,DS-6f6a3ffd-334b-45fd-ae0f-cc6eb268b0d2,DISK]]
>
> DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-f8bea6a8-a433-4601-8070-f6c5da840e09,DISK],
> DatanodeInfoWithStorage[10.11.0.227:50010
> ,DS-8aa3f249-790e-494d-87ee-bcfff2182a96,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-d06714f4-2fbb-48d3-b858-a023b5c44e9c,DISK]
>
> DatanodeInfoWithStorage[10.11.0.226:50010
> ,DS-b3a00781-c6bd-498c-a487-5ce6aaa66f48,DISK],
> DatanodeInfoWithStorage[10.11.0.228:50010
> ,DS-fa5aa339-e266-4e20-a360-e7cdad5dacc3,DISK],
> DatanodeInfoWithStorage[10.11.0.225:50010
> ,DS-9d597d3f-cd4f-4c8f-8a13-7be37ce769c9,DISK]]
>
> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that task
>
> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
> 10.11.0.228, because these 2 nodes has all the four blocks needed for
> computation
> but the executor is running in 10.11.0.225
>
> my theory is not applying anywhere.
>
> please help me in understanding how spark/yarn calculates number of
> executors/tasks.
>
> Thanks,
> -Yeshwanth
>


Re: sort descending with multiple columns

2016-11-21 Thread Sreekanth Jella
Yes, thank you.

Thanks,
Sreekanth,
+1 (571) 376-0714

On Nov 18, 2016 6:33 AM, "Stuart White"  wrote:

> Is this what you're looking for?
>
> val df = Seq(
>   (1, "A"),
>   (1, "B"),
>   (1, "C"),
>   (2, "D"),
>   (3, "E")
> ).toDF("foo", "bar")
>
> val colList = Seq("foo", "bar")
> df.sort(colList.map(col(_).desc): _*).show
>
> +---+---+
> |foo|bar|
> +---+---+
> |  3|  E|
> |  2|  D|
> |  1|  C|
> |  1|  B|
> |  1|  A|
> +---+---+
>
> On Fri, Nov 18, 2016 at 1:15 AM, Sreekanth Jella 
> wrote:
> > Hi,
> >
> > I'm trying to sort multiple columns and column names are dynamic.
> >
> > df.sort(colList.head, colList.tail: _*)
> >
> >
> > But I'm not sure how to sort in descending order for all columns, I tried
> > this but it's for only first column..
> >
> > df.sort(df.col(colList.head).desc)
> > How can I pass all column names (or some) with descending order.
> >
> >
> > Thanks,
> > Sreekanth
>
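
If only some of the columns should be descending, a per-column direction can be carried alongside the names; a small sketch with a hypothetical direction list (the DataFrame df is the one from the thread):

import org.apache.spark.sql.functions.col

// Hypothetical (column, direction) pairs, e.g. loaded from configuration.
val directions = Seq("foo" -> "desc", "bar" -> "asc")

val sortCols = directions.map {
  case (name, "desc") => col(name).desc
  case (name, _)      => col(name).asc
}

df.sort(sortCols: _*).show()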


Re: SparkILoop doesn't run

2016-11-21 Thread Jakob Odersky
Trying it out locally gave me an NPE. I'll look into it in more
detail, however the SparkILoop.run() method is dead code. It's used
nowhere in spark and can be removed without any issues.

On Thu, Nov 17, 2016 at 11:16 AM, Mohit Jaggi  wrote:
> Thanks Holden. I did post to the user list but since this is not a common
> case, I am trying the developer list as well. Yes there is a reason: I get
> code from somewhere e.g. a notebook. This type of code did work for me
> before.
>
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com
>
>
>
>
> On Nov 17, 2016, at 8:53 AM, Holden Karau  wrote:
>
> Moving to user list
>
> So this might be a better question for the user list - but is there a reason
> you are trying to use the SparkILoop for tests?
>
> On Thu, Nov 17, 2016 at 5:47 PM Mohit Jaggi  wrote:
>>
>>
>>
>> I am trying to use SparkILoop to write some tests(shown below) but the
>> test hangs with the following stack trace. Any idea what is going on?
>>
>>
>> import org.apache.log4j.{Level, LogManager}
>> import org.apache.spark.repl.SparkILoop
>> import org.scalatest.{BeforeAndAfterAll, FunSuite}
>>
>> class SparkReplSpec extends FunSuite with BeforeAndAfterAll {
>>
>>   override def beforeAll(): Unit = {
>>   }
>>
>>   override def afterAll(): Unit = {
>>   }
>>
>>   test("yay!") {
>> val rootLogger = LogManager.getRootLogger
>> val logLevel = rootLogger.getLevel
>> rootLogger.setLevel(Level.ERROR)
>>
>> val output = SparkILoop.run(
>>   """
>> |println("hello")
>>   """.stripMargin)
>>
>> println(s" $output ")
>>
>>   }
>> }
>>
>>
>> /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/bin/java
>> -Dspark.master=local[*] -Didea.launcher.port=7532
>> "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA CE.app/Contents/bin"
>> -Dfile.encoding=UTF-8 -classpath "/Users/mohit/Library/Application
>> 

RDD Partitions on HDFS file in Hive on Spark Query

2016-11-21 Thread yeshwanth kumar
Hi,

We are running Hive on Spark. We have an external table over a snappy-compressed
CSV file of size 917.4 MB, and the HDFS block size is set to 256 MB.

As per my understanding, if I run a query over that external table, it
should launch 4 tasks, one for each block,
but I am seeing one executor and one task processing the whole file.

Trying to understand the reason behind this,
I went one step further to look at the block locality.
When I get the block locations for that file, I find:

[DatanodeInfoWithStorage[10.11.0.226:50010
,DS-bf39d33d-48e1-4a8f-be48-b0953fdaad37,DISK],
 DatanodeInfoWithStorage[10.11.0.227:50010
,DS-a760c1c8-ce0c-4eb8-8183-8d8ff5f24115,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010
,DS-0e5427e2-b030-43f8-91c9-d8517e68414a,DISK]]

DatanodeInfoWithStorage[10.11.0.226:50010
,DS-f50ddf2f-b827-4845-b043-8b91ae4017c0,DISK],
DatanodeInfoWithStorage[10.11.0.228:50010
,DS-e8c9785f-c352-489b-8209-4307f3296211,DISK],
DatanodeInfoWithStorage[10.11.0.225:50010
,DS-6f6a3ffd-334b-45fd-ae0f-cc6eb268b0d2,DISK]]

DatanodeInfoWithStorage[10.11.0.226:50010
,DS-f8bea6a8-a433-4601-8070-f6c5da840e09,DISK],
DatanodeInfoWithStorage[10.11.0.227:50010
,DS-8aa3f249-790e-494d-87ee-bcfff2182a96,DISK],
DatanodeInfoWithStorage[10.11.0.228:50010
,DS-d06714f4-2fbb-48d3-b858-a023b5c44e9c,DISK]

DatanodeInfoWithStorage[10.11.0.226:50010
,DS-b3a00781-c6bd-498c-a487-5ce6aaa66f48,DISK],
DatanodeInfoWithStorage[10.11.0.228:50010
,DS-fa5aa339-e266-4e20-a360-e7cdad5dacc3,DISK],
DatanodeInfoWithStorage[10.11.0.225:50010
,DS-9d597d3f-cd4f-4c8f-8a13-7be37ce769c9,DISK]]

In the Spark UI I see that the Locality Level for that task is RACK_LOCAL.

If it is RACK_LOCAL then it should run on either node 10.11.0.226 or
10.11.0.228, because these 2 nodes have all four blocks needed for the
computation, but the executor is running on 10.11.0.225.

My theory does not hold anywhere.

Please help me understand how Spark/YARN calculates the number of
executors/tasks.

Thanks,
-Yeshwanth


Potential memory leak in yarn ApplicationMaster

2016-11-21 Thread Spark User
Hi All,

It seems like the heap usage for
org.apache.spark.deploy.yarn.ApplicationMaster keeps growing continuously.
The driver crashes with OOM eventually.

More details:
I have a spark streaming app that runs on spark-2.0. The
spark.driver.memory is 10G and spark.yarn.driver.memoryOverhead is 2048.
Looking at driver heap dumps taken every 30 mins, the heap usage for
org.apache.spark.deploy.yarn.ApplicationMaster grows by 100MB every 30 mins.

Also, I suspect it may be caused by my having set the options below to true
(which I think is the default):
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \

I am now trying to set them to false, to check whether the heap usage of the
ApplicationMaster stops increasing.

By investigating the heap dump and looking at the code for
ApplicationMaster it seems like the heap usage is growing because of
releasedExecutorLossReasons HashMap in
https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala#L124

Has anyone else seen this issue before?

Thanks,
Bharath


Re: How to write a custom file system?

2016-11-21 Thread Samy Dindane

We don't use HDFS but GlusterFS which works like your typical local POSIX file 
system.

On 11/21/2016 06:49 PM, Jörn Franke wrote:

Once you have configured a custom file system in Hadoop, it can be used by Spark out
of the box. Depending on what you implement in the custom file system, you may want to
think about side effects on any application, including Spark (memory consumption,
etc.).


On 21 Nov 2016, at 18:26, Samy Dindane  wrote:

Hi,

I'd like to extend the file:// file system and add some custom logic to the API 
that lists files.
I think I need to extend FileSystem or LocalFileSystem from 
org.apache.hadoop.fs, but I am not sure how to go about it exactly.

How to write a custom file system and make it usable by Spark?

Thank you,

Samy

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org






Re: Starting a new Spark codebase, Python or Scala / Java?

2016-11-21 Thread Anthony May
A sensible default strategy is to use the same language in which a system
was developed or a highly compatible language. That would be Scala for
Spark, however I assume you don't currently know Scala to the same degree
as Python or at all. In which case to help you make the decision you should
also consider your own personal/team productivity and project constraints.
If you have the time and/or require the bleeding-edge features and performance,
then learning or strengthening your Scala is worth it and you should use the
Scala API.
If you're already very productive in Python, have tighter time constraints,
don't need the bleeding-edge features, and maximum performance isn't a high
priority, then I'd recommend using the Python API.

On Mon, 21 Nov 2016 at 11:58 Jon Gregg  wrote:

> Spark is written in Scala, so yes it's still the strongest option.  You
> also get the Dataset type with Scala (compile time type-safety), and that's
> not an available feature with Python.
>
> That said, I think the Python API is a viable candidate if you use Pandas
> for Data Science.  There are similarities between the DataFrame and Pandas
> APIs, and you can convert a Spark DataFrame to a Pandas DataFrame.
>
> On Mon, Nov 21, 2016 at 1:51 PM, Brandon White 
> wrote:
>
> Hello all,
>
> I will be starting a new Spark codebase and I would like to get opinions
> on using Python over Scala. Historically, the Scala API has always been the
> strongest interface to Spark. Is this still true? Are there still many
> benefits and additional features in the Scala API that are not available in
> the Python API? Are there any performance concerns using the Python API
> that do not exist when using the Scala API? Anything else I should know
> about?
>
> I appreciate any insight you have on using the Scala API over the Python
> API.
>
> Brandon
>
>
>


Cluster deploy mode driver location

2016-11-21 Thread Saif.A.Ellafi
Hello there,

I have a Spark program on 1.6.1; however, when I submit it to the cluster, it
randomly picks the driver.

I know there is a driver specification option, but along with it it is
mandatory to define many other options I am not familiar with. The trouble is,
the jars I am launching need to be available on the driver host, and I would
like to have these jars on just one specific host, which I would like to be the
driver.

Any help?

Thanks!
Saif



Re: Starting a new Spark codebase, Python or Scala / Java?

2016-11-21 Thread Jon Gregg
Spark is written in Scala, so yes it's still the strongest option.  You
also get the Dataset type with Scala (compile time type-safety), and that's
not an available feature with Python.

That said, I think the Python API is a viable candidate if you use Pandas
for Data Science.  There are similarities between the DataFrame and Pandas
APIs, and you can convert a Spark DataFrame to a Pandas DataFrame.

On Mon, Nov 21, 2016 at 1:51 PM, Brandon White 
wrote:

> Hello all,
>
> I will be starting a new Spark codebase and I would like to get opinions
> on using Python over Scala. Historically, the Scala API has always been the
> strongest interface to Spark. Is this still true? Are there still many
> benefits and additional features in the Scala API that are not available in
> the Python API? Are there any performance concerns using the Python API
> that do not exist when using the Scala API? Anything else I should know
> about?
>
> I appreciate any insight you have on using the Scala API over the Python
> API.
>
> Brandon
>


Starting a new Spark codebase, Python or Scala / Java?

2016-11-21 Thread Brandon White
Hello all,

I will be starting a new Spark codebase and I would like to get opinions on
using Python over Scala. Historically, the Scala API has always been the
strongest interface to Spark. Is this still true? Are there still many
benefits and additional features in the Scala API that are not available in
the Python API? Are there any performance concerns using the Python API
that do not exist when using the Scala API? Anything else I should know
about?

I appreciate any insight you have on using the Scala API over the Python
API.

Brandon


Re: How to write a custom file system?

2016-11-21 Thread Jörn Franke
Once you have configured a custom file system in Hadoop, it can be used by Spark out
of the box. Depending on what you implement in the custom file system, you may want to
think about side effects on any application, including Spark (memory consumption,
etc.).
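
A rough sketch of what such a file system could look like (the class name, scheme and filtering rule are all made up); once it is on the classpath and registered through the Hadoop configuration, Spark can read paths that use its scheme:

import java.net.URI
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

// Wraps the local file system and adds custom logic to the listing API.
class FilteredLocalFileSystem extends RawLocalFileSystem {
  override def getScheme: String = "filteredfs"
  override def getUri: URI = URI.create("filteredfs:///")

  // Example of custom listing logic: hide temporary files.
  override def listStatus(path: Path): Array[FileStatus] =
    super.listStatus(path).filterNot(_.getPath.getName.endsWith(".tmp"))
}

// Registration, e.g. in core-site.xml or on the Spark config:
//   spark.hadoop.fs.filteredfs.impl=<your.package>.FilteredLocalFileSystem
// after which paths like "filteredfs:///data/input" go through this class.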

> On 21 Nov 2016, at 18:26, Samy Dindane  wrote:
> 
> Hi,
> 
> I'd like to extend the file:// file system and add some custom logic to the 
> API that lists files.
> I think I need to extend FileSystem or LocalFileSystem from 
> org.apache.hadoop.fs, but I am not sure how to go about it exactly.
> 
> How to write a custom file system and make it usable by Spark?
> 
> Thank you,
> 
> Samy
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 




How to write a custom file system?

2016-11-21 Thread Samy Dindane

Hi,

I'd like to extend the file:// file system and add some custom logic to the API 
that lists files.
I think I need to extend FileSystem or LocalFileSystem from 
org.apache.hadoop.fs, but I am not sure how to go about it exactly.

How to write a custom file system and make it usable by Spark?

Thank you,

Samy




Pasting into spark-shell doesn't work for Databricks example

2016-11-21 Thread jggg777
I'm simply pasting in the UDAF example from this page and getting errors
(basic EMR setup with Spark 2.0):
https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/03%20UDF%20and%20UDAF%20-%20scala.html

The imports appear to work, but then I see errors like "not found: type
UserDefinedAggregateFunction".  

If I run ":paste" and paste it in that way it does work, but I'm interested
in knowing why Ctrl-V doesn't.  What is happening under the hood which makes
it seem like the imports are working even though they aren't?  And is there
a way to fix this in general?

>>>
scala> import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.MutableAggregationBuffer

scala> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala>

scala> class GeometricMean extends UserDefinedAggregateFunction {
 |   // This is the input fields for your aggregate function.
 |   override def inputSchema: org.apache.spark.sql.types.StructType =
 | StructType(StructField("value", DoubleType) :: Nil)
 |
 |   // This is the internal fields you keep for computing your
aggregate.
 |   override def bufferSchema: StructType = StructType(
 | StructField("count", LongType) ::
 | StructField("product", DoubleType) :: Nil
 |   )
 |
 |   // This is the output type of your aggregatation function.
 |   override def dataType: DataType = DoubleType
 |
 |   override def deterministic: Boolean = true
 |
 |   // This is the initial value for your buffer schema.
 |   override def initialize(buffer: MutableAggregationBuffer): Unit = {
 | buffer(0) = 0L
 | buffer(1) = 1.0
 |   }
 |
 |   // This is how to update your buffer schema given an input.
 |   override def update(buffer: MutableAggregationBuffer, input: Row):
Unit = {
 | buffer(0) = buffer.getAs[Long](0) + 1
 | buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
 |   }
 |
 |   // This is how to merge two objects with the bufferSchema type.
 |   override def merge(buffer1: MutableAggregationBuffer, buffer2:
Row): Unit = {
 | buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
 | buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
 |   }
 |
 |   // This is where you output the final value, given the final value
of your bufferSchema.
 |   override def evaluate(buffer: Row): Any = {
 | math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
 |   }
 | }
:11: error: not found: type UserDefinedAggregateFunction
   class GeometricMean extends UserDefinedAggregateFunction {
   ^
:14: error: not found: value StructType
   StructType(StructField("value", DoubleType) :: Nil)
   ^
:14: error: not found: value StructField
   StructType(StructField("value", DoubleType) :: Nil)
  ^
:14: error: not found: value DoubleType
   StructType(StructField("value", DoubleType) :: Nil)
   ^
:17: error: not found: type StructType
 override def bufferSchema: StructType = StructType(
^
:17: error: not found: value StructType
 override def bufferSchema: StructType = StructType(
 ^
:18: error: not found: value StructField
   StructField("count", LongType) ::
   ^
:18: error: not found: value LongType
   StructField("count", LongType) ::
^
:19: error: not found: value StructField
   StructField("product", DoubleType) :: Nil
   ^
:19: error: not found: value DoubleType
   StructField("product", DoubleType) :: Nil
  ^
:23: error: not found: type DataType
 override def dataType: DataType = DoubleType
^
:23: error: not found: value DoubleType
 override def dataType: DataType = DoubleType
   ^
:28: error: not found: type MutableAggregationBuffer
 override def initialize(buffer: MutableAggregationBuffer): Unit = {
 ^
:34: error: not found: type MutableAggregationBuffer
 override def update(buffer: MutableAggregationBuffer, input: Row):
Unit = {
 ^
:34: error: not found: type Row
 override def update(buffer: MutableAggregationBuffer, input: Row):
Unit = {
  ^
:40: error: not found: type 

Re: Linear regression + Janino Exception

2016-11-21 Thread Kazuaki Ishizaki
Thank you for reporting the error.
I think that this is associated with
https://issues.apache.org/jira/browse/SPARK-18492

The reporter of this JIRA entry has not posted the program yet. Would it 
be possible to add your program that can reproduce this issue to this JIRA 
entry?

Regards,
Kazuaki Ishizaki



From:   janardhan shetty 
To: user 
Date:   2016/11/21 12:01
Subject:Re: Linear regression + Janino Exception



Seems like this is associated to :
https://issues.apache.org/jira/browse/SPARK-16845

On Sun, Nov 20, 2016 at 6:09 PM, janardhan shetty  
wrote:
Hi,

I am trying to execute Linear regression algorithm for Spark 2.02 and 
hitting the below error when I am fitting my training set:

val lrModel = lr.fit(train)

It happened on 2.0.0 as well. Any resolution steps is appreciated.

Error Snippet: 
16/11/20 18:03:45 ERROR CodeGenerator: failed to compile: 
org.codehaus.janino.JaninoRuntimeException: Code of method 
"(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V"
 
of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection"
 
grows beyond 64 KB
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
scalaUDF;
/* 009 */   private scala.Function1 catalystConverter;
/* 010 */   private scala.Function1 converter;
/* 011 */   private scala.Function1 udf;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
scalaUDF1;
/* 013 */   private scala.Function1 catalystConverter1;
/* 014 */   private scala.Function1 converter1;
/* 015 */   private scala.Function1 udf1;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
scalaUDF2;
/* 017 */   private scala.Function1 catalystConverter2;







Re: using StreamingKMeans

2016-11-21 Thread Julian Keppel
I do research in anomaly detection with machine learning methods at the
moment, and currently I do k-means clustering, too, in an offline learning
setting. In further work we want to compare the two paradigms of offline
and online learning. I would like to share some thoughts on this
discussion.

My offline setting is exactly what Guha Ayan explained: we collect data for
training and testing over a few days/weeks/months and train the model
periodically, let's say once a week. Please note that k-means is
unsupervised, so it doesn't have any idea of what your data is about and
what could be "normal" or an anomaly. So in my opinion the training dataset
has to represent a state in which everything occurs, the normal data points
and also any anomalies. Referring to "Anomaly Detection: A Survey" by
Varun Chandola et al. (2009), there are then different methods of
interpreting the results. As an example: "Normal data instances belong
to large and dense clusters, while anomalies either belong to small or
sparse clusters". So potential anomalies have to be present in your
training dataset, I think.

The online learning setting is meant to adapt to rapid changes in your
environment. For example, if you are analyzing network traffic and you
add a new service which produces a lot of traffic (a lot of users use the
new service), then in an offline setting where you learn just once a week
your new service may produce a false alarm, whereas the online model would
adapt to these changes (depending on the configured forgetfulness). There are
use cases where you have a very dynamic environment (for example flight
ticket prices), where you need to adapt your model rapidly (see for example
here: https://youtu.be/wyfTjd9z1sY).
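
To make the offline-train / online-score idea from this thread concrete, a rough MLlib sketch; the training RDD, the choice of k, and the 99th-percentile threshold rule are assumptions for illustration, not a prescription:

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Offline phase: training is an RDD[Vector] accumulated over days/weeks of mostly normal data.
def fitAndThreshold(training: RDD[Vector]): (KMeansModel, Double) = {
  val model = KMeans.train(training, 5, 20)   // k = 5, 20 iterations (assumed)

  // Squared distance to the nearest centroid.
  def distToCentroid(v: Vector): Double =
    Vectors.sqdist(v, model.clusterCenters(model.predict(v)))

  // Use roughly the 99th percentile of training distances as the cutoff.
  val n = math.max(1, (training.count() * 0.01).toInt)
  val threshold = training.map(distToCentroid).top(n).min
  (model, threshold)
}

// Online phase (inside the streaming job): flag a point as an outlier when its
// distance to the nearest centroid exceeds the threshold, e.g.
//   stream.map(v => (v, Vectors.sqdist(v, model.clusterCenters(model.predict(v))) > threshold))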

2016-11-20 2:11 GMT+01:00 Debasish Ghosh :

> I share both the concerns that u have expressed. And as I mentioned in my
> earlier mail, offline (batch) training is an option if I get a dataset
> without outliers. In that case I can train and have a model. I find the
> model parameters, which will be the mean distance to the centroid. Note in
> training I will have only 1 cluster as it's only normal data (no outlier).
>
> I can now pass these parameters to the prediction phase which can work on
> streaming data. In the prediction phase I just compute the distance to
> centroid for each point and flag the violating ones as outliers.
>
> This looks like a perfectly valid option if I get a dataset with no
> outliers to train on.
>
> Now my question is what then is the use case in which we can use
> StreamingKMeans ? In the above scenario we use batch KMeans in training
> phase while we just compute the distance in the prediction phase. And how
> do we address the scenario where we have only one stream of data available ?
>
> regards.
>
> On Sun, 20 Nov 2016 at 6:07 AM, ayan guha  wrote:
>
>> Here are 2 concerns I would have with the design (This discussion is
>> mostly to validate my own understanding)
>>
>> 1. If you have outliers "before" running k-means, don't your centroids
>> get skewed? In other words, outliers by themselves may bias the cluster
>> evaluation, isn't it?
>> 2. Typically microbatches are small, like 3 sec in your case. in this
>> window you may not have enough data to run any statistically significant
>> operation, can you?
>>
>> My approach would have been: Run K-means on data without outliers (in
>> batch mode). Determine the model, ie centroids in case of kmeans. Then load
>> the model in your streaming app and just apply "outlier detection"
>> function, which takes the form of
>>
>> def detectOutlier(model,data):
>>   /// your code, like mean distance etc
>>   return T or F
>>
>> In response to your point about "alternet set of data", I would assume
>> you would accumulate the data you are receiving from streaming over few
>> weeks or months before running offline training.
>>
>> Am I missing something?
>>
>> On Sun, Nov 20, 2016 at 10:29 AM, Debasish Ghosh <
>> ghosh.debas...@gmail.com> wrote:
>>
>> Looking for alternative suggestions in case where we have 1 continuous
>> stream of data. Offline training and online prediction can be one option if
>> we can have an alternate set of data to train. But if it's one single
>> stream you don't have separate sets for training or cross validation.
>>
>> So whatever data u get in each micro batch, train on them and u get the
>> cluster centroids from the model. Then apply some heuristics like mean
>> distance from centroid and detect outliers. So for every microbatch u get
>> the outliers based on the model and u can control forgetfulness of the
>> model through the decay factor that u specify for StramingKMeans.
>>
>> Suggestions ?
>>
>> regards.
>>
>> On Sun, 20 Nov 2016 at 3:51 AM, ayan guha  wrote:
>>
>> Curious why do you want to train your models every 3 secs?
>> On 20 Nov 2016 06:25, "Debasish Ghosh"  wrote:
>>
>> Thanks a lot 

Re: Flume integration

2016-11-21 Thread Ian Brooks
Hi Mich,

Thanks. I would prefer not to add another system into the mix, as we currently
don't use Kafka at all. We are still in the prototype phase at the moment and
it seems to be working well, though it doesn't like you restarting the Flume
sink part without restarting the Spark application. That is something we should
be able to manage, though.



*-Ian *


Hi Ian,


Flume is great for ingesting data into HDFS and HBase. However, that is part of
the batch layer.

For real-time processing, I would go through Kafka into Spark Streaming. Apart from
your case, I have not established whether anyone else feeds Flume directly into Spark.

If so, how mature is it?


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[1]/
 
  
http://talebzadehmich.wordpress.com[2]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
  


On 21 November 2016 at 10:27, Ian Brooks  wrote:




*-Ian*


Hi
While I am following this discussion with interest, I am trying to comprehend
any architectural benefit of a Spark sink.
Is there any feature in Flume that makes it more suitable for ingesting stream data
than Spark Streaming, so that we should chain them? For example, does it help
durability or reliability of the source?
Or is it a more tactical choice based on connector availability or such?
To me, Flume is an important component for ingesting streams to HDFS or Hive directly,
i.e. it plays on the batch side of the lambda architecture pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:


Hi Ian,


Has this been resolved?


How about data to Flume and then Kafka and Kafka streaming into Spark?


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[1]/
 
  
http://talebzadehmich.wordpress.com[2]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
  


On 13 July 2016 at 11:13, Ian Brooks  wrote:


Hi,
 
I'm currently trying to implement a prototype Spark application that gets data 
from Flume and processes it. I'm using the pull based method mentioned in 
https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html[5] 
 
This is initially working fine for getting data from Flume; however, the Spark
client doesn't appear to be letting Flume know that the data has been received,
so Flume doesn't remove it from the batch.
 
After 100 requests Flume stops allowing any new data and logs
 
08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] 
(org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error 
while processing transaction. 

 
My code to pull the data from Flume is
 
SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
Duration batchInterval = new Duration(1);
final String checkpointDir = "/tmp/";
 
final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
ssc.checkpoint(checkpointDir);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createPollingStream(ssc, host, port);
 
// Transform each flume avro event to a process-able format
JavaDStream<String> transformedEvents = flumeStream.map(new
Function<SparkFlumeEvent, String>() {
 
@Override
public String call(SparkFlumeEvent flumeEvent) throws Exception {
String flumeEventStr = flumeEvent.event().toString();
avroData avroData = new avroData();
Gson gson = new GsonBuilder().create();
avroData = gson.fromJson(flumeEventStr, avroData.class); 
HashMap<String, String> body = avroData.getBody();
String data = body.get("bytes");
return data;
}
});
 
...
 
ssc.start();
ssc.awaitTermination();
ssc.close();
}
 
Is there something specific I should be doing to let the Flume server know the 
batch has been received and processed?


*Ian Brooks*
 




*Ian Brooks*
Lead Cloud Systems Engineer
 
Mobile: +44 7900987187[6]
UK Office: +44 131 629 5155[7]
US Office: +1 650 943 2403[8]
Skype: ijbrooks
 
E-mail: _i.brooks@sensewhere.com_ 
Web: www.sensewhere.com[9] 
 
*sensewhere Ltd*. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA. 
Company Number: SC357036
*sensewhere USA* 800 West El Camino Real, Suite 180, Mountain View, California, 
94040
*sensewhere China* Room748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue, Nanshan 
District, Shenzhen 51806
 
  




*Ian Brooks*
Lead Cloud Systems Engineer

Mobile: +44 7900987187
UK Office: +44 131 629 5155
US 

Re: Flume integration

2016-11-21 Thread Mich Talebzadeh
Hi Ian,

Flume is great for ingesting data into HDFS and HBase. However, that is
part of the batch layer.

For real-time processing, I would go through Kafka into Spark Streaming.
Apart from your case, I have not established whether anyone else feeds Flume
directly into Spark.

If so, how mature is it?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 21 November 2016 at 10:27, Ian Brooks  wrote:

>
> We use Flume already as our way of getting data from our application in to
> HDFS and HBase, we have some new work we are looking at that requires
> realtime processing on data that we don't need to store, so It fits into
> our existing platform easier just to pass the data through Flume like
> everything else and just route this data to SPARK.
>
> -Ian
>
>
>
>
> On Monday 21 November 2016 07:59:42 ayan guha wrote:
>
> Hi
>
> While I am following this discussion with interest, I am trying to
> comprehend any architectural benefit of a spark sink.
>
> Is there any feature in Flume that makes it more suitable for ingesting stream data
> than Spark Streaming, so that we should chain them? For example, does it
> help durability or reliability of the source?
>
> Or, it is a more tactical choice based on connector availability or such?
>
> To me, flume is important component to ingest streams to hdfs or hive
> directly ie it plays on the batch side of lambda architecture pattern.
>
> On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:
>
> Hi Ian,
>
>
> Has this been resolved?
>
>
> How about data to Flume and then Kafka and Kafka streaming into Spark?
>
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  https://www.linkedin.com/profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On 13 July 2016 at 11:13, Ian Brooks  wrote:
>
> Hi,
>
>
>
> I'm currently trying to implement a prototype Spark application that gets
> data from Flume and processes it. I'm using the pull based method mentioned
> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>
>
>
> The is initially working fine for getting data from Flume, however the
> Spark client doesn't appear to be letting Flume know that the data has been
> received, so Flume doesn't remove it from the batch.
>
>
>
> After 100 requests Flume stops allowing any new data and logs
>
>
>
> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
> Error while processing transaction.
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(
> MemoryChannel.java:96)
>
>
>
> My code to pull the data from Flume is
>
>
>
> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>
> Duration batchInterval = new Duration(1);
>
> final String checkpointDir = "/tmp/";
>
>
>
> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> batchInterval);
>
> ssc.checkpoint(checkpointDir);
>
> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
> FlumeUtils.createPollingStream(ssc,
> host, port);
>
>
>
> // Transform each flume avro event to a process-able format
>
> JavaDStream<String> transformedEvents = flumeStream.map(new
> Function<SparkFlumeEvent, String>() {
>
>
>
> @Override
>
> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>
> String flumeEventStr = flumeEvent.event().toString();
>
> avroData avroData = new avroData();
>
> Gson gson = new GsonBuilder().create();
>
> avroData = gson.fromJson(flumeEventStr, avroData.class);
>
> HashMap<String, String> body = avroData.getBody();
>
> String data = body.get("bytes");
>
> return data;
>
> }
>
> });
>
>
>
> ...
>
>
>
> ssc.start();
>
> ssc.awaitTermination();
>
> ssc.close();
>
> }
>
>
>
> Is there something specific I should be doing to let the Flume server know
> the batch has been received and processed?
>
>
> --
>
> Ian Brooks
>
>
>
>
>
>
>
> --
>
> 

Re: Flume integration

2016-11-21 Thread Ian Brooks

*-Ian*


Hi
While I am following this discussion with interest, I am trying to comprehend
any architectural benefit of a Spark sink.
Is there any feature in Flume that makes it more suitable for ingesting stream data
than Spark Streaming, so that we should chain them? For example, does it help
durability or reliability of the source?
Or is it a more tactical choice based on connector availability or such?
To me, Flume is an important component for ingesting streams to HDFS or Hive directly,
i.e. it plays on the batch side of the lambda architecture pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:


Hi Ian,


Has this been resolved?


How about data to Flume and then Kafka and Kafka streaming into Spark?


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[2]/
 
  
http://talebzadehmich.wordpress.com[3]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
  


On 13 July 2016 at 11:13, Ian Brooks  wrote:


Hi,
 
I'm currently trying to implement a prototype Spark application that gets data 
from Flume and processes it. I'm using the pull based method mentioned in 
https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html[5] 
 
This is initially working fine for getting data from Flume; however, the Spark
client doesn't appear to be letting Flume know that the data has been received,
so Flume doesn't remove it from the batch.
 
After 100 requests Flume stops allowing any new data and logs
 
08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] 
(org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error 
while processing transaction. 

 
My code to pull the data from Flume is
 
SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
Duration batchInterval = new Duration(1);
final String checkpointDir = "/tmp/";
 
final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
ssc.checkpoint(checkpointDir);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createPollingStream(ssc, host, port);
 
// Transform each flume avro event to a process-able format
JavaDStream<String> transformedEvents = flumeStream.map(new
Function<SparkFlumeEvent, String>() {
 
@Override
public String call(SparkFlumeEvent flumeEvent) throws Exception {
String flumeEventStr = flumeEvent.event().toString();
avroData avroData = new avroData();
Gson gson = new GsonBuilder().create();
avroData = gson.fromJson(flumeEventStr, avroData.class); 
HashMap<String, String> body = avroData.getBody();
String data = body.get("bytes");
return data;
}
});
 
...
 
ssc.start();
ssc.awaitTermination();
ssc.close();
}
 
Is there something specific I should be doing to let the Flume server know the 
batch has been received and processed?


*Ian Brooks*
 




*Ian Brooks*
Lead Cloud Systems Engineer

Mobile: +44 7900987187
UK Office: +44 131 629 5155
US Office: +1 650 943 2403
Skype: ijbrooks

E-mail: i.bro...@sensewhere.com[6] 
Web: www.sensewhere.com[7] 

*sensewhere Ltd*. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA.
Company Number: SC357036
*sensewhere USA* 800 West El Camino Real, Suite 180, Mountain View, California, 
94040
*sensewhere China* Room748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue, Nanshan 
District, Shenzhen 51806

  


[1] mailto:mich.talebza...@gmail.com
[2] 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
[3] http://talebzadehmich.wordpress.com
[4] mailto:i.bro...@sensewhere.com
[5] https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
[6] mailt:i.bro...@sensewhere.com
[7] http://www.sensewhere.com/


Will different receivers run on different worker?

2016-11-21 Thread Cyanny LIANG
Hi, I am new to Spark Streaming.
In our project we want to implement a custom receiver to subscribe to our log
data.
I have two questions:

1. Do multiple DStream receivers run in different processes or different threads?
2. When unioning multiple DStreams, such as 10 DStreams, we observed that Spark will
create 10 jobs. How many receivers will be started on a worker? We find that,
if the application has 5 executors, the 10 receivers will be started on these
executors at random. Is that right?


-- 
Cyanny LIANG