Re: Hive permanent functions are not available in Spark SQL
SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

On Thu, Oct 1, 2015 at 12:27 PM, Yin Huai <yh...@databricks.com> wrote:
> Hi Pala,
>
> Can you add the full stacktrace of the exception? For now, can you use
> create temporary function to work around the issue?
>
> Thanks,
>
> Yin
>
> On Wed, Sep 30, 2015 at 11:01 AM, Pala M Muthaia
> <mchett...@rocketfuelinc.com.invalid> wrote:
>
>> +user list
>>
>> On Tue, Sep 29, 2015 at 3:43 PM, Pala M Muthaia
>> <mchett...@rocketfuelinc.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to use internal UDFs that we have added as permanent
>>> functions to Hive, from within a Spark SQL query (using HiveContext), but I
>>> encounter NoSuchObjectException, i.e. the function could not be found.
>>>
>>> However, if I execute the 'show functions' command in Spark SQL, the
>>> permanent functions appear in the list.
>>>
>>> I am using Spark 1.4.1 with Hive 0.13.1. I tried to debug this by
>>> looking at the log and code; it seems the 'show functions' command and
>>> the UDF query go through essentially the same code path, but the former
>>> can see the UDF while the latter can't.
>>>
>>> Any ideas on how to debug/fix this?
>>>
>>> Thanks,
>>> pala
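The temporary-function workaround Yin suggests can be sketched as follows. This is a minimal sketch for Spark 1.4 / HiveContext; the jar path, function name, and UDF class (`com.example.hive.udf.MyUdf`) are hypothetical placeholders for whatever internal UDF is being registered.

```scala
// Sketch of the CREATE TEMPORARY FUNCTION workaround, assuming a
// hypothetical UDF class com.example.hive.udf.MyUdf packaged in
// my-udfs.jar; substitute your own jar and class.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("udf-workaround"))
val hiveContext = new HiveContext(sc)

// Make the UDF jar visible to the session, then register the function
// for this session only, sidestepping the permanent-function lookup
// that fails with NoSuchObjectException.
hiveContext.sql("ADD JAR /path/to/my-udfs.jar")
hiveContext.sql("CREATE TEMPORARY FUNCTION my_udf AS 'com.example.hive.udf.MyUdf'")

// The function is now usable in queries for the lifetime of the session:
hiveContext.sql("SELECT my_udf(some_col) FROM some_table").show()
```

Temporary functions live only in the current session's function registry, which is why they avoid the metastore lookup path that the permanent functions fail on.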
Re: Hive permanent functions are not available in Spark SQL
+user list

On Tue, Sep 29, 2015 at 3:43 PM, Pala M Muthaia <mchett...@rocketfuelinc.com> wrote:
> Hi,
>
> I am trying to use internal UDFs that we have added as permanent functions
> to Hive, from within a Spark SQL query (using HiveContext), but I encounter
> NoSuchObjectException, i.e. the function could not be found.
>
> However, if I execute the 'show functions' command in Spark SQL, the
> permanent functions appear in the list.
>
> I am using Spark 1.4.1 with Hive 0.13.1. I tried to debug this by looking
> at the log and code; it seems the 'show functions' command and the UDF
> query go through essentially the same code path, but the former can see
> the UDF while the latter can't.
>
> Any ideas on how to debug/fix this?
>
> Thanks,
> pala
LogisticRegressionWithLBFGS with large feature set
Hi,

I am trying to validate our modeling data pipeline by running LogisticRegressionWithLBFGS on a dataset with ~3.7 million features, basically to compute AUC. This is on Spark 1.3.0. I am using 128 executors with 4 GB each, plus a driver with 8 GB. The number of data partitions is 3072.

The execution fails with the following message:

*Total size of serialized results of 54 tasks (10.4 GB) is bigger than spark.driver.maxResultSize (3.0 GB)*

The associated stage in the job is treeAggregate at StandardScaler.scala:52 (http://lsv-10.rfiserve.net:18080/history/application_1426202183036_633264/stages/stage?id=3attempt=0). The call stack looks as below:

org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)

I am trying both to understand why such a large amount of data needs to be passed back to the driver, and to figure out a way around this. I also want to understand, for future experiments, how much memory is required as a function of dataset size, feature set size, and number of iterations performed.

From looking at the MLlib code, the largest data structure seems to be a dense vector of the same size as the feature set. I am not familiar with the algorithm or its implementation, but I would guess 3.7 million features would lead to a constant multiple of ~3.7 million x 8 bytes, i.e. ~30 MB per vector. So how does the result size become so large?

I looked into treeAggregate, and it looks like hierarchical aggregation. If the data being sent to the driver is basically the aggregated coefficients (i.e. dense vectors) for the final aggregation, couldn't the dense vectors from executors be pulled in one at a time and merged in memory, rather than pulling all of them in together? (This is a totally uneducated guess, so I may be completely off here.)

Is there a way to get this running?

Thanks,
pala
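One immediate lever is the cap named in the error message itself: `spark.driver.maxResultSize` is configurable. A minimal sketch (the values are illustrative, not tuned recommendations):

```scala
// Sketch: raising the result-size cap that the error message names.
// spark.driver.maxResultSize must exceed the 10.4 GB the stage tried to
// return; setting it to "0" disables the limit entirely (at the risk of
// a driver OOM instead). Note that the driver heap itself
// (spark.driver.memory) must be set at launch time, e.g. via
// spark-submit --driver-memory, not from inside the application.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("lbfgs-large-features")
  .set("spark.driver.maxResultSize", "12g")

val sc = new SparkContext(conf)
```

Raising the cap only moves the problem onto the driver heap, so the driver would also need noticeably more than the 8 GB described above to hold the aggregated results.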
Re: Building spark 1.2 from source requires more dependencies
+spark-dev

Yes, the dependencies are there. I guess my question is: how come the build is succeeding in the mainline then, without adding these dependencies?

On Thu, Mar 26, 2015 at 3:44 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Looking at output from dependency:tree, servlet-api is brought in by the following:

[INFO] +- org.apache.cassandra:cassandra-all:jar:1.2.6:compile
[INFO] |  +- org.antlr:antlr:jar:3.2:compile
[INFO] |  +- com.googlecode.json-simple:json-simple:jar:1.1:compile
[INFO] |  +- org.yaml:snakeyaml:jar:1.6:compile
[INFO] |  +- edu.stanford.ppl:snaptree:jar:0.1:compile
[INFO] |  +- org.mindrot:jbcrypt:jar:0.3m:compile
[INFO] |  +- org.apache.thrift:libthrift:jar:0.7.0:compile
[INFO] |  |  \- javax.servlet:servlet-api:jar:2.5:compile

FYI

On Thu, Mar 26, 2015 at 3:36 PM, Pala M Muthaia <mchett...@rocketfuelinc.com> wrote:

Hi,

We are trying to build Spark 1.2 from source (tip of branch-1.2 at the moment). I tried to build Spark using the following command:

mvn -U -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

I encountered various missing class definition exceptions (e.g. class javax.servlet.ServletException not found). I eventually got the build to succeed after adding the following set of dependencies to spark-core's pom.xml:

<dependency>
  <groupId>javax.servlet</groupId>
  <artifactId>servlet-api</artifactId>
  <version>3.0</version>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-io</artifactId>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-http</artifactId>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-servlet</artifactId>
</dependency>

Pretty much all of the missing class definition errors came up while building HttpServer.scala, and went away after the above dependencies were included. My guess is the official build for Spark 1.2 is working already.

My question is: what is wrong with my environment or setup that requires me to add dependencies to pom.xml in this manner to get the build to succeed? Also, I am not sure if this build would work at runtime for us; I am still testing this out.

Thanks,
pala
Building spark 1.2 from source requires more dependencies
Hi,

We are trying to build Spark 1.2 from source (tip of branch-1.2 at the moment). I tried to build Spark using the following command:

mvn -U -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

I encountered various missing class definition exceptions (e.g. class javax.servlet.ServletException not found). I eventually got the build to succeed after adding the following set of dependencies to spark-core's pom.xml:

<dependency>
  <groupId>javax.servlet</groupId>
  <artifactId>servlet-api</artifactId>
  <version>3.0</version>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-io</artifactId>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-http</artifactId>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-servlet</artifactId>
</dependency>

Pretty much all of the missing class definition errors came up while building HttpServer.scala, and went away after the above dependencies were included. My guess is the official build for Spark 1.2 is working already.

My question is: what is wrong with my environment or setup that requires me to add dependencies to pom.xml in this manner to get the build to succeed? Also, I am not sure if this build would work at runtime for us; I am still testing this out.

Thanks,
pala
Re: Issues with constants in Spark HiveQL queries
By typo I meant that the column name had a spelling error: conversion_aciton_id. It should have been conversion_action_id.

No, we tried it a few times, and we didn't have + signs or anything like that. We tried it with columns of different types too (string, double, etc.) and saw the same error.

On Tue, Jan 20, 2015 at 8:59 PM, yana <yana.kadiy...@gmail.com> wrote:

I run Spark 1.2 and do not have this issue. I don't believe the Hive version would matter (I run Spark 1.2 with the Hive12 profile), but that would be a good test. The last version I tried for you was a cdh4.2 Spark 1.2 prebuilt, without pointing to an external Hive install (in fact I tried it on a machine with no other hadoop/hive jars). So: download, unzip, and run spark-shell. I don't believe it's a bug, personally. When you say typo, do you mean there was indeed a PLUS token in your string? If you remove that token, what stacktrace do you get?

-------- Original message --------
From: Pala M Muthaia
Date: 01/19/2015 8:26 PM (GMT-05:00)
To: Yana Kadiyska
Cc: Cheng, Hao, user@spark.apache.org
Subject: Re: Issues with constants in Spark HiveQL queries

Yes, we tried the master branch (sometime last week) and there was no issue, but the above repro is for branch-1.2 and Hive 0.13. Isn't that the final release branch for Spark 1.2? If so, does a patch need to be created or back-ported from master?

(Yes, the obvious typo in the column name was introduced in this email only, so it is irrelevant to the error.)

On Wed, Jan 14, 2015 at 5:52 PM, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:

Yeah, that makes sense. Pala, are you on a prebuilt version of Spark? I just tried the CDH4 prebuilt. Here is what I get for the = token:

[image: Inline image 1]

The literal type shows as 290, not 291, and 290 is numeric. According to HiveParser (http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hive/hive-exec/0.13.1/org/apache/hadoop/hive/ql/parse/HiveParser.java#HiveParser), 291 is the token PLUS, which is really weird...

On Wed, Jan 14, 2015 at 7:47 PM, Cheng, Hao <hao.ch...@intel.com> wrote:

The log showed it failed in parsing, so the typo stuff shouldn't be the root cause. BUT I couldn't reproduce that with the master branch. I did the test as follows:

sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.13.1 hive/console
scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")

sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.12.0 hive/console
scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Wednesday, January 14, 2015 11:12 PM
To: Pala M Muthaia
Cc: user@spark.apache.org
Subject: Re: Issues with constants in Spark HiveQL queries

Just a guess, but what is the type of conversion_aciton_id? I do queries over an epoch all the time with no issues (where the epoch's type is bigint). You can see the source here: https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala -- not sure what "ASTNode type: 291" is, but it sounds like it's not considered numeric? If it's a string it should be conversion_aciton_id='20141210' (single quotes around the string).

On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia <mchett...@rocketfuelinc.com> wrote:

Hi,

We are testing Spark SQL-Hive QL on Spark 1.2.0. We have run some simple queries successfully, but we hit the following issue whenever we attempt to use a constant in the query predicate. It seems like an issue with parsing the constant.

Query: SELECT user_id FROM actions where conversion_aciton_id=20141210

Error: scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 : 20141210

Any ideas? This seems very basic, so we may be missing something basic, but I haven't figured out what it is.

---
Full shell output below:

scala> sqlContext.sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")
15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210
15/01/13 16:55:54 INFO ParseDriver: Parse Completed
15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210
15/01/13 16:55:54 INFO ParseDriver: Parse Completed
java.lang.RuntimeException: Unsupported language features in query: SELECT user_id FROM actions where conversion_aciton_id=20141210
TOK_QUERY
  TOK_FROM
    TOK_TABREF
      TOK_TABNAME
        actions
  TOK_INSERT
    TOK_DESTINATION
      TOK_DIR
        TOK_TMP_FILE
    TOK_SELECT
      TOK_SELEXPR
        TOK_TABLE_OR_COL
          user_id
    TOK_WHERE
      =
        TOK_TABLE_OR_COL
          conversion_aciton_id
        20141210

scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 :
20141210
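One way to narrow down a literal-parsing failure like the one above is to vary only how the constant is written and compare the results. A sketch, assuming a table `actions` is visible to the HiveContext (table and column names are taken from the thread; the corrected spelling is used here):

```scala
// Sketch for isolating the literal-parsing failure: each variant changes
// only how the constant is written, so a parser-level problem shows up
// as a difference in behavior between them.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("constant-repro"))
val hc = new HiveContext(sc)

// Variant 1: bare integer literal (the failing form from the thread).
hc.sql("SELECT user_id FROM actions WHERE conversion_action_id = 20141210")

// Variant 2: quoted string literal, to rule out numeric-literal handling.
hc.sql("SELECT user_id FROM actions WHERE conversion_action_id = '20141210'")

// Variant 3: explicit cast, which exercises a different parse path.
hc.sql("SELECT user_id FROM actions WHERE conversion_action_id = CAST('20141210' AS INT)")
```

If all three variants fail identically, the problem is unlikely to be the literal's type, which matches the thread's later finding that the same error appeared for string and double columns too.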
Re: Issues with constants in Spark HiveQL queries
Yes, we tried the master branch (sometime last week) and there was no issue, but the above repro is for branch-1.2 and Hive 0.13. Isn't that the final release branch for Spark 1.2? If so, does a patch need to be created or back-ported from master?

(Yes, the obvious typo in the column name was introduced in this email only, so it is irrelevant to the error.)

On Wed, Jan 14, 2015 at 5:52 PM, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:

Yeah, that makes sense. Pala, are you on a prebuilt version of Spark? I just tried the CDH4 prebuilt. Here is what I get for the = token:

[image: Inline image 1]

The literal type shows as 290, not 291, and 290 is numeric. According to HiveParser (http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hive/hive-exec/0.13.1/org/apache/hadoop/hive/ql/parse/HiveParser.java#HiveParser), 291 is the token PLUS, which is really weird...

On Wed, Jan 14, 2015 at 7:47 PM, Cheng, Hao <hao.ch...@intel.com> wrote:

The log showed it failed in parsing, so the typo stuff shouldn't be the root cause. BUT I couldn't reproduce that with the master branch. I did the test as follows:

sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.13.1 hive/console
scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")

sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.12.0 hive/console
scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Wednesday, January 14, 2015 11:12 PM
To: Pala M Muthaia
Cc: user@spark.apache.org
Subject: Re: Issues with constants in Spark HiveQL queries

Just a guess, but what is the type of conversion_aciton_id? I do queries over an epoch all the time with no issues (where the epoch's type is bigint). You can see the source here: https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala -- not sure what "ASTNode type: 291" is, but it sounds like it's not considered numeric? If it's a string it should be conversion_aciton_id='20141210' (single quotes around the string).

On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia <mchett...@rocketfuelinc.com> wrote:

Hi,

We are testing Spark SQL-Hive QL on Spark 1.2.0. We have run some simple queries successfully, but we hit the following issue whenever we attempt to use a constant in the query predicate. It seems like an issue with parsing the constant.

Query: SELECT user_id FROM actions where conversion_aciton_id=20141210

Error: scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 : 20141210

Any ideas? This seems very basic, so we may be missing something basic, but I haven't figured out what it is.

---
Full shell output below:

scala> sqlContext.sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")
15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210
15/01/13 16:55:54 INFO ParseDriver: Parse Completed
15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210
15/01/13 16:55:54 INFO ParseDriver: Parse Completed
java.lang.RuntimeException: Unsupported language features in query: SELECT user_id FROM actions where conversion_aciton_id=20141210
TOK_QUERY
  TOK_FROM
    TOK_TABREF
      TOK_TABNAME
        actions
  TOK_INSERT
    TOK_DESTINATION
      TOK_DIR
        TOK_TMP_FILE
    TOK_SELECT
      TOK_SELEXPR
        TOK_TABLE_OR_COL
          user_id
    TOK_WHERE
      =
        TOK_TABLE_OR_COL
          conversion_aciton_id
        20141210

scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 :
20141210 +

org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1110)
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:251)
        at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
        at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
        at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
        at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply
Re: Issues with constants in Spark HiveQL queries
conversion_action_id is an int. We also tried a string column predicate with a single-quoted string value and hit the same error stack.

On Wed, Jan 14, 2015 at 7:11 AM, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:

Just a guess, but what is the type of conversion_aciton_id? I do queries over an epoch all the time with no issues (where the epoch's type is bigint). You can see the source here: https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala -- not sure what "ASTNode type: 291" is, but it sounds like it's not considered numeric? If it's a string it should be conversion_aciton_id='20141210' (single quotes around the string).

On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia <mchett...@rocketfuelinc.com> wrote:

Hi,

We are testing Spark SQL-Hive QL on Spark 1.2.0. We have run some simple queries successfully, but we hit the following issue whenever we attempt to use a constant in the query predicate. It seems like an issue with parsing the constant.

Query: SELECT user_id FROM actions where conversion_aciton_id=20141210

Error: scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 : 20141210

Any ideas? This seems very basic, so we may be missing something basic, but I haven't figured out what it is.

---
Full shell output below:

scala> sqlContext.sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")
15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210
15/01/13 16:55:54 INFO ParseDriver: Parse Completed
15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM actions where conversion_aciton_id=20141210
15/01/13 16:55:54 INFO ParseDriver: Parse Completed
java.lang.RuntimeException: Unsupported language features in query: SELECT user_id FROM actions where conversion_aciton_id=20141210
TOK_QUERY
  TOK_FROM
    TOK_TABREF
      TOK_TABNAME
        actions
  TOK_INSERT
    TOK_DESTINATION
      TOK_DIR
        TOK_TMP_FILE
    TOK_SELECT
      TOK_SELEXPR
        TOK_TABLE_OR_COL
          user_id
    TOK_WHERE
      =
        TOK_TABLE_OR_COL
          conversion_aciton_id
        20141210

scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 :
20141210 +

org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1110)
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:251)
        at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
        at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
        at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
        at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
        at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
        at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
        at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
        at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
        at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
        at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:133)
        at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:133)
        at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
        at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242
Re: OOM exception during row deserialization
Does anybody have any insight on this? Thanks.

On Fri, Jan 9, 2015 at 6:30 PM, Pala M Muthaia <mchett...@rocketfuelinc.com> wrote:

Hi,

I am using Spark 1.0.1. I am trying to debug an OOM exception I saw during a join step. Basically, I have an RDD of rows that I am joining with another RDD of tuples. Some of the tasks succeed, but a fair number failed with an OOM exception with the stack below. The stack belongs to the 'reducer' that is reading shuffle output from the 'mapper'.

My question is: what is the object being deserialized here - just a portion of an RDD, or the whole RDD partition assigned to the current reducer? The rows in the RDD could be large, but definitely not something that would run to 100s of MBs in size and thus run out of memory.

Also, is there a way to determine the size of the object being deserialized that results in the error (either by looking at some staging HDFS dir or logs)?

java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)
        java.util.Arrays.copyOf(Arrays.java:2367)
        java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
        java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
        java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
        java.lang.StringBuilder.append(StringBuilder.java:204)
        java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142)
        java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050)
        java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863)
        java.io.ObjectInputStream.readString(ObjectInputStream.java:1636)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        java.util.ArrayList.readObject(ArrayList.java:771)
        sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.lang.reflect.Method.invoke(Method.java:606)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
        org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)

Thanks,
pala
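On the question of measuring the size of what is being deserialized: the stack shows plain Java serialization (ObjectInputStream), so one crude way to check for unexpectedly large rows is to serialize sample rows the same way and measure the byte count. A sketch; the sample data and the commented RDD usage are illustrative, not from the thread:

```scala
// Sketch: approximating serialized row sizes with plain Java serialization,
// the same mechanism the shuffle path in the stack trace uses, to check
// whether individual rows are far larger than expected.
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

def serializedSize(obj: AnyRef): Int = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)  // serialize exactly as ObjectInputStream would read
  out.close()
  bytes.size()
}

// Illustrative stand-in for a row; on a real RDD one might do something like
//   rowsRdd.map(row => serializedSize(row)).max()
// to find the largest row in bytes before the join.
val sampleRow = List.fill(1000)("some-field-value")
println(serializedSize(sampleRow) + " bytes")
```

This does not answer what unit the reducer deserializes at once, but it can quickly rule rows in or out as the source of the memory pressure.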
Broadcast joins on RDD
Hi,

How do I do a broadcast/map join on RDDs? I have a large RDD that I want to inner join with a small RDD. Instead of having the large RDD repartitioned and shuffled for the join, I would rather send a copy of the small RDD to each task and then perform the join locally.

How would I specify this in Spark code? I didn't find much documentation online. I attempted to create a broadcast variable out of the small RDD and then access that in the join operator:

largeRdd.join(smallRddBroadCastVar.value)

but that didn't work as expected (I found that all rows with the same key were on the same task).

I am using Spark version 1.0.1.

Thanks,
pala
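The usual pattern is to collect the small RDD on the driver, broadcast the resulting map, and join inside mapPartitions, so the large RDD is never shuffled. A minimal sketch, assuming the small side fits comfortably in each executor's memory (the sample data is illustrative):

```scala
// Sketch of a map-side (broadcast) join. Note that a Broadcast variable
// cannot wrap an RDD itself -- the small side must first be materialized
// on the driver (e.g. via collectAsMap), which is why passing
// smallRddBroadCastVar.value to join() does not behave as hoped.
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("broadcast-join"))

val largeRdd = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
val smallRdd = sc.parallelize(Seq((1, "x"), (3, "y")))

// Materialize the small side and ship one copy to every executor.
val smallMap = sc.broadcast(smallRdd.collectAsMap())

// Inner join performed locally per partition: keep only keys present
// on both sides. No shuffle of largeRdd occurs.
val joined = largeRdd.mapPartitions { iter =>
  iter.flatMap { case (k, v) =>
    smallMap.value.get(k).map(w => (k, (v, w)))
  }
}

joined.collect()
```

For a left outer join, replace the flatMap body with `Seq((k, (v, smallMap.value.get(k))))` to keep unmatched keys from the large side.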
OOM exception during row deserialization
Hi,

I am using Spark 1.0.1. I am trying to debug an OOM exception I saw during a join step. Basically, I have an RDD of rows that I am joining with another RDD of tuples. Some of the tasks succeed, but a fair number failed with an OOM exception with the stack below. The stack belongs to the 'reducer' that is reading shuffle output from the 'mapper'.

My question is: what is the object being deserialized here - just a portion of an RDD, or the whole RDD partition assigned to the current reducer? The rows in the RDD could be large, but definitely not something that would run to 100s of MBs in size and thus run out of memory.

Also, is there a way to determine the size of the object being deserialized that results in the error (either by looking at some staging HDFS dir or logs)?

java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)
        java.util.Arrays.copyOf(Arrays.java:2367)
        java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
        java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
        java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
        java.lang.StringBuilder.append(StringBuilder.java:204)
        java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3142)
        java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3050)
        java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2863)
        java.io.ObjectInputStream.readString(ObjectInputStream.java:1636)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1339)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        java.util.ArrayList.readObject(ArrayList.java:771)
        sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.lang.reflect.Method.invoke(Method.java:606)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
        org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)

Thanks,
pala
Re: Executor memory
Thanks for the clarifications. I misunderstood what the number on the UI meant.

On Mon, Dec 15, 2014 at 7:00 PM, Sean Owen <so...@cloudera.com> wrote:

I believe this corresponds to the 0.6 of the whole heap that is allocated for caching partitions. See spark.storage.memoryFraction on http://spark.apache.org/docs/latest/configuration.html. 0.6 of 4GB is about 2.3GB. The note there is important: you probably don't want to exceed the JVM old generation size with this parameter.

On Tue, Dec 16, 2014 at 12:53 AM, Pala M Muthaia <mchett...@rocketfuelinc.com> wrote:

Hi,

Running Spark 1.0.1 on YARN 2.5.

When I specify --executor-memory 4g, the Spark UI shows each executor as having only 2.3 GB, and similarly for 8g, only 4.6 GB.

I am guessing that the executor memory corresponds to the container memory, and that the task JVM gets only a percentage of the container's total memory. Is there a YARN or Spark parameter to tune this so that my task JVM actually gets 6GB out of the 8GB, for example?

Thanks.
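The UI number lines up with the storage-fraction arithmetic. A sketch of the calculation; the 0.9 safety factor (`spark.storage.safetyFraction` in Spark 1.x) is an assumption here, included because it explains why the UI shows a bit less than a full 0.6 of the heap:

```scala
// Sketch of Spark 1.x storage-memory arithmetic. The defaults shown
// (memoryFraction 0.6, safetyFraction 0.9) are assumptions based on
// Spark 1.x configuration; the product explains the ~2.3 GB the UI
// reports for a 4 GB executor.
val executorMemoryGb = 4.0
val memoryFraction   = 0.6  // spark.storage.memoryFraction default
val safetyFraction   = 0.9  // spark.storage.safetyFraction default

val storageGb = executorMemoryGb * memoryFraction * safetyFraction
// ~2.16 GB of the 4 GB heap is reserved for caching partitions;
// the rest of the heap is still available to task execution.
```

So the UI figure is not the task JVM's total memory, only the slice reserved for cached partitions, which matches Sean's explanation above.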
Re: Lost executors
Just to close the loop, it seems no issues pop up when i submit the job using 'spark submit' so that the driver process also runs on a container in the YARN cluster. In the above, the driver was running on the gateway machine through which the job was submitted, which led to quite a few issues. On Tue, Nov 18, 2014 at 5:01 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Sandy, Good point - i forgot about NM logs. When i looked up the NM logs, i only see the following statements that align with the driver side log about lost executor. Many executors show the same log statement at the same time, so it seems like the decision to kill many if not all executors happened centrally, and all executors got notified somehow: 14/11/18 00:18:25 INFO Executor: Executor is trying to kill task 2013 14/11/18 00:18:25 INFO Executor: Executor killed task 2013 In general, i also see quite a few instances of the following exception across many executors/nodes. : 14/11/17 23:58:00 INFO HadoopRDD: Input split: hdfs dir path/sorted_keys-1020_3-r-00255.deflate:0+415841 14/11/17 23:58:00 WARN BlockReaderLocal: error creating DomainSocket java.net.ConnectException: connect(2) error: Connection refused when trying to connect to '/srv/var/hadoop/runs/hdfs/dn_socket' at org.apache.hadoop.net.unix.DomainSocket.connect0(Native Method) at org.apache.hadoop.net.unix.DomainSocket.connect(DomainSocket.java:250) at org.apache.hadoop.hdfs.DomainSocketFactory.createSocket(DomainSocketFactory.java:158) at org.apache.hadoop.hdfs.BlockReaderFactory.nextDomainPeer(BlockReaderFactory.java:721) at org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:441) at org.apache.hadoop.hdfs.client.ShortCircuitCache.create(ShortCircuitCache.java:780) at org.apache.hadoop.hdfs.client.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:714) at org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:395) at 
org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:303) at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:567) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:790) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837) at java.io.DataInputStream.read(DataInputStream.java:149) at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159) at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143) at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:201) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:184) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.isEmpty(Iterator.scala:256) at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157) at $line57.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:51) at $line57.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:50) at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262
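The BlockReaderLocal warning above comes from HDFS short-circuit local reads failing to reach the DataNode's domain socket; the client then falls back to normal reads over the DataNode, which is why execution can still proceed. If short-circuit reads are wanted, the relevant hdfs-site.xml settings look roughly like the sketch below (the socket path is site-specific and must match what the DataNodes actually create):

```xml
<!-- hdfs-site.xml (sketch): short-circuit local reads. The socket path
     must exist and be writable by the DataNode user on every node;
     the path shown here is illustrative, not a recommendation. -->
<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<property>
  <name>dfs.domain.socket.path</name>
  <value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>
```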
Lost executors
Hi, I am using Spark 1.0.1 on Yarn 2.5, and doing everything through the spark shell. I am running a job that essentially reads a bunch of HBase keys, looks up HBase data, and performs some filtering and aggregation. The job works fine on smaller datasets, but when I try to execute on the full dataset, the job never completes. The few symptoms I notice are: a. The job shows progress for a while and then starts throwing lots of the following errors: 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] INFO org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - *Executor 906 disconnected, so removing it* 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] ERROR org.apache.spark.scheduler.cluster.YarnClientClusterScheduler - *Lost executor 906 on machine name: remote Akka client disassociated* 2014-11-18 16:52:02,283 [spark-akka.actor.default-dispatcher-22] WARN org.apache.spark.storage.BlockManagerMasterActor - *Removing BlockManager BlockManagerId(9186, machine name, 54600, 0) with no recent heart beats: 82313ms exceeds 45000ms* Looking at the logs, the job never recovers from these errors, and continues to show errors about lost executors and launching new executors, and this just continues for a long time. Could this be because the executors are running out of memory? In terms of memory usage, the intermediate data could be large (after the HBase lookup), but the partial and fully aggregated data set size should be quite small - essentially a bunch of ids and counts (< 1 mil in total). b. In the Spark UI, I am seeing the following errors (redacted for brevity), not sure if they are transient or a real issue: java.net.SocketTimeoutException (java.net.SocketTimeoutException: Read timed out) ... 
org.apache.spark.util.Utils$.fetchFile(Utils.scala:349) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330) org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ... java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:724) I was trying to get more data to investigate, but haven't been able to figure out how to enable logging on the executors. The Spark UI appears stuck, and I only see driver-side logs in the jobhistory directory specified in the job. Thanks, pala
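On the question of getting at executor logs: for a YARN deployment like this one, aggregated container logs (driver and executor stdout/stderr) can usually be pulled with the yarn CLI once the application finishes, provided log aggregation is enabled on the cluster. The application id below is a placeholder:

```shell
# Fetch aggregated logs of all containers for a finished application;
# requires yarn.log-aggregation-enable=true on the cluster.
# The application id here is a placeholder, not a real id.
yarn logs -applicationId application_1416268000000_0042 > app.log
```

While the job is still running, per-executor stderr is also reachable from the Executors tab of the Spark UI.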
Re: Lost executors
) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) 14/11/17 23:58:00 WARN ShortCircuitCache: ShortCircuitCache(0x71a8053d): failed to load 1276010498_BP-1416824317-172.22.48.2-1387241776581 However, on some of the nodes it seems execution proceeded after the error, so the above could just be a transient error. Finally, in the driver logs, I was looking for a hint about the decision to kill many executors, around the 00:18:25 timestamp when many tasks were killed across many executors, but I didn't find anything different. On Tue, Nov 18, 2014 at 1:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Pala, Do you have access to your YARN NodeManager logs? Are you able to check whether they report killing any containers for exceeding memory limits? -Sandy On Tue, Nov 18, 2014 at 1:54 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I am using Spark 1.0.1 on Yarn 2.5, and doing everything through the spark shell. I am running a job that essentially reads a bunch of HBase keys, looks up HBase data, and performs some filtering and aggregation. 
Re: Assigning input files to spark partitions
Hi Daniel, Yes, that should work also. However, is it possible to set it up so that each RDD has exactly one partition, without repartitioning (and thus incurring extra cost)? Is there a mechanism similar to MR where we can ensure each partition is assigned some amount of data by size, by setting some block size parameter? On Thu, Nov 13, 2014 at 1:05 PM, Daniel Siegmann daniel.siegm...@velos.io wrote: On Thu, Nov 13, 2014 at 3:24 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: No, I don't want separate RDDs because each of these partitions is being processed the same way (in my case, each partition corresponds to HBase keys belonging to one region server, and I will do HBase lookups). After that I have aggregations too, hence all these partitions should be in the same RDD. The reason to follow the partition structure is to limit concurrent HBase lookups targeting a single region server. Neither of these is necessarily a barrier to using separate RDDs. You can define the function you want to use and then pass it to multiple map methods. Then you could union all the RDDs to do your aggregations. For example, it might look something like this:

val paths: Seq[String] = ... // the paths to the files you want to load
def myFunc(line: String) = ... // the function to apply to every record
val rdds = paths.map { path => sc.textFile(path).map(myFunc) }
val completeRdd = sc.union(rdds)

Does that make any sense? -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
Re: Assigning input files to spark partitions
Thanks for the responses, Daniel and Rishi. No, I don't want separate RDDs because each of these partitions is being processed the same way (in my case, each partition corresponds to HBase keys belonging to one region server, and I will do HBase lookups). After that I have aggregations too, hence all these partitions should be in the same RDD. The reason to follow the partition structure is to limit concurrent HBase lookups targeting a single region server. Not sure what the block size is here (HDFS block size?), but my files may get large over time, so I cannot depend on the block size assumption. That said, from your description, it seems like I don't have to worry too much, because Spark assigns files to partitions while maintaining 'locality' (i.e. a given file's data would fit in ceil(filesize/blocksize) partitions, as opposed to being spread across numerous partitions). Yes, I saw wholeTextFiles(); it won't apply in my case because the input file size can be quite large. On Thu, Nov 13, 2014 at 8:04 AM, Daniel Siegmann daniel.siegm...@velos.io wrote: I believe Rishi is correct. I wouldn't rely on that though - all it would take is for one file to exceed the block size and you'd be setting yourself up for pain. Also, if your files are small - small enough to fit in a single record - you could use SparkContext.wholeTextFiles. On Thu, Nov 13, 2014 at 10:11 AM, Rishi Yadav ri...@infoobjects.com wrote: If your data is in HDFS and you are reading it as textFile, and each file is less than the block size, my understanding is it would always have one partition per file. On Thursday, November 13, 2014, Daniel Siegmann daniel.siegm...@velos.io wrote: Would it make sense to read each file in as a separate RDD? This way you would be guaranteed the data is partitioned as you expected. Possibly you could then repartition each of those RDDs into a single partition and then union them. I think that would achieve what you expect. 
But it would be easy to accidentally screw this up (have some operation that causes a shuffle), so I think you're better off just leaving them as separate RDDs. On Wed, Nov 12, 2014 at 10:27 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I have a set of input files for a spark program, with each file corresponding to a logical data partition. What is the API/mechanism to assign each input file (or a set of files) to a spark partition when initializing RDDs? When I create a spark RDD pointing to the directory of files, my understanding is it's not guaranteed that each input file will be treated as a separate partition. My job semantics require that the data is partitioned, and I want to leverage the partitioning that has already been done, rather than repartitioning again in the spark job. I tried to look this up online but haven't found any pointers so far. Thanks, pala -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io -- - Rishi
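The ceil(filesize/blocksize) behaviour discussed in this thread can be sketched numerically. The block size default below is an assumption for illustration; real clusters commonly configure 64 MB or 128 MB via dfs.blocksize:

```python
import math

def partitions_for_file(file_size_bytes, block_size_bytes=64 * 1024 * 1024):
    """Roughly how many partitions sc.textFile gives a single HDFS file.

    Hadoop input formats produce about one split per block, so a file
    occupies ceil(size / blocksize) partitions rather than being
    scattered across many; a file smaller than one block stays in a
    single partition. The 64 MB block size is an illustrative default.
    """
    return max(1, math.ceil(file_size_bytes / block_size_bytes))

mb = 1024 * 1024
print(partitions_for_file(10 * mb))   # 1 -> small file, one partition
print(partitions_for_file(200 * mb))  # 4 -> large file spans several blocks
```

This is why the advice above holds: per-file partitioning can be relied on only while every file stays under the block size.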
Assigning input files to spark partitions
Hi, I have a set of input files for a spark program, with each file corresponding to a logical data partition. What is the API/mechanism to assign each input file (or a set of files) to a spark partition when initializing RDDs? When I create a spark RDD pointing to the directory of files, my understanding is it's not guaranteed that each input file will be treated as a separate partition. My job semantics require that the data is partitioned, and I want to leverage the partitioning that has already been done, rather than repartitioning again in the spark job. I tried to look this up online but haven't found any pointers so far. Thanks, pala
Re: Cannot instantiate hive context
Thanks Akhil. I realized that earlier, and I thought mvn -Phive should have captured and included all these dependencies. In any case, I proceeded with that, included other such dependencies that were missing, and finally hit the Guava version mismatch issue (Spark with Guava 14 vs. Hadoop/Hive with Guava 11). There are two parts: 1. Spark includes the Guava library within its jars, and that may conflict with Hadoop/Hive components depending on an older version of the library. It seems this has been solved with the SPARK-2848 https://issues.apache.org/jira/browse/SPARK-2848 patch to shade the Guava libraries. 2. Spark actually uses interfaces from the newer version of the Guava library, which need to be rewritten to use the older version (i.e. downgrade Spark's dependency on Guava). I wasn't able to find the related patches (I need them since I am on Spark 1.0.1). Applying the patch for #1 above, I still hit the following error: 14/11/03 15:01:32 WARN storage.BlockManager: Putting block broadcast_0 failed java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; at org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102) (stack continues) I haven't been able to find the other patches that actually downgrade the dependency. Please point me to those patches, or any other ideas about fixing these dependency issues. Thanks. On Sun, Nov 2, 2014 at 8:41 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Adding the libthrift jar http://mvnrepository.com/artifact/org.apache.thrift/libthrift/0.9.0 to the classpath would resolve this issue. 
Thanks Best Regards On Sat, Nov 1, 2014 at 12:34 AM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I am trying to load Hive datasets using HiveContext in the spark shell. Spark ver 1.0.1 and Hive ver 0.12. We are trying to get Spark to work with Hive datasets. I already have an existing Spark deployment. Following is what I did on top of that: 1. Build spark using 'mvn -Pyarn,hive -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package' 2. Copy over spark-assembly-1.0.1-hadoop2.4.0.jar into the spark deployment directory. 3. Launch spark-shell with the spark hive jar included in the list. When I execute 'val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)' I get the following error stack: java.lang.NoClassDefFoundError: org/apache/thrift/TBase at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:792) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.apache.thrift.TBase at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 55 more I thought that building with the -Phive option should include all the necessary hive packages in the assembly jar (according to here https://spark.apache.org/docs/1.0.1/sql-programming-guide.html#hive-tables). I tried searching online and in this mailing list archive but haven't found any instructions on how to get this working. 
I know there is an additional step of updating the assembly jar across the whole cluster, not just the client side, but right now even the client is not working. Would appreciate instructions (or a link to them) on how to get this working end-to-end. Thanks, pala
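For reference on the shading approach mentioned for SPARK-2848 in the Guava discussion above: relocating Guava with the Maven shade plugin looks roughly like the following sketch. The plugin declaration is abbreviated, and the shaded package prefix shown is illustrative of the pattern rather than a quote from the actual patch:

```xml
<!-- pom.xml (sketch): rewrite com.google.common references inside the
     assembly so Spark's Guava 14 cannot clash with Hadoop/Hive's
     Guava 11 on the same classpath. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <pattern>com.google.common</pattern>
        <shadedPattern>org.spark-project.guava</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```

Note this only addresses part #1 (classpath conflict); part #2 (Spark calling Guava-14-only APIs such as HashFunction.hashInt) still requires source-level changes, as the thread says.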