Re: How to reflect dynamic registration udf?

2016-12-16 Thread Cheng Lian
Could you please provide more context about what you are trying to do here?

On Thu, Dec 15, 2016 at 6:27 PM 李斌松  wrote:

> How to reflect dynamic registration udf?
>
> java.lang.UnsupportedOperationException: Schema for type _$13 is not supported
> at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
> at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
> at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
> at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
> at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:145)
> at com.alibaba.spark.odps.driver.util.Utils$$anon$1.processMatch(Utils.scala:115)
> at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec$1.lookForMatches(ScanSpec.java:759)
> at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:446)
> at io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:368)
> at io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:59)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> final class sparkFunc(val name: String) extends StaticAnnotation{}
>
> def registerFunc(hiveContext: HiveContext): Unit = {
>   info("register udf function")
>
>   val ru = scala.reflect.runtime.universe
>   val classLoaderMirror = ru.runtimeMirror(getClass.getClassLoader)
>
>   new FastClasspathScanner("com.alibaba.spark.odps.driver.functions")
>     .matchAllClasses(new ClassMatchProcessor() {
>       override def processMatch(aClass: Class[_]): Unit = {
>         val classMirror = classLoaderMirror.classSymbol(aClass)
>         val annotation = classMirror.annotations.find(_.tpe =:= ru.typeOf[sparkFunc]).getOrElse(null)
>
>         try {
>           if (annotation != null) {
>             var funcName = StringUtils.substringBetween(annotation.toString, "\"", "\"")
>
>             if (chekClazz(aClass, classOf[Function1[_, _]])) {
>               val func: Function1[_, _] = createInstance[Function1[_, _]](aClass).get
>               hiveContext.udf.register(funcName, func)
>             } else if (chekClazz(aClass, classOf[Function2[_, _, _]])) {
>               val func: Function2[_, _, _] = createInstance[Function2[_, _, _]](aClass).get
>               hiveContext.udf.register(funcName, func)
>             } else if (chekClazz(aClass, classOf[Function3[_, _, _, _]])) {
>               val func: Function3[_, _, _, _] = createInstance[Function3[_, _, _, _]](aClass).get
>               hiveContext.udf.register(funcName, func)
>             } else {
>               throw new RuntimeException("not support function")
>             }
>
>             info("== register function: {}", funcName)
>           }
>         } catch {
>           case e: Exception => error(e.getMessage, e)
>         }
>       }
>     }).scan()
> }
>
> private def chekClazz(sClass: Class[_], pClass: Class[_]): Boolean = {
>   try {
>     sClass.asSubclass(pClass)
>     true
>   } catch {
>     case e: Exception => false
>   }
> }
>
> private def createInstance[T: ClassTag](clazz: Class[_]): Try[T] = {
>   Try {
>     val constructor = clazz.getDeclaredConstructor()
>     constructor.setAccessible(true)
>     val obj = constructor.newInstance()
>     val t = implicitly[ClassTag[T]].runtimeClass
>     if (t.isInstance(obj)) {
>       obj.asInstanceOf[T]
>     } else throw new ClassCastException(clazz.getName + " is not a subtype of " + t)
>   } recover {
>     case i: InvocationTargetException if i.getTargetException ne null =>
>       throw i.getTargetException
>   }
> }
>
>


Re: Writing to Parquet Job turns to wait mode after even completion of job

2016-10-24 Thread Cheng Lian



On 10/22/16 6:18 AM, Steve Loughran wrote:

...
On Sat, Oct 22, 2016 at 3:41 AM, Cheng Lian <lian.cs@gmail.com> wrote:


What version of Spark are you using, and how many output files
does the job write out?

By default, Spark versions before 1.6.0 write Parquet summary files
when committing the job. This process reads footers from all Parquet
files in the destination directory and merges them together. This can
be particularly bad if you are appending a small amount of data to a
large existing Parquet dataset.

If that's the case, you may disable Parquet summary files by setting
the Hadoop configuration "parquet.enable.summary-metadata" to false.




Now I'm a bit mixed up. Should that be 
spark.sql.parquet.enable.summary-metadata =false?
No, "parquet.enable.summary-metadata" is a Hadoop configuration option 
introduced by Parquet. In Spark 2.0, you can simply set it using 
spark.conf.set(), Spark will propagate it properly.
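For reference, a minimal sketch of both ways to set it (spark, sc and df here 
stand for the usual shell-provided SparkSession, SparkContext and an arbitrary 
DataFrame; the output path is a placeholder):

// Spark 2.x: set it on the session conf and it is propagated to the Hadoop conf.
spark.conf.set("parquet.enable.summary-metadata", "false")

// Spark 1.x: set it directly on the SparkContext's Hadoop configuration.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
df.write.mode("append").parquet("/path/to/output")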



We've disabled it by default since 1.6.0

Cheng


On 10/21/16 1:47 PM, Chetan Khatri wrote:

Hello Spark Users,

I am writing around 10 GB of Processed Data to Parquet where
having 1 TB of HDD and 102 GB of RAM, 16 vCore machine on Google
Cloud.

Every time I write to Parquet, the Spark UI shows that the stages
succeeded, but the spark shell holds the context in wait mode for
almost 10 minutes; then it clears broadcast and accumulator shared
variables.

Can we speed this up?

Thanks.

-- 
Yours Aye,

Chetan Khatri.
M.+91 7 80574
Data Science Researcher
INDIA

​​Statement of Confidentiality

The contents of this e-mail message and any attachments are
confidential and are intended solely for addressee. The
information may also be legally privileged. This transmission is
sent in trust, for the sole purpose of delivery to the intended
recipient. If you have received this transmission in error, any
use, reproduction or dissemination of this transmission is
strictly prohibited. If you are not the intended recipient,
please immediately notify the sender by reply e-mail or phone
and delete this message and its attachments, if any.​​











Re: [Spark 2.0.0] error when unioning to an empty dataset

2016-10-24 Thread Cheng Lian



On 10/22/16 1:42 PM, Efe Selcuk wrote:
Ah, looks similar. Next opportunity I get, I'm going to do a 
printSchema on the two datasets and see if they don't match up.


I assume that unioning the underlying RDDs doesn't run into this 
problem because of less type checking or something along those lines?

Exactly.


On Fri, Oct 21, 2016 at 3:39 PM Cheng Lian <lian.cs@gmail.com> wrote:


Efe - You probably hit this bug:
https://issues.apache.org/jira/browse/SPARK-18058


On 10/21/16 2:03 AM, Agraj Mangal wrote:

I have seen this error sometimes when the elements in the schema
have different nullabilities. Could you print the schema for
data and for someCode.thatReturnsADataset() and see if there is
any difference between the two ?

On Fri, Oct 21, 2016 at 9:14 AM, Efe Selcuk <efema...@gmail.com> wrote:

Thanks for the response. What do you mean by "semantically"
the same? They're both Datasets of the same type, which is a
case class, so I would expect compile-time integrity of the
data. Is there a situation where this wouldn't be the case?

Interestingly enough, if I instead create an empty rdd with
sparkContext.emptyRDD of the same case class type, it works!

So something like:
var data = spark.sparkContext.emptyRDD[SomeData]

// loop
data = data.union(someCode.thatReturnsADataset().rdd)
// end loop

data.toDS //so I can union it to the actual Dataset I have
elsewhere

On Thu, Oct 20, 2016 at 8:34 PM Agraj Mangal <agraj@gmail.com> wrote:

I believe this normally comes when Spark is unable to
perform union due to "difference" in schema of the
operands. Can you check if the schema of both the
datasets are semantically same ?

On Tue, Oct 18, 2016 at 9:06 AM, Efe Selcuk <efema...@gmail.com> wrote:

Bump!

On Thu, Oct 13, 2016 at 8:25 PM Efe Selcuk <efema...@gmail.com> wrote:

I have a use case where I want to build a dataset
based off of conditionally available data. I
thought I'd do something like this:

case class SomeData( ... ) // parameters are
basic encodable types like strings and BigDecimals

var data = spark.emptyDataset[SomeData]

// loop, determining what data to ingest and
process into datasets
data = data.union(someCode.thatReturnsADataset)
// end loop

However I get a runtime exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Granted, I'm new at Spark so this might be an
anti-pattern, so I'm open to suggestions. However
it doesn't seem li

Re: RDD groupBy() then random sort each group ?

2016-10-21 Thread Cheng Lian
I think it would be much easier to use the DataFrame API to do this by doing 
a local sort using randn() as the key. For example, in Spark 2.0:


val df = spark.range(100)
val shuffled = df.repartition($"id" % 10).sortWithinPartitions(randn(42))

Replace df with a DataFrame wrapping your RDD, and $"id" % 10 with the 
key to group by; then you can get the RDD back from shuffled and run 
whatever follow-up operations you want.
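A minimal sketch of that recipe, assuming an RDD of a small case class (the 
Sample type and its field names are illustrative, not from the original post):

import org.apache.spark.sql.functions.randn
import spark.implicits._

case class Sample(model_id: String, features: Seq[Double])
val myInputRdd = spark.sparkContext.parallelize(Seq.empty[Sample]) // stand-in for the real input

// Hash-partition by the grouping key, then sort each partition by a random
// value so rows belonging to the same key come back in shuffled order.
val shuffled = myInputRdd.toDF()
  .repartition($"model_id")
  .sortWithinPartitions(randn(42))

// Drop back to an RDD if the downstream SGD code expects one.
val shuffledRdd = shuffled.rdd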


Cheng


On 10/20/16 10:53 AM, Yang wrote:
in my application, I group by same training samples by their 
model_id's  (the input table contains training samples for 100k 
different models), then each group ends up having about 1 million 
training samples,


then I feed that group of samples to a little Logistic Regression 
solver (SGD), but SGD requires the input data to be shuffled randomly 
(so that positive and negative samples are evenly distributed), so now 
I do something like


my_input_rdd.groupBy(x => x.model_id).map { x =>
  val (model_id, group_of_rows) = x
  (model_id, group_of_rows.toSeq().shuffle())
}.map(x => (x._1, train_sgd(x._2)))


the issue is that on the 3rd row above, I had to explicitly call 
toSeq() on the group_of_rows in order to shuffle, which is an Iterable 
and not Seq. now I have to load the entire 1 million rows into memory, 
and in practice I've seen my tasks OOM and GC time goes crazy (about 
50% of total run time). I suspect this toSeq() is the reason, since 
doing a simple count() on the groupBy() result works fine.


I am planning to shuffle the my_input_rdd first, and then groupBy(), 
and not do the toSeq().shuffle(). intuitively the input rdd is already 
shuffled, so UNLESS groupBy() tries to do some sorting, the rows in 
the group SHOULD remain shuffled  but overall this remains rather 
flimsy.


any ideas to do this more reliably?

thanks!







Re: [Spark 2.0.0] error when unioning to an empty dataset

2016-10-21 Thread Cheng Lian
Efe - You probably hit this bug: 
https://issues.apache.org/jira/browse/SPARK-18058



On 10/21/16 2:03 AM, Agraj Mangal wrote:
I have seen this error sometimes when the elements in the schema have 
different nullabilities. Could you print the schema for data and for 
someCode.thatReturnsADataset() and see if there is any difference 
between the two ?


On Fri, Oct 21, 2016 at 9:14 AM, Efe Selcuk wrote:


Thanks for the response. What do you mean by "semantically" the
same? They're both Datasets of the same type, which is a case
class, so I would expect compile-time integrity of the data. Is
there a situation where this wouldn't be the case?

Interestingly enough, if I instead create an empty rdd with
sparkContext.emptyRDD of the same case class type, it works!

So something like:
var data = spark.sparkContext.emptyRDD[SomeData]

// loop
data = data.union(someCode.thatReturnsADataset().rdd)
// end loop

data.toDS //so I can union it to the actual Dataset I have elsewhere

On Thu, Oct 20, 2016 at 8:34 PM Agraj Mangal wrote:

I believe this normally comes when Spark is unable to perform
union due to "difference" in schema of the operands. Can you
check if the schema of both the datasets are semantically same ?

On Tue, Oct 18, 2016 at 9:06 AM, Efe Selcuk wrote:

Bump!

On Thu, Oct 13, 2016 at 8:25 PM Efe Selcuk wrote:

I have a use case where I want to build a dataset
based off of conditionally available data. I thought
I'd do something like this:

case class SomeData( ... ) // parameters are basic
encodable types like strings and BigDecimals

var data = spark.emptyDataset[SomeData]

// loop, determining what data to ingest and process
into datasets
  data = data.union(someCode.thatReturnsADataset)
// end loop

However I get a runtime exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Granted, I'm new at Spark so this might be an
anti-pattern, so I'm open to suggestions. However it
doesn't seem like I'm doing anything incorrect here,
the types are correct. Searching for this error online
returns results seemingly about working in dataframes
and having mismatching schemas or a different order of
fields, and it seems like bugfixes have gone into
place for those cases.

Thanks in advance.
Efe




-- 
Thanks & Regards,

Agraj Mangal




--
Thanks & Regards,
Agraj Mangal




Re: How to iterate the element of an array in DataFrame?

2016-10-21 Thread Cheng Lian
You may either use the SQL functions "array" and "named_struct" or define a 
case class with the expected field names.
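A rough sketch of the UDF route, under the assumptions in the question below 
(vector size 100, indices parsed out of the "tagCategory_NNN" names); the udf 
body and helper names are illustrative:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Each element of the array-of-struct column arrives in the UDF as a Row.
val tag2vec = udf { tags: Seq[Row] =>
  val pairs = tags.map { r =>
    val index  = r.getAs[String]("category").stripPrefix("tagCategory_").toInt
    val weight = r.getAs[String]("weight").toDouble
    (index, weight)
  }.sortBy(_._1) // SparseVector expects indices in ascending order
  Vectors.sparse(100, pairs.map(_._1).toArray, pairs.map(_._2).toArray)
}

// Backticks because the column name contains a dot.
val withVec = mblog_tags.withColumn("vec", tag2vec(col("`category.firstCategory`")))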


Cheng


On 10/21/16 2:45 AM, 颜发才(Yan Facai) wrote:

My expectation is:
root
|-- tag: vector

namely, I want to extract from:
[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
to:
Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))

I believe it needs two step:
1. val tag2vec = {tag: Array[Structure] => Vector}
2. mblog_tags.withColumn("vec", tag2vec(col("tag"))

But, I have no idea of how to describe the Array[Structure] in the 
DataFrame.






On Fri, Oct 21, 2016 at 4:51 PM, lk_spark wrote:


how about change Schema from
root
 |-- category.firstCategory: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- category: string (nullable = true)
 |||-- weight: string (nullable = true)
to:
root
 |-- category: string (nullable = true)
 |-- weight: string (nullable = true)
2016-10-21

lk_spark


*From:* 颜发才(Yan Facai)
*Sent:* 2016-10-21 15:35
*Subject:* Re: How to iterate the element of an array in DataFrame?
*To:* "user.spark"
*Cc:*
I don't know how to construct
`array<struct<category:string, weight:string>>`.
Could anyone help me?

I try to get the array by :
scala> mblog_tags.map(_.getSeq[(String, String)](0))

while the result is:
res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] =
[value: array>]


How to express `struct` ?



On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai) wrote:

Hi, I want to extract the attribute `weight` of an array,
and combine them to construct a sparse vector.

### My data is like this:

scala> mblog_tags.printSchema
root
 |-- category.firstCategory: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- category: string (nullable = true)
 |||-- weight: string (nullable = true)


scala> mblog_tags.show(false)
+------------------------------------------------+
|category.firstCategory                          |
+------------------------------------------------+
|[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
|[[tagCategory_029, 0.9]]                        |
|[[tagCategory_029, 0.8]]                        |
+------------------------------------------------+


### And expected:
Vectors.sparse(100, Array(60, 29), Array(0.8, 0.7))
Vectors.sparse(100, Array(29), Array(0.9))
Vectors.sparse(100, Array(29), Array(0.8))

How to iterate an array in DataFrame?
Thanks.









Re: Dataframe schema...

2016-10-21 Thread Cheng Lian
Yea, confirmed. While analyzing unions, we treat StructTypes with 
different field nullabilities as incompatible types and throw this error.


Opened https://issues.apache.org/jira/browse/SPARK-18058 to track this 
issue. Thanks for reporting!
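Until that fix lands, one hedged workaround (not from this thread, using the 
d1/d2 names from the snippets below) is to push one side through its RDD so it 
picks up the other side's schema, nullability flags included, before the union:

// Re-apply d1's schema (including its nullability flags) to d2's rows, then union.
val d2Aligned = spark.createDataFrame(d2.rdd, d1.schema)
val unioned = d1.union(d2Aligned)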


Cheng


On 10/21/16 3:15 PM, Cheng Lian wrote:


Hi Muthu,

What version of Spark are you using? This seems to be a bug in the 
analysis phase.


Cheng


On 10/21/16 12:50 PM, Muthu Jayakumar wrote:

Sorry for the late response. Here is what I am seeing...


Schema from parquet file.
d1.printSchema()
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = true)
 ||-- freq: array (nullable = true)
 |||-- element: long (containsNull = true)

d2.printSchema() //Data created using dataframe and/or processed before writing to 
parquet file.

root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = false)
 ||-- freq: array (nullable = true)
 |||-- element: long (containsNull = false)

d1.union(d2).printSchema()
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Please advise,
Muthu

On Thu, Oct 20, 2016 at 1:46 AM, Michael Armbrust 
<mich...@databricks.com> wrote:


What is the issue you see when unioning?

On Wed, Oct 19, 2016 at 6:39 PM, Muthu Jayakumar
<bablo...@gmail.com> wrote:

Hello Michael,

Thank you for looking into this query. In my case there seem
to be an issue when I union a parquet file read from disk
versus another dataframe that I construct in-memory. The only
difference I see is the containsNull = true. In fact, I do
not see any errors with union on the simple schema of "col1
thru col4" above. But the problem seem to exist only on that
"some_histogram" column which contains the mixed containsNull
= true/false.
Let me know if this helps.

Thanks,
Muthu



On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust
<mich...@databricks.com> wrote:

Nullable is just a hint to the optimizer that it's
impossible for there to be a null value in this column,
so that it can avoid generating code for null-checks.
When in doubt, we set nullable=true since it is always
safer to check.

Why in particular are you trying to change the
nullability of the column?

On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar
<bablo...@gmail.com> wrote:

Hello there,

I am trying to understand how and when does DataFrame
(or Dataset) sets nullable = true vs false on a schema.

Here is my observation from a sample code I tried...


scala> spark.createDataset(Seq((1, "a", 2.0d), (2,
"b", 2.0d), (3, "c", 2.0d))).toDF("col1", "col2",
"col3").withColumn("col4", lit("bla")).printSchema()
root
 |-- col1: integer (nullable = false)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = false)
 |-- col4: string (nullable = false)


scala> spark.createDataset(Seq((1, "a", 2.0d), (2,
"b", 2.0d), (3, "c", 2

Re: Dataframe schema...

2016-10-21 Thread Cheng Lian

Hi Muthu,

What version of Spark are you using? This seems to be a bug in 
the analysis phase.


Cheng


On 10/21/16 12:50 PM, Muthu Jayakumar wrote:

Sorry for the late response. Here is what I am seeing...


Schema from parquet file.
d1.printSchema()
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = true)
 ||-- freq: array (nullable = true)
 |||-- element: long (containsNull = true)

d2.printSchema() //Data created using dataframe and/or processed before writing to 
parquet file.

root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = false)
 ||-- freq: array (nullable = true)
 |||-- element: long (containsNull = false)

d1.union(d2).printSchema()
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Please advise,
Muthu

On Thu, Oct 20, 2016 at 1:46 AM, Michael Armbrust wrote:


What is the issue you see when unioning?

On Wed, Oct 19, 2016 at 6:39 PM, Muthu Jayakumar wrote:

Hello Michael,

Thank you for looking into this query. In my case there seem
to be an issue when I union a parquet file read from disk
versus another dataframe that I construct in-memory. The only
difference I see is the containsNull = true. In fact, I do not
see any errors with union on the simple schema of "col1 thru
col4" above. But the problem seem to exist only on that
"some_histogram" column which contains the mixed containsNull
= true/false.
Let me know if this helps.

Thanks,
Muthu



On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust wrote:

Nullable is just a hint to the optimizer that it's
impossible for there to be a null value in this column, so
that it can avoid generating code for null-checks. When in
doubt, we set nullable=true since it is always safer to
check.

Why in particular are you trying to change the nullability
of the column?

On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar wrote:

Hello there,

I am trying to understand how and when does DataFrame
(or Dataset) sets nullable = true vs false on a schema.

Here is my observation from a sample code I tried...


scala> spark.createDataset(Seq((1, "a", 2.0d), (2,
"b", 2.0d), (3, "c", 2.0d))).toDF("col1", "col2",
"col3").withColumn("col4", lit("bla")).printSchema()
root
 |-- col1: integer (nullable = false)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = false)
 |-- col4: string (nullable = false)


scala> spark.createDataset(Seq((1, "a", 2.0d), (2,
"b", 2.0d), (3, "c", 2.0d))).toDF("col1", "col2",
"col3").withColumn("col4",
lit("bla")).write.parquet("/tmp/sample.parquet")

scala>
spark.read.parquet("/tmp/sample.parquet").printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)
 |-- col4: string (nullable = true)


The place where 

Re: Writing to Parquet Job turns to wait mode after even completion of job

2016-10-21 Thread Cheng Lian
What version of Spark are you using, and how many output files does the 
job write out?


By default, Spark versions before 1.6.0 write Parquet summary files when 
committing the job. This process reads footers from all Parquet files in 
the destination directory and merges them together. This can be 
particularly bad if you are appending a small amount of data to a large 
existing Parquet dataset.


If that's the case, you may disable Parquet summary files by setting the 
Hadoop configuration "parquet.enable.summary-metadata" to false.


We've disabled it by default since 1.6.0

Cheng


On 10/21/16 1:47 PM, Chetan Khatri wrote:

Hello Spark Users,

I am writing around 10 GB of Processed Data to Parquet where having 1 
TB of HDD and 102 GB of RAM, 16 vCore machine on Google Cloud.


Every time I write to Parquet, the Spark UI shows that the stages 
succeeded, but the spark shell holds the context in wait mode for almost 
10 minutes; then it clears broadcast and accumulator shared variables.


Can we speed this up?

Thanks.

--
Yours Aye,
Chetan Khatri.
M.+91 7 80574
Data Science Researcher
INDIA

​​Statement of Confidentiality

The contents of this e-mail message and any attachments are 
confidential and are intended solely for addressee. The information 
may also be legally privileged. This transmission is sent in trust, 
for the sole purpose of delivery to the intended recipient. If you 
have received this transmission in error, any use, reproduction or 
dissemination of this transmission is strictly prohibited. If you are 
not the intended recipient, please immediately notify the sender by 
reply e-mail or phone and delete this message and its attachments, if 
any.​​




Re: Where condition on columns of Arrays does no longer work in spark 2

2016-10-21 Thread Cheng Lian

Thanks for reporting! It's a bug, just filed a ticket to track it:

https://issues.apache.org/jira/browse/SPARK-18053

Cheng


On 10/20/16 1:54 AM, filthysocks wrote:
I have a Column in a DataFrame that contains Arrays and I want to filter 
for equality. It works fine in Spark 1.6 but not in 2.0. In Spark 
1.6.2:

import org.apache.spark.sql.SQLContext

case class DataTest(lists: Seq[Int])

val sql = new SQLContext(sc)
val data = sql.createDataFrame(sc.parallelize(Seq(
DataTest(Seq(1)),
DataTest(Seq(4,5,6))
   )))
data.registerTempTable("uiae")
sql.sql(s"SELECT lists FROM uiae WHERE 
lists=Array(1)").collect().foreach(println)
returns:[WrappedArray(1)]
In spark 2.0.0:
import spark.implicits._

case class DataTest(lists: Seq[Int])
val data = Seq(DataTest(Seq(1)),DataTest(Seq(4,5,6))).toDS()

data.createOrReplaceTempView("uiae")
spark.sql(s"SELECT lists FROM uiae WHERE 
lists=Array(1)").collect().foreach(println)
returns: nothing

Is that a bug? Or is it just done differently in spark 2?





Re: Consuming parquet files built with version 1.8.1

2016-10-17 Thread Cheng Lian

Hi Dinesh,

Thanks for reporting. This is kind of weird and I can't reproduce it. 
Were you running the experiments against a cleanly compiled Spark master 
branch? Also, I don't think you have to use parquet-mr 1.8.1 to read 
Parquet files generated with parquet-mr 1.8.1 unless you are using 
something not implemented in 1.7.


Cheng


On 9/6/16 12:34 AM, Dinesh Narayanan wrote:

Hello,
I have some parquet files generated with 1.8.1 through an MR job that 
i need to consume. I see that master is built with parquet 1.8.1 but i 
get this error(with master branch)


java.lang.NoSuchMethodError: org.apache.parquet.schema.Types$MessageTypeBuilder.addFields([Lorg/apache/parquet/schema/Type;)Lorg/apache/parquet/schema/Types$GroupBuilder;
at org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport$.clipParquetSchema(ParquetReadSupport.scala:114)
at org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.init(ParquetReadSupport.scala:67)
at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:136)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:360)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:339)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Do you think I am missing something here, or is this a potential 
bug? Any workarounds to use Parquet files with version PARQUET_2_0?


Thanks
Dinesh




Re: Spark-2.0.0 fails reading a parquet dataset generated by Spark-1.6.2

2016-08-12 Thread Cheng Lian

OK, I've merged this PR to master and branch-2.0.


On 8/11/16 8:27 AM, Cheng Lian wrote:
Haven't figured out exactly how it failed, but the leading underscore 
in the partition directory name looks suspicious. Could you please try 
this PR to see whether it fixes the issue: 
https://github.com/apache/spark/pull/14585/files


Cheng


On 8/9/16 5:38 PM, immerrr again wrote:

Another follow-up: I have narrowed it down to the first 32 partitions,
but from that point it gets strange.

Here's the error:

In [68]: spark.read.parquet(*subdirs[:32])
...
AnalysisException: u'Unable to infer schema for ParquetFormat at
/path/to/data/_locality_code=AQ,/path/to/data/_locality_code=AI. It
must be specified manually;'


Removing *any* of the subdirs from that set removes the error.

In [69]: for i in range(32): spark.read.parquet(*(subdirs[:i] +
subdirs[i+1:32]))


Here's the punchline: schemas for the first 31 and for the last 31 of
those 32 subdirs are the same:

In [70]: spark.read.parquet(*subdirs[:31]).schema.jsonValue() ==
spark.read.parquet(*subdirs[1:32]).schema.jsonValue()
Out[70]: True

Any idea why that might be happening?

On Tue, Aug 9, 2016 at 12:12 PM, immerrr again <imme...@gmail.com> 
wrote:

Some follow-up information:

- dataset size is ~150G

- the data is partitioned by one of the columns, _locality_code:
$ ls -1
_locality_code=AD
_locality_code=AE
_locality_code=AF
_locality_code=AG
_locality_code=AI
_locality_code=AL
_locality_code=AM
_locality_code=AN

_locality_code=YE
_locality_code=YT
_locality_code=YU
_locality_code=ZA
_locality_code=ZM
_locality_code=ZW
_SUCCESS

- some of the partitions contain only one row, but all partitions are
in place (i.e. the number of directories matches the number of distinct
localities)
val counts = 
sqlContext.read.parquet("/path-to-data").groupBy("_locality_code").count().orderBy($"count").collect()


scala> counts.slice(counts.length-10, counts.length)
res13: Array[org.apache.spark.sql.Row] = Array([CN,5682255],
[AU,6090561], [ES,6184507], [IT,7093401], [FR,8814435], [CA,10005467],
[UK,15375397], [BR,15829260], [IN,22404143], [US,98585175])

scala> counts.slice(0, 10)
res14: Array[org.apache.spark.sql.Row] = Array([UM,1], [JB,1], [JK,1],
[WP,1], [JT,1], [SX,9], [BL,52], [BQ,70], [BV,115], [MF,115])


On Tue, Aug 9, 2016 at 11:10 AM, immerrr again <imme...@gmail.com> 
wrote:

Hi everyone

I tried upgrading Spark-1.6.2 to Spark-2.0.0 but run into an issue
reading the existing data. Here's how the traceback looks in
spark-shell:

scala> spark.read.parquet("/path/to/data")
org.apache.spark.sql.AnalysisException: Unable to infer schema for
ParquetFormat at /path/to/data. It must be specified manually;
   at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:397)
   at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:397)

   at scala.Option.getOrElse(Option.scala:121)
   at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:396)
   at 
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
   at 
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:427) 

   at 
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:411) 


   ... 48 elided

If I enable DEBUG log with sc.setLogLevel("DEBUG"), here's what I
additionally see in the output:
https://gist.github.com/immerrr/4474021ae70f35b7b9e262251c0abc59. Of
course, that same data is read and processed by spark-1.6.2 correctly.

Any idea what might be wrong here?

Cheers,
immerrr











Re: Spark-2.0.0 fails reading a parquet dataset generated by Spark-1.6.2

2016-08-10 Thread Cheng Lian
Haven't figured out exactly how it failed, but the leading underscore 
in the partition directory name looks suspicious. Could you please try 
this PR to see whether it fixes the issue: 
https://github.com/apache/spark/pull/14585/files


Cheng


On 8/9/16 5:38 PM, immerrr again wrote:

Another follow-up: I have narrowed it down to the first 32 partitions,
but from that point it gets strange.

Here's the error:

In [68]: spark.read.parquet(*subdirs[:32])
...
AnalysisException: u'Unable to infer schema for ParquetFormat at
/path/to/data/_locality_code=AQ,/path/to/data/_locality_code=AI. It
must be specified manually;'


Removing *any* of the subdirs from that set removes the error.

In [69]: for i in range(32): spark.read.parquet(*(subdirs[:i] +
subdirs[i+1:32]))


Here's the punchline: schemas for the first 31 and for the last 31 of
those 32 subdirs are the same:

In [70]: spark.read.parquet(*subdirs[:31]).schema.jsonValue() ==
spark.read.parquet(*subdirs[1:32]).schema.jsonValue()
Out[70]: True

Any idea why that might be happening?

On Tue, Aug 9, 2016 at 12:12 PM, immerrr again  wrote:

Some follow-up information:

- dataset size is ~150G

- the data is partitioned by one of the columns, _locality_code:
$ ls -1
_locality_code=AD
_locality_code=AE
_locality_code=AF
_locality_code=AG
_locality_code=AI
_locality_code=AL
_locality_code=AM
_locality_code=AN

_locality_code=YE
_locality_code=YT
_locality_code=YU
_locality_code=ZA
_locality_code=ZM
_locality_code=ZW
_SUCCESS

- some of the partitions contain only one row, but all partitions are
in place (i.e. the number of directories matches the number of distinct
localities)
val counts = 
sqlContext.read.parquet("/path-to-data").groupBy("_locality_code").count().orderBy($"count").collect()

scala> counts.slice(counts.length-10, counts.length)
res13: Array[org.apache.spark.sql.Row] = Array([CN,5682255],
[AU,6090561], [ES,6184507], [IT,7093401], [FR,8814435], [CA,10005467],
[UK,15375397], [BR,15829260], [IN,22404143], [US,98585175])

scala> counts.slice(0, 10)
res14: Array[org.apache.spark.sql.Row] = Array([UM,1], [JB,1], [JK,1],
[WP,1], [JT,1], [SX,9], [BL,52], [BQ,70], [BV,115], [MF,115])


On Tue, Aug 9, 2016 at 11:10 AM, immerrr again  wrote:

Hi everyone

I tried upgrading Spark-1.6.2 to Spark-2.0.0 but run into an issue
reading the existing data. Here's how the traceback looks in
spark-shell:

scala> spark.read.parquet("/path/to/data")
org.apache.spark.sql.AnalysisException: Unable to infer schema for
ParquetFormat at /path/to/data. It must be specified manually;
   at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:397)
   at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:397)
   at scala.Option.getOrElse(Option.scala:121)
   at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:396)
   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:427)
   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:411)
   ... 48 elided

If I enable DEBUG log with sc.setLogLevel("DEBUG"), here's what I
additionally see in the output:
https://gist.github.com/immerrr/4474021ae70f35b7b9e262251c0abc59. Of
course, that same data is read and processed by spark-1.6.2 correctly.

Any idea what might be wrong here?

Cheers,
immerrr









Re: 回复: Bug about reading parquet files

2016-07-09 Thread Cheng Lian
According to our offline discussion, the target table consists of 1M+ 
small Parquet files (~12MB on average). The OOM occurred on the driver 
side while listing input files.


My theory is that the total size of all the listed FileStatus objects is 
too large for the driver, which caused the OOM.


Suggestions:

1. Merge those small Parquet files to reduce the file count (see the 
sketch below). Also, to be efficient, a Parquet file should typically be 
at least as large as an HDFS block.

2. Try to increase driver size.
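A rough sketch of the first suggestion (paths are placeholders, and the target 
partition count depends on your data and HDFS block size):

// Read the small files and rewrite them as a smaller number of larger Parquet files.
val df = sqlContext.read.parquet("/path/to/small-files")
df.coalesce(200) // pick a count that yields files of at least one HDFS block
  .write
  .mode("overwrite")
  .parquet("/path/to/merged")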

One possible improvement here in Spark is that we probably shouldn't 
list all input files of a partitioned table when the query only touches 
a fraction of all the partitions.


Cheng



On 7/8/16 8:44 PM, Sea wrote:

My spark version is 1.6.1.

== Parsed Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
  +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && 
(appid#5 = 6))

 +- Subquery dwd_native
+- 
Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] 
ParquetRelation: omega.dwd_native


== Analyzed Logical Plan ==
count: bigint
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
  +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && 
(appid#5 = 6))

 +- Subquery dwd_native
+- 
Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] 
ParquetRelation: omega.dwd_native


== Optimized Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate
  +- Project
 +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) 
&& (appid#5 = 6))
+- 
Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] 
ParquetRelation: omega.dwd_native


== Physical Plan ==
TungstenAggregate(key=[], 
functions=[(count(1),mode=Final,isDistinct=false)], output=[count#112L])
+- TungstenAggregate(key=[], 
functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#115L])

   +- Limit 1
  +- ConvertToSafe
 +- TungstenAggregate(key=[], functions=[], output=[])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[], output=[])
  +- Scan ParquetRelation: omega.dwd_native[] 
InputPaths: 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=0, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=1, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=2, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=3, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=4, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=5, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=6, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=0, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=1, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=2, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=3, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=4, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=5, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=6, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=0, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=1, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=2, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=3, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=4, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=5, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=6, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/appid=0, 
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/



Exception in thread "main" java.lang.OutOfMemoryError: 

Re: Bug about reading parquet files

2016-07-08 Thread Cheng Lian
What's the Spark version? Could you please also attach result of
explain(extended = true)?
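For reference, one minimal way to produce that output from the 1.x shell (table
and filter taken from the query quoted below):

sqlContext.sql(
  """SELECT count(1) FROM omega.dwd_native
    |WHERE year = '2016' AND month = '07' AND day = '05' AND hour = '12' AND appid = '6'
  """.stripMargin).explain(extended = true)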

On Fri, Jul 8, 2016 at 4:33 PM, Sea <261810...@qq.com> wrote:

> I have a problem reading parquet files.
> sql:
> select count(1) from   omega.dwd_native where year='2016' and month='07'
> and day='05' and hour='12' and appid='6';
> The hive partition is (year,month,day,appid)
>
> only two tasks, and it will list all directories in my table, not only
> /user/omega/events/v4/h/2016/07/07/12/appid=6
> [Stage 1:>  (0 +
> 0) / 2]
>
> 16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing 
> hdfs://mycluster-tj/user/omega/events/v4/h/2016/05/31/21/appid=1
> 16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing 
> hdfs://mycluster-tj/user/omega/events/v4/h/2016/06/28/20/appid=2
>
> 16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing 
> hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/22/21/appid=65537
> 16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing 
> hdfs://mycluster-tj/user/omega/events/v4/h/2016/08/14/05/appid=65536
>
>


Re: Hive 1.0.0 not able to read Spark 1.6.1 parquet output files on EMR 4.7.0

2016-06-15 Thread Cheng Lian

Spark 1.6.1 is also using Parquet 1.7.0.

Could you please share the schema of your Parquet file as well as the 
exact exception stack trace reported by Hive?



Cheng


On 6/13/16 12:56 AM, mayankshete wrote:

Hello Team ,

I am facing an issue where output files generated by Spark 1.6.1 are not
read by Hive 1.0.0 . It is because Hive 1.0.0 uses older parquet version
than Spark 1.6.1 which is using 1.7.0 parquet .

Is it possible that we can use older parquet version in Spark or newer
parquet version in Hive ?
I have tried adding parquet-hive-bundle : 1.7.0 to Hive but while reading it
throws Failed with exception
java.io.IOException:java.lang.NullPointerException .

Can anyone give us the solution ?

Thanks ,
Mayank



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Hive-1-0-0-not-able-to-read-Spark-1-6-1-parquet-output-files-on-EMR-4-7-0-tp27144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.









Re: update mysql in spark

2016-06-15 Thread Cheng Lian
Spark SQL doesn't support update command yet.
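A hedged sketch of the usual workaround, written in Scala (the table and column 
names follow the question below; the JDBC URL, target table and properties are 
placeholders): read the table, compute the rounded column with DataFrame 
functions, and write the result back out.

import org.apache.spark.sql.functions.round

// Compute the rounded column instead of UPDATE-ing it in place.
val act1 = sqlContext.table("act1")
val updated = act1.withColumn("loc", round(act1("loc"), 4))

// Write the result back out, e.g. over JDBC to a separate target table.
updated.write
  .mode("overwrite")
  .jdbc("jdbc:mysql://host:3306/db", "act1_rounded", new java.util.Properties())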

On Wed, Jun 15, 2016, 9:08 AM spR  wrote:

> hi,
>
> can we write a update query using sqlcontext?
>
> sqlContext.sql("update act1 set loc = round(loc,4)")
>
> what is wrong in this? I get the following error.
>
> Py4JJavaError: An error occurred while calling o20.sql.
> : java.lang.RuntimeException: [1.1] failure: ``with'' expected but identifier 
> update found
>
> update act1 set loc = round(loc,4)
> ^
>   at scala.sys.package$.error(package.scala:27)
>   at 
> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
>   at 
> org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
>   at 
> org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
>   at 
> org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:113)
>   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
>   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>   at 
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>   at 
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>   at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>   at 
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>   at 
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>   at 
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at 
> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
>   at 
> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
>   at 
> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
>   at 
> org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:43)
>   at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231)
>   at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>   at py4j.Gateway.invoke(Gateway.java:259)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.GatewayConnection.run(GatewayConnection.java:209)
>   at java.lang.Thread.run(Thread.java:745)
>
>
>
>


Re: feedback on dataset api explode

2016-05-25 Thread Cheng Lian
Agree, since they can be easily replaced by .flatMap (to do explosion) 
and .select (to rename output columns)
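A small sketch of both replacements (Spark 2.x; the Record type, ds and the 
words column are placeholders, not from this thread):

import org.apache.spark.sql.functions.explode
import spark.implicits._

case class Record(id: Long, words: Seq[String])
val ds = Seq(Record(1L, Seq("a", "b"))).toDS()

// Typed explosion: flatMap covers what the typed explode did.
val exploded = ds.flatMap(r => r.words.map(w => (r.id, w)))

// Untyped explosion: the explode() column function plus as() for the output name.
val explodedDf = ds.toDF().select($"id", explode($"words").as("word"))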


Cheng

On 5/25/16 12:30 PM, Reynold Xin wrote:
Based on this discussion I'm thinking we should deprecate the two 
explode functions.


On Wednesday, May 25, 2016, Koert Kuipers wrote:


wenchen,
that definition of explode seems identical to flatMap, so you dont
need it either?

michael,
i didn't know about the column expression version of explode, that
makes sense. i will experiment with that instead.

On Wed, May 25, 2016 at 3:03 PM, Wenchen Fan wrote:

I think we only need this version:  `def explode[B :
Encoder](f: A => TraversableOnce[B]): Dataset[B]`

For untyped one, `df.select(explode($"arrayCol").as("item"))`
should be the best choice.

On Wed, May 25, 2016 at 11:55 AM, Michael Armbrust wrote:

These APIs predate Datasets / encoders, so that is why
they are Row instead of objects.  We should probably
rethink that.

Honestly, I usually end up using the column expression
version of explode now that it exists (i.e.
explode($"arrayCol").as("Item")). It would be great to
understand more why you are using these instead.

On Wed, May 25, 2016 at 8:49 AM, Koert Kuipers wrote:

we currently have 2 explode definitions in Dataset:

 def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame

 def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B]): DataFrame

1) the separation of the functions into their own
argument lists is nice, but unfortunately scala's type
inference doesn't handle this well, meaning that the
generic types always have to be explicitly provided. i
assume this was done to allow the "input" to be a
varargs in the first method, and then kept the same in
the second for reasons of symmetry.

2) i am surprised the first definition returns a
DataFrame. this seems to suggest DataFrame usage (so
DataFrame to DataFrame), but there is no way to
specify the output column names, which limits its
usability for DataFrames. i frequently end up using
the first definition for DataFrames anyhow because of
the need to return more than 1 column (and the data
has columns unknown at compile time that i need to
carry along making flatMap on Dataset
clumsy/unusable), but relying on the output columns
being called _1 and _2 and renaming then afterwards
seems like an anti-pattern.

3) using Row objects isn't very pretty. why not f: A
=> TraversableOnce[B] or something like that for the
first definition? how about:
 def explode[A: TypeTag, B: TypeTag](input:
Seq[Column], output: Seq[Column])(f: A =>
TraversableOnce[B]): DataFrame

best,
koert








Re: How to delete a record from parquet files using dataframes

2016-02-24 Thread Cheng Lian
Parquet is a read-only format. So the only way to remove data from a 
written Parquet file is to write a new Parquet file without unwanted rows.
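A minimal sketch of that rewrite (paths and the filter predicate are placeholders):

import org.apache.spark.sql.functions.not
import sqlContext.implicits._

// Read the existing data, keep only the rows you want, and write a new dataset.
val df = sqlContext.read.parquet("/data/records")
df.filter(not($"id" === "record-to-delete"))
  .write
  .mode("overwrite")
  .parquet("/data/records_cleaned")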


Cheng

On 2/17/16 5:11 AM, SRK wrote:

Hi,

I am saving my records in the form of parquet files using dataframes in
hdfs. How to delete the records using dataframes?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-delete-a-record-from-parquet-files-using-dataframes-tp26242.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.









Re: cast column string -> timestamp in Parquet file

2016-01-25 Thread Cheng Lian

The following snippet may help:

  sqlContext.read.parquet(path).withColumn("col_ts", $"col".cast(TimestampType)).drop("col")


Cheng

On 1/21/16 6:58 AM, Muthu Jayakumar wrote:
DataFrame and UDF. This may be more performant than doing an RDD 
transformation, as you'll transform only the column that needs to be 
changed.


Hope this helps.


On Thu, Jan 21, 2016 at 6:17 AM, Eli Super wrote:


Hi

I have a large size parquet file .

I need to cast the whole column to timestamp format , then save

What the right way to do it ?

Thanks a lot






Re: DataFrame partitionBy to a single Parquet file (per partition)

2016-01-15 Thread Cheng Lian
You may try DataFrame.repartition(partitionExprs: Column*) to shuffle 
all data belonging to a single (data) partition into a single (RDD) 
partition:


df.repartition("entity", "year", "month", "day", "status")
  .write.partitionBy("entity", "year", "month", "day", "status")
  .mode(SaveMode.Append)
  .parquet(s"$location")


(Unfortunately the naming here can be quite confusing.)

Cheng

On 1/14/16 11:48 PM, Patrick McGloin wrote:

Hi,

I would like to reparation / coalesce my data so that it is saved into 
one Parquet file per partition. I would also like to use the Spark SQL 
partitionBy API. So I could do that like this:


df.coalesce(1).write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")


I've tested this and it doesn't seem to perform well. This is because 
there is only one partition to work on in the dataset and all the 
partitioning, compression and saving of files has to be done by one 
CPU core.


I could rewrite this to do the partitioning manually (using filter 
with the distinct partition values for example) before calling coalesce.


But is there a better way to do this using the standard Spark SQL API?

Best regards,

Patrick






Re: parquet repartitions and parquet.enable.summary-metadata does not work

2016-01-12 Thread Cheng Lian

I see. So there are actually 3000 tasks instead of 3000 jobs right?

Would you mind providing the full stack trace of the GC issue? At first
I thought it was identical to the _metadata one in the mail thread you
mentioned.


Cheng

On 1/11/16 5:30 PM, Gavin Yue wrote:
Here is how I set the conf: 
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")


This actually works, I do not see the _metadata file anymore.

I think I made a mistake.  The 3000 jobs are coming from 
repartition("id").


I have 7600 json files and want to save as parquet.

So if I use:  df.write.parquet(path), it would generate 7600 parquet 
files with 7600 parititions which has no problem.


But if I use repartition to change partition number, say: 
df.reparition(3000).write.parquet


This would generate 7600 + 3000 tasks.  3000 tasks always fails due to 
GC problem.


Best,
Gavin



On Mon, Jan 11, 2016 at 4:31 PM, Cheng Lian <lian.cs@gmail.com 
<mailto:lian.cs@gmail.com>> wrote:


Hey Gavin,

Could you please provide a snippet of your code showing how you
disabled "parquet.enable.summary-metadata" and wrote the
files? In particular, you mentioned that you saw "3000 jobs" fail. Were
you writing each Parquet file with an individual job? (Usually
people use write.partitionBy(...).parquet(...) to write multiple
Parquet files.)

Cheng


On 1/10/16 10:12 PM, Gavin Yue wrote:

Hey,

I am trying to convert a bunch of json files into parquet,
which would output over 7000 parquet files. But there are too
many files, so I want to repartition based on id to 3000.

But I got the error of GC problem like this one:

https://mail-archives.apache.org/mod_mbox/spark-user/201512.mbox/%3CCAB4bC7_LR2rpHceQw3vyJ=l6xq9+9sjl3wgiispzyfh2xmt...@mail.gmail.com%3E#archives

So I set  parquet.enable.summary-metadata to false. But when I
write.parquet, I could still see the 3000 jobs run after the
writing parquet and they failed due to GC.

Basically repartition never succeeded for me. Is there any
other settings which could be optimized?

Thanks,
Gavin







Re: parquet repartitions and parquet.enable.summary-metadata does not work

2016-01-11 Thread Cheng Lian

Hey Gavin,

Could you please provide a snippet of your code showing how you disabled
"parquet.enable.summary-metadata" and wrote the files?
In particular, you mentioned that you saw "3000 jobs" fail. Were you writing
each Parquet file with an individual job? (Usually people use
write.partitionBy(...).parquet(...) to write multiple Parquet files.)


Cheng

On 1/10/16 10:12 PM, Gavin Yue wrote:

Hey,

I am trying to convert a bunch of json files into parquet, which would 
output over 7000 parquet files. But there are too many files, so I
want to repartition based on id to 3000.


But I got the error of GC problem like this one: 
https://mail-archives.apache.org/mod_mbox/spark-user/201512.mbox/%3CCAB4bC7_LR2rpHceQw3vyJ=l6xq9+9sjl3wgiispzyfh2xmt...@mail.gmail.com%3E#archives


So I set  parquet.enable.summary-metadata to false. But when I 
write.parquet, I could still see the 3000 jobs run after the writing 
parquet and they failed due to GC.


Basically repartition never succeeded for me. Is there any other 
settings which could be optimized?


Thanks,
Gavin






Re: memory leak when saving Parquet files in Spark

2015-12-14 Thread Cheng Lian

Thanks for the feedback, Matt!

Yes, we've also seen other feedback about slow Parquet summary file 
generation, especially when appending a small dataset to an existing 
large dataset. Disabling it is a reasonable workaround since the summary 
files are no longer important after parquet-mr 1.7.


We're planning to turn it off by default in future versions.

Cheng

On 12/15/15 12:27 AM, Matt K wrote:

Thanks Cheng!

I'm running 1.5. After setting the following, I'm no longer seeing 
this issue:


sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

Thanks,
-Matt

On Fri, Dec 11, 2015 at 1:58 AM, Cheng Lian <l...@databricks.com 
<mailto:l...@databricks.com>> wrote:


This is probably caused by schema merging. Were you using Spark
1.4 or earlier versions? Could you please try the following
snippet to see whether it helps:

df.write
  .format("parquet")
  .option("mergeSchema", "false")
  .partitionBy(partitionCols: _*)
  .mode(saveMode)
  .save(targetPath)

In 1.5, we've disabled schema merging by default.

Cheng


On 12/11/15 5:33 AM, Matt K wrote:

Hi all,

I have a process that's continuously saving data as Parquet with
Spark. The bulk of the saving logic simply looks like this:

  df.write
.format("parquet")
.partitionBy(partitionCols: _*)
.mode(saveMode).save(targetPath)

After running for a day or so, my process ran out of memory. I
took a memory-dump. I see that a single thread is holding 32,189
org.apache.parquet.hadoop.Footer objects, which in turn hold
ParquetMetadata. This is highly suspicious, since each thread
processes under 1GB of data at a time, and there's usually no
more than 10 files in a single batch (no small file problem). So
there may be a memory leak somewhere in the saveAsParquet code-path.

I've attached a screen-shot from Eclipse MemoryAnalyzer showing
the above. Note 32,189 references.

A shot in the dark, but is there a way to disable ParquetMetadata
file generation?

Thanks,
-Matt







--
www.calcmachine.com <http://www.calcmachine.com> - easy online calculator.




Re: About the bottleneck of parquet file reading in Spark

2015-12-10 Thread Cheng Lian
Cc Spark user list since this information is generally useful.

On Thu, Dec 10, 2015 at 3:31 PM, Lionheart <87249...@qq.com> wrote:

> Dear, Cheng
>  I'm a user of Spark. Our current Spark version is 1.4.1
>  In our project, I find there is a bottleneck when loading huge amount
> of parquet files. We tried to load more than 5 parquet files into the
> spark. The total size of the data is about 150G bytes. We find that Spark
> spent more than 30 minutes to do
>  sqlContext.read.option("mergSchema","false") .parquet(filelist:_*)
>  During this time, the network, disk and cpu are not busy. And based
> on the profile, all time is used by the FileSystem.globStatus(). Then I
> find the commit SPARK-8125 by you which accelerates the speed.
>  Then I updated Spark to 1.5.1. Based on the test, the driver spent 13
> minutes to do the parquet reading. But I think there is still some
> possibility to improve this speed.
>   Based on the profile and reading the code, I find that the
> DataFrameReader method parquet is implemented in a serial manner to process
> the Path. Do you think if change parquet method into a concurrent version,
> the performance will become much better since there are many CPU core in
> the drive node of Spark?
>

Usually there shouldn't be many distinct paths passed to
DataFrameReader.parquet(). For those data files living under the same
parent directory, you can pass the path of their parent directory instead
of paths of all data files. Then I think this won't be a huge bottleneck.
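A rough sketch of the difference (paths are made up for illustration):

  // Slow: the driver has to glob and check every individual file path.
  val fileList = Seq(
    "/data/events/2015/part-00000.parquet",
    "/data/events/2015/part-00001.parquet")
  val byFiles = sqlContext.read.option("mergeSchema", "false").parquet(fileList: _*)

  // Usually much faster: pass the common parent directory and let Spark
  // discover the part-files itself.
  val byDir = sqlContext.read.option("mergeSchema", "false").parquet("/data/events/2015")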


>   By the way, when will the issue SPARK-8824 be solved? In my opinion,
> losing some precision with a warning message is better than throwing an exception
> and saying it is not supported.
>

This is a good question. For all those 4 data types,

   - DATE: It's actually already been supported, just resolved that JIRA
   ticket.
   - INTERVAL: We can start woking on this since now we've finally got
   CalendarIntervalType.
   - TIMESTAMP_MILLIS: We can start working on support this on the read
   path and convert extracted millisec timestamps to microsec ones. For the
   write path, maybe we can have an option to indicate whether
   TIMESTAMP_MILLIS or INT96 should be used to store timestamp values. If the
   former is chosen, the microsecond part of the timestamp will be truncated.
   - TIMESTAMP_MICROS: Unfortunately this one depends on parquet-format and
   parquet-mr, which haven't added TIMESTAMP_MILLIS as OriginalType.



>
> Sincerely,
> Zhizhou Li
>
>
>
>


Re: memory leak when saving Parquet files in Spark

2015-12-10 Thread Cheng Lian
This is probably caused by schema merging. Were you using Spark 1.4 or 
earlier versions? Could you please try the following snippet to see 
whether it helps:


df.write
  .format("parquet")
  .option("mergeSchema", "false")
  .partitionBy(partitionCols: _*)
  .mode(saveMode)
  .save(targetPath)

In 1.5, we've disabled schema merging by default.

Cheng

On 12/11/15 5:33 AM, Matt K wrote:

Hi all,

I have a process that's continuously saving data as Parquet with 
Spark. The bulk of the saving logic simply looks like this:


  df.write
.format("parquet")
.partitionBy(partitionCols: _*)
.mode(saveMode).save(targetPath)

After running for a day or so, my process ran out of memory. I took a 
memory-dump. I see that a single thread is holding 32,189 
org.apache.parquet.hadoop.Footer objects, which in turn hold 
ParquetMetadata. This is highly suspicious, since each thread 
processes under 1GB of data at a time, and there's usually no more 
than 10 files in a single batch (no small file problem). So there may 
be a memory leak somewhere in the saveAsParquet code-path.


I've attached a screen-shot from Eclipse MemoryAnalyzer showing the 
above. Note 32,189 references.


A shot in the dark, but is there a way to disable ParquetMetadata file 
generation?


Thanks,
-Matt






Re: parquet file doubts

2015-12-08 Thread Cheng Lian
Cc'd Parquet dev list. At first I expected to discuss this issue on 
Parquet dev list but sent to the wrong mailing list. However, I think 
it's OK to discuss it here since lots of Spark users are using Parquet 
and this information should be generally useful here.


Comments inlined.

On 12/7/15 10:34 PM, Shushant Arora wrote:

how to read it using parquet tools.
When I did
hadoop parquet.tools.Main meta prquetfilename

I didn't get any info of min and max values.
Didn't realize that you meant to inspect min/max values since what you 
asked was how to inspect the version of Parquet library that is used to 
generate the Parquet file.


Currently parquet-tools doesn't print min/max statistics information. 
I'm afraid you'll have to do it programmatically.
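For reference, a rough sketch of inspecting this programmatically with
parquet-hadoop's footer API (the package prefix and exact signatures differ
between parquet-mr releases, so treat this as an outline rather than a
drop-in snippet):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.parquet.hadoop.ParquetFileReader
  import scala.collection.JavaConverters._

  val footer = ParquetFileReader.readFooter(new Configuration(), new Path("/path/to/file.parquet"))

  // "created_by" tells you which writer (and version) produced the file.
  println(footer.getFileMetaData.getCreatedBy)

  // Per-column min/max statistics hang off each row group (block).
  for {
    block  <- footer.getBlocks.asScala
    column <- block.getColumns.asScala
  } println(s"${column.getPath}: ${column.getStatistics}")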
How can I see parquet version of my file.Is min max respective to some 
parquet version or available since beginning?
AFAIK, it was added in 1.5.0 
https://github.com/apache/parquet-mr/blob/parquet-1.5.0/parquet-column/src/main/java/parquet/column/statistics/Statistics.java


But I failed to find corresponding JIRA ticket or pull request for this.



On Mon, Dec 7, 2015 at 6:51 PM, Singh, Abhijeet 
<absi...@informatica.com <mailto:absi...@informatica.com>> wrote:


Yes, Parquet has min/max.

*From:*Cheng Lian [mailto:l...@databricks.com
<mailto:l...@databricks.com>]
*Sent:* Monday, December 07, 2015 11:21 AM
*To:* Ted Yu
*Cc:* Shushant Arora; user@spark.apache.org
<mailto:user@spark.apache.org>
*Subject:* Re: parquet file doubts

Oh sorry... At first I meant to cc the spark-user list since Shushant
and I had discussed some Spark-related issues before. Then I
realized that this is a pure Parquet issue, but forgot to change
the cc list. Thanks for pointing this out! Please ignore this thread.

Cheng

On 12/7/15 12:43 PM, Ted Yu wrote:

Cheng:

I only see user@spark in the CC.

FYI

On Sun, Dec 6, 2015 at 8:01 PM, Cheng Lian
<l...@databricks.com <mailto:l...@databricks.com>> wrote:

cc parquet-dev list (it would be nice to always do so for
these general questions.)

Cheng

On 12/6/15 3:10 PM, Shushant Arora wrote:

Hi

I have few doubts on parquet file format.

1. Does Parquet keep min/max statistics like ORC does? How can I
see the Parquet version (whether it's 1.1, 1.2, or 1.3) for a Parquet file
generated using Hive, custom MR, or AvroParquetOutputFormat?

Yes, Parquet also keeps row group statistics. You may check
the Parquet file using the parquet-meta CLI tool in
parquet-tools (see
https://github.com/Parquet/parquet-mr/issues/321 for details),
then look for the "creator" field of the file. For
programmatic access, check for
o.a.p.hadoop.metadata.FileMetaData.createdBy.


2. How do I sort Parquet records while generating a Parquet file
using AvroParquetOutputFormat?

AvroParquetOutputFormat is not a format. It's just responsible
for converting Avro records to Parquet records. How are you
using AvroParquetOutputFormat? Any example snippets?


Thanks









Re: parquet file doubts

2015-12-06 Thread Cheng Lian
cc parquet-dev list (it would be nice to always do so for these general 
questions.)


Cheng

On 12/6/15 3:10 PM, Shushant Arora wrote:

Hi

I have few doubts on parquet file format.

1. Does Parquet keep min/max statistics like ORC does? How can I see the
Parquet version (whether it's 1.1, 1.2, or 1.3) for a Parquet file generated
using Hive, custom MR, or AvroParquetOutputFormat?
Yes, Parquet also keeps row group statistics. You may check the Parquet 
file using the parquet-meta CLI tool in parquet-tools (see 
https://github.com/Parquet/parquet-mr/issues/321 for details), then look 
for the "creator" field of the file. For programmatic access, check for 
o.a.p.hadoop.metadata.FileMetaData.createdBy.


2. How do I sort Parquet records while generating a Parquet file using
AvroParquetOutputFormat?
AvroParquetOutputFormat is not a format. It's just responsible for 
converting Avro records to Parquet records. How are you using 
AvroParquetOutputFormat? Any example snippets?


Thanks






Re: parquet file doubts

2015-12-06 Thread Cheng Lian
Oh sorry... At first I meant to cc the spark-user list since Shushant and I
had discussed some Spark-related issues before. Then I realized
that this is a pure Parquet issue, but forgot to change the cc list.
Thanks for pointing this out! Please ignore this thread.


Cheng

On 12/7/15 12:43 PM, Ted Yu wrote:

Cheng:
I only see user@spark in the CC.

FYI

On Sun, Dec 6, 2015 at 8:01 PM, Cheng Lian <l...@databricks.com 
<mailto:l...@databricks.com>> wrote:


cc parquet-dev list (it would be nice to always do so for these
general questions.)

Cheng

On 12/6/15 3:10 PM, Shushant Arora wrote:

Hi

I have few doubts on parquet file format.

1. Does Parquet keep min/max statistics like ORC does? How can I
see the Parquet version (whether it's 1.1, 1.2, or 1.3) for a Parquet file
generated using Hive, custom MR, or AvroParquetOutputFormat?

Yes, Parquet also keeps row group statistics. You may check the
Parquet file using the parquet-meta CLI tool in parquet-tools (see
https://github.com/Parquet/parquet-mr/issues/321 for details),
then look for the "creator" field of the file. For programmatic
access, check for o.a.p.hadoop.metadata.FileMetaData.createdBy.


2. How do I sort Parquet records while generating a Parquet file
using AvroParquetOutputFormat?

AvroParquetOutputFormat is not a format. It's just responsible for
converting Avro records to Parquet records. How are you using
AvroParquetOutputFormat? Any example snippets?


Thanks









Re: df.partitionBy().parquet() java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-12-02 Thread Cheng Lian
You may try to set Hadoop conf "parquet.enable.summary-metadata" to 
false to disable writing Parquet summary files (_metadata and 
_common_metadata).


By default Parquet writes the summary files by collecting footers of all 
part-files in the dataset while committing the job. Spark also follows 
this convention. However, it turned out that the summary files aren't 
very useful in practice now, unless you have other downstream tools that 
strictly depend on the summary files. For example, if you don't need 
schema merging, Spark simply picks a random part-file to discover the 
dataset schema. If you need schema merging, Spark has to read footers of 
all part-files anyway (but in a distributed, parallel way).
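For reference, the workaround looks roughly like this (a sketch only; "df",
"partitionCols", "saveMode", and "targetPath" stand for the caller's own
DataFrame and settings, and the Hadoop conf must be set before the write is
triggered):

  // Disable Parquet summary files (_metadata / _common_metadata).
  sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

  df.write
    .format("parquet")
    .partitionBy(partitionCols: _*)
    .mode(saveMode)
    .save(targetPath)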


Cheng

On 12/3/15 6:11 AM, Don Drake wrote:
Does anyone have any suggestions on creating a large amount of parquet 
files? Especially in regards to the last phase where it creates the 
_metadata.


Thanks.

-Don

On Sat, Nov 28, 2015 at 9:02 AM, Don Drake > wrote:


I have a 2TB dataset that I have in a DataFrame that I am
attempting to partition by 2 fields and my YARN job seems to write
the partitioned dataset successfully.  I can see the output in
HDFS once all Spark tasks are done.

After the spark tasks are done, the job appears to be running for
over an hour, until I get the following (full stack trace below):

java.lang.OutOfMemoryError: GC overhead limit exceeded
at

org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(ParquetMetadataConverter.java:238)

I had set the driver memory to be 20GB.

I attempted to read in the partitioned dataset and got another
error saying the /_metadata directory was not a parquet file.  I
removed the _metadata directory and was able to query the data,
but it appeared to not use the partitioned directory when I
attempted to filter the data (it read every directory).

This is Spark 1.5.2 and I saw the same problem when running the
code in both Scala and Python.

Any suggestions are appreciated.

-Don

15/11/25 00:00:19 ERROR datasources.InsertIntoHadoopFsRelation:
Aborting job.
java.lang.OutOfMemoryError: GC overhead limit exceeded
at

org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(ParquetMetadataConverter.java:238)
at

org.apache.parquet.format.converter.ParquetMetadataConverter.addRowGroup(ParquetMetadataConverter.java:167)
at

org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetMetadata(ParquetMetadataConverter.java:79)
at

org.apache.parquet.hadoop.ParquetFileWriter.serializeFooter(ParquetFileWriter.java:405)
at

org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:433)
at

org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:423)
at

org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
at

org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at

org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:208)
at

org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
at

org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at

org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at

org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at

org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at

org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at

org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at

org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at

org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at

org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at


Re: DateTime Support - Hive Parquet

2015-11-29 Thread Cheng Lian
Oh sorry, you're right. Implicit conversion doesn't affect the schema 
inference process.


Just checked that Joda is already a direct dependency of Spark. So I 
think it's probably fine to add support for recognizing Joda DateTime as 
SQL TimestampType. Would you mind filing a JIRA ticket for it? A PR is
also welcome!
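In the meantime, a minimal sketch of the implicit-conversion workaround
discussed in this thread (the names are made up; it simply maps Joda DateTime
to java.sql.Timestamp before the records reach Spark SQL, so schema inference
only ever sees TimestampType):

  import java.sql.Timestamp
  import org.joda.time.DateTime
  import scala.language.implicitConversions

  object JodaConversions {
    // Joda DateTime -> java.sql.Timestamp, which Spark SQL maps to TimestampType.
    implicit def jodaToSqlTimestamp(dt: DateTime): Timestamp = new Timestamp(dt.getMillis)
  }

  // With the implicit in scope, case classes used for DataFrame creation can
  // declare Timestamp fields while callers pass Joda DateTime values.
  case class Event(name: String, ts: Timestamp)

  import JodaConversions._
  val event = Event("login", DateTime.now()) // converted implicitly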


Cheng

On 11/24/15 8:05 PM, Bryan wrote:


Cheng,

I am using Scala. I have an implicit conversion from Joda DateTime to 
timestamp. My tables are defined with Timestamp. However explicit 
conversation appears to be required. Do you have an example of 
implicit conversion for this case? Do you convert on insert or on RDD 
to DF conversion?


Regards,

Bryan Jeffrey

Sent from Outlook Mail


*From: *Cheng Lian
*Sent: *Tuesday, November 24, 2015 6:49 AM
*To: *Bryan;user
*Subject: *Re: DateTime Support - Hive Parquet

I see, then this is actually irrelevant to Parquet. I guess we can
support Joda DateTime in Spark SQL's reflective schema inference to get
this, provided that this is a frequent use case and Spark SQL already
has Joda as a direct dependency.


On the other hand, if you are using Scala, you can write a simple 
implicit conversion method to avoid all the manual conversions.


Cheng

On 11/24/15 7:25 PM, Bryan wrote:

Cheng,

That’s exactly what I was hoping for – native support for writing
DateTime objects. As it stands Spark 1.5.2 seems to leave no
option but to do manual conversion (to nanos, Timestamp, etc)
prior to writing records to hive.

Regards,

Bryan Jeffrey

Sent from Outlook Mail


*From: *Cheng Lian
*Sent: *Tuesday, November 24, 2015 1:42 AM
*To: *Bryan Jeffrey;user
*Subject: *Re: DateTime Support - Hive Parquet

Hey Bryan,

What do you mean by "DateTime properties"? Hive and Spark SQL both

support DATE and TIMESTAMP types, but there's no DATETIME type. So I

assume you are referring to Java class DateTime (possibly the one in

joda)? Could you please provide a sample snippet that illustrates
your

requirement?

Cheng

On 11/23/15 9:40 PM, Bryan Jeffrey wrote:

> All,

>

> I am attempting to write objects that include a DateTime
properties to

> a persistent table using Spark 1.5.2 / HiveContext.  In 1.4.1 I was

> forced to convert the DateTime properties to Timestamp
properties.  I

> was under the impression that this issue was fixed in the
default Hive

> supported with 1.5.2 - however, I am still seeing the associated
errors.

>

> Is there a bug I can follow to determine when DateTime will be

> supported for Parquet?

>

> Regards,

>

> Bryan Jeffrey





Re: Parquet files not getting coalesced to smaller number of files

2015-11-29 Thread Cheng Lian
RDD.coalesce(n) returns a new RDD rather than modifying the original 
RDD. So what you need is:


metricsToBeSaved.coalesce(1500).saveAsNewAPIHadoopFile(...)
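Spelled out against the variable and class names from the snippet quoted
below, the point is simply that the result of coalesce has to be captured:

  // coalesce returns a new RDD; the original is left untouched.
  val coalesced = metricsToBeSaved.coalesce(1500)
  coalesced.saveAsNewAPIHadoopFile(filePath, classOf[Void], classOf[Metrics],
    classOf[ParquetOutputFormat[Metrics]], job.getConfiguration)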

Cheng

On 11/29/15 12:21 PM, SRK wrote:

Hi,

I have the following code that saves the parquet files in my hourly batch to
hdfs. My idea is to coalesce the files to 1500 smaller files. The first run
it gives me 1500 files in hdfs. For the next runs the files seem to be
increasing even though I coalesce.

  Its not getting coalesced to 1500 files as I want. I also have an example
that I am using in the end. Please let me know if there is a different and
more efficient way of doing this.


 val job = Job.getInstance()

 var filePath = "path"


 val metricsPath: Path = new Path(filePath)

 //Check if inputFile exists
 val fs: FileSystem = FileSystem.get(job.getConfiguration)

 if (fs.exists(metricsPath)) {
   fs.delete(metricsPath, true)
 }


 // Configure the ParquetOutputFormat to use Avro as the
serialization format
 ParquetOutputFormat.setWriteSupportClass(job,
classOf[AvroWriteSupport])
 // You need to pass the schema to AvroParquet when you are writing
objects but not when you
 // are reading them. The schema is saved in Parquet file for future
readers to use.
 AvroParquetOutputFormat.setSchema(job, Metrics.SCHEMA$)


 // Create a PairRDD with all keys set to null and wrap each Metrics
in serializable objects
 val metricsToBeSaved = metrics.map(metricRecord => (null, new
SerializableMetrics(new Metrics(metricRecord._1, metricRecord._2._1,
metricRecord._2._2;

 metricsToBeSaved.coalesce(1500)
 // Save the RDD to a Parquet file in our temporary output directory
 metricsToBeSaved.saveAsNewAPIHadoopFile(filePath, classOf[Void],
classOf[Metrics],
   classOf[ParquetOutputFormat[Metrics]], job.getConfiguration)


https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SparkParquetExample.scala



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-files-not-getting-coalesced-to-smaller-number-of-files-tp25509.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.









Re: DateTime Support - Hive Parquet

2015-11-24 Thread Cheng Lian
I see, then this is actually irrelevant to Parquet. I guess we can support
Joda DateTime in Spark SQL's reflective schema inference to get this,
provided that this is a frequent use case, and Spark SQL already has Joda
as a direct dependency.


On the other hand, if you are using Scala, you can write a simple 
implicit conversion method to avoid all the manual conversions.


Cheng

On 11/24/15 7:25 PM, Bryan wrote:


Cheng,

That’s exactly what I was hoping for – native support for writing 
DateTime objects. As it stands Spark 1.5.2 seems to leave no option 
but to do manual conversion (to nanos, Timestamp, etc) prior to 
writing records to hive.


Regards,

Bryan Jeffrey

Sent from Outlook Mail


*From: *Cheng Lian
*Sent: *Tuesday, November 24, 2015 1:42 AM
*To: *Bryan Jeffrey;user
*Subject: *Re: DateTime Support - Hive Parquet

Hey Bryan,

What do you mean by "DateTime properties"? Hive and Spark SQL both

support DATE and TIMESTAMP types, but there's no DATETIME type. So I

assume you are referring to Java class DateTime (possibly the one in

joda)? Could you please provide a sample snippet that illustrates your

requirement?

Cheng

On 11/23/15 9:40 PM, Bryan Jeffrey wrote:

> All,

>

> I am attempting to write objects that include a DateTime properties to

> a persistent table using Spark 1.5.2 / HiveContext.  In 1.4.1 I was

> forced to convert the DateTime properties to Timestamp properties.  I

> was under the impression that this issue was fixed in the default Hive

> supported with 1.5.2 - however, I am still seeing the associated errors.

>

> Is there a bug I can follow to determine when DateTime will be

> supported for Parquet?

>

> Regards,

>

> Bryan Jeffrey





Re: DateTime Support - Hive Parquet

2015-11-23 Thread Cheng Lian

Hey Bryan,

What do you mean by "DateTime properties"? Hive and Spark SQL both 
support DATE and TIMESTAMP types, but there's no DATETIME type. So I 
assume you are referring to Java class DateTime (possibly the one in 
joda)? Could you please provide a sample snippet that illustrates your 
requirement?


Cheng

On 11/23/15 9:40 PM, Bryan Jeffrey wrote:

All,

I am attempting to write objects that include a DateTime properties to 
a persistent table using Spark 1.5.2 / HiveContext.  In 1.4.1 I was 
forced to convert the DateTime properties to Timestamp properties.  I 
was under the impression that this issue was fixed in the default Hive 
supported with 1.5.2 - however, I am still seeing the associated errors.


Is there a bug I can follow to determine when DateTime will be 
supported for Parquet?


Regards,

Bryan Jeffrey






Re: doubts on parquet

2015-11-19 Thread Cheng Lian

/cc Spark user list

I'm confused here, you mentioned that you were writing Parquet files 
using MR jobs. What's the relation between that Parquet writing task and 
this JavaPairRDD one? Is it a separate problem?


Spark supports dynamic partitioning (e.g. df.write.partitionBy("col1", 
"col2").format("").save(path)), and there's a 
spark-avro data source. If you are writing Avro records to multiple partitions, 
these two should help.


Cheng

On 11/19/15 4:30 PM, Shushant Arora wrote:

Thanks Cheng.

I have used avroParquetOutputFormat and it works fine.
My requirement now is to handle writing to multiple folders at the same 
time. Basically, I want to write the JavaPairRDD<Void, GenericRecord> to 
multiple folders based on the final Hive partitions where this RDD will 
land. Have you used multiple output formats in Spark?




On Fri, Nov 13, 2015 at 3:56 PM, Cheng Lian <lian.cs@gmail.com 
<mailto:lian.cs@gmail.com>> wrote:


Oh I see. Then parquet-avro should probably be more useful. AFAIK,
parquet-hive is only used internally in Hive. I don't see anyone
using it directly.

In general, you can first parse your text data, assemble them into
Avro records, and then write these records to Parquet.

BTW, Spark 1.2 also provides Parquet support. Since you're trying
to convert text data, I guess you probably don't have any nested
data. In that case, Spark 1.2 should be enough. it's not that
Spark 1.2 can't deal with nested data, it's about interoperability
with Hive. Because in the early days, Parquet spec itself didn't
specify how to write nested data. You may refer to this link for
more details:
http://spark.apache.org/docs/1.2.1/sql-programming-guide.html#parquet-files

Cheng


On 11/13/15 6:11 PM, Shushant Arora wrote:

No, I don't have the data loaded in text form into Hive - it was for
understanding the internals of what approach Hive is taking.

I want direct writing to parquet file from MR job. For that Hive
Parquet datamodel vs Avro Parquet data model which approach is
    better?


On Fri, Nov 13, 2015 at 3:24 PM, Cheng Lian
<lian.cs@gmail.com <mailto:lian.cs@gmail.com>> wrote:

If you are already able to load the text data into Hive, then
using Hive itself to convert the data is obviously the
easiest and most compatible way. For example:

CREATE TABLE text_table (key INT, value STRING);
LOAD DATA LOCAL INPATH '/tmp/data.txt' INTO TABLE text_table;

CREATE TABLE parquet_table
STORED AS PARQUET
AS SELECT * FROM text_table;

Cheng


On 11/13/15 5:13 PM, Shushant Arora wrote:

Thanks !
so which one is better for dumping text data to hive using
custom MR/spark job - Hive Parquet datamodel using
hivewritable or Avro Parquet datamodel using avro object?

On Fri, Nov 13, 2015 at 12:45 PM, Cheng Lian
<lian.cs@gmail.com <mailto:lian.cs@gmail.com>> wrote:

ParquetOutputFormat is not a data model. A data model
provides a WriteSupport to ParquetOutputFormat to tell
Parquet how to convert upper level domain objects (Hive
Writables, Avro records, etc.) to Parquet records. So
all other data models uses it for writing Parquet files.

Hive does have a Parquet data model. If you create a
Parquet table in Hive like "CREATE TABLE t (key INT,
value STRING) STORED AS PARQUET", it invokes the Hive
Parquet data model when reading/write table t. In the
case you mentioned, records in the text table are
firstly extracted out by Hive into Hive Writables, and
then the Hive Parquet data model converts those
Writables into Parquet records.

Cheng


On 11/13/15 2:37 PM, Shushant Arora wrote:

Thanks Cheng.

I have spark version 1.2 deployed on my cluster so for
the time being I cannot use direct spark sql functionality.
I will try with AvroParquetOutputFormat. Just want to
know how AvroParquetOutputFormat is better than direct
ParquetOutputFormat ? And also is there any hive object
model - I mean when I create a parquet table in hive
and insert data into that table from a text table, which
object model does Hive use internally?

    Thanks
Shushant

On Fri, Nov 13, 2015 at 9:14 AM, Cheng Lian
<lian.cs@gmail.com <mailto:lian.cs@gmail.com>>
wrote:

If I understand your question correctly, you are
trying to write Parquet files using a s

Re: Unwanted SysOuts in Spark Parquet

2015-11-10 Thread Cheng Lian
This is because of PARQUET-369, which prevents users or other libraries 
from overriding Parquet's JUL logging settings via SLF4J. It has been fixed 
in the most recent parquet-format master (PR #32), but unfortunately there 
hasn't been a release yet.


Cheng

On 11/9/15 3:40 PM, swetha wrote:

Hi,

I see a lot of unwanted SysOuts when I try to save an RDD as parquet file.
Following is the code and
SysOuts. Any idea as to how to avoid the unwanted SysOuts?


 ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])

 AvroParquetOutputFormat.setSchema(job, ActiveSession.SCHEMA$)
activeSessionsToBeSaved.saveAsNewAPIHadoopFile("test", classOf[Void],
classOf[ActiveSession],
   classOf[ParquetOutputFormat[ActiveSession]], job.getConfiguration)

Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.codec.CodecConfig: Compression
set to false
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.codec.CodecConfig: Compression:
UNCOMPRESSED
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet
block size to 134217728
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet
page size to 1048576
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet
dictionary page size to 1048576
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Dictionary
is on
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Validation
is off
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Writer
version is: PARQUET_1_0
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.InternalParquetRecordWriter:
Flushing mem columnStore to file. allocated memory: 29,159,377
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.codec.CodecConfig: Compression
set to false
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.codec.CodecConfig: Compression:
UNCOMPRESSED
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet
block size to 134217728
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet
page size to 1048576
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet
dictionary page size to 1048576
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Dictionary
is on
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Validation
is off



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unwanted-SysOuts-in-Spark-Parquet-tp25325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.







Re: very slow parquet file write

2015-11-06 Thread Cheng Lian
I'd expect writing Parquet files to be slower than writing JSON files, since 
Parquet involves more complicated encoders, but maybe not that slow. 
Would you mind trying to profile one Spark executor using a tool like YJP 
to see where the hotspot is?


Cheng

On 11/6/15 7:34 AM, rok wrote:

Apologies if this appears a second time!

I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions into a
parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
the size of file this is way over-provisioned (I've tried it with fewer
partitions and fewer nodes, no obvious effect). I was expecting the dump to
disk to be very fast -- the DataFrame is cached in memory and contains just
14 columns (13 are floats and one is a string). When I write it out in json
format, this is indeed reasonably fast (though it still takes a few minutes,
which is longer than I would expect).

However, when I try to write a parquet file it takes way longer -- the first
set of tasks finishes in a few minutes, but the subsequent tasks take more
than twice as long or longer. In the end it takes over half an hour to write
the file. I've looked at the disk I/O and cpu usage on the compute nodes and
it looks like the processors are fully loaded while the disk I/O is
essentially zero for long periods of time. I don't see any obvious garbage
collection issues and there are no problems with memory.

Any ideas on how to debug/fix this?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/very-slow-parquet-file-write-tp25295.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.









Re: very slow parquet file write

2015-11-06 Thread Cheng Lian



On 11/6/15 10:53 PM, Rok Roskar wrote:
yes I was expecting that too because of all the metadata generation 
and compression. But I have not seen performance this bad for other 
parquet files I’ve written and was wondering if there could be 
something obvious (and wrong) to do with how I’ve specified the schema 
etc. It’s a very simple schema consisting of a StructType with a few 
StructField floats and a string. I’m using all the spark defaults for 
io compression.


I'll see what I can do about running a profiler -- can you point me to 
a resource/example?
This link is probably helpful: 
https://cwiki.apache.org/confluence/display/SPARK/Profiling+Spark+Applications+Using+YourKit


Thanks,

Rok

ps: my post on the mailing list is still listed as not accepted by the 
mailing list: 
http://apache-spark-user-list.1001560.n3.nabble.com/very-slow-parquet-file-write-td25295.html 
-- none of your responses are there either. I am definitely subscribed 
to the list though (I get daily digests). Any clue how to fix it?

Sorry, no idea :-/





On Nov 6, 2015, at 9:26 AM, Cheng Lian <lian.cs@gmail.com 
<mailto:lian.cs@gmail.com>> wrote:


I'd expect writing Parquet files to be slower than writing JSON files, since 
Parquet involves more complicated encoders, but maybe not that slow. 
Would you mind trying to profile one Spark executor using a tool like 
YJP to see where the hotspot is?


Cheng

On 11/6/15 7:34 AM, rok wrote:

Apologies if this appears a second time!

I'm writing a ~100 Gb pyspark DataFrame with a few hundred 
partitions into a
parquet file on HDFS. I've got a few hundred nodes in the cluster, 
so for

the size of file this is way over-provisioned (I've tried it with fewer
partitions and fewer nodes, no obvious effect). I was expecting the 
dump to
disk to be very fast -- the DataFrame is cached in memory and 
contains just
14 columns (13 are floats and one is a string). When I write it out 
in json
format, this is indeed reasonably fast (though it still takes a few 
minutes,

which is longer than I would expect).

However, when I try to write a parquet file it takes way longer -- 
the first
set of tasks finishes in a few minutes, but the subsequent tasks 
take more
than twice as long or longer. In the end it takes over half an hour 
to write
the file. I've looked at the disk I/O and cpu usage on the compute 
nodes and

it looks like the processors are fully loaded while the disk I/O is
essentially zero for long periods of time. I don't see any obvious 
garbage

collection issues and there are no problems with memory.

Any ideas on how to debug/fix this?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/very-slow-parquet-file-write-tp25295.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.












Re: Issue of Hive parquet partitioned table schema mismatch

2015-11-04 Thread Cheng Lian
Is there any chance that " spark.sql.hive.convertMetastoreParquet" is 
turned off?


Cheng

On 11/4/15 5:15 PM, Rex Xiong wrote:

Thanks Cheng Lian.
I found in 1.5, if I use spark to create this table with partition 
discovery, the partition pruning can be performed, but for my old 
table definition in pure Hive, the execution plan will do a parquet 
scan across all partitions, and it runs very slow.

Looks like the execution plan optimization is different.

2015-11-03 23:10 GMT+08:00 Cheng Lian <lian.cs@gmail.com 
<mailto:lian.cs@gmail.com>>:


SPARK-11153 should be irrelevant because you are filtering on a
partition key while SPARK-11153 is about Parquet filter push-down
and doesn't affect partition pruning.

Cheng


On 11/3/15 7:14 PM, Rex Xiong wrote:


We found the query performance is very poor due to this issue
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-11153
We usually use filter on partition key, the date, it's in string
type in 1.3.1 and works great.
But in 1.5, it needs to do parquet scan for all partitions.

On Sat, Oct 31, 2015 at 7:38 PM, "Rex Xiong" <bycha...@gmail.com
<mailto:bycha...@gmail.com>> wrote:

Add back this thread to email list, forgot to reply all.

On Sat, Oct 31, 2015 at 7:23 PM, "Michael Armbrust"
<mich...@databricks.com <mailto:mich...@databricks.com>> wrote:

Not that I know of.

On Sat, Oct 31, 2015 at 12:22 PM, Rex Xiong
<bycha...@gmail.com <mailto:bycha...@gmail.com>> wrote:

Good to know that, will have a try.
So there is no easy way to achieve it in pure hive
method?

On Sat, Oct 31, 2015 at 7:17 PM, "Michael Armbrust"
<mich...@databricks.com
<mailto:mich...@databricks.com>> wrote:

Yeah, this was rewritten to be faster in Spark
1.5.  We use it with 10,000s of partitions.

On Sat, Oct 31, 2015 at 7:17 AM, Rex Xiong
<bycha...@gmail.com <mailto:bycha...@gmail.com>>
wrote:

1.3.1
It is a lot of improvement in 1.5+?

2015-10-30 19:23 GMT+08:00 Michael Armbrust
<mich...@databricks.com
<mailto:mich...@databricks.com>>:

We have tried schema merging feature,
but it's too slow, there're hundreds
of partitions.

Which version of Spark?











Re: Issue of Hive parquet partitioned table schema mismatch

2015-11-03 Thread Cheng Lian
SPARK-11153 should be irrelevant because you are filtering on a 
partition key while SPARK-11153 is about Parquet filter push-down and 
doesn't affect partition pruning.


Cheng

On 11/3/15 7:14 PM, Rex Xiong wrote:


We found the query performance is very poor due to this issue
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-11153
We usually use filter on partition key, the date, it's in string type 
in 1.3.1 and works great.

But in 1.5, it needs to do parquet scan for all partitions.

On Sat, Oct 31, 2015 at 7:38 PM, "Rex Xiong" > wrote:


Add back this thread to email list, forgot to reply all.

On Sat, Oct 31, 2015 at 7:23 PM, "Michael Armbrust"
> wrote:

Not that I know of.

On Sat, Oct 31, 2015 at 12:22 PM, Rex Xiong
> wrote:

Good to know that, will have a try.
So there is no easy way to achieve it in pure hive method?

On Sat, Oct 31, 2015 at 7:17 PM, "Michael Armbrust" >
wrote:

Yeah, this was rewritten to be faster in Spark 1.5. 
We use it with 10,000s of partitions.


On Sat, Oct 31, 2015 at 7:17 AM, Rex Xiong
> wrote:

1.3.1
It is a lot of improvement in 1.5+?

2015-10-30 19:23 GMT+08:00 Michael Armbrust
>:

We have tried schema merging feature, but
it's too slow, there're hundreds of
partitions.

Which version of Spark?








Re: Filter applied on merged Parquet schemas with new column fails.

2015-10-28 Thread Cheng Lian

Hey Hyukjin,

Sorry that I missed the JIRA ticket. Thanks for bringing this issue up 
here, and for your detailed investigation.


From my side, I think this is a bug of Parquet. Parquet was designed to 
support schema evolution. When scanning a Parquet file, if a column exists in 
the requested schema but is missing from the file schema, that column is 
filled with null. This should also hold for pushed-down predicate 
filters. For example, if filter "a = 1" is pushed down but column "a" 
doesn't exist in the Parquet file being scanned, it's safe to assume "a" 
is null in all records and drop all of them. On the contrary, if "a IS 
NULL" is pushed down, all records should be preserved.


Apparently, before this issue is properly fixed on Parquet side, we need 
to workaround this issue from Spark side. Please see my comments of all 
3 of your solutions inlined below. In short, I'd like to have approach 1 
for branch-1.5 and approach 2 for master.


Cheng

On 10/28/15 10:11 AM, Hyukjin Kwon wrote:
When enabling mergeSchema together with predicate filter push-down, this 
fails, since Parquet filters are pushed down regardless of the schema of 
each split (or rather, each file).


Dominic Ricard reported this 
issue (https://issues.apache.org/jira/browse/SPARK-11103)


Even though this would work okay by setting 
spark.sql.parquet.filterPushdown to false, the default value of this 
is true. So this looks like an issue.


My questions are,
is this clearly an issue?
and if so, which way would this be handled?


I think this is an issue, so I made three rough patches for it and 
tested them, and they look fine.


The first approach looks simpler and appropriate, judging from previous 
fixes such as 
https://issues.apache.org/jira/browse/SPARK-11153
However, in terms of safety and performance, I also want to confirm 
which one would be the proper approach before opening a PR.


1. Simply set spark.sql.parquet.filterPushdown to false when using 
mergeSchema
This one is pretty simple and safe, I'd like to have this for 1.5.2, or 
1.5.3 if we can't make it for 1.5.2.


2. If spark.sql.parquet.filterPushdown is true, retrieve the schema of 
every part-file (and also the merged one), check whether each one can 
accept the given schema, and then apply the filter only when they all 
can, which I think is a bit over-engineered.
Actually we only need to calculate the intersection of all file 
schemata. We can make ParquetRelation.mergeSchemaInParallel return two 
StructTypes, the first one is the original merged schema, the other is 
the intersection of all file schemata, which only contains fields that 
exist in all file schemata. Then we decide which filter to pushed down 
according to the second StructType.
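For illustration, a rough sketch (not the actual Spark patch) of computing 
such an intersection of file schemata with the public StructType API; only 
filters that reference columns in the result would then be pushed down:

  import org.apache.spark.sql.types.{StructField, StructType}

  // Keep only fields that appear in every file schema with the same data type.
  def intersectSchemas(schemas: Seq[StructType]): StructType = {
    require(schemas.nonEmpty, "need at least one file schema")
    val common = schemas
      .map(_.fields.map(f => f.name -> f).toMap)
      .reduce { (a, b) =>
        a.filter { case (name, field) => b.get(name).exists(_.dataType == field.dataType) }
      }
    // Preserve the field order of the first schema for determinism.
    StructType(schemas.head.fields.filter(f => common.contains(f.name)))
  }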


3. If spark.sql.parquet.filterPushdown is true, retrieve the schema of 
every part-file (and also the merged one) and apply the filter only to 
each split (or rather, file) that can accept it, which (I think it's 
hacky) ends up with different configurations for each task in a job.
The idea I came up with at first was similar to this one. Instead of 
pulling all file schemata to driver side, we can push filter push-down 
to executor side. Namely, passing candidate filters to executor side, 
and compute the Parquet predicate filter according to each file schema. 
I haven't looked into this direction in depth, but we can probably put 
this part into CatalystReadSupport, which is now initialized on executor 
side.


However, correctness of this approach can only guaranteed by the 
defensive filtering we do in Spark SQL (i.e. apply all the filters no 
matter they are pushed down or not), but we are considering to remove it 
because it imposes unnecessary performance cost. This makes me hesitant 
to go along this way.


Re: Fixed writer version as version1 for Parquet as wring a Parquet file.

2015-10-09 Thread Cheng Lian

Hi Hyukjin,

Thanks for bringing this up. Could you please make a PR for this one? We 
didn't use PARQUET_2_0 mostly because it's less mature than PARQUET_1_0, 
but we should let users choose the writer version, as long as 
PARQUET_1_0 remains the default option.


Cheng

On 10/8/15 11:04 PM, Hyukjin Kwon wrote:

Hi all,

While writing some Parquet files with Spark, I found that it actually only 
writes the Parquet files with writer version 1.


This changes the encoding types used in the file.

Is this intentionally fixed for some reason?


I changed the code and tested writing with writer version 2, and it 
looks fine.


In more details,
I found it fixes the writer version in 
org.apache.spark.sql.execution.datasources.parquet.CatalystWriteSupport.scala


def setSchema(schema: StructType, configuration: Configuration): Unit = {
  schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
  configuration.set(SPARK_ROW_SCHEMA, schema.json)
  configuration.set(
    ParquetOutputFormat.WRITER_VERSION,
    ParquetProperties.WriterVersion.PARQUET_1_0.toString)
}

I changed this to the following in order to keep the given configuration:

def setSchema(schema: StructType, configuration: Configuration): Unit = {
  schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
  configuration.set(SPARK_ROW_SCHEMA, schema.json)
  configuration.set(
    ParquetOutputFormat.WRITER_VERSION,
    configuration.get(ParquetOutputFormat.WRITER_VERSION,
      ParquetProperties.WriterVersion.PARQUET_1_0.toString))
}

and set the version to version 2:

sc.hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
  ParquetProperties.WriterVersion.PARQUET_2_0.toString)








Re: Parquet file size

2015-10-08 Thread Cheng Lian
How many tasks are there in the write job? Since each task may write one 
file for each partition, you may end up with taskNum * 31 files.


Increasing SPLIT_MINSIZE does help reduce the number of tasks. Another way 
to address this issue is to use DataFrame.coalesce(n) to shrink the task 
number to n explicitly.
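For illustration, a rough sketch of both knobs (the table name, target task 
count, and output path are made up; the configuration key is the one behind 
FileInputFormat.SPLIT_MINSIZE):

  import org.apache.spark.sql.SaveMode

  // Larger input splits => fewer read tasks => fewer output files per partition.
  sqlContext.sparkContext.hadoopConfiguration.set(
    "mapreduce.input.fileinputformat.split.minsize",
    (512L * 1024 * 1024).toString)

  // Or shrink the task count explicitly right before the partitioned write.
  val df = sqlContext.table("tbl_tsv") // hypothetical source table
  df.coalesce(200)
    .write
    .partitionBy("year", "month", "day")
    .mode(SaveMode.Overwrite)
    .parquet("/warehouse/tbl_parquet")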


Cheng

On 10/7/15 6:40 PM, Younes Naguib wrote:

Thanks, I'll try that.

*Younes Naguib**Streaming Division*

Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 
1R8**


Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | 
younes.nag...@tritondigital.com<mailto:younes.nag...@streamtheworld.com>**



*From:* odeach...@gmail.com [odeach...@gmail.com] on behalf of Deng 
Ching-Mallete [och...@apache.org]

*Sent:* Wednesday, October 07, 2015 9:14 PM
*To:* Younes Naguib
*Cc:* Cheng Lian; user@spark.apache.org
*Subject:* Re: Parquet file size

Hi,

In our case, we're using 
the org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE to 
increase the size of the RDD partitions when loading text files, so it 
would generate larger parquet files. We just set it in the Hadoop conf 
of the SparkContext. You need to be careful though about setting it to 
a large value, as you might encounter issues related to this:

https://issues.apache.org/jira/browse/SPARK-6235

For our jobs, we're setting the split size to 512MB which generates 
between 110-200MB parquet files using the default compression. We're 
using Spark-1.3.1, btw, and we also have the same partitioning of 
year/month/day for our parquet files.


HTH,
Deng

On Thu, Oct 8, 2015 at 8:25 AM, Younes Naguib 
<younes.nag...@tritondigital.com 
<mailto:younes.nag...@tritondigital.com>> wrote:


Well, I only have data for 2015-08. So, in the end, only 31
partitions
What I'm looking for, is some reasonably sized partitions.
In any case, just the idea of controlling the output parquet files
size or number would be nice.

*Younes Naguib**Streaming Division*

Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC 
H3G 1R8**


Tel.: +1 514 448 4037 x2688 <tel:%2B1%20514%20448%204037%20x2688>
| Tel.: +1 866 448 4037 x2688
<tel:%2B1%20866%20448%204037%20x2688> |
younes.nag...@tritondigital.com<mailto:younes.nag...@streamtheworld.com>**

----
*From:* Cheng Lian [lian.cs@gmail.com
<mailto:lian.cs@gmail.com>]
*Sent:* Wednesday, October 07, 2015 7:01 PM

*To:* Younes Naguib; 'user@spark.apache.org
<mailto:user@spark.apache.org>'
*Subject:* Re: Parquet file size

The reason why so many small files are generated should probably
be the fact that you are inserting into a partitioned table with
three partition columns.

If you want large Parquet files, you may try either avoiding
partitioned tables, or using fewer partition columns (e.g.,
only year, without month and day).

Cheng

So you want to dump all data into a single large Parquet file?

On 10/7/15 1:55 PM, Younes Naguib wrote:


The TSV original files is 600GB and generated 40k files of 15-25MB.

y

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* October-07-15 3:18 PM
*To:* Younes Naguib; 'user@spark.apache.org
<mailto:user@spark.apache.org>'
*Subject:* Re: Parquet file size

Why do you want larger files? Doesn't the result Parquet file
contain all the data in the original TSV file?

Cheng

On 10/7/15 11:07 AM, Younes Naguib wrote:

Hi,

I’m reading a large tsv file, and creating parquet files
using sparksql:

insert overwrite

table tbl partition(year, month, day)

Select  from tbl_tsv;

This works nicely, but generates small parquet files (15MB).

I wanted to generate larger files, any idea how to address this?

*Thanks,*

*Younes Naguib*

Triton Digital | 1440 Ste-Catherine W., Suite 1200 |
Montreal, QC  H3G 1R8

Tel.: +1 514 448 4037 x2688
<tel:%2B1%20514%20448%204037%20x2688> | Tel.: +1 866 448 4037
x2688 <tel:%2B1%20866%20448%204037%20x2688> |
younes.nag...@tritondigital.com<mailto:younes.nag...@streamtheworld.com>








Re: Parquet file size

2015-10-07 Thread Cheng Lian
The reason why so many small files are generated should probably be the 
fact that you are inserting into a partitioned table with three 
partition columns.


If you want large Parquet files, you may try either avoiding partitioned 
tables, or using fewer partition columns (e.g., only year, 
without month and day).


Cheng

So you want to dump all data into a single large Parquet file?

On 10/7/15 1:55 PM, Younes Naguib wrote:


The TSV original files is 600GB and generated 40k files of 15-25MB.

y

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* October-07-15 3:18 PM
*To:* Younes Naguib; 'user@spark.apache.org'
*Subject:* Re: Parquet file size

Why do you want larger files? Doesn't the result Parquet file contain 
all the data in the original TSV file?


Cheng

On 10/7/15 11:07 AM, Younes Naguib wrote:

Hi,

I’m reading a large tsv file, and creating parquet files using
sparksql:

insert overwrite

table tbl partition(year, month, day)

Select  from tbl_tsv;

This works nicely, but generates small parquet files (15MB).

I wanted to generate larger files, any idea how to address this?

*Thanks,*

*Younes Naguib*

Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC 
H3G 1R8


Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 |
younes.nag...@tritondigital.com<mailto:younes.nag...@streamtheworld.com>





Re: Parquet file size

2015-10-07 Thread Cheng Lian
Why do you want larger files? Doesn't the result Parquet file contain 
all the data in the original TSV file?


Cheng

On 10/7/15 11:07 AM, Younes Naguib wrote:


Hi,

I’m reading a large tsv file, and creating parquet files using sparksql:

insert overwrite

table tbl partition(year, month, day)

Select  from tbl_tsv;

This works nicely, but generates small parquet files (15MB).

I wanted to generate larger files, any idea how to address this?

*Thanks,*

*Younes Naguib***

Triton Digital | 1440 Ste-Catherine W., Suite 1200 | Montreal, QC  H3G 1R8

Tel.: +1 514 448 4037 x2688 | Tel.: +1 866 448 4037 x2688 | 
younes.nag...@tritondigital.com






Re: Metadata in Parquet

2015-09-30 Thread Cheng Lian
Unfortunately this isn't supported at the moment 
https://issues.apache.org/jira/browse/SPARK-10803


Cheng

On 9/30/15 10:54 AM, Philip Weaver wrote:
Hi, I am using org.apache.spark.sql.types.Metadata to store extra 
information along with each of my fields. I'd also like to store 
Metadata for the entire DataFrame, not attached to any specific field. 
Is this supported?


- Philip







Re: Performance when iterating over many parquet files

2015-09-28 Thread Cheng Lian
I guess you're probably using Spark 1.5? Spark SQL does support schema 
merging, but we disabled it by default since 1.5 because it introduces 
extra performance costs (it's turned on by default in 1.4 and 1.3).


You may enable schema merging via either the Parquet data source 
specific option "mergeSchema":


  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng

On 9/28/15 3:45 PM, jordan.tho...@accenture.com wrote:


Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try the sequence 
notation.  It doesn’t seem to have the desired effect.  Maybe I should 
say that each one of these files has a different schema.  When I use 
that call, I’m not ending up with a data frame with columns from all 
of the files taken together, but just one of them.  I’m tracing 
through the code trying to understand exactly what is happening with 
the Seq[String] call.  Maybe you know?  Is it trying to do some kind 
of schema merging?


Also, it seems that even if I could get it to work, it would require 
some parsing of the resulting schemas to find the invalid files.  I 
would like to capture these errors on read.


The parquet files  currently average about 60 MB in size, with min 
about 40 MB and max about 500 or so.  I could coalesce, but they do 
correspond to logical entities and there are a number of use-case 
specific reasons to keep them separate.


Thanks,

Jordan

*From:*Michael Armbrust [mailto:mich...@databricks.com]
*Sent:* Monday, September 28, 2015 4:02 PM
*To:* Thomas, Jordan 
*Cc:* user 
*Subject:* Re: Performance when iterating over many parquet files

Another note: for best performance you are going to want your parquet 
files to be pretty big (100s of mb).  You could coalesce them and 
write them out for more efficient repeat querying.


On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust 
> wrote:


sqlContext.read.parquet


takes lists of files.

val fileList = sc.textFile("file_list.txt").collect() // this
works but using spark is possibly overkill
val dataFrame = sqlContext.read.parquet(fileList: _*)

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas
>
wrote:

We are working with use cases where we need to do batch
processing on a large
number (hundreds of thousands) of Parquet files.  The
processing is quite
similar per file.  There are many aggregates that are very
SQL-friendly
(computing averages, maxima, minima, aggregations on single
columns with
some selection criteria).  There is also some more advanced
time-series processing (continuous wavelet transforms
and the
like).  This all seems like a good use case for Spark.

But I'm having performance problems.  Let's take a look at
something very
simple, which simply checks whether the parquet files are
readable.

Code that seems natural but doesn't work:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)

My understanding is that this doesn't work because sqlContext
can't be used
inside of a transformation like "map" (or inside an action); it only
makes sense in the driver.  Thus, it becomes a null reference
in the above
code, so all reads fail.

Code that works:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.collect()
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)


This works because the collect() means that everything happens
back on the
driver.  So the sqlContext object makes sense and everything
works fine.

But it is slow.  I'm using yarn-client mode on a 6-node
cluster with 17
executors, 40 GB ram on driver, 19GB on executors.  And it
takes about 1
minute to execute for 100 parquet files, which is too long.  Recall we need
to do this across hundreds of thousands of files.

I realize it is possible to parallelize after the read:

import scala.util.{Try, Success, Failure} val parquetFiles =
sc.textFile("file_list.txt") val 

Re: Performance when iterating over many parquet files

2015-09-28 Thread Cheng Lian

Also, you may find more details in the programming guide:

- 
http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
- 
http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration


Cheng

On 9/28/15 3:54 PM, Cheng Lian wrote:
I guess you're probably using Spark 1.5? Spark SQL does support schema 
merging, but we disabled it by default since 1.5 because it introduces 
extra performance costs (it's turned on by default in 1.4 and 1.3).


You may enable schema merging via either the Parquet data source 
specific option "mergeSchema":


  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng

On 9/28/15 3:45 PM, jordan.tho...@accenture.com wrote:


Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try the sequence 
notation.  It doesn’t seem to have the desired effect.  Maybe I 
should say that each one of these files has a different schema.  When 
I use that call, I’m not ending up with a data frame with columns 
from all of the files taken together, but just one of them.  I’m 
tracing through the code trying to understand exactly what is 
happening with the Seq[String] call.  Maybe you know?  Is it trying 
to do some kind of schema merging?


Also, it seems that even if I could get it to work, it would require 
some parsing of the resulting schemas to find the invalid files.  I 
would like to capture these errors on read.


The parquet files  currently average about 60 MB in size, with min 
about 40 MB and max about 500 or so.  I could coalesce, but they do 
correspond to logical entities and there are a number of use-case 
specific reasons to keep them separate.


Thanks,

Jordan

*From:*Michael Armbrust [mailto:mich...@databricks.com]
*Sent:* Monday, September 28, 2015 4:02 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>
*Cc:* user <user@spark.apache.org>
*Subject:* Re: Performance when iterating over many parquet files

Another note: for best performance you are going to want your parquet 
files to be pretty big (100s of mb).  You could coalesce them and 
write them out for more efficient repeat querying.


On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust 
<mich...@databricks.com <mailto:mich...@databricks.com>> wrote:


sqlContext.read.parquet

<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L258>
takes lists of files.

val fileList = sc.textFile("file_list.txt").collect() // this
works but using spark is possibly overkill
val dataFrame = sqlContext.read.parquet(fileList: _*)

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas
<jordan.tho...@accenture.com
<mailto:jordan.tho...@accenture.com>> wrote:

We are working with use cases where we need to do batch
processing on a large
number (hundreds of thousands) of Parquet files.  The
processing is quite
similar per file.  There are many aggregates that are very
SQL-friendly
(computing averages, maxima, minima, aggregations on single
columns with
some selection criteria).  There are also some processing
that is more
advanced time-series processing (continuous wavelet
transforms and the
like).  This all seems like a good use case for Spark.

But I'm having performance problems.  Let's take a look at
something very
simple, which simply checks whether the parquet files are
readable.

Code that seems natural but doesn't work:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)

My understanding is that this doesn't work because sqlContext
can't be used
inside of a transformation like "map" (or inside an action). 
That it only

makes sense in the driver.  Thus, it becomes a null reference
in the above
code, so all reads fail.

Code that works:

import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.collect()
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)


This works because the collect() means that everything
happens back on the
driver.  So the sqlContext object makes sense and everything
works fine.

But it is slow.  I'm using yarn-client mode on a 6-node
cluster with 17
  

Re: Performance when iterating over many parquet files

2015-09-28 Thread Cheng Lian
Could you please elaborate on what kind of errors are those bad Parquet 
files causing? In what ways are they miswritten?


Cheng

On 9/28/15 4:03 PM, jordan.tho...@accenture.com wrote:


Ah, yes, I see that it has been turned off now, that’s why it wasn’t 
working.  Thank you, this is helpful!  The problem now is to filter 
out bad (miswritten) Parquet files, as they are causing this operation 
to fail.


Any suggestions on detecting them quickly and easily?

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 5:56 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>; mich...@databricks.com
*Cc:* user@spark.apache.org
*Subject:* Re: Performance when iterating over many parquet files

Also, you may find more details in the programming guide:

- 
http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
- 
http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration


Cheng

On 9/28/15 3:54 PM, Cheng Lian wrote:

I guess you're probably using Spark 1.5? Spark SQL does support
schema merging, but we disabled it by default since 1.5 because it
introduces extra performance costs (it's turned on by default in
1.4 and 1.3).

You may enable schema merging via either the Parquet data source
specific option "mergeSchema":

  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng

On 9/28/15 3:45 PM, jordan.tho...@accenture.com
<mailto:jordan.tho...@accenture.com> wrote:

Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try the
sequence notation.  It doesn’t seem to have the desired
effect.  Maybe I should say that each one of these files has a
different schema.  When I use that call, I’m not ending up
with a data frame with columns from all of the files taken
together, but just one of them.  I’m tracing through the code
trying to understand exactly what is happening with the
Seq[String] call. Maybe you know?  Is it trying to do some
kind of schema merging?

Also, it seems that even if I could get it to work, it would
require some parsing of the resulting schemas to find the
invalid files.  I would like to capture these errors on read.

The parquet files  currently average about 60 MB in size, with
min about 40 MB and max about 500 or so.  I could coalesce,
but they do correspond to logical entities and there are a
number of use-case specific reasons to keep them separate.

Thanks,

Jordan

*From:*Michael Armbrust [mailto:mich...@databricks.com]
*Sent:* Monday, September 28, 2015 4:02 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>
<mailto:jordan.tho...@accenture.com>
*Cc:* user <user@spark.apache.org> <mailto:user@spark.apache.org>
*Subject:* Re: Performance when iterating over many parquet files

Another note: for best performance you are going to want your
parquet files to be pretty big (100s of mb).  You could
coalesce them and write them out for more efficient repeat
querying.

On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust
<mich...@databricks.com <mailto:mich...@databricks.com>> wrote:

sqlContext.read.parquet

<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L258>
takes lists of files.

val fileList = sc.textFile("file_list.txt").collect() //
this works but using spark is possibly overkill
val dataFrame = sqlContext.read.parquet(fileList: _*)

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas
<jordan.tho...@accenture.com
<mailto:jordan.tho...@accenture.com>> wrote:

We are working with use cases where we need to do
batch processing on a large
number (hundreds of thousands) of Parquet files.  The
processing is quite
similar per file.  There are many aggregates that
are very SQL-friendly
(computing averages, maxima, minima, aggregations on
single columns with
some selection criteria).  There are also some
processing that is more
advanced time-series processing (continuous wavelet
transforms and the
like).  This all seems like a good use case for Spark.

But I'm having performance problems. Let's take a look
  

Re: Performance when iterating over many parquet files

2015-09-28 Thread Cheng Lian

Probably parquet-tools and the following shell script helps:

root="/path/to/your/data"

for f in `find $root -type f -name "*.parquet"`; do
  parquet-schema $f > /dev/null 2>&1
  if [ $? -ne 0 ]; then echo $f; fi
done

This should print out all non-Parquet files under $root. Please refer to 
this link to see how to build and install parquet-tools 
https://github.com/Parquet/parquet-mr/issues/321


Cheng

On 9/28/15 4:29 PM, jordan.tho...@accenture.com wrote:


Sure. I would just like to remove the ones that fail the basic checks 
done by the Parquet readFooters function, in that their length is 
wrong or the magic number is incorrect, which throws exceptions in the 
read method.


Errors like:

java.io.IOException: Could not read footer: 
java.lang.RuntimeException: data.parquet is not a Parquet file (too small)


and

java.io.IOException: Could not read footer: 
java.lang.RuntimeException: data.parquet is not a Parquet file. 
expected magic number at tail [80, 65, 82, 49] but found [54, -4, -10, 
-102]


Backstory: We had a migration from one cluster to another and 
thousands of incomplete files were transferred.  In addition, they are 
still trying to handle the kickouts from their write methods (they are 
converting from a proprietary binary format).  A lot of that is 
captured in the Splunk logs and will improve in the coming weeks as 
they continue tuning, but on the reading end I want to make sure we’re 
in sync about what needs to be re-converted and re-transferred.


Thanks,

Jordan

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 6:15 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>; mich...@databricks.com
*Cc:* user@spark.apache.org
*Subject:* Re: Performance when iterating over many parquet files

Could you please elaborate on what kind of errors are those bad 
Parquet files causing? In what ways are they miswritten?


Cheng

On 9/28/15 4:03 PM, jordan.tho...@accenture.com 
<mailto:jordan.tho...@accenture.com> wrote:


Ah, yes, I see that it has been turned off now, that’s why it
wasn’t working.  Thank you, this is helpful!  The problem now is
to filter out bad (miswritten) Parquet files, as they are causing
this operation to fail.

Any suggestions on detecting them quickly and easily?

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 5:56 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>
<mailto:jordan.tho...@accenture.com>; mich...@databricks.com
<mailto:mich...@databricks.com>
*Cc:* user@spark.apache.org <mailto:user@spark.apache.org>
*Subject:* Re: Performance when iterating over many parquet files

Also, you may find more details in the programming guide:

-

http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
-
http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

Cheng

On 9/28/15 3:54 PM, Cheng Lian wrote:

I guess you're probably using Spark 1.5? Spark SQL does
support schema merging, but we disabled it by default since
1.5 because it introduces extra performance costs (it's turned
on by default in 1.4 and 1.3).

You may enable schema merging via either the Parquet data
source specific option "mergeSchema":

  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng

On 9/28/15 3:45 PM, jordan.tho...@accenture.com
<mailto:jordan.tho...@accenture.com> wrote:

Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try
the sequence notation.  It doesn’t seem to have the
desired effect.  Maybe I should say that each one of these
files has a different schema.  When I use that call, I’m
not ending up with a data frame with columns from all of
the files taken together, but just one of them.  I’m
tracing through the code trying to understand exactly what
is happening with the Seq[String] call.  Maybe you know? 
Is it trying to do some kind of schema merging?


Also, it seems that even if I could get it to work, it
would require some parsing of the resulting schemas to
find the invalid files.  I would like to capture these
errors on read.

The parquet files  currently average about 60 MB in size,
with min about 40 MB and max about 500 or so.  I could
coalesce, but they do correspond to logical entities and
there are a number of use-case specific r

Re: Performance when iterating over many parquet files

2015-09-28 Thread Cheng Lian
Oh I see, then probably this one, basically the parallel Spark version 
of my last script, using ParquetFileReader:


import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

// paths is assumed to be a collection of Parquet file path strings to check
val badFiles = sc.parallelize(paths).mapPartitions { iterator =>
  val conf = new Configuration()
  iterator.filter { path =>
    Try(ParquetFileReader.readFooter(
      conf, new Path(path), ParquetMetadataConverter.SKIP_ROW_GROUPS)).isFailure
  }
}.collect()


Cheng

On 9/28/15 4:48 PM, jordan.tho...@accenture.com wrote:


Ok thanks.  Actually we ran something very similar this weekend.  It 
works but is very slow.


The Spark method I included in my original post is about 5-6 times 
faster.  Just wondering if there is something even faster than that.  
I see this as being a recurring problem over the next few months.


*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 6:46 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>; mich...@databricks.com
*Cc:* user@spark.apache.org
*Subject:* Re: Performance when iterating over many parquet files

Probably parquet-tools and the following shell script helps:

root="/path/to/your/data"

for f in `find $root -type f -name "*.parquet"`; do
  parquet-schema $f > /dev/null 2>&1
  if [ $? -ne 0 ]; then echo $f; fi
done

This should print out all non-Parquet files under $root. Please refer 
to this link to see how to build and install parquet-tools 
https://github.com/Parquet/parquet-mr/issues/321


Cheng

On 9/28/15 4:29 PM, jordan.tho...@accenture.com 
<mailto:jordan.tho...@accenture.com> wrote:


Sure. I would just like to remove the ones that fail the basic checks
done by the Parquet readFooters function, in that their length is
wrong or magic number is incorrect, which throws exceptions in the
read method.

Errors like:

java.io.IOException: Could not read footer:
java.lang.RuntimeException: data.parquet is not a Parquet file
(too small)

and

java.io.IOException: Could not read footer:
java.lang.RuntimeException: data.parquet is not a Parquet file.
expected magic number at tail [80, 65, 82, 49] but found [54, -4,
-10, -102]

Backstory: We had a migration from one cluster to another and
thousands of incomplete files were transferred.  In addition, they
are still trying to handle the kickouts from their write methods
(they are converting from a proprietary binary format).  A lot of
that is captured in the Splunk logs and will improve in the coming
weeks as they continue tuning, but on the reading end I want to
make sure we’re in sync about what needs to be re-converted and
re-transferred.

    Thanks,

Jordan

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 6:15 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>
<mailto:jordan.tho...@accenture.com>; mich...@databricks.com
<mailto:mich...@databricks.com>
*Cc:* user@spark.apache.org <mailto:user@spark.apache.org>
*Subject:* Re: Performance when iterating over many parquet files

Could you please elaborate on what kind of errors are those bad
Parquet files causing? In what ways are they miswritten?

Cheng

On 9/28/15 4:03 PM, jordan.tho...@accenture.com
<mailto:jordan.tho...@accenture.com> wrote:

Ah, yes, I see that it has been turned off now, that’s why it
wasn’t working.  Thank you, this is helpful!  The problem now
is to filter out bad (miswritten) Parquet files, as they are
causing this operation to fail.

    Any suggestions on detecting them quickly and easily?

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* Monday, September 28, 2015 5:56 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>
<mailto:jordan.tho...@accenture.com>; mich...@databricks.com
<mailto:mich...@databricks.com>
*Cc:* user@spark.apache.org <mailto:user@spark.apache.org>
*Subject:* Re: Performance when iterating over many parquet files

Also, you may find more details in the programming guide:

-

http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
-

http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

Cheng

On 9/28/15 3:54 PM, Cheng Lian wrote:

I guess you're probably using Spark 1.5? Spark SQL does
support schema merging, but we disabled it by default
since 1.5 because it introduces extra performance costs
(it's turned on by default in 1.4 and 1.3).

You may enable schema merging via either the Parquet data
source specific option "mergeSchema":

  sqlContext.read.option("mergeSchema", "tr

Re: Is this a Spark issue or Hive issue that Spark cannot read the string type data in the Parquet generated by Hive

2015-09-25 Thread Cheng Lian
Please set the SQL option spark.sql.parquet.binaryAsString to true 
when reading Parquet files containing strings generated by Hive.
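
A minimal sketch of doing that with the 1.3-style API used below (the 
path is the one from the original post):

  // Treat un-annotated Parquet binary columns as strings
  sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
  // or equivalently: sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")
  val v_event_cnt = sqlContext.parquetFile("/hdfs_location/dt=2015-09-23")
  v_event_cnt.printSchema()   // column2 should now show up as string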


This is actually a bug of parquet-hive. When generating Parquet schema 
for a string field, Parquet requires a "UTF8" annotation, something like:


message hive_schema {
  ...
  optional binary column2 (UTF8);
  ...
}

but parquet-hive fails to add it, and produces:

message hive_schema {
  ...
  optional binary column2;
  ...
}

Thus binary fields and string fields are made indistinguishable.

Interestingly, there's another bug in parquet-thrift, which always adds 
UTF8 annotation to all binary fields :)


Cheng

On 9/25/15 2:03 PM, java8964 wrote:

Hi, Spark Users:

I have a problem related to Spark cannot recognize the string type in 
the Parquet schema generated by Hive.


Version of all components:

Spark 1.3.1
Hive 0.12.0
Parquet 1.3.2

I generated a detailed low-level table in the Parquet format using 
MapReduce Java code. This table can be read in Hive and Spark 
without any issue.


Now I create a Hive aggregation table like following:

create external table T (
column1 bigint,
*column2 string,*
..
)
partitioned by (dt string)
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
location '/hdfs_location'

Then the table is populated in the Hive by:

set hive.exec.compress.output=true;
set parquet.compression=snappy;

insert into table T partition(dt='2015-09-23')
select
.
from Detail_Table
group by

After this, we can query the T table in the Hive without issue.

But if I try to use it in the Spark 1.3.1 like following:

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val v_event_cnt=sqlContext.parquetFile("/hdfs_location/dt=2015-09-23")

scala> v_event_cnt.printSchema
root
 |-- column1: long (nullable = true)
* |-- column2: binary (nullable = true)*
 |-- 
 |-- dt: string (nullable = true)

Spark will recognize column2 as binary type instead of string 
type in this case, but in Hive it works fine.
This brings up an issue: in Spark, the data will be dumped as 
"[B@e353d68". To use it in Spark, I have to cast it to string to 
get the correct value out of it.


I wonder which part causes this type mismatch in the Parquet file. Does 
Hive not generate the correct Parquet schema, or can Spark in fact not 
recognize it due to a problem on its side?


Is there a way I can make either Hive or Spark handle this parquet 
schema correctly on both ends?


Thanks

Yong




Re: Is this a Spark issue or Hive issue that Spark cannot read the string type data in the Parquet generated by Hive

2015-09-25 Thread Cheng Lian
BTW, just checked that this bug should have been fixed since Hive 
0.14.0. So the SQL option I mentioned is mostly used for reading legacy 
Parquet files generated by older versions of Hive.


Cheng

On 9/25/15 2:42 PM, Cheng Lian wrote:
Please set the SQL option spark.sql.parquet.binaryAsString to true 
when reading Parquet files containing strings generated by Hive.


This is actually a bug of parquet-hive. When generating Parquet schema 
for a string field, Parquet requires a "UTF8" annotation, something like:


message hive_schema {
  ...
  optional binary column2 (UTF8);
  ...
}

but parquet-hive fails to add it, and produces:

message hive_schema {
  ...
  optional binary column2;
  ...
}

Thus binary fields and string fields are made indistinguishable.

Interestingly, there's another bug in parquet-thrift, which always 
adds UTF8 annotation to all binary fields :)


Cheng

On 9/25/15 2:03 PM, java8964 wrote:

Hi, Spark Users:

I have a problem related to Spark cannot recognize the string type in 
the Parquet schema generated by Hive.


Version of all components:

Spark 1.3.1
Hive 0.12.0
Parquet 1.3.2

I generated a detail low level table in the Parquet format using 
MapReduce java code. This table can be read in the Hive and Spark 
without any issue.


Now I create a Hive aggregation table like following:

create external table T (
column1 bigint,
*column2 string,*
..
)
partitioned by (dt string)
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
location '/hdfs_location'

Then the table is populated in the Hive by:

set hive.exec.compress.output=true;
set parquet.compression=snappy;

insert into table T partition(dt='2015-09-23')
select
.
from Detail_Table
group by

After this, we can query the T table in the Hive without issue.

But if I try to use it in the Spark 1.3.1 like following:

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val v_event_cnt=sqlContext.parquetFile("/hdfs_location/dt=2015-09-23")

scala> v_event_cnt.printSchema
root
 |-- column1: long (nullable = true)
* |-- column2: binary (nullable = true)*
 |-- 
 |-- dt: string (nullable = true)

The Spark will recognize column2 as binary type, instead of string 
type in this case, but in the Hive, it works fine.
So this bring an issue that in the Spark, the data will be dumped as 
"[B@e353d68". To use it in the Spark, I have to cast it as string, to 
get the correct value out of it.


I wonder this mismatch type of Parquet file could be caused by which 
part? Is the Hive not generate the correct Parquet file with schema, 
or Spark in fact cannot recognize it due to problem in it.


Is there a way I can do either Hive or Spark to make this parquet 
schema correctly on both ends?


Thanks

Yong






Re: Using Map and Basic Operators yield java.lang.ClassCastException (Parquet + Hive + Spark SQL 1.5.0 + Thrift)

2015-09-25 Thread Cheng Lian
Thanks for the clarification. Could you please provide the full schema 
of your table and query plans of your query? You may obtain them via:


hiveContext.table("your_table").printSchema()

and

hiveContext.sql("your query").explain(extended = true)

You also mentioned "Thrift" in the subject, did you mean the Thrift 
server? Or maybe the Parquet files were written by parquet-thrift? Could 
you please also provide the full Parquet schema of the Parquet files you 
were reading? You may get the schema using the parquet-schema CLI tool:


$ parquet-schema 

Here you can find instructions of how to build parquet-tools, just in 
case you don't have it at hand: 
https://github.com/Parquet/parquet-mr/issues/321


If you don't want to bother building parquet-tools (which can be 
sometimes troublesome), you may also try this in spark-shell:


hiveContext.table("your_table").head(1)

Then you should be able to find the Parquet schema from Spark driver log 
(please make sure you enable INFO log).



Cheng

On 9/24/15 7:59 PM, Dominic Ricard wrote:

No, those were just examples of how maps can look. In my case, the 
key-value is either absent or present in the form of the latter:

{"key1":{"key2":"value"}}

If key1 is present, then it will contain a tuple of key2:value, the value 
being an 'int'

I guess, after some testing, that my problem is in how casting a Map value to 
the primitives Float and Double is handled. Handling INT is all good, but float 
and double cause the exception.

Thanks.

Dominic Ricard
Triton Digital

-Original Message-
From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Thursday, September 24, 2015 5:47 PM
To: Dominic Ricard; user@spark.apache.org
Subject: Re: Using Map and Basic Operators yield java.lang.ClassCastException 
(Parquet + Hive + Spark SQL 1.5.0 + Thrift)



On 9/24/15 11:34 AM, Dominic Ricard wrote:

Hi,
 I stumbled on the following today. We have Parquet files that
expose a column in a Map format. This is very convenient as we have
data parts that can vary in time. Not knowing what the data will be,
we simply split it in tuples and insert it as a map inside 1 column.

Retrieving the data is very easy. Syntax looks like this:

select column.key1.key2 from table;

Column value look like this:
{}
{"key1":"value"}
{"key1":{"key2":"value"}}

Do you mean that the value type of the map may also vary? The 2nd record has a 
string value, while the 3rd one has another nested map as its value. This isn't 
supported in Spark SQL.

But when trying to do basic operators on that column, I get the
following
error:

query: select (column.key1.key2 / 30 < 1) from table

ERROR processing query/statement. Error Code: 0, SQL state:
TStatus(statusCode:ERROR_STATUS,
infoMessages:[*org.apache.hive.service.cli.HiveSQLException:java.lang.ClassCastException:
org.apache.spark.sql.types.NullType$ cannot be cast to
org.apache.spark.sql.types.MapType:26:25,
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:
runInternal:SparkExecuteStatementOperation.scala:259,
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:
run:SparkExecuteStatementOperation.scala:144,
org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementIn
ternal:HiveSessionImpl.java:388,
org.apache.hive.service.cli.session.HiveSessionImpl:executeStatement:H
iveSessionImpl.java:369,
sun.reflect.GeneratedMethodAccessor115:invoke::-1,
sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccess
orImpl.java:43, java.lang.reflect.Method:invoke:Method.java:497,
org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessio
nProxy.java:78,
org.apache.hive.service.cli.session.HiveSessionProxy:access$000:HiveSe
ssionProxy.java:36,
org.apache.hive.service.cli.session.HiveSessionProxy$1:run:HiveSession
Proxy.java:63,
java.security.AccessController:doPrivileged:AccessController.java:-2,
javax.security.auth.Subject:doAs:Subject.java:422,
org.apache.hadoop.security.UserGroupInformation:doAs:UserGroupInformat
ion.java:1628,
org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessio
nProxy.java:59, com.sun.proxy.$Proxy39:executeStatement::-1,
org.apache.hive.service.cli.CLIService:executeStatement:CLIService.jav
a:261,
org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:T
hriftCLIService.java:486,
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatem
ent:getResult:TCLIService.java:1313,
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatem
ent:getResult:TCLIService.java:1298,
org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39,
org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39,
org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddr
essProcessor.java:56,
org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPo
olServer.java:28

Re: spark + parquet + schema name and metadata

2015-09-24 Thread Cheng Lian
Thanks for the feedback, just filed 
https://issues.apache.org/jira/browse/SPARK-10803 to track this issue.


Cheng

On 9/24/15 4:25 AM, Borisa Zivkovic wrote:

Hi,

your suggestion works nicely. I was able to attach metadata to 
columns and read that metadata back from Spark by using ParquetFileReader.
It would be nice if we had a way to manipulate parquet metadata 
directly from DataFrames, though.
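
For reference, a rough sketch of reading the merged key-value metadata 
back with ParquetFileReader (assuming parquet-mr 1.7+, i.e. the 
org.apache.parquet packages; the part-file path is hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

// Read just the footer of one part-file; the Spark SQL schema (including the
// per-column metadata) is stored there under the key
// "org.apache.spark.sql.parquet.row.metadata".
val footer = ParquetFileReader.readFooter(
  new Configuration(),
  new Path("/tmp/parquet/meta/part-r-00000.gz.parquet"),   // hypothetical part-file
  ParquetMetadataConverter.NO_FILTER)
val keyValueMeta = footer.getFileMetaData.getKeyValueMetaData   // java.util.Map[String, String]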


regards

On Wed, 23 Sep 2015 at 09:25 Borisa Zivkovic 
<borisha.zivko...@gmail.com <mailto:borisha.zivko...@gmail.com>> wrote:


Hi,

thanks a lot for this! I will try it out to see if this works ok.

I am planning to use "stable" metadata - so those will be same
across all parquet files inside directory hierarchy...



On Tue, 22 Sep 2015 at 18:54 Cheng Lian <lian.cs@gmail.com
<mailto:lian.cs@gmail.com>> wrote:

Michael reminded me that although we don't support direct
manipulation over Parquet metadata, you can still save/query
metadata to/from Parquet via DataFrame per-column metadata.
For example:

import sqlContext.implicits._
import org.apache.spark.sql.types.MetadataBuilder

val path = "file:///tmp/parquet/meta"

// Saving metadata
val meta = new MetadataBuilder().putString("appVersion",
"1.0.2").build()
sqlContext.range(10).select($"id".as("id",
meta)).coalesce(1).write.mode("overwrite").parquet(path)

// Querying metadata

sqlContext.read.parquet(path).schema("id").metadata.getString("appVersion")

The metadata is saved together with Spark SQL schema as a JSON
string. For example, the above code generates the following
Parquet metadata (inspected with parquet-meta):

file:

file:/private/tmp/parquet/meta/part-r-0-77cb2237-e6a8-4cb6-a452-ae205ba7b660.gz.parquet
creator: parquet-mr version 1.6.0
extra: org.apache.spark.sql.parquet.row.metadata =

{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,*"metadata":{"appVersion":"1.0.2"}*}]}


Cheng


On 9/22/15 9:37 AM, Cheng Lian wrote:

I see, this makes sense. We should probably add this in Spark
SQL.

However, there's one corner case to note about user-defined
Parquet metadata. When committing a write job,
ParquetOutputCommitter writes Parquet summary files
(_metadata and _common_metadata), and user-defined key-value
metadata written in all Parquet part-files get merged here.
The problem is that, if a single key is associated with
multiple values, Parquet doesn't know how to reconcile this
situation, and simply gives up writing summary files. This
can be particular annoying for appending. In general, users
should avoid storing "unstable" values like timestamps as
Parquet metadata.

Cheng

On 9/22/15 1:58 AM, Borisa Zivkovic wrote:

thanks for answer.

I need this in order to be able to track schema metadata.

basically when I create parquet files from Spark I want to
be able to "tag" them in some way (giving the schema
appropriate name or attaching some key/values) and then it
is fairly easy to get basic metadata about parquet files
when processing and discovering those later on.

On Mon, 21 Sep 2015 at 18:17 Cheng Lian
<lian.cs@gmail.com <mailto:lian.cs@gmail.com>> wrote:

Currently Spark SQL doesn't support customizing schema
name and
metadata. May I know why these two matters in your use
case? Some
Parquet data models, like parquet-avro, do support it,
while some others
don't (e.g. parquet-hive).

Cheng

On 9/21/15 7:13 AM, Borisa Zivkovic wrote:
> Hi,
>
> I am trying to figure out how to write parquet
metadata when
> persisting DataFrames to parquet using Spark (1.4.1)
>
> I could not find a way to change schema name (which
seems to be
> hardcoded to root) and also how to add data to
key/value metadata in
> parquet footer.
>
>
org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData
>
> org.apache.parquet.schema.Type#getName
>
> thanks
>
>









Re: spark + parquet + schema name and metadata

2015-09-22 Thread Cheng Lian

I see, this makes sense. We should probably add this in Spark SQL.

However, there's one corner case to note about user-defined Parquet 
metadata. When committing a write job, ParquetOutputCommitter writes 
Parquet summary files (_metadata and _common_metadata), and user-defined 
key-value metadata written in all Parquet part-files get merged here. 
The problem is that, if a single key is associated with multiple values, 
Parquet doesn't know how to reconcile this situation, and simply gives 
up writing summary files. This can be particularly annoying for appending. 
In general, users should avoid storing "unstable" values like timestamps 
as Parquet metadata.


Cheng

On 9/22/15 1:58 AM, Borisa Zivkovic wrote:

thanks for answer.

I need this in order to be able to track schema metadata.

basically when I create parquet files from Spark I want to be able to 
"tag" them in some way (giving the schema appropriate name or 
attaching some key/values) and then it is fairly easy to get basic 
metadata about parquet files when processing and discovering those 
later on.


On Mon, 21 Sep 2015 at 18:17 Cheng Lian <lian.cs@gmail.com 
<mailto:lian.cs@gmail.com>> wrote:


Currently Spark SQL doesn't support customizing schema name and
metadata. May I know why these two matters in your use case? Some
Parquet data models, like parquet-avro, do support it, while some
others
don't (e.g. parquet-hive).

Cheng

On 9/21/15 7:13 AM, Borisa Zivkovic wrote:
> Hi,
>
> I am trying to figure out how to write parquet metadata when
> persisting DataFrames to parquet using Spark (1.4.1)
>
> I could not find a way to change schema name (which seems to be
> hardcoded to root) and also how to add data to key/value metadata in
> parquet footer.
>
> org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData
>
> org.apache.parquet.schema.Type#getName
>
> thanks
>
>





Re: spark + parquet + schema name and metadata

2015-09-21 Thread Cheng Lian
Currently Spark SQL doesn't support customizing schema name and 
metadata. May I know why these two matters in your use case? Some 
Parquet data models, like parquet-avro, do support it, while some others 
don't (e.g. parquet-hive).


Cheng

On 9/21/15 7:13 AM, Borisa Zivkovic wrote:

Hi,

I am trying to figure out how to write parquet metadata when 
persisting DataFrames to parquet using Spark (1.4.1)


I could not find a way to change schema name (which seems to be 
hardcoded to root) and also how to add data to key/value metadata in 
parquet footer.


org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData

org.apache.parquet.schema.Type#getName

thanks





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: parquet error

2015-09-18 Thread Cheng Lian
Not sure what's happening here, but I guess it's probably a dependency 
version issue. Could you please give vanilla Apache Spark a try to see 
whether it's a CDH-specific issue or not?


Cheng

On 9/17/15 11:44 PM, Chengi Liu wrote:

Hi,
  I did some digging..
I believe the error is caused by jets3t jar.
Essentially these lines

locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 
'java/net/URI', 'org/apache/hadoop/conf/Configuration', 
'org/apache/hadoop/fs/s3/S3Credentials', 
'org/jets3t/service/security/AWSCredentials' }


stack: { 
'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 
uninitialized 32, uninitialized 32, 
'org/jets3t/service/security/AWSCredentials' }



But I am not sure how to fix this. Is it jar version issue?

I am using the Cloudera CDH 5.2 distro and have created a symlink of the 
jets3t jar from hadoop/lib to spark/lib (which I believe is version 
0.9-ish)?




On Wed, Sep 16, 2015 at 4:59 PM, Chengi Liu > wrote:


Hi,
  I have a spark cluster setup and I am trying to write the data
to s3 but in parquet format.
Here is what I am doing

df = sqlContext.load('test', 'com.databricks.spark.avro')

df.saveAsParquetFile("s3n://test")

But I get some nasty error:

Py4JJavaError: An error occurred while calling o29.saveAsParquetFile.

: org.apache.spark.SparkException: Job aborted.

at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:166)

at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:139)

at

org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)

at

org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)

at
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)

at

org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)

at

org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)

at

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)

at

org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)

at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)

at
org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:336)

at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)

at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)

at
org.apache.spark.sql.DataFrame.saveAsParquetFile(DataFrame.scala:1508)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

at py4j.Gateway.invoke(Gateway.java:259)

at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)

at py4j.commands.CallCommand.execute(CallCommand.java:79)

at py4j.GatewayConnection.run(GatewayConnection.java:207)

at java.lang.Thread.run(Thread.java:744)

Caused by: org.apache.spark.SparkException: Job aborted due to
stage failure: Task 3 in stage 0.0 failed 4 times, most recent
failure: Lost task 3.3 in stage 0.0 (TID 12, srv-110-29.720.rdio):
org.apache.spark.SparkException: Task failed while writing rows.

at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org

$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:191)

at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)

at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)

at org.apache.spark.scheduler.Task.run(Task.scala:70)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.VerifyError: Bad type on operand stack

Exception Details:

  Location:


org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.initialize(Ljava/net/URI;Lorg/apache/hadoop/conf/Configuration;)V
@38: 

Re: Spark-shell throws Hive error when SQLContext.parquetFile, v1.3

2015-09-10 Thread Cheng Lian
If you don't need to interact with Hive, you may compile Spark without 
using the -Phive flag to eliminate Hive dependencies. In this way, the 
sqlContext instance in Spark shell will be of type SQLContext instead of 
HiveContext.
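
Alternatively, if you keep the Hive-enabled build, a minimal sketch is to 
create a plain SQLContext yourself in spark-shell, so the metastore is 
never touched (dir as in your snippet):

  // A plain SQLContext is enough for reading Parquet; no Hive metastore involved
  val plainSqlContext = new org.apache.spark.sql.SQLContext(sc)
  val df = plainSqlContext.parquetFile(dir)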


The Hive metastore error itself is probably due to Hive misconfiguration.


Cheng

On 9/10/15 6:02 PM, Petr Novak wrote:

Hello,

sqlContext.parquetFile(dir)

throws exception " Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient"


The strange thing is that on the second attempt to open the file it is 
successful:


try {
sqlContext.parquetFile(dir)
  } catch {
case e: Exception => sqlContext.parquetFile(dir)
}

What should I do to make my script run flawlessly in spark-shell 
when opening parquet files? It is probably missing some dependency. Or 
how should I write the code, because this double attempt is awful and 
I don't need HiveMetaStoreClient, I just need to open a parquet file.


Many thanks for any idea,
Petr





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to read compressed parquet file

2015-09-09 Thread Cheng Lian
You need to use "har://" instead of "hdfs://" to read HAR files. Just 
tested against Spark 1.5, and it works as expected.
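
Something like the following, as a sketch -- the archive name and inner 
path are illustrative; the key point is the har:// scheme:

  sql_context.read.parquet("har:///user/me/combined.har/output_path")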


Cheng

On 9/9/15 3:29 PM, 李铖 wrote:
I think too many parquet files may affect reading performance, so I 
use hadoop archive to combine them, but 
sql_context.read.parquet(output_path) does not work on the archive.

How to fix it ,please help me.
:)




Re: Split content into multiple Parquet files

2015-09-08 Thread Cheng Lian

In Spark 1.4 and 1.5, you can do something like this:

df.write.partitionBy("key").parquet("/datasink/output-parquets")
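
As a usage sketch (the path and key value are illustrative), a single key 
can be read back efficiently afterwards thanks to partition pruning:

import sqlContext.implicits._
val foobarOnly = sqlContext.read.parquet("/datasink/output-parquets")
  .filter($"key" === "foobar")   // only the key=foobar directory is scanned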

BTW, I'm curious how you did it without partitionBy using 
saveAsHadoopFile?


Cheng

On 9/8/15 2:34 PM, Adrien Mogenet wrote:

Hi there,

We've spent several hours trying to split our input data into several parquet 
files (or several folders, i.e. 
/datasink/output-parquets//foobar.parquet), based on a 
low-cardinality key. This works very well when using 
saveAsHadoopFile, but we can't achieve a similar thing with Parquet files.


The only working solution so far is to persist the RDD and then loop 
over it N times to write N files. That does not look acceptable...


Do you guys have any suggestion to do such an operation?

--

*Adrien Mogenet*
Head of Backend/Infrastructure
adrien.moge...@contentsquare.com 
(+33)6.59.16.64.22
http://www.contentsquare.com 
50, avenue Montaigne - 75008 Paris




Re: Parquet Array Support Broken?

2015-09-08 Thread Cheng Lian
Yeah, this is a typical Parquet interoperability issue due to 
unfortunate historical reasons. Hive (actually parquet-hive) gives the 
following schema for array:


message m0 {
  optional group f (LIST) {
    repeated group bag {
      optional int32 array_element;
    }
  }
}

while Spark SQL gives

message m1 {
  optional group f (LIST) {
    repeated group bag {
      optional int32 array;
    }
  }
}

So Spark 1.4 couldn't find the expected field "array" in the target 
Parquet file.  As Ruslan suggested, Spark 1.5 addresses this issue 
properly and is able to read Parquet files generated by most, if not 
all, Parquet data models out there.


You may find more details about Parquet interoperability in this post if 
you are interested 
https://www.mail-archive.com/user@spark.apache.org/msg35663.html


Cheng

On 9/8/15 6:19 AM, Alex Kozlov wrote:

Thank you - it works if the file is created in Spark

On Mon, Sep 7, 2015 at 3:06 PM, Ruslan Dautkhanov 
<dautkha...@gmail.com <mailto:dautkha...@gmail.com>> wrote:


    Read response from Cheng Lian <lian.cs@gmail.com
<mailto:lian.cs@gmail.com>> on Aug/27th - it looks the same
problem.

Workarounds
1. write that parquet file in Spark;
2. upgrade to Spark 1.5.

--
Ruslan Dautkhanov

On Mon, Sep 7, 2015 at 3:52 PM, Alex Kozlov <ale...@gmail.com
<mailto:ale...@gmail.com>> wrote:

No, it was created in Hive by CTAS, but any help is
appreciated...

On Mon, Sep 7, 2015 at 2:51 PM, Ruslan Dautkhanov
<dautkha...@gmail.com <mailto:dautkha...@gmail.com>> wrote:

That parquet table wasn't created in Spark, is it?

There was a recent discussion on this list that complex
data types in Spark prior to 1.5 are often incompatible with
Hive, for example, if I remember correctly.

On Mon, Sep 7, 2015, 2:57 PM Alex Kozlov <ale...@gmail.com
<mailto:ale...@gmail.com>> wrote:

I am trying to read an (array typed) parquet file in
spark-shell (Spark 1.4.1 with Hadoop 2.6):

{code}
$ bin/spark-shell
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See
http://logging.apache.org/log4j/1.2/faq.html#noconfig
for more info.
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/09/07 13:45:22 INFO SecurityManager: Changing view
acls to: hivedata
15/09/07 13:45:22 INFO SecurityManager: Changing
modify acls to: hivedata
15/09/07 13:45:22 INFO SecurityManager:
SecurityManager: authentication disabled; ui acls
disabled; users with view permissions: Set(hivedata);
users with modify permissions: Set(hivedata)
15/09/07 13:45:23 INFO HttpServer: Starting HTTP Server
15/09/07 13:45:23 INFO Utils: Successfully started
service 'HTTP class server' on port 43731.
Welcome to
  __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\ version 1.4.1
  /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit
Server VM, Java 1.8.0)
Type in expressions to have them evaluated.
Type :help for more information.
15/09/07 13:45:26 INFO SparkContext: Running Spark
version 1.4.1
15/09/07 13:45:26 INFO SecurityManager: Changing view
acls to: hivedata
15/09/07 13:45:26 INFO SecurityManager: Changing
modify acls to: hivedata
15/09/07 13:45:26 INFO SecurityManager:
SecurityManager: authentication disabled; ui acls
disabled; users with view permissions: Set(hivedata);
users with modify permissions: Set(hivedata)
15/09/07 13:45:27 INFO Slf4jLogger: Slf4jLogger started
15/09/07 13:45:27 INFO Remoting: Starting remoting
15/09/07 13:45:27 INFO Remoting: Remoting started;
listening on addresses
:[akka.tcp://sparkDriver@10.10.30.52:46083
<http://sparkDriver@10.10.30.52:46083>]
15/09/07 13:45:27 INFO Utils: Successfully started
service 'sparkDriver' on port 46083.
15/09/07 13:45:27 INFO SparkEnv: Registering
MapOutputTracker
15/09/07 13:45:27 INFO SparkEnv: Registeri

Re: Parquet partitioning for unique identifier

2015-09-04 Thread Cheng Lian
What version of Spark were you using?  Have you tried increasing 
--executor-memory?


This schema looks pretty normal. And Parquet stores all keys of a map in 
a single column.
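
One knob that sometimes helps with Parquet writer OOMs, as a sketch (the 
64 MB value is only illustrative), is a smaller row group size:

  // Smaller row groups mean smaller in-memory column buffers per open Parquet writer
  sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)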


Cheng

On 9/4/15 4:00 PM, Kohki Nishio wrote:

The stack trace is this
java.lang.OutOfMemoryError: Java heap space
at 
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at 
parquet.bytes.CapacityByteArrayOutputStream.(CapacityByteArrayOutputStream.java:57)
at 
parquet.column.values.rle.RunLengthBitPackingHybridEncoder.(RunLengthBitPackingHybridEncoder.java:125)
at 
parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter.(RunLengthBitPackingHybridValuesWriter.java:36)
at 
parquet.column.ParquetProperties.getColumnDescriptorValuesWriter(ParquetProperties.java:61)
at parquet.column.impl.ColumnWriterImpl.(ColumnWriterImpl.java:72)
at 
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:68)
at 
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at 
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at 
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at 
parquet.hadoop.InternalParquetRecordWriter.(InternalParquetRecordWriter.java:94)
at 
parquet.hadoop.ParquetRecordWriter.(ParquetRecordWriter.java:64)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)

It looks like this
https://issues.apache.org/jira/browse/PARQUET-222

Here's the schema I have. I don't think this schema is that unusual, 
... maybe the use of Map is causing this. Is it trying to register all 
keys of a map as columns?


root
 |-- intId: integer (nullable = false)
 |-- uniqueId: string (nullable = true)
 |-- date1: string (nullable = true)
 |-- date2: string (nullable = true)
 |-- date3: string (nullable = true)
 |-- type: integer (nullable = false)
 |-- cat: string (nullable = true)
 |-- subCat: string (nullable = true)
 |-- unit: map (nullable = true)
 ||-- key: string
 ||-- value: double (valueContainsNull = false)
 |-- attr: map (nullable = true)
 ||-- key: string
 ||-- value: string (valueContainsNull = true)
 |-- price: map (nullable = true)
 ||-- key: string
 ||-- value: double (valueContainsNull = false)
 |-- imp1: map (nullable = true)
 ||-- key: string
 ||-- value: double (valueContainsNull = false)
 |-- imp2: map (nullable = true)
 ||-- key: string
 ||-- value: double (valueContainsNull = false)
 |-- imp3: map (nullable = true)
 ||-- key: string
 ||-- value: double (valueContainsNull = false)



On Thu, Sep 3, 2015 at 11:27 PM, Cheng Lian <lian.cs@gmail.com 
<mailto:lian.cs@gmail.com>> wrote:


Could you please provide the full stack track of the OOM
exception? Another common case of Parquet OOM is super wide
tables, say hundred or thousands of columns. And in this case, the
number of rows is mostly irrelevant.

Cheng


On 9/4/15 1:24 AM, Kohki Nishio wrote:

let's say I have data like this

   ID  |   Some1   |  Some2| Some3   | 
A1 | kdsfajfsa | dsafsdafa | fdsfafa  |
A2 | dfsfafasd | 23jfdsjkj | 980dfs   |
A3 | 99989df   | jksdljas  | 48dsaas  |
   ..
Z00..  | fdsafdsfa | fdsdafdas | 89sdaff  |

My understanding is that if I give the column 'ID' to use for
partition, it's going to generate a file per entry since it's
unique, no? Using JSON, I create 1000 files, separated as
specified in the parallelize parameter. But JSON is large and a bit
slow, so I'd like to try Parquet to see what happens.

On Wed, Sep 2, 2015 at 11:15 PM, Adrien Mogenet
<adrien.moge...@contentsquare.com
<mailto:adrien.moge...@contentsquare.com>> wrote:

Any code / Parquet schema to provide? I'm not sure I
understand which step fails right there...

On 3 September 2015 at 04:12, Raghavendra Pandey
<raghavendra.pan...@gmail.com
<mailto:raghavendra.pan...@gmail.com>> wrote:

Did you specify partitioning column while saving data..

On Sep 3, 2015 5:41 AM, "Kohki Nishio"
<tarop...@gmail.com <mailto:tarop...@gmail.com>> wrote:

Hello experts,

I have a huge json file (> 40G) and trying to use
Parquet as a file format. Each entry has a unique
identifier but other than that, it doesn't have 'well
balanced value' column to partition it. Right now it
just throws OOM and couldn't figure out what to do
 

Re: Parquet partitioning for unique identifier

2015-09-04 Thread Cheng Lian
Could you please provide the full stack track of the OOM exception? 
Another common case of Parquet OOM is super wide tables, say hundred or 
thousands of columns. And in this case, the number of rows is mostly 
irrelevant.


Cheng

On 9/4/15 1:24 AM, Kohki Nishio wrote:

let's say I have data like this

   ID  |   Some1   |  Some2| Some3   | 
A1 | kdsfajfsa | dsafsdafa | fdsfafa  |
A2 | dfsfafasd | 23jfdsjkj | 980dfs   |
A3 | 99989df   | jksdljas  | 48dsaas  |
   ..
Z00..  | fdsafdsfa | fdsdafdas | 89sdaff  |

My understanding is that if I give the column 'ID' to use for 
partition, it's going to generate a file per entry since it's unique, 
no? Using JSON, I create 1000 files, separated as specified in the 
parallelize parameter. But JSON is large and a bit slow, so I'd like to 
try Parquet to see what happens.


On Wed, Sep 2, 2015 at 11:15 PM, Adrien Mogenet wrote:


Any code / Parquet schema to provide? I'm not sure I understand
which step fails right there...

On 3 September 2015 at 04:12, Raghavendra Pandey wrote:

Did you specify partitioning column while saving data..

On Sep 3, 2015 5:41 AM, "Kohki Nishio" wrote:

Hello experts,

I have a huge json file (> 40G) and trying to use Parquet
as a file format. Each entry has a unique identifier but
other than that, it doesn't have 'well balanced value'
column to partition it. Right now it just throws OOM and
couldn't figure out what to do with it.

It would be ideal if I could provide a partitioner based
on the unique identifier value like computing its hash
value or something.  One of the option would be to produce
a hash value and add it as a separate column, but it
doesn't sound right to me. Is there any other ways I can try ?

Regards,
-- 
Kohki Nishio





-- 


*Adrien Mogenet*
Head of Backend/Infrastructure
adrien.moge...@contentsquare.com

(+33)6.59.16.64.22 
http://www.contentsquare.com 
50, avenue Montaigne - 75008 Paris




--
Kohki Nishio




Re: Schema From parquet file

2015-09-01 Thread Cheng Lian

What exactly do you mean by "get schema from a parquet file"?

- If you are trying to inspect Parquet files, parquet-tools can be 
pretty neat: https://github.com/Parquet/parquet-mr/issues/321
- If you are trying to get Parquet schema of Parquet MessageType, you 
may resort to readFooterX() and readAllFootersX() utility methods in 
ParquetFileReader
- If you are trying to get Spark SQL StructType schema out of a Parquet 
file, then the most convenient way is to load it as a DataFrame. 
However, "loading" it as a DataFrame doesn't mean we scan the whole 
file. Instead, we only try to do minimum metadata discovery work like 
schema discovery and schema merging.
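
For the DataFrame route, a minimal sketch of what that looks like in practice 
(the path here is hypothetical); loading only triggers metadata discovery, so 
df.schema is cheap to obtain:

    // Loading as a DataFrame only does metadata discovery, not a full scan.
    val df = sqlContext.read.parquet("/tmp/events.parquet")  // hypothetical path
    val schema = df.schema          // org.apache.spark.sql.types.StructType
    println(schema.treeString)      // human-readable tree of fields
    println(schema.json)            // JSON form, can be stored and parsed back later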


Cheng

On 9/1/15 7:07 PM, Hafiz Mujadid wrote:

Hi all!

Is there any way to get schema from a parquet file without loading into
dataframe?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Schema-From-parquet-file-tp24535.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Group by specific key and save as parquet

2015-09-01 Thread Cheng Lian

Starting from Spark 1.4, you can do this via dynamic partitioning:

sqlContext.table("trade").write.partitionBy("date").parquet("/tmp/path")

Cheng

On 9/1/15 8:27 AM, gtinside wrote:

Hi ,

I have a set of data, I need to group by specific key and then save as
parquet. Refer to the code snippet below. I am querying trade and then
grouping by date

val df = sqlContext.sql("SELECT * FROM trade")
val dfSchema = df.schema
val partitionKeyIndex = dfSchema.fieldNames.seq.indexOf("date")
//group by date
val groupedByPartitionKey = df.rdd.groupBy { row =>
row.getString(partitionKeyIndex) }
val failure = groupedByPartitionKey.map(row => {
val rowDF = sqlContext.createDataFrame(sc.parallelize(row._2.toSeq),
dfSchema)
val fileName = config.getTempFileName(row._1)
try {
 val dest = new Path(fileName)
 if(DefaultFileSystem.getFS.exists(dest)) {
 DefaultFileSystem.getFS.delete(dest, true)
  }
  rowDF.saveAsParquetFile(fileName)
 } catch {
case e : Throwable => {
 logError("Failed to save parquet
file")
}
failure = true
}

This code doesn't work well because of NestedRDD , what is the best way to
solve this problem?

Regards,
Gaurav



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Group-by-specific-key-and-save-as-parquet-tp24527.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.3.1 saveAsParquetFile hangs on app exit

2015-08-26 Thread Cheng Lian

Could you please show jstack result of the hanged process? Thanks!

Cheng

On 8/26/15 10:46 PM, cingram wrote:

I have a simple test that is hanging when using s3a with spark 1.3.1. Is
there something I need to do to cleanup the S3A file system? The write to S3
appears to have worked but this job hangs in the spark-shell and using
spark-submit. Any help would be greatly appreciated. TIA.

import sqlContext.implicits._
import com.datastax.spark.connector._
case class LU(userid: String, timestamp: Long, lat: Double, lon: Double)
val uid = "testuser"
val lue = sc.cassandraTable[LU]("test", "foo").where("userid = ?", uid).toDF
lue.saveAsParquetFile("s3a://twc-scratch/craig_lues")



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-saveAsParquetFile-hangs-on-app-exit-tp24460.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to overwrite partition when writing Parquet?

2015-08-20 Thread Cheng Lian
You can apply a filter first to filter out data of needed dates and then 
append them.
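
A minimal sketch of that filter-then-append approach, reusing myDataframe and 
parquetDir from the question below (the date literal is hypothetical):

    import org.apache.spark.sql.SaveMode

    // Append only the rows for dates that are not in the target yet,
    // instead of rewriting the whole dataset.
    val newData = myDataframe.filter(myDataframe("date") === "2015-08-20")
    newData.write.mode(SaveMode.Append).partitionBy("date").parquet(parquetDir)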


Cheng

On 8/20/15 4:59 PM, Hemant Bhanawat wrote:
How can I overwrite only a given partition or manually remove a 
partition before writing?


I don't know if (and I don't think)  there is a way to do that using a 
mode. But doesn't manually deleting the directory of a particular 
partition help? For directory structure, check this out...


http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
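
A rough sketch of that manual-deletion idea using the Hadoop FileSystem API, 
assuming the date=<value> directory layout described at the link above (the 
date literal is hypothetical):

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SaveMode

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val partitionDir = new Path(s"$parquetDir/date=2015-08-20")
    if (fs.exists(partitionDir)) {
      fs.delete(partitionDir, true)  // recursively delete just this partition
    }
    // Then append the fresh data for that date only.
    myDataframe.filter(myDataframe("date") === "2015-08-20")
      .write.mode(SaveMode.Append).partitionBy("date").parquet(parquetDir)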


On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman r...@totango.com 
mailto:r...@totango.com wrote:


Hello,

I have a DataFrame, with a date column which I want to use as a
partition.
Each day I want to write the data for the same date in Parquet,
and then read a dataframe for a date range.

I'm using:

myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir);

If I use SaveMode.Append, then writing data for the same partition
adds the same data there again.
If I use SaveMode.Overwrite, then writing data for a single
partition removes all the data for all partitions.

How can I overwrite only a given partition or manually remove a
partition before writing?

Many thanks!
Romi K.






Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-12 Thread Cheng Lian

Hi Philip,

What do you mean by saying "still partitioned the same way"? If you are 
trying to save the partition columns encoded in partition directories 
directly into Parquet files, and put all Parquet part-files into a 
single directory without creating any intermediate sub-directories, then 
I'd expect it to be much faster. This is at least true for file systems 
like S3; I haven't tested listing contents of super wide and flat 
directories (i.e. those containing no sub-directories but a lot of files).


And, as Hao suggested, sorting columns in which you are interested can 
give better performance on read path because of Parquet specific 
optimizations like filter push-down. However, I think in your case, the 
time spent in reading Parquet files is not the bottleneck, according to 
our previous discussion.


Cheng

On 8/12/15 8:56 AM, Cheng, Hao wrote:


Definitely worth to try. And you can sort the record before writing 
out, and then you will get the parquet files without overlapping keys.


Let us know if that helps.

Hao

*From:*Philip Weaver [mailto:philip.wea...@gmail.com]
*Sent:* Wednesday, August 12, 2015 4:05 AM
*To:* Cheng Lian
*Cc:* user
*Subject:* Re: Very high latency to initialize a DataFrame from 
partitioned parquet database.


Do you think it might be faster to put all the files in one directory 
but still partitioned the same way? I don't actually need to filter on 
the values of the partition keys, but I need to rely on there be no 
overlap in the value of the keys between any two parquet files.


On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver philip.wea...@gmail.com 
mailto:philip.wea...@gmail.com wrote:


Thanks, I also confirmed that the partition discovery is slow by
writing a non-Spark application that uses the parquet library
directly to load that partitions.

It's so slow that my colleague's Python application can read the
entire contents of all the parquet data files faster than my
application can even discover the partitions!

On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian lian.cs@gmail.com
mailto:lian.cs@gmail.com wrote:

However, it's weird that the partition discovery job only
spawns 2 tasks. It should use the default parallelism, which
is probably 8 according to the logs of the next Parquet
reading job. Partition discovery is already done in a
distributed manner via a Spark job. But the parallelism is
mysteriously low...

Cheng

On 8/7/15 3:32 PM, Cheng Lian wrote:

Hi Philip,

Thanks for providing the log file. It seems that most of
the time are spent on partition discovery. The code
snippet you provided actually issues two jobs. The first
one is for listing the input directories to find out all
leaf directories (and this actually requires listing all
leaf files, because we can only assert that a directory is
a leaf one when it contains no sub-directories). Then
partition information is extracted from leaf directory
paths. This process starts at:

10:51:44 INFO sources.HadoopFsRelation: Listing leaf
files and directories in parallel under:
file:/home/pweaver/work/parquet/day=20150225, …

and ends at:

10:52:31 INFO scheduler.TaskSchedulerImpl: Removed
TaskSet 0.0, whose tasks have all completed, from pool

The actual tasks execution time is about 36s:

10:51:54 INFO scheduler.TaskSetManager: Starting task
0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL,
3087 bytes)
…
10:52:30 INFO scheduler.TaskSetManager: Finished task
0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)

You mentioned that your dataset has about 40,000+
partitions, so there are a lot of leaf directories and
files out there. My guess is that the local file system
spent lots of time listing FileStatus-es of all these files.

I also noticed that Mesos job scheduling takes more time
than expected. It is probably because this is the first
Spark job executed in the application, and the system is
not warmed up yet. For example, there’s a 6s gap between
these two adjacent lines:

10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task
set 0.0 with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos
task 0 is now TASK_RUNNING

The 2nd Spark job is the real Parquet reading job, and
this one actually finishes pretty quickly, only 3s (note
that the Mesos job scheduling latency is also included):

10:52:32 INFO scheduler.DAGScheduler: Got job 1

Re: Parquet without hadoop: Possible?

2015-08-12 Thread Cheng Lian
One thing to note is that, it would be good to add explicit file system 
scheme to the output path (i.e. file:///var/... instead of 
/var/...), esp. when you do have HDFS running. Because in this case 
the data might be written to HDFS rather than your local file system if 
Spark found Hadoop configuration files when starting the application.
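
A minimal sketch with explicit schemes (paths are illustrative):

    import sqlContext.implicits._

    val data = sc.parallelize(Array(2, 3, 5, 7, 2, 3, 6, 1)).toDF
    // Explicit scheme: write to the local file system even if Hadoop configs
    // pointing at HDFS are on the classpath.
    data.write.parquet("file:///var/data/Saif/pq")
    // Explicit scheme: write to HDFS regardless of the default file system.
    // data.write.parquet("hdfs://namenode:8020/user/saif/pq")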


Cheng

On 8/11/15 11:12 PM, saif.a.ell...@wellsfargo.com wrote:


I confirm that it works,

I was just having this issue: 
https://issues.apache.org/jira/browse/SPARK-8450


Saif

*From:*Ellafi, Saif A.
*Sent:* Tuesday, August 11, 2015 12:01 PM
*To:* Ellafi, Saif A.; deanwamp...@gmail.com
*Cc:* user@spark.apache.org
*Subject:* RE: Parquet without hadoop: Possible?

Sorry, I provided bad information. This example worked fine with 
reduced parallelism.


It seems my problem have to do with something specific with the real 
data frame at reading point.


Saif

*From:*saif.a.ell...@wellsfargo.com 
mailto:saif.a.ell...@wellsfargo.com 
[mailto:saif.a.ell...@wellsfargo.com]

*Sent:* Tuesday, August 11, 2015 11:49 AM
*To:* deanwamp...@gmail.com mailto:deanwamp...@gmail.com
*Cc:* user@spark.apache.org mailto:user@spark.apache.org
*Subject:* RE: Parquet without hadoop: Possible?

I am launching my spark-shell

spark-1.4.1-bin-hadoop2.6/bin/spark-shell

15/08/11 09:43:32 INFO SparkILoop: Created sql context (with Hive 
support)..


SQL context available as sqlContext.

scala> val data = sc.parallelize(Array(2,3,5,7,2,3,6,1)).toDF

scala> data.write.parquet("/var/data/Saif/pq")

Then I get a million errors:

15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]

15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]

15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]

15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

at 
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)


at 
parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57)


at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68)


at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48)


at 
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)


at 
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)


at 
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)


at 
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178)


at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)

at 
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)


at 
parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)


at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64)

at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)


at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)


at 
org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:83)


at 
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)


at 
org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470)


at 
org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360)


at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172)


at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)


at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)


at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)

at org.apache.spark.scheduler.Task.run(Task.scala:70)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

at 

Re: Merge metadata error when appending to parquet table

2015-08-09 Thread Cheng Lian
The conflicting metadata values warning is a known issue 
https://issues.apache.org/jira/browse/PARQUET-194


The option parquet.enable.summary-metadata is a Hadoop option rather 
than a Spark option, so you need to either add it to your Hadoop 
configuration file(s) or add it via `sparkContext.hadoopConfiguration` 
before starting your job.
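
A minimal sketch of both ways to set it (forwarding Hadoop properties through 
--conf with the spark.hadoop. prefix is an assumption worth double-checking on 
your version):

    // Programmatically, before any write happens:
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

    // Or at submit time, relying on Spark copying spark.hadoop.* keys
    // into the Hadoop configuration:
    //   spark-submit --conf spark.hadoop.parquet.enable.summary-metadata=false ...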


Cheng

On 8/9/15 8:57 PM, Krzysztof Zarzycki wrote:
Besides finding a fix for this problem, I think I can work around at least the 
WARNING message by overriding the Parquet variable 
parquet.enable.summary-metadata, which according to the PARQUET-107 ticket 
https://issues.apache.org/jira/browse/PARQUET-107 can be used to disable 
writing the summary file that is the issue here.

How can I set this variable? I tried
sql.setConf("parquet.enable.summary-metadata", "false")
sql.sql("SET parquet.enable.summary-metadata=false")
As well as: spark-submit --conf parquet.enable.summary-metadata=false

But neither helped. Anyone can help? Of course the original problem 
stays open.

Thanks!
Krzysiek

2015-08-09 14:19 GMT+02:00 Krzysztof Zarzycki k.zarzy...@gmail.com 
mailto:k.zarzy...@gmail.com:


Hi there,
I have a problem with a spark streaming job  running on Spark
1.4.1, that appends to parquet table.

My job receives json strings and creates JsonRdd out of it. The
jsons might come in different shape as most of the fields are
optional. But they never have conflicting schemas.
Next, for each (non-empty) Rdd I'm saving it to parquet files,
using append to existing table:

jsonRdd.write.mode(SaveMode.Append).parquet(dataDirPath)

Unfortunately I'm hitting now an issue on every append of conflict:

Aug 9, 2015 7:58:03 AM WARNING:
parquet.hadoop.ParquetOutputCommitter: could not write summary
file for hdfs://example.com:8020/tmp/parquet
java.lang.RuntimeException: could not merge metadata: key
org.apache.spark.sql.parquet.row.metadata has conflicting values:
[{...schema1...}, {...schema2...} ]

The schemas are very similar, some attributes may be missing
comparing to other, but for sure they are not conflicting. They
are pretty lengthy, but I compared them with diff and ensured,
that there are no conflicts.

Even with this WARNING, the write actually succeeds, I'm able to
read this data.  But on every batch, there is yet another schema
in the displayed conflicting values array. I would like the job
to run forever, so I can't even ignore this warning because it
will probably end with OOM.

Do you know what might be the reason of this error/ warning? How
to overcome this? Maybe it is a Spark bug/regression? I saw
tickets like SPARK-6010
https://issues.apache.org/jira/browse/SPARK-6010, but they seem
to be fixed in 1.3.0 (I'm using 1.4.1).


Thanks for any help!
Krzysiek







Re: Spark failed while trying to read parquet files

2015-08-07 Thread Cheng Lian
It doesn't seem to be Parquet 1.7.0 since the package name isn't under 
org.apache.parquet (1.7.0 is the first official Apache release of 
Parquet). The version you were using is probably Parquet 1.6.0rc3 
according to the line number information: 
https://github.com/apache/parquet-mr/blob/parquet-1.6.0rc3/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java#L249 
And you're hitting PARQUET-136, which has been fixed in (the real) 
Parquet 1.7.0 https://issues.apache.org/jira/browse/PARQUET-136


Cheng

On 8/8/15 6:20 AM, Jerrick Hoang wrote:

Hi all,

I have a partitioned parquet table (very small table with only 2 
partitions). The version of spark is 1.4.1, parquet version is 1.7.0. 
I applied this patch to spark [SPARK-7743] so I assume that spark can 
read parquet files normally, however, I'm getting this when trying to 
do a simple `select count(*) from table`,


```org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 29 in stage 44.0 failed 15 times, most recent failure: Lost task 
29.14 in stage 44.0: java.lang.NullPointerException
at 
parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249)
at 
parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:543)
at 
parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:520)

at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:426)
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:381)
at 
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:155)
at 
parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
at 
org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.init(SqlNewHadoopRDD.scala:153)
at 
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
at 
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)```

Has anybody seen this before?

Thanks




Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-07 Thread Cheng Lian
However, it's weird that the partition discovery job only spawns 2 
tasks. It should use the default parallelism, which is probably 8 
according to the logs of the next Parquet reading job. Partition 
discovery is already done in a distributed manner via a Spark job. But 
the parallelism is mysteriously low...


Cheng

On 8/7/15 3:32 PM, Cheng Lian wrote:


Hi Philip,

Thanks for providing the log file. It seems that most of the time are 
spent on partition discovery. The code snippet you provided actually 
issues two jobs. The first one is for listing the input directories to 
find out all leaf directories (and this actually requires listing all 
leaf files, because we can only assert that a directory is a leaf one 
when it contains no sub-directories). Then partition information is 
extracted from leaf directory paths. This process starts at:


10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and
directories in parallel under:
file:/home/pweaver/work/parquet/day=20150225, …

and ends at:

10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
whose tasks have all completed, from pool

The actual tasks execution time is about 36s:

10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
…
10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)

You mentioned that your dataset has about 40,000+ partitions, so there 
are a lot of leaf directories and files out there. My guess is that 
the local file system spent lots of time listing FileStatus-es of all 
these files.


I also noticed that Mesos job scheduling takes more time than 
expected. It is probably because this is the first Spark job executed 
in the application, and the system is not warmed up yet. For example, 
there’s a 6s gap between these two adjacent lines:


10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is
now TASK_RUNNING

The 2nd Spark job is the real Parquet reading job, and this one 
actually finishes pretty quickly, only 3s (note that the Mesos job 
scheduling latency is also included):


10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at
App.scala:182) with 8 output partitions
…
10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
…
10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage
1.0 (TID 8) in 1527 ms on lindevspark4 (6/8)
10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage
1.0 (TID 6) in 1533 ms on lindevspark4 (7/8)
10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage
1.0 (TID 9) in 2886 ms on lindevspark5 (8/8)

That might be the reason why you observed that the C parquet library 
you mentioned (is it parquet-cpp?) is an order of magnitude faster?


Cheng

On 8/7/15 2:02 AM, Philip Weaver wrote:

With DEBUG, the log output was over 10MB, so I opted for just INFO 
output. The (sanitized) log is attached.


The driver is essentially this code:

info("A")

val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache

val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")

We've also observed that it is very slow to read the contents of the 
parquet files. My colleague wrote a PySpark application that gets the 
list of files, parallelizes it, maps across it and reads each file 
manually using a C parquet library, and aggregates manually in the 
loop. Ignoring the 1-2 minute initialization cost, compared to a 
Spark SQL or DataFrame query in Scala, his is an order of magnitude 
faster. Since he is parallelizing the work through Spark, and that 
isn't causing any performance issues, it seems to be a problem with 
the parquet reader. I may try to do what he did to construct a 
DataFrame manually, and see if I can query it with Spark SQL with 
reasonable performance.


- Philip


On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian lian.cs@gmail.com 
mailto:lian.cs@gmail.com wrote:


Would you mind to provide the driver log?


On 8/6/15 3:58 PM, Philip Weaver wrote:

I built spark from the v1.5.0-snapshot-20150803 tag in the repo
and tried again.

The initialization time is about 1 minute now, which is still
pretty terrible.

On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver
philip.wea...@gmail.com wrote:

Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian
lian.cs@gmail.com wrote:

We've fixed

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-07 Thread Cheng Lian

Hi Philip,

Thanks for providing the log file. It seems that most of the time are 
spent on partition discovery. The code snippet you provided actually 
issues two jobs. The first one is for listing the input directories to 
find out all leaf directories (and this actually requires listing all 
leaf files, because we can only assert that a directory is a leaf one 
when it contains no sub-directories). Then partition information is 
extracted from leaf directory paths. This process starts at:


   10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and
   directories in parallel under:
   file:/home/pweaver/work/parquet/day=20150225, …

and ends at:

   10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
   whose tasks have all completed, from pool

The actual tasks execution time is about 36s:

   10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
   0.0 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
   …
   10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
   0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)

You mentioned that your dataset has about 40,000+ partitions, so there 
are a lot of leaf directories and files out there. My guess is that the 
local file system spent lots of time listing FileStatus-es of all these 
files.


I also noticed that Mesos job scheduling takes more time than expected. 
It is probably because this is the first Spark job executed in the 
application, and the system is not warmed up yet. For example, there’s a 
6s gap between these two adjacent lines:


   10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with
   2 tasks
   10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now
   TASK_RUNNING

The 2nd Spark job is the real Parquet reading job, and this one actually 
finishes pretty quickly, only 3s (note that the Mesos job scheduling 
latency is also included):


   10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at
   App.scala:182) with 8 output partitions
   …
   10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
   1.0 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
   10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
   1.0 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
   10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
   1.0 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
   …
   10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage
   1.0 (TID 8) in 1527 ms on lindevspark4 (6/8)
   10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage
   1.0 (TID 6) in 1533 ms on lindevspark4 (7/8)
   10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage
   1.0 (TID 9) in 2886 ms on lindevspark5 (8/8)

That might be the reason why you observed that the C parquet library you 
mentioned (is it parquet-cpp?) is an order of magnitude faster?


Cheng

On 8/7/15 2:02 AM, Philip Weaver wrote:

With DEBUG, the log output was over 10MB, so I opted for just INFO 
output. The (sanitized) log is attached.


The driver is essentially this code:

info("A")

val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache

val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")

We've also observed that it is very slow to read the contents of the 
parquet files. My colleague wrote a PySpark application that gets the 
list of files, parallelizes it, maps across it and reads each file 
manually using a C parquet library, and aggregates manually in the 
loop. Ignoring the 1-2 minute initialization cost, compared to a Spark 
SQL or DataFrame query in Scala, his is an order of magnitude faster. 
Since he is parallelizing the work through Spark, and that isn't 
causing any performance issues, it seems to be a problem with the 
parquet reader. I may try to do what he did to construct a DataFrame 
manually, and see if I can query it with Spark SQL with reasonable 
performance.


- Philip


On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian lian.cs@gmail.com 
mailto:lian.cs@gmail.com wrote:


Would you mind to provide the driver log?


On 8/6/15 3:58 PM, Philip Weaver wrote:

I built spark from the v1.5.0-snapshot-20150803 tag in the repo
and tried again.

The initialization time is about 1 minute now, which is still
pretty terrible.

On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver
philip.wea...@gmail.com mailto:philip.wea...@gmail.com wrote:

Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian
lian.cs@gmail.com mailto:lian.cs@gmail.com wrote:

We've fixed this issue in 1.5
https://github.com/apache/spark/pull/7396

Could you give it a shot to see whether it helps in your
case? We've observed ~50x performance boost with schema
merging turned on.

Cheng


On 8/6/15 8:26 AM, Philip Weaver wrote:

I

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Cheng Lian

Would you mind to provide the driver log?

On 8/6/15 3:58 PM, Philip Weaver wrote:
I built spark from the v1.5.0-snapshot-20150803 tag in the repo and 
tried again.


The initialization time is about 1 minute now, which is still pretty 
terrible.


On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com 
mailto:philip.wea...@gmail.com wrote:


Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com
mailto:lian.cs@gmail.com wrote:

We've fixed this issue in 1.5
https://github.com/apache/spark/pull/7396

Could you give it a shot to see whether it helps in your case?
We've observed ~50x performance boost with schema merging
turned on.

Cheng


On 8/6/15 8:26 AM, Philip Weaver wrote:

I have a parquet directory that was produced by partitioning
by two keys, e.g. like this:

df.write.partitionBy("a", "b").parquet("asdf")


There are 35 values of a, and about 1100-1200 values of b
for each value of a, for a total of over 40,000 partitions.

Before running any transformations or actions on the
DataFrame, just initializing it like this takes *2 minutes*:

val df = sqlContext.read.parquet("asdf")


Is this normal? Is this because it is doing some bookkeeping
to discover all the partitions? Is it perhaps having to merge
the schema from each partition? Would you expect it to get
better or worse if I subpartition by another key?

- Philip










Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Cheng Lian

We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396

Could you give it a shot to see whether it helps in your case? We've 
observed ~50x performance boost with schema merging turned on.
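
On a 1.5 build you can also control schema merging per read via the 
mergeSchema data source option; a minimal sketch (option name assumed from the 
1.5 API):

    // Turn schema merging on for this read only; the global switch is
    // spark.sql.parquet.mergeSchema.
    val df = sqlContext.read.option("mergeSchema", "true").parquet("asdf")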


Cheng

On 8/6/15 8:26 AM, Philip Weaver wrote:
I have a parquet directory that was produced by partitioning by two 
keys, e.g. like this:


df.write.partitionBy("a", "b").parquet("asdf")


There are 35 values of a, and about 1100-1200 values of b for each 
value of a, for a total of over 40,000 partitions.


Before running any transformations or actions on the DataFrame, just 
initializing it like this takes *2 minutes*:


val df = sqlContext.read.parquet("asdf")


Is this normal? Is this because it is doing some bookkeeping to 
discover all the partitions? Is it perhaps having to merge the schema 
from each partition? Would you expect it to get better or worse if I 
subpartition by another key?


- Philip






Re: Safe to write to parquet at the same time?

2015-08-04 Thread Cheng Lian

It should be safe for Spark 1.4.1 and later versions.

Now Spark SQL adds a job-wise UUID to output file names to distinguish 
files written by different write jobs. So those two write jobs you gave 
should play well with each other. And the job committed later will 
generate a summary file for all Parquet data files it sees. (However, 
Parquet summary file generation may fail due to various reasons and is 
generally not reliable.)


Cheng

On 8/4/15 10:37 AM, Philip Weaver wrote:
I think this question applies regardless if I have two completely 
separate Spark jobs or tasks on different machines, or two cores that 
are part of the same task on the same machine.


If two jobs/tasks/cores/stages both save to the same parquet directory 
in parallel like this:


df1.write.mode(SaveMode.Append).partitionBy("a", "b").parquet(dir)

df2.write.mode(SaveMode.Append).partitionBy("a", "b").parquet(dir)


Will the result be equivalent to this?

df1.unionAll(df2).write.mode(SaveMode.Append).partitionBy("a",
"b").parquet(dir)


What if we ensure that 'dir' does not exist first?

- Philip





Re: Parquet SaveMode.Append Trouble.

2015-08-04 Thread Cheng Lian

You need to import org.apache.spark.sql.SaveMode
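
A minimal sketch of the append once the import is in scope, using the 
DataFrameWriter API rather than the deprecated save() (paths follow the 
original message):

    import org.apache.spark.sql.SaveMode

    val df = sqlContext.read.parquet("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet")
    // Append df's rows to the existing Parquet directory.
    df.write.mode(SaveMode.Append)
      .parquet("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet")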

Cheng

On 7/31/15 6:26 AM, satyajit vegesna wrote:

Hi,

I am new to using Spark and Parquet files,

Below is what i am trying to do, on Spark-shell,

val df = 
sqlContext.parquetFile("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet") 


Have also tried below command,

val 
df = sqlContext.read.format("parquet").load("/data/LM/Parquet/Segment/pages/part-m-0.gz.parquet")


Now i have an other existing parquet file to which i want to append 
this Parquet file data of df.


so i use,

df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", "parquet", 
SaveMode.Append)


also tried below command,

df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", 
SaveMode.Append)



and it throws me below error,

<console>:26: error: not found: value SaveMode
df.save("/data/LM/Parquet/Segment/pages2/part-m-0.gz.parquet", "parquet", 
SaveMode.Append)


Please help me, in case i am doing something wrong here.

Regards,
Satyajit.









Re: Unexpected performance issues with Spark SQL using Parquet

2015-07-27 Thread Cheng Lian

Hi Jerry,

Thanks for the detailed report! I haven't investigate this issue in 
detail. But for the input size issue, I believe this is due to a 
limitation of HDFS API. It seems that Hadoop FileSystem adds the size of 
a whole block to the metrics even if you only touch a fraction of that 
block. In Parquet, all columns within a single row group are stored in a 
single HDFS block. This is probably the reason why you observed weird 
task input size. You may find more information in one of my earlier 
posts 
http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3c54c9899e.2030...@gmail.com%3E


For the performance issue, I don't have a proper explanation yet. Need 
further investigation.


Cheng

On 7/28/15 2:37 AM, Jerry Lam wrote:

Hi spark users and developers,

I have been trying to understand how Spark SQL works with Parquet for 
the couple of days. There is a performance problem that is unexpected 
using the column pruning. Here is a dummy example:


The parquet file has the 3 fields:

 |-- customer_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- mapping: map (nullable = true)
 ||-- key: string
 ||-- value: string (nullable = true)

Note that mapping is just a field with a lot of key value pairs.
I just created a parquet files with 1 billion entries with each entry 
having 10 key-value pairs in the mapping.


After I generate this parquet file, I generate another parquet without 
the mapping field that is:

 |-- customer_id: string (nullable = true)
 |-- type: string (nullable = true)

Let call the first parquet file data-with-mapping and the second 
parquet file data-without-mapping.


Then I ran a very simple query over two parquet files:
val df = sqlContext.read.parquet(path)
df.select(df("type")).count

The run on the data-with-mapping takes 34 seconds with the input size 
of 11.7 MB.
The run on the data-without-mapping takes 8 seconds with the input 
size of 7.6 MB.


They all ran on the same cluster with spark 1.4.1.
What bothers me the most is the input size, because I assumed column 
pruning would only deserialize the columns that are relevant to the query 
(in this case the field type); yet it clearly reads more data on the 
data-with-mapping than on the data-without-mapping. The query is also 4x 
faster on the data-without-mapping, which suggests that the more columns a 
parquet file has, the slower it is, even when only a specific column is needed.


Anyone has an explanation on this? I was expecting both of them will 
finish approximate the same time.


Best Regards,

Jerry








Re: Partition parquet data by ENUM column

2015-07-24 Thread Cheng Lian
Could you please provide the full stack trace of the exception? And 
what's the Git commit hash of the version you were using?


Cheng

On 7/24/15 6:37 AM, Jerry Lam wrote:

Hi Cheng,

I ran into issues related to ENUM when I tried to use Filter push 
down. I'm using Spark 1.5.0 (which contains fixes for parquet filter 
push down). The exception is the following:


java.lang.IllegalArgumentException: FilterPredicate column: item's 
declared type (org.apache.parquet.io.api.Binary) does not match the 
schema found in file metadata. Column item is of type: 
FullTypeDescriptor(PrimitiveType: BINARY, OriginalType: ENUM)

Valid types for this column are: null

Is it because Spark does not recognize ENUM type in parquet?

Best Regards,

Jerry



On Wed, Jul 22, 2015 at 12:21 AM, Cheng Lian lian.cs@gmail.com 
mailto:lian.cs@gmail.com wrote:


On 7/22/15 9:03 AM, Ankit wrote:


Thanks a lot Cheng. So it seems even in spark 1.3 and 1.4,
parquet ENUMs were treated as Strings in Spark SQL right? So does
this mean partitioning for enums already works in previous
versions too since they are just treated as strings?


It’s a little bit complicated. A Thrift/Avro/ProtoBuf |ENUM| value
is represented as a |BINARY| annotated with original type |ENUM|
in Parquet. For example, an optional |ENUM| field |e| is
translated to something like |optional BINARY e (ENUM)| in
Parquet. And the underlying data is always a UTF8 string of the
|ENUM| name. However, the Parquet original type |ENUM| is not
documented, thus Spark 1.3 and 1.4 doesn’t recognize the |ENUM|
annotation and just see it as a normal |BINARY|. (I didn’t even
notice the existence of |ENUM| in Parquet before PR #7048…)

On the other hand, Spark SQL has a boolean option named
|spark.sql.parquet.binaryAsString|. When this option is set to
|true|, all Parquet |BINARY| values are considered and converted
to UTF8 strings. The original purpose of this option is used to
work around a bug of Hive, which writes strings as plain Parquet
|BINARY| values without a proper |UTF8| annotation.

That said, by using
|sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")|
in Scala/Java/Python, or |SET
spark.sql.parquet.binaryAsString=true| in SQL, you may read those
|ENUM| values as plain UTF8 strings.



Also, is there a good way to verify that the partitioning is
being used? I tried explain like (where data is partitioned by
type column)

scala> ev.filter("type = 'NON'").explain
== Physical Plan ==
Filter (type#4849 = NON)
 PhysicalRDD [...cols..], MapPartitionsRDD[103] at map at
newParquet.scala:573

but that is the same even with non partitioned data.


Do you mean how to verify whether partition pruning is effective?
You should be able to see log lines like this:

15/07/22 11:14:35 INFO DataSourceStrategy: Selected 1
partitions out of 3, pruned 66.67% partitions.




On Tue, Jul 21, 2015 at 4:35 PM, Cheng Lian
lian.cs@gmail.com mailto:lian.cs@gmail.com wrote:

Parquet support for Thrift/Avro/ProtoBuf ENUM types are just
added to the master branch.
https://github.com/apache/spark/pull/7048

ENUM types are actually not in the Parquet format spec,
that's why we didn't have it at the first place. Basically,
ENUMs are always treated as UTF8 strings in Spark SQL now.

Cheng

On 7/22/15 3:41 AM, ankits wrote:

Hi, I am using a custom build of spark 1.4 with the
parquet dependency
upgraded to 1.7. I have thrift data encoded with parquet
that i want to
partition by a column of type ENUM. Spark programming
guide says partition
discovery is only supported for string and numeric
columns, so it seems
partition discovery won't work out of the box here.

Is there any workaround that will allow me to partition
by ENUMs? Will hive
partitioning help here? I am unfamiliar with Hive, and
how it plays into
parquet, thrift and spark so I would appreciate any
pointers in the right
direction. Thanks.



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Partition-parquet-data-by-ENUM-column-tp23939.html
Sent from the Apache Spark User List mailing list archive
at Nabble.com.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
mailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail:
user-h...@spark.apache.org
mailto:user-h...@spark.apache.org





​






Re: writing/reading multiple Parquet files: Failed to merge incompatible data types StringType and StructType

2015-07-24 Thread Cheng Lian
I don’t think this is a bug either. For an empty JSON array |[]|, 
there’s simply no way to infer its actual data type, and in this case 
Spark SQL just tries to fill in the “safest” type, which is 
|StringType|, because basically you can cast any data type to |StringType|.


In general, schema inference/evolution is a hard problem. Especially 
when schemaless data formats like JSON are used in the data pipeline, 
because type information gets lost along the way. Spark SQL tries to 
minimize the efforts, but it can’t do all the work for you if the type 
information of your data is intrinsically incomplete, or the schema is 
evolving in an incompatible way (required columns become optional, or 
changing data types of existing columns).
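
One way to sidestep that ambiguity for a known-problematic field is to pass an 
explicit schema to the JSON reader; a minimal sketch (the details field follows 
the example below, everything else is hypothetical):

    import org.apache.spark.sql.types._

    val detailsType = ArrayType(
      StructType(Seq(
        StructField("key", StringType, nullable = true),
        StructField("value", StringType, nullable = true))),
      containsNull = true)

    val schema = StructType(Seq(
      StructField("details", detailsType, nullable = true)
      // ... other event fields ...
    ))

    // With an explicit schema, empty arrays no longer degrade to ArrayType(StringType).
    val jsonDf = sqlContext.read.schema(schema).json(dataRdd)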


Cheng

On 7/24/15 12:23 AM, Akhil Das wrote:

Currently, the only way for you would be to create a proper schema for 
the data. This is not a bug, but you could open a JIRA (since this 
would help others solve similar use cases) as a feature request, and it 
could be implemented and included in a future version.


Thanks
Best Regards

On Tue, Jul 21, 2015 at 4:41 PM, Krzysztof Zarzycki 
k.zarzy...@gmail.com mailto:k.zarzy...@gmail.com wrote:


Hi everyone,
I have pretty challenging problem with reading/writing multiple
parquet files with streaming, but let me introduce my data flow:

I have a lot of json events streaming to my platform. All of them
have the same structure, but fields are mostly optional. Some of
the fields are arrays with structs inside.
These arrays can be empty, but sometimes they contain the data
(structs).

Now I'm using Spark SQL  Streaming to:
0. Stream data from Kafka

val stream = KafkaUtils.createDirectStream ...

1. read json data to json dataframe:

stream.foreachRDD( rdd => {

val dataRdd : RDD[String] = myTransform(rdd)

val jsonDf = sql.read.json(dataRdd)

2. write jsonDf to Parquet files:

if (firstRun) {

   jsonDf.write.parquet("parquet-events")

   firstRun = false

} else { // the table has to exist to be able to append data.

   jsonDf.write.mode(SaveMode.Append).parquet("parquet-events")

}

})

All the writing goes fine. It produces multiple files, each for
one batch of data.

But the problem is on reading the data:

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

scala> val events = sqlContext.read.parquet("parquet-events")

org.apache.spark.SparkException: Failed to merge incompatible
schemas StructType...

Caused by: org.apache.spark.SparkException: Failed to merge
incompatible data types StringType and
StructType(StructField(key,StringType,true),
StructField(value,StringType,true))


Indeed the printed schemas contain mismatched types of few fields,
e.g.:

StructField(details,ArrayType(StringType,true),true)

vs

StructField(details,ArrayType(StructType(StructField(key,StringType,true),
StructField(value,StringType,tru e)),true),true)

It seems that the bad thing happened in read.json: it recognized my
array fields differently: when an array is empty, as containing
Strings; when filled with data, as containing structs.

The code of json/InferSchema indeed suggests that:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala#L127
where the canonicalizeType method replaces NullType with StringType in
my empty arrays.

This is a serious problem for someone trying to stream data from
json to parquet tables . Does anyone have ideas how to handle this
problem? My ideas are (some non-exclusive):

1. Have the schema perfectly defined for my data. This is a last resort,
as I wanted to build a schema-less solution.

2. Write my own schema inference that removes empty arrays from the
schema, then pass that schema directly to the read method. I could even
use a modified InferSchema class from the Spark source, but it is private
unfortunately... so I would need to copy-paste it.

3. Submit a bug to Spark about it. Do you also think it is a bug?

It's a blocker for me currently, any help will be appreciated!

Cheers,

Krzysztof



​


Re: Partition parquet data by ENUM column

2015-07-24 Thread Cheng Lian
Your guess is partly right. Firstly, Spark SQL doesn’t have an 
equivalent data type to Parquet BINARY (ENUM), and always falls back to 
normal StringType. So in your case, Spark SQL just sees a StringType, 
which maps to Parquet BINARY (UTF8), but the underlying data type is 
BINARY (ENUM).


Secondly, Parquet only supports filter push-down optimization for a 
limited set of data types 
https://github.com/apache/parquet-mr/blob/apache-parquet-1.7.0/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java#L66-L80, 
in which ENUM is not included.


So the real problem here is that Spark SQL can’t prevent pushing down a 
predicate over an ENUM field since it sees the field as a normal string 
field. Would you mind to file a JIRA ticket?


Cheng

On 7/24/15 2:14 PM, Cheng Lian wrote:

Could you please provide the full stack trace of the exception? And 
what's the Git commit hash of the version you were using?


Cheng

On 7/24/15 6:37 AM, Jerry Lam wrote:

Hi Cheng,

I ran into issues related to ENUM when I tried to use Filter push 
down. I'm using Spark 1.5.0 (which contains fixes for parquet filter 
push down). The exception is the following:


java.lang.IllegalArgumentException: FilterPredicate column: item's 
declared type (org.apache.parquet.io.api.Binary) does not match the 
schema found in file metadata. Column item is of type: 
FullTypeDescriptor(PrimitiveType: BINARY, OriginalType: ENUM)

Valid types for this column are: null

Is it because Spark does not recognize ENUM type in parquet?

Best Regards,

Jerry



On Wed, Jul 22, 2015 at 12:21 AM, Cheng Lian lian.cs@gmail.com 
mailto:lian.cs@gmail.com wrote:


On 7/22/15 9:03 AM, Ankit wrote:


Thanks a lot Cheng. So it seems even in spark 1.3 and 1.4,
parquet ENUMs were treated as Strings in Spark SQL right? So
does this mean partitioning for enums already works in previous
versions too since they are just treated as strings?


It’s a little bit complicated. A Thrift/Avro/ProtoBuf |ENUM|
value is represented as a |BINARY| annotated with original type
|ENUM| in Parquet. For example, an optional |ENUM| field |e| is
translated to something like |optional BINARY e (ENUM)| in
Parquet. And the underlying data is always a UTF8 string of the
|ENUM| name. However, the Parquet original type |ENUM| is not
documented, thus Spark 1.3 and 1.4 doesn’t recognize the |ENUM|
annotation and just see it as a normal |BINARY|. (I didn’t even
notice the existence of |ENUM| in Parquet before PR #7048…)

On the other hand, Spark SQL has a boolean option named
|spark.sql.parquet.binaryAsString|. When this option is set to
|true|, all Parquet |BINARY| values are considered and converted
to UTF8 strings. The original purpose of this option is used to
work around a bug of Hive, which writes strings as plain Parquet
|BINARY| values without a proper |UTF8| annotation.

That said, by using
|sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")|
in Scala/Java/Python, or |SET
spark.sql.parquet.binaryAsString=true| in SQL, you may read those
|ENUM| values as plain UTF8 strings.



Also, is there a good way to verify that the partitioning is
being used? I tried explain like (where data is partitioned by
type column)

scala> ev.filter("type = 'NON'").explain
== Physical Plan ==
Filter (type#4849 = NON)
 PhysicalRDD [...cols..], MapPartitionsRDD[103] at map at
newParquet.scala:573

but that is the same even with non partitioned data.


Do you mean how to verify whether partition pruning is effective?
You should be able to see log lines like this:

15/07/22 11:14:35 INFO DataSourceStrategy: Selected 1
partitions out of 3, pruned 66.67% partitions.




On Tue, Jul 21, 2015 at 4:35 PM, Cheng Lian
lian.cs@gmail.com mailto:lian.cs@gmail.com wrote:

Parquet support for Thrift/Avro/ProtoBuf ENUM types are just
added to the master branch.
https://github.com/apache/spark/pull/7048

ENUM types are actually not in the Parquet format spec,
that's why we didn't have it at the first place. Basically,
ENUMs are always treated as UTF8 strings in Spark SQL now.

Cheng

On 7/22/15 3:41 AM, ankits wrote:

Hi, I am using a custom build of spark 1.4 with the
parquet dependency
upgraded to 1.7. I have thrift data encoded with parquet
that i want to
partition by a column of type ENUM. Spark programming
guide says partition
discovery is only supported for string and numeric
columns, so it seems
partition discovery won't work out of the box here.

Is there any workaround that will allow me to partition
by ENUMs? Will hive
partitioning help here? I am unfamiliar with Hive

Re: Parquet problems

2015-07-22 Thread Cheng Lian
How many columns are there in these Parquet files? Could you load a 
small portion of the original large dataset successfully?


Cheng

On 6/25/15 5:52 PM, Anders Arpteg wrote:


Yes, both the driver and the executors. It works a little better with 
more space, but there is still a leak that causes failure after a number of 
reads. There are about 700 different data sources that need to be 
loaded, lots of data...



tor 25 jun 2015 08:02 Sabarish Sasidharan 
sabarish.sasidha...@manthan.com 
mailto:sabarish.sasidha...@manthan.com skrev:


Did you try increasing the perm gen for the driver?

Regards
Sab

On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com
mailto:arp...@spotify.com wrote:

When reading large (and many) datasets with the Spark 1.4.0
DataFrames parquet reader (the org.apache.spark.sql.parquet
format), the following exceptions are thrown:

Exception in thread "task-result-getter-0"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread task-result-getter-0
Exception in thread task-result-getter-3
java.lang.OutOfMemoryError: PermGen space
Exception in thread task-result-getter-1
java.lang.OutOfMemoryError: PermGen space
Exception in thread task-result-getter-2
java.lang.OutOfMemoryError: PermGen space

and many more like these from different threads. I've tried
increasing the PermGen space using the -XX:MaxPermSize VM
setting, but even after tripling the space, the same errors
occur. I've also tried storing intermediate results, and am
able to get the full job completed by running it multiple
times and starting for the last successful intermediate
result. There seems to be some memory leak in the parquet
format. Any hints on how to fix this problem?

Thanks,
Anders





Re: Spark-hive parquet schema evolution

2015-07-22 Thread Cheng Lian
Yeah, the benefit of `saveAsTable` is that you don't need to deal with 
schema explicitly, while the benefit of ALTER TABLE is you still have a 
standard vanilla Hive table.


Cheng

On 7/22/15 11:00 PM, Dean Wampler wrote:
While it's not recommended to overwrite files Hive thinks it 
understands, you can add the column to Hive's metastore using an ALTER 
TABLE command using HiveQL in the Hive shell or using HiveContext.sql():


ALTER TABLE mytable ADD COLUMNS (col_name data_type)

See 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AlterTable/Partition/Column 
for full details.


dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition 
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)

Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 4:36 AM, Cheng Lian lian.cs@gmail.com 
mailto:lian.cs@gmail.com wrote:


Since Hive doesn’t support schema evolution, you’ll have to update
the schema stored in metastore somehow. For example, you can
create a new external table with the merged schema. Say you have a
Hive table |t1|:

|CREATE TABLE t1 (c0 INT, c1 DOUBLE); |

By default, this table is stored in HDFS path
|hdfs://some-host:9000/user/hive/warehouse/t1|. Now you append
some Parquet data with an extra column |c2| to the same directory:

import org.apache.spark.sql.types._

val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
val df1 = sqlContext.range(10).select(
  'id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
df1.write.mode("append").parquet(path)

Now you can create a new external table |t2| like this:

val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
df2.write.path(path).saveAsTable("t2")

Since we specified a path above, the newly created |t2| is an
external table pointing to the original HDFS location. But the
schema of |t2| is the merged version.

The drawback of this approach is that, |t2| is actually a Spark
SQL specific data source table rather than a genuine Hive table.
This means, it can be accessed by Spark SQL only. We’re just using
Hive metastore to help persisting metadata of the data source
table. However, since you’re asking how to access the new table
via Spark SQL CLI, this should work for you. We are working on
making Parquet and ORC data source tables accessible via Hive in
Spark 1.5.0.

Cheng

On 7/22/15 10:32 AM, Jerrick Hoang wrote:


Hi Lian,

Sorry I'm new to Spark so I did not express myself very clearly.
I'm concerned about the situation when let's say I have a Parquet
table some partitions and I add a new column A to parquet schema
and write some data with the new schema to a new partition in the
table. If i'm not mistaken, if I do a
sqlContext.read.parquet(table_path).printSchema() it will print
the correct schema with new column A. But if I do a 'describe
table' from SparkSQLCLI I won't see the new column being added. I
understand that this is because Hive doesn't support schema
evolution. So what is the best way to support CLI queries in this
situation? Do I need to manually alter the table everytime the
underlying schema changes?

Thanks

On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian
lian.cs@gmail.com mailto:lian.cs@gmail.com wrote:

Hey Jerrick,

What do you mean by schema evolution with Hive metastore
tables? Hive doesn't take schema evolution into account.
Could you please give a concrete use case? Are you trying to
write Parquet data with extra columns into an existing
metastore Parquet table?

Cheng


On 7/21/15 1:04 AM, Jerrick Hoang wrote:

I'm new to Spark, any ideas would be much appreciated! Thanks

On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang
jerrickho...@gmail.com mailto:jerrickho...@gmail.com wrote:

Hi all,

I'm aware of the support for schema evolution via
DataFrame API. Just wondering what would be the best way
to go about dealing with schema evolution with Hive
metastore tables. So, say I create a table via SparkSQL
CLI, how would I deal with Parquet schema evolution?

Thanks,
J






​






Re: Spark-hive parquet schema evolution

2015-07-22 Thread Cheng Lian
Since Hive doesn’t support schema evolution, you’ll have to update the 
schema stored in metastore somehow. For example, you can create a new 
external table with the merged schema. Say you have a Hive table |t1|:


|CREATE TABLE t1 (c0 INT, c1 DOUBLE); |

By default, this table is stored in HDFS path 
|hdfs://some-host:9000/user/hive/warehouse/t1|. Now you append some 
Parquet data with an extra column |c2| to the same directory:


import org.apache.spark.sql.types._

val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
val df1 = sqlContext.range(10).select(
  'id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
df1.write.mode("append").parquet(path)


Now you can create a new external table |t2| like this:

val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
df2.write.path(path).saveAsTable("t2")


Since we specified a path above, the newly created |t2| is an external 
table pointing to the original HDFS location. But the schema of |t2| is 
the merged version.


The drawback of this approach is that, |t2| is actually a Spark SQL 
specific data source table rather than a genuine Hive table. This means, 
it can be accessed by Spark SQL only. We’re just using Hive metastore to 
help persisting metadata of the data source table. However, since you’re 
asking how to access the new table via Spark SQL CLI, this should work 
for you. We are working on making Parquet and ORC data source tables 
accessible via Hive in Spark 1.5.0.


Cheng

On 7/22/15 10:32 AM, Jerrick Hoang wrote:


Hi Lian,

Sorry I'm new to Spark so I did not express myself very clearly. I'm 
concerned about the situation when let's say I have a Parquet table 
some partitions and I add a new column A to parquet schema and write 
some data with the new schema to a new partition in the table. If i'm 
not mistaken, if I do a 
sqlContext.read.parquet(table_path).printSchema() it will print the 
correct schema with new column A. But if I do a 'describe table' from 
SparkSQLCLI I won't see the new column being added. I understand that 
this is because Hive doesn't support schema evolution. So what is the 
best way to support CLI queries in this situation? Do I need to 
manually alter the table everytime the underlying schema changes?


Thanks

On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian lian.cs@gmail.com 
mailto:lian.cs@gmail.com wrote:


Hey Jerrick,

What do you mean by schema evolution with Hive metastore tables?
Hive doesn't take schema evolution into account. Could you please
give a concrete use case? Are you trying to write Parquet data
with extra columns into an existing metastore Parquet table?

Cheng


On 7/21/15 1:04 AM, Jerrick Hoang wrote:

I'm new to Spark, any ideas would be much appreciated! Thanks

On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang
jerrickho...@gmail.com mailto:jerrickho...@gmail.com wrote:

Hi all,

I'm aware of the support for schema evolution via DataFrame
API. Just wondering what would be the best way to go about
dealing with schema evolution with Hive metastore tables. So,
say I create a table via SparkSQL CLI, how would I deal with
Parquet schema evolution?

Thanks,
J






​


Re: Partition parquet data by ENUM column

2015-07-21 Thread Cheng Lian

On 7/22/15 9:03 AM, Ankit wrote:

Thanks a lot Cheng. So it seems even in spark 1.3 and 1.4, parquet 
ENUMs were treated as Strings in Spark SQL right? So does this mean 
partitioning for enums already works in previous versions too since 
they are just treated as strings?


It’s a little bit complicated. A Thrift/Avro/ProtoBuf |ENUM| value is 
represented as a |BINARY| annotated with original type |ENUM| in 
Parquet. For example, an optional |ENUM| field |e| is translated to 
something like |optional BINARY e (ENUM)| in Parquet. And the underlying 
data is always a UTF8 string of the |ENUM| name. However, the Parquet 
original type |ENUM| is not documented, thus Spark 1.3 and 1.4 doesn’t 
recognize the |ENUM| annotation and just see it as a normal |BINARY|. (I 
didn’t even notice the existence of |ENUM| in Parquet before PR #7048…)


On the other hand, Spark SQL has a boolean option named 
|spark.sql.parquet.binaryAsString|. When this option is set to |true|, 
all Parquet |BINARY| values are considered and converted to UTF8 
strings. The original purpose of this option is used to work around a 
bug of Hive, which writes strings as plain Parquet |BINARY| values 
without a proper |UTF8| annotation.


That said, by using 
|sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")| in 
Scala/Java/Python, or |SET spark.sql.parquet.binaryAsString=true| in 
SQL, you may read those |ENUM| values as plain UTF8 strings.
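
For illustration, a minimal sketch of that workaround, assuming Spark 1.4, 
an existing SparkContext `sc`, and a hypothetical Parquet file /tmp/events 
whose `status` column is an ENUM-backed BINARY:

    // Treat Parquet BINARY values (including undocumented ENUM ones) as UTF8 strings.
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

    val events = sqlContext.read.parquet("/tmp/events")  // hypothetical path
    events.printSchema()            // "status" now shows up as StringType
    events.select("status").show()  // ENUM names rendered as plain strings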




Also, is there a good way to verify that the partitioning is being 
used? I tried explain like (where data is partitioned by type column)


scala> ev.filter("type = 'NON'").explain
== Physical Plan ==
Filter (type#4849 = NON)
 PhysicalRDD [...cols..], MapPartitionsRDD[103] at map at 
newParquet.scala:573


but that is the same even with non partitioned data.


Do you mean how to verify whether partition pruning is effective? You 
should be able to see log lines like this:


   15/07/22 11:14:35 INFO DataSourceStrategy: Selected 1 partitions out
   of 3, pruned 66.67% partitions.
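
For completeness, a minimal sketch of checking this yourself, assuming Spark 1.4, 
a SQLContext `sqlContext`, and a hypothetical dataset laid out on disk with 
`type` as a partition directory (/tmp/events/type=NON/..., /tmp/events/type=OTHER/...):

    // Partition discovery picks up "type" from the directory layout.
    val ev = sqlContext.read.parquet("/tmp/events")

    // With INFO logging enabled, DataSourceStrategy logs how many partitions
    // were selected when the filter touches the partition column.
    ev.filter("type = 'NON'").explain()
    ev.filter("type = 'NON'").count()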




On Tue, Jul 21, 2015 at 4:35 PM, Cheng Lian lian.cs@gmail.com 
mailto:lian.cs@gmail.com wrote:


Parquet support for Thrift/Avro/ProtoBuf ENUM types are just added
to the master branch. https://github.com/apache/spark/pull/7048

ENUM types are actually not in the Parquet format spec, that's why
we didn't have it at the first place. Basically, ENUMs are always
treated as UTF8 strings in Spark SQL now.

Cheng

On 7/22/15 3:41 AM, ankits wrote:

Hi, I am using a custom build of spark 1.4 with the parquet
dependency
upgraded to 1.7. I have thrift data encoded with parquet that
i want to
partition by a column of type ENUM. Spark programming guide
says partition
discovery is only supported for string and numeric columns, so
it seems
partition discovery won't work out of the box here.

Is there any workaround that will allow me to partition by
ENUMs? Will hive
partitioning help here? I am unfamiliar with Hive, and how it
plays into
parquet, thrift and spark so I would appreciate any pointers
in the right
direction. Thanks.



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Partition-parquet-data-by-ENUM-column-tp23939.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.






​


Re: Partition parquet data by ENUM column

2015-07-21 Thread Cheng Lian
Parquet support for Thrift/Avro/ProtoBuf ENUM types are just added to 
the master branch. https://github.com/apache/spark/pull/7048


ENUM types are actually not in the Parquet format spec, that's why we 
didn't have it at the first place. Basically, ENUMs are always treated 
as UTF8 strings in Spark SQL now.


Cheng

On 7/22/15 3:41 AM, ankits wrote:

Hi, I am using a custom build of spark 1.4 with the parquet dependency
upgraded to 1.7. I have thrift data encoded with parquet that i want to
partition by a column of type ENUM. Spark programming guide says partition
discovery is only supported for string and numeric columns, so it seems
partition discovery won't work out of the box here.

Is there any workaround that will allow me to partition by ENUMs? Will hive
partitioning help here? I am unfamiliar with Hive, and how it plays into
parquet, thrift and spark so I would appreciate any pointers in the right
direction. Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Partition-parquet-data-by-ENUM-column-tp23939.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.









Re: Spark-hive parquet schema evolution

2015-07-21 Thread Cheng Lian

Hey Jerrick,

What do you mean by schema evolution with Hive metastore tables? Hive 
doesn't take schema evolution into account. Could you please give a 
concrete use case? Are you trying to write Parquet data with extra 
columns into an existing metastore Parquet table?


Cheng

On 7/21/15 1:04 AM, Jerrick Hoang wrote:

I'm new to Spark, any ideas would be much appreciated! Thanks

On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang 
jerrickho...@gmail.com mailto:jerrickho...@gmail.com wrote:


Hi all,

I'm aware of the support for schema evolution via DataFrame API.
Just wondering what would be the best way to go about dealing with
schema evolution with Hive metastore tables. So, say I create a
table via SparkSQL CLI, how would I deal with Parquet schema
evolution?

Thanks,
J






Re: what is : ParquetFileReader: reading summary file ?

2015-07-17 Thread Cheng Lian
Yeah, Spark SQL Parquet support need to do some metadata discovery when 
firstly importing a folder containing Parquet files, and discovered 
metadata is cached.
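
As an illustration, a minimal sketch assuming Spark 1.4, a HiveContext 
`hiveContext`, and a hypothetical partitioned Parquet table my_parquet_table; 
the first query pays the footer/summary-file discovery cost, and later queries 
reuse the cached metadata (controlled by spark.sql.parquet.cacheMetadata, 
which defaults to true):

    // First query: Parquet footers / _common_metadata files are read for every partition folder.
    hiveContext.sql("SELECT COUNT(*) FROM my_parquet_table").show()

    // Subsequent queries: the discovered metadata is served from the cache.
    hiveContext.sql("SELECT COUNT(*) FROM my_parquet_table").show()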


Cheng

On 7/17/15 1:56 PM, shsh...@tsmc.com wrote:

Hi all,

our scenario is to generate lots of folders containing parquet files and
then use add partition to add these folder locations to a hive table;
when trying to read the hive table using Spark,
the following logs show up and a lot of time is spent reading them;
but this doesn't happen after the second or third time of querying this table
through sql in HiveContext;
does that mean that the parquet files have done some caching by themselves? Thanks!


Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: Initiating
action with parallelism: 5
Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150701/LDSN/_common_metadata
Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150702/MECC/_common_metadata
Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150702/MCOX/_common_metadata
Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150629/LCTE/_common_metadata
Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150630/MDNS/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150701/VSHM/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150624/LSCB/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150628/MPD8/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150703/VSHM/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150630/LIHI/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150701/LESE/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150626/MPD8/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150624/MDHK/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150628/VEMH/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150626/MDHK/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150628/LSCB/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150627/LESR/_common_metadata
Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading
summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150703/LESE/_common_metadata













Re: DataFrame.write().partitionBy(some_column).parquet(path) produces OutOfMemory with very few items

2015-07-16 Thread Cheng Lian

Hi Nikos,

How many columns and distinct values of some_column are there in the 
DataFrame? The Parquet writer is known to be very memory-consuming for wide 
tables, and lots of distinct partition column values result in many 
concurrent Parquet writers. One possible workaround is to repartition 
the data by the partition columns first.
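
A rough sketch of that workaround follows; note that the column-based 
repartition overload used here only exists in later Spark releases (1.6+), 
and `df`, the column name, and the output path are hypothetical:

    import org.apache.spark.sql.functions.col

    df.repartition(col("some_column"))     // co-locate rows of each partition value in one task
      .write
      .partitionBy("some_column")
      .parquet("/tmp/partitioned_output")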


Cheng

On 7/15/15 7:05 PM, Nikos Viorres wrote:

Hi,

I am trying to test partitioning for DataFrames with parquet usage so 
i attempted to do df.write().partitionBy(some_column).parquet(path) 
on a small dataset of 20.000 records which when saved as parquet 
locally with gzip take 4mb of disk space.
However, on my dev machine with 
-Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g this always 
fails with an OutOfMemoryError.

Does anyone have any ideas?

stack trace:
[Stage 2:  (0 + 4) / 8]2015-07-15 13:57:21,021 ERROR 
Logging$class Exception in task 3.0 in stage 2.0 (TID 8)

java.lang.OutOfMemoryError: Java heap space
at 
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at 
parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48)
at 
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at 
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at 
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at 
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178)

at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at 
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at 
parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)

at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at 
org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:111)
at 
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
at 
org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
at 
org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
at 
scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)

at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
at 
org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org 
http://org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)
2015-07-15 13:57:21,051 ERROR Logging$class Uncaught exception in 
thread Thread[Executor task launch worker-2,5,main]

java.lang.OutOfMemoryError: Java heap space
at 
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at 
parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48)
at 
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at 
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at 
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at 
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178)

at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at 
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at 
parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)

at 

Re: How to disable parquet schema merging in 1.4?

2015-07-01 Thread Cheng Lian

With Spark 1.4, you may use data source option mergeSchema to control it:

  sqlContext.read.option("mergeSchema", "false").parquet("some/path")

or

  CREATE TABLE t USING parquet OPTIONS (mergeSchema "false", path "some/path")


We're considering to disable schema merging by default in 1.5.0 since it 
brings unnecessary performance cost when schema evolution is not a problem.


Cheng

On 6/23/15 2:20 AM, Rex Xiong wrote:
I remember in a previous PR, schema merging can be disabled by 
setting spark.sql.hive.convertMetastoreParquet.mergeSchema to false.
But in 1.4 release, I don't see this config anymore, is there a new 
way to do it?


Thanks






Re: Spark DataFrame 1.4 write to parquet/saveAsTable tasks fail

2015-06-17 Thread Cheng Lian
What's the size of this table? Is the data skewed (so that speculation 
is probably triggered)?


Cheng

On 6/15/15 10:37 PM, Night Wolf wrote:

Hey Yin,

Thanks for the link to the JIRA. I'll add details to it. But I'm able 
to reproduce it, at least in the same shell session, every time I do a 
write I get a random number of tasks failing on the first run with the 
NPE.


Using dynamic allocation of executors in YARN mode. No speculative 
execution is enabled.


On Tue, Jun 16, 2015 at 3:11 PM, Yin Huai yh...@databricks.com 
mailto:yh...@databricks.com wrote:


I saw it once but I was not clear how to reproduce it. The jira I
created is https://issues.apache.org/jira/browse/SPARK-7837.

More information will be very helpful. Were those errors from
speculative tasks or regular tasks (the first attempt of the
task)? Is this error deterministic (can you reproduce every time
you run this command)?

Thanks,

Yin

On Mon, Jun 15, 2015 at 8:59 PM, Night Wolf
nightwolf...@gmail.com mailto:nightwolf...@gmail.com wrote:

Looking at the logs of the executor, looks like it fails to
find the file; e.g. for task 10323.0


15/06/16 13:43:13 ERROR output.FileOutputCommitter: Hit
IOException trying to rename

maprfs:///user/hive/warehouse/is_20150617_test2/_temporary/_attempt_201506161340__m_010181_0/part-r-353626.gz.parquet
to
maprfs:/user/hive/warehouse/is_20150617_test2/part-r-353626.gz.parquet
java.io.IOException: Invalid source or target
at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:952)
at

org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:201)
at

org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:225)
at

org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:167)
at

org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:100)
at

org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:137)
at

org.apache.spark.sql.sources.BaseWriterContainer.commitTask(commands.scala:357)
at

org.apache.spark.sql.sources.DefaultWriterContainer.commitTask(commands.scala:394)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org

http://org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:157)
at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/06/16 13:43:13 ERROR mapred.SparkHadoopMapRedUtil: Error
committing the output of task:
attempt_201506161340__m_010181_0
java.io.IOException: Invalid source or target
at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:952)
at

org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:201)
at

org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:225)
at

org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:167)
at

org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:100)
at

org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:137)
at

org.apache.spark.sql.sources.BaseWriterContainer.commitTask(commands.scala:357)
at

org.apache.spark.sql.sources.DefaultWriterContainer.commitTask(commands.scala:394)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org

http://org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:157)
at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at

Re: BigDecimal problem in parquet file

2015-06-17 Thread Cheng Lian
 this is the problem to fix 
now.


Thanks

On 13 June 2015 at 06:31, Davies Liu dav...@databricks.com 
mailto:dav...@databricks.com wrote:


Maybe it's related to a bug, which is fixed by
https://github.com/apache/spark/pull/6558 recently.

On Fri, Jun 12, 2015 at 5:38 AM, Bipin Nag bipin@gmail.com
mailto:bipin@gmail.com wrote:
 Hi Cheng,

 Yes, some rows contain unit instead of decimal values. I believe
some rows
 from original source I had don't have any value i.e. it is null.
And that
 shows up as unit. How does the spark-sql or parquet handle null
in place of
 decimal values, assuming that field is nullable. I will have to
change it
 properly.

 Thanks for helping out.
 Bipin

 On 12 June 2015 at 14:57, Cheng Lian lian.cs@gmail.com
mailto:lian.cs@gmail.com wrote:

 On 6/10/15 8:53 PM, Bipin Nag wrote:

 Hi Cheng,

 I am using Spark 1.3.1 binary available for Hadoop 2.6. I am
loading an
 existing parquet file, then repartitioning and saving it. Doing
this gives
 the error. The code for this doesn't look like causing 
problem. I have a

 feeling the source - the existing parquet is the culprit.

 I created that parquet using a jdbcrdd (pulled from microsoft
sql server).
 First I saved jdbcrdd as an objectfile on disk. Then loaded it
again, made a
 dataframe from it using a schema then saved it as a parquet.

 Following is the code :
 For saving jdbcrdd:
  name - fullqualifiedtablename
  pk - string for primarykey
  pklast - last id to pull
 val myRDD = new JdbcRDD( sc, () =>
   DriverManager.getConnection(url, username, password),
   "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk + " and " + pk + " <= ?",
   1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
 myRDD.saveAsObjectFile("rawdata/" + name);

 For applying schema and saving the parquet:
 val myschema = schemamap(name)
 val myrdd =
   sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name).map(x =>
     org.apache.spark.sql.Row(x: _*))

 Have you tried to print out x here to check its contents? My
guess is that
 x actually contains unit values. For example, the follow Spark
shell code
 can reproduce a similar exception:

 import org.apache.spark.sql.types._
 import org.apache.spark.sql.Row

 val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
 val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
 val df = sqlContext.createDataFrame(rdd, schema)

 df.saveAsParquetFile("file:///tmp/foo")

 val actualdata = sqlContext.createDataFrame(myrdd, myschema)
  actualdata.saveAsParquetFile("/home/bipin/stageddata/" + name)

 Schema structtype can be made manually, though I pull table's
metadata and
 make one. It is a simple string translation (see sql docs
and/or spark
 datatypes)

 That is how I created the parquet file. Any help to solve the
issue is
 appreciated.
 Thanks
 Bipin


 On 9 June 2015 at 20:44, Cheng Lian lian.cs@gmail.com
mailto:lian.cs@gmail.com wrote:

 Would you please provide a snippet that reproduce this issue? What
 version of Spark were you using?

 Cheng

 On 6/9/15 8:18 PM, bipin wrote:

 Hi,
 When I try to save my data frame as a parquet file I get the
following
 error:

 java.lang.ClassCastException: scala.runtime.BoxedUnit cannot
be cast to
 org.apache.spark.sql.types.Decimal
 at



org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
 at



org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
 at



org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
 at



org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
 at



parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
 at

parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
 at

parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
 at

 org.apache.spark.sql.parquet.ParquetRelation2.org

http://org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
 at



org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
 at



org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689

Re: HiveContext saveAsTable create wrong partition

2015-06-17 Thread Cheng Lian

Thanks for reporting this. Would you mind to help creating a JIRA for this?

On 6/16/15 2:25 AM, patcharee wrote:
I found if I move the partitioned columns in schemaString and in Row 
to the end of the sequence, then it works correctly...
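
A sketch of that reordering (illustrative only), based on the snippet quoted 
below: the partitioned columns zone, z, year and month go to the end of both 
the schema string and every Row.

    import org.apache.spark.sql.Row

    val schemaString = "date hh x y height u v w ph phb t p pb qvapor qgraup " +
      "qnice qnrain tke_pbl el_pbl zone z year month"

    // ... build `schema` from schemaString exactly as in the original snippet ...

    val row = Row(
      1, 0, 218, 365,                                  // date, hh, x, y
      9989.497f, 29.627113f, 19.071793f, 0.11982734f,  // height, u, v, w
      3174.6812f, 97735.2f, 16.389032f, -96.62891f,    // ph, phb, t, p
      25135.365f, 2.6476808E-5f, 0.0f, 13195.351f,     // pb, qvapor, qgraup, qnice
      0.0f, 0.1f, 0.0f,                                // qnrain, tke_pbl, el_pbl
      2, 42, 2009, 3)                                  // zone, z, year, month last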


On 16. juni 2015 11:14, patcharee wrote:

Hi,

I am using spark 1.4 and HiveContext to append data into a 
partitioned hive table. I found that the data insert into the table 
is correct, but the partition(folder) created is totally wrong.

Below is my code snippet

--- 

val schemaString = "zone z year month date hh x y height u v w ph phb " +
  "t p pb qvapor qgraup qnice qnrain tke_pbl el_pbl"

val schema =
  StructType(
    schemaString.split(" ").map(fieldName =>
      if (fieldName.equals("zone") || fieldName.equals("z") ||
          fieldName.equals("year") || fieldName.equals("month") ||
          fieldName.equals("date") || fieldName.equals("hh") ||
          fieldName.equals("x") || fieldName.equals("y"))
        StructField(fieldName, IntegerType, true)
      else
        StructField(fieldName, FloatType, true)
    ))

val pairVarRDD =
  sc.parallelize(Seq((Row(2, 42, 2009, 3, 1, 0, 218, 365,
    9989.497.floatValue(), 29.627113.floatValue(), 19.071793.floatValue(),
    0.11982734.floatValue(), 3174.6812.floatValue(), 97735.2.floatValue(),
    16.389032.floatValue(), -96.62891.floatValue(), 25135.365.floatValue(),
    2.6476808E-5.floatValue(), 0.0.floatValue(), 13195.351.floatValue(),
    0.0.floatValue(), 0.1.floatValue(), 0.0.floatValue()))
  ))

val partitionedTestDF2 = sqlContext.createDataFrame(pairVarRDD, schema)

partitionedTestDF2.write.format("org.apache.spark.sql.hive.orc.DefaultSource")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .partitionBy("zone", "z", "year", "month")
  .saveAsTable("test4DimBySpark")



--- 



The table contains 23 columns (longer than Tuple maximum length), so 
I use Row Object to store raw data, not Tuple.

Here is some message from spark when it saved data

15/06/16 10:39:22 INFO metadata.Hive: Renaming 
src:hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0/part-1;dest: 
hdfs://service-10-0.local:8020/apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1;Status:true 

15/06/16 10:39:22 INFO metadata.Hive: New loading path = 
hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0 
with partSpec {zone=13195, z=0, year=0, month=0}


From the raw data (pairVarRDD) zone = 2, z = 42, year = 2009, month = 
3. But spark created a partition {zone=13195, z=0, year=0, month=0}.


When I queried from hive

hive> select * from test4dimBySpark;
OK
2    42    2009    3    1.0    0.0    218.0    365.0    9989.497
29.627113    19.071793    0.11982734    -3174.6812    97735.2
16.389032    -96.62891    25135.365    2.6476808E-5    0.0    13195
0    0    0

hive> select zone, z, year, month from test4dimBySpark;
OK
13195    0    0    0
hive> dfs -ls /apps/hive/warehouse/test4dimBySpark/*/*/*/*;
Found 2 items
-rw-r--r--   3 patcharee hdfs   1411 2015-06-16 10:39 
/apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1


The data stored in the table is correct zone = 2, z = 42, year = 
2009, month = 3, but the partition created was wrong 
zone=13195/z=0/year=0/month=0


Is this a bug or what could be wrong? Any suggestion is appreciated.

BR,
Patcharee



















Re: Spark 1.4 DataFrame Parquet file writing - missing random rows/partitions

2015-06-17 Thread Cheng Lian

Hi Nathan,

Thanks a lot for the detailed report, especially the information about 
nonconsecutive part numbers. It's confirmed to be a race condition bug 
and just filed https://issues.apache.org/jira/browse/SPARK-8406 to track 
this. Will deliver a fix ASAP and this will be included in 1.4.1.


Best,
Cheng

On 6/16/15 12:30 AM, Nathan McCarthy wrote:

Hi all,

Looks like data frame parquet writing is very broken in Spark 1.4.0. 
We had no problems with Spark 1.3.


When trying to save a data frame with *569610608* rows.

dfc.write.format("parquet").save("/data/map_parquet_file")

We get random results between runs. Caching the data frame in memory 
makes no difference. It looks like the write out misses some of the 
RDD partitions. We have an RDD with *6750* partitions. When we write 
out we get less files out than the number of partitions. When reading 
the data back in and running a count, we get smaller number of rows.


I’ve tried counting the rows in all different ways. All return the 
same result, *560214031* rows, missing about 9.4 million rows (0.15%).


qc.read.parquet("/data/map_parquet_file").count
qc.read.parquet("/data/map_parquet_file").rdd.count
qc.read.parquet("/data/map_parquet_file").mapPartitions { itr => var c = 
0; itr.foreach(_ => c = c + 1); Seq(c).toIterator }.reduce(_ + _)


Looking on HDFS the files, there are /6643/ .parquet files. 107 
missing partitions (about 0.15%).


Then writing out the same cached DF again to a new file gives *6717* 
files on hdfs (about 33 files missing or 0.5%);


dfc.write.parquet("/data/map_parquet_file_2")

And we get *566670107* rows back (about 3million missing ~0.5%);

qc.read.parquet("/data/map_parquet_file_2").count

Writing the same df out to json writes the expected number (*6750*) of 
parquet files and returns the right number of rows /569610608/.


dfc.write.format("json").save("/data/map_parquet_file_3")
qc.read.format("json").load("/data/map_parquet_file_3").count

One thing to note is that the parquet part files on HDFS are not the 
normal sequential part numbers like for the json output and parquet 
output in Spark 1.3.


part-r-06151.gz.parquet  part-r-118401.gz.parquet 
 part-r-146249.gz.parquet  part-r-196755.gz.parquet 
 part-r-35811.gz.parquet part-r-55628.gz.parquet 
 part-r-73497.gz.parquet  part-r-97237.gz.parquet
part-r-06161.gz.parquet  part-r-118406.gz.parquet 
 part-r-146254.gz.parquet  part-r-196763.gz.parquet 
 part-r-35826.gz.parquet part-r-55647.gz.parquet 
 part-r-73500.gz.parquet  _SUCCESS


We are using MapR 4.0.2 for hdfs.

Any ideas?

Cheers,
Nathan





Re: Spark 1.4 DataFrame Parquet file writing - missing random rows/partitions

2015-06-17 Thread Cheng Lian
Since SPARK-8406 is serious, we hope to ship it ASAP, possibly next 
week, but I can't say it's a promise yet. However, you can cherry pick 
the commit as soon as the fix is merged into branch-1.4. Sorry for the 
troubles!


Cheng

On 6/17/15 1:42 AM, Nathan McCarthy wrote:

Thanks Cheng. Nice find!

Let me know if there is anything we can do to help on this end with 
contributing a fix or testing.


Side note - any ideas on the 1.4.1 eta? There are a few bug fixes we 
need in there.


Cheers,
Nathan

From: Cheng Lian
Date: Wednesday, 17 June 2015 6:25 pm
To: Nathan, user@spark.apache.org mailto:user@spark.apache.org
Subject: Re: Spark 1.4 DataFrame Parquet file writing - missing random 
rows/partitions


Hi Nathan,

Thanks a lot for the detailed report, especially the information about 
nonconsecutive part numbers. It's confirmed to be a race condition bug 
and just filed https://issues.apache.org/jira/browse/SPARK-8406 to 
track this. Will deliver a fix ASAP and this will be included in 1.4.1.


Best,
Cheng

On 6/16/15 12:30 AM, Nathan McCarthy wrote:

Hi all,

Looks like data frame parquet writing is very broken in Spark 1.4.0. 
We had no problems with Spark 1.3.


When trying to save a data frame with *569610608* rows.

dfc.write.format("parquet").save("/data/map_parquet_file")

We get random results between runs. Caching the data frame in memory 
makes no difference. It looks like the write out misses some of the 
RDD partitions. We have an RDD with *6750* partitions. When we write 
out we get less files out than the number of partitions. When reading 
the data back in and running a count, we get smaller number of rows.


I’ve tried counting the rows in all different ways. All return the 
same result, *560214031* rows, missing about 9.4 million rows (0.15%).


qc.read.parquet("/data/map_parquet_file").count
qc.read.parquet("/data/map_parquet_file").rdd.count
qc.read.parquet("/data/map_parquet_file").mapPartitions { itr => var c 
= 0; itr.foreach(_ => c = c + 1); Seq(c).toIterator }.reduce(_ + _)


Looking on HDFS the files, there are /6643/ .parquet files. 107 
missing partitions (about 0.15%).


Then writing out the same cached DF again to a new file gives *6717* 
files on hdfs (about 33 files missing or 0.5%);


dfc.write.parquet("/data/map_parquet_file_2")

And we get *566670107* rows back (about 3million missing ~0.5%);

qc.read.parquet("/data/map_parquet_file_2").count

Writing the same df out to json writes the expected number (*6750*) 
of parquet files and returns the right number of rows /569610608/.


dfc.write.format("json").save("/data/map_parquet_file_3")
qc.read.format("json").load("/data/map_parquet_file_3").count

One thing to note is that the parquet part files on HDFS are not the 
normal sequential part numbers like for the json output and parquet 
output in Spark 1.3.


part-r-06151.gz.parquet  part-r-118401.gz.parquet 
 part-r-146249.gz.parquet  part-r-196755.gz.parquet 
 part-r-35811.gz.parquet part-r-55628.gz.parquet 
 part-r-73497.gz.parquet  part-r-97237.gz.parquet
part-r-06161.gz.parquet  part-r-118406.gz.parquet 
 part-r-146254.gz.parquet  part-r-196763.gz.parquet 
 part-r-35826.gz.parquet part-r-55647.gz.parquet 
 part-r-73500.gz.parquet  _SUCCESS


We are using MapR 4.0.2 for hdfs.

Any ideas?

Cheers,
Nathan







Re: Dataframe Write : Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.

2015-06-13 Thread Cheng Lian
As the error message says, were you using a |SQLContext| instead of a 
|HiveContext| to create the DataFrame?


In Spark shell, although the variable name is |sqlContext|, the type of 
that variable is actually |org.apache.spark.sql.hive.HiveContext|, which 
has the ability to communicate with Hive metastore.
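
For illustration, a minimal sketch assuming Spark 1.4 and an existing 
SparkContext `sc`, reusing the table name from the snippet quoted below:

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)   // backed by the Hive metastore

    val df = hiveContext.range(0, 10)       // any DataFrame created from this context
    df.write.format("org.apache.spark.sql.hive.orc.DefaultSource")
      .mode(SaveMode.Append)
      .saveAsTable("testorc")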


Cheng

On 6/13/15 12:36 PM, pth001 wrote:


Hi,

I am using Spark 1.4. I am trying to insert data into a hive table (in orc 
format) from DF.


partitionedTestDF.write.format("org.apache.spark.sql.hive.orc.DefaultSource")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .partitionBy("zone", "z", "year", "month")
  .saveAsTable("testorc")



When this job is submitted by spark-submit I get 
Exception in thread main java.lang.RuntimeException: Tables created 
with SQLContext must be TEMPORARY. Use a HiveContext instead


But the job works fine on spark-shell. What can be wrong?

BR,
Patcharee


.


​


Re: Re: Re: Re: Re: Re: Re: Re: Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng Lian
Thanks for the extra details and explanations Chaocai, will try to 
reproduce this when I got chance.


Cheng

On 6/12/15 3:44 PM, 姜超才 wrote:
I said OOM occurred on the slave node because I monitored memory 
utilization during the query task; on the driver, very little memory was 
occupied. And I remember I have seen the OOM stderr log on the slave 
node.


But recently there seems no OOM log on slave node.
Follow the cmd 、data 、env and the code I gave you, the OOM can 100% 
repro on cluster mode.


Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian l...@databricks.com
*To:* 姜超才 jiangchao...@haiyisoft.com, Hester wang 
hester9...@gmail.com, user@spark.apache.org
*Subject:* Re: Re: Re: Re: Re: Re: Re: Met OOM when fetching more 
than 1,000,000 rows.

*Date:* 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your 
last comment saying The OOM or lose heartbeat was occurred on slave 
node. Because from the log files you attached at first, those OOM 
actually happens on driver side (Thrift server log only contains log 
lines from driver side). Did you see OOM from executor stderr output? 
I ask this because a large portion of users are still using 1.3, and we 
may want to deliver a fix if there are bugs that cause unexpected OOM.


Cheng

On 6/12/15 3:14 PM, 姜超才 wrote:

Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
*From:* 姜超才
*To:* Cheng Lian , Hester wang ,
*Subject:* Re: Re: Re: Re: Re: Re: Met OOM when fetching more than 
1,000,000 rows.

*Date:* 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian
*To:* 姜超才 , Hester wang ,
*Subject:* Re: Re: Re: Re: Re: Met OOM when fetching more than 
1,000,000 rows.

*Date:* 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop (don't 
have access to a cluster for now) but couldn't reproduce this issue. 
Your program just executed smoothly... :-/


Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local mode 
works for you?  Will investigate this with a cluster when I get chance.


Cheng

On 6/10/15 5:19 PM, 姜超才 wrote:
When spark.sql.thriftServer.incrementalCollect is set and driver 
memory is set to 7G, things seem stable and simple:
It can quickly run through the query, but when traversing the 
result set (while rs.hasNext), it quickly gets an OOM: java 
heap space. See attachment.


/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh --master 
spark://cx-spark-001:7077 --conf spark.executor.memory=4g --conf 
spark.driver.memory=7g --conf spark.shuffle.consolidateFiles=true 
--conf spark.shuffle.manager=sort --conf 
spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit --conf 
spark.file.transferTo=false --conf spark.akka.timeout=2000 --conf 
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8 --conf 
spark.kryoserializer.buffer.mb=256 --conf 
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf 
spark.akka.frameSize=512 --driver-class-path 
/usr/local/hive/lib/classes12.jar --conf 
spark.sql.thriftServer.incrementalCollect=true


Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian
*To:* 姜超才 , Hester wang ,
*Subject:* Re: Re: Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 16:37:34 (Wed)

Also, if the data isn't confidential, would you mind to send me a 
compressed copy (don't cc user@spark.apache.org)?


Cheng

On 6/10/15 4:23 PM, 姜超才 wrote:

Hi Lian,

Thanks for your quick response.

I forgot to mention that I have tuned driver memory from 2G to 4G, 
which seems to give a minor improvement. The failure when fetching 1,400,000 
rows changed from OOM: "GC overhead limit exceeded" to "lost 
worker heartbeat after 120s".


I will try to set spark.sql.thriftServer.incrementalCollect and 
continue increasing driver memory to 7G, and will send the result to 
you.


Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian
*To:* Hester wang ,
*Subject:* Re: Met OOM when fetching more than 1,000,000 rows.
*Date:* 2015/06/10 16:15:47 (Wed)

Hi Xiaohan,

Would you please try to set 
spark.sql.thriftServer.incrementalCollect to true and 
increasing driver memory size? In this way, HiveThriftServer2 uses 
RDD.toLocalIterator rather than RDD.collect().iterator to return 
the result set. The key difference is that RDD.toLocalIterator 
retrieves a single partition at a time, thus avoid holding the 
whole result set on driver side. The memory issue happens on driver 
side

Re: Re: Re: Re: Re: Re: Re: Re: Re: Met OOM when fetching more than 1,000,000 rows.

2015-06-12 Thread Cheng Lian
My guess the reason why local mode is OK while standalone cluster 
doesn't work is that in cluster mode, task results are serialized and 
sent to driver side. Driver need to deserialize the result, and thus 
occupies much more memory then local mode (where task result 
de/serialization is not performed).


Cheng

On 6/12/15 4:17 PM, Cheng, Hao wrote:


Not sure if Spark Core will provide API to fetch the record one by one 
from the block manager, instead of the pulling them all into the 
driver memory.


*From:*Cheng Lian [mailto:l...@databricks.com]
*Sent:* Friday, June 12, 2015 3:51 PM
*To:* 姜超才; Hester wang; user@spark.apache.org
*Subject:* Re: Re: Re: Re: Re: Re: Re: Re: Re: Met OOM when 
fetching more than 1,000,000 rows.


Thanks for the extra details and explanations Chaocai, will try to 
reproduce this when I got chance.


Cheng

On 6/12/15 3:44 PM, 姜超才 wrote:

I said OOM occurred on slave node, because I monitored memory
utilization during the query task, on driver, very few memory was
ocupied. And i remember i have ever seen the OOM stderr log on
slave node.

But recently there seems no OOM log on slave node.

Follow the cmd 、data 、env and the code I gave you, the OOM can
100% repro on cluster mode.

Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian l...@databricks.com
mailto:l...@databricks.com
*To:* 姜超才 jiangchao...@haiyisoft.com
mailto:jiangchao...@haiyisoft.com, Hester wang
hester9...@gmail.com mailto:hester9...@gmail.com,
user@spark.apache.org mailto:user@spark.apache.org
*Subject:* Re: Re: Re: Re: Re: Re: Re: Met OOM when fetching
more than 1,000,000 rows.
*Date:* 2015/06/12 15:30:08 (Fri)

Hi Chaocai,

Glad that 1.4 fixes your case. However, I'm a bit confused by your
last comment saying The OOM or lose heartbeat was occurred on
slave node. Because from the log files you attached at first,
those OOM actually happens on driver side (Thrift server log only
contains log lines from driver side). Did you see OOM from
executor stderr output? I ask this because there are still a large
portion of users are using 1.3, and we may want deliver a fix if
there does exist bugs that causes unexpected OOM.

Cheng

On 6/12/15 3:14 PM, 姜超才 wrote:

Hi Lian,

Today I update my spark to v1.4. This issue resolved.

Thanks,
SuperJ

- Original Message -
*From:* 姜超才
*To:* Cheng Lian , Hester wang ,
*Subject:* Re: Re: Re: Re: Re: Re: Met OOM when fetching
more than 1,000,000 rows.
*Date:* 2015/06/11 08:56:28 (Thu)

No problem on Local mode. I can get all rows.

Select * from foo;

The OOM or lose heartbeat was occured on slave node.

Thanks,

SuperJ


- Original Message -
*From:* Cheng Lian
*To:* 姜超才 , Hester wang ,
*Subject:* Re: Re: Re: Re: Re: Met OOM when fetching more
than 1,000,000 rows.
*Date:* 2015/06/10 19:58:59 (Wed)

Hm, I tried the following with 0.13.1 and 0.13.0 on my laptop
(don't have access to a cluster for now) but couldn't
reproduce this issue. Your program just executed smoothly... :-/

Command line used to start the Thrift server:

./sbin/start-thriftserver.sh --driver-memory 4g --master local

SQL statements used to create the table with your data:

create table foo(k string, v double);
load data local inpath '/tmp/bar' into table foo;

Tried this via Beeline:

select * from foo limit 160;

Also tried the Java program you provided.

Could you also try to verify whether this single node local
mode works for you?  Will investigate this with a cluster when
I get chance.

Cheng

On 6/10/15 5:19 PM, 姜超才 wrote:

When set spark.sql.thriftServer.incrementalCollect and
set driver memory to 7G, Things seems stable and simple:

It can quickly run through the query line, but when
traversal the result set ( while rs.hasNext ), it can
quickly get the OOM: java heap space. See attachment.

/usr/local/spark/spark-1.3.0/sbin/start-thriftserver.sh
--master spark://cx-spark-001:7077 --conf
spark.executor.memory=4g --conf spark.driver.memory=7g
--conf spark.shuffle.consolidateFiles=true --conf
spark.shuffle.manager=sort --conf
spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit
--conf spark.file.transferTo=false --conf
spark.akka.timeout=2000 --conf
spark.storage.memoryFraction=0.4 --conf spark.cores.max=8
--conf spark.kryoserializer.buffer.mb=256 --conf
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf
spark.akka.frameSize=512
