Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
In Spark 1.1 it may not be as easy as in Spark 1.0, after this commit: https://issues.apache.org/jira/browse/SPARK-2446

Only binary fields carrying the UTF8 annotation are recognized as strings after this change, but Impala always writes strings as plain binary without the UTF8 annotation.
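That said, if I remember correctly, Spark 1.1 also adds a Spark SQL flag to treat un-annotated binary columns as strings, which should help with Impala-written files (untested on this data set, just an idea):

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  // interpret raw binary Parquet columns as strings, for writers
  // (like Impala) that omit the UTF8 annotation
  sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")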
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
Hi,

I don't think anybody has been testing importing of Impala tables directly. Is there any chance to export these first, say as unpartitioned Hive tables, and import those? Just an idea..

Andre

On 07/21/2014 11:46 PM, chutium wrote:
> no, something like this
> 14/07/20 00:19:29 ERROR cluster.YarnClientClusterScheduler: Lost executor 2 on 02.xxx: remote Akka client disassociated
> ... ...
> 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
> java.io.IOException: Filesystem closed
> ...
> ulimit is increased
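PS: untested, but the export could be a plain CTAS in the hive shell; CTAS always produces an unpartitioned table, which is what you'd want here (table name is made up):

  CREATE TABLE xx001_plain STORED AS TEXTFILE AS SELECT * FROM xx001;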
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
I haven't had a chance to look at the details of this issue, but we have seen Spark successfully read Parquet tables created by Impala.

On Tue, Jul 22, 2014 at 10:10 AM, Andre Schumacher andre.sc...@gmail.com wrote:
> Hi, I don't think anybody has been testing importing of Impala tables directly. Is there any chance to export these first, say as unpartitioned Hive tables and import these? Just an idea..
> Andre
> ...
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
Instead of using union, can you try sqlContext.parquetFile("/user/hive/warehouse/xxx_parquet.db").registerAsTable("parquetTable")? Then,

var all = sql("select some_id, some_type, some_time from parquetTable").map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19))))

Thanks,

Yin

On Sun, Jul 20, 2014 at 8:58 AM, chutium teng@gmail.com wrote:
> like this:
> val sc = new SparkContext(new SparkConf().setAppName("SLA Filter"))
> ...
> thanks
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
Hi, unfortunately it is not so straightforward. xxx_parquet.db is the folder of a managed database created by Hive/Impala, so every sub-element in it is a Hive/Impala table. They are folders in HDFS, each table has a different schema, and each folder holds one or more Parquet files. That means xx001_suffix and xx002_suffix are folders, containing Parquet files like:

xx001_suffix/parquet_file1_with_schema1
xx002_suffix/parquet_file1_with_schema2
xx002_suffix/parquet_file2_with_schema2

It seems only union can do this job~

Nonetheless, thank you very much. Maybe the only reason is that Spark is eating up too much memory...
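Roughly it has to look like this (simplified, untested sketch; it assumes the three selected columns exist in every table):

  import org.apache.hadoop.fs.{FileSystem, Path}

  val fs = FileSystem.get(sc.hadoopConfiguration)
  // one subfolder per Hive/Impala table, each with its own schema
  val tableDirs = fs.listStatus(new Path("/user/hive/warehouse/xxx_parquet.db"))
    .filter(_.isDirectory)
    .map(_.getPath)
    .filter(_.getName.endsWith(suffix))  // suffix as in my original code, args(0)
  val perTable = tableDirs.zipWithIndex.map { case (dir, i) =>
    val table = "t" + i
    sqlContext.parquetFile(dir.toString).registerAsTable(table)
    sql("select some_id, some_type, some_time from " + table)
      .map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19))))
  }
  // same as writing xx001 union xx002 union ... by hand
  val all = perTable.reduce(_ union _)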
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
What's the exception you're seeing? Is it an OOM?

On Mon, Jul 21, 2014 at 11:20 AM, chutium teng@gmail.com wrote:
> Hi, unfortunately it is not so straightforward. xxx_parquet.db is a folder of a managed database created by hive/impala ...
> it seems only union can do this job~
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
No, something like this:

14/07/20 00:19:29 ERROR cluster.YarnClientClusterScheduler: Lost executor 2 on 02.xxx: remote Akka client disassociated
... ...
14/07/20 00:21:13 WARN scheduler.TaskSetManager: Lost TID 832 (task 1.2:186)
14/07/20 00:21:13 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
        at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
        at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
        at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

ulimit is increased
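Not sure whether it is related, but "java.io.IOException: Filesystem closed" can show up when one thread closes the shared, cached FileSystem instance while other tasks are still reading through it. One thing I could still try is disabling the Hadoop FileSystem cache (just an idea, untested):

  // untested idea: give each consumer its own FileSystem instance
  // instead of the shared cached one; set before any HDFS reads happen
  sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true)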
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
like this:

val sc = new SparkContext(new SparkConf().setAppName("SLA Filter"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val suffix = args(0)
sqlContext.parquetFile("/user/hive/warehouse/xxx_parquet.db/xx001_" + suffix).registerAsTable("xx001")
sqlContext.parquetFile("/user/hive/warehouse/xxx_parquet.db/xx002_" + suffix).registerAsTable("xx002")
... ...
var xx001 = sql("select some_id, some_type, some_time from xx001").map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19))))
var xx002 = sql("select some_id, some_type, some_time from xx002").map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19))))
... ...
var all = xx001 union xx002 ... union ...
all.groupByKey.filter(kv => FilterSLA.filterSLA(kv._2.toSeq)).saveAsTextFile("xxx")

filterSLA turns the input Seq[(String, String)] into a Map, then checks something like: if the map contains type1 and type2, whether timestamp_type1 - timestamp_type2 > 2 days.

thanks
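A simplified sketch of that check (the real FilterSLA does a bit more; the type names "type1"/"type2" and the yyyy-MM-dd HH:mm:ss timestamp layout are assumptions here):

  import java.text.SimpleDateFormat

  object FilterSLA {
    // keep a key only if both event types are present
    // and their timestamps are more than two days apart
    def filterSLA(events: Seq[(String, String)]): Boolean = {
      val byType = events.toMap  // some_type -> some_time, last entry wins on duplicates
      if (byType.contains("type1") && byType.contains("type2")) {
        // SimpleDateFormat is not thread-safe, so create it per call
        val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val t1 = fmt.parse(byType("type1")).getTime
        val t2 = fmt.parse(byType("type2")).getTime
        math.abs(t1 - t2) > 2L * 24 * 60 * 60 * 1000
      } else {
        false
      }
    }
  }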
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
Can you attach your code?

Thanks,

Yin

On Sat, Jul 19, 2014 at 4:10 PM, chutium teng@gmail.com wrote:
> 160G parquet files (ca. 30 files, snappy compressed, made by Cloudera Impala)
> ca. 30 full table scans, taking 3-5 columns out, then some normal Scala operations like substring, groupBy, filter; at the end, saving as files in HDFS
> yarn-client mode, 23 cores and 60G mem / node, but it always fails!
> startup script (3 NodeManagers, each one executor):
> some screenshots:
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n10254/spark1.png
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n10254/spark2.png
> i got some log like:
> the same job using standalone mode (3 slaves) works...
> startup script (each 24 cores, 64g mem):
> any idea? thanks a lot!