Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
There're some skew.

6461640SUCCESSPROCESS_LOCAL200 / 2015/03/04 23:45:471.1 min6 s198.6 MB21.1
GB240.8 MB5961590SUCCESSPROCESS_LOCAL30 / 2015/03/04 23:45:4744 s5 s200.7
MB4.8 GB154.0 MB
But I expect this kind of skewness to be quite common.

Jianshi


On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang 
wrote:

> I see. I'm using core's join. The data might have some skewness
> (checking).
>
> I understand shuffle can spill data to disk but when consuming it, say in
> cogroup or groupByKey, it still needs to read the whole group elements,
> right? I guess OOM happened there when reading very large groups.
>
> Jianshi
>
> On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai 
> wrote:
>
>>  I think what you could do is to monitor through web UI to see if
>> there’s any skew or other symptoms in shuffle write and read. For GC you
>> could use the below configuration as you mentioned.
>>
>>
>>
>> From Spark core side, all the shuffle related operations can spill the
>> data into disk and no need to read the whole partition into memory. But if
>> you uses SparkSQL, it depends on how SparkSQL uses this operators.
>>
>>
>>
>> CC @hao if he has some thoughts on it.
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Thursday, March 5, 2015 3:28 PM
>> *To:* Shao, Saisai
>>
>> *Cc:* user
>> *Subject:* Re: Having lots of FetchFailedException in join
>>
>>
>>
>> Hi Saisai,
>>
>>
>>
>> What's your suggested settings on monitoring shuffle? I've
>> enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.
>>
>>
>>
>> I found SPARK-3461 (Support external groupByKey using
>> repartitionAndSortWithinPartitions) want to make groupByKey using external
>> storage. It's still open status. Does that mean now
>> groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read
>> the group as a whole during consuming?
>>
>>
>>
>> How can I deal with the key skewness in joins? Is there a skew-join
>> implementation?
>>
>>
>>
>>
>>
>> Jianshi
>>
>>
>>
>>
>>
>>
>>
>> On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai 
>> wrote:
>>
>>  Hi Jianshi,
>>
>>
>>
>> From my understanding, it may not be the problem of NIO or Netty, looking
>> at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap),
>> theoretically EAOM can spill the data into disk if memory is not enough,
>> but there might some issues when join key is skewed or key number is
>> smaller, so you will meet OOM.
>>
>>
>>
>> Maybe you could monitor each stage or task’s shuffle and GC status also
>> system status to identify the problem.
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Thursday, March 5, 2015 2:32 PM
>> *To:* Aaron Davidson
>> *Cc:* user
>> *Subject:* Re: Having lots of FetchFailedException in join
>>
>>
>>
>> One really interesting is that when I'm using the
>> netty-based spark.shuffle.blockTransferService, there's no OOM error
>> messages (java.lang.OutOfMemoryError: Java heap space).
>>
>>
>>
>> Any idea why it's not here?
>>
>>
>>
>> I'm using Spark 1.2.1.
>>
>>
>>
>> Jianshi
>>
>>
>>
>> On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
>> wrote:
>>
>>  I changed spark.shuffle.blockTransferService to nio and now I'm getting
>> OOM errors, I'm doing a big join operation.
>>
>>
>>
>>
>>
>> 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
>> (TID 6207)
>>
>> java.lang.OutOfMemoryError: Java heap space
>>
>> at
>> org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
>>
>> at
>> org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
>>
>> at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
>>
>> at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
>>
>> at
>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
>>
>> at
>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
>>
>> at
>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
>>
>> at
>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>
>> at
>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
>>
>> at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>>
>> at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>>
>> at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>
>> at
>> scala.collect

RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
Yes, if one key has too many values, there still has a chance to meet the OOM.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:49 PM
To: Shao, Saisai
Cc: Cheng, Hao; user
Subject: Re: Having lots of FetchFailedException in join

I see. I'm using core's join. The data might have some skewness (checking).

I understand shuffle can spill data to disk but when consuming it, say in 
cogroup or groupByKey, it still needs to read the whole group elements, right? 
I guess OOM happened there when reading very large groups.

Jianshi

On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai 
mailto:saisai.s...@intel.com>> wrote:
I think what you could do is to monitor through web UI to see if there’s any 
skew or other symptoms in shuffle write and read. For GC you could use the 
below configuration as you mentioned.

From Spark core side, all the shuffle related operations can spill the data 
into disk and no need to read the whole partition into memory. But if you uses 
SparkSQL, it depends on how SparkSQL uses this operators.

CC @hao if he has some thoughts on it.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:28 PM
To: Shao, Saisai

Cc: user
Subject: Re: Having lots of FetchFailedException in join

Hi Saisai,

What's your suggested settings on monitoring shuffle? I've enabled 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found SPARK-3461 (Support external groupByKey using 
repartitionAndSortWithinPartitions) want to make groupByKey using external 
storage. It's still open status. Does that mean now 
groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the 
group as a whole during consuming?

How can I deal with the key skewness in joins? Is there a skew-join 
implementation?


Jianshi



On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai 
mailto:saisai.s...@intel.com>> wrote:
Hi Jianshi,

From my understanding, it may not be the problem of NIO or Netty, looking at 
your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), 
theoretically EAOM can spill the data into disk if memory is not enough, but 
there might some issues when join key is skewed or key number is smaller, so 
you will meet OOM.

Maybe you could monitor each stage or task’s shuffle and GC status also system 
status to identify the problem.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 2:32 PM
To: Aaron Davidson
Cc: user
Subject: Re: Having lots of FetchFailedException in join

One really interesting is that when I'm using the netty-based 
spark.shuffle.blockTransferService, there's no OOM error messages 
(java.lang.OutOfMemoryError: Java heap space).

Any idea why it's not here?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
mailto:jianshi.hu...@gmail.com>> wrote:
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM 
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 
6207)
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at 
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at 
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at 
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apac

using log4j2 with spark

2015-03-04 Thread Lior Chaga
Hi,
Trying to run spark 1.2.1 w/ hadoop 1.0.4 on cluster and configure it to
run with log4j2.
Problem is that spark-assembly.jar contains log4j and slf4j classes
compatible with log4j 1.2 in it, and so it detects it should use log4j 1.2 (
https://github.com/apache/spark/blob/54e7b456dd56c9e52132154e699abca87563465b/core/src/main/scala/org/apache/spark/Logging.scala
on line 121).

Is there a maven profile for building spark-assembly w/out the log4j
dependencies, or any other way I can force spark to use log4j2?

Thanks!
Lior


Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
I see. I'm using core's join. The data might have some skewness (checking).

I understand shuffle can spill data to disk but when consuming it, say in
cogroup or groupByKey, it still needs to read the whole group elements,
right? I guess OOM happened there when reading very large groups.

Jianshi

On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai  wrote:

>  I think what you could do is to monitor through web UI to see if there’s
> any skew or other symptoms in shuffle write and read. For GC you could use
> the below configuration as you mentioned.
>
>
>
> From Spark core side, all the shuffle related operations can spill the
> data into disk and no need to read the whole partition into memory. But if
> you uses SparkSQL, it depends on how SparkSQL uses this operators.
>
>
>
> CC @hao if he has some thoughts on it.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 3:28 PM
> *To:* Shao, Saisai
>
> *Cc:* user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> Hi Saisai,
>
>
>
> What's your suggested settings on monitoring shuffle? I've
> enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.
>
>
>
> I found SPARK-3461 (Support external groupByKey using
> repartitionAndSortWithinPartitions) want to make groupByKey using external
> storage. It's still open status. Does that mean now
> groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read
> the group as a whole during consuming?
>
>
>
> How can I deal with the key skewness in joins? Is there a skew-join
> implementation?
>
>
>
>
>
> Jianshi
>
>
>
>
>
>
>
> On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai 
> wrote:
>
>  Hi Jianshi,
>
>
>
> From my understanding, it may not be the problem of NIO or Netty, looking
> at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap),
> theoretically EAOM can spill the data into disk if memory is not enough,
> but there might some issues when join key is skewed or key number is
> smaller, so you will meet OOM.
>
>
>
> Maybe you could monitor each stage or task’s shuffle and GC status also
> system status to identify the problem.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 2:32 PM
> *To:* Aaron Davidson
> *Cc:* user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> One really interesting is that when I'm using the
> netty-based spark.shuffle.blockTransferService, there's no OOM error
> messages (java.lang.OutOfMemoryError: Java heap space).
>
>
>
> Any idea why it's not here?
>
>
>
> I'm using Spark 1.2.1.
>
>
>
> Jianshi
>
>
>
> On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
> wrote:
>
>  I changed spark.shuffle.blockTransferService to nio and now I'm getting
> OOM errors, I'm doing a big join operation.
>
>
>
>
>
> 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
> (TID 6207)
>
> java.lang.OutOfMemoryError: Java heap space
>
> at
> org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
>
> at
> org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
>
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
>
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
>
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
>
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
>
> at
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
>
> at
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
>
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>
> at
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>
> at
> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
> at org.apache.spark.rdd.RDD.iterator(RD

RE: Passing around SparkContext with in the Driver

2015-03-04 Thread Kapil Malik
Replace 
val sqlContext = new SQLContext(sparkContext)
with

@transient 
val sqlContext = new SQLContext(sparkContext)

-Original Message-
From: kpeng1 [mailto:kpe...@gmail.com] 
Sent: 04 March 2015 23:39
To: user@spark.apache.org
Subject: Passing around SparkContext with in the Driver

Hi All,

I am trying to create a class that wraps functionalities that I need; some of 
these functions require access to the SparkContext, which I would like to pass 
in.  I know that the SparkContext is not seralizable, and I am not planning on 
passing it to worker nodes or anything, I just want to wrap some 
functionalities that require SparkContext's api.  As a preface, I am basically 
using the spark shell to test the functionality of my code at the moment, so I 
am not sure if that plays into any of the issues I am having. 
Here is my current class:

class MyClass(sparkContext: SparkContext) {
  import org.apache.spark.sql._
  import org.apache.spark.rdd._

  val sqlContext = new SQLContext(sparkContext)

  val DATA_TYPE_MAPPING = Map(
"int" -> IntegerType,
"double" -> DoubleType,
"float" -> FloatType,
"long" -> LongType,
"short" -> ShortType,
"binary" -> BinaryType,
"bool" -> BooleanType,
"byte" -> ByteType,
"string" -> StringType)

  //removes the first line of a text file
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]):
Iterator[String] ={
//header line is first line in first partition
if(partitionIdx == 0){
  fileItr.drop(1)
}
fileItr
  }

  //returns back a StructType for the schema
  def getSchema(rawSchema: Array[String]): StructType ={
//return backs a StructField
def getSchemaFieldHelper(schemaField: String): StructField ={
  val schemaParts = schemaField.split(' ')
  StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
}

val structFields = rawSchema.map(column => getSchemaFieldHelper(column))
StructType(structFields)
  }

  def getRow(strRow: String): Row ={
val spRow = strRow.split(',')
val tRow = spRow.map(_.trim)
Row(tRow:_*)
  }
  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile:
String): SchemaRDD ={
//apply schema to rdd to create schemaRDD
def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType):
SchemaRDD ={
  val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct)
  schemaRDD
}
  
val rawSchema = sparkContext.textFile(schemaFile).collect
val schema = getSchema(rawSchema)
  
val rawCsvData = sparkContext.textFile(csvFile)
  
//if we want to keep header from csv file
if(includeHeader){
  val rowRDD = rawCsvData.map(getRow) 
  val schemaRDD = createSchemaRDD(rowRDD, schema)
  return schemaRDD
}
 
val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
val rowRDD = csvData.map(getRow)
val schemaRDD = createSchemaRDD(rowRDD, schema)
schemaRDD
  }

}

So in the spark shell I am basically creating an instance of this class and 
calling applySchemaToCsv like so:
val test = new MyClass(sc)
test.applySchemaToCsv("/tmp/myFile.csv", false, "/tmp/schema.txt")

What I am getting is not serializable exception:
15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at
:62
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
   .
   .
   .
Caused by: java.io.NotSerializableException:


If I remove the class wrapper and make references to sc directly everything 
works.  I am basically wondering what is causing the serialization issues and 
if I can wrap a class around these functions.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Passing-around-SparkContext-with-in-the-Driver-tp21913.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
I think what you could do is to monitor through web UI to see if there’s any 
skew or other symptoms in shuffle write and read. For GC you could use the 
below configuration as you mentioned.

From Spark core side, all the shuffle related operations can spill the data 
into disk and no need to read the whole partition into memory. But if you uses 
SparkSQL, it depends on how SparkSQL uses this operators.

CC @hao if he has some thoughts on it.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:28 PM
To: Shao, Saisai
Cc: user
Subject: Re: Having lots of FetchFailedException in join

Hi Saisai,

What's your suggested settings on monitoring shuffle? I've enabled 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found SPARK-3461 (Support external groupByKey using 
repartitionAndSortWithinPartitions) want to make groupByKey using external 
storage. It's still open status. Does that mean now 
groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the 
group as a whole during consuming?

How can I deal with the key skewness in joins? Is there a skew-join 
implementation?


Jianshi



On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai 
mailto:saisai.s...@intel.com>> wrote:
Hi Jianshi,

From my understanding, it may not be the problem of NIO or Netty, looking at 
your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), 
theoretically EAOM can spill the data into disk if memory is not enough, but 
there might some issues when join key is skewed or key number is smaller, so 
you will meet OOM.

Maybe you could monitor each stage or task’s shuffle and GC status also system 
status to identify the problem.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 2:32 PM
To: Aaron Davidson
Cc: user
Subject: Re: Having lots of FetchFailedException in join

One really interesting is that when I'm using the netty-based 
spark.shuffle.blockTransferService, there's no OOM error messages 
(java.lang.OutOfMemoryError: Java heap space).

Any idea why it's not here?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
mailto:jianshi.hu...@gmail.com>> wrote:
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM 
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 
6207)
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at 
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at 
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at 
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
Hi Saisai,

What's your suggested settings on monitoring shuffle? I've
enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found SPARK-3461 (Support external groupByKey using
repartitionAndSortWithinPartitions) want to make groupByKey using external
storage. It's still open status. Does that mean now
groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read
the group as a whole during consuming?


How can I deal with the key skewness in joins? Is there a skew-join
implementation?



Jianshi



On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai  wrote:

>  Hi Jianshi,
>
>
>
> From my understanding, it may not be the problem of NIO or Netty, looking
> at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap),
> theoretically EAOM can spill the data into disk if memory is not enough,
> but there might some issues when join key is skewed or key number is
> smaller, so you will meet OOM.
>
>
>
> Maybe you could monitor each stage or task’s shuffle and GC status also
> system status to identify the problem.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 2:32 PM
> *To:* Aaron Davidson
> *Cc:* user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> One really interesting is that when I'm using the
> netty-based spark.shuffle.blockTransferService, there's no OOM error
> messages (java.lang.OutOfMemoryError: Java heap space).
>
>
>
> Any idea why it's not here?
>
>
>
> I'm using Spark 1.2.1.
>
>
>
> Jianshi
>
>
>
> On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
> wrote:
>
>  I changed spark.shuffle.blockTransferService to nio and now I'm getting
> OOM errors, I'm doing a big join operation.
>
>
>
>
>
> 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
> (TID 6207)
>
> java.lang.OutOfMemoryError: Java heap space
>
> at
> org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
>
> at
> org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
>
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
>
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
>
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
>
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
>
> at
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
>
> at
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
>
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>
> at
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>
> at
> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>
> at
> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>
>
>
> Is join/cogroup still memory bound?
>
>
>
>
>
> Jianshi
>
>
>
>
>
>
>
> On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang 
> wrote:
>
>  Hmm... ok, previous errors are still block fetch errors.
>
>
>
> 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning

RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
Hi Jianshi,

From my understanding, it may not be the problem of NIO or Netty, looking at 
your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), 
theoretically EAOM can spill the data into disk if memory is not enough, but 
there might some issues when join key is skewed or key number is smaller, so 
you will meet OOM.

Maybe you could monitor each stage or task’s shuffle and GC status also system 
status to identify the problem.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 2:32 PM
To: Aaron Davidson
Cc: user
Subject: Re: Having lots of FetchFailedException in join

One really interesting is that when I'm using the netty-based 
spark.shuffle.blockTransferService, there's no OOM error messages 
(java.lang.OutOfMemoryError: Java heap space).

Any idea why it's not here?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
mailto:jianshi.hu...@gmail.com>> wrote:
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM 
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 
6207)
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at 
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at 
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at 
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)

Is join/cogroup still memory bound?


Jianshi



On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang 
mailto:jianshi.hu...@gmail.com>> wrote:
Hmm... ok, previous errors are still block fetch errors.

15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch 
of 11 outstanding blocks
java.io.IOException: Failed to connect to host-/:55597
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
at 
org.apache.spark.storage.ShuffleBlo

RE: distribution of receivers in spark streaming

2015-03-04 Thread Shao, Saisai
Yes, hostname is enough.

I think currently it is hard for user code to get the worker list from 
standalone master. If you can get the Master object, you could get the worker 
list, but AFAIK may be it is difficult to get this object. All you could do is 
to manually get the worker list and assigned its hostname to each receiver.

Thanks
Jerry

From: Du Li [mailto:l...@yahoo-inc.com]
Sent: Thursday, March 5, 2015 2:29 PM
To: Shao, Saisai; User
Subject: Re: distribution of receivers in spark streaming

Hi Jerry,

Thanks for your response.

Is there a way to get the list of currently registered/live workers? Even in 
order to provide preferredLocation, it would be safer to know which workers are 
active. Guess I only need to provide the hostname, right?

Thanks,
Du

On Wednesday, March 4, 2015 10:08 PM, "Shao, Saisai" 
mailto:saisai.s...@intel.com>> wrote:

Hi Du,

You could try to sleep for several seconds after creating streaming context to 
let all the executors registered, then all the receivers can distribute to the 
nodes more evenly. Also setting locality is another way as you mentioned.

