Re: Dataframe schema...

2016-10-21 Thread Koert Kuipers
This rather innocent-looking optimization flag, nullable, has caused a lot of
bugs... Makes me wonder if we are better off without it.

On Oct 21, 2016 8:37 PM, "Muthu Jayakumar"  wrote:

> Thanks Cheng Lian for opening the JIRA. I found this with Spark 2.0.0.
>
> Thanks,
> Muthu
>
> On Fri, Oct 21, 2016 at 3:30 PM, Cheng Lian  wrote:
>
>> Yea, confirmed. While analyzing unions, we treat StructTypes with
>> different field nullabilities as incompatible types and throw this error.
>>
>> Opened https://issues.apache.org/jira/browse/SPARK-18058 to track this
>> issue. Thanks for reporting!
>>
>> Cheng
>>
>> On 10/21/16 3:15 PM, Cheng Lian wrote:
>>
>> Hi Muthu,
>>
>> Which version of Spark are you using? This seems to be a bug in the
>> analysis phase.
>>
>> Cheng
>>
>> On 10/21/16 12:50 PM, Muthu Jayakumar wrote:
>>
>> Sorry for the late response. Here is what I am seeing...
>>
>>
>> Schema from parquet file.
>>
>> d1.printSchema()
>>
>> root
>>  |-- task_id: string (nullable = true)
>>  |-- task_name: string (nullable = true)
>>  |-- some_histogram: struct (nullable = true)
>>  ||-- values: array (nullable = true)
>>  |||-- element: double (containsNull = true)
>>  ||-- freq: array (nullable = true)
>>  |||-- element: long (containsNull = true)
>>
>> d2.printSchema() //Data created using dataframe and/or processed before 
>> writing to parquet file.
>>
>> root
>>  |-- task_id: string (nullable = true)
>>  |-- task_name: string (nullable = true)
>>  |-- some_histogram: struct (nullable = true)
>>  ||-- values: array (nullable = true)
>>  |||-- element: double (containsNull = false)
>>  ||-- freq: array (nullable = true)
>>  |||-- element: long (containsNull = false)
>>
>> d1.union(d2).printSchema()
>>
>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>> unresolved operator 'Union;
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.f
>> ailAnalysis(CheckAnalysis.scala:40)
>> at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis
>> (Analyzer.scala:58)
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
>> n$checkAnalysis$1.apply(CheckAnalysis.scala:361)
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
>> n$checkAnalysis$1.apply(CheckAnalysis.scala:67)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeN
>> ode.scala:126)
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.c
>> heckAnalysis(CheckAnalysis.scala:67)
>> at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysi
>> s(Analyzer.scala:58)
>> at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed
>> (QueryExecution.scala:49)
>> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
>> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
>> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
>> at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
>> at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)
>>
>> Please advise,
>> Muthu
>>
>> On Thu, Oct 20, 2016 at 1:46 AM, Michael Armbrust wrote:
>>
>>> What is the issue you see when unioning?
>>>
>>> On Wed, Oct 19, 2016 at 6:39 PM, Muthu Jayakumar 
>>> wrote:
>>>
 Hello Michael,

 Thank you for looking into this query. In my case there seems to be an
 issue when I union a parquet file read from disk versus another dataframe
 that I construct in-memory. The only difference I see is the containsNull =
 true. In fact, I do not see any errors with union on the simple schema of
 "col1 thru col4" above. But the problem seem to exist only on that
 "some_histogram" column which contains the mixed containsNull = true/false.
 Let me know if this helps.

 Thanks,
 Muthu



 On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> Nullable is just a hint to the optimizer that it's impossible for there
> to be a null value in this column, so that it can avoid generating code 
> for
> null-checks.  When in doubt, we set nullable=true since it is always safer
> to check.
>
> Why in particular are you trying to change the nullability of the
> column?
>
> On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar 
> wrote:
>
>> Hello there,
>>
>> I am trying to understand how and when DataFrame (or Dataset)
>> sets nullable = true vs false on a schema.
>>
>> Here is my observation from a sample code I tried...
>>
>>
>> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3,
>> "c", 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
>> lit("bla")).printSchema()
>> root
>>  |-- col1: integer (nullable = false)
>>  |-- col2: string (nullable = true)
>>  |-- col3: double (nullable = false)
>>  |-- col4: string 

Re: Dataframe schema...

2016-10-21 Thread Muthu Jayakumar
Thanks Cheng Lian for opening the JIRA. I found this with Spark 2.0.0.

Thanks,
Muthu

On Fri, Oct 21, 2016 at 3:30 PM, Cheng Lian  wrote:

> Yea, confirmed. While analyzing unions, we treat StructTypes with
> different field nullabilities as incompatible types and throw this error.
>
> Opened https://issues.apache.org/jira/browse/SPARK-18058 to track this
> issue. Thanks for reporting!
>
> Cheng
>
> On 10/21/16 3:15 PM, Cheng Lian wrote:
>
> Hi Muthu,
>
> Which version of Spark are you using? This seems to be a bug in the
> analysis phase.
>
> Cheng
>
> On 10/21/16 12:50 PM, Muthu Jayakumar wrote:
>
> Sorry for the late response. Here is what I am seeing...
>
>
> Schema from parquet file.
>
> d1.printSchema()
>
> root
>  |-- task_id: string (nullable = true)
>  |-- task_name: string (nullable = true)
>  |-- some_histogram: struct (nullable = true)
>  ||-- values: array (nullable = true)
>  |||-- element: double (containsNull = true)
>  ||-- freq: array (nullable = true)
>  |||-- element: long (containsNull = true)
>
> d2.printSchema() //Data created using dataframe and/or processed before 
> writing to parquet file.
>
> root
>  |-- task_id: string (nullable = true)
>  |-- task_name: string (nullable = true)
>  |-- some_histogram: struct (nullable = true)
>  ||-- values: array (nullable = true)
>  |||-- element: double (containsNull = false)
>  ||-- freq: array (nullable = true)
>  |||-- element: long (containsNull = false)
>
> d1.union(d2).printSchema()
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> unresolved operator 'Union;
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.
> failAnalysis(CheckAnalysis.scala:40)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.
> failAnalysis(Analyzer.scala:58)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
> anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
> anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
> TreeNode.scala:126)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.
> checkAnalysis(CheckAnalysis.scala:67)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.
> checkAnalysis(Analyzer.scala:58)
> at org.apache.spark.sql.execution.QueryExecution.
> assertAnalyzed(QueryExecution.scala:49)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
> at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
> at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)
>
> Please advise,
> Muthu
>
> On Thu, Oct 20, 2016 at 1:46 AM, Michael Armbrust 
> wrote:
>
>> What is the issue you see when unioning?
>>
>> On Wed, Oct 19, 2016 at 6:39 PM, Muthu Jayakumar 
>> wrote:
>>
>>> Hello Michael,
>>>
>>> Thank you for looking into this query. In my case there seems to be an
>>> issue when I union a parquet file read from disk versus another dataframe
>>> that I construct in-memory. The only difference I see is the containsNull =
>>> true. In fact, I do not see any errors with union on the simple schema of
>>> "col1 thru col4" above. But the problem seem to exist only on that
>>> "some_histogram" column which contains the mixed containsNull = true/false.
>>> Let me know if this helps.
>>>
>>> Thanks,
>>> Muthu
>>>
>>>
>>>
>>> On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 Nullable is just a hint to the optimizer that it's impossible for there
 to be a null value in this column, so that it can avoid generating code for
 null-checks.  When in doubt, we set nullable=true since it is always safer
 to check.

 Why in particular are you trying to change the nullability of the
 column?

 On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar 
 wrote:

> Hello there,
>
> I am trying to understand how and when DataFrame (or Dataset)
> sets nullable = true vs false on a schema.
>
> Here is my observation from a sample code I tried...
>
>
> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3,
> "c", 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
> lit("bla")).printSchema()
> root
>  |-- col1: integer (nullable = false)
>  |-- col2: string (nullable = true)
>  |-- col3: double (nullable = false)
>  |-- col4: string (nullable = false)
>
>
> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3,
> "c", 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
> lit("bla")).write.parquet("/tmp/sample.parquet")
>
> scala> spark.read.parquet("/tmp/sample.parquet").printSchema()
> root
>  |-- col1: integer (nullable = true)

RDD to Dataset results in fixed number of partitions

2016-10-21 Thread Spark User
Hi All,

I'm trying to create a Dataset from an RDD and do a groupBy on the Dataset. The
groupBy stage runs with 200 partitions, although the RDD had 5000
partitions. I also seem to have no way to change those 200 partitions on the
Dataset to some other, larger number. This seems to be limiting
parallelism, as there are 700 executors and only 200 partitions.

The code looks somewhat like:

val sqsDstream = sparkStreamingContext.union((1 to 3).map(_ =>
  sparkStreamingContext.receiverStream(new SQSReceiver())
)).transform(_.repartition(5000))

sqsDstream.foreachRDD(rdd => {
  val dataSet = sparkSession.createDataset(rdd)
  val aggregatedDataset: Dataset[Row] =
    dataSet.groupBy("primaryKey").agg(udaf("key1"))
  aggregatedDataset.foreachPartition(partition => {
    // write to output stream
  })
})


Any pointers would be appreciated.
Thanks,
Bharath
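
For reference, the 200 partitions seen here come from the default value of
spark.sql.shuffle.partitions, which sets how many partitions Dataset/DataFrame
shuffles such as groupBy use. A rough, untested sketch of the two usual knobs,
reusing the names from the snippet above:

// Raise the shuffle parallelism used by Dataset/DataFrame aggregations (default 200).
sparkSession.conf.set("spark.sql.shuffle.partitions", "5000")

// Or explicitly repartition the aggregated Dataset before the per-partition write.
aggregatedDataset
  .repartition(5000)
  .foreachPartition(partition => {
    // write to output stream
  })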


Re: Writing to Parquet Job turns to wait mode after even completion of job

2016-10-21 Thread Chetan Khatri
Hello Cheng,

Thank you for response.

I am using Spark 1.6.1, and I am writing around 350 gz Parquet part files for a
single table. I processed around 180 GB of data using Spark.



On Sat, Oct 22, 2016 at 3:41 AM, Cheng Lian  wrote:

> What version of Spark are you using and how many output files does the job
> write out?
>
> By default, Spark versions before 1.6 (not including 1.6) write Parquet
> summary files when committing the job. This process reads footers from all
> Parquet files in the destination directory and merges them together. This
> can be particularly bad if you are appending a small amount of data to a
> large existing Parquet dataset.
>
> If that's the case, you may disable Parquet summary files by setting
> Hadoop configuration "parquet.enable.summary-metadata" to false.
>
> We've disabled it by default since 1.6.0
>
> Cheng
>
> On 10/21/16 1:47 PM, Chetan Khatri wrote:
>
> Hello Spark Users,
>
> I am writing around 10 GB of Processed Data to Parquet where having 1 TB
> of HDD and 102 GB of RAM, 16 vCore machine on Google Cloud.
>
> Every time I write to Parquet, it shows on the Spark UI that the stages
> succeeded, but on the spark shell it holds the context in wait mode for almost
> 10 mins; then it clears broadcast and accumulator shared variables.
>
> Can we speed this up?
>
> Thanks.
>
> --
> Yours Aye,
> Chetan Khatri.
> M.+91 7 80574
> Data Science Researcher
> INDIA
>
> ​​Statement of Confidentiality
> 
> The contents of this e-mail message and any attachments are confidential
> and are intended solely for addressee. The information may also be legally
> privileged. This transmission is sent in trust, for the sole purpose of
> delivery to the intended recipient. If you have received this transmission
> in error, any use, reproduction or dissemination of this transmission is
> strictly prohibited. If you are not the intended recipient, please
> immediately notify the sender by reply e-mail or phone and delete this
> message and its attachments, if any.​​
>
>
>


-- 
Yours Aye,
Chetan Khatri.
M.+91 7 80574
Data Science Researcher
INDIA


Re: RDD groupBy() then random sort each group ?

2016-10-21 Thread Cheng Lian
I think it would be much easier to use the DataFrame API to do this, by doing a
local sort using randn() as the key. For example, in Spark 2.0:


import org.apache.spark.sql.functions.randn

val df = spark.range(100)
val shuffled = df.repartition($"id" % 10).sortWithinPartitions(randn(42))

Replace df with a DataFrame wrapping your RDD, and $"id" % 10 with the 
key to group by; then you can get the RDD back from shuffled and run the 
operations you want on it.
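
For example, a rough sketch of the full round trip (Sample and my_input_rdd are
hypothetical stand-ins for the real training data; Spark 2.0 assumed):

import org.apache.spark.sql.functions.randn
import spark.implicits._   // in spark-shell; needed for the $ column syntax

// Hypothetical stand-ins for the real training data.
case class Sample(modelId: Long, label: Double, features: Seq[Double])
val my_input_rdd = spark.sparkContext.parallelize(Seq.empty[Sample])

// Wrap the RDD in a DataFrame, co-locate each model's rows in the same
// partition, and sort each partition randomly.
val df = spark.createDataFrame(my_input_rdd)
val shuffled = df.repartition($"modelId").sortWithinPartitions(randn(42))

// Back to an RDD: rows of a given modelId arrive together and in random order.
val shuffledRdd = shuffled.rdd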


Cheng


On 10/20/16 10:53 AM, Yang wrote:
in my application, I group the training samples by their 
model_id's (the input table contains training samples for 100k 
different models); each group then ends up having about 1 million 
training samples,


then I feed that group of samples to a little Logistic Regression 
solver (SGD), but SGD requires the input data to be shuffled randomly 
(so that positive and negative samples are evenly distributed), so now 
I do something like


my_input_rdd.groupBy(x => x.model_id).map { x =>
  val (model_id, group_of_rows) = x
  (model_id, scala.util.Random.shuffle(group_of_rows.toSeq))
}.map(x => (x._1, train_sgd(x._2)))


the issue is that on the 3rd line above, I had to explicitly call 
toSeq() on group_of_rows (which is an Iterable, not a Seq) in order to 
shuffle it. Now I have to load the entire 1 million rows into memory, 
and in practice I've seen my tasks OOM and GC time go crazy (about 
50% of total run time). I suspect this toSeq() is the reason, since 
doing a simple count() on the groupBy() result works fine.


I am planning to shuffle my_input_rdd first, and then groupBy(), and 
not do the toSeq() and shuffle step. Intuitively the input rdd is already 
shuffled, so UNLESS groupBy() tries to do some sorting, the rows in 
the group SHOULD remain shuffled, but overall this remains rather 
flimsy.


any ideas to do this more reliably?

thanks!




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark 2.0.0] error when unioning to an empty dataset

2016-10-21 Thread Cheng Lian
Efe - You probably hit this bug: 
https://issues.apache.org/jira/browse/SPARK-18058



On 10/21/16 2:03 AM, Agraj Mangal wrote:
I have seen this error sometimes when the elements in the schema have 
different nullabilities. Could you print the schema for data and for 
someCode.thatReturnsADataset() and see if there is any difference 
between the two ?


On Fri, Oct 21, 2016 at 9:14 AM, Efe Selcuk wrote:


Thanks for the response. What do you mean by "semantically" the
same? They're both Datasets of the same type, which is a case
class, so I would expect compile-time integrity of the data. Is
there a situation where this wouldn't be the case?

Interestingly enough, if I instead create an empty rdd with
sparkContext.emptyRDD of the same case class type, it works!

So something like:
var data = spark.sparkContext.emptyRDD[SomeData]

// loop
data = data.union(someCode.thatReturnsADataset().rdd)
// end loop

data.toDS //so I can union it to the actual Dataset I have elsewhere

On Thu, Oct 20, 2016 at 8:34 PM Agraj Mangal wrote:

I believe this normally comes when Spark is unable to perform
union due to "difference" in schema of the operands. Can you
check if the schema of both the datasets are semantically same ?

On Tue, Oct 18, 2016 at 9:06 AM, Efe Selcuk
wrote:

Bump!

On Thu, Oct 13, 2016 at 8:25 PM Efe Selcuk
wrote:

I have a use case where I want to build a dataset
based off of conditionally available data. I thought
I'd do something like this:

case class SomeData( ... ) // parameters are basic
encodable types like strings and BigDecimals

var data = spark.emptyDataset[SomeData]

// loop, determining what data to ingest and process
into datasets
  data = data.union(someCode.thatReturnsADataset)
// end loop

However I get a runtime exception:

Exception in thread "main"
org.apache.spark.sql.AnalysisException: unresolved
operator 'Union;
at

org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at

org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at

org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at

org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at

org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at

org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at

org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at

org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at
org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at
org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at
org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at
org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at
org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Granted, I'm new at Spark so this might be an
anti-pattern, so I'm open to suggestions. However it
doesn't seem like I'm doing anything incorrect here,
the types are correct. Searching for this error online
returns results seemingly about working in dataframes
and having mismatching schemas or a different order of
fields, and it seems like bugfixes have gone into
place for those cases.

Thanks in advance.
Efe




-- 
Thanks & Regards,

Agraj Mangal




--
Thanks & Regards,
Agraj Mangal




Re: How to iterate the element of an array in DataFrame?

2016-10-21 Thread Cheng Lian
You may either use the SQL functions "array" and "named_struct", or define a 
case class with the expected field names.


Cheng

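
For reference, a rough, untested sketch of the UDF route for the question quoted
below (assumes the 100-dimensional space and tag names like "tagCategory_060"
shown in the thread, and Spark 2.0 ml vectors):

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Each element of the array<struct<category,weight>> column arrives as a Row.
val tag2vec = udf { tags: Seq[Row] =>
  val pairs = tags.map { r =>
    val idx = r.getAs[String]("category").split("_").last.toInt // "tagCategory_060" -> 60
    val w   = r.getAs[String]("weight").toDouble
    (idx, w)
  }.sortBy(_._1)                                                // sparse vectors want sorted indices
  Vectors.sparse(100, pairs.map(_._1).toArray, pairs.map(_._2).toArray)
}

// The column name contains a dot, so quote it with backticks.
val withVec = mblog_tags.withColumn("vec", tag2vec(col("`category.firstCategory`")))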

On 10/21/16 2:45 AM, 颜发才(Yan Facai) wrote:

My expectation is:
root
|-- tag: vector

namely, I want to extract from:
[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
to:
Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))

I believe it needs two steps:
1. val tag2vec = {tag: Array[Structure] => Vector}
2. mblog_tags.withColumn("vec", tag2vec(col("tag"))

But I have no idea how to describe the Array[Structure] in the 
DataFrame.






On Fri, Oct 21, 2016 at 4:51 PM, lk_spark wrote:


how about changing the schema from
root
 |-- category.firstCategory: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- category: string (nullable = true)
 |||-- weight: string (nullable = true)
to:
root
 |-- category: string (nullable = true)
 |-- weight: string (nullable = true)
2016-10-21

lk_spark


*From:* 颜发才(Yan Facai)
*Sent:* 2016-10-21 15:35
*Subject:* Re: How to iterate the element of an array in DataFrame?
*To:* "user.spark"
*Cc:*
I don't know how to construct
`array<struct<category:string, weight:string>>`.
Could anyone help me?

I try to get the array by :
scala> mblog_tags.map(_.getSeq[(String, String)](0))

while the result is:
res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] =
[value: array>]


How to express `struct` ?



On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai)
wrote:

Hi, I want to extract the attribute `weight` of an array,
and combine them to construct a sparse vector.

### My data is like this:

scala> mblog_tags.printSchema
root
 |-- category.firstCategory: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- category: string (nullable = true)
 |||-- weight: string (nullable = true)


scala> mblog_tags.show(false)
+------------------------------------------------+
|category.firstCategory                          |
+------------------------------------------------+
|[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
|[[tagCategory_029, 0.9]]                        |
|[[tagCategory_029, 0.8]]                        |
+------------------------------------------------+


### And expected:
Vectors.sparse(100, Array(60, 29), Array(0.8, 0.7))
Vectors.sparse(100, Array(29), Array(0.9))
Vectors.sparse(100, Array(29), Array(0.8))

How to iterate an array in DataFrame?
Thanks.









Re: Dataframe schema...

2016-10-21 Thread Cheng Lian
Yea, confirmed. While analyzing unions, we treat StructTypes with 
different field nullabilities as incompatible types and throw this error.


Opened https://issues.apache.org/jira/browse/SPARK-18058 to track this 
issue. Thanks for reporting!


Cheng


On 10/21/16 3:15 PM, Cheng Lian wrote:


Hi Muthu,

Which version of Spark are you using? This seems to be a bug in 
the analysis phase.


Cheng


On 10/21/16 12:50 PM, Muthu Jayakumar wrote:

Sorry for the late response. Here is what I am seeing...


Schema from parquet file.
d1.printSchema()
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = true)
 ||-- freq: array (nullable = true)
 |||-- element: long (containsNull = true)

d2.printSchema() //Data created using dataframe and/or processed before writing to 
parquet file.

root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = false)
 ||-- freq: array (nullable = true)
 |||-- element: long (containsNull = false)

d1.union(d2).printSchema()
Exception in thread "main" org.apache.spark.sql.AnalysisException: 
unresolved operator 'Union;
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)

at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Please advise,
Muthu

On Thu, Oct 20, 2016 at 1:46 AM, Michael Armbrust 
wrote:


What is the issue you see when unioning?

On Wed, Oct 19, 2016 at 6:39 PM, Muthu Jayakumar
wrote:

Hello Michael,

Thank you for looking into this query. In my case there seems
to be an issue when I union a parquet file read from disk
versus another dataframe that I construct in-memory. The only
difference I see is the containsNull = true. In fact, I do
not see any errors with union on the simple schema of "col1
thru col4" above. But the problem seem to exist only on that
"some_histogram" column which contains the mixed containsNull
= true/false.
Let me know if this helps.

Thanks,
Muthu



On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust
wrote:

Nullable is just a hint to the optimizer that it's
impossible for there to be a null value in this column,
so that it can avoid generating code for null-checks. 
When in doubt, we set nullable=true since it is always

safer to check.

Why in particular are you trying to change the
nullability of the column?

On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar
wrote:

Hello there,

I am trying to understand how and when DataFrame
(or Dataset) sets nullable = true vs false on a schema.

Here is my observation from a sample code I tried...


scala> spark.createDataset(Seq((1, "a", 2.0d), (2,
"b", 2.0d), (3, "c", 2.0d))).toDF("col1", "col2",
"col3").withColumn("col4", lit("bla")).printSchema()
root
 |-- col1: integer (nullable = false)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = false)
 |-- col4: string (nullable = false)


scala> spark.createDataset(Seq((1, "a", 2.0d), (2,
"b", 2.0d), (3, "c", 2.0d))).toDF("col1", "col2",
"col3").withColumn("col4",
lit("bla")).write.parquet("/tmp/sample.parquet")

scala>

Re: Dataframe schema...

2016-10-21 Thread Cheng Lian

Hi Muthu,

Which version of Spark are you using? This seems to be a bug in 
the analysis phase.


Cheng


On 10/21/16 12:50 PM, Muthu Jayakumar wrote:

Sorry for the late response. Here is what I am seeing...


Schema from parquet file.
d1.printSchema()
root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = true)
 ||-- freq: array (nullable = true)
 |||-- element: long (containsNull = true)

d2.printSchema() //Data created using dataframe and/or processed before writing to 
parquet file.

root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = false)
 ||-- freq: array (nullable = true)
 |||-- element: long (containsNull = false)

d1.union(d2).printSchema()
Exception in thread "main" org.apache.spark.sql.AnalysisException: 
unresolved operator 'Union;
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)

at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Please advise,
Muthu

On Thu, Oct 20, 2016 at 1:46 AM, Michael Armbrust 
wrote:


What is the issue you see when unioning?

On Wed, Oct 19, 2016 at 6:39 PM, Muthu Jayakumar
wrote:

Hello Michael,

Thank you for looking into this query. In my case there seems
to be an issue when I union a parquet file read from disk
versus another dataframe that I construct in-memory. The only
difference I see is the containsNull = true. In fact, I do not
see any errors with union on the simple schema of "col1 thru
col4" above. But the problem seem to exist only on that
"some_histogram" column which contains the mixed containsNull
= true/false.
Let me know if this helps.

Thanks,
Muthu



On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust
wrote:

Nullable is just a hint to the optimizer that it's
impossible for there to be a null value in this column, so
that it can avoid generating code for null-checks. When in
doubt, we set nullable=true since it is always safer to
check.

Why in particular are you trying to change the nullability
of the column?

On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar
wrote:

Hello there,

I am trying to understand how and when DataFrame
(or Dataset) sets nullable = true vs false on a schema.

Here is my observation from a sample code I tried...


scala> spark.createDataset(Seq((1, "a", 2.0d), (2,
"b", 2.0d), (3, "c", 2.0d))).toDF("col1", "col2",
"col3").withColumn("col4", lit("bla")).printSchema()
root
 |-- col1: integer (nullable = false)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = false)
 |-- col4: string (nullable = false)


scala> spark.createDataset(Seq((1, "a", 2.0d), (2,
"b", 2.0d), (3, "c", 2.0d))).toDF("col1", "col2",
"col3").withColumn("col4",
lit("bla")).write.parquet("/tmp/sample.parquet")

scala>
spark.read.parquet("/tmp/sample.parquet").printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)
 |-- col4: string (nullable = true)


The place where 

Re: Writing to Parquet Job turns to wait mode after even completion of job

2016-10-21 Thread Cheng Lian
What version of Spark are you using and how many output files does the 
job write out?


By default, Spark versions before 1.6 (not including 1.6) write Parquet 
summary files when committing the job. This process reads footers from 
all Parquet files in the destination directory and merges them together. 
This can be particularly bad if you are appending a small amount of data 
to a large existing Parquet dataset.


If that's the case, you may disable Parquet summary files by setting 
Hadoop configuration "parquet.enable.summary-metadata" to false.


We've disabled it by default since 1.6.0
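
For example, a one-line sketch of that workaround (assuming an existing
SparkContext named sc; the equivalent spark-submit form would be
--conf spark.hadoop.parquet.enable.summary-metadata=false):

// Skip Parquet summary-file generation when the write job commits.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")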

Cheng


On 10/21/16 1:47 PM, Chetan Khatri wrote:

Hello Spark Users,

I am writing around 10 GB of Processed Data to Parquet where having 1 
TB of HDD and 102 GB of RAM, 16 vCore machine on Google Cloud.


Every time I write to Parquet, it shows on the Spark UI that the stages 
succeeded, but on the spark shell it holds the context in wait mode for almost 
10 mins; then it clears broadcast and accumulator shared variables.


Can we speed this up?

Thanks.

--
Yours Aye,
Chetan Khatri.
M.+91 7 80574
Data Science Researcher
INDIA




Writing to Parquet Job turns to wait mode after even completion of job

2016-10-21 Thread Chetan Khatri
Hello Spark Users,

I am writing around 10 GB of Processed Data to Parquet where having 1 TB of
HDD and 102 GB of RAM, 16 vCore machine on Google Cloud.

Every time I write to Parquet, it shows on the Spark UI that the stages succeeded,
but on the spark shell it holds the context in wait mode for almost 10 mins; then it
clears broadcast and accumulator shared variables.

Can we speed this up?

Thanks.

-- 
Yours Aye,
Chetan Khatri.
M.+91 7 80574
Data Science Researcher
INDIA


Re: Dataframe schema...

2016-10-21 Thread Muthu Jayakumar
Sorry for the late response. Here is what I am seeing...


Schema from parquet file.

d1.printSchema()

root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = true)
 ||-- freq: array (nullable = true)
 |||-- element: long (containsNull = true)

d2.printSchema() //Data created using dataframe and/or processed
before writing to parquet file.

root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = false)
 ||-- freq: array (nullable = true)
 |||-- element: long (containsNull = false)

d1.union(d2).printSchema()

Exception in thread "main" org.apache.spark.sql.AnalysisException:
unresolved operator 'Union;
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Please advise,
Muthu

On Thu, Oct 20, 2016 at 1:46 AM, Michael Armbrust 
wrote:

> What is the issue you see when unioning?
>
> On Wed, Oct 19, 2016 at 6:39 PM, Muthu Jayakumar 
> wrote:
>
>> Hello Michael,
>>
>> Thank you for looking into this query. In my case there seems to be an
>> issue when I union a parquet file read from disk versus another dataframe
>> that I construct in-memory. The only difference I see is the containsNull =
>> true. In fact, I do not see any errors with union on the simple schema of
>> "col1 thru col4" above. But the problem seem to exist only on that
>> "some_histogram" column which contains the mixed containsNull = true/false.
>> Let me know if this helps.
>>
>> Thanks,
>> Muthu
>>
>>
>>
>> On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust wrote:
>>
>>> Nullable is just a hint to the optimizer that it's impossible for there
>>> to be a null value in this column, so that it can avoid generating code for
>>> null-checks.  When in doubt, we set nullable=true since it is always safer
>>> to check.
>>>
>>> Why in particular are you trying to change the nullability of the column?
>>>
>>> On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar 
>>> wrote:
>>>
 Hello there,

 I am trying to understand how and when DataFrame (or Dataset) sets
 nullable = true vs false on a schema.

 Here is my observation from a sample code I tried...


 scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3,
 "c", 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
 lit("bla")).printSchema()
 root
  |-- col1: integer (nullable = false)
  |-- col2: string (nullable = true)
  |-- col3: double (nullable = false)
  |-- col4: string (nullable = false)


 scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3,
 "c", 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
 lit("bla")).write.parquet("/tmp/sample.parquet")

 scala> spark.read.parquet("/tmp/sample.parquet").printSchema()
 root
  |-- col1: integer (nullable = true)
  |-- col2: string (nullable = true)
  |-- col3: double (nullable = true)
  |-- col4: string (nullable = true)


 The place where this seems to get me into trouble is when I try to union
 one data-structure from in-memory (notice that in the below schema the
 highlighted element is represented as 'false' for in-memory created schema)
 and one from file that starts out with a schema like below...

  |-- some_histogram: struct (nullable = true)
  ||-- values: array (nullable = true)
  |||-- element: double (containsNull = true)
  ||-- freq: array (nullable = true)
  |||-- element: long (containsNull = true)

 Is there a way to convert this attribute from true to false without
 running any mapping / udf on that 
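
For reference, a commonly used workaround until the union analysis is relaxed
(tracked in SPARK-18058) is to re-apply one side's schema to the other before the
union. A rough sketch with the d1/d2 names above; relaxing nullable from false to
true is safe, while the opposite direction is not checked against the data:

// Rebuild d2 with d1's schema so the nullable/containsNull flags line up.
val d2Aligned = spark.createDataFrame(d2.rdd, d1.schema)
val combined = d1.union(d2Aligned)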

About Reading Parquet - failed to read single gz parquet - failed entire transformation

2016-10-21 Thread Chetan Khatri
Hello Spark Users,

I am working on historical data processing for a telecom provider. I
processed a single job and wrote the output to Parquet in append mode; while
reading, I am able to read the Parquet view table because it's just lazy
evaluation.

But when I apply an action on top of that, by joining my other two sources
(a MongoDB collection and JSON from an S3 bucket) with the Parquet source, it
throws the error below, which means a single gz Parquet file failed to be
accessed, even though checking with the hadoop fs command shows the file is
there. After getting this error and validating, I deleted that specific file; I
thought it would be okay if the data in that file simply did not get
transformed while the rest did, but afterwards the Spark transformation still
failed.

I had to re-run the data ingestion Spark job, which was a waste of time.

So what is the exact solution to this error?


2016-10-21 06:54:03,2918 ERROR Client
fs/client/fileclient/cc/client.cc:1802 Thread: 20779 Open failed for file
/user/ubuntu/UserMaster/part-r-00113-41456b8f-4c6b-46e6-b70f-ca, LookupFid
error No such file or directory(2)
2016-10-21 06:54:03,2918 ERROR JniCommon
fs/client/fileclient/cc/jni_MapRClient.cc:2488 Thread: 20779 getBlockInfo
failed, Could not open file
/user/ubuntu/UserMaster/part-r-00113-41456b8f-4c6b-46e6-b70f-ca
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,
tree:
TungstenExchange hashpartitioning(trim(userId#24),1024), None
+- Scan
ParquetRelation[userId#24,screenSize#27,platformVersion#21,productName#35,isPixelHit#19L,browser#23,operator#33,circle#26,platform#30,browserVersion#29,brandName#36]
InputPaths: maprfs:/user/ubuntu/UserMaster

  at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
  at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
  at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
  at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
  at org.apache.spark.sql.execution.Sort.doExecute(Sort.scala:64)
  at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
  at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
  at
org.apache.spark.sql.execution.joins.SortMergeOuterJoin.doExecute(SortMergeOuterJoin.scala:107)
  at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
  at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
  at
org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
  at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
  at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)


-- 
Yours Aye,
Chetan Khatri.


Re: Spark 2.0 with Kafka 0.10 exception

2016-10-21 Thread Cody Koeninger
That's a good point... the dstreams package is still on 10.0.1 though.
I'll make a ticket to update it.

On Fri, Oct 21, 2016 at 1:02 PM, Srikanth  wrote:
> The Kafka 0.10.1 release separates poll() from the heartbeat. So session.timeout.ms
> & max.poll.interval.ms can be set differently.
> I'll leave it to you on how to add this to docs!
>
>
> On Thu, Oct 20, 2016 at 1:41 PM, Cody Koeninger  wrote:
>>
>> Right on, I put in a PR to make a note of that in the docs.
>>
>> On Thu, Oct 20, 2016 at 12:13 PM, Srikanth  wrote:
>> > Yeah, setting those params helped.
>> >
>> > On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger 
>> > wrote:
>> >>
>> >> 60 seconds for a batch is above the default settings in kafka related
>> >> to heartbeat timeouts, so that might be related.  Have you tried
>> >> tweaking session.timeout.ms, heartbeat.interval.ms, or related
>> >> configs?
>> >>
>> >> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth 
>> >> wrote:
>> >> > Bringing this thread back as I'm seeing this exception on a
>> >> > production
>> >> > kafka
>> >> > cluster.
>> >> >
>> >> > I have two Spark streaming apps reading the same topic. App1 has
>> >> > batch
>> >> > interval 2secs and app2 has 60secs.
>> >> > Both apps are running on the same cluster on similar hardware. I see
>> >> > this
>> >> > exception only in app2 and fairly consistently.
>> >> >
>> >> > Difference I see between the apps is
>> >> > App1
>> >> >   spark.streaming.kafka.maxRatePerPartition, 6000
>> >> >   batch interval 2 secs
>> >> > App2
>> >> >   spark.streaming.kafka.maxRatePerPartition, 1
>> >> >   batch interval 60 secs
>> >> >
>> >> > All other kafka/spark related configs are same for both apps.
>> >> >   spark.streaming.kafka.consumer.poll.ms = 4096
>> >> >   spark.streaming.backpressure.enabled = true
>> >> >
>> >> > Not sure if pre-fetching or caching is messing things up.
>> >> >
>> >> > 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0
>> >> > (TID
>> >> > 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError:
>> >> > assertion
>> >> > failed: Failed to get records for
>> >> > spark-executor-StreamingEventSplitProd
>> >> > mt_event 6 49091480 after polling for 4096
>> >> > at scala.Predef$.assert(Predef.scala:170)
>> >> > at
>> >> >
>> >> >
>> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> >> > at
>> >> >
>> >> >
>> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> >> > at
>> >> >
>> >> >
>> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> >> > at
>> >> > scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >> > at
>> >> > scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >> > at
>> >> > scala.collection.Iterator$$anon$21.next(Iterator.scala:838)
>> >> >
>> >> >
>> >> > On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger 
>> >> > wrote:
>> >> >>
>> >> >> That's not what I would have expected to happen with a lower cache
>> >> >> setting, but in general disabling the cache isn't something you want
>> >> >> to do with the new kafka consumer.
>> >> >>
>> >> >>
>> >> >> As far as the original issue, are you seeing those polling errors
>> >> >> intermittently, or consistently?  From your description, it sounds
>> >> >> like retry is working correctly.
>> >> >>
>> >> >>
>> >> >> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth 
>> >> >> wrote:
>> >> >> > Setting those two results in below exception.
>> >> >> > No.of executors < no.of partitions. Could that be triggering this?
>> >> >> >
>> >> >> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage
>> >> >> > 2.0
>> >> >> > (TID 9)
>> >> >> > java.util.ConcurrentModificationException: KafkaConsumer is not
>> >> >> > safe
>> >> >> > for
>> >> >> > multi-threaded access
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> >> >> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
>> >> >> > at java.util.HashMap.putVal(Unknown Source)
>> >> >> > at java.util.HashMap.put(Unknown Source)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(KafkaRDD.scala:210)
>> >> >> > at
>> 

Re: Spark 2.0 with Kafka 0.10 exception

2016-10-21 Thread Srikanth
The Kafka 0.10.1 release separates poll() from the heartbeat, so session.timeout.ms
& max.poll.interval.ms can be set differently.
I'll leave it to you how to add this to the docs!
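
For reference, a rough sketch of where those settings go for the 0-10 direct
stream (values, broker list and group id are only illustrative; sparkConf is
assumed to exist, and max.poll.interval.ms needs a 0.10.1+ kafka-clients on the
classpath):

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
  "value.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
  "group.id" -> "app2-group",
  // With 0.10.1 the heartbeat runs on a background thread, so this no longer
  // has to cover the 60s batch processing time.
  "session.timeout.ms" -> "30000",
  "heartbeat.interval.ms" -> "10000",
  // This one still has to exceed the worst-case time between poll() calls.
  "max.poll.interval.ms" -> "300000"
)

// Spark-side timeout used by the cached executor consumers when polling.
sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "8192")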


On Thu, Oct 20, 2016 at 1:41 PM, Cody Koeninger  wrote:

> Right on, I put in a PR to make a note of that in the docs.
>
> On Thu, Oct 20, 2016 at 12:13 PM, Srikanth  wrote:
> > Yeah, setting those params helped.
> >
> > On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger 
> wrote:
> >>
> >> 60 seconds for a batch is above the default settings in kafka related
> >> to heartbeat timeouts, so that might be related.  Have you tried
> >> tweaking session.timeout.ms, heartbeat.interval.ms, or related
> >> configs?
> >>
> >> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth 
> wrote:
> >> > Bringing this thread back as I'm seeing this exception on a production
> >> > kafka
> >> > cluster.
> >> >
> >> > I have two Spark streaming apps reading the same topic. App1 has batch
> >> > interval 2secs and app2 has 60secs.
> >> > Both apps are running on the same cluster on similar hardware. I see
> >> > this
> >> > exception only in app2 and fairly consistently.
> >> >
> >> > Difference I see between the apps is
> >> > App1
> >> >   spark.streaming.kafka.maxRatePerPartition, 6000
> >> >   batch interval 2 secs
> >> > App2
> >> >   spark.streaming.kafka.maxRatePerPartition, 1
> >> >   batch interval 60 secs
> >> >
> >> > All other kafka/spark related configs are same for both apps.
> >> >   spark.streaming.kafka.consumer.poll.ms = 4096
> >> >   spark.streaming.backpressure.enabled = true
> >> >
> >> > Not sure if pre-fetching or caching is messing things up.
> >> >
> >> > 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0
> >> > (TID
> >> > 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError:
> >> > assertion
> >> > failed: Failed to get records for spark-executor-
> StreamingEventSplitProd
> >> > mt_event 6 49091480 after polling for 4096
> >> > at scala.Predef$.assert(Predef.scala:170)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:74)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:227)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:193)
> >> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:
> 409)
> >> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:
> 409)
> >> > at scala.collection.Iterator$$anon$21.next(Iterator.scala:
> 838)
> >> >
> >> >
> >> > On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger 
> >> > wrote:
> >> >>
> >> >> That's not what I would have expected to happen with a lower cache
> >> >> setting, but in general disabling the cache isn't something you want
> >> >> to do with the new kafka consumer.
> >> >>
> >> >>
> >> >> As far as the original issue, are you seeing those polling errors
> >> >> intermittently, or consistently?  From your description, it sounds
> >> >> like retry is working correctly.
> >> >>
> >> >>
> >> >> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth 
> wrote:
> >> >> > Setting those two results in below exception.
> >> >> > No.of executors < no.of partitions. Could that be triggering this?
> >> >> >
> >> >> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage
> 2.0
> >> >> > (TID 9)
> >> >> > java.util.ConcurrentModificationException: KafkaConsumer is not
> safe
> >> >> > for
> >> >> > multi-threaded access
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1430)
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.kafka.clients.consumer.KafkaConsumer.close(
> KafkaConsumer.java:1360)
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$
> anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> >> >> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> >> >> > at java.util.HashMap.putVal(Unknown Source)
> >> >> > at java.util.HashMap.put(Unknown Source)
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.
> get(CachedKafkaConsumer.scala:158)
> >> >> > at
> >> >> >
> >> >> >
> >> >> > org.apache.spark.streaming.kafka010.KafkaRDD$
> KafkaRDDIterator.(KafkaRDD.scala:210)
> >> >> > at
> >> >> >
> >> >> > org.apache.spark.streaming.kafka010.KafkaRDD.compute(
> KafkaRDD.scala:185)
> >> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> >> > at
> >> >> >
> >> >> > org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> >> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

Re: Where condition on columns of Arrays does no longer work in spark 2

2016-10-21 Thread Cheng Lian

Thanks for reporting! It's a bug, just filed a ticket to track it:

https://issues.apache.org/jira/browse/SPARK-18053

Cheng


On 10/20/16 1:54 AM, filthysocks wrote:
I have a Column in a DataFrame that contains Arrays and I wanna filter 
for equality. It does work fine in spark 1.6 but not in 2.0 In spark 
1.6.2:

import org.apache.spark.sql.SQLContext

case class DataTest(lists: Seq[Int])

val sql = new SQLContext(sc)
val data = sql.createDataFrame(sc.parallelize(Seq(
DataTest(Seq(1)),
DataTest(Seq(4,5,6))
   )))
data.registerTempTable("uiae")
sql.sql(s"SELECT lists FROM uiae WHERE 
lists=Array(1)").collect().foreach(println)
returns:[WrappedArray(1)]
In spark 2.0.0:
import spark.implicits._

case class DataTest(lists: Seq[Int])
val data = Seq(DataTest(Seq(1)),DataTest(Seq(4,5,6))).toDS()

data.createOrReplaceTempView("uiae")
spark.sql(s"SELECT lists FROM uiae WHERE 
lists=Array(1)").collect().foreach(println)
returns: nothing

Is that a bug? Or is it just done differently in spark 2?

View this message in context: Where condition on columns of Arrays 
does no longer work in spark 2 

Sent from the Apache Spark User List mailing list archive 
 at Nabble.com.




Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Cody Koeninger
DStream checkpoints have all kinds of other difficulties, the biggest one
being that you can't use a checkpoint if your app code has been updated.
If you can avoid checkpoints in general, I would.

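
For reference, a rough sketch of the restart-from-the-database approach for the
0-10 direct stream (loadOffsetsFromDb and saveResultsAndOffsets are hypothetical
helpers; ssc and kafkaParams are assumed to exist):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Hypothetical storage helpers; the details depend on your database.
def loadOffsetsFromDb(): Map[TopicPartition, Long] = ???
def saveResultsAndOffsets(ranges: Array[OffsetRange]): Unit = ???

// On (re)start, seed the stream from whatever the database says was committed,
// instead of relying on a checkpoint.
val fromOffsets = loadOffsetsFromDb()
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... transformations and the output action go here ...
  // Write the results and offsetRanges in the same transaction for exactly-once.
  saveResultsAndOffsets(offsetRanges)
}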
On Fri, Oct 21, 2016 at 11:17 AM, Erwan ALLAIN  wrote:
> Thanks for the fast answer !
>
> I just feel annoyed and frustrated not to be able to use spark checkpointing
> because I believe that mechanism has been correctly tested.
> I'm afraid that reinventing the wheel can lead to side effects that I don't
> see now ...
>
> Anyway thanks again, I know what I have to do :)
>
> On Fri, Oct 21, 2016 at 5:05 PM, Cody Koeninger  wrote:
>>
>> 0.  If your processing time is regularly greater than your batch
>> interval you're going to have problems anyway.  Investigate this more,
>> set maxRatePerPartition, something.
>> 1. That's personally what I tend to do.
>> 2. Why are you relying on checkpoints if you're storing offset state
>> in the database?  Just restart from the offsets in the database.  I
>> think your solution of map of batchtime to offset ranges would work
>> fine in that case, no?  (make sure to expire items from the map)
>>
>>
>>
>> On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN 
>> wrote:
>> > Hi,
>> >
>> > I'm currently implementing an exactly once mechanism based on the
>> > following
>> > example:
>> >
>> >
>> > https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
>> >
>> > the pseudo code is as follows:
>> >
>> > dstream.transform (store offset in a variable on driver side )
>> > dstream.map
>> > dstream.foreachRdd( action + save offset in db)
>> >
>> > this code doesn't work if the processing time is greater than batch
>> > interval
>> > (same problem as windowed
>> >
>> > (https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala)
>> >
>> > Indeed, at each batch interval a new rdd is created and stacked, thus
>> > method
>> > transform is called several times and update the global variable and at
>> > last
>> > when we perform saving the offset range does not correspond to the one
>> > processed.
>> >
>> > 1) Do I need to work at the RDD level (inside a big forEachRDD like in
>> > the
>> > first example) instead of dstream ?
>> >
>> > 2) I can use a map[BatchTime, OffsetRange] as a global variable but in
>> > case
>> > of crash this map will not reflect anymore the generatedRdds (restored
>> > from
>> > checkpoint, RDD prepared but not executed)
>> >   2.1 ) Do I need to store this map elsewhere (cassandra) ?
>> >   2.2)  Is there a way to retrieve offset range restored ? (transform
>> > method
>> > is not called anymore for the checkpointed rdd)
>> >   2.3) Is possible to store some context along the RDD to be serialized
>> > ?
>> >
>> > Lots of questions, let me kow if it's not clear !
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Erwan ALLAIN
Thanks for the fast answer !

I just feel annoyed and frustrated not to be able to use spark
checkpointing because I believe that mechanism has been correctly
tested.
I'm afraid that reinventing the wheel can lead to side effects that I don't
see now ...

Anyway thanks again, I know what I have to do :)

On Fri, Oct 21, 2016 at 5:05 PM, Cody Koeninger  wrote:

> 0.  If your processing time is regularly greater than your batch
> interval you're going to have problems anyway.  Investigate this more,
> set maxRatePerPartition, something.
> 1. That's personally what I tend to do.
> 2. Why are you relying on checkpoints if you're storing offset state
> in the database?  Just restart from the offsets in the database.  I
> think your solution of map of batchtime to offset ranges would work
> fine in that case, no?  (make sure to expire items from the map)
>
>
>
> On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN 
> wrote:
> > Hi,
> >
> > I'm currently implementing an exactly once mechanism based on the
> following
> > example:
> >
> > https://github.com/koeninger/kafka-exactly-once/blob/
> master/src/main/scala/example/TransactionalPerBatch.scala
> >
> > the pseudo code is as follows:
> >
> > dstream.transform (store offset in a variable on driver side )
> > dstream.map
> > dstream.foreachRdd( action + save offset in db)
> >
> > this code doesn't work if the processing time is greater than batch
> interval
> > (same problem as windowed
> > (https://github.com/koeninger/kafka-exactly-once/blob/
> master/src/main/scala/example/Windowed.scala)
> >
> > Indeed, at each batch interval a new rdd is created and stacked, thus
> method
> > transform is called several times and update the global variable and at
> last
> > when we perform saving the offset range does not correspond to the one
> > processed.
> >
> > 1) Do I need to work at the RDD level (inside a big forEachRDD like in
> the
> > first example) instead of dstream ?
> >
> > 2) I can use a map[BatchTime, OffsetRange] as a global variable but in
> case
> > of crash this map will not reflect anymore the generatedRdds (restored
> from
> > checkpoint, RDD prepared but not executed)
> >   2.1 ) Do I need to store this map elsewhere (cassandra) ?
> >   2.2)  Is there a way to retrieve offset range restored ? (transform
> method
> > is not called anymore for the checkpointed rdd)
> >   2.3) Is it possible to store some context along with the RDD to be serialized?
> >
> > Lots of questions, let me know if anything is not clear!
> >
>


Re: [Spark ML] Using GBTClassifier in OneVsRest

2016-10-21 Thread Guo-Xun Yuan
Same questions here.

GBTClassifier and MultilayerPerceptronClassifier extend Predictor[_,_]
rather than Classifier[_,_]. However, both are classifiers. It looks like
the class inheritance hierarchy is not strictly followed.

I wonder if the community considers it an issue, and has a plan for the fix?

Thanks!
Guo-Xun







On Thu, Oct 20, 2016 at 11:54 PM, Nick Pentreath 
wrote:

> Currently no - GBT implements the predictors, not the classifier
> interface. It might be possible to wrap it in a wrapper that extends the
> Classifier trait.
>
> Hopefully GBT will support multi-class at some point. But you can use
> RandomForest which does support multi-class.
>
> On Fri, 21 Oct 2016 at 02:12 ansari  wrote:
>
>> It appears as if the inheritance hierarchy doesn't allow GBTClassifiers
>> to be
>> used as the binary classifier in a OneVsRest trainer. Is there a simple
>> way
>> to use gradient-boosted trees for multiclass (not binary) problems?
>>
>> Specifically, it complains that GBTClassifier doesn't inherit from
>> Classifier[_, _, _].
>>
>> I'm using Spark 2.0.1:
>>
>> val gbt = new GBTClassifier()
>>   .setLabelCol("indexedLabel")
>>   .setFeaturesCol("features")
>>   .setMaxIter(10)
>>   .setMaxDepth(10)
>>
>> val ovr = new OneVsRest().
>> setClassifier(gbt)
>>
>> fails saying
>>
>> error: type mismatch;
>>  found   : org.apache.spark.ml.classification.GBTClassifier
>>  required: org.apache.spark.ml.classification.Classifier[_, _, _]
>>setClassifier(gbt)
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Spark-ML-Using-GBTClassifier-in-
>> OneVsRest-tp27933.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Plotting decision boundary in non-linear logistic regression

2016-10-21 Thread aditya1702
Hello,
I am working with logistic regression on non-linear data and I want to
plot a decision boundary using the data. I don't know how to do it using
a contour plot. Could someone help me out, please? This is the code I have
written:

from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col

lr = LogisticRegression(maxIter=1000, regParam=0.3, elasticNetParam=0.20)
model = lr.fit(data_train_df)

prediction = model.transform(data_test_df)
final_pred_df = prediction.select(col('label'), col('prediction'))
ans = final_pred_df.where(col('label') == col('prediction')).count()
final_pred_df.show()
accuracy = ans / float(final_pred_df.count())
print accuracy * 100

This gives the following output:

+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+

70.5882352941

Now, how do I visualize this? The data plot looks somewhat like this:

 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Plotting-decision-boundary-in-non-linear-logistic-regression-tp27937.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ALS.trainImplicit block sizes

2016-10-21 Thread Nick Pentreath
Oh also you mention 20 partitions. Is that how many you have? How many
ratings?

It may be worth trying to repartition to a larger number of partitions.

On Fri, 21 Oct 2016 at 17:04, Nick Pentreath 
wrote:

> I wonder if you can try with setting different blocks for user and item?
> Are you able to try 2.0 or use Scala for setting it in 1.6?
>
> You want your item blocks to be a lot less than user blocks. Items maybe
> 5-10, users perhaps 250-500?
>
> Do you have many "power items" that are connected to almost every user? Or
> vice versa?
>
> On Fri, 21 Oct 2016 at 16:46, Nikhil Mishra 
> wrote:
>
> Yes, that's what I tried initially. The default value is pretty low -
> something like 20. Default depends on the number of partitions in the
> ratings RDD. It was going out of memory with the default size too.
>
> On Fri, Oct 21, 2016 at 5:31 AM, Nick Pentreath 
> wrote:
>
> Did you try not setting the blocks parameter? It will then try to set it
> automatically for your data size.
> On Fri, 21 Oct 2016 at 09:16, Nikhil Mishra 
> wrote:
>
> I am using 105 nodes (1 master, 4 core and 100 task nodes). All are 7.5
> gig machines.
>
> On Fri, Oct 21, 2016 at 12:15 AM, Nick Pentreath  > wrote:
>
> How many nodes are you using in the cluster?
>
>
>
> On Fri, 21 Oct 2016 at 08:58 Nikhil Mishra 
> wrote:
>
> Thanks Nick.
>
> So we do partition U x I matrix into BxB matrices, each of size around U/B
> and I/B. Is that correct? Do you know whether a single block of the matrix
> is represented in memory as a full matrix or as sparse matrix? I ask this
> because my job has been failing for block sizes which should have worked.
>
> I have U = 85 million users, I = 250,000 items and when I specify block
> size 5,000, I get out of memory error, even though I am setting
> --executor-memory as 7g (on a linux EC2 which has 7.5g memory). Assuming
> each block has 17000 users and 50 items, even if the block is internally
> represented as a full matrix, it should still occupy around 50MB space.
>
> Increasing block size to 20,000 also results in the same. So there is
> something I don't understand about how this is working.
>
> BTW, I am trying to find 50 latent factors (rank = 50).
>
> Do you have some insights as to how I should tweak things to get this
> working?
>
> Thanks,
> Nik
>
> On Thu, Oct 20, 2016 at 11:43 PM, Nick Pentreath  > wrote:
>
> The blocks params will set both user and item blocks.
>
> Spark 2.0 supports user and item blocks for PySpark:
> http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.recommendation
>
>
> On Fri, 21 Oct 2016 at 08:12 Nikhil Mishra 
> wrote:
>
> Hi,
>
> I have a question about the block size to be specified in
> ALS.trainImplicit() in pyspark (Spark 1.6.1). There is only one block size
> parameter to be specified. I want to know if that would result in
> partitioning both the users as well as the items axes.
>
> For example, I am using the following call to ALs.trainImplicit() in my
> code.
>
> ---
>
> RANK = 50
>
> ITERATIONS = 2
>
> BLOCKS = 1000
>
> ALPHA = 1.0
>
> model = ALS.trainImplicit(ratings, RANK, ITERATIONS, blocks=BLOCKS,
> alpha=ALPHA)
>
>
> 
>
> Will this partition the users x items matrix into BLOCKS x BLOCKS number
> of matrices or will it partition only the users axis thereby resulting in
> BLOCKS number of matrices, each with columns = total number of unique items?
>
> Thanks,
> Nik
>
>
>
>
>


Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Cody Koeninger
0.  If your processing time is regularly greater than your batch
interval you're going to have problems anyway.  Investigate this more,
set maxRatePerPartition, something.
1. That's personally what I tend to do.
2. Why are you relying on checkpoints if you're storing offset state
in the database?  Just restart from the offsets in the database.  I
think your solution of map of batchtime to offset ranges would work
fine in that case, no?  (make sure to expire items from the map)
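
Roughly, the shape of option 1 is something like the untested sketch below; the
transform step and saveResultsAndOffsets are placeholders for your own code, and
the cast only works on the RDD that comes straight out of createDirectStream:

// Untested sketch: do the work inside a single foreachRDD on the driver, so the
// offsets you save always belong to the batch that was actually processed.
// Assumes the spark-streaming-kafka (0.8 direct API) connector.
import org.apache.spark.streaming.kafka.HasOffsetRanges

dstream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val results = rdd.map(record => transform(record)).collect()
  // write results and offsetRanges in the same DB transaction
  saveResultsAndOffsets(results, offsetRanges)
}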



On Fri, Oct 21, 2016 at 3:32 AM, Erwan ALLAIN  wrote:
> Hi,
>
> I'm currently implementing an exactly once mechanism based on the following
> example:
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
>
> the pseudo code is as follows:
>
> dstream.transform (store offset in a variable on driver side )
> dstream.map
> dstream.foreachRdd( action + save offset in db)
>
> this code doesn't work if the processing time is greater than batch interval
> (same problem as windowed
> (https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala)
>
> Indeed, at each batch interval a new RDD is created and stacked up, so the
> transform method is called several times and updates the global variable;
> by the time we save, the offset range no longer corresponds to the one
> that was processed.
>
> 1) Do I need to work at the RDD level (inside a big forEachRDD like in the
> first example) instead of dstream ?
>
> 2) I can use a map[BatchTime, OffsetRange] as a global variable but in case
> of crash this map will not reflect anymore the generatedRdds (restored from
> checkpoint, RDD prepared but not executed)
>   2.1 ) Do I need to store this map elsewhere (cassandra) ?
>   2.2)  Is there a way to retrieve offset range restored ? (transform method
> is not called anymore for the checkpointed rdd)
>   2.3) Is it possible to store some context along with the RDD to be serialized?
>
> Lots of questions, let me know if anything is not clear!
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ALS.trainImplicit block sizes

2016-10-21 Thread Nick Pentreath
I wonder if you can try with setting different blocks for user and item?
Are you able to try 2.0 or use Scala for setting it in 1.6?

You want your item blocks to be a lot less than user blocks. Items maybe
5-10, users perhaps 250-500?

Do you have many "power items" that are connected to almost every user? Or
vice versa?
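
In Scala it would look roughly like the sketch below (MLlib API as in 1.6; the
block counts are only illustrative and would need tuning for your data):

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD

// Rough sketch: user and product (item) block counts are set independently here,
// unlike the single `blocks` argument exposed by pyspark's ALS.trainImplicit.
def trainImplicit(ratings: RDD[Rating]) =
  new ALS()
    .setImplicitPrefs(true)
    .setRank(50)
    .setIterations(2)
    .setAlpha(1.0)
    .setUserBlocks(250)    // many users  -> more user blocks
    .setProductBlocks(10)  // few items   -> fewer item blocks
    .run(ratings)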

On Fri, 21 Oct 2016 at 16:46, Nikhil Mishra 
wrote:

> Yes, that's what I tried initially. The default value is pretty low -
> something like 20. Default depends on the number of partitions in the
> ratings RDD. It was going out of memory with the default size too.
>
> On Fri, Oct 21, 2016 at 5:31 AM, Nick Pentreath 
> wrote:
>
> Did you try not setting the blocks parameter? It will then try to set it
> automatically for your data size.
> On Fri, 21 Oct 2016 at 09:16, Nikhil Mishra 
> wrote:
>
> I am using 105 nodes (1 master, 4 core and 100 task nodes). All are 7.5
> gig machines.
>
> On Fri, Oct 21, 2016 at 12:15 AM, Nick Pentreath  > wrote:
>
> How many nodes are you using in the cluster?
>
>
>
> On Fri, 21 Oct 2016 at 08:58 Nikhil Mishra 
> wrote:
>
> Thanks Nick.
>
> So we do partition U x I matrix into BxB matrices, each of size around U/B
> and I/B. Is that correct? Do you know whether a single block of the matrix
> is represented in memory as a full matrix or as sparse matrix? I ask this
> because my job has been failing for block sizes which should have worked.
>
> I have U = 85 million users, I = 250,000 items and when I specify block
> size 5,000, I get out of memory error, even though I am setting
> --executor-memory as 7g (on a linux EC2 which has 7.5g memory). Assuming
> each block has 17000 users and 50 items, even if the block is internally
> represented as a full matrix, it should still occupy around 50MB space.
>
> Increasing block size to 20,000 also results in the same. So there is
> something I don't understand about how this is working.
>
> BTW, I am trying to find 50 latent factors (rank = 50).
>
> Do you have some insights as to how I should tweak things to get this
> working?
>
> Thanks,
> Nik
>
> On Thu, Oct 20, 2016 at 11:43 PM, Nick Pentreath  > wrote:
>
> The blocks params will set both user and item blocks.
>
> Spark 2.0 supports user and item blocks for PySpark:
> http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.recommendation
>
>
> On Fri, 21 Oct 2016 at 08:12 Nikhil Mishra 
> wrote:
>
> Hi,
>
> I have a question about the block size to be specified in
> ALS.trainImplicit() in pyspark (Spark 1.6.1). There is only one block size
> parameter to be specified. I want to know if that would result in
> partitioning both the users as well as the items axes.
>
> For example, I am using the following call to ALs.trainImplicit() in my
> code.
>
> ---
>
> RANK = 50
>
> ITERATIONS = 2
>
> BLOCKS = 1000
>
> ALPHA = 1.0
>
> model = ALS.trainImplicit(ratings, RANK, ITERATIONS, blocks=BLOCKS,
> alpha=ALPHA)
>
>
> 
>
> Will this partition the users x items matrix into BLOCKS x BLOCKS number
> of matrices or will it partition only the users axis thereby resulting in
> BLOCKS number of matrices, each with columns = total number of unique items?
>
> Thanks,
> Nik
>
>
>
>
>


Issues with reading gz files with Spark Streaming

2016-10-21 Thread Nkechi Achara
Hi,

I am using Spark 1.5.0 to read gz files with textFileStream, but nothing is
picked up when new files are dropped in the specified directory. I know this
is only the case with gz files, as when I extract the file into the specified
directory, the files are read on the next window and processed.

My code is here:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "file:///tmp/", (f: Path) => true, newFilesOnly = false)
  .map(pair => pair._2.toString)
comments.foreachRDD(rdd => rdd.foreach(m => println(m)))

Any idea why the gz files are not being recognized?

Thanks in advance,

K


Re: issue accessing Phoenix table from Spark

2016-10-21 Thread Jörn Franke
Have you verified that this class is in the fat jar? It looks like some of
the HBase libraries are missing ...

> On 21 Oct 2016, at 11:45, Mich Talebzadeh  wrote:
> 
> Still does not work with Spark 2.0.0 on apache-phoenix-4.8.1-HBase-1.2-bin
> 
> thanks
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
>> On 7 October 2016 at 09:27, Mich Talebzadeh  
>> wrote:
>> Hi,
>> 
>> my code is trying to load a phoenix table built on an Hbase table.
>> 
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkConf
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.hbase.HBaseConfiguration
>> import org.apache.hadoop.hbase.HColumnDescriptor
>> import org.apache.hadoop.hbase.HTableDescriptor
>> import org.apache.hadoop.hbase.{ HBaseConfiguration, HColumnDescriptor, 
>> HTableDescriptor }
>> import org.apache.hadoop.mapred.JobConf
>> import org.apache.hadoop.hbase.client.HBaseAdmin
>> import org.apache.spark.sql.types._
>> import org.apache.phoenix.spark._
>> 
>> 
>> The code line is from https://phoenix.apache.org/phoenix_spark.html
>> 
>> scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> warning: there was one deprecation warning; re-run with -deprecation for 
>> details
>> HiveContext: org.apache.spark.sql.hive.HiveContext = 
>> org.apache.spark.sql.hive.HiveContext@533e8807
>> 
>> scala> val df = HiveContext.load(
>>  | "org.apache.phoenix.spark",
>>  | Map("table" -> "temptable", "zkUrl" -> "rhes564:2181")
>>  |   )
>> 
>> warning: there was one deprecation warning; re-run with -deprecation for 
>> details
>> java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
>>   at 
>> org.apache.phoenix.spark.PhoenixRDD.getPhoenixConfiguration(PhoenixRDD.scala:71)
>>   at 
>> org.apache.phoenix.spark.PhoenixRDD.phoenixConf$lzycompute(PhoenixRDD.scala:39)
>>   at org.apache.phoenix.spark.PhoenixRDD.phoenixConf(PhoenixRDD.scala:38)
>>   at org.apache.phoenix.spark.PhoenixRDD.<init>(PhoenixRDD.scala:42)
>>   at 
>> org.apache.phoenix.spark.PhoenixRelation.schema(PhoenixRelation.scala:50)
>>   at 
>> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:40)
>>   at 
>> org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:382)
>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:143)
>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
>>   at org.apache.spark.sql.SQLContext.load(SQLContext.scala:958)
>>   ... 55 elided
>> I tried this as a fat jar file building it with Maven but I still get the 
>> same error.
>> 
>> Also the original code looks like this
>> 
>> val df = sqlContext.load(
>>   "org.apache.phoenix.spark",
>>   Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
>> )
>> 
>> Thanks
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> http://talebzadehmich.wordpress.com
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
> 


Drop partition with PURGE fail

2016-10-21 Thread bluishpenguin
Hi all,
I am using Spark 2.0 and would like to execute the command below to drop the
partition with PURGE.

sql = "ALTER TABLE db.table1 DROP IF EXISTS PARTITION (pkey='150') PURGE"
spark.sql(sql)

It throws exception: 
Py4JJavaError: An error occurred while calling o45.sql.
: org.apache.spark.sql.catalyst.parser.ParseException:
Operation not allowed: ALTER TABLE ... DROP PARTITION ... PURGE(line 1, pos
0)

== SQL ==
ALTER TABLE exp_project_1463.ds_item1 DROP IF EXISTS PARTITION (pkey='150')
PURGE
^^^

Without PURGE, I can execute successfully. 
Any idea what is going wrong?

Thank you.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Drop-partition-with-PURGE-fail-tp27936.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Re: How to iterate the element of an array in DataFrame?

2016-10-21 Thread Yan Facai
My expectation is:
root
|-- tag: vector

namely, I want to extract from:
[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]
to:
Vectors.sparse(100, Array(60, 29), Array(0.8, 0.7))

I believe it needs two steps:
1. val tag2vec = {tag: Array[Structure] => Vector}
2. mblog_tags.withColumn("vec", tag2vec(col("tag")))

But, I have no idea of how to describe the Array[Structure] in the
DataFrame.
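
A rough, untested sketch of what I have in mind (it assumes Spark 2.x, that the
UDF receives each array element as a Row, and that the tag name maps to an index
by stripping the "tagCategory_" prefix):

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Each element of the array<struct<category,weight>> column arrives as a Row.
val tag2vec = udf { tags: Seq[Row] =>
  val pairs = tags.map { r =>
    (r.getAs[String]("category").stripPrefix("tagCategory_").toInt,
     r.getAs[String]("weight").toDouble)
  }.sortBy(_._1)  // sparse vector indices must be increasing
  Vectors.sparse(100, pairs.map(_._1).toArray, pairs.map(_._2).toArray)
}

// Backticks are needed because the column name itself contains a dot.
val withVec = mblog_tags.withColumn("vec", tag2vec(col("`category.firstCategory`")))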





On Fri, Oct 21, 2016 at 4:51 PM, lk_spark  wrote:

> how about change Schema from
> root
>  |-- category.firstCategory: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- category: string (nullable = true)
>  |    |    |-- weight: string (nullable = true)
> to:
>
> root
>  |-- category: string (nullable = true)
>  |-- weight: string (nullable = true)
>
> 2016-10-21
> --
> lk_spark
> --
>
> From: 颜发才(Yan Facai)
> Sent: 2016-10-21 15:35
> Subject: Re: How to iterate the element of an array in DataFrame?
> To: "user.spark"
> Cc:
>
> I don't know how to construct `array<struct<category:string, weight:string>>`.
> Could anyone help me?
>
> I try to get the array by :
> scala> mblog_tags.map(_.getSeq[(String, String)](0))
>
> while the result is:
> res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] = [value:
> array<struct<_1:string,_2:string>>]
>
> How to express `struct<category:string, weight:string>` ?
>
>
>
> On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai)  wrote:
>
>> Hi, I want to extract the attribute `weight` of an array, and combine
>> them to construct a sparse vector.
>>
>> ### My data is like this:
>>
>> scala> mblog_tags.printSchema
>> root
>>  |-- category.firstCategory: array (nullable = true)
>>  |    |-- element: struct (containsNull = true)
>>  |    |    |-- category: string (nullable = true)
>>  |    |    |-- weight: string (nullable = true)
>>
>>
>> scala> mblog_tags.show(false)
>> +------------------------------------------------+
>> |category.firstCategory                          |
>> +------------------------------------------------+
>> |[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
>> |[[tagCategory_029, 0.9]]                        |
>> |[[tagCategory_029, 0.8]]                        |
>> +------------------------------------------------+
>>
>>
>> ### And expected:
>> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
>> Vectors.sparse(100, Array(29),  Array(0.9))
>> Vectors.sparse(100, Array(29),  Array(0.8))
>>
>> How to iterate an array in DataFrame?
>> Thanks.
>>
>>
>>
>>
>


Re: issue accessing Phoenix table from Spark

2016-10-21 Thread Mich Talebzadeh
Still does not work with Spark 2.0.0 on apache-phoenix-4.8.1-HBase-1.2-bin

thanks

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 7 October 2016 at 09:27, Mich Talebzadeh 
wrote:

> Hi,
>
> my code is trying to load a phoenix table built on an Hbase table.
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.hbase.HBaseConfiguration
> import org.apache.hadoop.hbase.HColumnDescriptor
> import org.apache.hadoop.hbase.HTableDescriptor
> import org.apache.hadoop.hbase.{ HBaseConfiguration, HColumnDescriptor,
> HTableDescriptor }
> import org.apache.hadoop.mapred.JobConf
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.spark.sql.types._
> import org.apache.phoenix.spark._
>
>
> The code line is from https://phoenix.apache.org/phoenix_spark.html
>
> scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> warning: there was one deprecation warning; re-run with -deprecation for
> details
> HiveContext: org.apache.spark.sql.hive.HiveContext =
> org.apache.spark.sql.hive.HiveContext@533e8807
> scala> val df = HiveContext.load(
>  | "org.apache.phoenix.spark",
>  | Map("table" -> "temptable", "zkUrl" -> "rhes564:2181")
>  |   )
>
> warning: there was one deprecation warning; re-run with -deprecation for
> details
> java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
>   at org.apache.phoenix.spark.PhoenixRDD.getPhoenixConfiguration(
> PhoenixRDD.scala:71)
>   at org.apache.phoenix.spark.PhoenixRDD.phoenixConf$
> lzycompute(PhoenixRDD.scala:39)
>   at org.apache.phoenix.spark.PhoenixRDD.phoenixConf(PhoenixRDD.scala:38)
>   at org.apache.phoenix.spark.PhoenixRDD.<init>(PhoenixRDD.scala:42)
>   at org.apache.phoenix.spark.PhoenixRelation.schema(
> PhoenixRelation.scala:50)
>   at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(
> LogicalRelation.scala:40)
>   at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(
> SparkSession.scala:382)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:143)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
>   at org.apache.spark.sql.SQLContext.load(SQLContext.scala:958)
>   ... 55 elided
> I tried this as a fat jar file building it with Maven but I still get the
> same error.
>
> Also the original code looks like this
>
> val df = sqlContext.load(
>   "org.apache.phoenix.spark",
>   Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
> )
>
> Thanks
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: [Spark 2.0.0] error when unioning to an empty dataset

2016-10-21 Thread Agraj Mangal
I have seen this error sometimes when the elements in the schema have
different nullabilities. Could you print the schema for data and for
someCode.thatReturnsADataset() and see if there is any difference between
the two ?
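
Something like this rough sketch (assuming Spark 2.0 and your SomeData case class)
is what I mean:

// Print both schemas; the nullable / containsNull flags are what the analyzer
// compares when it resolves the Union.
data.printSchema()
someCode.thatReturnsADataset().printSchema()

// If only nullability differs, round-tripping one side through its RDD re-derives
// the schema from the encoder, which is essentially why the emptyRDD workaround
// you mention below behaves differently. Untested, but something like:
import spark.implicits._
val other = someCode.thatReturnsADataset()
val unioned = data.union(other.rdd.toDS())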

On Fri, Oct 21, 2016 at 9:14 AM, Efe Selcuk  wrote:

> Thanks for the response. What do you mean by "semantically" the same?
> They're both Datasets of the same type, which is a case class, so I would
> expect compile-time integrity of the data. Is there a situation where this
> wouldn't be the case?
>
> Interestingly enough, if I instead create an empty rdd with
> sparkContext.emptyRDD of the same case class type, it works!
>
> So something like:
> var data = spark.sparkContext.emptyRDD[SomeData]
>
> // loop
>   data = data.union(someCode.thatReturnsADataset().rdd)
> // end loop
>
> data.toDS //so I can union it to the actual Dataset I have elsewhere
>
> On Thu, Oct 20, 2016 at 8:34 PM Agraj Mangal  wrote:
>
> I believe this normally comes when Spark is unable to perform union due to
> "difference" in schema of the operands. Can you check if the schema of both
> the datasets are semantically same ?
>
> On Tue, Oct 18, 2016 at 9:06 AM, Efe Selcuk  wrote:
>
> Bump!
>
> On Thu, Oct 13, 2016 at 8:25 PM Efe Selcuk  wrote:
>
> I have a use case where I want to build a dataset based off of
> conditionally available data. I thought I'd do something like this:
>
> case class SomeData( ... ) // parameters are basic encodable types like
> strings and BigDecimals
>
> var data = spark.emptyDataset[SomeData]
>
> // loop, determining what data to ingest and process into datasets
>   data = data.union(someCode.thatReturnsADataset)
> // end loop
>
> However I get a runtime exception:
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> unresolved operator 'Union;
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.
> failAnalysis(CheckAnalysis.scala:40)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.
> failAnalysis(Analyzer.scala:58)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
> anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
> anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
> TreeNode.scala:126)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.
> checkAnalysis(CheckAnalysis.scala:67)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.
> checkAnalysis(Analyzer.scala:58)
> at org.apache.spark.sql.execution.QueryExecution.
> assertAnalyzed(QueryExecution.scala:49)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
> at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
> at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)
>
> Granted, I'm new at Spark so this might be an anti-pattern, so I'm open to
> suggestions. However it doesn't seem like I'm doing anything incorrect
> here, the types are correct. Searching for this error online returns
> results seemingly about working in dataframes and having mismatching
> schemas or a different order of fields, and it seems like bugfixes have
> gone into place for those cases.
>
> Thanks in advance.
> Efe
>
>
>
>
> --
> Thanks & Regards,
> Agraj Mangal
>
>


-- 
Thanks & Regards,
Agraj Mangal


Re: Re: How to iterate the element of an array in DataFrame?

2016-10-21 Thread lk_spark
How about changing the schema from
root
 |-- category.firstCategory: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- weight: string (nullable = true)

to:

root
 |-- category: string (nullable = true)
 |-- weight: string (nullable = true)
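
for example, something like this (rough sketch, Spark 2.x; explode gives one row
per array element):

import org.apache.spark.sql.functions.{col, explode}

// Backticks are needed because the column name itself contains a dot.
val flat = mblog_tags
  .select(explode(col("`category.firstCategory`")).as("tag"))
  .select(col("tag.category").as("category"), col("tag.weight").as("weight"))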

2016-10-21 

lk_spark 



From: 颜发才(Yan Facai)
Sent: 2016-10-21 15:35
Subject: Re: How to iterate the element of an array in DataFrame?
To: "user.spark"
Cc:

I don't know how to construct `array<struct<category:string, weight:string>>`.
Could anyone help me?

I try to get the array by :
scala> mblog_tags.map(_.getSeq[(String, String)](0))

while the result is:
res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] = [value:
array<struct<_1:string,_2:string>>]

How to express `struct<category:string, weight:string>` ?






On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai)  wrote:

Hi, I want to extract the attribute `weight` of an array, and combine them to 
construct a sparse vector. 



### My data is like this:

scala> mblog_tags.printSchema
root
 |-- category.firstCategory: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- weight: string (nullable = true)


scala> mblog_tags.show(false)
+------------------------------------------------+
|category.firstCategory                          |
+------------------------------------------------+
|[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
|[[tagCategory_029, 0.9]]                        |
|[[tagCategory_029, 0.8]]                        |
+------------------------------------------------+



### And expected:
Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
Vectors.sparse(100, Array(29),  Array(0.9))
Vectors.sparse(100, Array(29),  Array(0.8))


How to iterate an array in DataFrame?

Thanks.

Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Erwan ALLAIN
Hi,

I'm currently implementing an exactly once mechanism based on the following
example:

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala

the pseudo code is as follows:

dstream.transform (store offset in a variable on driver side )
dstream.map
dstream.foreachRdd( action + save offset in db)

this code doesn't work if the processing time is greater than the batch
interval (same problem as the windowed example:
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala
)

Indeed, at each batch interval a new RDD is created and stacked up, so the
transform method is called several times and updates the global variable;
by the time we save, the offset range no longer corresponds to the one
that was processed.

1) Do I need to work at the RDD level (inside a big forEachRDD like in the
first example) instead of dstream ?

2) I can use a map[BatchTime, OffsetRange] as a global variable but in case
of crash this map will not reflect anymore the generatedRdds (restored from
checkpoint, RDD prepared but not executed)
  2.1 ) Do I need to store this map elsewhere (cassandra) ?
  2.2)  Is there a way to retrieve offset range restored ? (transform
method is not called anymore for the checkpointed rdd)
  2.3) Is it possible to store some context along with the RDD to be serialized?

Lots of questions, let me know if anything is not clear!


sql.functions partitionby AttributeError: 'NoneType' object has no attribute '_jvm'

2016-10-21 Thread muhammet pakyürek
I work with partitionBy for lead/lag functions. I get the error above, and here
is the explanation:


jspec = 
sc._jvm.org.apache.spark.sql.expressions.Window.partitionBy(_to_java_cols(cols))





How to clean the accumulator and broadcast from the driver manually?

2016-10-21 Thread Mungeol Heo
Hello,

As mentioned in the title, I want to know whether it is possible to clean
up accumulators/broadcasts from the driver manually, since the driver's
memory keeps increasing.

Someone says that the unpersist method removes them from both memory and
disk on each executor node, but the data stays on the driver node, so it
can be re-broadcast.
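
In code, my understanding of that claim is roughly this (bigLookup is just an
illustrative value; I have not verified the driver-side behavior myself):

val bigLookup = Map(1 -> "a", 2 -> "b")
val bc = sc.broadcast(bigLookup)   // driver keeps a copy, executors fetch it on use

sc.parallelize(1 to 100).map(i => bc.value.getOrElse(i, "?")).count()

bc.unpersist(blocking = true)  // reportedly frees only the copies cached on the executors;
                               // the driver still holds the value, so bc can be re-broadcast
bc.destroy()                   // also releases the driver-side data; bc is unusable afterwards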

If it is true, how can I solve the "driver's memory keeps increasing" issue?

Any help will be GREAT!
Thank you.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to iterate the element of an array in DataFrame?

2016-10-21 Thread Yan Facai
I don't know how to construct
`array>`.
Could anyone help me?

I try to get the array by :
scala> mblog_tags.map(_.getSeq[(String, String)](0))

while the result is:
res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] = [value:
array>]


How to express `struct` ?



On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai)  wrote:

> Hi, I want to extract the attribute `weight` of an array, and combine them
> to construct a sparse vector.
>
> ### My data is like this:
>
> scala> mblog_tags.printSchema
> root
>  |-- category.firstCategory: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- category: string (nullable = true)
>  |||-- weight: string (nullable = true)
>
>
> scala> mblog_tags.show(false)
> +--+
> |category.firstCategory|
> +--+
> |[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
> |[[tagCategory_029, 0.9]]  |
> |[[tagCategory_029, 0.8]]  |
> +--+
>
>
> ### And expected:
> Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
> Vectors.sparse(100, Array(29),  Array(0.9))
> Vectors.sparse(100, Array(29),  Array(0.8))
>
> How to iterate an array in DataFrame?
> Thanks.
>
>
>
>


RE: Can we disable parquet logs in Spark?

2016-10-21 Thread Yu, Yucai
I set "log4j.rootCategory=ERROR, console" and using "-file 
conf/log4f.properties" to make most of logs suppressed, but those 
org.apache.parquet log still exists.

Any way to disable them also?
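
One guess I have not verified: the "Oct 21, 2016 2:27:30 PM INFO:" prefix looks
like the java.util.logging format, so these lines may not go through log4j at all,
which would explain why log4j.properties has no effect. If so, raising the JUL
level for the parquet loggers where the writing happens (in the executor JVMs)
might silence them, e.g.:

// Unverified sketch: silence parquet-mr's java.util.logging output.
// Keep references to the loggers so they are not garbage-collected after setLevel.
import java.util.logging.{Level, Logger}

val parquetLogger = Logger.getLogger("org.apache.parquet")
val legacyParquetLogger = Logger.getLogger("parquet")
parquetLogger.setLevel(Level.SEVERE)
legacyParquetLogger.setLevel(Level.SEVERE)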

Thanks,
Yucai

From: Yu, Yucai [mailto:yucai...@intel.com]
Sent: Friday, October 21, 2016 2:50 PM
To: user@spark.apache.org
Subject: Can we disable parquet logs in Spark?

Hi,

I see lots of parquet logs in container logs(YARN mode), like below:

stdout:
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 8,448B for 
[ss_promo_sk] INT32: 5,996 values, 8,513B raw, 8,409B comp, 1 pages, encodings: 
[PLAIN_DICTIONARY, BIT_PACKED, RLE], dic { 1,475 entries, 5,900B raw, 1,475B 
comp}
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 1,376B for 
[ss_ticket_number] INT32: 5,996 values, 1,730B raw, 1,340B comp, 1 pages, 
encodings: [PLAIN_DICTIONARY, BIT_PACKED, RLE], dic { 524 entries, 2,096B raw, 
524B comp}
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 5,516B for 
[ss_quantity] INT32: 5,996 values, 5,567B raw, 5,479B comp, 1 pages, encodings: 
[PLAIN_DICTIONARY, BIT_PACKED, RLE], dic { 100 entries, 400B raw, 100B comp}
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 14,385B for 
[ss_wholesale_cost] INT32: 5,996 values, 23,931B raw, 14,346B comp, 1 pages, 
encodings: [BIT_PACKED, PLAIN, RLE]
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 15,043B for 
[ss_list_price] INT32: 5,996 values, 23,871B raw, 15,004B comp, 1 pages, 
encodings: [BIT_PACKED, PLAIN, RLE]
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 14,442B for 
[ss_sales_price] INT32: 5,996 values, 23,896B raw, 14,403B comp, 1 pages, 
encodings: [BIT_PACKED, PLAIN, RLE]
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 3,538B for 
[ss_ext_discount_amt] INT32: 5,996 values, 7,317B raw, 3,501B comp, 1 pages, 
encodings: [PLAIN_DICTIONARY, BIT_PACKED, RLE], dic { 1,139 entries, 4,556B 
raw, 1,139B comp}
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 18,052B for 
[ss_ext_sales_price] INT32: 5,996 values, 23,907B raw, 18,013B comp, 1 pages, 
encodings: [BIT_PACKED, PLAIN, RLE]
Oc

I tried the settings below in log4j.properties, but they did not work.
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

Is there a way to disable them?

Thanks a lot!

Yucai


Re: ALS.trainImplicit block sizes

2016-10-21 Thread Nick Pentreath
How many nodes are you using in the cluster?



On Fri, 21 Oct 2016 at 08:58 Nikhil Mishra 
wrote:

> Thanks Nick.
>
> So we do partition U x I matrix into BxB matrices, each of size around U/B
> and I/B. Is that correct? Do you know whether a single block of the matrix
> is represented in memory as a full matrix or as sparse matrix? I ask this
> because my job has been failing for block sizes which should have worked.
>
> I have U = 85 million users, I = 250,000 items and when I specify block
> size 5,000, I get out of memory error, even though I am setting
> --executor-memory as 7g (on a linux EC2 which has 7.5g memory). Assuming
> each block has 17000 users and 50 items, even if the block is internally
> represented as a full matrix, it should still occupy around 50MB space.
>
> Increasing block size to 20,000 also results in the same. So there is
> something I don't understand about how this is working.
>
> BTW, I am trying to find 50 latent factors (rank = 50).
>
> Do you have some insights as to how I should tweak things to get this
> working?
>
> Thanks,
> Nik
>
> On Thu, Oct 20, 2016 at 11:43 PM, Nick Pentreath  > wrote:
>
> The blocks params will set both user and item blocks.
>
> Spark 2.0 supports user and item blocks for PySpark:
> http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.recommendation
>
>
> On Fri, 21 Oct 2016 at 08:12 Nikhil Mishra 
> wrote:
>
> Hi,
>
> I have a question about the block size to be specified in
> ALS.trainImplicit() in pyspark (Spark 1.6.1). There is only one block size
> parameter to be specified. I want to know if that would result in
> partitioning both the users as well as the items axes.
>
> For example, I am using the following call to ALs.trainImplicit() in my
> code.
>
> ---
>
> RANK = 50
>
> ITERATIONS = 2
>
> BLOCKS = 1000
>
> ALPHA = 1.0
>
> model = ALS.trainImplicit(ratings, RANK, ITERATIONS, blocks=BLOCKS,
> alpha=ALPHA)
>
>
> 
>
> Will this partition the users x items matrix into BLOCKS x BLOCKS number
> of matrices or will it partition only the users axis thereby resulting in
> BLOCKS number of matrices, each with columns = total number of unique items?
>
> Thanks,
> Nik
>
>
>


Re: [Spark ML] Using GBTClassifier in OneVsRest

2016-10-21 Thread Nick Pentreath
Currently no - GBT implements the predictors, not the classifier interface.
It might be possible to wrap it in a wrapper that extends the Classifier
trait.

Hopefully GBT will support multi-class at some point. But you can use
RandomForest which does support multi-class.
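
For the multi-class case with RandomForest it is a fairly direct swap, roughly
as in the sketch below (the label/feature column names and the trainingData
DataFrame are assumed from your pipeline):

import org.apache.spark.ml.classification.RandomForestClassifier

// Sketch: RandomForestClassifier handles multi-class labels directly,
// so no OneVsRest wrapper is needed.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")
  .setNumTrees(50)
  .setMaxDepth(10)

val model = rf.fit(trainingData)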

On Fri, 21 Oct 2016 at 02:12 ansari  wrote:

> It appears as if the inheritance hierarchy doesn't allow GBTClassifiers to
> be
> used as the binary classifier in a OneVsRest trainer. Is there a simple way
> to use gradient-boosted trees for multiclass (not binary) problems?
>
> Specifically, it complains that GBTClassifier doesn't inherit from
> Classifier[_, _, _].
>
> I'm using Spark 2.0.1:
>
> val gbt = new GBTClassifier()
>   .setLabelCol("indexedLabel")
>   .setFeaturesCol("features")
>   .setMaxIter(10)
>   .setMaxDepth(10)
>
> val ovr = new OneVsRest().
> setClassifier(gbt)
>
> fails saying
>
> error: type mismatch;
>  found   : org.apache.spark.ml.classification.GBTClassifier
>  required: org.apache.spark.ml.classification.Classifier[_, _, _]
>setClassifier(gbt)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ML-Using-GBTClassifier-in-OneVsRest-tp27933.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Can we disable parquet logs in Spark?

2016-10-21 Thread Yu, Yucai
Hi,

I see lots of parquet logs in container logs(YARN mode), like below:

stdout:
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 8,448B for 
[ss_promo_sk] INT32: 5,996 values, 8,513B raw, 8,409B comp, 1 pages, encodings: 
[PLAIN_DICTIONARY, BIT_PACKED, RLE], dic { 1,475 entries, 5,900B raw, 1,475B 
comp}
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 1,376B for 
[ss_ticket_number] INT32: 5,996 values, 1,730B raw, 1,340B comp, 1 pages, 
encodings: [PLAIN_DICTIONARY, BIT_PACKED, RLE], dic { 524 entries, 2,096B raw, 
524B comp}
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 5,516B for 
[ss_quantity] INT32: 5,996 values, 5,567B raw, 5,479B comp, 1 pages, encodings: 
[PLAIN_DICTIONARY, BIT_PACKED, RLE], dic { 100 entries, 400B raw, 100B comp}
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 14,385B for 
[ss_wholesale_cost] INT32: 5,996 values, 23,931B raw, 14,346B comp, 1 pages, 
encodings: [BIT_PACKED, PLAIN, RLE]
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 15,043B for 
[ss_list_price] INT32: 5,996 values, 23,871B raw, 15,004B comp, 1 pages, 
encodings: [BIT_PACKED, PLAIN, RLE]
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 14,442B for 
[ss_sales_price] INT32: 5,996 values, 23,896B raw, 14,403B comp, 1 pages, 
encodings: [BIT_PACKED, PLAIN, RLE]
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 3,538B for 
[ss_ext_discount_amt] INT32: 5,996 values, 7,317B raw, 3,501B comp, 1 pages, 
encodings: [PLAIN_DICTIONARY, BIT_PACKED, RLE], dic { 1,139 entries, 4,556B 
raw, 1,139B comp}
Oct 21, 2016 2:27:30 PM INFO: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 18,052B for 
[ss_ext_sales_price] INT32: 5,996 values, 23,907B raw, 18,013B comp, 1 pages, 
encodings: [BIT_PACKED, PLAIN, RLE]
Oc

I tried the settings below in log4j.properties, but they did not work.
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

Is there a way to disable them?

Thanks a lot!

Yucai


Re: ALS.trainImplicit block sizes

2016-10-21 Thread Nick Pentreath
The blocks params will set both user and item blocks.

Spark 2.0 supports user and item blocks for PySpark:
http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.recommendation

On Fri, 21 Oct 2016 at 08:12 Nikhil Mishra 
wrote:

> Hi,
>
> I have a question about the block size to be specified in
> ALS.trainImplicit() in pyspark (Spark 1.6.1). There is only one block size
> parameter to be specified. I want to know if that would result in
> partitioning both the users as well as the items axes.
>
> For example, I am using the following call to ALs.trainImplicit() in my
> code.
>
> ---
>
> RANK = 50
>
> ITERATIONS = 2
>
> BLOCKS = 1000
>
> ALPHA = 1.0
>
> model = ALS.trainImplicit(ratings, RANK, ITERATIONS, blocks=BLOCKS,
> alpha=ALPHA)
>
>
> 
>
> Will this partition the users x items matrix into BLOCKS x BLOCKS number
> of matrices or will it partition only the users axis thereby resulting in
> BLOCKS number of matrices, each with columns = total number of unique items?
>
> Thanks,
> Nik
>


Re: spark pi example fail on yarn

2016-10-21 Thread Xi Shen
I see, I had this issue before. I think you are using Java 8, right?
Because Java 8 JVM requires more bootstrap heap memory.

Turning off the memory check is an unsafe way to avoid this issue. I think
it is better to increase the memory ratio, like this:

<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>3.15</value>
</property>


On Fri, Oct 21, 2016 at 11:15 AM Li Li  wrote:

I modified yarn-site.xml yarn.nodemanager.vmem-check-enabled to false
and it works for yarn-client and spark-shell

On Fri, Oct 21, 2016 at 10:59 AM, Li Li  wrote:
> I found a warn in nodemanager log. is the virtual memory exceed? how
> should I config yarn to solve this problem?
>
> 2016-10-21 10:41:12,588 INFO
>
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> Memory usage of ProcessTree 20299 for container-id
> container_1477017445921_0001_02_01: 335.1 MB of 1 GB physical
> memory used; 2.2 GB of 2.1 GB virtual memory used
> 2016-10-21 10:41:12,589 WARN
>
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> Process tree for container: container_1477017445921_0001_02_01 has
> processes older than 1 iteration running over the configured limit.
> Limit=2254857728, current usage = 2338873344
>
> On Fri, Oct 21, 2016 at 8:49 AM, Saisai Shao 
wrote:
>> It is not that Spark has difficulty communicating with YARN; it simply
>> means the AM exited with the FINISHED state.
>>
>> I'm guessing it might be related to memory constraints for container,
please
>> check the yarn RM and NM logs to find out more details.
>>
>> Thanks
>> Saisai
>>
>> On Fri, Oct 21, 2016 at 8:14 AM, Xi Shen  wrote:
>>>
>>> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
>>> application has already exited with state FINISHED!
>>>
>>> From this, I think Spark is having difficulty communicating with YARN.
>>> You should check your Spark log.
>>>
>>>
>>> On Fri, Oct 21, 2016 at 8:06 AM Li Li  wrote:

 which log file should I

 On Thu, Oct 20, 2016 at 10:02 PM, Saisai Shao 
 wrote:
 > Looks like ApplicationMaster is killed by SIGTERM.
 >
 > 16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
 > 16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:
 >
 > This container may be killed by yarn NodeManager or other processes,
 > you'd
 > better check yarn log to dig out more details.
 >
 > Thanks
 > Saisai
 >
 > On Thu, Oct 20, 2016 at 6:51 PM, Li Li  wrote:
 >>
 >> I am setting up a small yarn/spark cluster. hadoop/yarn version is
 >> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
 >> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
 >> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
 >> org.apache.spark.examples.SparkPi --master yarn-client
 >> examples/jars/spark-examples_2.11-2.0.1.jar 1
 >> it fails and the first error is:
 >> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
 >> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
 >> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
 >> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
 >> 16/10/20 18:12:12 INFO
 >> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
ApplicationMaster
 >> registered as NettyRpcEndpointRef(null)
 >> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
 >> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
 >> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
 >> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
 >> /proxy/application_1476957324184_0002
 >> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
 >> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 >> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
 >> SchedulerBackend is ready for scheduling beginning after waiting
 >> maxRegisteredResourcesWaitingTime: 3(ms)
 >> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
 >> SparkContext, some configuration may not take effect.
 >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
 >> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
 >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
 >> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
 >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
 >> o.s.j.s.ServletContextHandler@378f002a
{/SQL/execution,null,AVAILABLE}
 >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
 >>
 >> o.s.j.s.ServletContextHandler@2cc75074
{/SQL/execution/json,null,AVAILABLE}
 >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
 >> 

ALS.trainImplicit block sizes

2016-10-21 Thread Nikhil Mishra
Hi,

I have a question about the block size to be specified in
ALS.trainImplicit() in pyspark (Spark 1.6.1). There is only one block size
parameter to be specified. I want to know if that would result in
partitioning both the users as well as the items axes.

For example, I am using the following call to ALs.trainImplicit() in my
code.

---

RANK = 50

ITERATIONS = 2

BLOCKS = 1000

ALPHA = 1.0

model = ALS.trainImplicit(ratings, RANK, ITERATIONS, blocks=BLOCKS,
alpha=ALPHA)




Will this partition the users x items matrix into BLOCKS x BLOCKS number of
matrices or will it partition only the users axis thereby resulting in
BLOCKS number of matrices, each with columns = total number of unique items?

Thanks,
Nik