need info on Spark submit on yarn-cluster mode
Hi, I observed that we have only one cluster installed, and when submitting the job in yarn-cluster mode I get the error below. Is the cause that the installation has only one cluster? Please correct me; if this is not the cause, then why am I not able to run in cluster mode? The spark-submit command is - spark-submit --jars some dependent jars... --master yarn --class com.java.jobs.sparkAggregation mytest-1.0.0.jar 2015-04-08 19:16:50 INFO Client - Application report for application_1427895906171_0087 (state: FAILED) 2015-04-08 19:16:50 DEBUG Client - client token: N/A diagnostics: Application application_1427895906171_0087 failed 2 times due to AM Container for appattempt_1427895906171_0087_02 exited with exitCode: 15 due to: Exception from container-launch. Container id: container_1427895906171_0087_02_01 Exit code: 15 Stack trace: ExitCodeException exitCode=15: at org.apache.hadoop.util.Shell.runCommand(Shell.java:538) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:197) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Container exited with a non-zero exit code 15. Failing this attempt.. Failing the application. ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: root.hdfs start time: 1428500770818 final status: FAILED Exception in thread main org.apache.spark.SparkException: Application finished with failed status at org.apache.spark.deploy.yarn.ClientBase$class.run(ClientBase.scala:509) at org.apache.spark.deploy.yarn.Client.run(Client.scala:35) at org.apache.spark.deploy.yarn.Client$.main(Client.scala:139) at org.apache.spark.deploy.yarn.Client.main(Client.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/need-info-on-Spark-submit-on-yarn-cluster-mode-tp22420.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Need subscription process
Check your spam folder or any filters. On Wed, Apr 8, 2015 at 2:17 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All, how can I subscribe myself to this group so that every mail sent to this group comes to me as well? I already sent a request to user-subscr...@spark.apache.org, but still I am not getting mail sent to this group by other persons. Regards Jeetendra -- Deepak
partition by category
Hi folks, I am writing to ask how to filter and partition a set of files through Spark. The situation is that I have N big files (they cannot fit on a single machine), and each line of the files starts with a category (say Sport, Food, etc.), while there are actually fewer than 100 categories. I need a program to scan the file set, aggregate the lines by category, and save them separately in different folders with the right partitioning. For instance, I want the program to generate a Sport folder which contains all lines of data with category Sport. I would also not like to put everything into a single file, which might be too big. Any ideas how to implement this logic efficiently in Spark? I believe groupBy is not acceptable, since even the data belonging to a single category is too big to fit on a single machine. Regards, Yunsima
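One pattern that may fit here (a sketch only, assuming the category is the first tab-separated field of each line and that Hadoop's old-API MultipleTextOutputFormat is acceptable) is to key every line by its category and let the output format write one sub-directory per key:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Routes each record into a sub-directory named after its key (the category).
class CategoryOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
  // Suppress the key in the output so only the original line is written.
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
}

val lines = sc.textFile("hdfs:///input/*")                      // hypothetical input path
val byCategory = lines.map(line => (line.split("\t")(0), line)) // assumed: category is field 0
byCategory.saveAsHadoopFile("hdfs:///output/by-category",
  classOf[String], classOf[String], classOf[CategoryOutputFormat])

Each input partition writes its own part file into every category directory it has records for, so no single machine ever has to hold a whole category and no single output file becomes huge.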
Spark Tasks failing with Cannot find address
I have a spark stage that has 8 tasks. 7/8 have completed. However 1 task is failing with CANNOT FIND ADDRESS. Aggregated Metrics by Executor:
Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read Size / Records | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
19 | CANNOT FIND ADDRESS | 24 min | 1 | 1 | 0 | 1248.9 MB / 56194006 | 0.0 B / 0 | 0.0 B | 0.0 B
47 | CANNOT FIND ADDRESS | 2.3 h | 1 | 1 | 0 | 1295.3 MB / 56202037 | 0.0 B / 0 | 0.0 B | 0.0 B
Any suggestions ? -- Deepak
Need subscription process
Hi All, how can I subscribe myself to this group so that every mail sent to this group comes to me as well? I already sent a request to user-subscr...@spark.apache.org, but still I am not getting mail sent to this group by other persons. Regards Jeetendra
Re: Parquet Hive table become very slow on 1.3?
Hi Cheng, I tried both these patches, and it seems they still do not resolve my issue. And I found that most of the time is spent on this line in newParquet.scala: ParquetFileReader.readAllFootersInParallel(sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData) which needs to read all the files under the Parquet folder, while our Parquet folder has a lot of Parquet files (near 2000); reading one file takes about 2 seconds, so it becomes very slow ... And PR 5231 did not skip this step, so it does not resolve my issue. As our Parquet files are generated by a Spark job, the number of .parquet files is the same as the number of tasks; that is why we have so many files. But these files actually have the same schema. Is there any way to merge these files into one, or to avoid scanning each of them? On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Xudong, We had been digging into this issue for a while, and believe PR 5339 http://github.com/apache/spark/pull/5339 and PR 5334 http://github.com/apache/spark/pull/5334 should fix this issue. There are two problems: 1. Normally we cache Parquet table metadata for better performance, but when converting Hive metastore Parquet tables, the cache is not used. Thus heavy operations like schema discovery are done every time a metastore Parquet table is converted. 2. With Parquet task side metadata reading (which is turned on by default), we can actually skip the row group information in the footer. However, we accidentally called a Parquet function which doesn't skip row group information. For your question about schema merging, Parquet allows different part-files to have different but compatible schemas. For example, part-1.parquet has columns a and b, while part-2.parquet may have columns a and c. In some cases, the summary files (_metadata and _common_metadata) contain the merged schema (a, b, and c), but it's not guaranteed. For example, when the user-defined metadata stored in different part-files contains different values for the same key, Parquet simply gives up writing summary files. That's why all part-files must be touched to get a precise merged schema. However, in scenarios where a centralized arbitrative schema is available (e.g. the Hive metastore schema, or the schema provided by the user via data source DDL), we don't need to do schema merging on the driver side, but can defer it to the executor side, where each task only needs to reconcile those part-files it needs to touch. This is also what the Parquet developers did recently for parquet-hadoop https://github.com/apache/incubator-parquet-mr/pull/45. Cheng On 3/31/15 11:49 PM, Zheng, Xudong wrote: Thanks Cheng! Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issues, but PR 5231 seems not to. Not sure what other things I did wrong ... BTW, actually, we are very interested in the schema merging feature in Spark 1.3, so both of these two solutions will disable this feature, right? It seems that Parquet metadata is stored in a file named _metadata in the Parquet file folder (each folder is a partition as we use a partitioned table), so why do we need to scan all Parquet part-files? Are there any other solutions that could keep the schema merging feature at the same time? We really like this feature :) On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian lian.cs@gmail.com wrote: Hi Xudong, This is probably because Parquet schema merging is turned on by default. This is generally useful for Parquet files with different but compatible schemas. But it needs to read metadata from all Parquet part-files.
This can be problematic when reading Parquet files with lots of part-files, especially when the user doesn't need schema merging. This issue is tracked by SPARK-6575, and here is a PR for it: https://github.com/apache/spark/pull/5231. This PR adds a configuration to disable schema merging by default when doing Hive metastore Parquet table conversion. Another workaround is to fallback to the old Parquet code by setting spark.sql.parquet.useDataSourceApi to false. Cheng On 3/31/15 2:47 PM, Zheng, Xudong wrote: Hi all, We are using Parquet Hive table, and we are upgrading to Spark 1.3. But we find that, just a simple COUNT(*) query will much slower (100x) than Spark 1.2. I find the most time spent on driver to get HDFS blocks. I find large amount of get below logs printed: 15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms 15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{ fileLength=77153436 underConstruction=false blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}] lastLocatedBlock=LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0;
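On the question of merging the ~2000 part-files: one blunt way to cut the file count (a sketch only, assuming the Spark 1.3 DataFrame API and that rewriting the data once is acceptable; the paths and target count are placeholders) is to read the folder and write it back with fewer partitions:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// Read the folder with ~2000 small part-files and rewrite it as ~100 larger ones.
sqlContext.parquetFile("hdfs:///data/events_parquet")
  .repartition(100)   // triggers a shuffle; pick a count that gives sensibly sized files
  .saveAsParquetFile("hdfs:///data/events_parquet_compacted")

Since all the part-files share one schema, this also makes the footer scan at planning time much cheaper.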
Re: RDD collect hangs on large input data
I use EMR 3.3.1 which comes with Java 7. Do you think that this may cause the issue? Did you test it with Java 8?
RE: Difference between textFile Vs hadoopFile (textInputFormat) on HDFS data
Thanks From: Nick Pentreath [mailto:nick.pentre...@gmail.com] Sent: Tuesday, April 07, 2015 5:52 PM To: Puneet Kumar Ojha Cc: user@spark.apache.org Subject: Re: Difference between textFile Vs hadoopFile (textInputFormat) on HDFS data There is no difference - textFile calls hadoopFile with a TextInputFormat, and maps each value to a String. — Sent from Mailbox https://www.dropbox.com/mailbox On Tue, Apr 7, 2015 at 1:46 PM, Puneet Kumar Ojha puneet.ku...@pubmatic.com wrote: Hi, Is there any difference between textFile vs hadoopFile (textInputFormat) when the data is present in HDFS? Will there be any performance gain that can be observed? Puneet Kumar Ojha Data Architect | PubMatic http://www.pubmatic.com/
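In other words, the following two reads are essentially equivalent (a minimal Scala sketch of the equivalence, with a placeholder path; textFile just hides the Hadoop types for you):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// What sc.textFile(...) gives you:
val a = sc.textFile("hdfs:///data/input.txt")

// Roughly the same thing spelled out via hadoopFile:
val b = sc.hadoopFile("hdfs:///data/input.txt",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .map(pair => pair._2.toString)   // drop the byte-offset key, keep the line

Both go through the same TextInputFormat record reader, so any performance difference should be negligible.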
Re: Spark Tasks failing with Cannot find address
Spark Version 1.3 Command: ./bin/spark-submit -v --master yarn-cluster --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-company-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-company/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-company/share/hadoop/hdfs/hadoop-hdfs-2.4.1-company-2.jar --num-executors 100 --driver-memory 4g --driver-java-options -XX:MaxPermSize=4G --executor-memory 8g --executor-cores 1 --queue hdmi-express --class com.company.ep.poc.spark.reporting.SparkApp /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar startDate=2015-04-6 endDate=2015-04-7 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem On Wed, Apr 8, 2015 at 2:30 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a spark stage that has 8 tasks. 7/8 have completed. However 1 task is failing with CANNOT FIND ADDRESS. Aggregated Metrics by Executor:
Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read Size / Records | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
19 | CANNOT FIND ADDRESS | 24 min | 1 | 1 | 0 | 1248.9 MB / 56194006 | 0.0 B / 0 | 0.0 B | 0.0 B
47 | CANNOT FIND ADDRESS | 2.3 h | 1 | 1 | 0 | 1295.3 MB / 56202037 | 0.0 B / 0 | 0.0 B | 0.0 B
Any suggestions ? -- Deepak -- Deepak
Re: The difference between SparkSql/DataFrame join and Rdd join
Hi Michael, In fact, I find that all workers are hanging when SQL/DF join is running. So I picked the master and one of the workers. jstack is the following: Master 2015-04-08 09:08:22 Full thread dump OpenJDK 64-Bit Server VM (24.65-b04 mixed mode): Thread-3 prio=10 tid=0x7fdbe0013000 nid=0x2836 runnable [0x7fdd0cd89000] java.lang.Thread.State: RUNNABLE at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:272) at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) - locked 0x7fed62503a00 (a java.lang.UNIXProcess$ProcessPipeInputStream) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) - locked 0x7fed95527048 (a java.io.InputStreamReader) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:154) at java.io.BufferedReader.readLine(BufferedReader.java:317) - locked 0x7fed95527048 (a java.io.InputStreamReader) at java.io.BufferedReader.readLine(BufferedReader.java:382) at sbt.BasicIO$$anonfun$processFully$1$$anonfun$apply$8.apply(ProcessImpl.scala:58) at sbt.BasicIO$$anonfun$processFully$1$$anonfun$apply$8.apply(ProcessImpl.scala:58) at sbt.BasicIO$.readFully$1(ProcessImpl.scala:63) at sbt.BasicIO$.processLinesFully(ProcessImpl.scala:69) at sbt.BasicIO$$anonfun$processFully$1.apply(ProcessImpl.scala:58) at sbt.BasicIO$$anonfun$processFully$1.apply(ProcessImpl.scala:55) at sbt.SimpleProcessBuilder$$anonfun$4.apply$mcV$sp(ProcessImpl.scala:357) at sbt.Spawn$$anon$3.run(ProcessImpl.scala:17) Thread-2 prio=10 tid=0x7fdbe001 nid=0x2835 runnable [0x7fdd0d995000] java.lang.Thread.State: RUNNABLE at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:272) at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) - locked 0x7fed625018b8 (a java.lang.UNIXProcess$ProcessPipeInputStream) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) - locked 0x7fed8cc9d8b8 (a java.io.InputStreamReader) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:154) at java.io.BufferedReader.readLine(BufferedReader.java:317) - locked 0x7fed8cc9d8b8 (a java.io.InputStreamReader) at java.io.BufferedReader.readLine(BufferedReader.java:382) at sbt.BasicIO$$anonfun$processFully$1$$anonfun$apply$8.apply(ProcessImpl.scala:58) at sbt.BasicIO$$anonfun$processFully$1$$anonfun$apply$8.apply(ProcessImpl.scala:58) at sbt.BasicIO$.readFully$1(ProcessImpl.scala:63) at sbt.BasicIO$.processLinesFully(ProcessImpl.scala:69) at sbt.BasicIO$$anonfun$processFully$1.apply(ProcessImpl.scala:58) at sbt.BasicIO$$anonfun$processFully$1.apply(ProcessImpl.scala:55) at sbt.SimpleProcessBuilder$$anonfun$3.apply$mcV$sp(ProcessImpl.scala:354) at sbt.Spawn$$anon$3.run(ProcessImpl.scala:17) pool-5-thread-5 prio=10 tid=0x7ff16034a000 nid=0x2832 waiting on condition [0x7fdd0da96000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x7feacd0621d0 (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) at 
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460) at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359) at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) pool-5-thread-4 prio=10 tid=0x7ff160349800 nid=0x2831 waiting on condition [0x7fdd0d894000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x7feacd0621d0 (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460) at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359) at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) at
Re: need info on Spark submit on yarn-cluster mode
This means the Spark workers exited with code 15; probably nothing YARN-related itself (unless there are classpath-related problems). Have a look at the logs of the app/container via the resource manager. You can also increase the time that logs get kept on the nodes themselves to something like 10 minutes or longer:
<property>
  <name>yarn.nodemanager.delete.debug-delay-sec</name>
  <value>600</value>
</property>
On 8 Apr 2015, at 07:24, sachin Singh sachin.sha...@gmail.com wrote: Hi , I observed that we have installed only one cluster, and submiting job as yarn-cluster then getting below error, so is this cause that installation is only one cluster? Please correct me, if this is not cause then why I am not able to run in cluster mode, spark submit command is - spark-submit --jars some dependent jars... --master yarn --class com.java.jobs.sparkAggregation mytest-1.0.0.jar 2015-04-08 19:16:50 INFO Client - Application report for application_1427895906171_0087 (state: FAILED) 2015-04-08 19:16:50 DEBUG Client - client token: N/A diagnostics: Application application_1427895906171_0087 failed 2 times due to AM Container for appattempt_1427895906171_0087_02 exited with exitCode: 15 due to: Exception from container-launch. Container id: container_1427895906171_0087_02_01 Exit code: 15 Stack trace: ExitCodeException exitCode=15: at org.apache.hadoop.util.Shell.runCommand(Shell.java:538) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:197) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Container exited with a non-zero exit code 15. Failing this attempt.. Failing the application. ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: root.hdfs start time: 1428500770818 final status: FAILED Exception in thread main org.apache.spark.SparkException: Application finished with failed status at org.apache.spark.deploy.yarn.ClientBase$class.run(ClientBase.scala:509) at org.apache.spark.deploy.yarn.Client.run(Client.scala:35) at org.apache.spark.deploy.yarn.Client$.main(Client.scala:139) at org.apache.spark.deploy.yarn.Client.main(Client.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/need-info-on-Spark-submit-on-yarn-cluster-mode-tp22420.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Issue with pyspark 1.3.0, sql package and rows
I will look into this today. On Wed, Apr 8, 2015 at 7:35 AM, Stefano Parmesan parme...@spaziodati.eu wrote: Did anybody by any chance had a look at this bug? It keeps on happening to me, and it's quite blocking, I would like to understand if there's something wrong in what I'm doing, or whether there's a workaround or not. Thank you all, -- Dott. Stefano Parmesan Backend Web Developer and Data Lover ~ SpazioDati s.r.l. Via Adriano Olivetti, 13 – 4th floor Le Albere district – 38122 Trento – Italy -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-pyspark-1-3-0-sql-package-and-rows-tp22405p22423.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Exception in thread main java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds] when creating context
Hi All, In some cases I get the exception below when I run Spark in local mode (I haven't seen this on a cluster). This is weird, and it also affects my local unit test cases (it does not always happen, but usually once per 4-5 runs). From the stack, it looks like the error happens when creating the context, but I don't know why, or what kind of parameters I can set to solve this issue. Exception in thread main java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at akka.remote.Remoting.start(Remoting.scala:180) at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615) at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632) at akka.actor.ActorSystem$.apply(ActorSystem.scala:141) at akka.actor.ActorSystem$.apply(ActorSystem.scala:118) at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1832) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1823) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:223) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163) at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267) at org.apache.spark.SparkContext.init(SparkContext.scala:270) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61) at com.***.executor.FinancialEngineExecutor.run(FinancialEngineExecutor.java:110) Regards, Shuai
[ThriftServer] User permissions warning
Hi folks, I am noticing a pesky and persistent warning in my logs (this is from Spark 1.2.1): 15/04/08 15:23:05 WARN ShellBasedUnixGroupsMapping: got exception trying to get groups for user anonymous org.apache.hadoop.util.Shell$ExitCodeException: id: anonymous: No such user at org.apache.hadoop.util.Shell.runCommand(Shell.java:261) at org.apache.hadoop.util.Shell.run(Shell.java:188) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:381) at org.apache.hadoop.util.Shell.execCommand(Shell.java:467) at org.apache.hadoop.util.Shell.execCommand(Shell.java:450) at org.apache.hadoop.security.ShellBasedUnixGroupsMapping.getUnixGroups(ShellBasedUnixGroupsMapping.java:86) at org.apache.hadoop.security.ShellBasedUnixGroupsMapping.getGroups(ShellBasedUnixGroupsMapping.java:55) at org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.getGroups(JniBasedUnixGroupsMappingWithFallback.java:50) at org.apache.hadoop.security.Groups.getGroups(Groups.java:89) at org.apache.hadoop.security.UserGroupInformation.getGroupNames(UserGroupInformation.java:1292) at org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator.setConf(HadoopDefaultAuthenticator.java:62) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:70) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130) at org.apache.hadoop.hive.ql.metadata.HiveUtils.getAuthenticator(HiveUtils.java:365) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:278) I cannot figure out what I might be missing -- the thrift server is started via sbin/start-thriftserver --master ..., I can see that the process is running under my user. I don't have any functional issues but this is annoying (filling up my logs/making it heard to read). Can someone give me pointers on what to check? Things I've tried: 1. hive.server2.enable.doAs is NOT set in hive-site.xml so I expect user should at least show up as my id, not anonymous 2.export HADOOP_USER_NAME=someusername -- error still shows up about anonymous Curious if anyone has solved this
RE: Advice using Spark SQL and Thrift JDBC Server
+1 Interestingly, I ran into the exactly the same issue yesterday. I couldn’t find any documentation about which project to include as a dependency in build.sbt to use HiveThriftServer2. Would appreciate help. Mohammed From: Todd Nist [mailto:tsind...@gmail.com] Sent: Wednesday, April 8, 2015 5:49 AM To: James Aley Cc: Michael Armbrust; user Subject: Re: Advice using Spark SQL and Thrift JDBC Server To use the HiveThriftServer2.startWithContext, I thought one would use the following artifact in the build: org.apache.spark%% spark-hive-thriftserver % 1.3.0 But I am unable to resolve the artifact. I do not see it in maven central or any other repo. Do I need to build Spark and publish locally or just missing something obvious here? Basic class is like this: import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.HiveMetastoreTypes._ import org.apache.spark.sql.types._ import org.apache.spark.sql.hive.thriftserver._ object MyThriftServer { val sparkConf = new SparkConf() // master is passed to spark-submit, but could also be specified explicitely // .setMaster(sparkMaster) .setAppName(My ThriftServer) .set(spark.cores.max, 2) val sc = new SparkContext(sparkConf) val sparkContext = sc import sparkContext._ val sqlContext = new HiveContext(sparkContext) import sqlContext._ import sqlContext.implicits._ // register temp tables here HiveThriftServer2.startWithContext(sqlContext) } Build has the following: scalaVersion := 2.10.4 val SPARK_VERSION = 1.3.0 libraryDependencies ++= Seq( org.apache.spark %% spark-streaming-kafka % SPARK_VERSION exclude(org.apache.spark, spark-core_2.10) exclude(org.apache.spark, spark-streaming_2.10) exclude(org.apache.spark, spark-sql_2.10) exclude(javax.jms, jms), org.apache.spark %% spark-core % SPARK_VERSION % provided, org.apache.spark %% spark-streaming % SPARK_VERSION % provided, org.apache.spark %% spark-sql % SPARK_VERSION % provided, org.apache.spark %% spark-hive % SPARK_VERSION % provided, org.apache.spark %% spark-hive-thriftserver % SPARK_VERSION % provided, org.apache.kafka %% kafka % 0.8.1.1 exclude(javax.jms, jms) exclude(com.sun.jdmk, jmxtools) exclude(com.sun.jmx, jmxri), joda-time % joda-time % 2.7, log4j % log4j % 1.2.14 exclude(com.sun.jdmk, jmxtools) exclude(com.sun.jmx, jmxri) ) Appreciate the assistance. -Todd On Tue, Apr 7, 2015 at 4:09 PM, James Aley james.a...@swiftkey.commailto:james.a...@swiftkey.com wrote: Excellent, thanks for your help, I appreciate your advice! On 7 Apr 2015 20:43, Michael Armbrust mich...@databricks.commailto:mich...@databricks.com wrote: That should totally work. The other option would be to run a persistent metastore that multiple contexts can talk to and periodically run a job that creates missing tables. The trade-off here would be more complexity, but less downtime due to the server restarting. On Tue, Apr 7, 2015 at 12:34 PM, James Aley james.a...@swiftkey.commailto:james.a...@swiftkey.com wrote: Hi Michael, Thanks so much for the reply - that really cleared a lot of things up for me! Let me just check that I've interpreted one of your suggestions for (4) correctly... Would it make sense for me to write a small wrapper app that pulls in hive-thriftserver as a dependency, iterates my Parquet directory structure to discover tables and registers each as a temp table in some context, before calling HiveThriftServer2.createWithContext as you suggest? 
This would mean that to add new content, all I need to is restart that app, which presumably could also be avoided fairly trivially by periodically restarting the server with a new context internally. That certainly beats manual curation of Hive table definitions, if it will work? Thanks again, James. On 7 April 2015 at 19:30, Michael Armbrust mich...@databricks.commailto:mich...@databricks.com wrote: 1) What exactly is the relationship between the thrift server and Hive? I'm guessing Spark is just making use of the Hive metastore to access table definitions, and maybe some other things, is that the case? Underneath the covers, the Spark SQL thrift server is executing queries using a HiveContext. In this mode, nearly all computation is done with Spark SQL but we try to maintain compatibility with Hive wherever possible. This means that you can write your queries in HiveQL, read tables from the Hive metastore, and use Hive UDFs UDTs UDAFs, etc. The one exception here is Hive DDL operations (CREATE TABLE, etc). These are passed directly to Hive code and executed there. The Spark SQL DDL is sufficiently different that we always try to parse that first, and fall back to Hive when it does not parse. One possibly confusing point here, is that you can persist Spark SQL tables into
Re: Issue with pyspark 1.3.0, sql package and rows
Did anybody by any chance had a look at this bug? It keeps on happening to me, and it's quite blocking, I would like to understand if there's something wrong in what I'm doing, or whether there's a workaround or not. Thank you all, -- Dott. Stefano Parmesan Backend Web Developer and Data Lover ~ SpazioDati s.r.l. Via Adriano Olivetti, 13 – 4th floor Le Albere district – 38122 Trento – Italy -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-pyspark-1-3-0-sql-package-and-rows-tp22405p22423.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Opening many Parquet files = slow
You may have seen this thread: http://search-hadoop.com/m/JW1q5SlRpt1 Cheers On Wed, Apr 8, 2015 at 6:15 AM, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote: Hi guys *I’ve got:* - 180 days of log data in Parquet. - Each day is stored in a separate folder in S3. - Each day consists of 20-30 Parquet files of 256 MB each. - Spark 1.3 on Amazon EMR This makes approximately 5000 Parquet files with a total size if 1.5 TB. *My code*: val in = sqlContext.parquetFile(“day1”, “day2”, …, “day180”) *Problem*: Before the very first stage is started, Spark spends about 25 minutes printing the following: ... 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for reading at position '258305902' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for reading at position '260897108' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading at position '261259189' 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73' for reading … etc It looks like Spark is opening each file, before it actually does any work. This means a delay of 25 minutes when working with Parquet files. Previously, we used LZO files and did not experience this problem. *Bonus info: * This also happens when I use auto partition discovery (i.e. sqlContext.parquetFile(“/path/to/logsroot/)). What can I do to avoid this? Thanks in advance! Eric Eijkelenboom
Re: Spark 1.3 on CDH 5.3.1 YARN
Yes, should be fine since you are running on YARN. This is probably more appropriate for the cdh-user list. On Apr 8, 2015 9:35 AM, roy rp...@njit.edu wrote: Hi, We have cluster running on CDH 5.3.2 and Spark 1.2 (Which is current version in CDH5.3.2), But We want to try Spark 1.3 without breaking existing setup, so is it possible to have Spark 1.3 on existing setup ? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-on-CDH-5-3-1-YARN-tp22422.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Error running Spark on Cloudera
spark.eventLog.dir should contain the full HDFS URL. In general, this should be sufficient: spark.eventLog.dir=hdfs:/user/spark/applicationHistory On Wed, Apr 8, 2015 at 6:45 AM, Vijayasarathy Kannan kvi...@vt.edu wrote: I am trying to run a Spark application using spark-submit on a cluster using Cloudera manager. I get the error Exception in thread main java.io.IOException: Error in creating log directory: file:/user/spark/applicationHistory//app-20150408094126-0008 Adding the below lines in /etc/spark/conf/spark-defaults.conf wouldn't resolve it. spark.eventLog.dir=/user/spark/applicationHistory spark.eventLog.enabled=true Any idea on what is missing? -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Timeout errors from Akka in Spark 1.2.1
There are a couple of options. Increase timeout (see Spark configuration). Also see past mails in the mailing list. Another option you may try (I have gut feeling that may work, but I am not sure) is calling GC on the driver periodically. The cleaning up of stuff is tied to GCing of RDD objects and regular cleaning may help keep things clean more rigorously rather than in unpredictable bursts of GC activity. Let us know how it works out. TD On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com wrote: I have a standalone and local Spark streaming process where we are reading inputs using FlumeUtils. Our longest window size is 6 hours. After about a day and a half of running without any issues, we start seeing Timeout errors while cleaning up input blocks. This seems to cause reading from Flume to cease. ERROR sparkDriver-akka.actor.default-dispatcher-78 BlockManagerSlaveActor.logError - Error in removing block input-0-1428182594000 org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(driver, localhost, 55067),input-0-1428182594000,StorageLevel(false, false, false, false, 1),0,0,0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361) at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167) at scala.concurrent.Await$.result(package.scala:107) at 
org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) ... 17 more There was a similar query posted here http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-td15632.html but did not find any resolution to that issue. Thanks in advance, NB
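For reference, a minimal Scala sketch of both suggestions (the config key and the numbers below are assumptions to adapt, not recommendations; check the Spark configuration page for the exact timeout properties in your version):

import java.util.concurrent.{Executors, TimeUnit}
import org.apache.spark.SparkConf

// 1. Raise the Akka ask timeout (the 30 seconds in the trace is the usual default).
val conf = new SparkConf()
  .set("spark.akka.askTimeout", "120")   // seconds, hypothetical value

// 2. On the driver, trigger a GC periodically so unreferenced RDD/block metadata
//    is cleaned up steadily instead of in one large unpredictable burst.
val gcScheduler = Executors.newSingleThreadScheduledExecutor()
gcScheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = System.gc()
}, 10, 10, TimeUnit.MINUTES)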
Reading file with Unicode characters
Hi, Does SparkContext's textFile() method handle files with Unicode characters? How about files in UTF-8 format? Going further, is it possible to specify encodings to the method? If not, what should one do if the files to be read are in some encoding? Thanks, arun
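textFile assumes UTF-8: under the hood it reads Hadoop Text values (which are UTF-8 encoded) and converts them to String, and there is no encoding parameter. For files in another encoding, one common workaround (a sketch, with the charset name and path as placeholders) is to read the raw Text records and decode them explicitly:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val lines = sc
  .hadoopFile("hdfs:///data/latin1.txt",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .map { case (_, text) =>
    // Decode only the valid bytes of each line with an explicit charset.
    new String(text.getBytes, 0, text.getLength, "ISO-8859-1")
  }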
Spark SQL Avro Library for 1.2
How do I build Spark SQL Avro Library for Spark 1.2 ? I was following this https://github.com/databricks/spark-avro and was able to build spark-avro_2.10-1.0.0.jar by simply running sbt/sbt package from the project root. but we are on Spark 1.2 and need compatible spark-avro jar. Any idea how do I do it ? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Avro-Library-for-1-2-tp22421.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark 1.3 on CDH 5.3.1 YARN
Hi, We have a cluster running CDH 5.3.2 and Spark 1.2 (which is the current version in CDH 5.3.2), but we want to try Spark 1.3 without breaking the existing setup, so is it possible to have Spark 1.3 on the existing setup? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-on-CDH-5-3-1-YARN-tp22422.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: EC2 spark-submit --executor-memory
If you are using the Spark Standalone deployment, make sure you set the worker memory (SPARK_WORKER_MEMORY) over 20G, and that you do have 20G of physical memory. Yong Date: Tue, 7 Apr 2015 20:58:42 -0700 From: li...@adobe.com To: user@spark.apache.org Subject: EC2 spark-submit --executor-memory Dear Spark team, I'm using the EC2 script to start up a Spark cluster. If I login and use the executor-memory parameter in the submit script, the UI tells me that no cores are assigned to the job and nothing happens. Without executor-memory everything works fine... Until I get dag-scheduler-event-loop java.lang.OutOfMemoryError: Java heap space, but that's another issue. ./bin/spark-submit \ --class ... \ --executor-memory 20G \ /path/to/examples.jar Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/EC2-spark-submit-executor-memory-tp22417.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Subscribe
Re: Subscribe
Please email user-subscr...@spark.apache.org On Apr 8, 2015, at 6:28 AM, Idris Ali psychid...@gmail.com wrote: - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Opening many Parquet files = slow
Hi guys I’ve got: 180 days of log data in Parquet. Each day is stored in a separate folder in S3. Each day consists of 20-30 Parquet files of 256 MB each. Spark 1.3 on Amazon EMR This makes approximately 5000 Parquet files with a total size if 1.5 TB. My code: val in = sqlContext.parquetFile(“day1”, “day2”, …, “day180”) Problem: Before the very first stage is started, Spark spends about 25 minutes printing the following: ... 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for reading at position '258305902' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for reading at position '260897108' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading at position '261259189' 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73' for reading … etc It looks like Spark is opening each file, before it actually does any work. This means a delay of 25 minutes when working with Parquet files. Previously, we used LZO files and did not experience this problem. Bonus info: This also happens when I use auto partition discovery (i.e. sqlContext.parquetFile(“/path/to/logsroot/)). What can I do to avoid this? Thanks in advance! Eric Eijkelenboom
Re: Advice using Spark SQL and Thrift JDBC Server
To use the HiveThriftServer2.startWithContext, I thought one would use the following artifact in the build: org.apache.spark%% spark-hive-thriftserver % 1.3.0 But I am unable to resolve the artifact. I do not see it in maven central or any other repo. Do I need to build Spark and publish locally or just missing something obvious here? Basic class is like this: import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.HiveMetastoreTypes._ import org.apache.spark.sql.types._ import org.apache.spark.sql.hive.thriftserver._ object MyThriftServer { val sparkConf = new SparkConf() // master is passed to spark-submit, but could also be specified explicitely // .setMaster(sparkMaster) .setAppName(My ThriftServer) .set(spark.cores.max, 2) val sc = new SparkContext(sparkConf) val sparkContext = sc import sparkContext._ val sqlContext = new HiveContext(sparkContext) import sqlContext._ import sqlContext.implicits._ // register temp tables here HiveThriftServer2.startWithContext(sqlContext) } Build has the following: scalaVersion := 2.10.4 val SPARK_VERSION = 1.3.0 libraryDependencies ++= Seq( org.apache.spark %% spark-streaming-kafka % SPARK_VERSION exclude(org.apache.spark, spark-core_2.10) exclude(org.apache.spark, spark-streaming_2.10) exclude(org.apache.spark, spark-sql_2.10) exclude(javax.jms, jms), org.apache.spark %% spark-core % SPARK_VERSION % provided, org.apache.spark %% spark-streaming % SPARK_VERSION % provided, org.apache.spark %% spark-sql % SPARK_VERSION % provided, org.apache.spark %% spark-hive % SPARK_VERSION % provided, org.apache.spark %% spark-hive-thriftserver % SPARK_VERSION % provided, org.apache.kafka %% kafka % 0.8.1.1 exclude(javax.jms, jms) exclude(com.sun.jdmk, jmxtools) exclude(com.sun.jmx, jmxri), joda-time % joda-time % 2.7, log4j % log4j % 1.2.14 exclude(com.sun.jdmk, jmxtools) exclude(com.sun.jmx, jmxri) ) Appreciate the assistance. -Todd On Tue, Apr 7, 2015 at 4:09 PM, James Aley james.a...@swiftkey.com wrote: Excellent, thanks for your help, I appreciate your advice! On 7 Apr 2015 20:43, Michael Armbrust mich...@databricks.com wrote: That should totally work. The other option would be to run a persistent metastore that multiple contexts can talk to and periodically run a job that creates missing tables. The trade-off here would be more complexity, but less downtime due to the server restarting. On Tue, Apr 7, 2015 at 12:34 PM, James Aley james.a...@swiftkey.com wrote: Hi Michael, Thanks so much for the reply - that really cleared a lot of things up for me! Let me just check that I've interpreted one of your suggestions for (4) correctly... Would it make sense for me to write a small wrapper app that pulls in hive-thriftserver as a dependency, iterates my Parquet directory structure to discover tables and registers each as a temp table in some context, before calling HiveThriftServer2.createWithContext as you suggest? This would mean that to add new content, all I need to is restart that app, which presumably could also be avoided fairly trivially by periodically restarting the server with a new context internally. That certainly beats manual curation of Hive table definitions, if it will work? Thanks again, James. On 7 April 2015 at 19:30, Michael Armbrust mich...@databricks.com wrote: 1) What exactly is the relationship between the thrift server and Hive? I'm guessing Spark is just making use of the Hive metastore to access table definitions, and maybe some other things, is that the case? 
Underneath the covers, the Spark SQL thrift server is executing queries using a HiveContext. In this mode, nearly all computation is done with Spark SQL but we try to maintain compatibility with Hive wherever possible. This means that you can write your queries in HiveQL, read tables from the Hive metastore, and use Hive UDFs UDTs UDAFs, etc. The one exception here is Hive DDL operations (CREATE TABLE, etc). These are passed directly to Hive code and executed there. The Spark SQL DDL is sufficiently different that we always try to parse that first, and fall back to Hive when it does not parse. One possibly confusing point here, is that you can persist Spark SQL tables into the Hive metastore, but this is not the same as a Hive table. We are only use the metastore as a repo for metadata, but are not using their format for the information in this case (as we have datasources that hive does not understand, including things like schema auto discovery). HiveQL DDL, run by Hive but can be read by Spark SQL: CREATE TABLE t (x INT) SORTED AS PARQUET Spark SQL DDL, run by Spark SQL, stored in metastore, cannot be read by hive: CREATE TABLE t USING parquet (path '/path/to/data') 2) Am I therefore
Error running Spark on Cloudera
I am trying to run a Spark application using spark-submit on a cluster using Cloudera manager. I get the error Exception in thread main java.io.IOException: Error in creating log directory: file:/user/spark/applicationHistory//app-20150408094126-0008 Adding the below lines in /etc/spark/conf/spark-defaults.conf wouldn't resolve it. spark.eventLog.dir=/user/spark/applicationHistory spark.eventLog.enabled=true Any idea on what is missing?
Maintaining state
It should be noted I'm a newbie to Spark, so please have patience ... I'm trying to convert an existing application over to Spark and am running into some high-level questions that I can't seem to resolve, possibly because what I'm trying to do is not supported. In a nutshell, as I process the individual elements of an RDD I want to save away some calculations etc. that, for all intents and purposes, fit nicely into a hashmap structure. I'd like to then take that hashmap and somehow get access to it, so I can use and update it as I process element 2, and then naturally I want it available for the remaining elements in the RDD and even across RDDs. In this example I mention a HashMap, but it could be any arbitrary object. So in a broad sense I'm looking at maintaining some state across each element of a JavaDStream. (The state information can get large, but I will be partitioning the dstream by hashing on a key ... I don't think however this is relevant to the question being asked ...) I'd like to do this while I'm transforming an RDD into another RDD as part of a JavaDStream transform or map type operation. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-state-tp22424.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
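For what it's worth, Spark Streaming's built-in mechanism for carrying per-key state across batches is updateStateByKey; the hashmap-per-key idea above maps onto it fairly directly (JavaDStream has an equivalent method). A minimal Scala sketch, with the key/state types and merge logic as placeholders:

import org.apache.spark.streaming.dstream.DStream

// State: a map of counters per key, merged with whatever arrives in each batch.
def updateState(newValues: Seq[Map[String, Long]],
                current: Option[Map[String, Long]]): Option[Map[String, Long]] = {
  val merged = newValues.foldLeft(current.getOrElse(Map.empty[String, Long])) { (acc, m) =>
    m.foldLeft(acc) { case (a, (k, v)) => a.updated(k, a.getOrElse(k, 0L) + v) }
  }
  Some(merged)
}

def withState(events: DStream[(String, Map[String, Long])]): DStream[(String, Map[String, Long])] = {
  // Note: stateful operations require ssc.checkpoint(...) to be set.
  events.updateStateByKey(updateState _)
}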
Spark Streaming and SQL
Hi all, I am using Spark Streaming to monitor an S3 bucket for objects that contain JSON. I want to import that JSON into a Spark SQL DataFrame. Here's my current code:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import json
from pyspark.sql import SQLContext
conf = SparkConf().setAppName('MyApp').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)
distFile = ssc.textFileStream("s3n://mybucket/")
json_data = sqlContext.jsonRDD(distFile)
json_data.printSchema()
ssc.start()
ssc.awaitTermination()
I am not creating the DataFrame correctly, as I get an error: 'TransformedDStream' object has no attribute '_jrdd' Can someone help me out? Thanks, Vadim
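The error is because sqlContext.jsonRDD expects an RDD, while ssc.textFileStream returns a DStream; the usual pattern is to drop down to the per-batch RDDs with foreachRDD and build the DataFrame there. A Scala sketch of that pattern (the same structure applies in PySpark via stream.foreachRDD(lambda rdd: ...); the path and batch interval are placeholders):

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(30))
val sqlContext = new SQLContext(sc)

val stream = ssc.textFileStream("s3n://mybucket/")
stream.foreachRDD { rdd =>
  if (rdd.take(1).nonEmpty) {
    val df = sqlContext.jsonRDD(rdd)   // one DataFrame per batch
    df.printSchema()
    // register a temp table / run queries on df here
  }
}
ssc.start()
ssc.awaitTermination()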
Re: Opening many Parquet files = slow
Thanks for the report. We improved the speed here in 1.3.1 so would be interesting to know if this helps. You should also try disabling schema merging if you do not need that feature (i.e. all of your files are the same schema). sqlContext.load(path, parquet, Map(mergeSchema - false)) On Wed, Apr 8, 2015 at 7:35 AM, Ted Yu yuzhih...@gmail.com wrote: You may have seen this thread: http://search-hadoop.com/m/JW1q5SlRpt1 Cheers On Wed, Apr 8, 2015 at 6:15 AM, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote: Hi guys *I’ve got:* - 180 days of log data in Parquet. - Each day is stored in a separate folder in S3. - Each day consists of 20-30 Parquet files of 256 MB each. - Spark 1.3 on Amazon EMR This makes approximately 5000 Parquet files with a total size if 1.5 TB. *My code*: val in = sqlContext.parquetFile(“day1”, “day2”, …, “day180”) *Problem*: Before the very first stage is started, Spark spends about 25 minutes printing the following: ... 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for reading at position '258305902' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for reading at position '260897108' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading at position '261259189' 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening ' s3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73' for reading … etc It looks like Spark is opening each file, before it actually does any work. This means a delay of 25 minutes when working with Parquet files. Previously, we used LZO files and did not experience this problem. *Bonus info: * This also happens when I use auto partition discovery (i.e. sqlContext.parquetFile(“/path/to/logsroot/)). What can I do to avoid this? Thanks in advance! Eric Eijkelenboom
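Spelled out with quoting, using the (source, options) overload of load available in Spark 1.3, disabling schema merging would look something like this (a sketch; the path is a placeholder):

val df = sqlContext.load("parquet", Map("path" -> "s3n://mybucket/logs", "mergeSchema" -> "false"))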
Re: The difference between SparkSql/DataFrame join and Rdd join
I think your thread dump for the master is actually just a thread dump for SBT that is waiting on a forked driver program. ... java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on 0x7fed624ff528 (a java.lang.UNIXProcess) at java.lang.Object.wait(Object.java:503) at java.lang.UNIXProcess.waitFor(UNIXProcess.java:263) - locked 0x7fed624ff528 (a java.lang.UNIXProcess) at sbt.SimpleProcess.exitValue(ProcessImpl.scala:377) ... Additionally, no work seems to be happening on the worker. I think you want to jstack the process ForkMain. On Wed, Apr 8, 2015 at 2:37 AM, Hao Ren inv...@gmail.com wrote: Hi Michael, In fact, I find that all workers are hanging when SQL/DF join is running. So I picked the master and one of the workers. jstack is the following: Master 2015-04-08 09:08:22 Full thread dump OpenJDK 64-Bit Server VM (24.65-b04 mixed mode): Thread-3 prio=10 tid=0x7fdbe0013000 nid=0x2836 runnable [0x7fdd0cd89000] java.lang.Thread.State: RUNNABLE at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:272) at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) - locked 0x7fed62503a00 (a java.lang.UNIXProcess$ProcessPipeInputStream) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) - locked 0x7fed95527048 (a java.io.InputStreamReader) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:154) at java.io.BufferedReader.readLine(BufferedReader.java:317) - locked 0x7fed95527048 (a java.io.InputStreamReader) at java.io.BufferedReader.readLine(BufferedReader.java:382) at sbt.BasicIO$$anonfun$processFully$1$$anonfun$apply$8.apply(ProcessImpl.scala:58) at sbt.BasicIO$$anonfun$processFully$1$$anonfun$apply$8.apply(ProcessImpl.scala:58) at sbt.BasicIO$.readFully$1(ProcessImpl.scala:63) at sbt.BasicIO$.processLinesFully(ProcessImpl.scala:69) at sbt.BasicIO$$anonfun$processFully$1.apply(ProcessImpl.scala:58) at sbt.BasicIO$$anonfun$processFully$1.apply(ProcessImpl.scala:55) at sbt.SimpleProcessBuilder$$anonfun$4.apply$mcV$sp(ProcessImpl.scala:357) at sbt.Spawn$$anon$3.run(ProcessImpl.scala:17) Thread-2 prio=10 tid=0x7fdbe001 nid=0x2835 runnable [0x7fdd0d995000] java.lang.Thread.State: RUNNABLE at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:272) at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) - locked 0x7fed625018b8 (a java.lang.UNIXProcess$ProcessPipeInputStream) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) - locked 0x7fed8cc9d8b8 (a java.io.InputStreamReader) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:154) at java.io.BufferedReader.readLine(BufferedReader.java:317) - locked 0x7fed8cc9d8b8 (a java.io.InputStreamReader) at java.io.BufferedReader.readLine(BufferedReader.java:382) at sbt.BasicIO$$anonfun$processFully$1$$anonfun$apply$8.apply(ProcessImpl.scala:58) at sbt.BasicIO$$anonfun$processFully$1$$anonfun$apply$8.apply(ProcessImpl.scala:58) at sbt.BasicIO$.readFully$1(ProcessImpl.scala:63) at 
sbt.BasicIO$.processLinesFully(ProcessImpl.scala:69) at sbt.BasicIO$$anonfun$processFully$1.apply(ProcessImpl.scala:58) at sbt.BasicIO$$anonfun$processFully$1.apply(ProcessImpl.scala:55) at sbt.SimpleProcessBuilder$$anonfun$3.apply$mcV$sp(ProcessImpl.scala:354) at sbt.Spawn$$anon$3.run(ProcessImpl.scala:17) pool-5-thread-5 prio=10 tid=0x7ff16034a000 nid=0x2832 waiting on condition [0x7fdd0da96000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x7feacd0621d0 (a java.util.concurrent.SynchronousQueue$TransferStack) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460) at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359) at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
Re: Incrementally load big RDD file into Memory
Hi Muhammad, There are lots of ways to do it. My company actually develops a text mining solution which embeds a very fast Approximate Neighbours solution (a demo with real time queries on the wikipedia dataset can be seen at wikinsights.org). For the record, we now prepare a dataset of 4.5 million documents for querying in about 2 or 3 minutes on a 32 cores cluster, and the queries take less than 10ms when the dataset is in memory. But if you just want to precompute everything and don't mind waiting a few tens of minutes (or hours), and don't want to bother with an approximate neighbour solution, then the best way is probably something like this : 1 - block your data (i.e. group your items in X large groups). Instead of a dataset of N elements, you should now have a dataset of X blocks containing N/X elements each. 2 - do the cartesian product (instead of N*N elements, you now have just X*X blocks, which should take less memory) 3 - for each pair of blocks (blockA,blockB), perform the computation of distances for each elements of blockA with each element of blockB, but keep only the top K best for each element of blockA. Output is List((elementOfBlockA, listOfKNearestElementsOfBlockBWithTheDistance),..) 4 - reduceByKey (the key is the elementOfBlockA), by merging the listOfNearestElements and always keeping the K nearest. This is an exact version of top K. This is only interesting if K N/X. But even if K is large, it is possible that it will fit your needs. Remember that you will still compute N*N distances (this is the problem with exact nearest neighbours), the only difference with what you're doing now is that you produces less items and duplicates less data. Indeed, if one of your elements takes 100bytes, the per element cartesian will produce N*N*100*2 bytes, while the blocked version will produce X*X*100*2*N/X, ie X*N*100*2 bytes. Guillaume Hi Guillaume, Thanks for you reply. Can you please tell me how can i improve for Top-k nearest points. P.S. My post is not accepted on the list thats why i am sending you email here. I would be really grateful to you if you reply it. Thanks, On Wed, Apr 8, 2015 at 1:23 PM, Guillaume Pitel guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com wrote: This kind of operation is not scalable, not matter what you do, at least if you _really_ want to do that. However, if what you're looking for is not to really compute all distances, (for instance if you're looking only for the top K nearest points), then it can be highly improved. It all depends of what you want to do eventually. Guillaume val locations = filelines.map(line = line.split(\t)).map(t = (t(5).toLong, (t(2).toDouble, t(3).toDouble))).distinct().collect() val cartesienProduct=locations.cartesian(locations).map(t= Edge(t._1._1,t._2._1,distanceAmongPoints(t._1._2._1,t._1._2._2,t._2._2._1,t._2._2._2))) Code executes perfectly fine uptill here but when i try to use cartesienProduct it got stuck i.e. val count =cartesienProduct.count() Any help to efficiently do this will be highly appreciated. -- View this message in context:http://apache-spark-user-list.1001560.n3.nabble.com/Incremently-load-big-RDD-file-into-Memory-tp22410.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail:user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail:user-h...@spark.apache.org mailto:user-h...@spark.apache.org -- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. 
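For reference, a minimal Scala sketch of the blocked exact top-K scheme described in steps 1-4 above. The names (points, numBlocks, dist) and the use of Array[Double] coordinates are illustrative assumptions, not part of the original thread:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// points: elements keyed by an id; dist is whatever distance function applies.
def blockedTopK(points: RDD[(Long, Array[Double])], numBlocks: Int, k: Int,
                dist: (Array[Double], Array[Double]) => Double): RDD[(Long, Seq[(Long, Double)])] = {
  // 1 - block the data: X groups of roughly N/X elements each
  val blocks = points
    .map { case (id, v) => (math.abs(id) % numBlocks, (id, v)) }
    .groupByKey()
    .map(_._2.toArray)
  // 2 - cartesian product over blocks: X*X block pairs instead of N*N element pairs
  blocks.cartesian(blocks)
    // 3 - per block pair, keep only the k best matches for each element of blockA
    .flatMap { case (blockA, blockB) =>
      blockA.map { case (idA, vA) =>
        val topK = blockB
          .filter { case (idB, _) => idB != idA }
          .map { case (idB, vB) => (idB, dist(vA, vB)) }
          .sortBy(_._2)
          .take(k)
        (idA, topK.toSeq)
      }
    }
    // 4 - merge candidate lists coming from different block pairs, always keeping the k nearest
    .reduceByKey((a, b) => (a ++ b).sortBy(_._2).take(k))
}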
Re: Advice using Spark SQL and Thrift JDBC Server
Sorry guys. I didn't realize that https://issues.apache.org/jira/browse/SPARK-4925 was not fixed yet. You can publish locally in the mean time (sbt/sbt publishLocal). On Wed, Apr 8, 2015 at 8:29 AM, Mohammed Guller moham...@glassbeam.com wrote: +1 Interestingly, I ran into the exactly the same issue yesterday. I couldn’t find any documentation about which project to include as a dependency in build.sbt to use HiveThriftServer2. Would appreciate help. Mohammed *From:* Todd Nist [mailto:tsind...@gmail.com] *Sent:* Wednesday, April 8, 2015 5:49 AM *To:* James Aley *Cc:* Michael Armbrust; user *Subject:* Re: Advice using Spark SQL and Thrift JDBC Server To use the HiveThriftServer2.startWithContext, I thought one would use the following artifact in the build: org.apache.spark%% spark-hive-thriftserver % 1.3.0 But I am unable to resolve the artifact. I do not see it in maven central or any other repo. Do I need to build Spark and publish locally or just missing something obvious here? Basic class is like this: import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.HiveMetastoreTypes._ import org.apache.spark.sql.types._ import org.apache.spark.sql.hive.thriftserver._ object MyThriftServer { val sparkConf = new SparkConf() // master is passed to spark-submit, but could also be specified explicitely // .setMaster(sparkMaster) .setAppName(My ThriftServer) .set(spark.cores.max, 2) val sc = new SparkContext(sparkConf) val sparkContext = sc import sparkContext._ val sqlContext = new HiveContext(sparkContext) import sqlContext._ import sqlContext.implicits._ // register temp tables here HiveThriftServer2.startWithContext(sqlContext) } Build has the following: scalaVersion := 2.10.4 val SPARK_VERSION = 1.3.0 libraryDependencies ++= Seq( org.apache.spark %% spark-streaming-kafka % SPARK_VERSION exclude(org.apache.spark, spark-core_2.10) exclude(org.apache.spark, spark-streaming_2.10) exclude(org.apache.spark, spark-sql_2.10) exclude(javax.jms, jms), org.apache.spark %% spark-core % SPARK_VERSION % provided, org.apache.spark %% spark-streaming % SPARK_VERSION % provided, org.apache.spark %% spark-sql % SPARK_VERSION % provided, org.apache.spark %% spark-hive % SPARK_VERSION % provided, org.apache.spark %% spark-hive-thriftserver % SPARK_VERSION % provided, org.apache.kafka %% kafka % 0.8.1.1 exclude(javax.jms, jms) exclude(com.sun.jdmk, jmxtools) exclude(com.sun.jmx, jmxri), joda-time % joda-time % 2.7, log4j % log4j % 1.2.14 exclude(com.sun.jdmk, jmxtools) exclude(com.sun.jmx, jmxri) ) Appreciate the assistance. -Todd On Tue, Apr 7, 2015 at 4:09 PM, James Aley james.a...@swiftkey.com wrote: Excellent, thanks for your help, I appreciate your advice! On 7 Apr 2015 20:43, Michael Armbrust mich...@databricks.com wrote: That should totally work. The other option would be to run a persistent metastore that multiple contexts can talk to and periodically run a job that creates missing tables. The trade-off here would be more complexity, but less downtime due to the server restarting. On Tue, Apr 7, 2015 at 12:34 PM, James Aley james.a...@swiftkey.com wrote: Hi Michael, Thanks so much for the reply - that really cleared a lot of things up for me! Let me just check that I've interpreted one of your suggestions for (4) correctly... 
Would it make sense for me to write a small wrapper app that pulls in hive-thriftserver as a dependency, iterates my Parquet directory structure to discover tables and registers each as a temp table in some context, before calling HiveThriftServer2.createWithContext as you suggest? This would mean that to add new content, all I need to is restart that app, which presumably could also be avoided fairly trivially by periodically restarting the server with a new context internally. That certainly beats manual curation of Hive table definitions, if it will work? Thanks again, James. On 7 April 2015 at 19:30, Michael Armbrust mich...@databricks.com wrote: 1) What exactly is the relationship between the thrift server and Hive? I'm guessing Spark is just making use of the Hive metastore to access table definitions, and maybe some other things, is that the case? Underneath the covers, the Spark SQL thrift server is executing queries using a HiveContext. In this mode, nearly all computation is done with Spark SQL but we try to maintain compatibility with Hive wherever possible. This means that you can write your queries in HiveQL, read tables from the Hive metastore, and use Hive UDFs UDTs UDAFs, etc. The one exception here is Hive DDL operations (CREATE
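For anyone landing here later, a cleaned-up minimal sketch of the startWithContext pattern discussed above, once spark-hive-thriftserver is resolvable (built and published locally as Michael suggests). The Parquet path and table name are placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

object MyThriftServer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("My ThriftServer").set("spark.cores.max", "2")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    // register the tables to expose, e.g. discovered Parquet directories (path is hypothetical)
    hiveContext.parquetFile("/data/tables/foo").registerTempTable("foo")

    // serve the registered temp tables over JDBC/ODBC
    HiveThriftServer2.startWithContext(hiveContext)
  }
}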
Re: parquet partition discovery
Back to the user list so everyone can see the result of the discussion... Ah. It all makes sense now. The issue is that when I created the parquet files, I included an unnecessary directory name (data.parquet) below the partition directories. It’s just a leftover from when I started with Michael’s sample code and it only made sense before I added the partition directories. I probably thought it was some magic name that was required when spark scanned for parquet files. The structure looks something like this: drwxr-xr-x - user supergroup 0 2015-04-02 13:17 hdfs://host/tablename/date=20150302/sym=A/data.parquet/... If I just move all the files up a level (there goes a day of work) , the existing code should work fine. Whether it’s useful to handle intermediate non-partition directories or whether that just creates some extra risk I can’t say, since I’m new to all the technology in this whole stack. I'm mixed here. There is always a tradeoff between silently ignoring structure that people might not be aware of (and thus might be a bug) and just working. Having this as an option at least certainly seems reasonable. I'd be curious if anyone had other thoughts? Unfortunately, it takes many minutes (even with mergeSchema=false) to create the RDD. It appears that the whole data store will still be recursively traversed (even with mergeSchema=false, a manually specified schema, and a partition spec [which I can’t pass in through a public API]) so that all of the metadata FileStatuses can be cached. In my case I’m going to have years of data, so there’s no way that will be feasible. Should I just explicitly load the partitions I want instead of using partition discovery? Is there any plan to have a less aggressive version of support for partitions, where metadata is only cached for partitions that are used in queries? We improved the speed here in 1.3.1 so I'd be curious if that helps. We definitely need to continue to speed things up here though. We have to enumerate all the partitions so we know what data to read when a query comes in, but I do think we can parallelize it or something.
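Until partition discovery gets faster, one hedged workaround (a sketch only; the dates, symbol and column names below are assumptions based on the layout above) is to load just the partition directories a job needs and re-attach the partition values as literal columns:

import org.apache.spark.sql.functions.lit

val days = Seq("20150302", "20150303")
val subset = days.map { d =>
    sqlContext.parquetFile(s"hdfs://host/tablename/date=$d/sym=A")
      .withColumn("date", lit(d))
      .withColumn("sym", lit("A"))
  }
  .reduce(_ unionAll _)
subset.registerTempTable("tablename_subset")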
start-slave.sh not starting
I am trying to start the worker by: sbin/start-slave.sh spark://ip-10-241-251-232:7077
In the logs it's complaining about: Master must be a URL of the form spark://hostname:port
I also have this in spark-defaults.conf: spark.master spark://ip-10-241-251-232:7077
Did I miss anything?
Re: org.apache.spark.ml.recommendation.ALS
some additional context: Since, I am using features of spark 1.3.0, I have downloaded spark 1.3.0 and used spark-submit from there. The cluster is still on spark-1.2.0. So, this looks to me that at runtime, the executors could not find some libraries of spark-1.3.0, even though I ran spark-submit from my downloaded spark-1.30. On Apr 6, 2015, at 1:37 PM, Jay Katukuri jkatuk...@apple.com wrote: Here is the command that I have used : spark-submit —class packagename.ALSNew --num-executors 100 --master yarn ALSNew.jar -jar spark-sql_2.11-1.3.0.jar hdfs://input_path Btw - I could run the old ALS in mllib package. On Apr 6, 2015, at 12:32 PM, Xiangrui Meng men...@gmail.com wrote: So ALSNew.scala is your own application, did you add it with spark-submit or spark-shell? The correct command should like spark-submit --class your.package.name.ALSNew ALSNew.jar [options] Please check the documentation: http://spark.apache.org/docs/latest/submitting-applications.html -Xiangrui On Mon, Apr 6, 2015 at 12:27 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi, Here is the stack trace: Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; at ALSNew$.main(ALSNew.scala:35) at ALSNew.main(ALSNew.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Thanks, Jay On Apr 6, 2015, at 12:24 PM, Xiangrui Meng men...@gmail.com wrote: Please attach the full stack trace. -Xiangrui On Mon, Apr 6, 2015 at 12:06 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi all, I got a runtime error while running the ALS. Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; The error that I am getting is at the following code: val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF() Any help is appreciated ! I have tried passing the spark-sql jar using the -jar spark-sql_2.11-1.3.0.jar Thanks, Jay On Mar 17, 2015, at 12:50 PM, Xiangrui Meng men...@gmail.com wrote: Please remember to copy the user list next time. I might not be able to respond quickly. There are many others who can help or who can benefit from the discussion. Thanks! -Xiangrui On Tue, Mar 17, 2015 at 12:04 PM, Jay Katukuri jkatuk...@apple.com wrote: Great Xiangrui. It works now. Sorry that I needed to bug you :) Jay On Mar 17, 2015, at 11:48 AM, Xiangrui Meng men...@gmail.com wrote: Please check this section in the user guide: http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection You need `import sqlContext.implicits._` to use `toDF()`. -Xiangrui On Mon, Mar 16, 2015 at 2:34 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi Xiangrui, Thanks a lot for the quick reply. I am still facing an issue. 
I have tried the code snippet that you have suggested: val ratings = purchase.map { line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }.toDF(user, item, rate”)} for this, I got the below error: error: ';' expected but '.' found. [INFO] }.toDF(user, item, rate”)} [INFO] ^ when I tried below code val ratings = purchase.map ( line = line.split(',') match { case Array(user, item, rate) = (user.toInt, item.toInt, rate.toFloat) }).toDF(user, item, rate) error: value toDF is not a member of org.apache.spark.rdd.RDD[(Int, Int, Float)] [INFO] possible cause: maybe a semicolon is missing before `value toDF'? [INFO] }).toDF(user, item, rate) I have looked at the document that you have shared and tried the following code: case class Record(user: Int, item: Int, rate:Double) val ratings = purchase.map(_.split(',')).map(r =Record(r(0).toInt, r(1).toInt, r(2).toDouble)) .toDF(user, item, rate) for this, I got the below error: error: value toDF is not a member of org.apache.spark.rdd.RDD[Record] Appreciate your help ! Thanks, Jay On Mar 16, 2015, at 11:35 AM, Xiangrui Meng men...@gmail.com wrote: Try this: val ratings = purchase.map { line =
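A self-contained Scala version of the pattern being discussed (Rating, purchase and sqlContext are names assumed from the thread). The essentials are that the case class is defined outside the method that uses it and that sqlContext.implicits._ is imported before calling toDF. Note also that mixing a spark-sql_2.11 jar with a Scala 2.10 build of Spark can produce exactly the runtimeMirror NoSuchMethodError shown earlier in the thread:

case class Rating(user: Int, item: Int, rate: Float)

// later, inside the job:
import sqlContext.implicits._

val ratings = purchase
  .map(_.split(','))
  .map { case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toFloat) }
  .toDF()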
RE: Reading file with Unicode characters
Spark uses the Hadoop TextInputFormat to read the file. Since Hadoop essentially only supports Linux, UTF-8 is the only encoding supported, as it is the one used on Linux. If you have data in another encoding, you may want to vote for this Jira: https://issues.apache.org/jira/browse/MAPREDUCE-232 Yong Date: Wed, 8 Apr 2015 10:35:18 -0700 Subject: Reading file with Unicode characters From: lists.a...@gmail.com To: user@spark.apache.org CC: lists.a...@gmail.com Hi, Does SparkContext's textFile() method handle files with Unicode characters? How about files in UTF-8 format? Going further, is it possible to specify encodings to the method? If not, what should one do if the files to be read are in some encoding? Thanks, arun
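If re-encoding the files to UTF-8 beforehand is not an option, a hedged workaround sketch (the path and charset are assumptions) is to read the raw Text bytes via hadoopFile and decode them with an explicit charset instead of relying on textFile(). This only works for encodings whose newline byte matches ASCII (e.g. Latin-1); multi-byte encodings such as UTF-16 would need a different InputFormat:

import java.nio.charset.Charset
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val charset = Charset.forName("ISO-8859-1")  // assumed input encoding
val lines = sc
  .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///path/to/file", 4)
  .map { case (_, text) => new String(text.getBytes, 0, text.getLength, charset) }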
RE: Advice using Spark SQL and Thrift JDBC Server
Michael, Thank you! Looks like the sbt build is broken for 1.3. I downloaded the source code for 1.3, but I get the following error a few minutes after I run “sbt/sbt publishLocal” [error] (network-shuffle/*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.3.0: configuration not public in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test [error] Total time: 106 s, completed Apr 8, 2015 12:33:45 PM Mohammed From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Wednesday, April 8, 2015 11:54 AM To: Mohammed Guller Cc: Todd Nist; James Aley; user; Patrick Wendell Subject: Re: Advice using Spark SQL and Thrift JDBC Server Sorry guys. I didn't realize that https://issues.apache.org/jira/browse/SPARK-4925 was not fixed yet. You can publish locally in the mean time (sbt/sbt publishLocal). On Wed, Apr 8, 2015 at 8:29 AM, Mohammed Guller moham...@glassbeam.commailto:moham...@glassbeam.com wrote: +1 Interestingly, I ran into the exactly the same issue yesterday. I couldn’t find any documentation about which project to include as a dependency in build.sbt to use HiveThriftServer2. Would appreciate help. Mohammed From: Todd Nist [mailto:tsind...@gmail.commailto:tsind...@gmail.com] Sent: Wednesday, April 8, 2015 5:49 AM To: James Aley Cc: Michael Armbrust; user Subject: Re: Advice using Spark SQL and Thrift JDBC Server To use the HiveThriftServer2.startWithContext, I thought one would use the following artifact in the build: org.apache.spark%% spark-hive-thriftserver % 1.3.0 But I am unable to resolve the artifact. I do not see it in maven central or any other repo. Do I need to build Spark and publish locally or just missing something obvious here? Basic class is like this: import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.HiveMetastoreTypes._ import org.apache.spark.sql.types._ import org.apache.spark.sql.hive.thriftserver._ object MyThriftServer { val sparkConf = new SparkConf() // master is passed to spark-submit, but could also be specified explicitely // .setMaster(sparkMaster) .setAppName(My ThriftServer) .set(spark.cores.max, 2) val sc = new SparkContext(sparkConf) val sparkContext = sc import sparkContext._ val sqlContext = new HiveContext(sparkContext) import sqlContext._ import sqlContext.implicits._ // register temp tables here HiveThriftServer2.startWithContext(sqlContext) } Build has the following: scalaVersion := 2.10.4 val SPARK_VERSION = 1.3.0 libraryDependencies ++= Seq( org.apache.spark %% spark-streaming-kafka % SPARK_VERSION exclude(org.apache.spark, spark-core_2.10) exclude(org.apache.spark, spark-streaming_2.10) exclude(org.apache.spark, spark-sql_2.10) exclude(javax.jms, jms), org.apache.spark %% spark-core % SPARK_VERSION % provided, org.apache.spark %% spark-streaming % SPARK_VERSION % provided, org.apache.spark %% spark-sql % SPARK_VERSION % provided, org.apache.spark %% spark-hive % SPARK_VERSION % provided, org.apache.spark %% spark-hive-thriftserver % SPARK_VERSION % provided, org.apache.kafka %% kafka % 0.8.1.1 exclude(javax.jms, jms) exclude(com.sun.jdmk, jmxtools) exclude(com.sun.jmx, jmxri), joda-time % joda-time % 2.7, log4j % log4j % 1.2.14 exclude(com.sun.jdmk, jmxtools) exclude(com.sun.jmx, jmxri) ) Appreciate the assistance. 
-Todd On Tue, Apr 7, 2015 at 4:09 PM, James Aley james.a...@swiftkey.commailto:james.a...@swiftkey.com wrote: Excellent, thanks for your help, I appreciate your advice! On 7 Apr 2015 20:43, Michael Armbrust mich...@databricks.commailto:mich...@databricks.com wrote: That should totally work. The other option would be to run a persistent metastore that multiple contexts can talk to and periodically run a job that creates missing tables. The trade-off here would be more complexity, but less downtime due to the server restarting. On Tue, Apr 7, 2015 at 12:34 PM, James Aley james.a...@swiftkey.commailto:james.a...@swiftkey.com wrote: Hi Michael, Thanks so much for the reply - that really cleared a lot of things up for me! Let me just check that I've interpreted one of your suggestions for (4) correctly... Would it make sense for me to write a small wrapper app that pulls in hive-thriftserver as a dependency, iterates my Parquet directory structure to discover tables and registers each as a temp table in some context, before calling HiveThriftServer2.createWithContext as you suggest? This would mean that to add new content, all I need to is restart that app, which presumably could also be avoided fairly trivially by periodically restarting the server with a new context internally. That certainly beats manual curation
Class incompatible error
I am seeing the following; is this because of my Maven version? 15/04/08 15:42:22 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-241-251-232.us-west-2.compute.internal): java.io.InvalidClassException: org.apache.spark.Aggregator; local class incompatible: stream classdesc serialVersionUID = 5032037208639381169, local class serialVersionUID = -9085606473104903453 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.2.0</version> </dependency>
Unit testing with HiveContext
I am trying to unit test some code which takes an existing HiveContext and uses it to execute a CREATE TABLE query (among other things). Unfortunately I've run into some hurdles trying to unit test this, and I'm wondering if anyone has a good approach.
The metastore DB is automatically created in the local directory, but it doesn't seem to be cleaned up afterward. Is there any way to get Spark to clean this up when the context is stopped? Or can I point this to some other location, such as a temp directory?
Trying to create a table fails because it is using the default warehouse directory (/user/hive/warehouse). Is there some way to change this without hard-coding a directory in a hive-site.xml? Again, I'd prefer to point it to a temp directory so it will be automatically removed.
I tried a couple of things that didn't work:
- hiveContext.sql("SET hive.metastore.warehouse.dir=/tmp/dir/xyz")
- hiveContext.setConf("hive.metastore.warehouse.dir", "/tmp/dir/xyz")
Any advice from those who have been here before would be appreciated.
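A hedged sketch of one way to approach this (not a tested recipe; the ordering matters and the keys below are standard Hive properties): create the HiveContext, point both the warehouse and the embedded Derby metastore at throw-away temp directories before the first statement touches the metastore, and delete those directories in the test teardown:

import java.nio.file.Files
import org.apache.spark.sql.hive.HiveContext

val warehouseDir = Files.createTempDirectory("hive-warehouse").toFile
val metastoreDir = Files.createTempDirectory("hive-metastore").toFile

val hiveContext = new HiveContext(sc)
hiveContext.setConf("hive.metastore.warehouse.dir", warehouseDir.getAbsolutePath)
hiveContext.setConf("javax.jdo.option.ConnectionURL",
  s"jdbc:derby:;databaseName=${metastoreDir.getAbsolutePath}/metastore_db;create=true")

// ... exercise the CREATE TABLE code under test ...

// teardown: sc.stop() and recursively delete warehouseDir and metastoreDir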
Empty RDD?
When I call *transform* or *foreachRDD* on a *DStream*, I keep getting an error that I have an empty RDD, which makes sense since my batch interval may be smaller than the rate at which new data arrives. How do I guard against it? Thanks, Vadim
Re: Timeout errors from Akka in Spark 1.2.1
Since we are running in local mode, won't all the executors be in the same JVM as the driver? Thanks NB On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com wrote: Its does take effect on the executors, not on the driver. Which is okay because executors have all the data and therefore have GC issues, not so usually for the driver. If you want to double-sure, print the JVM flag (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags) However, the GC i was referring to that initiates the RDD and shuffle cleanup was the GC on the driver. Thought I would clarify. TD On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote: Hi TD, Thanks for the response. Since you mentioned GC, this got me thinking. Given that we are running in local mode (all in a single JVM) for now, does the option spark.executor.extraJavaOptions set to -XX:+UseConcMarkSweepGC inside SparkConf object take effect at all before we use it to create the StreamingContext? I ask because that is what we are doing right now. If not, perhaps we have not been running with the Concurrent Mark Sweep at all and is that recommended instead of forcing GC periodically? Thanks NB On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com wrote: There are a couple of options. Increase timeout (see Spark configuration). Also see past mails in the mailing list. Another option you may try (I have gut feeling that may work, but I am not sure) is calling GC on the driver periodically. The cleaning up of stuff is tied to GCing of RDD objects and regular cleaning may help keep things clean more rigorously rather than in unpredictable bursts of GC activity. Let us know how it works out. TD On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com wrote: I have a standalone and local Spark streaming process where we are reading inputs using FlumeUtils. Our longest window size is 6 hours. After about a day and a half of running without any issues, we start seeing Timeout errors while cleaning up input blocks. This seems to cause reading from Flume to cease. 
ERROR sparkDriver-akka.actor.default-dispatcher-78 BlockManagerSlaveActor.logError - Error in removing block input-0-1428182594000 org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(driver, localhost, 55067),input-0-1428182594000,StorageLevel(false, false, false, false, 1),0,0,0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361) at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) ... 17 more There was a
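A small sketch of the "call GC on the driver periodically" idea from above (the 10 minute interval is arbitrary):

import java.util.concurrent.{Executors, TimeUnit}

val gcScheduler = Executors.newSingleThreadScheduledExecutor()
gcScheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = System.gc()  // force a full GC on the driver JVM
}, 10, 10, TimeUnit.MINUTES)

// remember to gcScheduler.shutdown() when the StreamingContext is stopped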
Pyspark query by binary type
I am loading some avro data into spark using the following code: sqlContext.sql(CREATE TEMPORARY TABLE foo USING com.databricks.spark.avro OPTIONS (path 'hdfs://*.avro')) The avro data contains some binary fields that get translated to the BinaryType data type. I am struggling with how to use the binary type in a query statement. For example: sqlContext.sql(select * from foo where name='\x00\x00\x0c\x07\xac\x02').collect() throws a utf8 error. Trying this: sqlContext.sql(uselect * from foo where name='\x00\x00\x0c\x07\xac\x02').collect() does not return any results. Any thoughts/ideas? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-query-by-binary-type-tp22431.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Add row IDs column to data frame
More generic version of a question below: Is it possible to append a column to existing DataFrame at all? I understand that this is not an easy task in Spark environment, but is there any workaround? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Add-row-IDs-column-to-data-frame-tp22385p22427.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming and SQL
Hi all, I figured it out! The DataFrames and SQL example in Spark Streaming docs were useful. Best, Vadim ᐧ On Wed, Apr 8, 2015 at 2:38 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Hi all, I am using Spark Streaming to monitor an S3 bucket for objects that contain JSON. I want to import that JSON into Spark SQL DataFrame. Here's my current code: *from pyspark import SparkContext, SparkConf* *from pyspark.streaming import StreamingContext* *import json* *from pyspark.sql import SQLContext* *conf = SparkConf().setAppName('MyApp').setMaster('local[4]')* *sc = SparkContext(conf=conf)* *ssc = StreamingContext(sc, 30)* *sqlContext = SQLContext(sc)* *distFile = ssc.textFileStream(s3n://mybucket/)* *json_data = sqlContext.jsonRDD(distFile)* *json_data.printSchema()* *ssc.start()* *ssc.awaitTermination()* I am not creating DataFrame correctly as I get an error: *'TransformedDStream' object has no attribute '_jrdd'* Can someone help me out? Thanks, Vadim ᐧ
sortByKey with multiple partitions
Hi, If I perform a sortByKey(true, 2).saveAsTextFile(filename) on a cluster, will the data be sorted per partition, or in total. (And is this guaranteed?) Example: Input 4,2,3,6,5,7 Sorted per partition: part-0: 2,3,7 part-1: 4,5,6 Sorted in total: part-0: 2,3,4 part-1: 5,6,7 Thanks, Tom P.S. (I know that the data might not end up being uniformly distributed, example: 4 elements in part-0 and 2 in part-1) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sortByKey-with-multiple-partitions-tp22426.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Add row IDs column to data frame
You could convert the DF to an RDD, add the new column in a map phase or in a join, and then convert back to a DF. I know this is not an elegant solution and maybe it is not a solution at all. :) But this is the first thing that popped into my mind. I am also new to the DF API. Best Bojan On Apr 9, 2015 00:37, olegshirokikh [via Apache Spark User List] ml-node+s1001560n22427...@n3.nabble.com wrote: More generic version of a question below: Is it possible to append a column to an existing DataFrame at all? I understand that this is not an easy task in the Spark environment, but is there any workaround? -- If you reply to this email, your message will be added to the discussion below: http://apache-spark-user-list.1001560.n3.nabble.com/Add-row-IDs-column-to-data-frame-tp22385p22427.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Add-row-IDs-column-to-data-frame-tp22385p22428.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
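A sketch of that RDD round-trip for the row-id case specifically (df and sqlContext are assumed; the column name row_id is arbitrary):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// zipWithIndex generates a stable id per row, then the DataFrame is rebuilt
// with one extra column appended to the original schema.
val withId = df.rdd
  .zipWithIndex()
  .map { case (row, id) => Row.fromSeq(row.toSeq :+ id) }
val schemaWithId = StructType(df.schema.fields :+ StructField("row_id", LongType, nullable = false))
val dfWithId = sqlContext.createDataFrame(withId, schemaWithId)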
Re: Timeout errors from Akka in Spark 1.2.1
Thanks TD. I believe that might have been the issue. Will try for a few days after passing in the GC option on the java command line when we start the process. Thanks for your timely help. NB On Wed, Apr 8, 2015 at 6:08 PM, Tathagata Das t...@databricks.com wrote: Yes, in local mode they the driver and executor will be same the process. And in that case the Java options in SparkConf configuration will not work. On Wed, Apr 8, 2015 at 1:44 PM, N B nb.nos...@gmail.com wrote: Since we are running in local mode, won't all the executors be in the same JVM as the driver? Thanks NB On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com wrote: Its does take effect on the executors, not on the driver. Which is okay because executors have all the data and therefore have GC issues, not so usually for the driver. If you want to double-sure, print the JVM flag (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags) However, the GC i was referring to that initiates the RDD and shuffle cleanup was the GC on the driver. Thought I would clarify. TD On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote: Hi TD, Thanks for the response. Since you mentioned GC, this got me thinking. Given that we are running in local mode (all in a single JVM) for now, does the option spark.executor.extraJavaOptions set to -XX:+UseConcMarkSweepGC inside SparkConf object take effect at all before we use it to create the StreamingContext? I ask because that is what we are doing right now. If not, perhaps we have not been running with the Concurrent Mark Sweep at all and is that recommended instead of forcing GC periodically? Thanks NB On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com wrote: There are a couple of options. Increase timeout (see Spark configuration). Also see past mails in the mailing list. Another option you may try (I have gut feeling that may work, but I am not sure) is calling GC on the driver periodically. The cleaning up of stuff is tied to GCing of RDD objects and regular cleaning may help keep things clean more rigorously rather than in unpredictable bursts of GC activity. Let us know how it works out. TD On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com wrote: I have a standalone and local Spark streaming process where we are reading inputs using FlumeUtils. Our longest window size is 6 hours. After about a day and a half of running without any issues, we start seeing Timeout errors while cleaning up input blocks. This seems to cause reading from Flume to cease. 
ERROR sparkDriver-akka.actor.default-dispatcher-78 BlockManagerSlaveActor.logError - Error in removing block input-0-1428182594000 org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(driver, localhost, 55067),input-0-1428182594000,StorageLevel(false, false, false, false, 1),0,0,0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361) at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at
Re: Timeout errors from Akka in Spark 1.2.1
Yes, in local mode they the driver and executor will be same the process. And in that case the Java options in SparkConf configuration will not work. On Wed, Apr 8, 2015 at 1:44 PM, N B nb.nos...@gmail.com wrote: Since we are running in local mode, won't all the executors be in the same JVM as the driver? Thanks NB On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com wrote: Its does take effect on the executors, not on the driver. Which is okay because executors have all the data and therefore have GC issues, not so usually for the driver. If you want to double-sure, print the JVM flag (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags) However, the GC i was referring to that initiates the RDD and shuffle cleanup was the GC on the driver. Thought I would clarify. TD On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote: Hi TD, Thanks for the response. Since you mentioned GC, this got me thinking. Given that we are running in local mode (all in a single JVM) for now, does the option spark.executor.extraJavaOptions set to -XX:+UseConcMarkSweepGC inside SparkConf object take effect at all before we use it to create the StreamingContext? I ask because that is what we are doing right now. If not, perhaps we have not been running with the Concurrent Mark Sweep at all and is that recommended instead of forcing GC periodically? Thanks NB On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com wrote: There are a couple of options. Increase timeout (see Spark configuration). Also see past mails in the mailing list. Another option you may try (I have gut feeling that may work, but I am not sure) is calling GC on the driver periodically. The cleaning up of stuff is tied to GCing of RDD objects and regular cleaning may help keep things clean more rigorously rather than in unpredictable bursts of GC activity. Let us know how it works out. TD On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com wrote: I have a standalone and local Spark streaming process where we are reading inputs using FlumeUtils. Our longest window size is 6 hours. After about a day and a half of running without any issues, we start seeing Timeout errors while cleaning up input blocks. This seems to cause reading from Flume to cease. 
ERROR sparkDriver-akka.actor.default-dispatcher-78 BlockManagerSlaveActor.logError - Error in removing block input-0-1428182594000 org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(driver, localhost, 55067),input-0-1428182594000,StorageLevel(false, false, false, false, 1),0,0,0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361) at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) at
Re: sortByKey with multiple partitions
See the scaladoc from OrderedRDDFunctions.scala : * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling * `collect` or `save` on the resulting RDD will return or output an ordered list of records * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in * order of the keys). Cheers On Wed, Apr 8, 2015 at 3:01 PM, Tom thubregt...@gmail.com wrote: Hi, If I perform a sortByKey(true, 2).saveAsTextFile(filename) on a cluster, will the data be sorted per partition, or in total. (And is this guaranteed?) Example: Input 4,2,3,6,5,7 Sorted per partition: part-0: 2,3,7 part-1: 4,5,6 Sorted in total: part-0: 2,3,4 part-1: 5,6,7 Thanks, Tom P.S. (I know that the data might not end up being uniformly distributed, example: 4 elements in part-0 and 2 in part-1) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sortByKey-with-multiple-partitions-tp22426.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
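In other words, the sort is total across the output files. A tiny sketch of what that looks like in practice (the output path is illustrative, and partition sizes depend on the range partitioner):

val sorted = sc.parallelize(Seq(4, 2, 3, 6, 5, 7))
  .map(x => (x, x))
  .sortByKey(true, 2)

// each partition holds a contiguous key range, e.g. roughly 2,3,4 then 5,6,7
sorted.glom().collect().foreach(p => println(p.map(_._1).mkString(",")))

sorted.map(_._1).saveAsTextFile("sorted-output")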
Re: Class incompatible error
bq. one is Oracle and the other is OpenJDK I don't have experience with mixed JDK's. Can you try with using single JDK ? Cheers On Wed, Apr 8, 2015 at 3:26 PM, Mohit Anchlia mohitanch...@gmail.com wrote: For the build I am using java version 1.7.0_65 which seems to be the same as the one on the spark host. However one is Oracle and the other is OpenJDK. Does that make any difference? On Wed, Apr 8, 2015 at 1:24 PM, Ted Yu yuzhih...@gmail.com wrote: What version of Java do you use to build ? Cheers On Wed, Apr 8, 2015 at 12:43 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am seeing the following, is this because of my maven version? 15/04/08 15:42:22 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-241-251-232.us-west-2.compute.internal): java.io.InvalidClassException: org.apache.spark.Aggregator; local class incompatible: stream classdesc serialVersionUID = 5032037208639381169, local class serialVersionUID = -9085606473104903453 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.10/artifactId version1.2.0/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.2.0/version /dependency
Re: Opening many Parquet files = slow
Hi Eric - Would you mind to try either disabling schema merging as what Michael suggested, or disabling the new Parquet data source by sqlContext.setConf(spark.sql.parquet.useDataSourceApi, false) Cheng On 4/9/15 2:43 AM, Michael Armbrust wrote: Thanks for the report. We improved the speed here in 1.3.1 so would be interesting to know if this helps. You should also try disabling schema merging if you do not need that feature (i.e. all of your files are the same schema). sqlContext.load(path, parquet, Map(mergeSchema - false)) On Wed, Apr 8, 2015 at 7:35 AM, Ted Yu yuzhih...@gmail.com mailto:yuzhih...@gmail.com wrote: You may have seen this thread: http://search-hadoop.com/m/JW1q5SlRpt1 Cheers On Wed, Apr 8, 2015 at 6:15 AM, Eric Eijkelenboom eric.eijkelenb...@gmail.com mailto:eric.eijkelenb...@gmail.com wrote: Hi guys *I’ve got:* * 180 days of log data in Parquet. * Each day is stored in a separate folder in S3. * Each day consists of 20-30 Parquet files of 256 MB each. * Spark 1.3 on Amazon EMR This makes approximately 5000 Parquet files with a total size if 1.5 TB. *My code*: val in = sqlContext.parquetFile(“day1”, “day2”, …, “day180”) *Problem*: Before the very first stage is started, Spark spends about 25 minutes printing the following: ... 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for reading at position '258305902' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for reading at position '260897108' 15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading at position '261259189' 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60' for reading 15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73' for reading … etc It looks like Spark is opening each file, before it actually does any work. This means a delay of 25 minutes when working with Parquet files. Previously, we used LZO files and did not experience this problem. *Bonus info: * This also happens when I use auto partition discovery (i.e. sqlContext.parquetFile(“/path/to/logsroot/)). What can I do to avoid this? Thanks in advance! Eric Eijkelenboom
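With quotes restored, the two suggestions above look roughly like this in Spark 1.3 (here the path is passed through the generic load options map, and the paths themselves are placeholders):

// 1. keep the new data source path but skip Parquet schema merging
val in = sqlContext.load("parquet", Map("path" -> "/path/to/logsroot", "mergeSchema" -> "false"))

// 2. or fall back to the old Parquet code path entirely
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
val in2 = sqlContext.parquetFile("day1", "day2" /* , ..., "day180" */)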
RE: Advice using Spark SQL and Thrift JDBC Server
Hey Patrick, Michael and Todd, Thank you for your help! As you guys recommended, I did a local install and got my code to compile. As an FYI, on my local machine the sbt build fails even if I add –DskipTests. So I used mvn. Mohammed From: Patrick Wendell [mailto:patr...@databricks.com] Sent: Wednesday, April 8, 2015 6:16 PM To: Todd Nist Cc: Mohammed Guller; Michael Armbrust; James Aley; user Subject: Re: Advice using Spark SQL and Thrift JDBC Server Hey Guys, Someone submitted a patch for this just now. It's a very simple fix and we can merge it soon. However, it's just missed our timeline for Spark 1.3.1, so the upstream thing won't get fully published until 1.3.2. However, you can always just install locally and build against your local install. - Patrick On Wed, Apr 8, 2015 at 4:38 PM, Todd Nist tsind...@gmail.commailto:tsind...@gmail.com wrote: Hi Mohammed, I think you just need to add -DskipTests to you build. Here is how I built it: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests clean package install build/sbt does however fail even if only doing package which should skip tests. I am able to build the MyThriftServer above now. Thanks Michael for the assistance. -Todd On Wed, Apr 8, 2015 at 3:39 PM, Mohammed Guller moham...@glassbeam.commailto:moham...@glassbeam.com wrote: Michael, Thank you! Looks like the sbt build is broken for 1.3. I downloaded the source code for 1.3, but I get the following error a few minutes after I run “sbt/sbt publishLocal” [error] (network-shuffle/*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.3.0: configuration not public in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test [error] Total time: 106 s, completed Apr 8, 2015 12:33:45 PM Mohammed From: Michael Armbrust [mailto:mich...@databricks.commailto:mich...@databricks.com] Sent: Wednesday, April 8, 2015 11:54 AM To: Mohammed Guller Cc: Todd Nist; James Aley; user; Patrick Wendell Subject: Re: Advice using Spark SQL and Thrift JDBC Server Sorry guys. I didn't realize that https://issues.apache.org/jira/browse/SPARK-4925 was not fixed yet. You can publish locally in the mean time (sbt/sbt publishLocal). On Wed, Apr 8, 2015 at 8:29 AM, Mohammed Guller moham...@glassbeam.commailto:moham...@glassbeam.com wrote: +1 Interestingly, I ran into the exactly the same issue yesterday. I couldn’t find any documentation about which project to include as a dependency in build.sbt to use HiveThriftServer2. Would appreciate help. Mohammed From: Todd Nist [mailto:tsind...@gmail.commailto:tsind...@gmail.com] Sent: Wednesday, April 8, 2015 5:49 AM To: James Aley Cc: Michael Armbrust; user Subject: Re: Advice using Spark SQL and Thrift JDBC Server To use the HiveThriftServer2.startWithContext, I thought one would use the following artifact in the build: org.apache.spark%% spark-hive-thriftserver % 1.3.0 But I am unable to resolve the artifact. I do not see it in maven central or any other repo. Do I need to build Spark and publish locally or just missing something obvious here? 
Basic class is like this: import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.HiveMetastoreTypes._ import org.apache.spark.sql.types._ import org.apache.spark.sql.hive.thriftserver._ object MyThriftServer { val sparkConf = new SparkConf() // master is passed to spark-submit, but could also be specified explicitely // .setMaster(sparkMaster) .setAppName(My ThriftServer) .set(spark.cores.max, 2) val sc = new SparkContext(sparkConf) val sparkContext = sc import sparkContext._ val sqlContext = new HiveContext(sparkContext) import sqlContext._ import sqlContext.implicits._ // register temp tables here HiveThriftServer2.startWithContext(sqlContext) } Build has the following: scalaVersion := 2.10.4 val SPARK_VERSION = 1.3.0 libraryDependencies ++= Seq( org.apache.spark %% spark-streaming-kafka % SPARK_VERSION exclude(org.apache.spark, spark-core_2.10) exclude(org.apache.spark, spark-streaming_2.10) exclude(org.apache.spark, spark-sql_2.10) exclude(javax.jms, jms), org.apache.spark %% spark-core % SPARK_VERSION % provided, org.apache.spark %% spark-streaming % SPARK_VERSION % provided, org.apache.spark %% spark-sql % SPARK_VERSION % provided, org.apache.spark %% spark-hive % SPARK_VERSION % provided, org.apache.spark %% spark-hive-thriftserver % SPARK_VERSION % provided, org.apache.kafka %% kafka % 0.8.1.1 exclude(javax.jms, jms) exclude(com.sun.jdmk, jmxtools) exclude(com.sun.jmx, jmxri), joda-time % joda-time % 2.7, log4j % log4j % 1.2.14 exclude(com.sun.jdmk, jmxtools) exclude(com.sun.jmx,
Re: parquet partition discovery
On 4/9/15 3:09 AM, Michael Armbrust wrote: Back to the user list so everyone can see the result of the discussion... Ah. It all makes sense now. The issue is that when I created the parquet files, I included an unnecessary directory name (data.parquet) below the partition directories. It’s just a leftover from when I started with Michael’s sample code and it only made sense before I added the partition directories. I probably thought it was some magic name that was required when spark scanned for parquet files. The structure looks something like this: drwxr-xr-x - user supergroup 0 2015-04-02 13:17 hdfs://host/tablename/date=20150302/sym=A/data.parquet/... If I just move all the files up a level (there goes a day of work) , the existing code should work fine. Whether it’s useful to handle intermediate non-partition directories or whether that just creates some extra risk I can’t say, since I’m new to all the technology in this whole stack. I'm mixed here. There is always a tradeoff between silently ignoring structure that people might not be aware of (and thus might be a bug) and just working. Having this as an option at least certainly seems reasonable. I'd be curious if anyone had other thoughts? Take the following directory name as an example: /path/to/partition/a=1/random/b=foo One possible approach can be, we grab both a=1 and b=foo, then either report random by throwing an exception or ignore it with a WARN log. Unfortunately, it takes many minutes (even with mergeSchema=false) to create the RDD. It appears that the whole data store will still be recursively traversed (even with mergeSchema=false, a manually specified schema, and a partition spec [which I can’t pass in through a public API]) so that all of the metadata FileStatuses can be cached. In my case I’m going to have years of data, so there’s no way that will be feasible. Should I just explicitly load the partitions I want instead of using partition discovery? Is there any plan to have a less aggressive version of support for partitions, where metadata is only cached for partitions that are used in queries? We improved the speed here in 1.3.1 so I'd be curious if that helps. We definitely need to continue to speed things up here though. We have to enumerate all the partitions so we know what data to read when a query comes in, but I do think we can parallelize it or something.
Re: Empty RDD?
Aah yes. The jsonRDD method needs to walk through the whole RDD to understand the schema, so it does not work if there is no data in it. Making sure there is data in it using take(1) should work. TD
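A small sketch of the guard TD describes. The stream and context names are placeholders, and it assumes the incoming DStream carries JSON strings:

  jsonStream.foreachRDD { rdd =>
    // take(1) only pulls a single element, so it is a cheap emptiness check
    if (rdd.take(1).nonEmpty) {
      val df = sqlContext.jsonRDD(rdd) // schema inference needs at least one record
      df.registerTempTable("events")
      // ... query "events" here
    }
  }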
Re: Cannot run unit test.
It's because your tests are running in parallel and you can only have one context running at a time.
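If the tests are driven by sbt, one common fix (an assumption about the build setup, not something stated in this thread) is to run test suites serially so only one SparkContext exists at a time, e.g. in build.sbt:

  // run test suites one at a time so only a single SparkContext is alive
  parallelExecution in Test := false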
Regarding GroupBy
I wanted to run the groupBy(partition) but this is not working. Here the first part in pairvendorData will be repeated for multiple second parts. Both are objects; do I need to override equals and hashCode? Is groupBy fast enough?

  JavaPairRDD<VendorRecord, VendorRecord> pairvendorData = matchRdd.flatMapToPair(
    new PairFlatMapFunction<VendorRecord, VendorRecord, VendorRecord>() {
      @Override
      public Iterable<Tuple2<VendorRecord, VendorRecord>> call(VendorRecord t) throws Exception {
        List<Tuple2<VendorRecord, VendorRecord>> pairs =
          new LinkedList<Tuple2<VendorRecord, VendorRecord>>();
        CompanyMatcherHelper helper = new CompanyMatcherHelper();
        MatcherKeys matchkeys = helper.getBlockinkeys(t);
        List<VendorRecord> Matchedrecords = ckdao.getMatchingRecordCknids(matchkeys);
        log.info("List Size is " + Matchedrecords.size());
        for (int i = 0; i < Matchedrecords.size(); i++) {
          pairs.add(new Tuple2<VendorRecord, VendorRecord>(t, Matchedrecords.get(i)));
        }
        return pairs;
      }
    }
  );
Re: Reading file with Unicode characters
Thanks! arun

On Wed, Apr 8, 2015 at 10:51 AM, java8964 java8...@hotmail.com wrote: Spark uses the Hadoop TextInputFormat to read the file. Since Hadoop essentially only supports Linux, UTF-8 is the only encoding supported, as it is the default one on Linux. If you have other encoding data, you may want to vote for this Jira: https://issues.apache.org/jira/browse/MAPREDUCE-232 Yong

Date: Wed, 8 Apr 2015 10:35:18 -0700 Subject: Reading file with Unicode characters From: lists.a...@gmail.com To: user@spark.apache.org CC: lists.a...@gmail.com Hi, Does SparkContext's textFile() method handle files with Unicode characters? How about files in UTF-8 format? Going further, is it possible to specify encodings to the method? If not, what should one do if the files to be read are in some encoding? Thanks, arun
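As a workaround until that Jira is addressed, a hedged sketch: read the raw bytes through hadoopFile and decode them with an explicit charset. The path and charset name are placeholders, and this assumes the source encoding still uses newline bytes as line separators:

  import java.nio.charset.Charset
  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.TextInputFormat

  val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///path/to/file")
    .map { case (_, text) =>
      // Text exposes the raw bytes of the line; decode with the file's real encoding
      new String(text.getBytes, 0, text.getLength, Charset.forName("ISO-8859-1"))
    }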
Re: Empty RDD?
Thanks TD! On Apr 8, 2015, at 9:36 PM, Tathagata Das t...@databricks.com wrote: Aah yes. The jsonRDD method needs to walk through the whole RDD to understand the schema, so it does not work if there is no data in it. Making sure there is data in it using take(1) should work. TD
Re: Opening many Parquet files = slow
We noticed similar perf degradation using Parquet (outside of Spark) and it happened due to merging of multiple schemas. It would be good to know if disabling schema merging (if the schema is the same), as Michael suggested, helps in your case.

On Wed, Apr 8, 2015 at 11:43 AM, Michael Armbrust mich...@databricks.com wrote: Thanks for the report. We improved the speed here in 1.3.1 so it would be interesting to know if this helps. You should also try disabling schema merging if you do not need that feature (i.e. all of your files have the same schema):

  sqlContext.load(path, "parquet", Map("mergeSchema" -> "false"))

On Wed, Apr 8, 2015 at 7:35 AM, Ted Yu yuzhih...@gmail.com wrote: You may have seen this thread: http://search-hadoop.com/m/JW1q5SlRpt1 Cheers

On Wed, Apr 8, 2015 at 6:15 AM, Eric Eijkelenboom eric.eijkelenb...@gmail.com wrote: Hi guys

I've got:
- 180 days of log data in Parquet.
- Each day is stored in a separate folder in S3.
- Each day consists of 20-30 Parquet files of 256 MB each.
- Spark 1.3 on Amazon EMR

This makes approximately 5000 Parquet files with a total size of 1.5 TB.

My code: val in = sqlContext.parquetFile("day1", "day2", …, "day180")

Problem: Before the very first stage is started, Spark spends about 25 minutes printing the following:

...
15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for reading at position '258305902'
15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for reading at position '260897108'
15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading at position '261259189'
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102' for reading
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60' for reading
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73' for reading
… etc

It looks like Spark is opening each file before it actually does any work. This means a delay of 25 minutes when working with Parquet files. Previously, we used LZO files and did not experience this problem.

Bonus info: This also happens when I use auto partition discovery (i.e. sqlContext.parquetFile("/path/to/logsroot/")). What can I do to avoid this? Thanks in advance! Eric Eijkelenboom
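Along the lines of Michael's suggestion, a rough sketch for the many-folders case. The bucket paths are placeholders, and it assumes Spark 1.3 and an identical schema across days: load each day with schema merging off and union the results, so no global schema merge is attempted:

  val dayPaths = Seq("s3n://bucket/logs/day1", "s3n://bucket/logs/day2" /* ..., day180 */)
  val logs = dayPaths
    .map(p => sqlContext.load("parquet", Map("path" -> p, "mergeSchema" -> "false")))
    .reduce(_ unionAll _)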
Re: function to convert to pair
Please take a look at zipWithIndex() of RDD. Cheers

On Wed, Apr 8, 2015 at 3:40 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All, I have an RDD[SomeObject]. I want to convert it to RDD[(sequenceNumber, SomeObject)], where the sequence number is 1 for the first SomeObject, 2 for the second SomeObject, and so on. Regards, jeet
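A minimal sketch of that suggestion (SomeObject and the RDD name are placeholders). Note that zipWithIndex is 0-based, so shift by one if the sequence should start at 1:

  val indexed = someObjects.zipWithIndex().map { case (obj, idx) => (idx + 1, obj) }
  // indexed is an RDD[(Long, SomeObject)], e.g. (1, firstObject), (2, secondObject), ...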
Re: [ThriftServer] User permissions warning
The Thrift server doesn't support authentication or Hadoop doAs yet, so you can simply ignore this warning. To avoid it, when connecting via JDBC you may set the user to the same user who starts the Thrift server process. For Beeline, use -n user.

On 4/8/15 11:49 PM, Yana Kadiyska wrote: Hi folks, I am noticing a pesky and persistent warning in my logs (this is from Spark 1.2.1):

15/04/08 15:23:05 WARN ShellBasedUnixGroupsMapping: got exception trying to get groups for user anonymous org.apache.hadoop.util.Shell$ExitCodeException: id: anonymous: No such user at org.apache.hadoop.util.Shell.runCommand(Shell.java:261) at org.apache.hadoop.util.Shell.run(Shell.java:188) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:381) at org.apache.hadoop.util.Shell.execCommand(Shell.java:467) at org.apache.hadoop.util.Shell.execCommand(Shell.java:450) at org.apache.hadoop.security.ShellBasedUnixGroupsMapping.getUnixGroups(ShellBasedUnixGroupsMapping.java:86) at org.apache.hadoop.security.ShellBasedUnixGroupsMapping.getGroups(ShellBasedUnixGroupsMapping.java:55) at org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.getGroups(JniBasedUnixGroupsMappingWithFallback.java:50) at org.apache.hadoop.security.Groups.getGroups(Groups.java:89) at org.apache.hadoop.security.UserGroupInformation.getGroupNames(UserGroupInformation.java:1292) at org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator.setConf(HadoopDefaultAuthenticator.java:62) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:70) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130) at org.apache.hadoop.hive.ql.metadata.HiveUtils.getAuthenticator(HiveUtils.java:365) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:278)

I cannot figure out what I might be missing -- the thrift server is started via sbin/start-thriftserver --master ..., and I can see that the process is running under my user. I don't have any functional issues, but this is annoying (filling up my logs/making it hard to read). Can someone give me pointers on what to check? Things I've tried: 1. hive.server2.enable.doAs is NOT set in hive-site.xml, so I expect the user should at least show up as my id, not anonymous. 2. export HADOOP_USER_NAME=someusername -- error still shows up about anonymous. Curious if anyone has solved this.
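For example, a hedged JDBC sketch that passes an explicit user; the host, port, and user name are placeholders, and it assumes the Hive JDBC driver is on the classpath:

  import java.sql.DriverManager

  Class.forName("org.apache.hive.jdbc.HiveDriver")
  // pass the OS user that launched the Thrift server instead of the default "anonymous"
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "sparkuser", "")
  val stmt = conn.createStatement()
  val rs = stmt.executeQuery("SHOW TABLES")
  while (rs.next()) println(rs.getString(1))
  conn.close()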
Re: Empty RDD?
What is the computation you are doing in the foreachRDD that is throwing the exception? One way to guard against it is to do a take(1) to see if you get back any data. If there is none, then don't do anything with the RDD. TD

On Wed, Apr 8, 2015 at 1:08 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: When I call transform or foreachRDD on a DStream, I keep getting an error that I have an empty RDD, which makes sense since my batch interval may be smaller than the rate at which new data are coming in. How do I guard against it? Thanks, Vadim
function to convert to pair
Hi All, I have an RDD[SomeObject]. I want to convert it to RDD[(sequenceNumber, SomeObject)], where the sequence number is 1 for the first SomeObject, 2 for the second SomeObject, and so on. Regards, jeet
Re: Class incompatible error
For the build I am using java version 1.7.0_65 which seems to be the same as the one on the spark host. However one is Oracle and the other is OpenJDK. Does that make any difference?

On Wed, Apr 8, 2015 at 1:24 PM, Ted Yu yuzhih...@gmail.com wrote: What version of Java do you use to build ? Cheers

On Wed, Apr 8, 2015 at 12:43 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am seeing the following, is this because of my maven version?

15/04/08 15:42:22 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-241-251-232.us-west-2.compute.internal): java.io.InvalidClassException: org.apache.spark.Aggregator; local class incompatible: stream classdesc serialVersionUID = 5032037208639381169, local class serialVersionUID = -9085606473104903453 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
Re: Class incompatible error
What version of Java do you use to build ? Cheers

On Wed, Apr 8, 2015 at 12:43 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am seeing the following, is this because of my maven version?

15/04/08 15:42:22 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-241-251-232.us-west-2.compute.internal): java.io.InvalidClassException: org.apache.spark.Aggregator; local class incompatible: stream classdesc serialVersionUID = 5032037208639381169, local class serialVersionUID = -9085606473104903453 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
Re: Support for Joda
Which version of Joda are you using? Here is a snippet of dependency:tree output w.r.t. Joda:

[INFO] +- org.apache.flume:flume-ng-core:jar:1.4.0:compile
...
[INFO] | +- joda-time:joda-time:jar:2.1:compile

FYI

On Wed, Apr 8, 2015 at 12:53 PM, Patrick Grandjean p.r.grandj...@gmail.com wrote: Hi, I have an RDD with objects containing Joda's LocalDate. When trying to save the RDD as Parquet, I get an exception. Here is the code:

  val sqlC = new org.apache.spark.sql.SQLContext(sc)
  import sqlC._
  myRDD.saveAsParquetFile("parquet")

The exception:

Exception in thread "main" scala.MatchError: org.joda.time.LocalDate (of class scala.reflect.internal.Types$TypeRef$$anon$6) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:105) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:125) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:123) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:123) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:100) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33) at org.apache.spark.sql.catalyst.ScalaReflection$class.attributesFor(ScalaReflection.scala:94) at org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:33) at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:111)

Is it possible to extend Spark with adapters in order to support new types? How to add support for Joda types? I am using Spark 1.2.1 with Cloudera 5.3.2. Patrick.
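Until such adapters exist, one hedged workaround is to map the Joda field to a type Spark SQL's reflection already understands before saving. The class and field names below are placeholders, and it relies on the same import sqlC._ implicits as the snippet above:

  import java.sql.Timestamp

  case class EventRow(id: String, eventDate: Timestamp)

  // convert org.joda.time.LocalDate to java.sql.Timestamp so schema inference succeeds
  val rowRDD = myRDD.map { e =>
    EventRow(e.id, new Timestamp(e.date.toDateTimeAtStartOfDay.getMillis))
  }
  rowRDD.saveAsParquetFile("parquet-out")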
Re: Unit testing with HiveContext
Please take a look at sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala:

  protected def configure(): Unit = {
    warehousePath.delete()
    metastorePath.delete()
    setConf("javax.jdo.option.ConnectionURL",
      s"jdbc:derby:;databaseName=$metastorePath;create=true")
    setConf("hive.metastore.warehouse.dir", warehousePath.toString)
  }

Cheers

On Wed, Apr 8, 2015 at 1:07 PM, Daniel Siegmann daniel.siegm...@teamaol.com wrote: I am trying to unit test some code which takes an existing HiveContext and uses it to execute a CREATE TABLE query (among other things). Unfortunately I've run into some hurdles trying to unit test this, and I'm wondering if anyone has a good approach.

The metastore DB is automatically created in the local directory, but it doesn't seem to be cleaned up afterward. Is there any way to get Spark to clean this up when the context is stopped? Or can I point this to some other location, such as a temp directory?

Trying to create a table fails because it is using the default warehouse directory (/user/hive/warehouse). Is there some way to change this without hard-coding a directory in a hive-site.xml; again, I'd prefer to point it to a temp directory so it will be automatically removed. I tried a couple of things that didn't work:

- hiveContext.sql("SET hive.metastore.warehouse.dir=/tmp/dir/xyz")
- hiveContext.setConf("hive.metastore.warehouse.dir", "/tmp/dir/xyz")

Any advice from those who have been here before would be appreciated.
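A hedged test-setup sketch modeled on that configure() method. The directory prefixes are placeholders, it assumes an existing SparkContext named sc, and the confs are assumed to take effect only if they are set before the first Hive operation runs in the test:

  import java.nio.file.Files

  val warehouseDir = Files.createTempDirectory("hive-warehouse").toFile
  val metastoreDir = Files.createTempDirectory("hive-metastore").toFile
  metastoreDir.delete() // Derby wants to create this path itself

  val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  hiveContext.setConf("javax.jdo.option.ConnectionURL",
    s"jdbc:derby:;databaseName=$metastoreDir;create=true")
  hiveContext.setConf("hive.metastore.warehouse.dir", warehouseDir.toString)
  // delete both directories in the test's teardown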
Re: Advice using Spark SQL and Thrift JDBC Server
Hi Mohammed, I think you just need to add -DskipTests to your build. Here is how I built it:

  mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests clean package install

build/sbt does, however, fail even when only doing package, which should skip tests. I am able to build the MyThriftServer above now. Thanks Michael for the assistance. -Todd

On Wed, Apr 8, 2015 at 3:39 PM, Mohammed Guller moham...@glassbeam.com wrote: Michael, Thank you! Looks like the sbt build is broken for 1.3. I downloaded the source code for 1.3, but I get the following error a few minutes after I run “sbt/sbt publishLocal”:

[error] (network-shuffle/*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.3.0: configuration not public in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test
[error] Total time: 106 s, completed Apr 8, 2015 12:33:45 PM

Mohammed

From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Wednesday, April 8, 2015 11:54 AM To: Mohammed Guller Cc: Todd Nist; James Aley; user; Patrick Wendell Subject: Re: Advice using Spark SQL and Thrift JDBC Server Sorry guys. I didn't realize that https://issues.apache.org/jira/browse/SPARK-4925 was not fixed yet. You can publish locally in the meantime (sbt/sbt publishLocal).

On Wed, Apr 8, 2015 at 8:29 AM, Mohammed Guller moham...@glassbeam.com wrote: +1 Interestingly, I ran into exactly the same issue yesterday. I couldn't find any documentation about which project to include as a dependency in build.sbt to use HiveThriftServer2. Would appreciate help. Mohammed

From: Todd Nist [mailto:tsind...@gmail.com] Sent: Wednesday, April 8, 2015 5:49 AM To: James Aley Cc: Michael Armbrust; user Subject: Re: Advice using Spark SQL and Thrift JDBC Server To use HiveThriftServer2.startWithContext, I thought one would use the following artifact in the build:

  "org.apache.spark" %% "spark-hive-thriftserver" % "1.3.0"

But I am unable to resolve the artifact. I do not see it in Maven Central or any other repo. Do I need to build Spark and publish locally, or am I just missing something obvious here?
Basic class is like this:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.hive.HiveContext
  import org.apache.spark.sql.hive.HiveMetastoreTypes._
  import org.apache.spark.sql.types._
  import org.apache.spark.sql.hive.thriftserver._

  object MyThriftServer {

    val sparkConf = new SparkConf()
      // master is passed to spark-submit, but could also be specified explicitly
      // .setMaster(sparkMaster)
      .setAppName("My ThriftServer")
      .set("spark.cores.max", "2")
    val sc = new SparkContext(sparkConf)
    val sparkContext = sc
    import sparkContext._
    val sqlContext = new HiveContext(sparkContext)
    import sqlContext._
    import sqlContext.implicits._

    // register temp tables here

    HiveThriftServer2.startWithContext(sqlContext)
  }

Build has the following:

  scalaVersion := "2.10.4"

  val SPARK_VERSION = "1.3.0"

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-streaming-kafka" % SPARK_VERSION
      exclude("org.apache.spark", "spark-core_2.10")
      exclude("org.apache.spark", "spark-streaming_2.10")
      exclude("org.apache.spark", "spark-sql_2.10")
      exclude("javax.jms", "jms"),
    "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided",
    "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "provided",
    "org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided",
    "org.apache.spark" %% "spark-hive" % SPARK_VERSION % "provided",
    "org.apache.spark" %% "spark-hive-thriftserver" % SPARK_VERSION % "provided",
    "org.apache.kafka" %% "kafka" % "0.8.1.1"
      exclude("javax.jms", "jms")
      exclude("com.sun.jdmk", "jmxtools")
      exclude("com.sun.jmx", "jmxri"),
    "joda-time" % "joda-time" % "2.7",
    "log4j" % "log4j" % "1.2.14"
      exclude("com.sun.jdmk", "jmxtools")
      exclude("com.sun.jmx", "jmxri")
  )

Appreciate the assistance. -Todd

On Tue, Apr 7, 2015 at 4:09 PM, James Aley james.a...@swiftkey.com wrote: Excellent, thanks for your help, I appreciate your advice!

On 7 Apr 2015 20:43, Michael Armbrust mich...@databricks.com wrote: That should totally work. The other option would be to run a persistent metastore that multiple contexts can talk to and periodically run a job that creates missing tables. The trade-off here would be more complexity, but less downtime due to the server restarting.

On Tue, Apr 7, 2015 at 12:34 PM, James Aley james.a...@swiftkey.com wrote: Hi Michael, Thanks so much for the reply - that really cleared a lot of things up for me! Let me just check that I've interpreted one of your suggestions for (4) correctly... Would it make sense for me to write a