Thanks
Jerry


From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override method preferredLocation() in MyReceiver 
class.

On Wednesday, March 4, 2015 3:35 PM, Du Li 
mailto:l...@yahoo-inc.com.INVALID>> wrote:

Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of 
kafka receivers on those machines. In my code I did something like the 
following, as suggested in the spark docs:
val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new 
MyKafkaReceiver()))
ssc.union(streams)

However, from the spark UI, I saw that some machines are not running any 
instance of the receiver while some get three. The mapping changed every time 
the system was restarted. This impacts the receiving and also the processing 
speeds.

I wonder if it's possible to control/suggest the distribution so that it would 
be more even. How is the decision made in spark?

Thanks,
Du






Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
One really interesting is that when I'm using the
netty-based spark.shuffle.blockTransferService, there's no OOM error
messages (java.lang.OutOfMemoryError: Java heap space).

Any idea why it's not here?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
wrote:

> I changed spark.shuffle.blockTransferService to nio and now I'm getting
> OOM errors, I'm doing a big join operation.
>
>
> 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
> (TID 6207)
> java.lang.OutOfMemoryError: Java heap space
> at
> org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
> at
> org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
> at
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
> at
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at
> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at
> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>
> Is join/cogroup still memory bound?
>
>
> Jianshi
>
>
>
> On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang 
> wrote:
>
>> Hmm... ok, previous errors are still block fetch errors.
>>
>> 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
>> fetch of 11 outstanding blocks
>> java.io.IOException: Failed to connect to host-/:55597
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> at
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>> at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIter

Re: distribution of receivers in spark streaming

2015-03-04 Thread Du Li
Hi Jerry,
Thanks for your response.
Is there a way to get the list of currently registered/live workers? Even in 
order to provide preferredLocation, it would be safer to know which workers are 
active. Guess I only need to provide the hostname, right?
Thanks,Du 

 On Wednesday, March 4, 2015 10:08 PM, "Shao, Saisai" 
 wrote:
   

 #yiv8205255497 #yiv8205255497 -- _filtered #yiv8205255497 
{font-family:Helvetica;panose-1:2 11 6 4 2 2 2 2 2 4;} _filtered #yiv8205255497 
{font-family:SimSun;panose-1:2 1 6 0 3 1 1 1 1 1;} _filtered #yiv8205255497 
{panose-1:2 4 5 3 5 4 6 3 2 4;} _filtered #yiv8205255497 
{font-family:Calibri;panose-1:2 15 5 2 2 2 4 3 2 4;} _filtered #yiv8205255497 
{panose-1:2 1 6 0 3 1 1 1 1 1;}#yiv8205255497 #yiv8205255497 
p.yiv8205255497MsoNormal, #yiv8205255497 li.yiv8205255497MsoNormal, 
#yiv8205255497 div.yiv8205255497MsoNormal 
{margin:0in;margin-bottom:.0001pt;font-size:12.0pt;}#yiv8205255497 a:link, 
#yiv8205255497 span.yiv8205255497MsoHyperlink 
{color:#0563C1;text-decoration:underline;}#yiv8205255497 a:visited, 
#yiv8205255497 span.yiv8205255497MsoHyperlinkFollowed 
{color:#954F72;text-decoration:underline;}#yiv8205255497 
span.yiv8205255497EmailStyle17 {color:#1F497D;}#yiv8205255497 
.yiv8205255497MsoChpDefault {font-size:10.0pt;} _filtered #yiv8205255497 
{margin:1.0in 1.0in 1.0in 1.0in;}#yiv8205255497 div.yiv8205255497WordSection1 
{}#yiv8205255497 Hi Du,    You could try to sleep for several seconds after 
creating streaming context to let all the executors registered, then all the 
receivers can distribute to the nodes more evenly. Also setting locality is 
another way as you mentioned.    Thanks Jerry       From: Du Li 
[mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming    Figured it out: I 
need to override method preferredLocation() in MyReceiver class.    On 
Wednesday, March 4, 2015 3:35 PM, Du Li  wrote:    
Hi,    I have a set of machines (say 5) and want to evenly launch a number (say 
8) of kafka receivers on those machines. In my code I did something like the 
following, as suggested in the spark docs:         val streams = (1 to 
numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))         
ssc.union(streams)    However, from the spark UI, I saw that some machines are 
not running any instance of the receiver while some get three. The mapping 
changed every time the system was restarted. This impacts the receiving and 
also the processing speeds.    I wonder if it's possible to control/suggest the 
distribution so that it would be more even. How is the decision made in spark?  
  Thanks, Du          

   

RE: distribution of receivers in spark streaming

2015-03-04 Thread Shao, Saisai
Hi Du,

You could try to sleep for several seconds after creating streaming context to 
let all the executors registered, then all the receivers can distribute to the 
nodes more evenly. Also setting locality is another way as you mentioned.

Thanks
Jerry


From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override method preferredLocation() in MyReceiver 
class.

On Wednesday, March 4, 2015 3:35 PM, Du Li 
mailto:l...@yahoo-inc.com.INVALID>> wrote:

Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of 
kafka receivers on those machines. In my code I did something like the 
following, as suggested in the spark docs:
val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new 
MyKafkaReceiver()))
ssc.union(streams)

However, from the spark UI, I saw that some machines are not running any 
instance of the receiver while some get three. The mapping changed every time 
the system was restarted. This impacts the receiving and also the processing 
speeds.

I wonder if it's possible to control/suggest the distribution so that it would 
be more even. How is the decision made in spark?

Thanks,
Du





Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID
6207)
java.lang.OutOfMemoryError: Java heap space
at
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)

Is join/cogroup still memory bound?


Jianshi



On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang 
wrote:

> Hmm... ok, previous errors are still block fetch errors.
>
> 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
> fetch of 11 outstanding blocks
> java.io.IOException: Failed to connect to host-/:55597
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.appl

Re: distribution of receivers in spark streaming

2015-03-04 Thread Du Li
Figured it out: I need to override method preferredLocation() in MyReceiver 
class. 

 On Wednesday, March 4, 2015 3:35 PM, Du Li  
wrote:
   

 Hi,
I have a set of machines (say 5) and want to evenly launch a number (say 8) of 
kafka receivers on those machines. In my code I did something like the 
following, as suggested in the spark docs:        val streams = (1 to 
numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))        
ssc.union(streams)
However, from the spark UI, I saw that some machines are not running any 
instance of the receiver while some get three. The mapping changed every time 
the system was restarted. This impacts the receiving and also the processing 
speeds.
I wonder if it's possible to control/suggest the distribution so that it would 
be more even. How is the decision made in spark?
Thanks,Du



   

Re: spark master shut down suddenly

2015-03-04 Thread Benjamin Stickel
Generally the location of logs in /var/log/mesos but the definitive
configuration can be found via the /etc/mesos-master/... configuration
files. There should be a configuration file labeled log_dir.

ps -ax | grep mesos should also show the output of the configuration if it
is configured.

Another location to review would potentially be /etc/default/mesos-master



On Wed, Mar 4, 2015 at 9:31 PM, Denny Lee  wrote:

> It depends on your setup but one of the locations is /var/log/mesos
> On Wed, Mar 4, 2015 at 19:11 lisendong  wrote:
>
>> I ‘m sorry, but how to look at the mesos logs?
>> where are them?
>>
>>
>>
>> 在 2015年3月4日,下午6:06,Akhil Das  写道:
>>
>>
>> You can check in the mesos logs and see whats really happening.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Mar 4, 2015 at 3:10 PM, lisendong  wrote:
>>
>>> 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not
>>> heard
>>> from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
>>> connection and attempting reconnect
>>> 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
>>> 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost
>>> leadership
>>> 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
>>> shutting down.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>


How to parse Json formatted Kafka message in spark streaming

2015-03-04 Thread Cui Lin
Friends,

I'm trying to parse json formatted Kafka messages and then send back to 
cassandra.I have two problems:

  1.  I got the exception below. How to check an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty 
collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](…)

messages.foreachRDD { rdd =>
  val message:RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time,To FROM tempTable")
.saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
"msg"))
}

2. how to get all column names from json messages? I have hundreds of columns 
in the json formatted message.

Thanks for your help!




Best regards,

Cui Lin


Re: spark master shut down suddenly

2015-03-04 Thread Denny Lee
It depends on your setup but one of the locations is /var/log/mesos
On Wed, Mar 4, 2015 at 19:11 lisendong  wrote:

> I ‘m sorry, but how to look at the mesos logs?
> where are them?
>
>
>
> 在 2015年3月4日,下午6:06,Akhil Das  写道:
>
>
> You can check in the mesos logs and see whats really happening.
>
> Thanks
> Best Regards
>
> On Wed, Mar 4, 2015 at 3:10 PM, lisendong  wrote:
>
>> 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not
>> heard
>> from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
>> connection and attempting reconnect
>> 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
>> 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost
>> leadership
>> 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
>> shutting down.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Unable to Read/Write Avro RDD on cluster.

2015-03-04 Thread ๏̯͡๏
I am trying to read RDD avro, transform and write.
I am able to run it locally fine but when i run onto cluster, i see issues
with Avro.


export SPARK_HOME=/home/dvasthimal/spark/spark-1.0.2-bin-2.4.1
export SPARK_YARN_USER_ENV="CLASSPATH=/apache/hadoop/conf"
export HADOOP_CONF_DIR=/apache/hadoop/conf
export YARN_CONF_DIR=/apache/hadoop/conf
export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.0.2-hadoop2.4.1.jar
export SPARK_LIBRARY_PATH=/apache/hadoop/lib/native
export SPARK_YARN_USER_ENV="CLASSPATH=/apache/hadoop/conf"
export SPARK_YARN_USER_ENV="CLASSPATH=/apache/hadoop/conf"
export
SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-company-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/home/dvasthimal/spark/avro-mapred-1.7.7-hadoop2.jar:/home/dvasthimal/spark/avro-1.7.7.jar
export SPARK_LIBRARY_PATH="/apache/hadoop/lib/native"
export YARN_CONF_DIR=/apache/hadoop/conf/

cd $SPARK_HOME

./bin/spark-submit --master yarn-cluster --jars
/home/dvasthimal/spark/avro-mapred-1.7.7-hadoop2.jar,/home/dvasthimal/spark/avro-1.7.7.jar
--num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores
1  --queue hdmi-spark --class com.company.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-02-16 endDate=2015-02-16
epoutputdirectory=/user/dvasthimal/epdatasets_small/exptsession
subcommand=successevents
outputdir=/user/dvasthimal/epdatasets/successdetail

Spark assembly has been built with Hive, including Datanucleus jars on
classpath
15/03/04 03:20:29 INFO client.ConfiguredRMFailoverProxyProvider: Failing
over to rm2
15/03/04 03:20:30 INFO yarn.Client: Got Cluster metric info from
ApplicationsManager (ASM), number of NodeManagers: 2221
15/03/04 03:20:30 INFO yarn.Client: Queue info ... queueName: hdmi-spark,
queueCurrentCapacity: 0.7162806, queueMaxCapacity: 0.08,
  queueApplicationCount = 7, queueChildQueueCount = 0
15/03/04 03:20:30 INFO yarn.Client: Max mem capabililty of a single
resource in this cluster 16384
15/03/04 03:20:30 INFO yarn.Client: Preparing Local resources
15/03/04 03:20:30 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/03/04 03:20:30 WARN hdfs.BlockReaderLocal: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.


15/03/04 03:20:46 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token
7780745 for dvasthimal on 10.115.206.112:8020
15/03/04 03:20:46 INFO yarn.Client: Uploading
file:/home/dvasthimal/spark/spark_reporting-1.0-SNAPSHOT.jar to hdfs://
apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/spark_reporting-1.0-SNAPSHOT.jar
15/03/04 03:20:47 INFO yarn.Client: Uploading
file:/home/dvasthimal/spark/spark-1.0.2-bin-2.4.1/lib/spark-assembly-1.0.2-hadoop2.4.1.jar
to hdfs://
apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/spark-assembly-1.0.2-hadoop2.4.1.jar
15/03/04 03:20:52 INFO yarn.Client: Uploading
file:/home/dvasthimal/spark/avro-mapred-1.7.7-hadoop2.jar to hdfs://
apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/avro-mapred-1.7.7-hadoop2.jar
15/03/04 03:20:52 INFO yarn.Client: Uploading
file:/home/dvasthimal/spark/avro-1.7.7.jar to hdfs://
apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/avro-1.7.7.jar
15/03/04 03:20:54 INFO yarn.Client: Setting up the launch environment
15/03/04 03:20:54 INFO yarn.Client: Setting up container launch context
15/03/04 03:20:54 INFO yarn.Client: Command for starting the Spark
ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx4096m,
-Djava.io.tmpdir=$PWD/tmp,
-Dspark.app.name=\"com.company.ep.poc.spark.reporting.SparkApp\",
 -Dlog4j.configuration=log4j-spark-container.properties,
org.apache.spark.deploy.yarn.ApplicationMaster, --class,
com.company.ep.poc.spark.reporting.SparkApp, --jar ,
file:/home/dvasthimal/spark/spark_reporting-1.0-SNAPSHOT.jar,  --args
 'startDate=2015-02-16'  --args  'endDate=2015-02-16'  --args
 'epoutputdirectory=/user/dvasthimal/epdatasets_small/exptsession'  --args
 'subcommand=successevents'  --args
 'outputdir=/user/dvasthimal/epdatasets/successdetail' , --executor-memory,
2048, --executor-cores, 1, --num-executors , 3, 1>, /stdout, 2>,
/stderr)
15/03/04 03:20:54 INFO yarn.Client: Submitting application to ASM
15/03/04 03:20:54 INFO impl.YarnClientImpl: Submitted application
application_1425075571333_61948
15/03/04 03:20:56 INFO yarn.Client: Application report from ASM:
 application identifier: application_1425075571333_61948
 appId: 61948
 clientToAMToken: null
 appDiagnostics:
 appMasterHost: N/A
 appQueue: hdmi-spark
 appMasterRpcPort: -1
 appStartTime: 1425464454263
 yarnAppState: ACCEPTED
 distributedFinalState: UNDEFINED
 appTrackingUrl:
https://apollo-phx-rm-2.company.com:50030/proxy/application_1425075571333_61948/
 appUser: dvasthimal
15/0

Re: Where can I find more information about the R interface forSpark?

2015-03-04 Thread Ted Yu
Please follow SPARK-5654

On Wed, Mar 4, 2015 at 7:22 PM, Haopu Wang  wrote:

>  Thanks, it's an active project.
>
>
>
> Will it be released with Spark 1.3.0?
>
>
>  --
>
> *From:* 鹰 [mailto:980548...@qq.com]
> *Sent:* Thursday, March 05, 2015 11:19 AM
> *To:* Haopu Wang; user
> *Subject:* Re: Where can I find more information about the R interface
> forSpark?
>
>
>
> you can search SparkR on google or search it on github
>


RE: Where can I find more information about the R interface forSpark?

2015-03-04 Thread Haopu Wang
Thanks, it's an active project.

 

Will it be released with Spark 1.3.0?

 



From: 鹰 [mailto:980548...@qq.com] 
Sent: Thursday, March 05, 2015 11:19 AM
To: Haopu Wang; user
Subject: Re: Where can I find more information about the R interface forSpark?

 

you can search SparkR on google or search it on github 



Re: Where can I find more information about the R interface forSpark?

2015-03-04 Thread ??
you can search SparkR on google or search it on github

Re: Where can I find more information about the R interface for Spark?

2015-03-04 Thread haopu
Do you have any update on SparkR?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Where-can-I-find-more-information-about-the-R-interface-for-Spark-tp155p21922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark master shut down suddenly

2015-03-04 Thread lisendong
I ‘m sorry, but how to look at the mesos logs?
where are them?



> 在 2015年3月4日,下午6:06,Akhil Das  写道:
> 
> You can check in the mesos logs and see whats really happening.
> 
> Thanks
> Best Regards
> 
> On Wed, Mar 4, 2015 at 3:10 PM, lisendong  > wrote:
> 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard
> from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
> connection and attempting reconnect
> 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
> 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost leadership
> 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
> shutting down.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



how to update als in mllib?

2015-03-04 Thread lisendong
I 'm using spark1.0.0 with cloudera.

but I want to use new als code which supports more features, such as rdd
cache level(MEMORY ONLY), checkpoint, and so on.

What is the easiest way to use the new als code?

I only need the mllib als code, so maybe I don't need to update all the
spark & mllib  of the cluster machines...

maybe I download a new spark jar, and include it in my driver is enough?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-update-als-in-mllib-tp21921.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Driver disassociated

2015-03-04 Thread Ted Yu
See this thread:
https://groups.google.com/forum/#!topic/akka-user/X3xzpTCbEFs

Here're the relevant config parameters in Spark:
val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses",
6000)
val akkaHeartBeatInterval =
conf.getInt("spark.akka.heartbeat.interval", 1000)

Cheers

On Wed, Mar 4, 2015 at 4:09 PM, Thomas Gerber 
wrote:

