How to preserve the order of parquet files?

2018-02-07 Thread Kevin Jung
Hi all, in Spark 2.2.1, when I load parquet files, the result comes back in a different order than the original dataset. It seems that the FileSourceScanExec.createNonBucketedReadRDD method sorts parquet file splits by their lengths. - val splitFiles = selectedPartitions.flatMap { partition =>
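
A minimal workaround sketch (not the thread's resolution), assuming Spark 2.x: persist an explicit ordering column when writing and sort on it when reading back. The DataFrame and the output path below are placeholders, not values from the thread.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

// capture the current row order in an explicit column at write time,
// then reimpose it after reading; df and the path are placeholders
val spark = SparkSession.builder().appName("ordered-parquet").getOrCreate()
val df = spark.range(1000).toDF("value")

df.withColumn("row_order", monotonically_increasing_id())
  .write.mode("overwrite").parquet("/tmp/ordered_parquet")

val restored = spark.read.parquet("/tmp/ordered_parquet")
  .orderBy("row_order")      // restores the write-time order regardless of split ordering
  .drop("row_order")
```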

Spark summit Asia

2015-09-07 Thread Kevin Jung
Is there any plan to hold a Spark Summit in Asia? I'm very much looking forward to it. Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-summit-Asia-tp24598.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Unable to build Spark 1.5, is build broken or can anyone successfully build?

2015-08-30 Thread Kevin Jung
I expect this is because your versions are not in the range defined in pom.xml. You should upgrade your Maven version to 3.3.3 and your JDK to 1.7. The Spark team already knows about this issue, so you can find more information on the developers' mailing list. Kevin -- View this message in context:

Drop table and Hive warehouse

2015-08-24 Thread Kevin Jung
When I store a DataFrame as a table with the saveAsTable command and then execute DROP TABLE in SparkSQL, it doesn't actually delete the files in the Hive warehouse. The table disappears from the table list but the data files are still alive. Because of this, I can't saveAsTable with the same name before dropping

Re: Drop table and Hive warehouse

2015-08-24 Thread Kevin Jung
That's not the expected behavior. What version of Spark? On Mon, Aug 24, 2015 at 1:32 AM, Kevin Jung itsjb.j...@samsung.com wrote: When I store a DataFrame as a table with the saveAsTable command and then execute DROP TABLE in SparkSQL, it doesn't actually delete the files in the Hive warehouse. The table
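
A hedged cleanup sketch for the behavior described above, assuming a spark-shell session where sc and sqlContext are predefined; the table name and warehouse path are assumptions, so check hive.metastore.warehouse.dir for the real location.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// drop the table, then remove any leftover directory by hand;
// the table name and warehouse path are placeholders
sqlContext.sql("DROP TABLE IF EXISTS my_table")

val fs = FileSystem.get(sc.hadoopConfiguration)
val tableDir = new Path("/user/hive/warehouse/my_table")   // check hive.metastore.warehouse.dir
if (fs.exists(tableDir)) {
  fs.delete(tableDir, true)   // recursively remove the orphaned data files
}
```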

SaveAsTable changes the order of rows

2015-08-19 Thread Kevin Jung
I have a simple key/value RDD and do val partitioned = rdd.partitionBy(new HashPartitioner(400)); val row = partitioned.first. I can get the key G2726 from the returned row. This first row is located in partition #0 because G2726.hashCode is 67114000 and 67114000 % 400 is 0. But the order of
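
A small sketch reproducing the partition arithmetic described in the post (HashPartitioner maps a key to the non-negative remainder of key.hashCode by numPartitions); whether saveAsTable preserves that layout is the open question of the thread.

```scala
// reproducing the arithmetic from the post
val key = "G2726"
val numPartitions = 400
val rawMod = key.hashCode % numPartitions
val partitionId = if (rawMod < 0) rawMod + numPartitions else rawMod
// "G2726".hashCode == 67114000 and 67114000 % 400 == 0, so the row lands in partition #0
```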

Can't find directory after resetting REPL state

2015-08-15 Thread Kevin Jung
The Spark shell can't find the base directory of the class server after running the :reset command. scala> :reset scala> 1 uncaught exception during compilation: java.lang.AssertionError java.lang.AssertionError: assertion failed: Tried to find '$line33' in '/tmp/spark-f47f3917-ac31-4138-bf1a-a8cefd094ac3'

Re: What is the optimal approach to do Secondary Sort in Spark?

2015-08-11 Thread Kevin Jung
You should make the key a tuple. In your case, RDD[((id, timeStamp), value)] is the proper way to do it. Kevin --- Original Message --- Sender : swetha swethakasire...@gmail.com Date : 2015-08-12 09:37 (GMT+09:00) Title : What is the optimal approach to do Secondary Sort in Spark? Hi,
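
A minimal sketch of the composite-key approach, assuming records shaped as (id, timeStamp, value); the sample data, partitioner class, and partition count are illustrative, not from the thread.

```scala
import org.apache.spark.Partitioner

// illustrative records of (id, timeStamp, value); replace with the real dataset
val records = sc.parallelize(Seq(("a", 3L, 1.0), ("a", 1L, 2.0), ("b", 2L, 3.0)))

// partition on id only, so every timestamp of a given id lands in the same partition
class IdPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case (id: String, _) => math.abs(id.hashCode % numPartitions)
  }
}

val secondarySorted = records
  .map { case (id, ts, v) => ((id, ts), v) }                    // composite key
  .repartitionAndSortWithinPartitions(new IdPartitioner(8))     // sorted by (id, ts) per partition
```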

GenericRowWithSchema is too heavy

2015-07-27 Thread Kevin Jung
Hi all, SparkSQL usually creates DataFrames with GenericRowWithSchema (is that right?). And 'Row' is a superclass of GenericRow and GenericRowWithSchema. The only difference is that GenericRowWithSchema carries its schema information as a StructType. But I think one DataFrame has only one schema, then

MapType in spark-sql

2015-01-20 Thread Kevin Jung
Hi all, how can I add MapType and ArrayType to a schema when I create a StructType programmatically? val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) The above code from the Spark documentation works fine, but if I change StringType to MapType or
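
A sketch of mixing complex types into a programmatic schema, assuming a recent release (the public types live in org.apache.spark.sql.types from 1.3 on); the field names are illustrative.

```scala
import org.apache.spark.sql.types._

// field names are illustrative; MapType and ArrayType slot in like any other DataType
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("attributes", MapType(StringType, StringType, valueContainsNull = true), nullable = true),
  StructField("tags", ArrayType(StringType, containsNull = true), nullable = true)
))
```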

Re: How to compute RDD[(String, Set[String])] that include large Set

2015-01-19 Thread Kevin Jung
As far as I know, the operations before calling saveAsTextFile are transformations, so they are lazily computed. The saveAsTextFile action then performs all the transformations, and your Set[String] grows at that point. It creates a large collection if you have few keys, and this easily causes OOM when your executor

Manually trigger RDD map function without action

2015-01-12 Thread Kevin Jung
Hi all, is there an efficient way to trigger RDD transformations? I'm currently using the count action to achieve this. Best regards Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Manually-trigger-RDD-map-function-without-action-tp21094.html Sent from the Apache

Re: Manually trigger RDD map function without action

2015-01-12 Thread Kevin Jung
Cody said, "If you don't care about the value that your map produced (because you're not already collecting or saving it), then is foreach more appropriate to what you're doing?" but I cannot see it in this thread. Anyway, I ran a small benchmark to test which function is the most efficient
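
A small sketch of the options under discussion: materializing a cached transformation without shipping results to the driver. The placeholder function and data are assumptions.

```scala
// `expensive` and the input range are placeholders
val expensive = (x: Int) => x * 2
val mapped = sc.parallelize(1 to 1000000).map(expensive).cache()

mapped.count()                     // works, but also computes a global aggregate
mapped.foreach(_ => ())            // no-op per element; nothing is returned to the driver
mapped.foreachPartition(_ => ())   // no-op per partition; usually the cheapest trigger
```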

Shuffle write increases in spark 1.2

2014-12-29 Thread Kevin Jung
Hi all, the size of shuffle write shown in the Spark web UI is much different when I execute the same Spark job on the same input data (100GB) in both Spark 1.1 and Spark 1.2. At the same sortBy stage, the size of shuffle write is 39.7GB in Spark 1.1 but 91.0GB in Spark 1.2. I set spark.shuffle.manager
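
For an apples-to-apples comparison it can help to pin the shuffle manager explicitly, since Spark 1.2 switched the default from "hash" to "sort"; a configuration sketch (the app name is a placeholder):

```scala
import org.apache.spark.SparkConf

// pin the shuffle manager so both runs use the same implementation
val conf = new SparkConf()
  .setAppName("shuffle-comparison")
  .set("spark.shuffle.manager", "hash")   // the Spark 1.1 default; "sort" is the 1.2 default
```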

Partitioner in sortBy

2014-12-10 Thread Kevin Jung
Hi, I'm wondering whether I can change the RangePartitioner in sortBy to another partitioner like HashPartitioner. The first thing that comes to mind is that it cannot be replaced, because the RangePartitioner is part of the sort algorithm. If we call mapPartitions on a key-based partition after sorting, we
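
A sketch of the alternative being weighed, available from Spark 1.2: sortBy needs a RangePartitioner to produce a global order, but if a per-partition order by key is enough, a HashPartitioner can be used. The key extractor and data below are placeholders.

```scala
import org.apache.spark.HashPartitioner

// placeholder key extractor and data
val keyOf = (x: String) => x.length
val data = sc.parallelize(Seq("spark", "rdd", "partitioner"))

val locallySorted = data
  .map(x => (keyOf(x), x))
  .repartitionAndSortWithinPartitions(new HashPartitioner(400))
// each partition is sorted by key, but the partitions no longer form a global range
```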

spark code style

2014-11-21 Thread Kevin Jung
Hi all. Here are two code snippets, and they will produce the same result. 1. rdd.map( function ) 2. rdd.map( function1 ).map( function2 ).map( function3 ) What are the pros and cons of these two methods? Regards Kevin -- View this message in context:
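
The two snippets written out with placeholder functions; both stay within a single stage because Spark pipelines narrow transformations.

```scala
// f1, f2, f3 are placeholders standing in for the post's function1..3
val f1 = (x: Int) => x + 1
val f2 = (x: Int) => x * 2
val f3 = (x: Int) => x - 3
val rdd = sc.parallelize(1 to 10)

val fused   = rdd.map(x => f3(f2(f1(x))))   // one map, one pass over each element
val chained = rdd.map(f1).map(f2).map(f3)   // three maps, still pipelined into one stage
// neither version adds a shuffle; the chained form mainly costs extra iterator wrapping,
// while the fused form trades a little readability for that overhead
```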

Re: default parallelism bug?

2014-10-20 Thread Kevin Jung
I use Spark 1.1.0 and set these options in spark-defaults.conf: spark.scheduler.mode FAIR, spark.cores.max 48, spark.default.parallelism 72. Thanks, Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/default-parallelism-bug-tp16787p16894.html Sent from the
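
The same three settings applied programmatically instead of through spark-defaults.conf, as a sketch (the app name is a placeholder):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parallelism-check")            // placeholder app name
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.cores.max", "48")
  .set("spark.default.parallelism", "72")
val sc = new SparkContext(conf)
```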

default parallelism bug?

2014-10-19 Thread Kevin Jung
Hi, I usually use a file on HDFS to make a PairRDD and analyze it using combineByKey, reduceByKey, etc. But sometimes it hangs when I set the spark.default.parallelism configuration, even though the file is small. If I remove this configuration, everything works fine. Can anyone tell me why this occurs?

Stuck job works well after rdd.count or rdd.collect

2014-10-05 Thread Kevin Jung
Hi, all. I'm in an unusual situation. The code, ... 1: val cell = dataSet.flatMap(parse(_)).cache 2: val distinctCell = cell.keyBy(_._1).reduceByKey(removeDuplication(_, _)).mapValues(_._3).cache 3: val groupedCellByLine = distinctCell.map(cellToIterableColumn).groupByKey.cache 4: val result = (1

How to clear broadcast variable from driver memory?

2014-09-03 Thread Kevin Jung
Hi, I tried Broadcast.unpersist() on Spark 1.0.1 but the MemoryStore (driver memory) still held it. //LOGS //Block broadcast_0 stored as values to memory (estimated size 380.1 MB, free 5.7 GB) The amount of free memory was the same after calling unpersist. Can I clear this? -- View this message in
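
A minimal lifecycle sketch, assuming a spark-shell session where sc exists; the broadcast payload is a placeholder.

```scala
// placeholder payload standing in for the ~380 MB broadcast from the post
val lookup = sc.broadcast((1 to 1000000).toArray)

lookup.unpersist(blocking = true)   // drops cached copies on the executors; the driver keeps
                                    // its copy so the variable can be re-broadcast later
// newer releases also expose destroy(), which releases the driver-side blocks as well:
// lookup.destroy()
```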

Re: zip equal-length but unequally-partition

2014-09-02 Thread Kevin Jung
I just created it. Here's the ticket: https://issues.apache.org/jira/browse/SPARK-3364 Thanks, Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/zip-equal-length-but-unequally-partition-tp13246p13330.html Sent from the Apache Spark User List mailing list

zip equal-length but unequally-partition

2014-09-01 Thread Kevin Jung
http://www.adamcrume.com/blog/archive/2014/02/19/fixing-sparks-rdd-zip Please check this URL. I got the same problem in v1.0.1. In some cases, an RDD loses several elements after zip, so the total count of the ZippedRDD is less than
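
A hedged workaround sketch for the mismatched-partitioning cases: index both sides and join, instead of relying on zip's requirement of identical partitioning and element counts per partition. The sample RDDs are placeholders.

```scala
import org.apache.spark.SparkContext._   // pair-RDD implicits (automatic from Spark 1.3 on)

// placeholder RDDs with deliberately different partitioning
val rddA = sc.parallelize(Seq("a", "b", "c"))
val rddB = sc.parallelize(Seq(1, 2, 3), numSlices = 2)

val zipped = rddA.zipWithIndex().map(_.swap)   // (index, a)
  .join(rddB.zipWithIndex().map(_.swap))       // (index, (a, b))
  .values                                      // (a, b); sort by index first if order matters
```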

how can I get the number of cores

2014-08-29 Thread Kevin Jung
Hi all, the Spark web UI gives me information about total cores and used cores. I want to get this information programmatically. How can I do this? Thanks Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-can-I-get-the-number-of-cores-tp13111.html
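
A sketch of what is reachable programmatically, assuming a running SparkContext; these reflect configured values rather than the live per-executor numbers shown in the web UI.

```scala
val maxCores    = sc.getConf.getOption("spark.cores.max")   // None if the option was not set
val parallelism = sc.defaultParallelism                     // typically derived from total cores
// newer releases also expose the web UI's numbers over a JSON/REST endpoint (/api/v1),
// including per-executor core counts
```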

iterator cause NotSerializableException

2014-08-22 Thread Kevin Jung
Hi, the following code gives me 'Task not serializable: java.io.NotSerializableException: scala.collection.mutable.ArrayOps$ofInt'. var x = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),3) var iter = Array(5).toIterator var value = 5 var value2 = iter.next x.map( q => q*value).collect //Line 1, it works.
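
A general sketch of the closure-capture pattern behind this family of errors, not necessarily the exact REPL mechanics of the thread: a closure that reads a field serializes the whole enclosing object, including non-serializable members, so copying the value into a local val keeps the closure's footprint small.

```scala
import org.apache.spark.SparkContext

// illustrative class, not code from the thread
class Job(sc: SparkContext) {
  val multiplier = 5
  val scratch = Array(1, 2, 3).toIterator   // not serializable, never needed on executors

  def run(): Array[Int] = {
    val localMultiplier = multiplier        // local copy: the closure no longer references `this`
    sc.parallelize(1 to 9, 3).map(q => q * localMultiplier).collect()
  }
}
```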

Re: Transform RDD[List]

2014-08-12 Thread Kevin Jung
Thanks for your answer. Yes, I want to transpose the data. At this point, I have one more question. I tested it with RDD1: List(1, 2, 3, 4, 5) List(6, 7, 8, 9, 10) List(11, 12, 13, 14, 15) List(16, 17, 18, 19, 20) And the result is... ArrayBuffer(11, 1, 16, 6) ArrayBuffer(2, 12, 7, 17)
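
A sketch of a distributed transpose that also fixes the ordering seen above: tag every element with its (column, row) position, group by column, and sort inside each group.

```scala
import org.apache.spark.SparkContext._   // pair/ordered RDD implicits (automatic from Spark 1.3 on)

val rows = sc.parallelize(Seq(
  List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10),
  List(11, 12, 13, 14, 15), List(16, 17, 18, 19, 20)))

val transposed = rows
  .zipWithIndex()                                                   // (row, rowIdx)
  .flatMap { case (row, rowIdx) =>
    row.zipWithIndex.map { case (v, colIdx) => (colIdx, (rowIdx, v)) }
  }
  .groupByKey()
  .sortByKey()                                                      // keep column order
  .map { case (_, cells) => cells.toList.sortBy(_._1).map(_._2) }   // restore row order
// transposed.collect() gives List(1, 6, 11, 16), List(2, 7, 12, 17), ...
```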

Transform RDD[List]

2014-08-11 Thread Kevin Jung
Hi, it may be a simple question, but I cannot figure out the most efficient way. There is an RDD containing lists. RDD ( List(1,2,3,4,5) List(6,7,8,9,10) ) I want to transform this to RDD ( List(1,6) List(2,7) List(3,8) List(4,9) List(5,10) ) And I want to achieve this without using collect

Re: Transform RDD[List]

2014-08-11 Thread Kevin Jung
Hi ssimanta. The first line creates an RDD[Int], not an RDD[List[Int]]. In the case of List, I cannot zip all the list elements in an RDD like a.zip(b), and I cannot use only Tuple2 because the real-world RDD has more List elements in the source RDD. So I guess the expected result depends on the count of the original Lists.

SparkSQL can not use SchemaRDD from Hive

2014-07-28 Thread Kevin Jung
Hi, I got an error message while using Hive and SparkSQL. This is the code snippet I used (in spark-shell, 1.0.0): val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val hive = new org.apache.spark.sql.hive.HiveContext(sc) var sample = hive.hql("select * from sample10") // This

Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$

2014-07-20 Thread Kevin Jung
Hi Victor, I got the same issue and posted about it. In my case, it only happens when I run some Spark SQL queries on Spark 1.0.1; on Spark 1.0.0 it works properly. Have you run the same job on Spark 1.0.0? Sincerely, Kevin -- View this message in context:

Spark 1.0.1 akka connection refused

2014-07-15 Thread Kevin Jung
Hi, I recently upgraded my Spark 1.0.0 cluster to 1.0.1. But it gives me ERROR remote.EndpointWriter: AssociationError when I run a simple SparkSQL job in spark-shell. Here is the code: val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ case class Person(name:String, Age:Int,

Re: Spark 1.0.1 akka connection refused

2014-07-15 Thread Kevin Jung
UPDATE: It happens only when I use a 'case class' and map an RDD to this class in spark-shell. Other RDD transformations, SchemaRDD with parquet files, and any SparkSQL operation work fine. Were there any changes related to case class handling between 1.0.0 and 1.0.1? Best regards Kevin -- View this

Case class in java

2014-07-03 Thread Kevin Jung
Hi, I'm trying to convert a Scala Spark job into Java. In Scala, I typically use a 'case class' to apply a schema to an RDD. It can be converted into a POJO class in Java, but what I really want is to create POJO classes dynamically, like the Scala REPL does. For this reason, I import javassist to

Re: Case class in java

2014-07-03 Thread Kevin Jung
I found a web page as a hint: http://ardoris.wordpress.com/2014/03/30/how-spark-does-class-loading/ I learned that SparkIMain has an internal HTTP server to publish class objects, but I can't figure out how to use it from Java. Any ideas? Thanks, Kevin -- View this message in context:

Re: Case class in java

2014-07-03 Thread Kevin Jung
This will load the listed jars when the SparkContext is created. In the case of the REPL, we define and import classes after the SparkContext is created. According to the above-mentioned site, the executor installs a class loader in the 'addReplClassLoaderIfNeeded' method using the spark.repl.class.uri configuration. Then I will try to

Re: use spark-shell in the source

2014-06-12 Thread Kevin Jung
Thanks for the answer. Yes, I tried to launch an interactive REPL in the middle of my application :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/use-spark-shell-in-the-source-tp7453p7539.html Sent from the Apache Spark User List mailing list archive at