How to preserve the order of parquet files?

2018-02-07 Thread Kevin Jung
Hi all,
In Spark 2.2.1, when I load parquet files, the result comes back in a different
order than the original dataset.
It seems the FileSourceScanExec.createNonBucketedReadRDD method sorts the
parquet file splits by their lengths.
-
val splitFiles = selectedPartitions.flatMap { partition =>
  partition.files.flatMap { file =>
    val blockLocations = getBlockLocations(file)
    if (fsRelation.fileFormat.isSplitable(
        fsRelation.sparkSession, fsRelation.options, file.getPath)) {
      (0L until file.getLen by maxSplitBytes).map { offset =>
        val remaining = file.getLen - offset
        val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
        val hosts = getBlockHosts(blockLocations, offset, size)
        PartitionedFile(
          partition.values, file.getPath.toUri.toString, offset, size, hosts)
      }
    } else {
      val hosts = getBlockHosts(blockLocations, 0, file.getLen)
      Seq(PartitionedFile(
        partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
    }
  }
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)  // <-- splits re-sorted by length, largest first

So the partitions backing the part-x.parquet files come back in a different
order every time I load them.
How can I preserve the order of the original data?
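Not from the thread — a minimal workaround sketch, assuming the original order can be captured in an explicit column before the data is written (the column name and paths below are hypothetical):

import org.apache.spark.sql.functions.monotonically_increasing_id

// Record the original row order before writing
val withSeq = originalDF.withColumn("seq", monotonically_increasing_id())
withSeq.write.parquet("/data/ordered")

// On read, restore the order explicitly instead of relying on split/file order
val restored = spark.read.parquet("/data/ordered").orderBy("seq")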






Spark summit Asia

2015-09-07 Thread Kevin Jung
Is there any plan to hold a Spark Summit in Asia?
I'm very much looking forward to it.

Kevin






Re: Unable to build Spark 1.5, is build broken or can anyone successfully build?

2015-08-30 Thread Kevin Jung
I expect this is because your versions are outside the range required in pom.xml.
You should upgrade Maven to 3.3.3 and the JDK to 1.7.
The Spark team already knows about this issue, so you can find more information
on the developer community board.

Kevin






Drop table and Hive warehouse

2015-08-24 Thread Kevin Jung
When I store a DataFrame as a table with saveAsTable and then execute
DROP TABLE in Spark SQL, it doesn't actually delete the files in the Hive warehouse.
The table disappears from the table list, but the data files are still there.
Because of this, I can't saveAsTable again with the same name even after dropping the table.
Is this normal behavior? If it is, I will delete the files manually ;)

Kevin
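For reference (not from the thread): if manual cleanup is the route, a sketch of removing the leftover warehouse directory from the driver; the path below is hypothetical.

import org.apache.hadoop.fs.{FileSystem, Path}

val leftover = new Path("/user/hive/warehouse/my_table")   // hypothetical location of the dropped table
val fs = FileSystem.get(sc.hadoopConfiguration)
if (fs.exists(leftover)) {
  fs.delete(leftover, true)   // recursively delete the orphaned data files
}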




Re: Drop table and Hive warehouse

2015-08-24 Thread Kevin Jung
Thanks, Michael.
I figured it out myself; in the end it was not a Spark bug.
I have two HDFS clusters. Hive uses hive.metastore.warehouse.dir + fs.defaultFS
(HDFS1) for saving internal tables, but it also references a default database
URI (HDFS2) in the DBS table of the metastore.
This is not a problem when the default database URI matches fs.defaultFS, and
probably few people point their default database URI at another HDFS like I did.
I copied hive-site.xml into the Spark conf directory so Hive and Spark share the
same metastore configuration, but a table produced by saveAsTable ends up with
its metadata in HDFS1 and its data in HDFS2.
DESCRIBE FORMATTED table_name shows the difference between the Location of the
table (HDFS1) and the path in Storage Desc Params (HDFS2), even though the table
is of type MANAGED_TABLE.
That is why DROP TABLE deletes only the metadata in HDFS1 and does not delete
the data files in HDFS2, and why I could not recreate a table with the same
location and name. After updating the DBS table in the metastore DB to set the
default database URI to HDFS1, it works perfectly.


Kevin
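A minimal sketch (not from the original message) of the check described above, assuming a HiveContext named hive and a hypothetical table name:

// Compare the table's Location with the path under Storage Desc Params;
// with the mismatched metastore setup they point at different HDFS clusters.
hive.sql("DESCRIBE FORMATTED my_table").collect().foreach(println)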

--- Original Message ---
Sender : Michael Armbrust mich...@databricks.com
Date : 2015-08-25 00:43 (GMT+09:00)
Title : Re: Drop table and Hive warehouse

That's not the expected behavior. What version of Spark?


On Mon, Aug 24, 2015 at 1:32 AM, Kevin Jung itsjb.j...@samsung.com wrote:

When I store a DataFrame as a table with saveAsTable and then execute
DROP TABLE in Spark SQL, it doesn't actually delete the files in the Hive warehouse.
The table disappears from the table list, but the data files are still there.
Because of this, I can't saveAsTable again with the same name even after dropping the table.
Is this normal behavior? If it is, I will delete the files manually ;)

Kevin





SaveAsTable changes the order of rows

2015-08-19 Thread Kevin Jung
I have a simple RDD with Key/Value and do

val partitioned = rdd.partitionBy(new HashPartitioner(400))
val row = partitioned.first

The returned row has the key G2726. This first row is located in partition #0,
because G2726.hashCode is 67114000 and 67114000 % 400 is 0. But the order of
the keys changes when I save the RDD to a table with saveAsTable: after doing
that and calling sqlContext.table, the key of the first row is G265. Does the
DataFrame forget its parent's partitioner, or does the Parquet format always
rearrange the order of the original data? In my case the order is not
important, but some users may want to keep their keys ordered.


Kevin
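Not from the thread — a sketch of one way to make the order reproducible across saveAsTable and a later read, by storing each row's original position explicitly (table and column names are hypothetical, and keys/values are assumed to be strings):

import sqlContext.implicits._

val indexed = partitioned
  .zipWithIndex()                                   // remember each row's original position
  .map { case ((k, v), idx) => (idx, k, v) }
  .toDF("idx", "key", "value")

indexed.write.saveAsTable("ordered_table")

// Restore the original order explicitly when reading back
val reread = sqlContext.table("ordered_table").orderBy("idx")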





Can't find directory after resetting REPL state

2015-08-15 Thread Kevin Jung
The Spark shell can't find the base directory of its class server after running
the :reset command.

scala> :reset
scala> 1
uncaught exception during compilation: java.lang.AssertionError
java.lang.AssertionError: assertion failed: Tried to find '$line33' in
'/tmp/spark-f47f3917-ac31-4138-bf1a-a8cefd094ac3' but it is not a directory
(no further commands are possible after this)

I found that the reset() method in SparkIMain tries to delete virtualDirectory
and then create it again, but virtualDirectory.create() creates a file, not a
directory.
Has anyone faced the same problem on Spark 1.4.0?

Kevin

Re: What is the optimal approach to do Secondary Sort in Spark?

2015-08-11 Thread Kevin Jung
You should make the key a tuple. In your case, RDD[((id, timeStamp), value)]
is the proper way to do it.

Kevin
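A minimal sketch of the composite-key approach (not from the original reply), assuming an RDD keyed by an (id, timestamp) tuple and Spark 1.2+ where repartitionAndSortWithinPartitions is available; the partitioner hashes on id only, so all of an id's records land in one partition and arrive sorted by (id, timestamp):

import org.apache.spark.{HashPartitioner, Partitioner}

// Route records by id only; the tuple ordering handles the secondary sort.
class IdPartitioner(partitions: Int) extends Partitioner {
  private val hash = new HashPartitioner(partitions)
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (id, _) => hash.getPartition(id)
  }
}

// events: RDD[((String, Long), String)]  -- hypothetical ((id, timeStamp), value) pairs
val secondarySorted =
  events.repartitionAndSortWithinPartitions(new IdPartitioner(200))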

--- Original Message ---
Sender : swetha swethakasire...@gmail.com
Date : 2015-08-12 09:37 (GMT+09:00)
Title : What is the optimal approach to do Secondary Sort in Spark?

Hi,

What is the optimal approach to do a secondary sort in Spark? I have to sort
first by an Id in the key and then further sort by a timeStamp that is present
in the value.

Thanks,
Swetha




GenericRowWithSchema is too heavy

2015-07-27 Thread Kevin Jung
Hi all,

Spark SQL usually creates DataFrames with GenericRowWithSchema (is that
right?), and 'Row' is a superclass of both GenericRow and GenericRowWithSchema.
The only difference is that GenericRowWithSchema carries its schema as a
StructType. But one DataFrame has only one schema, so each row should not have
to store the schema itself, because StructType is quite heavy and most RDDs
have many rows. To test this, I
1) create a DataFrame and call .rdd (RDD[Row]) => GenericRowWithSchema
2) dataframe.map(row => Row(row.toSeq: _*)) => GenericRow
3) dataframe.map(row => row.toSeq) => the underlying sequence of a row
4) saveAsObjectFile or use org.apache.spark.util.SizeEstimator.estimate
My results (a DataFrame with 5 columns):
GenericRowWithSchema = 13 GB
GenericRow = 8.2 GB
Seq = 7 GB

Best regards
Kevin
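A sketch of the comparison above (not the author's exact code), run on a small sample so everything fits on the driver, and assuming your Spark build exposes SizeEstimator as the post describes; the table name is hypothetical:

import org.apache.spark.sql.Row
import org.apache.spark.util.SizeEstimator

val sample   = sqlContext.table("five_column_table").rdd.take(100000)  // Array[Row] (GenericRowWithSchema)
val noSchema = sample.map(r => Row(r.toSeq: _*))                       // GenericRow, schema dropped
val bareSeqs = sample.map(_.toSeq)                                     // just the underlying values

println(s"with schema:    ${SizeEstimator.estimate(sample)}")
println(s"without schema: ${SizeEstimator.estimate(noSchema)}")
println(s"bare seqs:      ${SizeEstimator.estimate(bareSeqs)}")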






MapType in spark-sql

2015-01-20 Thread Kevin Jung
Hi all
How can I add MapType and ArrayType to a schema when I create a StructType
programmatically?

val schema =
  StructType(
    schemaString.split(" ").map(fieldName =>
      StructField(fieldName, StringType, true)))

The code above, from the Spark documentation, works fine, but if I change
StringType to MapType or ArrayType it doesn't compile. Thanks in advance.

Kevin
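Not from the original post — a minimal sketch of building a schema with complex types, assuming Spark 1.3+ where the types live in org.apache.spark.sql.types (on 1.2 they sit directly under org.apache.spark.sql). MapType and ArrayType need their key/value/element types spelled out, which is why simply substituting them for StringType does not compile:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name",  StringType, nullable = true),
  StructField("tags",  ArrayType(StringType, containsNull = true), nullable = true),
  StructField("attrs", MapType(StringType, StringType, valueContainsNull = true), nullable = true)
))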






Re: How to compute RDD[(String, Set[String])] that include large Set

2015-01-19 Thread Kevin Jung
As far as I know, the steps before calling saveAsText are transformations, so
they are lazily computed. The saveAsText action then performs all of the
transformations, and that is when your Set[String] grows. It builds a large
collection if you have only a few keys, and this easily causes OOM when your
executor memory and fraction settings are not suited to the computation.
If you only want counts per key, you can use countByKey(), or map the
RDD[(String, Set[String])] to an RDD[(String, Long)] after creating the hoge
RDD, so that reduceByKey aggregates only the counts per key.
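A sketch of the counting alternative described above (identifiers are hypothetical, not from the original thread):

import org.apache.spark.SparkContext._   // pair-RDD implicits, needed on Spark 1.x before 1.3

// Instead of materializing a Set[String] per key, keep only a running count.
// pairs: RDD[(String, String)] -- the raw (key, element) pairs before any grouping
val countsByKey = pairs
  .map { case (k, _) => (k, 1L) }
  .reduceByKey(_ + _)   // small per-key state, no large in-memory Sets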






Manually trigger RDD map function without action

2015-01-12 Thread Kevin Jung
Hi all
Is there an efficient way to trigger RDD transformations? I'm currently using
the count action to do this.

Best regards
Kevin






Re: Manually trigger RDD map function without action

2015-01-12 Thread Kevin Jung
Cody said, "If you don't care about the value that your map produced (because
you're not already collecting or saving it), then is foreach more appropriate
to what you're doing?", but I can't see that message in this thread.
Anyway, I ran a small benchmark to test which function is the most efficient,
and the winner is foreach(a => a), as everyone expected. collect can cause OOM
on the driver, and count is much slower than the others. Thanks all.
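A minimal sketch of the pattern (names are hypothetical), assuming the transformed RDD should also stay cached for later reuse:

val transformed = rdd.map(expensiveTransform).cache()   // expensiveTransform is a placeholder
// Materialize the transformation (and populate the cache) without pulling results to the driver
transformed.foreach(_ => ())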






Shuffle write increases in spark 1.2

2014-12-29 Thread Kevin Jung
Hi all,
The shuffle write size shown in the Spark web UI is much different when I run
the same Spark job on the same input data (100 GB) on Spark 1.1 and Spark 1.2.
At the same sortBy stage, the shuffle write size is 39.7 GB on Spark 1.1 but
91.0 GB on Spark 1.2.
I set the spark.shuffle.manager option to hash because its default value
changed, yet Spark 1.2 still writes larger files than Spark 1.1.
Can anyone tell me why this happens?

Thanks
Kevin
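For reference only — a sketch of pinning the shuffle manager in code (the same key can also go in spark-defaults.conf); this just restates the configuration mentioned above and is not a fix for the size difference:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-write-comparison")   // hypothetical app name
  .set("spark.shuffle.manager", "hash")     // Spark 1.2 changed the default to "sort"
val sc = new SparkContext(conf)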






Partitioner in sortBy

2014-12-10 Thread Kevin Jung
Hi,
I'm wondering whether I can swap the RangePartitioner used by sortBy for
another partitioner such as HashPartitioner.
My first thought is that it cannot be replaced, because the RangePartitioner
is part of the sort algorithm.
If we want to call mapPartitions on key-based partitions after sorting, we need
to repartition or coalesce the dataset because it is range-partitioned.
In that case we cannot avoid shuffling the dataset twice, once for sorting and
once for repartitioning, which causes performance problems on large datasets.

Thanks,
Kevin
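A sketch (not from the thread) of getting hash-partitioned, per-partition-sorted output with a single shuffle, assuming Spark 1.2+ where repartitionAndSortWithinPartitions is available; note this sorts within each hash partition rather than producing a global sort:

import org.apache.spark.HashPartitioner

// pairRdd: RDD[(String, Int)] -- hypothetical key/value data
val hashedAndSorted =
  pairRdd.repartitionAndSortWithinPartitions(new HashPartitioner(400))

// Each hash partition is now iterated in key order after only one shuffle.
val processed = hashedAndSorted.mapPartitions { iter =>
  iter.map { case (k, v) => (k, v * 2) }   // placeholder per-partition work
}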






spark code style

2014-11-21 Thread Kevin Jung
Hi all.
Here are two code snippets.
And they will produce the same result.

1.
rdd.map( function )

2.
rdd.map( function1 ).map( function2 ).map( function3 )

What are the pros and cons of these two methods?

Regards
Kevin






Re: default parallelism bug?

2014-10-20 Thread Kevin Jung
I use Spark 1.1.0 and set these options in spark-defaults.conf:
spark.scheduler.mode FAIR
spark.cores.max 48
spark.default.parallelism 72

Thanks,
Kevin






default parallelism bug?

2014-10-19 Thread Kevin Jung
Hi,
I usually use a file on HDFS to build a PairRDD and analyze it with
combineByKey, reduceByKey, etc.
But it sometimes hangs when I set the spark.default.parallelism configuration,
even though the file is small.
If I remove this setting, everything works fine.
Can anyone tell me why this occurs?

Regards,
Kevin






Stuck job works well after rdd.count or rdd.collect

2014-10-05 Thread Kevin Jung
Hi, all.
I'm in an unusual situation.
The code,

...
1: val cell = dataSet.flatMap(parse(_)).cache
2: val distinctCell = cell.keyBy(_._1).reduceByKey(removeDuplication(_, _)).mapValues(_._3).cache
3: val groupedCellByLine = distinctCell.map(cellToIterableColumn).groupByKey.cache
4: val result = (1 to groupedCellByLine.map(_._2.size).max).toArray
...

gets stuck when line 4 is executed.
But if I add 'cell.collect' or 'cell.count' between line 3 and line 4, it works
fine.
I don't know why this happens.
Has anyone run into something like this?






How to clear broadcast variable from driver memory?

2014-09-03 Thread Kevin Jung
Hi,
I tried Broadcast.unpersist() on Spark 1.0.1, but the MemoryStore (driver
memory) still holds it.

// LOG
// Block broadcast_0 stored as values to memory (estimated size 380.1 MB,
// free 5.7 GB)

The amount of free memory was the same after calling unpersist.
Can I clear this?
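A sketch of the cleanup options (not from the thread): unpersist only removes the cached copies on the executors, so freeing everything is assumed to need Broadcast.destroy(), which may or may not exist in your Spark version.

val bc = sc.broadcast(bigLookupTable)   // bigLookupTable is hypothetical

// ... run jobs that read bc.value ...

bc.unpersist(blocking = true)   // drops the cached copies on the executors
// bc.destroy()                 // if your version has it: also releases driver-side data;
//                              // the broadcast variable is unusable afterwards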






Re: zip equal-length but unequally-partition

2014-09-02 Thread Kevin Jung
I just created it. Here's the ticket:
https://issues.apache.org/jira/browse/SPARK-3364

Thanks,
Kevin






zip equal-length but unequally-partition

2014-09-01 Thread Kevin Jung
Please check this URL:
http://www.adamcrume.com/blog/archive/2014/02/19/fixing-sparks-rdd-zip

I hit the same problem in v1.0.1.
In some cases the RDD loses several elements after zip, so the total count of
the ZippedRDD is less than that of the source RDD.
Will Spark 1.1 fix this?
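A sketch of an index-based workaround (not from the thread) that does not depend on the two RDDs having matching partitioning, at the cost of a shuffle:

import org.apache.spark.SparkContext._   // pair-RDD implicits, needed on Spark 1.x before 1.3

// aRdd and bRdd are two equal-length RDDs (hypothetical names)
val indexedA = aRdd.zipWithIndex().map(_.swap)   // (index, a)
val indexedB = bRdd.zipWithIndex().map(_.swap)   // (index, b)
val zipped   = indexedA.join(indexedB).values    // (a, b) pairs, no elements silently dropped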






how can I get the number of cores

2014-08-29 Thread Kevin Jung
Hi all
The Spark web UI shows me the total number of cores and the number of cores in
use. I want to get this information programmatically.
How can I do this?

Thanks
Kevin
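A sketch of some driver-side approximations (not from the thread, and not exactly the numbers the web UI reports):

val totalCoresHint = sc.defaultParallelism                   // equals total cores on standalone/coarse-grained modes
val executorCount  = sc.getExecutorMemoryStatus.size - 1     // registered block managers minus the driver
val maxCores       = sc.getConf.getOption("spark.cores.max") // the configured cap, if any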






iterator cause NotSerializableException

2014-08-22 Thread Kevin Jung
Hi,
The following code gives me 'Task not serializable:
java.io.NotSerializableException: scala.collection.mutable.ArrayOps$ofInt'.

var x = sc.parallelize(Array(1,2,3,4,5,6,7,8,9), 3)
var iter = Array(5).toIterator
var value = 5
var value2 = iter.next

x.map(q => q * value).collect   // Line 1: it works.

x.map(q => q * value2).collect  // Line 2: error

'value' and 'value2' look exactly the same, so why does this happen?
An iterator obtained from RDD.toLocalIterator causes this too.
I tested it in spark-shell on Spark 1.0.2.

Thanks
Kevin
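A sketch of one possible shell workaround (not from the thread): never keep the non-serializable iterator as a top-level REPL value, so the REPL line object captured by the closure has nothing unserializable to drag along.

// The iterator is consumed inside this single line and never stored in a val,
// so no captured line object holds the ArrayOps-backed iterator.
val value3 = Array(5).toIterator.next
x.map(q => q * value3).collect()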






Re: Transform RDD[List]

2014-08-12 Thread Kevin Jung
Thanks for your answer. 
Yes, I want to transpose data.
At this point, I have one more question.
I tested it with
RDD1
List(1, 2, 3, 4, 5)
List(6, 7, 8, 9, 10)
List(11, 12, 13, 14, 15)
List(16, 17, 18, 19, 20)

And the result is... 
ArrayBuffer(11, 1, 16, 6)
ArrayBuffer(2, 12, 7, 17)
ArrayBuffer(3, 13, 18, 8)
ArrayBuffer(9, 19, 4, 14)
ArrayBuffer(15, 20, 10, 5)

The values are all collected correctly, but their order is shuffled.
Can I maintain the order?
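A sketch (not from the thread) of a transpose that keeps both row and column order by carrying explicit indices, so the result does not depend on the order in which groupByKey gathers values:

import org.apache.spark.SparkContext._   // pair-RDD implicits, needed on Spark 1.x before 1.3

// rows: RDD[List[Int]] -- e.g. the four lists above
val withRowIdx = rows.zipWithIndex()                           // (List, rowIndex)

val cells = withRowIdx.flatMap { case (row, r) =>
  row.zipWithIndex.map { case (v, c) => (c.toLong, (r, v)) }   // keyed by column index
}

val transposed = cells.groupByKey()
  .sortByKey()                                                 // columns in original order
  .map { case (_, rv) => rv.toSeq.sortBy(_._1).map(_._2).toList }  // rows in original order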






Transform RDD[List]

2014-08-11 Thread Kevin Jung
Hi
It may be a simple question, but I cannot figure out the most efficient way.
There is an RDD containing lists.

RDD
(
 List(1,2,3,4,5)
 List(6,7,8,9,10)
)

I want to transform this to

RDD
(
List(1,6)
List(2,7)
List(3,8)
List(4,9)
List(5,10)
)

And I want to achieve this without using the collect method, because a
real-world RDD can have a lot of elements and collecting them may cause
out-of-memory errors.
Any ideas are welcome.

Best regards
Kevin






Re: Transform RDD[List]

2014-08-11 Thread Kevin Jung
Hi ssimanta,
The first line creates an RDD[Int], not an RDD[List[Int]].
With Lists I cannot zip all of the list elements in the RDD the way a.zip(b)
does, and I cannot rely on Tuple2 alone, because a real-world source RDD
contains more Lists than that.
So I guess the expected result depends on the number of original Lists.
This problem is essentially about building a pivot table.

Thanks
Kevin






SparkSQL can not use SchemaRDD from Hive

2014-07-28 Thread Kevin Jung
Hi
I got an error message while using Hive and Spark SQL.
This is the code snippet I used.

(in spark-shell, 1.0.0)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val hive = new org.apache.spark.sql.hive.HiveContext(sc)
var sample = hive.hql("select * from sample10") // creates a SchemaRDD; I have a table 'sample10' in Hive
var countHive = sample.count()                  // works
sqlContext.registerRDDAsTable(sample, "temp")
sqlContext.sql("select * from temp").count()    // fails with:
// java.lang.RuntimeException: Table Not Found: sample10

I don't know why this happens. Does Spark SQL conflict with Hive?

Thanks,
Kevin
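A sketch of a possible workaround (not from the thread): keep everything on the single HiveContext instead of registering the Hive-backed SchemaRDD against a separate SQLContext, since each context keeps its own table catalog.

val hive = new org.apache.spark.sql.hive.HiveContext(sc)

val sample = hive.hql("select * from sample10")   // SchemaRDD bound to the HiveContext
sample.registerAsTable("temp")                    // registers in that same context's catalog
hive.hql("select * from temp").count()            // resolved against the same catalog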





Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$

2014-07-20 Thread Kevin Jung
Hi Victor,

I ran into the same issue and posted about it.
In my case it only happens for certain Spark SQL queries on Spark 1.0.1; on
Spark 1.0.0 the same thing works properly.
Have you run the same job on Spark 1.0.0?

Sincerely,
Kevin





Spark 1.0.1 akka connection refused

2014-07-15 Thread Kevin Jung
Hi,
I recently upgraded my Spark 1.0.0 cluster to 1.0.1, but it gives me
'ERROR remote.EndpointWriter: AssociationError' when I run a simple Spark SQL
job in spark-shell.

Here is the code:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
case class Person(name: String, Age: Int, Gender: String, Birth: String)
val peopleRDD = sc.textFile("/sample/sample.csv").map(_.split(",")).map(p =>
  Person(p(0).toString, p(1).toInt, p(2).toString, p(3).toString))
peopleRDD.collect().foreach(println)

And here is the error message on the worker node:
14/07/16 10:58:04 ERROR remote.EndpointWriter: AssociationError
[akka.tcp://sparkwor...@my.test6.com:38030] -
[akka.tcp://sparkexecu...@my.test6.com:35534]: Error [Association failed
with [akka.tcp://sparkexecu...@my.tes$
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkexecu...@my.test6.com:35534]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection
refused: my.test6.com/

And here is the error message in the shell:
14/07/16 11:33:15 WARN scheduler.TaskSetManager: Loss was due to
java.lang.NoClassDefFoundError
java.lang.NoClassDefFoundError: Could not initialize class $line10.$read$
at
$line14.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19)
at
$line14.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)

I tested it on Spark 1.0.0 (same machines) and it was fine.
It seems the worker cannot reach the executor's Akka endpoint.
Do you have any ideas?

Best regards
Kevin





Re: Spark 1.0.1 akka connection refused

2014-07-15 Thread Kevin Jung
UPDATE:
It happens only when I use a 'case class' and map an RDD to that class in
spark-shell.
Other RDD transformations, SchemaRDDs backed by parquet files, and all other
Spark SQL operations work fine.
Were there any changes related to case class handling between 1.0.0 and 1.0.1?

Best regards 
Kevin





Case class in java

2014-07-03 Thread Kevin Jung
Hi,
I'm trying to convert a Scala Spark job into Java.
In Scala I typically use a 'case class' to apply a schema to an RDD.
That can be converted into a POJO class in Java, but what I really want is to
create POJO classes dynamically, the way the Scala REPL does.
For this reason I use javassist to create the POJO class easily at runtime.
But the problem is that worker nodes can't find this class.
The error message is:
 host workernode2.com: java.lang.ClassNotFoundException: GeneratedClass_no1
 java.net.URLClassLoader$1.run(URLClassLoader.java:366)
java.net.URLClassLoader$1.run(URLClassLoader.java:355)
java.security.AccessController.doPrivileged(Native Method)
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
java.lang.ClassLoader.loadClass(ClassLoader.java:423)
java.lang.ClassLoader.loadClass(ClassLoader.java:356)
java.lang.Class.forName0(Native Method)
java.lang.Class.forName(Class.java:266)
The generated class's classloader is
'Thread.currentThread().getContextClassLoader()'.
I expected it to be visible to the driver node, but the worker node's executors
cannot see it.
Would changing the classloader used to load the generated class, or
broadcasting the generated class via the SparkContext, be effective?





Re: Case class in java

2014-07-03 Thread Kevin Jung
I found a web page with a hint:
http://ardoris.wordpress.com/2014/03/30/how-spark-does-class-loading/
I learned that SparkIMain has an internal HTTP server that publishes class
objects, but I can't figure out how to use it from Java.
Any ideas?

Thanks,
Kevin





Re: Case class in java

2014-07-03 Thread Kevin Jung
That will load the listed jars when the SparkContext is created, but in the
REPL we define and import classes after the SparkContext has already been
created.
According to the site mentioned above, the executor installs a class loader in
the 'addReplClassLoaderIfNeeded' method using the spark.repl.class.uri
configuration.
So I will try to build a class server that distributes the *dynamically created
classes* in my driver application to the executors, the way the Spark REPL does.

Thanks,
Kevin






Re: use spark-shell in the source

2014-06-12 Thread Kevin Jung
Thanks for the answer.
Yes, I was trying to launch an interactive REPL in the middle of my application
:)