> Also,
>
> I was experiencing another problem which might be related:
> "Error communicating with MapOutputTracker" (see email in the ML today).
>
> I just thought I would mention it in case it is relevant.
>
> On Wed, Mar 4, 2015 at 4:07 PM, Thomas Gerber 
> wrote:
>
>> 1.2.1
>>
>> Also, I was using the following parameters, which are 10 times the
>> default ones:
>> spark.akka.timeout 1000
>> spark.akka.heartbeat.pauses 6
>> spark.akka.failure-detector.threshold 3000.0
>> spark.akka.heartbeat.interval 1
>>
>> which should have helped *avoid* the problem if I understand correctly.
>>
>> Thanks,
>> Thomas
>>
>> On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu  wrote:
>>
>>> What release are you using ?
>>>
>>> SPARK-3923 went into 1.2.0 release.
>>>
>>> Cheers
>>>
>>> On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber 
>>> wrote:
>>>
 Hello,

 sometimes, in the *middle* of a job, the job stops (status is then
 seen as FINISHED in the master).

 There isn't anything wrong in the shell/submit output.

 When looking at the executor logs, I see logs like this:

 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
 actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
 :40019/user/MapOutputTracker#893807065]
 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs
 for shuffle 38, fetching them
 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766]
 -> [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 disassociated! Shutting down.
 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with
 remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

 How can I investigate further?
 Thanks

>>>
>>>
>>
>


Re: TreeNodeException: Unresolved attributes

2015-03-04 Thread Zhan Zhang
Which spark version did you use? I tried spark-1.2.1 and didn’t meet this 
problem.

scala> val m = hiveContext.sql(" select * from  testtable where value like 
'%Restaurant%'")
15/03/05 02:02:30 INFO ParseDriver: Parsing command: select * from  testtable 
where value like '%Restaurant%'
15/03/05 02:02:30 INFO ParseDriver: Parse Completed
15/03/05 02:02:30 INFO MemoryStore: ensureFreeSpace(462299) called with 
curMem=1087888, maxMem=280248975
15/03/05 02:02:30 INFO MemoryStore: Block broadcast_2 stored as values in 
memory (estimated size 451.5 KB, free 265.8 MB)
15/03/05 02:02:30 INFO MemoryStore: ensureFreeSpace(81645) called with 
curMem=1550187, maxMem=280248975
15/03/05 02:02:30 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in 
memory (estimated size 79.7 KB, free 265.7 MB)
15/03/05 02:02:30 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
c6402.ambari.apache.org:33696 (size: 79.7 KB, 
free: 267.0 MB)
15/03/05 02:02:30 INFO BlockManagerMaster: Updated info of block 
broadcast_2_piece0
15/03/05 02:02:30 INFO DefaultExecutionContext: Created broadcast 2 from 
broadcast at TableReader.scala:68
m: org.apache.spark.sql.SchemaRDD =
SchemaRDD[3] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Filter Contains(value#5, Restaurant)
 HiveTableScan [key#4,value#5], (MetastoreRelation default, testtable, None), 
None

scala>


Thanks.

Zhan Zhang

On Mar 4, 2015, at 9:09 AM, Anusha Shamanur 
mailto:anushas...@gmail.com>> wrote:

I tried. I still get the same error.

15/03/04 09:01:50 INFO parse.ParseDriver: Parsing command: select * from 
TableName where value like '%Restaurant%'

15/03/04 09:01:50 INFO parse.ParseDriver: Parse Completed.

15/03/04 09:01:50 INFO metastore.HiveMetaStore: 0: get_table : db=default 
tbl=TableName

15/03/04 09:01:50 INFO HiveMetaStore.audit: ugi=as7339 ip=unknown-ip-addr 
cmd=get_table : db=default tbl=TableName
results: org.apache.spark.sql.SchemaRDD =

SchemaRDD[86] at RDD at SchemaRDD.scala:108
== Query Plan ==

== Physical Plan ==

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved 
attributes: *, tree:

'Project [*]

'Filter ('value LIKE Restaurant)
  MetastoreRelation default, TableName, None



On Wed, Mar 4, 2015 at 5:39 AM, Arush Kharbanda 
mailto:ar...@sigmoidanalytics.com>> wrote:
Why don't you formulate a string before you pass it to the hql function 
(appending strings), and hql function is deprecated. You should use sql.

http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext

On Wed, Mar 4, 2015 at 6:15 AM, Anusha Shamanur 
mailto:anushas...@gmail.com>> wrote:
Hi,

I am trying to run a simple select query on a table.

val restaurants=hiveCtx.hql("select * from TableName where column like 
'%SomeString%' ")
This gives an error as below:
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved 
attributes: *, tree:

How do I solve this?


--
Regards,
Anusha



--

[Sigmoid Analytics]

Arush Kharbanda || Technical Teamlead

ar...@sigmoidanalytics.com || 
www.sigmoidanalytics.com



--
Regards,
Anusha



Extra output from Spark run

2015-03-04 Thread cjwang
When I run Spark 1.2.1, I found these display that wasn't in the previous
releases:

[Stage 12:=>   (6 + 1) /
16]
[Stage 12:>(8 + 1) /
16]
[Stage 12:==> (11 + 1) /
16]
[Stage 12:=>  (14 + 1) /
16]

What do they mean and how can I get rid of them?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extra-output-from-Spark-run-tp21920.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD coalesce or repartition by #records or #bytes?

2015-03-04 Thread Zhan Zhang
It use HashPartitioner to distribute the record to different partitions, but 
the key is just integer  evenly across output partitions.

>From the code, each resulting partition will get very similar number of 
>records.

Thanks.

Zhan Zhang


On Mar 4, 2015, at 3:47 PM, Du Li 
mailto:l...@yahoo-inc.com.INVALID>> wrote:

Hi,

My RDD's are created from kafka stream. After receiving a RDD, I want to do 
coalesce/repartition it so that the data will be processed in a set of machines 
in parallel as even as possible. The number of processing nodes is larger than 
the receiving nodes.

My question is how the coalesce/repartition works. Does it distribute by the 
number of records or number of bytes? In my app, my observation is that the 
distribution seems by number of records. The consequence is, however, some 
executors have to process x1000 as much as data when the sizes of records are 
very skewed. Then we have to allocate memory by the worst case.

Is there a way to programmatically affect the coalesce /repartition scheme?

Thanks,
Du



In the HA master mode, how to identify the alive master?

2015-03-04 Thread Xuelin Cao
Hi,

  In our project, we use "stand alone duo master" + "zookeeper" to make
the HA of spark master.

  Now the problem is, how do we know which master is the current alive
master?

  We tried to read the info that the master stored in zookeeper. But we
found there is no information to identify the "current alive master".

  Any suggestions for us?

Thanks


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Marcelo,

Thanks.  The one in the CDH repo fixed it :)

On Wed, Mar 4, 2015 at 4:37 PM, Marcelo Vanzin  wrote:

> Hi Kevin,
>
> If you're using CDH, I'd recommend using the CDH repo [1], and also
> the CDH version when building your app.
>
> [1]
> http://www.cloudera.com/content/cloudera/en/documentation/core/v5-2-x/topics/cdh_vd_cdh5_maven_repo.html
>
> On Wed, Mar 4, 2015 at 4:34 PM, Kevin Peng  wrote:
> > Ted,
> >
> > I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not
> too
> > sure about the compatibility issues between 1.2.0 and 1.2.1, that is why
> I
> > would want to stick to 1.2.0.
> >
> >
> >
> > On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu  wrote:
> >>
> >> Kevin:
> >> You can try with 1.2.1
> >>
> >> See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1
> >>
> >> Cheers
> >>
> >> On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng  wrote:
> >>>
> >>> Marcelo,
> >>>
> >>> Yes that is correct, I am going through a mirror, but 1.1.0 works
> >>> properly, while 1.2.0 does not.  I suspect there is crc in the 1.2.0
> pom
> >>> file.
> >>>
> >>> On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin 
> >>> wrote:
> 
>  Seems like someone set up "m2.mines.com" as a mirror in your pom file
>  or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
>  in a messed up state).
> 
>  On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
>  > Hi All,
>  >
>  > I am currently having problem with the maven dependencies for
> version
>  > 1.2.0
>  > of spark-core and spark-hive.  Here are my dependencies:
>  > 
>  >   org.apache.spark
>  >   spark-core_2.10
>  >   1.2.0
>  > 
>  > 
>  >   org.apache.spark
>  >   spark-hive_2.10
>  >   1.2.0
>  > 
>  >
>  > When the dependencies are set to version 1.1.0, I do not get any
>  > errors.
>  > Here are the errors I am getting from artifactory for version 1.2.0
> of
>  > spark-core:
>  > error=Could not transfer artifact
>  > org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
>  > (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
>  > file\:
>  >
>  > https\://m2.mines.com
> \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
>  > Return code is\: 409 , ReasonPhrase\:Conflict.
>  >
>  > The error is the same for spark-hive.
>  >
>  >
>  >
>  >
>  > --
>  > View this message in context:
>  >
> http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
>  > Sent from the Apache Spark User List mailing list archive at
>  > Nabble.com.
>  >
>  >
> -
>  > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>  > For additional commands, e-mail: user-h...@spark.apache.org
>  >
> 
> 
> 
>  --
>  Marcelo
> >>>
> >>>
> >>
> >
>
>
>
> --
> Marcelo
>


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Marcelo Vanzin
Hi Kevin,

If you're using CDH, I'd recommend using the CDH repo [1], and also
the CDH version when building your app.

[1] 
http://www.cloudera.com/content/cloudera/en/documentation/core/v5-2-x/topics/cdh_vd_cdh5_maven_repo.html

On Wed, Mar 4, 2015 at 4:34 PM, Kevin Peng  wrote:
> Ted,
>
> I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not too
> sure about the compatibility issues between 1.2.0 and 1.2.1, that is why I
> would want to stick to 1.2.0.
>
>
>
> On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu  wrote:
>>
>> Kevin:
>> You can try with 1.2.1
>>
>> See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1
>>
>> Cheers
>>
>> On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng  wrote:
>>>
>>> Marcelo,
>>>
>>> Yes that is correct, I am going through a mirror, but 1.1.0 works
>>> properly, while 1.2.0 does not.  I suspect there is crc in the 1.2.0 pom
>>> file.
>>>
>>> On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin 
>>> wrote:

 Seems like someone set up "m2.mines.com" as a mirror in your pom file
 or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
 in a messed up state).

 On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
 > Hi All,
 >
 > I am currently having problem with the maven dependencies for version
 > 1.2.0
 > of spark-core and spark-hive.  Here are my dependencies:
 > 
 >   org.apache.spark
 >   spark-core_2.10
 >   1.2.0
 > 
 > 
 >   org.apache.spark
 >   spark-hive_2.10
 >   1.2.0
 > 
 >
 > When the dependencies are set to version 1.1.0, I do not get any
 > errors.
 > Here are the errors I am getting from artifactory for version 1.2.0 of
 > spark-core:
 > error=Could not transfer artifact
 > org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
 > (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
 > file\:
 >
 > https\://m2.mines.com\:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
 > Return code is\: 409 , ReasonPhrase\:Conflict.
 >
 > The error is the same for spark-hive.
 >
 >
 >
 >
 > --
 > View this message in context:
 > http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
 > Sent from the Apache Spark User List mailing list archive at
 > Nabble.com.
 >
 > -
 > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 > For additional commands, e-mail: user-h...@spark.apache.org
 >



 --
 Marcelo
>>>
>>>
>>
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Ted,

I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not too
sure about the compatibility issues between 1.2.0 and 1.2.1, that is why I
would want to stick to 1.2.0.



On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu  wrote:

> Kevin:
> You can try with 1.2.1
>
> See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1
>
> Cheers
>
> On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng  wrote:
>
>> Marcelo,
>>
>> Yes that is correct, I am going through a mirror, but 1.1.0 works
>> properly, while 1.2.0 does not.  I suspect there is crc in the 1.2.0 pom
>> file.
>>
>> On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin 
>> wrote:
>>
>>> Seems like someone set up "m2.mines.com" as a mirror in your pom file
>>> or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
>>> in a messed up state).
>>>
>>> On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
>>> > Hi All,
>>> >
>>> > I am currently having problem with the maven dependencies for version
>>> 1.2.0
>>> > of spark-core and spark-hive.  Here are my dependencies:
>>> > 
>>> >   org.apache.spark
>>> >   spark-core_2.10
>>> >   1.2.0
>>> > 
>>> > 
>>> >   org.apache.spark
>>> >   spark-hive_2.10
>>> >   1.2.0
>>> > 
>>> >
>>> > When the dependencies are set to version 1.1.0, I do not get any
>>> errors.
>>> > Here are the errors I am getting from artifactory for version 1.2.0 of
>>> > spark-core:
>>> > error=Could not transfer artifact
>>> > org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
>>> > (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
>>> file\:
>>> > https\://m2.mines.com
>>> \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
>>> > Return code is\: 409 , ReasonPhrase\:Conflict.
>>> >
>>> > The error is the same for spark-hive.
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>


Re: scala.Double vs java.lang.Double in RDD

2015-03-04 Thread Tobias Pfeiffer
Hi,

On Thu, Mar 5, 2015 at 12:20 AM, Imran Rashid  wrote:

> This doesn't involve spark at all, I think this is entirely an issue with
> how scala deals w/ primitives and boxing.  Often it can hide the details
> for you, but IMO it just leads to far more confusing errors when things
> don't work out.  The issue here is that your map has value type Any, which
> leads scala to leave it as a boxed java.lang.Double.
>

I see, thank you very much for your explanation and the code examples!
Helps very much!

Thanks
Tobias


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Ted,

I have tried wiping out ~/.m2/org.../spark directory multiple times.  It
doesn't seem to work.



On Wed, Mar 4, 2015 at 4:20 PM, Ted Yu  wrote:

> kpeng1:
> Try wiping out ~/.m2 and build again.
>
> Cheers
>
> On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin 
> wrote:
>
>> Seems like someone set up "m2.mines.com" as a mirror in your pom file
>> or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
>> in a messed up state).
>>
>> On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
>> > Hi All,
>> >
>> > I am currently having problem with the maven dependencies for version
>> 1.2.0
>> > of spark-core and spark-hive.  Here are my dependencies:
>> > 
>> >   org.apache.spark
>> >   spark-core_2.10
>> >   1.2.0
>> > 
>> > 
>> >   org.apache.spark
>> >   spark-hive_2.10
>> >   1.2.0
>> > 
>> >
>> > When the dependencies are set to version 1.1.0, I do not get any errors.
>> > Here are the errors I am getting from artifactory for version 1.2.0 of
>> > spark-core:
>> > error=Could not transfer artifact
>> > org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
>> > (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
>> file\:
>> > https\://m2.mines.com
>> \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
>> > Return code is\: 409 , ReasonPhrase\:Conflict.
>> >
>> > The error is the same for spark-hive.
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>>
>>
>> --
>> Marcelo
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Ted Yu
Kevin:
You can try with 1.2.1

See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1

Cheers

On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng  wrote:

> Marcelo,
>
> Yes that is correct, I am going through a mirror, but 1.1.0 works
> properly, while 1.2.0 does not.  I suspect there is crc in the 1.2.0 pom
> file.
>
> On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin 
> wrote:
>
>> Seems like someone set up "m2.mines.com" as a mirror in your pom file
>> or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
>> in a messed up state).
>>
>> On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
>> > Hi All,
>> >
>> > I am currently having problem with the maven dependencies for version
>> 1.2.0
>> > of spark-core and spark-hive.  Here are my dependencies:
>> > 
>> >   org.apache.spark
>> >   spark-core_2.10
>> >   1.2.0
>> > 
>> > 
>> >   org.apache.spark
>> >   spark-hive_2.10
>> >   1.2.0
>> > 
>> >
>> > When the dependencies are set to version 1.1.0, I do not get any errors.
>> > Here are the errors I am getting from artifactory for version 1.2.0 of
>> > spark-core:
>> > error=Could not transfer artifact
>> > org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
>> > (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
>> file\:
>> > https\://m2.mines.com
>> \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
>> > Return code is\: 409 , ReasonPhrase\:Conflict.
>> >
>> > The error is the same for spark-hive.
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>>
>>
>> --
>> Marcelo
>>
>
>


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Ted Yu
kpeng1:
Try wiping out ~/.m2 and build again.

Cheers

On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin  wrote:

> Seems like someone set up "m2.mines.com" as a mirror in your pom file
> or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
> in a messed up state).
>
> On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
> > Hi All,
> >
> > I am currently having problem with the maven dependencies for version
> 1.2.0
> > of spark-core and spark-hive.  Here are my dependencies:
> > 
> >   org.apache.spark
> >   spark-core_2.10
> >   1.2.0
> > 
> > 
> >   org.apache.spark
> >   spark-hive_2.10
> >   1.2.0
> > 
> >
> > When the dependencies are set to version 1.1.0, I do not get any errors.
> > Here are the errors I am getting from artifactory for version 1.2.0 of
> > spark-core:
> > error=Could not transfer artifact
> > org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
> > (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
> file\:
> > https\://m2.mines.com
> \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
> > Return code is\: 409 , ReasonPhrase\:Conflict.
> >
> > The error is the same for spark-hive.
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Marcelo,

Yes that is correct, I am going through a mirror, but 1.1.0 works properly,
while 1.2.0 does not.  I suspect there is crc in the 1.2.0 pom file.

On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin  wrote:

> Seems like someone set up "m2.mines.com" as a mirror in your pom file
> or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
> in a messed up state).
>
> On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
> > Hi All,
> >
> > I am currently having problem with the maven dependencies for version
> 1.2.0
> > of spark-core and spark-hive.  Here are my dependencies:
> > 
> >   org.apache.spark
> >   spark-core_2.10
> >   1.2.0
> > 
> > 
> >   org.apache.spark
> >   spark-hive_2.10
> >   1.2.0
> > 
> >
> > When the dependencies are set to version 1.1.0, I do not get any errors.
> > Here are the errors I am getting from artifactory for version 1.2.0 of
> > spark-core:
> > error=Could not transfer artifact
> > org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
> > (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
> file\:
> > https\://m2.mines.com
> \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
> > Return code is\: 409 , ReasonPhrase\:Conflict.
> >
> > The error is the same for spark-hive.
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
>
>
> --
> Marcelo
>


Re: Driver disassociated

2015-03-04 Thread Thomas Gerber
Also,

I was experiencing another problem which might be related:
"Error communicating with MapOutputTracker" (see email in the ML today).

I just thought I would mention it in case it is relevant.

On Wed, Mar 4, 2015 at 4:07 PM, Thomas Gerber 
wrote:

> 1.2.1
>
> Also, I was using the following parameters, which are 10 times the default
> ones:
> spark.akka.timeout 1000
> spark.akka.heartbeat.pauses 6
> spark.akka.failure-detector.threshold 3000.0
> spark.akka.heartbeat.interval 1
>
> which should have helped *avoid* the problem if I understand correctly.
>
> Thanks,
> Thomas
>
> On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu  wrote:
>
>> What release are you using ?
>>
>> SPARK-3923 went into 1.2.0 release.
>>
>> Cheers
>>
>> On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber 
>> wrote:
>>
>>> Hello,
>>>
>>> sometimes, in the *middle* of a job, the job stops (status is then seen
>>> as FINISHED in the master).
>>>
>>> There isn't anything wrong in the shell/submit output.
>>>
>>> When looking at the executor logs, I see logs like this:
>>>
>>> 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
>>> actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
>>> :40019/user/MapOutputTracker#893807065]
>>> 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs
>>> for shuffle 38, fetching them
>>> 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver
>>> Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766]
>>> -> [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
>>> disassociated! Shutting down.
>>> 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with
>>> remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
>>> has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>>>
>>> How can I investigate further?
>>> Thanks
>>>
>>
>>
>


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Marcelo Vanzin
Seems like someone set up "m2.mines.com" as a mirror in your pom file
or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
in a messed up state).

On Wed, Mar 4, 2015 at 3:49 PM, kpeng1  wrote:
> Hi All,
>
> I am currently having problem with the maven dependencies for version 1.2.0
> of spark-core and spark-hive.  Here are my dependencies:
> 
>   org.apache.spark
>   spark-core_2.10
>   1.2.0
> 
> 
>   org.apache.spark
>   spark-hive_2.10
>   1.2.0
> 
>
> When the dependencies are set to version 1.1.0, I do not get any errors.
> Here are the errors I am getting from artifactory for version 1.2.0 of
> spark-core:
> error=Could not transfer artifact
> org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
> (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer file\:
> https\://m2.mines.com\:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
> Return code is\: 409 , ReasonPhrase\:Conflict.
>
> The error is the same for spark-hive.
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Driver disassociated

2015-03-04 Thread Thomas Gerber
1.2.1

Also, I was using the following parameters, which are 10 times the default
ones:
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1

which should have helped *avoid* the problem if I understand correctly.

Thanks,
Thomas

On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu  wrote:

> What release are you using ?
>
> SPARK-3923 went into 1.2.0 release.
>
> Cheers
>
> On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber 
> wrote:
>
>> Hello,
>>
>> sometimes, in the *middle* of a job, the job stops (status is then seen
>> as FINISHED in the master).
>>
>> There isn't anything wrong in the shell/submit output.
>>
>> When looking at the executor logs, I see logs like this:
>>
>> 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
>> actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
>> :40019/user/MapOutputTracker#893807065]
>> 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for
>> shuffle 38, fetching them
>> 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver
>> Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766]
>> -> [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
>> disassociated! Shutting down.
>> 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with
>> remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
>> has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>>
>> How can I investigate further?
>> Thanks
>>
>
>


Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread kpeng1
Hi All,

I am currently having problem with the maven dependencies for version 1.2.0
of spark-core and spark-hive.  Here are my dependencies:

  org.apache.spark
  spark-core_2.10
  1.2.0


  org.apache.spark
  spark-hive_2.10
  1.2.0


When the dependencies are set to version 1.1.0, I do not get any errors. 
Here are the errors I am getting from artifactory for version 1.2.0 of
spark-core:
error=Could not transfer artifact
org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
(https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer file\:
https\://m2.mines.com\:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
Return code is\: 409 , ReasonPhrase\:Conflict.

The error is the same for spark-hive.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RDD coalesce or repartition by #records or #bytes?

2015-03-04 Thread Du Li
Hi,
My RDD's are created from kafka stream. After receiving a RDD, I want to do 
coalesce/repartition it so that the data will be processed in a set of machines 
in parallel as even as possible. The number of processing nodes is larger than 
the receiving nodes.
My question is how the coalesce/repartition works. Does it distribute by the 
number of records or number of bytes? In my app, my observation is that the 
distribution seems by number of records. The consequence is, however, some 
executors have to process x1000 as much as data when the sizes of records are 
very skewed. Then we have to allocate memory by the worst case.
Is there a way to programmatically affect the coalesce /repartition scheme?
Thanks,Du

distribution of receivers in spark streaming

2015-03-04 Thread Du Li
Hi,
I have a set of machines (say 5) and want to evenly launch a number (say 8) of 
kafka receivers on those machines. In my code I did something like the 
following, as suggested in the spark docs:        val streams = (1 to 
numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))        
ssc.union(streams)
However, from the spark UI, I saw that some machines are not running any 
instance of the receiver while some get three. The mapping changed every time 
the system was restarted. This impacts the receiving and also the processing 
speeds.
I wonder if it's possible to control/suggest the distribution so that it would 
be more even. How is the decision made in spark?
Thanks,Du



Re: Driver disassociated

2015-03-04 Thread Ted Yu
What release are you using ?

SPARK-3923 went into 1.2.0 release.

Cheers

On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber 
wrote:

> Hello,
>
> sometimes, in the *middle* of a job, the job stops (status is then seen
> as FINISHED in the master).
>
> There isn't anything wrong in the shell/submit output.
>
> When looking at the executor logs, I see logs like this:
>
> 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
> actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
> :40019/user/MapOutputTracker#893807065]
> 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for
> shuffle 38, fetching them
> 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
> [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] ->
> [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated!
> Shutting down.
> 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote
> system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has
> failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>
> How can I investigate further?
> Thanks
>


Re: issue Running Spark Job on Yarn Cluster

2015-03-04 Thread roni
look at the logs
yarn logs --applicationId 
That should give the error.

On Wed, Mar 4, 2015 at 9:21 AM, sachin Singh 
wrote:

> Not yet,
> Please let. Me know if you found solution,
>
> Regards
> Sachin
> On 4 Mar 2015 21:45, "mael2210 [via Apache Spark User List]" <[hidden
> email] > wrote:
>
>> Hello,
>>
>> I am facing the exact same issue. Could you solve the problem ?
>>
>> Kind regards
>>
>> --
>>  If you reply to this email, your message will be added to the
>> discussion below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697p21909.html
>>  To unsubscribe from issue Running Spark Job on Yarn Cluster, click here.
>> NAML
>> 
>>
>
> --
> View this message in context: Re: issue Running Spark Job on Yarn Cluster
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: spark sql median and standard deviation

2015-03-04 Thread Ted Yu
Please take a look at DoubleRDDFunctions.scala :

  /** Compute the mean of this RDD's elements. */
  def mean(): Double = stats().mean

  /** Compute the variance of this RDD's elements. */
  def variance(): Double = stats().variance

  /** Compute the standard deviation of this RDD's elements. */
  def stdev(): Double = stats().stdev

Cheers

On Wed, Mar 4, 2015 at 10:51 AM, tridib  wrote:

> Hello,
> Is there in built function for getting median and standard deviation in
> spark sql? Currently I am converting the schemaRdd to DoubleRdd and calling
> doubleRDD.stats(). But still it does not have median.
>
> What is the most efficient way to get the median?
>
> Thanks & Regards
> Tridib
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-median-and-standard-deviation-tp21914.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Driver disassociated

2015-03-04 Thread Thomas Gerber
Hello,

sometimes, in the *middle* of a job, the job stops (status is then seen as
FINISHED in the master).

There isn't anything wrong in the shell/submit output.

When looking at the executor logs, I see logs like this:

15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
:40019/user/MapOutputTracker#893807065]
15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for
shuffle 38, fetching them
15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] ->
[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated!
Shutting down.
15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has
failed, address is now gated for [5000] ms. Reason is: [Disassociated].

How can I investigate further?
Thanks


Re: Spark SQL Static Analysis

2015-03-04 Thread Justin Pihony
Thanks!

On Wed, Mar 4, 2015 at 3:58 PM, Michael Armbrust 
wrote:

> It is somewhat out of data, but here is what we have so far:
> https://github.com/marmbrus/sql-typed
>
> On Wed, Mar 4, 2015 at 12:53 PM, Justin Pihony 
> wrote:
>
>> I am pretty sure that I saw a presentation where SparkSQL could be
>> executed
>> with static analysis, however I cannot find the presentation now, nor can
>> I
>> find any documentation or research papers on the topic. So, I am curious
>> if
>> there is indeed any work going on for this topic. The two things I would
>> be
>> interested in would be to be able to gain compile time safety, as well as
>> gain the ability to work on my data as a type instead of a row (ie,
>> result.map(x=>x.Age) instead of having to use Row.get)
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Static-Analysis-tp21918.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark SQL Static Analysis

2015-03-04 Thread Michael Armbrust
It is somewhat out of data, but here is what we have so far:
https://github.com/marmbrus/sql-typed

On Wed, Mar 4, 2015 at 12:53 PM, Justin Pihony 
wrote:

> I am pretty sure that I saw a presentation where SparkSQL could be executed
> with static analysis, however I cannot find the presentation now, nor can I
> find any documentation or research papers on the topic. So, I am curious if
> there is indeed any work going on for this topic. The two things I would be
> interested in would be to be able to gain compile time safety, as well as
> gain the ability to work on my data as a type instead of a row (ie,
> result.map(x=>x.Age) instead of having to use Row.get)
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Static-Analysis-tp21918.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark SQL Static Analysis

2015-03-04 Thread Justin Pihony
I am pretty sure that I saw a presentation where SparkSQL could be executed
with static analysis, however I cannot find the presentation now, nor can I
find any documentation or research papers on the topic. So, I am curious if
there is indeed any work going on for this topic. The two things I would be
interested in would be to be able to gain compile time safety, as well as
gain the ability to work on my data as a type instead of a row (ie,
result.map(x=>x.Age) instead of having to use Row.get)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Static-Analysis-tp21918.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Integer column in schema RDD from parquet being considered as string

2015-03-04 Thread gtinside
Hi ,

I am coverting jsonRDD to parquet by saving it as parquet file
(saveAsParquetFile)
cacheContext.jsonFile("file:///u1/sample.json").saveAsParquetFile("sample.parquet")

I am reading parquet file and registering it as a table :
val parquet = cacheContext.parquetFile("sample_trades.parquet")
parquet.registerTempTable("sample")

When I do a print schema , I see :
root
 |-- SAMPLE: struct (nullable = true)
 ||-- CODE: integer (nullable = true)
 ||-- DESC: string (nullable = true)

When I query :
cacheContext.sql("select SAMPLE.DESC from sample where
SAMPLE.CODE=1").map(t=>t).collect.foreach(println) , I get error that 
java.lang.IllegalArgumentException: Column [CODE] was not found in schema!

but if I put SAMPLE.CODE in single code (forcing it as string) , it works ,
for example :
cacheContext.sql("select SAMPLE.DESC from sample where
*SAMPLE.CODE='1'*").map(t=>t).collect.foreach(println) works

What am I missing here ? I understand catalyst will do optimization so data
type doesn't matter that much , but something is off here .

Regards,
Gaurav









--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Integer-column-in-schema-RDD-from-parquet-being-considered-as-string-tp21917.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Save and read parquet from the same path

2015-03-04 Thread Michael Armbrust
No, this is not safe to do.

On Wed, Mar 4, 2015 at 7:14 AM, Karlson  wrote:

> Hi all,
>
> what would happen if I save a RDD via saveAsParquetFile to the same path
> that RDD is originally read from? Is that a safe thing to do in Pyspark?
>
> Thanks!
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


configure number of cached partition in memory on SparkSQL

2015-03-04 Thread Judy Nash
Hi,

I am tuning a hive dataset on Spark SQL deployed via thrift server.

How can I change the number of partitions after caching the table on thrift 
server?

I have tried the following but still getting the same number of partitions 
after caching:
Spark.default.parallelism
spark.sql.inMemoryColumnarStorage.batchSize


Thanks,
Judy


Re: Spark Monitoring UI for Hadoop Yarn Cluster

2015-03-04 Thread Srini Karri
Hi Marcelo,

I found the problem from
http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/%3cCAL+LEBfzzjugOoB2iFFdz_=9TQsH=DaiKY=cvydfydg3ac5...@mail.gmail.com%3e
this link. The problem is the application I am running, is not generating
"APPLICATION_COMPLETE" file. If I add this file manually it is showing
application in the UI. So the problem is with application which is not
calling Stop method on the spark context.

Thank you and Todd for helping. Hopefully I will be able to apply these on
the actual cluster.

Regards,
Srini.

On Wed, Mar 4, 2015 at 10:20 AM, Srini Karri  wrote:

> Yes. I do see files, actually I missed copying the other settings:
>
> spark.master spark://
> skarri-lt05.redmond.corp.microsoft.com:7077
> spark.eventLog.enabled   true
> spark.rdd.compress true
> spark.storage.memoryFraction 1
> spark.core.connection.ack.wait.timeout 6000
> spark.akka.frameSize 50
> spark.executor.extraClassPath
> D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
> spark.eventLog.dir
> D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
> spark.history.fs.logDirectory
> D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
>
>
> On Wed, Mar 4, 2015 at 10:15 AM, Marcelo Vanzin 
> wrote:
>
>> On Wed, Mar 4, 2015 at 10:08 AM, Srini Karri 
>> wrote:
>> > spark.executor.extraClassPath
>> >
>> D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
>> > spark.eventLog.dir
>> >
>> D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
>> > spark.history.fs.logDirectory
>> >
>> D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
>>
>> Do you see any files in that directory?
>>
>> spark.eventLog.dir won't do anything unless you also have
>> spark.eventLog.enabled=true somewhere. And these are application
>> configs, so make sure they're set when running your application (not
>> when starting the history server).
>>
>> --
>> Marcelo
>>
>
>


spark sql median and standard deviation

2015-03-04 Thread tridib
Hello,
Is there in built function for getting median and standard deviation in
spark sql? Currently I am converting the schemaRdd to DoubleRdd and calling
doubleRDD.stats(). But still it does not have median.

What is the most efficient way to get the median?

Thanks & Regards
Tridib



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-median-and-standard-deviation-tp21914.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does anyone integrate HBASE on Spark

2015-03-04 Thread gen tang
Hi,

There are some examples in spark/example
 and there are also
some examples in spark package .
And I find this blog

is quite good.

Hope it would be helpful

Cheers
Gen


On Wed, Mar 4, 2015 at 6:51 PM, sandeep vura  wrote:

> Hi Sparkers,
>
> How do i integrate hbase on spark !!!
>
> Appreciate for replies !!
>
> Regards,
> Sandeep.v
>


Re: Error communicating with MapOutputTracker

2015-03-04 Thread Thomas Gerber
I meant spark.default.parallelism of course.

On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber 
wrote:

> Follow up:
> We re-retried, this time after *decreasing* spark.parallelism. It was set
> to 16000 before, (5 times the number of cores in our cluster). It is now
> down to 6400 (2 times the number of cores).
>
> And it got past the point where it failed before.
>
> Does the MapOutputTracker have a limit on the number of tasks it can track?
>
>
> On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber 
> wrote:
>
>> Hello,
>>
>> We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge
>> workers). We use spark-submit to start an application.
>>
>> We got the following error which leads to a failed stage:
>>
>> Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
>> most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
>> ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
>> communicating with MapOutputTracker
>>
>>
>> We tried the whole application again, and it failed on the same stage
>> (but it got more tasks completed on that stage) with the same error.
>>
>> We then looked at executors stderr, and all show similar logs, on both
>> runs (see below). As far as we can tell, executors and master have disk
>> space left.
>>
>> *Any suggestion on where to look to understand why the communication with
>> the MapOutputTracker fails?*
>>
>> Thanks
>> Thomas
>> 
>> In case it matters, our akka settings:
>> spark.akka.frameSize 50
>> spark.akka.threads 8
>> // those below are 10* the default, to cope with large GCs
>> spark.akka.timeout 1000
>> spark.akka.heartbeat.pauses 6
>> spark.akka.failure-detector.threshold 3000.0
>> spark.akka.heartbeat.interval 1
>>
>> Appendix: executor logs, where it starts going awry
>>
>> 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
>> 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 
>> 298525)
>> 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with 
>> curMem=5543008799, maxMem=18127202549
>> 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as 
>> bytes in memory (estimated size 1473.0 B, free 11.7 GB)
>> 15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block 
>> broadcast_339_piece0
>> 15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 
>> 224 ms
>> 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with 
>> curMem=5543010272, maxMem=18127202549
>> 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in 
>> memory (estimated size 2.5 KB, free 11.7 GB)
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
>> actor = 
>> Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
>> shuffle 18, fetching them
>> 15/03/04 11:45:00

Re: Error communicating with MapOutputTracker

2015-03-04 Thread Thomas Gerber
Follow up:
We re-retried, this time after *decreasing* spark.parallelism. It was set
to 16000 before, (5 times the number of cores in our cluster). It is now
down to 6400 (2 times the number of cores).

And it got past the point where it failed before.

Does the MapOutputTracker have a limit on the number of tasks it can track?


On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber 
wrote:

> Hello,
>
> We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge workers).
> We use spark-submit to start an application.
>
> We got the following error which leads to a failed stage:
>
> Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
> most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
> ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
> communicating with MapOutputTracker
>
>
> We tried the whole application again, and it failed on the same stage (but
> it got more tasks completed on that stage) with the same error.
>
> We then looked at executors stderr, and all show similar logs, on both
> runs (see below). As far as we can tell, executors and master have disk
> space left.
>
> *Any suggestion on where to look to understand why the communication with
> the MapOutputTracker fails?*
>
> Thanks
> Thomas
> 
> In case it matters, our akka settings:
> spark.akka.frameSize 50
> spark.akka.threads 8
> // those below are 10* the default, to cope with large GCs
> spark.akka.timeout 1000
> spark.akka.heartbeat.pauses 6
> spark.akka.failure-detector.threshold 3000.0
> spark.akka.heartbeat.interval 1
>
> Appendix: executor logs, where it starts going awry
>
> 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
> 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 
> 298525)
> 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with 
> curMem=5543008799, maxMem=18127202549
> 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as 
> bytes in memory (estimated size 1473.0 B, free 11.7 GB)
> 15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block 
> broadcast_339_piece0
> 15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 
> 224 ms
> 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with 
> curMem=5543010272, maxMem=18127202549
> 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in 
> memory (estimated size 2.5 KB, free 11.7 GB)
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor 
> = 
> Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
> shuffle 18, fetching them
> 15/03/04 11:45:00 INFO Map

Spark logs in standalone clusters

2015-03-04 Thread Thomas Gerber
Hello,

I was wondering where all the logs files were located on a standalone
cluster:

   1. the executor logs are in the work directory on each slave machine
   (stdout/stderr)
  - I've notice that GC information is in stdout, and stage information
  in stderr
  - *Could we get more information on what is written in stdout vs
  stderr?*
   2. the master log
  - The path to the log file is shown went you launch the master,
  like 
/mnt/var/log/apps/spark-hadoop-org.apache.spark.deploy.master.Master-MACHINENAME.out;
  - *Could we get more information on where this path is configured?*
   3. driver logs
  - It seems they are only in the console by default (although you can
  override that in the log4j.properties file.
   4. communication manager logs?
   - *Are there any logs for the communication manager (aka the
  MapOutputTracker?)?*
   5. Any other log file?

Thanks,
Thomas


Re: Spark Monitoring UI for Hadoop Yarn Cluster

2015-03-04 Thread Srini Karri
Yes. I do see files, actually I missed copying the other settings:

spark.master spark://
skarri-lt05.redmond.corp.microsoft.com:7077
spark.eventLog.enabled   true
spark.rdd.compress true
spark.storage.memoryFraction 1
spark.core.connection.ack.wait.timeout 6000
spark.akka.frameSize 50
spark.executor.extraClassPath
D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
spark.eventLog.dir
D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
spark.history.fs.logDirectory
D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events


On Wed, Mar 4, 2015 at 10:15 AM, Marcelo Vanzin  wrote:

> On Wed, Mar 4, 2015 at 10:08 AM, Srini Karri  wrote:
> > spark.executor.extraClassPath
> >
> D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
> > spark.eventLog.dir
> >
> D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
> > spark.history.fs.logDirectory
> >
> D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
>
> Do you see any files in that directory?
>
> spark.eventLog.dir won't do anything unless you also have
> spark.eventLog.enabled=true somewhere. And these are application
> configs, so make sure they're set when running your application (not
> when starting the history server).
>
> --
> Marcelo
>


Re: Spark Monitoring UI for Hadoop Yarn Cluster

2015-03-04 Thread Marcelo Vanzin
On Wed, Mar 4, 2015 at 10:08 AM, Srini Karri  wrote:
> spark.executor.extraClassPath
> D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
> spark.eventLog.dir
> D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
> spark.history.fs.logDirectory
> D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events

Do you see any files in that directory?

spark.eventLog.dir won't do anything unless you also have
spark.eventLog.enabled=true somewhere. And these are application
configs, so make sure they're set when running your application (not
when starting the history server).

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Monitoring UI for Hadoop Yarn Cluster

