Re: Hive permanent functions are not available in Spark SQL

2015-10-01 Thread Pala M Muthaia
SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


On Thu, Oct 1, 2015 at 12:27 PM, Yin Huai <yh...@databricks.com> wrote:

> Hi Pala,
>
> Can you add the full stacktrace of the exception? For now, can you use
> create temporary function to workaround the issue?
>
> Thanks,
>
> Yin
>
> On Wed, Sep 30, 2015 at 11:01 AM, Pala M Muthaia <
> mchett...@rocketfuelinc.com.invalid> wrote:
>
>> +user list
>>
>> On Tue, Sep 29, 2015 at 3:43 PM, Pala M Muthaia <
>> mchett...@rocketfuelinc.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to use internal UDFs that we have added as permanent
>>> functions to Hive, from within a Spark SQL query (using HiveContext), but I
>>> encounter NoSuchObjectException, i.e. the function could not be found.
>>>
>>> However, if I execute the 'show functions' command in Spark SQL, the
>>> permanent functions appear in the list.
>>>
>>> I am using Spark 1.4.1 with Hive 0.13.1. I tried to debug this by
>>> looking at the logs and code, but it seems both the 'show functions' command
>>> and the UDF query go through essentially the same code path, yet the
>>> former can see the UDF and the latter can't.
>>>
>>> Any ideas on how to debug/fix this?
>>>
>>>
>>> Thanks,
>>> pala
>>>
>>
>>
>
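
For reference, a minimal sketch of the temporary-function workaround Yin
suggests above, run through a HiveContext. The jar path, function name and
UDF class below are hypothetical placeholders, and sc is an existing
SparkContext (as in spark-shell):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// Register the UDF jar and a session-scoped function pointing at its class.
hiveContext.sql("ADD JAR /path/to/my-udfs.jar")
hiveContext.sql("CREATE TEMPORARY FUNCTION my_udf AS 'com.example.hive.MyUdf'")
// The temporary function is then usable like any built-in for this session.
hiveContext.sql("SELECT my_udf(some_col) FROM some_table").collect()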


Re: Hive permanent functions are not available in Spark SQL

2015-09-30 Thread Pala M Muthaia
+user list

On Tue, Sep 29, 2015 at 3:43 PM, Pala M Muthaia <mchett...@rocketfuelinc.com
> wrote:

> Hi,
>
> I am trying to use internal UDFs that we have added as permanent functions
> to Hive, from within a Spark SQL query (using HiveContext), but I encounter
> NoSuchObjectException, i.e. the function could not be found.
>
> However, if I execute the 'show functions' command in Spark SQL, the permanent
> functions appear in the list.
>
> I am using Spark 1.4.1 with Hive 0.13.1. I tried to debug this by looking
> at the logs and code, but it seems both the 'show functions' command and
> the UDF query go through essentially the same code path, yet the former
> can see the UDF and the latter can't.
>
> Any ideas on how to debug/fix this?
>
>
> Thanks,
> pala
>


LogisticRegressionWithLBFGS with large feature set

2015-05-14 Thread Pala M Muthaia
Hi,

I am trying to validate our modeling data pipeline by running
LogisticRegressionWithLBFGS on a dataset with ~3.7 million features,
basically to compute AUC. This is on Spark 1.3.0.

I am using 128 executors with 4 GB each, plus a driver with 8 GB. The number of
data partitions is 3072.

The execution fails with the following messages:

*Total size of serialized results of 54 tasks (10.4 GB) is bigger than
spark.driver.maxResultSize (3.0 GB)*

The associated stage in the job is treeAggregate at StandardScaler.scala:52
(http://lsv-10.rfiserve.net:18080/history/application_1426202183036_633264/stages/stage?id=3&attempt=0).
The call stack looks as below:

org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)


I am trying both to understand why such a large amount of data needs to be
passed back to the driver and to figure out a way around this. I also want
to understand how much memory is required, as a function of dataset size,
feature set size, and number of iterations performed, for future
experiments.

From looking at the MLlib code, the largest data structure seems to be a
dense vector of the same size as the feature set. I am not familiar with
the algorithm or its implementation, but I would guess 3.7 million features
would lead to a constant multiple of ~3.7M * 8 bytes ~ 30 MB. So how does
the serialized result become so large?

I looked into treeAggregate and it looks like hierarchical aggregation.
If the data being sent to the driver is basically the aggregated
coefficients (i.e. dense vectors) for the final aggregation, couldn't the
dense vectors from the executors be pulled in one at a time and merged in
memory, rather than pulling all of them in together? (This is a totally
uneducated guess, so I may be completely off here.)
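
For what it's worth, a sketch of the depth knob on RDD.treeAggregate in
Spark 1.3; this is not the MLlib internals, just an illustration of how a
deeper aggregation tree adds combining levels on the executors before
partial results reach the driver. The names features and dim are
hypothetical:

import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.rdd.RDD

// Element-wise sum of per-row dense vectors with a deeper aggregation tree,
// so more combining happens on executors before anything hits the driver.
def sumFeatures(features: RDD[DenseVector], dim: Int): DenseVector =
  features.treeAggregate(new DenseVector(new Array[Double](dim)))(
    seqOp = (acc, v) => {
      var i = 0
      while (i < dim) { acc.values(i) += v(i); i += 1 }
      acc
    },
    combOp = (a, b) => {
      var i = 0
      while (i < dim) { a.values(i) += b.values(i); i += 1 }
      a
    },
    depth = 4) // the default depth is 2

A blunter config-level workaround is raising spark.driver.maxResultSize
(0 means unlimited), though that only hides the transfer cost.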

Is there a way to get this running?

Thanks,
pala


Re: Building spark 1.2 from source requires more dependencies

2015-03-26 Thread Pala M Muthaia
+spark-dev

Yes, the dependencies are there. I guess my question is how come the build
is succeeding in the mainline then, without adding these dependencies?

On Thu, Mar 26, 2015 at 3:44 PM, Ted Yu yuzhih...@gmail.com wrote:

 Looking at output from dependency:tree, servlet-api is brought in by the
 following:

 [INFO] +- org.apache.cassandra:cassandra-all:jar:1.2.6:compile
 [INFO] |  +- org.antlr:antlr:jar:3.2:compile
 [INFO] |  +- com.googlecode.json-simple:json-simple:jar:1.1:compile
 [INFO] |  +- org.yaml:snakeyaml:jar:1.6:compile
 [INFO] |  +- edu.stanford.ppl:snaptree:jar:0.1:compile
 [INFO] |  +- org.mindrot:jbcrypt:jar:0.3m:compile
 [INFO] |  +- org.apache.thrift:libthrift:jar:0.7.0:compile
 [INFO] |  |  \- javax.servlet:servlet-api:jar:2.5:compile

 FYI

 On Thu, Mar 26, 2015 at 3:36 PM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote:

 Hi,

 We are trying to build spark 1.2 from source (tip of the branch-1.2 at
 the moment). I tried to build spark using the following command:

 mvn -U -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
 -Phive-thriftserver -DskipTests clean package

 I encountered various missing class definition exceptions (e.g: class
 javax.servlet.ServletException not found).

 I eventually got the build to succeed after adding the following set of
 dependencies to the spark-core's pom.xml:

 <dependency>
   <groupId>javax.servlet</groupId>
   <artifactId>servlet-api</artifactId>
   <version>3.0</version>
 </dependency>

 <dependency>
   <groupId>org.eclipse.jetty</groupId>
   <artifactId>jetty-io</artifactId>
 </dependency>

 <dependency>
   <groupId>org.eclipse.jetty</groupId>
   <artifactId>jetty-http</artifactId>
 </dependency>

 <dependency>
   <groupId>org.eclipse.jetty</groupId>
   <artifactId>jetty-servlet</artifactId>
 </dependency>

 Pretty much all of the missing class definition errors came up while
 building HttpServer.scala, and went away after the above dependencies were
 included.

 My guess is official build for spark 1.2 is working already. My question
 is what is wrong with my environment or setup, that requires me to add
 dependencies to pom.xml in this manner, to get this build to succeed.

 Also, i am not sure if this build would work at runtime for us, i am
 still testing this out.


 Thanks,
 pala





Building spark 1.2 from source requires more dependencies

2015-03-26 Thread Pala M Muthaia
Hi,

We are trying to build Spark 1.2 from source (tip of branch-1.2 at the
moment). I tried to build Spark using the following command:

mvn -U -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
-Phive-thriftserver -DskipTests clean package

I encountered various missing class definition exceptions (e.g. class
javax.servlet.ServletException not found).

I eventually got the build to succeed after adding the following set of
dependencies to the spark-core's pom.xml:

<dependency>
  <groupId>javax.servlet</groupId>
  <artifactId>servlet-api</artifactId>
  <version>3.0</version>
</dependency>

<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-io</artifactId>
</dependency>

<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-http</artifactId>
</dependency>

<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-servlet</artifactId>
</dependency>

Pretty much all of the missing class definition errors came up while
building HttpServer.scala, and went away after the above dependencies were
included.

My guess is that the official build for Spark 1.2 is already working. My question
is what is wrong with my environment or setup that requires me to add
dependencies to pom.xml in this manner to get the build to succeed.

Also, I am not sure if this build will work at runtime for us; I am still
testing this out.


Thanks,
pala


Re: Issues with constants in Spark HiveQL queries

2015-01-28 Thread Pala M Muthaia
By typo I meant that the column name had a spelling error:
conversion_aciton_id. It should have been conversion_action_id.

No, we tried it a few times, and we didn't have + signs or anything like
that. We tried it with columns of different types too - string, double, etc. -
and saw the same error.
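
For concreteness, a hedged illustration of the two predicate forms discussed
in this thread: an unquoted constant for an int/bigint column vs. a
single-quoted constant for a string column. The table and column names come
from the example query (with the spelling corrected), not from a verified
schema, and sc is an existing SparkContext:

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
// int/bigint column: unquoted numeric constant
hc.sql("SELECT user_id FROM actions WHERE conversion_action_id = 20141210")
// string column: single-quoted constant
hc.sql("SELECT user_id FROM actions WHERE conversion_action_id = '20141210'")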




On Tue, Jan 20, 2015 at 8:59 PM, yana yana.kadiy...@gmail.com wrote:

 I run Spark 1.2 and do not have this issue. I dont believe the Hive
 version would matter(I run spark1.2 with Hive12 profile) but that would be
 a good test. The last version I tried for you was a cdh4.2 spark1.2
 prebuilt without pointing to an external hive install(in fact I tried it on
 a machine w/ no other hadoop/hive jars). So download, unzip and run spark
 shell. I dont believe it's a bug personally. When you say typo do you mean
 there was indeed token Plus in your string? If you remove that token what
 stacktrace do you get?


 Sent on the new Sprint Network from my Samsung Galaxy S®4.


  Original message 
 From: Pala M Muthaia
 Date:01/19/2015 8:26 PM (GMT-05:00)
 To: Yana Kadiyska
 Cc: Cheng, Hao ,user@spark.apache.org
 Subject: Re: Issues with constants in Spark HiveQL queries

 Yes we tried the master branch (sometime last week) and there was no
 issue, but the above repro is for branch 1.2 and Hive 0.13. Isn't that the
 final release branch for Spark 1.2?

 If so, a patch needs to be created or back-ported from master?

 (Yes the obvious typo in the column name was introduced in this email
 only, so is irrelevant to the error).

 On Wed, Jan 14, 2015 at 5:52 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 yeah, that makes sense. Pala, are you on a prebuild version of Spark -- I
 just tried the CDH4 prebuilt...Here is what I get for the = token:

 [image: Inline image 1]

 The literal type shows as 290, not 291, and 290 is numeric. According to
 this
 http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hive/hive-exec/0.13.1/org/apache/hadoop/hive/ql/parse/HiveParser.java#HiveParser
 291 is token PLUS which is really weird...


 On Wed, Jan 14, 2015 at 7:47 PM, Cheng, Hao hao.ch...@intel.com wrote:

  The log showed it failed in parsing, so the typo stuff shouldn’t be
 the root cause. BUT I couldn’t reproduce that with master branch.



 I did the test as follow:



 sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.13.1 hive/console

 scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")


 sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.12.0 hive/console

 scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")





 *From:* Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
 *Sent:* Wednesday, January 14, 2015 11:12 PM
 *To:* Pala M Muthaia
 *Cc:* user@spark.apache.org
 *Subject:* Re: Issues with constants in Spark HiveQL queries



 Just a guess but what is the type of conversion_aciton_id? I do queries
 over an epoch all the time with no issues(where epoch's type is bigint).
 You can see the source here
 https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
  --
 not sure what ASTNode type: 291 but it sounds like it's not considered
 numeric? If it's a string it should be conversion_aciton_id=*'*20141210*'
 *(single quotes around the string)



 On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote:

  Hi,



 We are testing Spark SQL-Hive QL, on Spark 1.2.0. We have run some
 simple queries successfully, but we hit the following issue whenever we
 attempt to use a constant in the query predicate.



 It seems like an issue with parsing constant.



 Query: SELECT user_id FROM actions where conversion_aciton_id=20141210



 Error:

 scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
 20141210 :

 20141210



 Any ideas? This seems very basic, so we may be missing something basic,
 but i haven't figured out what it is.



 ---



 Full shell output below:



 scala> sqlContext.sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")

 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM
 actions where conversion_aciton_id=20141210

 15/01/13 16:55:54 INFO ParseDriver: Parse Completed

 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM
 actions where conversion_aciton_id=20141210

 15/01/13 16:55:54 INFO ParseDriver: Parse Completed

 java.lang.RuntimeException:

 Unsupported language features in query: SELECT user_id FROM actions
 where conversion_aciton_id=20141210

 TOK_QUERY

   TOK_FROM

 TOK_TABREF

   TOK_TABNAME

 actions

   TOK_INSERT

 TOK_DESTINATION

   TOK_DIR

 TOK_TMP_FILE

 TOK_SELECT

   TOK_SELEXPR

 TOK_TABLE_OR_COL

   user_id

 TOK_WHERE

   =

 TOK_TABLE_OR_COL

   conversion_aciton_id

 20141210



 scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
 20141210

Re: Issues with constants in Spark HiveQL queries

2015-01-19 Thread Pala M Muthaia
Yes, we tried the master branch (sometime last week) and there was no issue,
but the above repro is for branch 1.2 and Hive 0.13. Isn't that the final
release branch for Spark 1.2?

If so, does a patch need to be created or back-ported from master?

(Yes, the obvious typo in the column name was introduced in this email only,
so it is irrelevant to the error.)

On Wed, Jan 14, 2015 at 5:52 PM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 yeah, that makes sense. Pala, are you on a prebuild version of Spark -- I
 just tried the CDH4 prebuilt...Here is what I get for the = token:

 [image: Inline image 1]

 The literal type shows as 290, not 291, and 290 is numeric. According to
 this
 http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hive/hive-exec/0.13.1/org/apache/hadoop/hive/ql/parse/HiveParser.java#HiveParser
 291 is token PLUS which is really weird...


 On Wed, Jan 14, 2015 at 7:47 PM, Cheng, Hao hao.ch...@intel.com wrote:

  The log showed it failed in parsing, so the typo stuff shouldn’t be the
 root cause. BUT I couldn’t reproduce that with master branch.



 I did the test as follow:



 sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.13.1 hive/console

 scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")


 sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.12.0 hive/console

 scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")





 *From:* Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
 *Sent:* Wednesday, January 14, 2015 11:12 PM
 *To:* Pala M Muthaia
 *Cc:* user@spark.apache.org
 *Subject:* Re: Issues with constants in Spark HiveQL queries



 Just a guess but what is the type of conversion_aciton_id? I do queries
 over an epoch all the time with no issues(where epoch's type is bigint).
 You can see the source here
 https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
  --
 not sure what ASTNode type: 291 but it sounds like it's not considered
 numeric? If it's a string it should be conversion_aciton_id=*'*20141210*'
 *(single quotes around the string)



 On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote:

  Hi,



 We are testing Spark SQL-Hive QL, on Spark 1.2.0. We have run some simple
 queries successfully, but we hit the following issue whenever we attempt to
 use a constant in the query predicate.



 It seems like an issue with parsing constant.



 Query: SELECT user_id FROM actions where conversion_aciton_id=20141210



 Error:

 scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
 20141210 :

 20141210



 Any ideas? This seems very basic, so we may be missing something basic,
 but i haven't figured out what it is.



 ---



 Full shell output below:



 scala> sqlContext.sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")

 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM
 actions where conversion_aciton_id=20141210

 15/01/13 16:55:54 INFO ParseDriver: Parse Completed

 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM
 actions where conversion_aciton_id=20141210

 15/01/13 16:55:54 INFO ParseDriver: Parse Completed

 java.lang.RuntimeException:

 Unsupported language features in query: SELECT user_id FROM actions where
 conversion_aciton_id=20141210

 TOK_QUERY

   TOK_FROM

 TOK_TABREF

   TOK_TABNAME

 actions

   TOK_INSERT

 TOK_DESTINATION

   TOK_DIR

 TOK_TMP_FILE

 TOK_SELECT

   TOK_SELEXPR

 TOK_TABLE_OR_COL

   user_id

 TOK_WHERE

   =

 TOK_TABLE_OR_COL

   conversion_aciton_id

 20141210



 scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
 20141210 :

 20141210

  +



 org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1110)



 at scala.sys.package$.error(package.scala:27)

 at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:251)

 at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)

 at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)

 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)

 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)

 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)

 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)

 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)

 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)

 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)

 at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)

 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply

Re: Issues with constants in Spark HiveQL queries

2015-01-14 Thread Pala M Muthaia
conversion_action_id is an int. We also tried a string column predicate with
a single-quoted string value and hit the same error stack.

On Wed, Jan 14, 2015 at 7:11 AM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 Just a guess but what is the type of conversion_aciton_id? I do queries
 over an epoch all the time with no issues(where epoch's type is bigint).
 You can see the source here
 https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
  --
 not sure what ASTNode type: 291 but it sounds like it's not considered
 numeric? If it's a string it should be conversion_aciton_id=*'*20141210*'
 *(single quotes around the string)

 On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote:

 Hi,

 We are testing Spark SQL-Hive QL, on Spark 1.2.0. We have run some simple
 queries successfully, but we hit the following issue whenever we attempt to
 use a constant in the query predicate.

 It seems like an issue with parsing constant.

 Query: SELECT user_id FROM actions where conversion_aciton_id=20141210

 Error:
 scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
 20141210 :
 20141210

 Any ideas? This seems very basic, so we may be missing something basic,
 but i haven't figured out what it is.

 ---

 Full shell output below:

 scala> sqlContext.sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")
 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM
 actions where conversion_aciton_id=20141210
 15/01/13 16:55:54 INFO ParseDriver: Parse Completed
 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM
 actions where conversion_aciton_id=20141210
 15/01/13 16:55:54 INFO ParseDriver: Parse Completed
 java.lang.RuntimeException:
 Unsupported language features in query: SELECT user_id FROM actions where
 conversion_aciton_id=20141210
 TOK_QUERY
   TOK_FROM
 TOK_TABREF
   TOK_TABNAME
 actions
   TOK_INSERT
 TOK_DESTINATION
   TOK_DIR
 TOK_TMP_FILE
 TOK_SELECT
   TOK_SELEXPR
 TOK_TABLE_OR_COL
   user_id
 TOK_WHERE
   =
 TOK_TABLE_OR_COL
   conversion_aciton_id
 20141210

 scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
 20141210 :
 20141210
  +

 org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1110)

 at scala.sys.package$.error(package.scala:27)
 at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:251)
 at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
 at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at
 scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at
 scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
 at
 scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
 at
 org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
 at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:133)
 at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:133)
 at
 org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
 at
 org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242

Re: OOM exception during row deserialization

2015-01-12 Thread Pala M Muthaia
Does anybody have insight on this? Thanks.

On Fri, Jan 9, 2015 at 6:30 PM, Pala M Muthaia mchett...@rocketfuelinc.com
wrote:

 Hi,

 I am using Spark 1.0.1. I am trying to debug a OOM exception i saw during
 a join step.

 Basically, i have a RDD of rows, that i am joining with another RDD of
 tuples.

 Some of the tasks succeed but a fair number failed with OOM exception with
 stack below. The stack belongs to the 'reducer' that is reading shuffle
 output from the 'mapper'.

 My question is what's the object being deserialized here - just a portion
 of an RDD or the whole RDD partition assigned to current reducer? The rows
 in the RDD could be large, but definitely not something that would run to
 100s of MBs in size, and thus run out of memory.

 Also, is there a way to determine size of the object being deserialized
 that results in the error (either by looking at some staging hdfs dir or
 logs)?

 java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit 
 exceeded}
 java.util.Arrays.copyOf(Arrays.java:2367)
 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
 java.lang.StringBuilder.append(StringBuilder.java:204)
 java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142)
 java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050)
 java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863)
 java.io.ObjectInputStream.readString(ObjectInputStream.java:1636)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 java.util.ArrayList.readObject(ArrayList.java:771)
 sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:606)
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)



 Thanks,
 pala



Broadcast joins on RDD

2015-01-12 Thread Pala M Muthaia
Hi,


How do I do a broadcast/map join on RDDs? I have a large RDD that I want to
inner join with a small RDD. Instead of having the large RDD repartitioned
and shuffled for the join, I would rather send a copy of the small RDD to each
task, and then perform the join locally.

How would I specify this in Spark code? I didn't find much documentation
online. I attempted to create a broadcast variable out of the small RDD and
then access that in the join operator:

largeRdd.join(smallRddBroadCastVar.value)

but that didn't work as expected (I found that all rows with the same key were
on the same task).
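
One common pattern for this - a sketch only, assuming the small RDD fits
comfortably in memory; the key/value types and RDD contents below are
placeholders, and sc is an existing SparkContext - is to collect the small
side into a Map, broadcast that, and do the lookup inside each task without
shuffling the large RDD:

val smallRdd = sc.parallelize(Seq((1, "a"), (2, "b")))
val largeRdd = sc.parallelize(Seq((1, 10.0), (2, 20.0), (3, 30.0)))

// Collect the small side to the driver, broadcast it, and join locally in
// each task of the large RDD (an inner join: unmatched keys are dropped).
val smallBc = sc.broadcast(smallRdd.collectAsMap().toMap)
val joined = largeRdd.flatMap { case (k, v) =>
  smallBc.value.get(k).map(sv => (k, (v, sv)))
}

Broadcasting the RDD object itself (as with smallRddBroadCastVar above)
doesn't help, because .value just returns the RDD handle and join still
shuffles by key.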

I am using Spark version 1.0.1


Thanks,
pala


OOM exception during row deserialization

2015-01-09 Thread Pala M Muthaia
Hi,

I am using Spark 1.0.1. I am trying to debug an OOM exception I saw during a
join step.

Basically, I have an RDD of rows that I am joining with another RDD of
tuples.

Some of the tasks succeed, but a fair number fail with an OOM exception with
the stack below. The stack belongs to the 'reducer' that is reading shuffle
output from the 'mapper'.

My question is: what is the object being deserialized here - just a portion
of an RDD, or the whole RDD partition assigned to the current reducer? The rows
in the RDD could be large, but definitely not something that would run to
100s of MBs in size, and thus run out of memory.

Also, is there a way to determine the size of the object being deserialized
that results in the error (either by looking at some staging HDFS dir or
logs)?

java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead
limit exceeded}
java.util.Arrays.copyOf(Arrays.java:2367)
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
java.lang.StringBuilder.append(StringBuilder.java:204)
java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142)
java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050)
java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863)
java.io.ObjectInputStream.readString(ObjectInputStream.java:1636)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
java.util.ArrayList.readObject(ArrayList.java:771)
sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)
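
Not an answer to the size question, but a sketch of one common mitigation,
under the assumption that the OOM comes from individual reducers pulling too
much shuffle data at once: give the join more reduce partitions so each task
deserializes a smaller slice. The RDD contents and the count of 2048 below
are placeholders; sc is an existing SparkContext:

val rowsRdd = sc.parallelize(Seq((1, "row-1"), (2, "row-2")))
val tuplesRdd = sc.parallelize(Seq((1, 1.0), (2, 2.0)))
// join(other, numPartitions) sets how many reduce-side partitions the
// shuffle produces; more partitions means less data per reducer task.
val joined = rowsRdd.join(tuplesRdd, 2048)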



Thanks,
pala


Re: Executor memory

2014-12-16 Thread Pala M Muthaia
Thanks for the clarifications. I misunderstood what the number on the UI meant.
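
For reference, the fraction Sean mentions below is configurable; a minimal
sketch of setting it (the 0.4 value is illustrative only, and note his caveat
about the JVM old generation):

import org.apache.spark.SparkConf

// Reserve a smaller share of the executor heap for cached partitions,
// leaving more of it for task execution.
val conf = new SparkConf()
  .set("spark.executor.memory", "8g")
  .set("spark.storage.memoryFraction", "0.4")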

On Mon, Dec 15, 2014 at 7:00 PM, Sean Owen so...@cloudera.com wrote:

 I believe this corresponds to the 0.6 of the whole heap that is
 allocated for caching partitions. See spark.storage.memoryFraction on
 http://spark.apache.org/docs/latest/configuration.html 0.6 of 4GB is
 about 2.3GB.

 The note there is important, that you probably don't want to exceed
 the JVM old generation size with this parameter.

 On Tue, Dec 16, 2014 at 12:53 AM, Pala M Muthaia
 mchett...@rocketfuelinc.com wrote:
  Hi,
 
  Running Spark 1.0.1 on Yarn 2.5
 
  When i specify --executor-memory 4g, the spark UI shows each executor as
  having only 2.3 GB, and similarly for 8g, only 4.6 GB.
 
  I am guessing that the executor memory corresponds to the container
 memory,
  and that the task JVM gets only a percentage of the container total
 memory.
  Is there a yarn or spark parameter to tune this so that my task JVM
 actually
  gets 6GB out of the 8GB for example?
 
 
  Thanks.
 
 



Re: Lost executors

2014-11-20 Thread Pala M Muthaia
Just to close the loop: it seems no issues pop up when I submit the job
using spark-submit so that the driver process also runs in a container on
the YARN cluster.

In the runs above, the driver was running on the gateway machine through which
the job was submitted, which led to quite a few issues.

On Tue, Nov 18, 2014 at 5:01 PM, Pala M Muthaia mchett...@rocketfuelinc.com
 wrote:

 Sandy,

 Good point - i forgot about NM logs.

 When i looked up the NM logs, i only see the following statements that
 align with the driver side log about lost executor. Many executors show the
 same log statement at the same time, so it seems like the decision to kill
 many if not all executors happened centrally, and all executors got
 notified somehow:

 14/11/18 00:18:25 INFO Executor: Executor is trying to kill task 2013
 14/11/18 00:18:25 INFO Executor: Executor killed task 2013


 In general, i also see quite a few instances of the following exception 
 across many executors/nodes. :

 14/11/17 23:58:00 INFO HadoopRDD: Input split: hdfs dir 
 path/sorted_keys-1020_3-r-00255.deflate:0+415841

 14/11/17 23:58:00 WARN BlockReaderLocal: error creating DomainSocket
 java.net.ConnectException: connect(2) error: Connection refused when trying 
 to connect to '/srv/var/hadoop/runs/hdfs/dn_socket'
   at org.apache.hadoop.net.unix.DomainSocket.connect0(Native Method)
   at 
 org.apache.hadoop.net.unix.DomainSocket.connect(DomainSocket.java:250)
   at 
 org.apache.hadoop.hdfs.DomainSocketFactory.createSocket(DomainSocketFactory.java:158)
   at 
 org.apache.hadoop.hdfs.BlockReaderFactory.nextDomainPeer(BlockReaderFactory.java:721)
   at 
 org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:441)
   at 
 org.apache.hadoop.hdfs.client.ShortCircuitCache.create(ShortCircuitCache.java:780)
   at 
 org.apache.hadoop.hdfs.client.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:714)
   at 
 org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:395)
   at 
 org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:303)
   at 
 org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:567)
   at 
 org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:790)
   at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
   at java.io.DataInputStream.read(DataInputStream.java:149)
   at 
 org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159)
   at 
 org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143)
   at 
 org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
   at java.io.InputStream.read(InputStream.java:101)
   at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
   at 
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
   at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
   at 
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
   at 
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
   at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:201)
   at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:184)
   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
   at 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
   at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157)
   at 
 $line57.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:51)
   at 
 $line57.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:50)
   at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
   at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
   at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262

Lost executors

2014-11-18 Thread Pala M Muthaia
Hi,

I am using Spark 1.0.1 on Yarn 2.5, and doing everything through spark
shell.

I am running a job that essentially reads a bunch of HBase keys, looks up
HBase data, and performs some filtering and aggregation. The job works fine
on smaller datasets, but when I try to execute on the full dataset, the job
never completes. The few symptoms I notice are:

a. The job shows progress for a while and then starts throwing lots of the
following errors:

2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] INFO
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - *Executor
906 disconnected, so removing it*
2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] ERROR
org.apache.spark.scheduler.cluster.YarnClientClusterScheduler - *Lost
executor 906 on machine name: remote Akka client disassociated*

2014-11-18 16:52:02,283 [spark-akka.actor.default-dispatcher-22] WARN
 org.apache.spark.storage.BlockManagerMasterActor - *Removing BlockManager
BlockManagerId(9186, machine name, 54600, 0) with no recent heart beats:
82313ms exceeds 45000ms*

Looking at the logs, the job never recovers from these errors, and
continues to show errors about lost executors and launching new executors,
and this just continues for a long time.

Could this be because the executors are running out of memory?

In terms of memory usage, the intermediate data could be large (after the
HBase lookup), but the partial and fully aggregated data set size should be
quite small - essentially a bunch of ids and counts (< 1 mil in total).



b. In the Spark UI, I am seeing the following errors (redacted for
brevity), not sure if they are transient or a real issue:

java.net.SocketTimeoutException (java.net.SocketTimeoutException: Read
timed out}
...
org.apache.spark.util.Utils$.fetchFile(Utils.scala:349)
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
...
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:724)




I was trying to get more data to investigate but haven't been able to
figure out how to enable logging on the executors. The Spark UI appears
stuck, and I only see driver-side logs in the jobhistory directory specified
in the job.


Thanks,
pala


Re: Lost executors

2014-11-18 Thread Pala M Muthaia
)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

14/11/17 23:58:00 WARN ShortCircuitCache:
ShortCircuitCache(0x71a8053d): failed to load
1276010498_BP-1416824317-172.22.48.2-1387241776581


However, on some of the nodes, it seems execution proceeded after the
error, so the above could just be a transient error.

Finally, in the driver logs, I was looking for a hint on the decision to kill
many executors, around the 00:18:25 timestamp when many tasks were killed
across many executors, but I didn't find anything different.



On Tue, Nov 18, 2014 at 1:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Pala,

 Do you have access to your YARN NodeManager logs?  Are you able to check
 whether they report killing any containers for exceeding memory limits?

 -Sandy

 On Tue, Nov 18, 2014 at 1:54 PM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote:

 Hi,

 I am using Spark 1.0.1 on Yarn 2.5, and doing everything through spark
 shell.

 I am running a job that essentially reads a bunch of HBase keys, looks up
 HBase data, and performs some filtering and aggregation. The job works fine
 in smaller datasets, but when i try to execute on the full dataset, the job
 never completes. The few symptoms i notice are:

 a. The job shows progress for a while and then starts throwing lots of
 the following errors:

 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] INFO
  org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - *Executor
 906 disconnected, so removing it*
 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] ERROR
 org.apache.spark.scheduler.cluster.YarnClientClusterScheduler - *Lost
 executor 906 on machine name: remote Akka client disassociated*

 2014-11-18 16:52:02,283 [spark-akka.actor.default-dispatcher-22] WARN
  org.apache.spark.storage.BlockManagerMasterActor - *Removing
 BlockManager BlockManagerId(9186, machine name, 54600, 0) with no recent
 heart beats: 82313ms exceeds 45000ms*

 Looking at the logs, the job never recovers from these errors, and
 continues to show errors about lost executors and launching new executors,
 and this just continues for a long time.

 Could this be because the executors are running out of memory?

 In terms of memory usage, the intermediate data could be large (after the
 HBase lookup), but partial and fully aggregated data set size should be
 quite small - essentially a bunch of ids and counts ( 1 mil in total).



 b. In the Spark UI, i am seeing the following errors (redacted for
 brevity), not sure if they are transient or real issue:

 java.net.SocketTimeoutException (java.net.SocketTimeoutException: Read timed 
 out}
 ...
 org.apache.spark.util.Utils$.fetchFile(Utils.scala:349)
 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 ...
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:724)




 I was trying to get more data to investigate but haven't been able to
 figure out how to enable logging on the executors. The Spark UI appears
 stuck and i only see driver side logs in the jobhistory directory specified
 in the job.


 Thanks,
 pala






Re: Assigning input files to spark partitions

2014-11-17 Thread Pala M Muthaia
Hi Daniel,

Yes that should work also. However, is it possible to setup so that each
RDD has exactly one partition, without repartitioning (and thus incurring
extra cost)? Is there a mechanism similar to MR where we can ensure each
partition is assigned some amount of data by size, by setting some block
size parameter?



On Thu, Nov 13, 2014 at 1:05 PM, Daniel Siegmann daniel.siegm...@velos.io
wrote:

 On Thu, Nov 13, 2014 at 3:24 PM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote


 No i don't want separate RDD because each of these partitions are being
 processed the same way (in my case, each partition corresponds to HBase
 keys belonging to one region server, and i will do HBase lookups). After
 that i have aggregations too, hence all these partitions should be in the
 same RDD. The reason to follow the partition structure is to limit
 concurrent HBase lookups targeting a single region server.


 Neither of these is necessarily a barrier to using separate RDDs. You can
 define the function you want to use and then pass it to multiple map
 methods. Then you could union all the RDDs to do your aggregations. For
 example, it might look something like this:

 val paths: Seq[String] = ... // the paths to the files you want to load
 def myFunc(t: T) = ... // the function to apply to every RDD
 val rdds = paths.map { path =>
   sc.textFile(path).map(myFunc)
 }
 val completeRdd = sc.union(rdds)

 Does that make any sense?

 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 54 W 40th St, New York, NY 10018
 E: daniel.siegm...@velos.io W: www.velos.io



Re: Assigning input files to spark partitions

2014-11-13 Thread Pala M Muthaia
Thanks for the responses Daniel and Rishi.

No, I don't want separate RDDs, because each of these partitions is being
processed the same way (in my case, each partition corresponds to HBase
keys belonging to one region server, and I will do HBase lookups). After
that I have aggregations too, hence all these partitions should be in the
same RDD. The reason to follow the partition structure is to limit
concurrent HBase lookups targeting a single region server.

Not sure what the block size is here (HDFS block size?), but my files may
get large over time, so I cannot depend on the block size assumption. That said,
from your description, it seems like I don't have to worry too much, because
Spark does assign files to partitions while maintaining 'locality' (i.e. a
given file's data would fit in ceil(filesize/blocksize) partitions, as
opposed to being spread across numerous partitions).

Yes, I saw wholeTextFiles(); it won't apply in my case because the input
file size can be quite large.
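
For smaller inputs, a sketch of the whole-file approach mentioned here (the
actual API is SparkContext.wholeTextFiles): each file becomes a single
(path, content) record, so file boundaries are preserved exactly. The input
path below is a placeholder and sc is an existing SparkContext:

val perFile = sc.wholeTextFiles("hdfs:///path/to/input/dir")
// One record per file; only practical when each file fits in memory as a string.
val fileSizes = perFile.map { case (path, content) => (path, content.length) }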

On Thu, Nov 13, 2014 at 8:04 AM, Daniel Siegmann daniel.siegm...@velos.io
wrote:

 I believe Rishi is correct. I wouldn't rely on that though - all it would
 take is for one file to exceed the block size and you'd be setting yourself
 up for pain. Also, if your files are small - small enough to fit in a
 single record - you could use SparkContext.wholeTextFile.

 On Thu, Nov 13, 2014 at 10:11 AM, Rishi Yadav ri...@infoobjects.com
 wrote:

 If your data is in hdfs and you are reading as textFile and each file is
 less than block size, my understanding is it would always have one
 partition per file.


 On Thursday, November 13, 2014, Daniel Siegmann daniel.siegm...@velos.io
 wrote:

 Would it make sense to read each file in as a separate RDD? This way you
 would be guaranteed the data is partitioned as you expected.

 Possibly you could then repartition each of those RDDs into a single
 partition and then union them. I think that would achieve what you expect.
 But it would be easy to accidentally screw this up (have some operation
 that causes a shuffle), so I think you're better off just leaving them as
 separate RDDs.

 On Wed, Nov 12, 2014 at 10:27 PM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote:

 Hi,

 I have a set of input files for a spark program, with each file
 corresponding to a logical data partition. What is the API/mechanism to
 assign each input file (or a set of files) to a spark partition, when
 initializing RDDs?

 When i create a spark RDD pointing to the directory of files, my
 understanding is it's not guaranteed that each input file will be treated
 as separate partition.

 My job semantics require that the data is partitioned, and i want to
 leverage the partitioning that has already been done, rather than
 repartitioning again in the spark job.

 I tried to lookup online but haven't found any pointers so far.


 Thanks
 pala




 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 54 W 40th St, New York, NY 10018
 E: daniel.siegm...@velos.io W: www.velos.io



 --
 - Rishi




 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 54 W 40th St, New York, NY 10018
 E: daniel.siegm...@velos.io W: www.velos.io



Assigning input files to spark partitions

2014-11-12 Thread Pala M Muthaia
Hi,

I have a set of input files for a Spark program, with each file
corresponding to a logical data partition. What is the API/mechanism to
assign each input file (or a set of files) to a Spark partition when
initializing RDDs?

When I create a Spark RDD pointing to the directory of files, my
understanding is that it's not guaranteed each input file will be treated
as a separate partition.

My job semantics require that the data is partitioned, and I want to
leverage the partitioning that has already been done, rather than
repartitioning again in the Spark job.

I tried to look this up online but haven't found any pointers so far.


Thanks
pala


Re: Cannot instantiate hive context

2014-11-03 Thread Pala M Muthaia
Thanks Akhil.

I realized that earlier, and I thought mvn -Phive should have captured and
included all these dependencies.

In any case, I proceeded with that, included other such dependencies that
were missing, and finally hit the Guava version mismatch issue (Spark
with Guava 14 vs. Hadoop/Hive with Guava 11). There are two parts:

1. Spark includes the Guava library within its jars, and that may conflict with
Hadoop/Hive components that depend on an older version of the library.

It seems this has been solved with the SPARK-2848
(https://issues.apache.org/jira/browse/SPARK-2848) patch to shade the Guava
libraries.


2. Spark actually uses interfaces from the newer version of the Guava library;
these need to be rewritten to use the older version (i.e. downgrade Spark's
dependency on Guava).

I wasn't able to find the related patches (I need them since I am on Spark
1.0.1). Applying the patch for #1 above, I still hit the following error:

14/11/03 15:01:32 WARN storage.BlockManager: Putting block broadcast_0
failed
java.lang.NoSuchMethodError:
com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
at org.apache.spark.util.collection.OpenHashSet.org
$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
at
org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165)
at
org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102)
 stack continues
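
As a side note, which Guava version actually wins on the classpath can be
checked with plain JDK reflection (a small diagnostic sketch, nothing
Spark-specific), by printing where the conflicting class was loaded from:

val src = classOf[com.google.common.hash.HashFunction]
  .getProtectionDomain.getCodeSource
// A null code source usually means the class came from the bootstrap classpath.
println(if (src == null) "unknown (bootstrap)" else src.getLocation.toString)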

I haven't been able to find the other patches that actually downgrade the
dependency.

Please point me to those patches, or share any other ideas about fixing these
dependency issues.


Thanks.



On Sun, Nov 2, 2014 at 8:41 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Adding the libthrift jar
 http://mvnrepository.com/artifact/org.apache.thrift/libthrift/0.9.0 in
 the class path would resolve this issue.

 Thanks
 Best Regards

 On Sat, Nov 1, 2014 at 12:34 AM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote:

 Hi,

 I am trying to load hive datasets using HiveContext, in spark shell.
 Spark ver 1.0.1 and Hive ver 0.12.

 We are trying to get Spark work with hive datasets. I already have
 existing Spark deployment. Following is what i did on top of that:
 1. Build spark using 'mvn -Pyarn,hive -Phadoop-2.4 -Dhadoop.version=2.4.0
 -DskipTests clean package'
 2. Copy over spark-assembly-1.0.1-hadoop2.4.0.jar into spark deployment
 directory.
 3. Launch spark-shell with the spark hive jar included in the list.

 When i execute *'*

 *val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)*

 i get the following error stack:

 java.lang.NoClassDefFoundError: org/apache/thrift/TBase
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
 at
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 
 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.ClassNotFoundException: org.apache.thrift.TBase
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 55 more

 I thought that building with -Phive option should include all the
 necessary hive packages into the assembly jar (according to here
 https://spark.apache.org/docs/1.0.1/sql-programming-guide.html#hive-tables).
 I tried searching online and in this mailing list archive but haven't found
 any instructions on how to get this working.

 I know that there is additional step of updating the assembly jar across
 the whole cluster, not just client side, but right now, even the client is
 not working.

 Would appreciate instructions (or link to them) on how to get this
 working end-to-end.


 Thanks,
 pala