2015-03-04 Thread Srini Karri
Hi Todd and Marcelo,

Thanks for helping me. I was to able to lunch the history server on windows
with out any issues. One problem I am running into right now. I always get
the message no completed applications found in history server UI. But I was
able to browse through these applications from Spark Master. Do you have
any thoughts what could be problem? Following are my settings in spark conf
file:

spark.executor.extraClassPath
D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
spark.eventLog.dir
D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
spark.history.fs.logDirectory
D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events

Also I have attached Spark Master and Spark History server UI screen shots
for convenience. And all the logs are available and I granted directory
permissions to "Everyone with full control". Following is the console
output from History server:

D:\Apache\spark-1.2.1-bin-hadoop2\spark-1.2.1-bin-hadoop2.4\bin>spark-class.cmd
org.apache.spark.deploy.history.HistoryServer
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/03/04 08:59:42 INFO SecurityManager: Changing view acls to: skarri
15/03/04 08:59:42 INFO SecurityManager: Changing modify acls to: skarri
15/03/04 08:59:42 INFO SecurityManager: SecurityManager: authentication
disabled
; ui acls disabled; users with view permissions: Set(skarri); users with
modify
permissions: Set(skarri)
15/03/04 08:59:49 WARN NativeCodeLoader: Unable to load native-hadoop
library fo
r your platform... using builtin-java classes where applicable
15/03/04 08:59:56 INFO Utils: Successfully started service on port 18080.
15/03/04 08:59:56 INFO HistoryServer: Started HistoryServer at
http://skarri-lt0
5.redmond.corp.microsoft.com:18080

Regards,
Srini.

On Tue, Mar 3, 2015 at 11:41 AM, Marcelo Vanzin  wrote:

> Spark applications shown in the RM's UI should have an "Application
> Master" link when they're running. That takes you to the Spark UI for
> that application where you can see all the information you're looking
> for.
>
> If you're running a history server and add
> "spark.yarn.historyServer.address" to your config, that link will
> become a "History" link after the application is finished, and will
> take you to the history server to view the app's UI.
>
>
>
> On Tue, Mar 3, 2015 at 9:47 AM, Srini Karri  wrote:
> > Hi All,
> >
> > I am having trouble finding data related to my requirement. Here is the
> > context, I have tried Standalone Spark Installation on Windows, I am
> able to
> > submit the logs, able to see the history of events. My question is, is it
> > possible to achieve the same monitoring UI experience with Yarn Cluster
> like
> > Viewing workers, running/completed job stages in the Web UI. Currently,
> if
> > we go to our Yarn Resource manager UI, we are able to see the Spark Jobs
> and
> > it's logs. But it is not as rich as Spark Standalone master UI. Is this
> > limitation for hadoop yarn cluster or is there any way we can hook this
> > Spark Standalone master to Yarn Cluster?
> >
> > Any help is highly appreciated.
> >
> > Regards,
> > Srini.
>
>
>
> --
> Marcelo
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Passing around SparkContext with in the Driver

2015-03-04 Thread kpeng1
Hi All,

I am trying to create a class that wraps functionalities that I need; some
of these functions require access to the SparkContext, which I would like to
pass in.  I know that the SparkContext is not seralizable, and I am not
planning on passing it to worker nodes or anything, I just want to wrap some
functionalities that require SparkContext's api.  As a preface, I am
basically using the spark shell to test the functionality of my code at the
moment, so I am not sure if that plays into any of the issues I am having. 
Here is my current class:

class MyClass(sparkContext: SparkContext) {
  import org.apache.spark.sql._
  import org.apache.spark.rdd._

  val sqlContext = new SQLContext(sparkContext)

  val DATA_TYPE_MAPPING = Map(
"int" -> IntegerType,
"double" -> DoubleType,
"float" -> FloatType,
"long" -> LongType,
"short" -> ShortType,
"binary" -> BinaryType,
"bool" -> BooleanType,
"byte" -> ByteType,
"string" -> StringType)

  //removes the first line of a text file
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]):
Iterator[String] ={
//header line is first line in first partition
if(partitionIdx == 0){
  fileItr.drop(1)
}
fileItr
  }

  //returns back a StructType for the schema
  def getSchema(rawSchema: Array[String]): StructType ={
//return backs a StructField
def getSchemaFieldHelper(schemaField: String): StructField ={
  val schemaParts = schemaField.split(' ')
  StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
}

val structFields = rawSchema.map(column => getSchemaFieldHelper(column))
StructType(structFields)
  }

  def getRow(strRow: String): Row ={
val spRow = strRow.split(',')
val tRow = spRow.map(_.trim)
Row(tRow:_*)
  }
  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile:
String): SchemaRDD ={
//apply schema to rdd to create schemaRDD
def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType):
SchemaRDD ={
  val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct)
  schemaRDD
}
  
val rawSchema = sparkContext.textFile(schemaFile).collect
val schema = getSchema(rawSchema)
  
val rawCsvData = sparkContext.textFile(csvFile)
  
//if we want to keep header from csv file
if(includeHeader){
  val rowRDD = rawCsvData.map(getRow) 
  val schemaRDD = createSchemaRDD(rowRDD, schema)
  return schemaRDD
}
 
val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
val rowRDD = csvData.map(getRow)
val schemaRDD = createSchemaRDD(rowRDD, schema)
schemaRDD
  }

}

So in the spark shell I am basically creating an instance of this class and
calling applySchemaToCsv like so:
val test = new MyClass(sc)
test.applySchemaToCsv("/tmp/myFile.csv", false, "/tmp/schema.txt")

What I am getting is not serializable exception:
15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at
:62
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
   .
   .
   .
Caused by: java.io.NotSerializableException:


If I remove the class wrapper and make references to sc directly everything
works.  I am basically wondering what is causing the serialization issues
and if I can wrap a class around these functions.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Passing-around-SparkContext-with-in-the-Driver-tp21913.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does anyone integrate HBASE on Spark

2015-03-04 Thread sandeep vura
Hi Sparkers,

How do i integrate hbase on spark !!!

Appreciate for replies !!

Regards,
Sandeep.v


Re: issue Running Spark Job on Yarn Cluster

2015-03-04 Thread sachin Singh
Not yet,
Please let. Me know if you found solution,

Regards
Sachin
On 4 Mar 2015 21:45, "mael2210 [via Apache Spark User List]" <
ml-node+s1001560n21909...@n3.nabble.com> wrote:

> Hello,
>
> I am facing the exact same issue. Could you solve the problem ?
>
> Kind regards
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697p21909.html
>  To unsubscribe from issue Running Spark Job on Yarn Cluster, click here
> 
> .
> NAML
> 
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697p21912.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: TreeNodeException: Unresolved attributes

2015-03-04 Thread Anusha Shamanur
I tried. I still get the same error.

15/03/04 09:01:50 INFO parse.ParseDriver: Parsing command: select * from
TableName where value like '%Restaurant%'

15/03/04 09:01:50 INFO parse.ParseDriver: Parse Completed.

15/03/04 09:01:50 INFO metastore.HiveMetaStore: 0: get_table : db=default
tbl=TableName

15/03/04 09:01:50 INFO HiveMetaStore.audit: ugi=as7339
ip=unknown-ip-addr cmd=get_table
: db=default tbl=TableName
results: org.apache.spark.sql.SchemaRDD =

SchemaRDD[86] at RDD at SchemaRDD.scala:108
== Query Plan ==

== Physical Plan ==

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
attributes: *, tree:

'Project [*]

'Filter ('value LIKE Restaurant)
  MetastoreRelation default, TableName, None



On Wed, Mar 4, 2015 at 5:39 AM, Arush Kharbanda 
wrote:

> Why don't you formulate a string before you pass it to the hql function
> (appending strings), and hql function is deprecated. You should use sql.
>
>
> http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>
> On Wed, Mar 4, 2015 at 6:15 AM, Anusha Shamanur 
> wrote:
>
>> Hi,
>>
>>
>> I am trying to run a simple select query on a table.
>>
>>
>> val restaurants=hiveCtx.hql("select * from TableName where column like
>> '%SomeString%' ")
>>
>> This gives an error as below:
>>
>> org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
>> Unresolved attributes: *, tree:
>>
>> How do I solve this?
>>
>>
>> --
>> Regards,
>> Anusha
>>
>
>
>
> --
>
> [image: Sigmoid Analytics] 
>
> *Arush Kharbanda* || Technical Teamlead
>
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>



-- 
Regards,
Anusha


Re: Does SparkSQL support "..... having count (fieldname)" in SQL statement?

2015-03-04 Thread shahab
Thanks Cheng, my problem was some misspelling problem which I just fixed,
unfortunately the exception message sometimes does not pin point to exact
reason.  Sorry my bad.



On Wed, Mar 4, 2015 at 5:02 PM, Cheng, Hao  wrote:

>  I’ve tried with latest code, seems it works, which version are you using
> Shahab?
>
>
>
> *From:* yana [mailto:yana.kadiy...@gmail.com]
> *Sent:* Wednesday, March 4, 2015 8:47 PM
> *To:* shahab; user@spark.apache.org
> *Subject:* RE: Does SparkSQL support ". having count (fieldname)" in
> SQL statement?
>
>
>
> I think the problem is that you are using an alias in the having clause. I
> am not able to try just now but see if HAVING count (*)> 2 works ( ie dont
> use cnt)
>
>
>
>
>
> Sent on the new Sprint Network from my Samsung Galaxy S®4.
>
>
>
>  Original message 
>
> From: shahab
>
> Date:03/04/2015 7:22 AM (GMT-05:00)
>
> To: user@spark.apache.org
>
> Subject: Does SparkSQL support ". having count (fieldname)" in SQL
> statement?
>
>
>
> Hi,
>
>
>
> It seems that SparkSQL, even the HiveContext, does not support SQL
> statements like :   SELECT category, count(1) AS cnt FROM products GROUP BY
> category HAVING cnt > 10;
>
>
>
> I get this exception:
>
>
>
> Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
> Unresolved attributes: CAST(('cnt < 2), BooleanType), tree:
>
>
>
>
>
> I couldn't find anywhere is documentation whether "having" keyword is not
> supported ?
>
> If this is the case, what would be the work around? using two nested
> select statements?
>
>
>
> best,
>
> /Shahab
>


Re: GraphX path traversal

2015-03-04 Thread Robin East
Actually your Pregel code works for me:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val vertexlist = Array((1L,"One"), (2L,"Two"), (3L,"Three"), 
(4L,"Four"),(5L,"Five"),(6L,"Six"))
val edgelist = Array(Edge(6,5,"6 to 5"),Edge(5,4,"5 to 4"),Edge(4,3,"4 to 3"), 
Edge(3,2,"3 to 2"), Edge(2,1,"2 to 1"))
val vertices: RDD[(VertexId, String)] =  sc.parallelize(vertexlist)
val edges = sc.parallelize(edgelist)
val graph = Graph(vertices, edges)


val parentGraph = Pregel(
  graph.mapVertices((id, attr) => Set[VertexId]()),
  Set[VertexId](),
  Int.MaxValue,
  EdgeDirection.Out)(
(id, attr, msg) => (msg ++ attr),
edge => { if (edge.srcId != edge.dstId) 
  { Iterator((edge.dstId, (edge.srcAttr + edge.srcId))) 
  } 
  else Iterator.empty 
 },
(a, b) => (a ++ b))
parentGraph.vertices.collect.foreach(println(_))

Output:

(4,Set(6, 5))
(1,Set(5, 6, 2, 3, 4))
(5,Set(6))
(6,Set())
(2,Set(6, 5, 4, 3))
(3,Set(6, 5, 4))

Maybe your data.csv has edges the wrong way round

Robin

> On 3 Mar 2015, at 16:32, Madabhattula Rajesh Kumar  
> wrote:
> 
> Hi,
> 
> I have tried below program using pergel API but I'm not able to get my 
> required output. I'm getting exactly reverse output which I'm expecting. 
> 
> // Creating graph using above mail mentioned edgefile
>  val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, 
> "/home/rajesh/Downloads/graphdata/data.csv").cache()
> 
>  val parentGraph = Pregel(
>   graph.mapVertices((id, attr) => Set[VertexId]()),
>   Set[VertexId](),
>   Int.MaxValue,
>   EdgeDirection.Out)(
> (id, attr, msg) => (msg ++ attr),
> edge => { if (edge.srcId != edge.dstId) 
>   { Iterator((edge.dstId, (edge.srcAttr + edge.srcId))) 
>   } 
>   else Iterator.empty 
>  },
> (a, b) => (a ++ b))
> parentGraph.vertices.collect.foreach(println(_))
> 
> Output :
> 
> (4,Set(1, 2, 3))
> (1,Set())
> (6,Set(5, 1, 2, 3, 4))
> (3,Set(1, 2))
> (5,Set(1, 2, 3, 4))
> (2,Set(1))
> 
> But I'm looking below output. 
> 
> (4,Set(5, 6))
> (1,Set(2, 3, 4, 5, 6))
> (6,Set())
> (3,Set(4, 5, 6))
> (5,Set(6))
> (2,Set(3, 4, 5, 6))
> 
> Could you please correct me where I'm doing wrong.
> 
> Regards,
> Rajesh
>  
> 
> On Tue, Mar 3, 2015 at 8:42 PM, Madabhattula Rajesh Kumar 
> mailto:mrajaf...@gmail.com>> wrote:
> Hi Robin,
> 
> Thank you for your response. Please find below my question. I have a below 
> edge file
> 
> Source Vertex Destination Vertex
> 1 2
> 2 3
> 3 4
> 4 5
> 5 6
> 6 6
> 
> In this graph 1st vertex is connected to 2nd vertex, 2nd Vertex is connected 
> to 3rd vertex,. 6th vertex is connected to 6th vertex. So 6th vertex is a 
> root node. Please find below graph
> 
> 
> In this graph, How can I compute the 1st vertex parents like 2,3,4,5,6. 
> Similarly 2nd vertex parents like 3,4,5,6  6th vertex parent like 6 
> because this is the root node.
> 
> I'm planning to use pergel API but I'm not able to define messages and vertex 
> program in that API. Could you please help me on this.
> 
> Please let me know if you need more information.
> 
> Regards,
> Rajesh
> 
> 
> On Tue, Mar 3, 2015 at 8:15 PM, Robin East  > wrote:
> Rajesh
> 
> I'm not sure if I can help you, however I don't even understand the question. 
> Could you restate what you are trying to do.
> 
> Sent from my iPhone
> 
> On 2 Mar 2015, at 11:17, Madabhattula Rajesh Kumar  > wrote:
> 
>> Hi,
>> 
>> I have a below edge list. How to find the parents path for every vertex?
>> 
>> Example :
>> 
>> Vertex 1 path : 2, 3, 4, 5, 6
>> Vertex 2 path : 3, 4, 5, 6
>> Vertex 3 path : 4,5,6
>> vertex 4 path : 5,6
>> vertex 5 path : 6
>> 
>> Could you please let me know how to do this? (or) Any suggestion
>> 
>> Source VertexDestination Vertex
>> 12
>> 23
>> 34
>> 45
>> 56
>> 
>> Regards,
>> Rajesh
> 
> 



Re: how to save Word2VecModel

2015-03-04 Thread Xiangrui Meng
+user

On Wed, Mar 4, 2015, 8:21 AM Xiangrui Meng  wrote:

> You can use the save/load implementation in naive Bayes as reference:
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
>
> Ping me on the JIRA page to get the ticket assigned to you.
>
> Thanks,
> Xiangrui
>


Error communicating with MapOutputTracker

2015-03-04 Thread Thomas Gerber
Hello,

We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge workers).
We use spark-submit to start an application.

We got the following error which leads to a failed stage:

Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4
times, most recent failure: Lost task 3095.3 in stage 140.0 (TID
308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException:
Error communicating with MapOutputTracker


We tried the whole application again, and it failed on the same stage (but
it got more tasks completed on that stage) with the same error.

We then looked at executors stderr, and all show similar logs, on both runs
(see below). As far as we can tell, executors and master have disk space
left.

*Any suggestion on where to look to understand why the communication with
the MapOutputTracker fails?*

Thanks
Thomas

In case it matters, our akka settings:
spark.akka.frameSize 50
spark.akka.threads 8
// those below are 10* the default, to cope with large GCs
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1

Appendix: executor logs, where it starts going awry

15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 298525)
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with
curMem=5543008799, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored
as bytes in memory (estimated size 1473.0 B, free 11.7 GB)
15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block
broadcast_339_piece0
15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable
339 took 224 ms
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with
curMem=5543010272, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as
values in memory (estimated size 2.5 KB, free 11.7 GB)
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch;
tracker actor =
Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrack

Re: Is the RDD's Partitions determined before hand ?

2015-03-04 Thread Imran Rashid
You can set the number of partitions dynamically -- its just a parameter to
a method, so you can compute it however you want, it doesn't need to be
some static constant:

val dataSizeEstimate = yourMagicFunctionToEstimateDataSize()
val numberOfPartitions =
yourConversionFromDataSizeToNumPartitions(dataSizeEstimate)


val reducedRDD = someInputRDD.reduceByKey(f, numberOfPartitions) //or
whatever else that needs to know number of partitions

of course this means you need to do the work of figuring out those magic
functions, but its certainly possible.

I agree with all of Sean's recommendations, but I guess I might put a bit
more emphasis on "The one exception are operations that tend to pull data
into memory."  For me, I've found that to be a very important exception,
that can come up a lot.  And though in general a lot of partitions makes
sense, there have been recent questions on the user list about folks going
to far, using eg. 100K partitions and then having the bookkeeping overhead
dominating.  But thats a pretty big number -- you should still be able to
err on the side of too many partitions w/out going that far, I'd imagine.



On Wed, Mar 4, 2015 at 4:17 AM, Jeff Zhang  wrote:

> Hi Sean,
>
>  > If you know a stage needs unusually high parallelism for example you
> can repartition further for that stage.
>
> The problem is we may don't know whether high parallelism is needed. e.g.
> for the join operator, high parallelism may only be necessary for some
> dataset that lots of data can join together while for other dataset high
> parallelism may not be necessary if only a few data can join together.
>
> So my question is that unable changing parallelism at runtime dynamically
> may not be flexible.
>
>
>
> On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen  wrote:
>
>> Hm, what do you mean? You can control, to some extent, the number of
>> partitions when you read the data, and can repartition if needed.
>>
>> You can set the default parallelism too so that it takes effect for most
>> ops thay create an RDD. One # of partitions is usually about right for all
>> work (2x or so the number of execution slots).
>>
>> If you know a stage needs unusually high parallelism for example you can
>> repartition further for that stage.
>>  On Mar 4, 2015 1:50 AM, "Jeff Zhang"  wrote:
>>
>>> Thanks Sean.
>>>
>>> But if the partitions of RDD is determined before hand, it would not be
>>> flexible to run the same program on the different dataset. Although for the
>>> first stage the partitions can be determined by the input data set, for the
>>> intermediate stage it is not possible. Users have to create policy to
>>> repartition or coalesce based on the data set size.
>>>
>>>
>>> On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen  wrote:
>>>
 An RDD has a certain fixed number of partitions, yes. You can't change
 an RDD. You can repartition() or coalese() and RDD to make a new one
 with a different number of RDDs, possibly requiring a shuffle.

 On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang  wrote:
 > I mean is it possible to change the partition number at runtime.
 Thanks
 >
 >
 > --
 > Best Regards
 >
 > Jeff Zhang

>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


RE: Does SparkSQL support "..... having count (fieldname)" in SQL statement?

2015-03-04 Thread Cheng, Hao
I’ve tried with latest code, seems it works, which version are you using Shahab?

From: yana [mailto:yana.kadiy...@gmail.com]
Sent: Wednesday, March 4, 2015 8:47 PM
To: shahab; user@spark.apache.org
Subject: RE: Does SparkSQL support ". having count (fieldname)" in SQL 
statement?

I think the problem is that you are using an alias in the having clause. I am 
not able to try just now but see if HAVING count (*)> 2 works ( ie dont use cnt)


Sent on the new Sprint Network from my Samsung Galaxy S®4.

 Original message 
From: shahab
Date:03/04/2015 7:22 AM (GMT-05:00)
To: user@spark.apache.org
Subject: Does SparkSQL support ". having count (fieldname)" in SQL 
statement?

Hi,

It seems that SparkSQL, even the HiveContext, does not support SQL statements 
like :   SELECT category, count(1) AS cnt FROM products GROUP BY category 
HAVING cnt > 10;

I get this exception:

Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
Unresolved attributes: CAST(('cnt < 2), BooleanType), tree:


I couldn't find anywhere is documentation whether "having" keyword is not 
supported ?
If this is the case, what would be the work around? using two nested select 
statements?

best,
/Shahab


Re: Nested Case Classes (Found and Required Same)

2015-03-04 Thread Bojan Kostic
Did you find any other way for this issue?
I just found out that i have >22 columns data set... And now i am searching
for best solution.

Anyone else have experienced with this problem?

Best
Bojan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Nested-Case-Classes-Found-and-Required-Same-tp14096p21908.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: scala.Double vs java.lang.Double in RDD

2015-03-04 Thread Imran Rashid
This doesn't involve spark at all, I think this is entirely an issue with
how scala deals w/ primitives and boxing.  Often it can hide the details
for you, but IMO it just leads to far more confusing errors when things
don't work out.  The issue here is that your map has value type Any, which
leads scala to leave it as a boxed java.lang.Double.

scala> val x = 1.5
> x: Double = 1.5
> scala> x.getClass()
> res0: Class[Double] = double
> scala> x.getClass() == classOf[java.lang.Double]
> res1: Boolean = false
> scala> x.getClass() == classOf[Double]
> res2: Boolean = true
> scala> val arr = Array(1.5,2.5)
> arr: Array[Double] = Array(1.5, 2.5)
> scala> arr.getClass().getComponentType() == x.getClass()
> res5: Boolean = true
> scala> arr.getClass().getComponentType() == classOf[java.lang.Double]
> res6: Boolean = false

//this map has java.lang.Double
> scala> val map: Map[String, Any] = arr.map{x => x.toString -> x}.toMap
> map: Map[String,Any] = Map(1.5 -> 1.5, 2.5 -> 2.5)
> scala> map("1.5").getClass()
> res15: Class[_] = class java.lang.Double
> scala> map("1.5").getClass() == x.getClass()
> res10: Boolean = false
> scala> map("1.5").getClass() == classOf[java.lang.Double]
> res11: Boolean = true
> //this one has Double
> scala> val map2: Map[String, Double] = arr.map{x => x.toString -> x}.toMap
> map2: Map[String,Double] = Map(1.5 -> 1.5, 2.5 -> 2.5)
> scala> map2("1.5").getClass()
> res12: Class[Double] = double
> scala> map2("1.5").getClass() == x.getClass()
> res13: Boolean = true
> scala> map2("1.5").getClass() == classOf[java.lang.Double]
> res14: Boolean = false


On Wed, Mar 4, 2015 at 3:17 AM, Tobias Pfeiffer  wrote:

> Hi,
>
> I have a function with signature
>
>   def aggFun1(rdd: RDD[(Long, (Long, Double))]):
> RDD[(Long, Any)]
>
> and one with
>
>   def aggFun2[_Key: ClassTag, _Index](rdd: RDD[(_Key, (_Index, Double))]):
> RDD[(_Key, Double)]
>
> where all "Double" classes involved are "scala.Double" classes (according
> to IDEA) and my implementation of aggFun1 is just calling aggFun2 (type
> parameters _Key and _Index are inferred by the Scala compiler).
>
> Now I am writing a test as follows:
>
>   val result: Map[Long, Any] = aggFun1(input).collect().toMap
>   result.values.foreach(v => println(v.getClass))
>   result.values.foreach(_ shouldBe a[Double])
>
> and I get the following output:
>
>   class java.lang.Double
>   class java.lang.Double
>   [info] avg
>   [info] - should compute the average *** FAILED ***
>   [info]   1.75 was not an instance of double, but an instance of
> java.lang.Double
>
> So I am wondering about what magic is going on here. Are scala.Double
> values in RDDs automatically converted to java.lang.Doubles or am I just
> missing the implicit back-conversion etc.?
>
> Any help appreciated,
> Tobias
>
>


Save and read parquet from the same path

2015-03-04 Thread Karlson

Hi all,

what would happen if I save a RDD via saveAsParquetFile to the same path 
that RDD is originally read from? Is that a safe thing to do in Pyspark?


Thanks!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark Streaming Switchover Time

2015-03-04 Thread Nastooh Avessta (navesta)
Indeed. And am wondering if this switchover time can be decreased.
Cheers,

[http://www.cisco.com/web/europe/images/email/signature/logo05.jpg]

Nastooh Avessta
ENGINEER.SOFTWARE ENGINEERING
nave...@cisco.com
Phone: +1 604 647 1527

Cisco Systems Limited
595 Burrard Street, Suite 2123 Three Bentall Centre, PO Box 49121
VANCOUVER
BRITISH COLUMBIA
V7X 1J1
CA
Cisco.com





[Think before you print.]Think before you print.

This email may contain confidential and privileged material for the sole use of 
the intended recipient. Any review, use, distribution or disclosure by others 
is strictly prohibited. If you are not the intended recipient (or authorized to 
receive for the recipient), please contact the sender by reply email and delete 
all copies of this message.
For corporate legal information go to:
http://www.cisco.com/web/about/doing_business/legal/cri/index.html

Cisco Systems Canada Co, 181 Bay St., Suite 3400, Toronto, ON, Canada, M5J 2T3. 
Phone: 416-306-7000; Fax: 416-306-7099. 
Preferences - 
Unsubscribe – 
Privacy

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Tuesday, March 03, 2015 11:11 PM
To: Nastooh Avessta (navesta)
Cc: user@spark.apache.org
Subject: Re: Spark Streaming Switchover Time

I am confused. Are you killing the 1st worker node to see whether the system 
restarts the receiver on the second worker?

TD

On Tue, Mar 3, 2015 at 10:49 PM, Nastooh Avessta (navesta) 
mailto:nave...@cisco.com>> wrote:
This is the time that it takes for the driver to start receiving data once 
again, from the 2nd worker, when the 1st worker, where streaming thread was 
initially running, is shutdown.
Cheers,

[http://www.cisco.com/web/europe/images/email/signature/logo05.jpg]

Nastooh Avessta
ENGINEER.SOFTWARE ENGINEERING
nave...@cisco.com
Phone: +1 604 647 1527

Cisco Systems Limited
595 Burrard Street, Suite 2123 Three Bentall Centre, PO Box 49121
VANCOUVER
BRITISH COLUMBIA
V7X 1J1
CA
Cisco.com





[Think before you print.]Think before you print.

This email may contain confidential and privileged material for the sole use of 
the intended recipient. Any review, use, distribution or disclosure by others 
is strictly prohibited. If you are not the intended recipient (or authorized to 
receive for the recipient), please contact the sender by reply email and delete 
all copies of this message.
For corporate legal information go to:
http://www.cisco.com/web/about/doing_business/legal/cri/index.html

Cisco Systems Canada Co, 181 Bay St., Suite 3400, Toronto, ON, Canada, M5J 2T3. 
Phone: 416-306-7000; Fax: 416-306-7099. 
Preferences - 
Unsubscribe – 
Privacy

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Tuesday, March 03, 2015 10:24 PM
To: Nastooh Avessta (navesta)
Cc: user@spark.apache.org
Subject: Re: Spark Streaming Switchover Time

Can you elaborate on what is this switchover time?

TD

On Tue, Mar 3, 2015 at 9:57 PM, Nastooh Avessta (navesta) 
mailto:nave...@cisco.com>> wrote:
Hi
On a standalone, Spark 1.0.0, with 1 master and 2 workers, operating in client 
mode, running a udp streaming application, I am noting around 2 second elapse 
time on switchover, upon shutting down the streaming worker, where streaming 
window length is 1 sec. I am wondering what parameters are available to the 
developer to shorten this switchover time.
Cheers,

[http://www.cisco.com/web/europe/images/email/signature/logo05.jpg]

Nastooh Avessta
ENGINEER.SOFTWARE ENGINEERING
nave...@cisco.com
Phone: +1 604 647 1527

Cisco Systems Limited
595 Burrard Street, Suite 2123 Three Bentall Centre, PO Box 49121
VANCOUVER
BRITISH COLUMBIA
V7X 1J1
CA
Cisco.com





[Think before you print.]Think before you print.

This email may contain confidential and privileged material for the sole use of 
the intended recipient. Any review, use, distribution or disclosure by others 
is strictly prohibited. If you are not the intended recipient (or authorized to 
receive for the recipient), please contact the sender by reply email and delete 
all copies of this message.
For corporate legal information go to:
http://www.cisco.com/web/about/doing_business/legal/cri/index.html

Cisco Systems Canada Co, 181 Bay St., Suite 3400, Toronto, ON, Canada, M5J 2T3. 
Phone: 416-306-7000; Fax: 416-306-7099. 
Preferences - 
Unsubscribe – 
Privacy





Re: Unable to submit spark job to mesos cluster

2015-03-04 Thread Akhil Das
Looks like you are having 2 netty jars in the classpath.

Thanks
Best Regards

On Wed, Mar 4, 2015 at 5:14 PM, Sarath Chandra <
sarathchandra.jos...@algofusiontech.com> wrote:

> From the lines pointed in the exception log, I figured out that my code is
> unable to get the spark context. To isolate the problem, I've written a
> small code as below -
>
> *import org.apache.spark.SparkConf;*
> *import org.apache.spark.SparkContext;*
>
> *public class Test {*
> *public static void main(String[] args) throws Exception {*
> *SparkConf sparkConf = new
> SparkConf().setMaster("mesos://node2.algofusiontech.com:5050
> ").setAppName("test");*
> *SparkContext context = new SparkContext(sparkConf);*
> *}*
> *}*
>
> When I run this code as -  *java -cp ".:/opt/cloudera/parcels/CDH/jars/*"
> Test*
> I'm getting the below exception dump. Please help.
>
> *1[sparkDriver-akka.actor.default-dispatcher-2] ERROR
> akka.actor.ActorSystemImpl  - Uncaught fatal error from thread
> [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem
> [sparkDriver]*
> *java.lang.NoSuchMethodError:
> org.jboss.netty.channel.socket.nio.NioWorkerPool.(Ljava/util/concurrent/Executor;I)V*
> * at
> akka.remote.transport.netty.NettyTransport.(NettyTransport.scala:282)*
> * at
> akka.remote.transport.netty.NettyTransport.(NettyTransport.scala:239)*
> * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)*
> * at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)*
> * at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
> * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
> * at
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)*
> * at scala.util.Try$.apply(Try.scala:161)*
> * at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)*
> * at
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
> * at
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
> * at scala.util.Success.flatMap(Try.scala:200)*
> * at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)*
> * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)*
> * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)*
> * at
> scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)*
> * at scala.collection.Iterator$class.foreach(Iterator.scala:727)*
> * at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)*
> * at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)*
> * at scala.collection.AbstractIterable.foreach(Iterable.scala:54)*
> * at
> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)*
> * at
> akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)*
> * at
> akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)*
> * at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)*
> * at akka.actor.ActorCell.invoke(ActorCell.scala:456)*
> * at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)*
> * at akka.dispatch.Mailbox.run(Mailbox.scala:219)*
> * at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)*
> * at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)*
> * at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)*
> * at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)*
> * at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)*
> *[ERROR] [03/04/2015 17:13:23.745] [main] [Remoting] Remoting error:
> [Startup timed out] [*
> *akka.remote.RemoteTransportException: Startup timed out*
> * at
> akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)*
> * at akka.remote.Remoting.start(Remoting.scala:191)*
> * at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)*
> * at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)*
> * at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)*
> * at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)*
> * at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)*
> * at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)*
> * at
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)*
> * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)*
> * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)*
> * at
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)*
> * at scala.collection.immutable.Range.foreach$mVc$sp(Ra

Re: TreeNodeException: Unresolved attributes

2015-03-04 Thread Arush Kharbanda
Why don't you formulate a string before you pass it to the hql function
(appending strings), and hql function is deprecated. You should use sql.

http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext

On Wed, Mar 4, 2015 at 6:15 AM, Anusha Shamanur 
wrote:

> Hi,
>
>
> I am trying to run a simple select query on a table.
>
>
> val restaurants=hiveCtx.hql("select * from TableName where column like
> '%SomeString%' ")
>
> This gives an error as below:
>
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
> attributes: *, tree:
>
> How do I solve this?
>
>
> --
> Regards,
> Anusha
>



-- 

[image: Sigmoid Analytics] 

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Speed Benchmark

2015-03-04 Thread Guillaume Guy
Sorry for the confusion.

All are running Hadoop services. Node 1 is the namenode whereas Nodes 2 and
3 are datanodes.



Best,

Guillaume Guy

* +1 919 - 972 - 8750*

On Sat, Feb 28, 2015 at 1:09 AM, Sean Owen  wrote:

> Is machine 1 the only one running an HDFS data node? You describe it as
> one running Hadoop services.
> On Feb 27, 2015 9:44 PM, "Guillaume Guy" 
> wrote:
>
>> Hi Jason:
>>
>> Thanks for your feedback.
>>
>> Beside the information above I mentioned, there are 3 machines in the
>> cluster.
>>
>> *1st one*: Driver + has a bunch of Hadoop services. 32GB of RAM, 8 cores
>> (2 used)
>> *2nd + 3rd: *16B of RAM, 4 cores (2 used each)
>>
>>  I hope this helps clarify.
>>
>> Thx.
>>
>> GG
>>
>>
>>
>> Best,
>>
>> Guillaume Guy
>>
>> * +1 919 - 972 - 8750 <%2B1%20919%20-%20972%20-%208750>*
>>
>> On Fri, Feb 27, 2015 at 9:06 AM, Jason Bell  wrote:
>>
>>> How many machines are on the cluster?
>>> And what is the configuration of those machines (Cores/RAM)?
>>>
>>> "Small cluster" is very subjective statement.
>>>
>>>
>>>
>>> Guillaume Guy wrote:
>>>
 Dear Spark users:

 I want to see if anyone has an idea of the performance for a small
 cluster.


>>


Re: Unable to submit spark job to mesos cluster

2015-03-04 Thread Arush Kharbanda
You can try increasing the Akka time out in the config, you can set the
following in your config.

spark.core.connection.ack.wait.timeout: 600
spark.akka.timeout: 1000 (In secs)
spark.akka.frameSize:50

On Wed, Mar 4, 2015 at 5:14 PM, Sarath Chandra <
sarathchandra.jos...@algofusiontech.com> wrote:

> From the lines pointed in the exception log, I figured out that my code is
> unable to get the spark context. To isolate the problem, I've written a
> small code as below -
>
> *import org.apache.spark.SparkConf;*
> *import org.apache.spark.SparkContext;*
>
> *public class Test {*
> *public static void main(String[] args) throws Exception {*
> *SparkConf sparkConf = new
> SparkConf().setMaster("mesos://node2.algofusiontech.com:5050
> ").setAppName("test");*
> *SparkContext context = new SparkContext(sparkConf);*
> *}*
> *}*
>
> When I run this code as -  *java -cp ".:/opt/cloudera/parcels/CDH/jars/*"
> Test*
> I'm getting the below exception dump. Please help.
>
> *1[sparkDriver-akka.actor.default-dispatcher-2] ERROR
> akka.actor.ActorSystemImpl  - Uncaught fatal error from thread
> [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem
> [sparkDriver]*
> *java.lang.NoSuchMethodError:
> org.jboss.netty.channel.socket.nio.NioWorkerPool.(Ljava/util/concurrent/Executor;I)V*
> * at
> akka.remote.transport.netty.NettyTransport.(NettyTransport.scala:282)*
> * at
> akka.remote.transport.netty.NettyTransport.(NettyTransport.scala:239)*
> * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)*
> * at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)*
> * at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
> * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
> * at
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)*
> * at scala.util.Try$.apply(Try.scala:161)*
> * at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)*
> * at
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
> * at
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
> * at scala.util.Success.flatMap(Try.scala:200)*
> * at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)*
> * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)*
> * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)*
> * at
> scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)*
> * at scala.collection.Iterator$class.foreach(Iterator.scala:727)*
> * at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)*
> * at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)*
> * at scala.collection.AbstractIterable.foreach(Iterable.scala:54)*
> * at
> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)*
> * at
> akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)*
> * at
> akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)*
> * at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)*
> * at akka.actor.ActorCell.invoke(ActorCell.scala:456)*
> * at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)*
> * at akka.dispatch.Mailbox.run(Mailbox.scala:219)*
> * at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)*
> * at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)*
> * at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)*
> * at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)*
> * at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)*
> *[ERROR] [03/04/2015 17:13:23.745] [main] [Remoting] Remoting error:
> [Startup timed out] [*
> *akka.remote.RemoteTransportException: Startup timed out*
> * at
> akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)*
> * at akka.remote.Remoting.start(Remoting.scala:191)*
> * at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)*
> * at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)*
> * at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)*
> * at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)*
> * at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)*
> * at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)*
> * at
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)*
> * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)*
> * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)*
> * at
> org.apache.spark.util.Ut

Re: LBGFS optimizer performace

2015-03-04 Thread Gustavo Enrique Salazar Torres
Yeah, without caching makes it gets really slow. I will try to minimize the
number of columns on my tables, that may save lots of memory and will
eventually work.
I will let you know.

Thanks!
Gustavo

On Tue, Mar 3, 2015 at 8:58 PM, Joseph Bradley 
wrote:

> I would recommend caching; if you can't persist, iterative algorithms will
> not work well.
>
> I don't think calling count on the dataset is problematic; every iteration
> in LBFGS iterates over the whole dataset and does a lot more computation
> than count().
>
> It would be helpful to see some error occurring within LBFGS.  With the
> given stack trace, I'm not sure what part of LBFGS it's happening in.
>
> On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres <
> gsala...@ime.usp.br> wrote:
>
>> Yeah, I can call count before that and it works. Also I was over caching
>> tables but I removed those. Now there is no caching but it gets really slow
>> since it calculates my table RDD many times.
>> Also hacked the LBFGS code to pass the number of examples which I
>> calculated outside in a Spark SQL query but just moved the location of the
>> problem.
>>
>> The query I'm running looks like this:
>>
>> s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB
>>  ON tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "
>>
>> mappedFields contains a list of fields which I'm interested in. The
>> result of that query goes through (including sampling) some transformations
>> before being input to LBFGS.
>>
>> My dataset has 180GB just for feature selection, I'm planning to use
>> 450GB to train the final model and I'm using 16 c3.2xlarge EC2 instances,
>> that means I have 240GB of RAM available.
>>
>> Any suggestion? I'm starting to check the algorithm because I don't
>> understand why it needs to count the dataset.
>>
>> Thanks
>>
>> Gustavo
>>
>> On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley 
>> wrote:
>>
>>> Is that error actually occurring in LBFGS?  It looks like it might be
>>> happening before the data even gets to LBFGS.  (Perhaps the outer join
>>> you're trying to do is making the dataset size explode a bit.)  Are you
>>> able to call count() (or any RDD action) on the data before you pass it to
>>> LBFGS?
>>>
>>> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres <
>>> gsala...@ime.usp.br> wrote:
>>>
 Just did with the same error.
 I think the problem is the "data.count()" call in LBFGS because for
 huge datasets that's naive to do.
 I was thinking to write my version of LBFGS but instead of doing
 data.count() I will pass that parameter which I will calculate from a Spark
 SQL query.

 I will let you know.

 Thanks


 On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das 
 wrote:

> Can you try increasing your driver memory, reducing the executors and
> increasing the executor memory?
>
> Thanks
> Best Regards
>
> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres <
> gsala...@ime.usp.br> wrote:
>
>> Hi there:
>>
>> I'm using LBFGS optimizer to train a logistic regression model. The
>> code I implemented follows the pattern showed in
>> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html but
>> training data is obtained from a Spark SQL RDD.
>> The problem I'm having is that LBFGS tries to count the elements in
>> my RDD and that results in a OOM exception since my dataset is huge.
>> I'm running on a AWS EMR cluster with 16 c3.2xlarge instances on
>> Hadoop YARN. My dataset is about 150 GB but I sample (I take only 1% of 
>> the
>> data) it in order to scale logistic regression.
>> The exception I'm getting is this:
>>
>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in
>> stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOfRange(Arrays.java:2694)
>> at java.lang.String.(String.java:203)
>> at
>> com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>> at
>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
>> at
>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
>> at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>> at
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>> at
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>> at
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java

RE: Does SparkSQL support "..... having count (fieldname)" in SQL statement?

2015-03-04 Thread yana
I think the problem is that you are using an alias in the having clause. I am 
not able to try just now but see if HAVING count (*)> 2 works ( ie dont use cnt)


Sent on the new Sprint Network from my Samsung Galaxy S®4.

 Original message From: shahab 
 Date:03/04/2015  7:22 AM  (GMT-05:00) 
To: user@spark.apache.org Subject: Does SparkSQL support 
". having count (fieldname)" in SQL statement? 
Hi,

It seems that SparkSQL, even the HiveContext, does not support SQL statements 
like :   SELECT category, count(1) AS cnt FROM products GROUP BY category 
HAVING cnt > 10;

I get this exception:

Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
Unresolved attributes: CAST(('cnt < 2), BooleanType), tree:


I couldn't find anywhere is documentation whether "having" keyword is not 
supported ?
If this is the case, what would be the work around? using two nested select 
statements?

best,
/Shahab

Does SparkSQL support "..... having count (fieldname)" in SQL statement?

2015-03-04 Thread shahab
Hi,

It seems that SparkSQL, even the HiveContext, does not support SQL
statements like :   SELECT category, count(1) AS cnt FROM products GROUP BY
category HAVING cnt > 10;

I get this exception:

Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Unresolved attributes: CAST(('cnt < 2), BooleanType), tree:


I couldn't find anywhere is documentation whether "having" keyword is not
supported ?
If this is the case, what would be the work around? using two nested select
statements?

best,
/Shahab


Re: Problems running version 1.3.0-rc1

2015-03-04 Thread Yiannis Gkoufas
Hi Sean,

thanks a lot for helping me pinpoint the issue!
The zips I tried to download were just fine.
The problem was fixed when I deleted my .m2 folder.
Probably something wasn't downloaded properly from Maven Repositories.

Thanks!

On 3 March 2015 at 09:26, Sean Owen  wrote:

> Is that really the error?
>
> java.lang.NoClassDefFoundError: L akka/event/LogSou
>
> Looks like an incomplete class name. Is something corrupt, maybe a config
> file?
>
> On Tue, Mar 3, 2015 at 2:13 AM, Yiannis Gkoufas 
> wrote:
> > Hi all,
> >
> > I have downloaded version 1.3.0-rc1 from
> > https://github.com/apache/spark/archive/v1.3.0-rc1.zip, extracted it and
> > built it using:
> > mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0 -DskipTests clean package
> >
> > It doesn't complain for any issues, but when I call sbin/start-all.sh I
> get
> > on logs:
> >
> > 15/03/02 21:28:24 ERROR ActorSystemImpl: Uncaught fatal error from thread
> > [sparkWorker-akka.remote.default-remote-dispatcher-6] shutting down
> > ActorSystem [sparkWorker]
> > java.lang.NoClassDefFoundError: L akka/event/LogSou
> > at java.lang.Class.getDeclaredConstructors0(Native Method)
> > at java.lang.Class.privateGetDeclaredConstructors(Class.java:2663)
> > at java.lang.Class.getConstructor0(Class.java:3067)
> > at java.lang.Class.getDeclaredConstructor(Class.java:2170)
> > at
> >
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:76)
> > at scala.util.Try$.apply(Try.scala:161)
> > at
> >
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
> > at
> >
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
> > at
> >
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
> > at scala.util.Success.flatMap(Try.scala:200)
> > at
> >
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
> > at akka.remote.EndpointManager$$anonfun$9.apply(Remoting.scala:692)
> > at akka.remote.EndpointManager$$anonfun$9.apply(Remoting.scala:684)
> > at
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > at
> >
> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
> > at
> >
> akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:684)
> > at
> >
> akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:492)
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> > at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> > at
> >
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: java.lang.ClassNotFoundException: L akka.event.LogSou
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > ... 32 more
> >
> > I tried to search online but couldn't find anything similar.
> > Any ideas what could the error be?
> > I tried compiling with java 7 and java 8 but with the same result.
> >
> > Thanks a lot!
>


Re: Unable to submit spark job to mesos cluster

2015-03-04 Thread Sarath Chandra
>From the lines pointed in the exception log, I figured out that my code is
unable to get the spark context. To isolate the problem, I've written a
small code as below -

*import org.apache.spark.SparkConf;*
*import org.apache.spark.SparkContext;*

*public class Test {*
*public static void main(String[] args) throws Exception {*
*SparkConf sparkConf = new
SparkConf().setMaster("mesos://node2.algofusiontech.com:5050
").setAppName("test");*
*SparkContext context = new SparkContext(sparkConf);*
*}*
*}*

When I run this code as -  *java -cp ".:/opt/cloudera/parcels/CDH/jars/*"
Test*
I'm getting the below exception dump. Please help.

*1[sparkDriver-akka.actor.default-dispatcher-2] ERROR
akka.actor.ActorSystemImpl  - Uncaught fatal error from thread
[sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem
[sparkDriver]*
*java.lang.NoSuchMethodError:
org.jboss.netty.channel.socket.nio.NioWorkerPool.(Ljava/util/concurrent/Executor;I)V*
* at
akka.remote.transport.netty.NettyTransport.(NettyTransport.scala:282)*
* at
akka.remote.transport.netty.NettyTransport.(NettyTransport.scala:239)*
* at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)*
* at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)*
* at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
* at java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
* at
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)*
* at scala.util.Try$.apply(Try.scala:161)*
* at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)*
* at
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
* at
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
* at scala.util.Success.flatMap(Try.scala:200)*
* at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)*
* at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)*
* at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)*
* at
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)*
* at scala.collection.Iterator$class.foreach(Iterator.scala:727)*
* at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)*
* at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)*
* at scala.collection.AbstractIterable.foreach(Iterable.scala:54)*
* at
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)*
* at
akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)*
* at
akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)*
* at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)*
* at akka.actor.ActorCell.invoke(ActorCell.scala:456)*
* at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)*
* at akka.dispatch.Mailbox.run(Mailbox.scala:219)*
* at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)*
* at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)*
* at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)*
* at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)*
* at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)*
*[ERROR] [03/04/2015 17:13:23.745] [main] [Remoting] Remoting error:
[Startup timed out] [*
*akka.remote.RemoteTransportException: Startup timed out*
* at
akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)*
* at akka.remote.Remoting.start(Remoting.scala:191)*
* at
akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)*
* at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)*
* at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)*
* at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)*
* at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)*
* at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)*
* at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)*
* at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)*
* at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)*
* at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)*
* at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)*
* at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)*
* at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)*
* at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156)*
* at org.apache.spark.SparkContext.(SparkContext.scala:203)*
* at Test.main(Test.java:7)*
*Caused by: java.util.concurrent.TimeoutExcepti

Unable to submit spark job to mesos cluster

2015-03-04 Thread Sarath Chandra
Hi,

I have a cluster running on CDH5.2.1 and I have a Mesos cluster (version
0.18.1). Through a Oozie java action I'm want to submit a Spark job to
mesos cluster. Before configuring it as Oozie job I'm testing the java
action from command line and getting exception as below. While running I'm
pointing the classpath to "/jars" folder.

What is going wrong? Is there any additional configuration to be done which
I'm missing?

[ERROR] [03/04/2015 17:00:49.968] [main] [Remoting] Remoting error:
[Startup timed out] [
akka.remote.RemoteTransportException: Startup timed out
at
akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
at akka.remote.Remoting.start(Remoting.scala:191)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156)
at org.apache.spark.SparkContext.(SparkContext.scala:203)
at
com.algofusion.reconciliation.execution.utils.ExecutionUtils.(ExecutionUtils.java:130)
at
com.algofusion.reconciliation.execution.ReconExecutionController.initialize(ReconExecutionController.java:257)
at
com.algofusion.reconciliation.execution.ReconExecutionController.main(ReconExecutionController.java:105)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:173)
... 18 more
]
Exception in thread "main" java.lang.ExceptionInInitializerError
at
com.algofusion.reconciliation.execution.ReconExecutionController.initialize(ReconExecutionController.java:257)
at
com.algofusion.reconciliation.execution.ReconExecutionController.main(ReconExecutionController.java:105)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:173)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156)
at org.apache.spark.SparkContext.(SparkContext.scala:203)
at
com.algofusion.reconciliation.execution.utils.ExecutionUtils.(ExecutionUtils.java:130)
... 2 more

Regards,
Sarath.


Re: insert Hive table with RDD

2015-03-04 Thread patcharee

Hi,

I guess that toDF() api in spark 1.3 which is required build from source 
code?


Patcharee

On 03. mars 2015 13:42, Cheng, Hao wrote:

Using the SchemaRDD / DataFrame API via HiveContext

Assume you're using the latest code, something probably like:

val hc = new HiveContext(sc)
import hc.implicits._
existedRdd.toDF().insertInto("hivetable")
or

existedRdd.toDF().registerTempTable("mydata")
hc.sql("insert into hivetable as select xxx from mydata")



-Original Message-
From: patcharee [mailto:patcharee.thong...@uni.no]
Sent: Tuesday, March 3, 2015 7:09 PM
To: user@spark.apache.org
Subject: insert Hive table with RDD

Hi,

How can I insert an existing hive table with an RDD containing my data?
Any examples?

Best,
Patcharee

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is the RDD's Partitions determined before hand ?

2015-03-04 Thread Sean Owen
Parallelism doesn't really affect the throughput as long as it's:

- not less than the number of available execution slots,
- ... and probably some low multiple of them to even out task size effects
- not so high that the bookkeeping overhead dominates

Although you may need to select different scales of parallelism for
different stages (like a join), you shouldn't in general have to
change it according to data size.

However you could count the input size and make parallelism some
function of that if you found that was consistently better.

The one exception are operations that tend to pull data into memory.
You may need more parallelism as scale increases to keep in-memory
data size small enough. There again you usually just err on the side
of 'too much' parallelism, or avoid patterns that can pull a lot of
data into memory, but this is usually the pain point if there is one.

The problem I run into when thinking about this is that I don't think
Spark can do much better, since it doesn't have the info above needed
to decide these things in general. The calling program has to tell it.

On Wed, Mar 4, 2015 at 10:17 AM, Jeff Zhang  wrote:
> Hi Sean,
>
>  > If you know a stage needs unusually high parallelism for example you can
> repartition further for that stage.
>
> The problem is we may don't know whether high parallelism is needed. e.g.
> for the join operator, high parallelism may only be necessary for some
> dataset that lots of data can join together while for other dataset high
> parallelism may not be necessary if only a few data can join together.
>
> So my question is that unable changing parallelism at runtime dynamically
> may not be flexible.
>
>
>
> On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen  wrote:
>>
>> Hm, what do you mean? You can control, to some extent, the number of
>> partitions when you read the data, and can repartition if needed.
>>
>> You can set the default parallelism too so that it takes effect for most
>> ops thay create an RDD. One # of partitions is usually about right for all
>> work (2x or so the number of execution slots).
>>
>> If you know a stage needs unusually high parallelism for example you can
>> repartition further for that stage.
>>
>> On Mar 4, 2015 1:50 AM, "Jeff Zhang"  wrote:
>>>
>>> Thanks Sean.
>>>
>>> But if the partitions of RDD is determined before hand, it would not be
>>> flexible to run the same program on the different dataset. Although for the
>>> first stage the partitions can be determined by the input data set, for the
>>> intermediate stage it is not possible. Users have to create policy to
>>> repartition or coalesce based on the data set size.
>>>
>>>
>>> On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen  wrote:

 An RDD has a certain fixed number of partitions, yes. You can't change
 an RDD. You can repartition() or coalese() and RDD to make a new one
 with a different number of RDDs, possibly requiring a shuffle.

 On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang  wrote:
 > I mean is it possible to change the partition number at runtime.
 > Thanks
 >
 >
 > --
 > Best Regards
 >
 > Jeff Zhang
>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>
>
>
>
> --
> Best Regards
>
> Jeff Zhang

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processingmulti-line JSON?

2015-03-04 Thread Emre Sevinc
I'm adding this 3rd party library to my Maven pom.xml file so that it's
embedded into the JAR I send to spark-submit:

  
  json-mapreduce
  json-mapreduce
  1.0-SNAPSHOT
  

  javax.servlet
  *


  commons-io
  *


  commons-lang
  *


  org.apache.hadoop
  hadoop-common

  



Then I build my über JAR, and then I run my Spark Streaming application via
the command line:

 spark-submit --class com.example.schemavalidator.SchemaValidatorDriver
--master local[4] --deploy-mode client target/myapp-1.0-SNAPSHOT.jar

--
Emre Sevinç


On Wed, Mar 4, 2015 at 11:19 AM, Tathagata Das  wrote:

> That could be a corner case bug. How do you add the 3rd party library to
> the class path of the driver? Through spark-submit? Could you give the
> command you used?
>
> TD
>
> On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc 
> wrote:
>
>> I've also tried the following:
>>
>> Configuration hadoopConfiguration = new Configuration();
>> hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");
>>
>> JavaStreamingContext ssc =
>> JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration,
>> factory, false);
>>
>>
>> but I still get the same exception.
>>
>> Why doesn't getOrCreate ignore that Hadoop configuration part (which
>> normally works, e.g. when not recovering)?
>>
>> --
>> Emre
>>
>>
>> On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc 
>> wrote:
>>
>>> Hello,
>>>
>>> I have a Spark Streaming application (that uses Spark 1.2.1) that
>>> listens to an input directory, and when new JSON files are copied to that
>>> directory processes them, and writes them to an output directory.
>>>
>>> It uses a 3rd party library to process the multi-line JSON files (
>>> https://github.com/alexholmes/json-mapreduce). You can see the relevant
>>> part of the streaming application at:
>>>
>>>   https://gist.github.com/emres/ec18ee264e4eb0dd8f1a
>>>
>>> When I run this application locally, it works perfectly fine. But then I
>>> wanted to test whether it could recover from failure, e.g. if I stopped it
>>> right in the middle of processing some files. I started the streaming
>>> application, copied 100 files to the input directory, and hit Ctrl+C when
>>> it has alread processed about 50 files:
>>>
>>> ...
>>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
>>> process : 1
>>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
>>> process : 1
>>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
>>> process : 1
>>> [Stage
>>> 0:==>
>>> (65 + 4) / 100]
>>> ^C
>>>
>>> Then I started the application again, expecting that it could recover
>>> from the checkpoint. For a while it started to read files again and then
>>> gave an exception:
>>>
>>> ...
>>> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
>>> process : 1
>>> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
>>> process : 1
>>> 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
>>>  * * * hadoopConfiguration: itemSet
>>> 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage
>>> 0.0 (TID 0)
>>> java.io.IOException: Missing configuration value for
>>> multilinejsoninputformat.member
>>> at
>>> com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
>>> at
>>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:133)
>>> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
>>> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>> at
>>> java.util.concurr

Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processingmulti-line JSON?

2015-03-04 Thread Tathagata Das
That could be a corner case bug. How do you add the 3rd party library to
the class path of the driver? Through spark-submit? Could you give the
command you used?

TD

On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc  wrote:

> I've also tried the following:
>
> Configuration hadoopConfiguration = new Configuration();
> hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");
>
> JavaStreamingContext ssc =
> JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration,
> factory, false);
>
>
> but I still get the same exception.
>
> Why doesn't getOrCreate ignore that Hadoop configuration part (which
> normally works, e.g. when not recovering)?
>
> --
> Emre
>
>
> On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc  wrote:
>
>> Hello,
>>
>> I have a Spark Streaming application (that uses Spark 1.2.1) that listens
>> to an input directory, and when new JSON files are copied to that directory
>> processes them, and writes them to an output directory.
>>
>> It uses a 3rd party library to process the multi-line JSON files (
>> https://github.com/alexholmes/json-mapreduce). You can see the relevant
>> part of the streaming application at:
>>
>>   https://gist.github.com/emres/ec18ee264e4eb0dd8f1a
>>
>> When I run this application locally, it works perfectly fine. But then I
>> wanted to test whether it could recover from failure, e.g. if I stopped it
>> right in the middle of processing some files. I started the streaming
>> application, copied 100 files to the input directory, and hit Ctrl+C when
>> it has alread processed about 50 files:
>>
>> ...
>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> [Stage
>> 0:==>
>> (65 + 4) / 100]
>> ^C
>>
>> Then I started the application again, expecting that it could recover
>> from the checkpoint. For a while it started to read files again and then
>> gave an exception:
>>
>> ...
>> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
>> process : 1
>> 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
>>  * * * hadoopConfiguration: itemSet
>> 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage
>> 0.0 (TID 0)
>> java.io.IOException: Missing configuration value for
>> multilinejsoninputformat.member
>> at
>> com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
>> at
>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:133)
>> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
>> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Since in the exception it refers to a missing configuration
>> "multilinejsoninputformat.member", I think it is about the following line:
>>
>>ssc.ssc().sc().hadoopConfiguration().set("
>> multilinejsoninputformat.member", "itemSet");
>>
>> And this is why I also log the value of it, and as you can see above,
>> just before it gives the exception in the recovery process, it shows that 
>> "multilinejsoninputformat.member"
>> is set to "itemSet". But somehow it is not found during the recovery.
>> This exception happens only when it tries to recover from a previously
>> interrupted run.
>>
>> I've also tried moving the above line 

Re: Is the RDD's Partitions determined before hand ?

2015-03-04 Thread Jeff Zhang
Hi Sean,

 > If you know a stage needs unusually high parallelism for example you can
repartition further for that stage.

The problem is we may don't know whether high parallelism is needed. e.g.
for the join operator, high parallelism may only be necessary for some
dataset that lots of data can join together while for other dataset high
parallelism may not be necessary if only a few data can join together.

So my question is that unable changing parallelism at runtime dynamically
may not be flexible.



On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen  wrote:

> Hm, what do you mean? You can control, to some extent, the number of
> partitions when you read the data, and can repartition if needed.
>
> You can set the default parallelism too so that it takes effect for most
> ops thay create an RDD. One # of partitions is usually about right for all
> work (2x or so the number of execution slots).
>
> If you know a stage needs unusually high parallelism for example you can
> repartition further for that stage.
>  On Mar 4, 2015 1:50 AM, "Jeff Zhang"  wrote:
>
>> Thanks Sean.
>>
>> But if the partitions of RDD is determined before hand, it would not be
>> flexible to run the same program on the different dataset. Although for the
>> first stage the partitions can be determined by the input data set, for the
>> intermediate stage it is not possible. Users have to create policy to
>> repartition or coalesce based on the data set size.
>>
>>
>> On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen  wrote:
>>
>>> An RDD has a certain fixed number of partitions, yes. You can't change
>>> an RDD. You can repartition() or coalese() and RDD to make a new one
>>> with a different number of RDDs, possibly requiring a shuffle.
>>>
>>> On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang  wrote:
>>> > I mean is it possible to change the partition number at runtime. Thanks
>>> >
>>> >
>>> > --
>>> > Best Regards
>>> >
>>> > Jeff Zhang
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


-- 
Best Regards

Jeff Zhang


Re: Is FileInputDStream returned by fileStream method a reliable receiver?

2015-03-04 Thread Tathagata Das
The file stream does not use receiver. May be that was not clear in the
programming guide. I am updating it for 1.3 release right now, I will make
it more clear.
And file stream has full reliability. Read this in the programming guide.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-with-files-as-input-source

On Wed, Mar 4, 2015 at 2:14 AM, Emre Sevinc  wrote:

> Is FileInputDStream returned by fileStream method a reliable receiver?
>
> In the Spark Streaming Guide it says:
>
>   "There can be two kinds of data sources based on their *reliability*.
> Sources (like Kafka and Flume) allow the transferred data to be
> acknowledged. If the system receiving data from these *reliable* sources
> acknowledge the received data correctly, it can be ensured that no data
> gets lost due to any kind of failure. This leads to two kinds of receivers.
>
>1. *Reliable Receiver* - A *reliable receiver* correctly acknowledges
>a reliable source that the data has been received and stored in Spark with
>replication.
>2. *Unreliable Receiver* - These are receivers for sources that do not
>support acknowledging. Even for reliable sources, one may implement an
>unreliable receiver that do not go into the complexity of acknowledging
>correctly."
>
>
> So I wonder whether the receivers for HDFS (and local file system) are
> reliable, e.g. when I'm using fileStream method to process files in a
> directory locally or on HDFS?
>
>
> --
> Emre Sevinç
>


Is FileInputDStream returned by fileStream method a reliable receiver?

2015-03-04 Thread Emre Sevinc
Is FileInputDStream returned by fileStream method a reliable receiver?

In the Spark Streaming Guide it says:

  "There can be two kinds of data sources based on their *reliability*.
Sources (like Kafka and Flume) allow the transferred data to be
acknowledged. If the system receiving data from these *reliable* sources
acknowledge the received data correctly, it can be ensured that no data
gets lost due to any kind of failure. This leads to two kinds of receivers.

   1. *Reliable Receiver* - A *reliable receiver* correctly acknowledges a
   reliable source that the data has been received and stored in Spark with
   replication.
   2. *Unreliable Receiver* - These are receivers for sources that do not
   support acknowledging. Even for reliable sources, one may implement an
   unreliable receiver that do not go into the complexity of acknowledging
   correctly."


So I wonder whether the receivers for HDFS (and local file system) are
reliable, e.g. when I'm using fileStream method to process files in a
directory locally or on HDFS?


-- 
Emre Sevinç


Re: spark master shut down suddenly

2015-03-04 Thread Akhil Das
You can check in the mesos logs and see whats really happening.

Thanks
Best Regards

On Wed, Mar 4, 2015 at 3:10 PM, lisendong  wrote:

> 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard
> from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
> connection and attempting reconnect
> 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
> 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost
> leadership
> 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
> shutting down.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark Streaming and SchemaRDD usage

2015-03-04 Thread Haopu Wang
Hi, in the roadmap of Spark in 2015 (link:
http://files.meetup.com/3138542/Spark%20in%202015%20Talk%20-%20Wendell.p
ptx), I saw SchemaRDD is designed to be the basis of BOTH Spark
Streaming and Spark SQL.

My question is: what's the typical usage of SchemaRDD in a Spark
Streaming application? Thank you very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark master shut down suddenly

2015-03-04 Thread lisendong
15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard
from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
connection and attempting reconnect
15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost leadership
15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
shutting down.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is the RDD's Partitions determined before hand ?

2015-03-04 Thread Sean Owen
Hm, what do you mean? You can control, to some extent, the number of
partitions when you read the data, and can repartition if needed.

You can set the default parallelism too so that it takes effect for most
ops thay create an RDD. One # of partitions is usually about right for all
work (2x or so the number of execution slots).

If you know a stage needs unusually high parallelism for example you can
repartition further for that stage.
 On Mar 4, 2015 1:50 AM, "Jeff Zhang"  wrote:

> Thanks Sean.
>
> But if the partitions of RDD is determined before hand, it would not be
> flexible to run the same program on the different dataset. Although for the
> first stage the partitions can be determined by the input data set, for the
> intermediate stage it is not possible. Users have to create policy to
> repartition or coalesce based on the data set size.
>
>
> On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen  wrote:
>
>> An RDD has a certain fixed number of partitions, yes. You can't change
>> an RDD. You can repartition() or coalese() and RDD to make a new one
>> with a different number of RDDs, possibly requiring a shuffle.
>>
>> On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang  wrote:
>> > I mean is it possible to change the partition number at runtime. Thanks
>> >
>> >
>> > --
>> > Best Regards
>> >
>> > Jeff Zhang
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


scala.Double vs java.lang.Double in RDD

2015-03-04 Thread Tobias Pfeiffer
Hi,

I have a function with signature

  def aggFun1(rdd: RDD[(Long, (Long, Double))]):
RDD[(Long, Any)]

and one with

  def aggFun2[_Key: ClassTag, _Index](rdd: RDD[(_Key, (_Index, Double))]):
RDD[(_Key, Double)]

where all "Double" classes involved are "scala.Double" classes (according
to IDEA) and my implementation of aggFun1 is just calling aggFun2 (type
parameters _Key and _Index are inferred by the Scala compiler).

Now I am writing a test as follows:

  val result: Map[Long, Any] = aggFun1(input).collect().toMap
  result.values.foreach(v => println(v.getClass))
  result.values.foreach(_ shouldBe a[Double])

and I get the following output:

  class java.lang.Double
  class java.lang.Double
  [info] avg
  [info] - should compute the average *** FAILED ***
  [info]   1.75 was not an instance of double, but an instance of
java.lang.Double

So I am wondering about what magic is going on here. Are scala.Double
values in RDDs automatically converted to java.lang.Doubles or am I just
missing the implicit back-conversion etc.?

Any help appreciated,
Tobias


Re: Connecting a PHP/Java applications to Spark SQL Thrift Server

2015-03-04 Thread أنس الليثي
Thanks very much, I used it and works fine with me.



On 4 March 2015 at 11:56, Arush Kharbanda 
wrote:

> For java You can use hive-jdbc connectivity jars to connect to Spark-SQL.
>
> The driver is inside the hive-jdbc Jar.
>
> *http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/jdbc/HiveDriver.html
> *
>
>
>
>
> On Wed, Mar 4, 2015 at 1:26 PM,  wrote:
>
>> SparkSQL supports JDBC/ODBC connectivity, so if that's the route you
>> needed/wanted to connect through you could do so via java/php apps.
>> Havent
>> used either so cant speak to the developer experience, assume its pretty
>> good as would be preferred method for lots of third party enterprise
>> apps/tooling
>>
>> If you prefer using the thrift server/interface, if they don't exist
>> already
>> in open source land you can use thrift definitions to generate client libs
>> in any supported thrift language and use that for connectivity.  Seems one
>> issue with thrift-server is when running in cluster mode.  Seems like it
>> still exists but UX of error has been cleaned up in 1.3:
>>
>> https://issues.apache.org/jira/browse/SPARK-5176
>>
>>
>>
>> -Original Message-
>> From: fanooos [mailto:dev.fano...@gmail.com]
>> Sent: Tuesday, March 3, 2015 11:15 PM
>> To: user@spark.apache.org
>> Subject: Connecting a PHP/Java applications to Spark SQL Thrift Server
>>
>> We have installed hadoop cluster with hive and spark and the spark sql
>> thrift server is up and running without any problem.
>>
>> Now we have set of applications need to use spark sql thrift server to
>> query
>> some data.
>>
>> Some of these applications are java applications and the others are PHP
>> applications.
>>
>> As I am an old fashioned java developer, I used to connect java
>> applications
>> to BD servers like Mysql using a JDBC driver. Is there a corresponding
>> driver for connecting with Spark Sql Thrift server ? Or what is the
>> library
>> I need to use to connect to it?
>>
>>
>> For PHP, what are the ways we can use to connect PHP applications to Spark
>> Sql Thrift Server?
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-a-PHP-Java-ap
>> plications-to-Spark-SQL-Thrift-Server-tp21902.html
>> 
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
>> commands, e-mail: user-h...@spark.apache.org
>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
>
> [image: Sigmoid Analytics] 
>
> *Arush Kharbanda* || Technical Teamlead
>
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>



-- 
Anas Rabei
Senior Software Developer
Mubasher.info
anas.ra...@mubasher.info


Spark RDD Python, Numpy Shape command

2015-03-04 Thread rui li
I am a beginner to Spark, having some simple questions regarding the use of
RDD in python.

Suppose I have a matrix called data_matrix, I pass it to RDD using

RDD_matrix = sc.parallelize(data_matrix)

but I will have a problem if I want to know the dimension of the matrix in
Spark, because Sparkk RDD does not know the Python (Numpy package) command
"shape"

In this case, how should I deal with it?

In general, do I need to "translate" all my piece of Python code in RDD
acceptable syntax, so that my Python program can run using Pyspark?

Thanks in advance for any helps!

Best

Rui


  1   2   >