Spark ExternalTable doesn't recognize subdir

2016-10-19 Thread lk_spark
hi, all
   My issue: every day I receive some JSON data files, and I want to convert
them to parquet files and save them to HDFS.
   The folder layout looks like this:
   /my_table_base_floder
   /my_table_base_floder/day_2
   /my_table_base_floder/day_3


where the parquet files of "day_1" are stored in /my_table_base_floder.
Then I run:
sqlContext.createExternalTable("tpc1.customer","hdfs://master1:9000/my_table_base_floder","parquet")
but when I save parquet files to a subdirectory, for example
/my_table_base_floder/day_2, and refresh the metadata,
Spark doesn't recognize the data in the subdirectory. How can I do this?
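A hedged sketch of one common approach (the day= layout and the dfDay2 name are assumptions on my part, not from the thread): name the subdirectories in Hive-style partition form so Spark's partition discovery picks them up when reading the base path.

// move/write each day's files under a partition-style directory,
// including the existing day_1 data (e.g. .../day=1), then:
dfDay2.write.parquet("hdfs://master1:9000/my_table_base_floder/day=2")

// reading the base path then discovers "day" as a partition column
val all = sqlContext.read.parquet("hdfs://master1:9000/my_table_base_floder")
all.printSchema()   // includes the discovered "day" column
// a metastore external table would still need its partitions registered
// (e.g. ALTER TABLE ... ADD PARTITION) before it sees the new directories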


2016-10-20


lk_spark 

partitionBy produces wrong number of tasks

2016-10-19 Thread Daniel Haviv
Hi,
I have a case where I use partitionBy to write my DF using a calculated
column, so it looks something like this:

val df = spark.sql("select *, from_unixtime(ts, 'MMddH')
partition_key from mytable")

df.write.partitionBy("partition_key").orc("/partitioned_table")


df is 8 partitions in size (spark.sql.shuffle.partitions is set to 8) and
partition_key usually has 1 or 2 distinct values.
When the write action begins it's split into 330 tasks and takes much
longer than it should, but if I switch to the following code instead it
works as expected with 8 tasks:

df.createTempView("tab")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("insert into partitioned_table select * from tab")



Any idea why this is happening?
How does partitionBy decide how to repartition the DF?


Thank you,
Daniel


Re: [Spark][issue]Writing Hive Partitioned table

2016-10-19 Thread ayan guha
Hi Group

Sorry to rekindle this thread.

Using Spark 1.6.0 on CDH 5.7.

Any idea?


Best
Ayan

On Fri, Oct 7, 2016 at 5:08 PM, Mich Talebzadeh 
wrote:

> Hi Ayan,
>
> Depends on the version of Spark you are using.
>
> Have you tried updating stats in Hive?
>
> ANALYZE TABLE ${DATABASE}.${TABLE} PARTITION (${PARTITION_NAME}) COMPUTE
> STATISTICS FOR COLUMNS
>
> and then do
>
> show create table ${TABLE}
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 7 October 2016 at 03:46, ayan guha  wrote:
>
>> Posting with correct subject.
>>
>> On Fri, Oct 7, 2016 at 12:37 PM, ayan guha  wrote:
>>
>>> Hi
>>>
>>> Faced one issue:
>>>
>>> - Writing Hive Partitioned table using
>>>
>>> df.withColumn("partition_date",to_date(df["INTERVAL_DATE"]))
>>> .write.partitionBy('partition_date').saveAsTable("sometable"
>>> ,mode="overwrite")
>>>
>>> - Data got written to HDFS fine. I can see the folders with partition
>>> names such as
>>>
>>> /app/somedb/hive/somedb.db/sometable/partition_date=2016-09-28
>>> /app/somedb/hive/somedb.db/sometable/partition_date=2016-09-29
>>>
>>> and so on.
>>> - Also, _common_metadata & _metadata files are written properly
>>>
>>> - I can read the data from Spark fine using
>>> read.parquet("/app/somedb/hive/somedb.db/sometable");
>>> printSchema shows all columns.
>>>
>>> - However, I cannot read it from Hive.
>>>
>>> Problem 1: Hive does not think the table is partitioned
>>> Problem 2: Hive sees only 1 column
>>> array from deserializer
>>> Problem 3: MSCK repair table failed, saying partitions are not in
>>> Metadata.
>>>
>>> Question: Is it a known issue with Spark to write to Hive partitioned
>>> table?
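A hedged sketch of the workaround commonly suggested for Spark 1.6 (assuming a HiveContext; the DDL below is an assumption, not something stated in this thread): pre-create the partitioned table with Hive DDL and append with insertInto instead of relying on saveAsTable's own metadata.

// Scala sketch only; names echo the mail above, the DDL is illustrative
// (a real DDL would list all non-partition columns in the same order as the DataFrame)
import org.apache.spark.sql.functions.to_date

sqlContext.sql(
  """CREATE TABLE IF NOT EXISTS sometable (interval_date STRING)
    |PARTITIONED BY (partition_date DATE)
    |STORED AS PARQUET""".stripMargin)
sqlContext.sql("SET hive.exec.dynamic.partition=true")
sqlContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

df.withColumn("partition_date", to_date(df("INTERVAL_DATE")))
  .write.mode("append")
  .insertInto("sometable")   // columns must match the DDL order, partition column last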
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Dataframe schema...

2016-10-19 Thread Muthu Jayakumar
Hello Michael,

Thank you for looking into this query. In my case there seems to be an issue
when I union a parquet file read from disk with another dataframe that I
construct in memory. The only difference I see is the containsNull = true.
In fact, I do not see any errors with union on the simple schema of "col1
thru col4" above. The problem seems to exist only on that
"some_histogram" column, which contains the mixed containsNull = true/false.
Let me know if this helps.

Thanks,
Muthu



On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust 
wrote:

> nullable = false is just a hint to the optimizer that it's impossible for there to
> be a null value in this column, so that it can avoid generating code for
> null-checks.  When in doubt, we set nullable = true since it is always safer
> to check.
>
> Why in particular are you trying to change the nullability of the column?
>
> On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar 
> wrote:
>
>> Hello there,
>>
>> I am trying to understand how and when does DataFrame (or Dataset) sets
>> nullable = true vs false on a schema.
>>
>> Here is my observation from a sample code I tried...
>>
>>
>> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
>> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
>> lit("bla")).printSchema()
>> root
>>  |-- col1: integer (nullable = false)
>>  |-- col2: string (nullable = true)
>>  |-- col3: double (nullable = false)
>>  |-- col4: string (nullable = false)
>>
>>
>> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
>> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
>> lit("bla")).write.parquet("/tmp/sample.parquet")
>>
>> scala> spark.read.parquet("/tmp/sample.parquet").printSchema()
>> root
>>  |-- col1: integer (nullable = true)
>>  |-- col2: string (nullable = true)
>>  |-- col3: double (nullable = true)
>>  |-- col4: string (nullable = true)
>>
>>
>> The place where this seem to get me into trouble is when I try to union
>> one data-structure from in-memory (notice that in the below schema the
>> highlighted element is represented as 'false' for in-memory created schema)
>> and one from file that starts out with a schema like below...
>>
>>  |-- some_histogram: struct (nullable = true)
>>  ||-- values: array (nullable = true)
>>  |||-- element: double (containsNull = true)
>>  ||-- freq: array (nullable = true)
>>  |||-- element: long (containsNull = true)
>>
>> Is there a way to convert this attribute from true to false without
>> running any mapping / udf on that column?
>>
>> Please advice,
>> Muthu
>>
>
>


Re: How does Spark determine in-memory partition count when reading Parquet ~files?

2016-10-19 Thread Michael Armbrust
In Spark 2.0 we bin-pack small files into a single task to avoid
overloading the scheduler.  If you want a specific number of partitions you
should repartition.  If you want to disable this optimization you can set
the file open cost very high: spark.sql.files.openCostInBytes
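A hedged illustration of the two knobs mentioned above (assuming a SparkSession named spark; the path and numbers are placeholders):

// make each file look expensive to open so fewer files are packed into one task
spark.conf.set("spark.sql.files.openCostInBytes", (128L * 1024 * 1024).toString)

val df = spark.read.parquet("/path/to/parquet")   // placeholder path
println(df.rdd.getNumPartitions)                  // inspect the resulting partition count

// or simply ask for an explicit partition count
val df50 = df.repartition(50)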

On Tue, Oct 18, 2016 at 7:04 PM, shea.parkes  wrote:

> When reading a parquet ~file with >50 parts, Spark is giving me a DataFrame
> object with far fewer in-memory partitions.
>
> I'm happy to troubleshoot this further, but I don't know Scala well and
> could use some help pointing me in the right direction.  Where should I be
> looking in the code base to understand how many partitions will result from
> reading a parquet ~file?
>
> Thanks,
>
> Shea
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-does-Spark-determine-in-
> memory-partition-count-when-reading-Parquet-files-tp27921.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Dataframe schema...

2016-10-19 Thread Michael Armbrust
nullable = false is just a hint to the optimizer that it's impossible for there to
be a null value in this column, so that it can avoid generating code for
null-checks.  When in doubt, we set nullable = true since it is always safer
to check.

Why in particular are you trying to change the nullability of the column?

On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar  wrote:

> Hello there,
>
> I am trying to understand how and when does DataFrame (or Dataset) sets
> nullable = true vs false on a schema.
>
> Here is my observation from a sample code I tried...
>
>
> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
> lit("bla")).printSchema()
> root
>  |-- col1: integer (nullable = false)
>  |-- col2: string (nullable = true)
>  |-- col3: double (nullable = false)
>  |-- col4: string (nullable = false)
>
>
> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
> lit("bla")).write.parquet("/tmp/sample.parquet")
>
> scala> spark.read.parquet("/tmp/sample.parquet").printSchema()
> root
>  |-- col1: integer (nullable = true)
>  |-- col2: string (nullable = true)
>  |-- col3: double (nullable = true)
>  |-- col4: string (nullable = true)
>
>
> The place where this seem to get me into trouble is when I try to union
> one data-structure from in-memory (notice that in the below schema the
> highlighted element is represented as 'false' for in-memory created schema)
> and one from file that starts out with a schema like below...
>
>  |-- some_histogram: struct (nullable = true)
>  ||-- values: array (nullable = true)
>  |||-- element: double (containsNull = true)
>  ||-- freq: array (nullable = true)
>  |||-- element: long (containsNull = true)
>
> Is there a way to convert this attribute from true to false without
> running any mapping / udf on that column?
>
> Please advice,
> Muthu
>


Dataframe schema...

2016-10-19 Thread Muthu Jayakumar
Hello there,

I am trying to understand how and when DataFrame (or Dataset) sets
nullable = true vs. false on a schema.

Here is my observation from a sample code I tried...


scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
lit("bla")).printSchema()
root
 |-- col1: integer (nullable = false)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = false)
 |-- col4: string (nullable = false)


scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
lit("bla")).write.parquet("/tmp/sample.parquet")

scala> spark.read.parquet("/tmp/sample.parquet").printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)
 |-- col4: string (nullable = true)


The place where this seems to get me into trouble is when I try to union one
data structure built in memory (notice that in the schema below the
highlighted element is represented as 'false' for the in-memory schema)
and one read from a file that starts out with a schema like below...

 |-- some_histogram: struct (nullable = true)
 ||-- values: array (nullable = true)
 |||-- element: double (containsNull = true)
 ||-- freq: array (nullable = true)
 |||-- element: long (containsNull = true)

Is there a way to convert this attribute from true to false without running
any mapping / udf on that column?

Please advise,
Muthu
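A hedged sketch of one way this is often handled, not an answer given in this thread (inMemoryDF, fileDF and the SparkSession name spark are assumed): rebuild the in-memory frame against a relaxed copy of its schema so both sides of the union agree on nullability.

import org.apache.spark.sql.types._

// force every struct field and array element to nullable/containsNull = true
def relax(dt: DataType): DataType = dt match {
  case StructType(fields) =>
    StructType(fields.map(f => f.copy(dataType = relax(f.dataType), nullable = true)))
  case ArrayType(elementType, _) => ArrayType(relax(elementType), containsNull = true)
  case other => other
}

val relaxedSchema = relax(inMemoryDF.schema).asInstanceOf[StructType]
val relaxedDF = spark.createDataFrame(inMemoryDF.rdd, relaxedSchema)  // same rows, relaxed schema
val combined = relaxedDF.union(fileDF)                                // schemas now line up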


Re: Why the json file used by sparkSession.read.json must be a valid json object per line

2016-10-19 Thread Wangjianfei
Yeah, the design is mainly because of HDFS.
 
--
15101549787



------------------ Original message ------------------
From: "Jakob Odersky"
Date: 2016-10-20 (Thu) 4:46
To: "Hyukjin Kwon"
Cc: "Daniel Barclay"; "Koert Kuipers"; "user @spark"; "Wangjianfei" <1004910...@qq.com>
Subject: Re: Why the json file used by sparkSession.read.json must be a valid json
object per line



Another reason I could imagine is that files are often read from HDFS,
which by default uses line terminators to separate records.

It is possible to implement your own hdfs delimiter finder, however
for arbitrary json data, finding that delimiter would require stateful
parsing of the file and would be difficult to parallelize across a
cluster.

On Tue, Oct 18, 2016 at 4:40 PM, Hyukjin Kwon  wrote:
> Regarding his recent PR[1], I guess he meant multiple line json.
>
> As far as I know, single-line json also complies with the standard. I left a
> comment with the RFC in the PR but please let me know if I am wrong at any
> point.
>
> Thanks!
>
> [1]https://github.com/apache/spark/pull/15511
>
>
> On 19 Oct 2016 7:00 a.m., "Daniel Barclay" 
> wrote:
>>
>> Koert,
>>
>> Koert Kuipers wrote:
>>
>> A single json object would mean for most parsers it needs to fit in memory
>> when reading or writing
>>
>> Note that codlife didn't seem to be asking about single-object JSON
>> files, but about standard-format JSON files.
>>
>>
>> On Oct 15, 2016 11:09, "codlife" <1004910...@qq.com> wrote:
>>>
>>> Hi:
>>> I'm in doubt about the design of spark.read.json: why is the json file
>>> not
>>> a standard json file? Who can tell me the internal reason? Any advice is
>>> appreciated.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-the-json-file-used-by-sparkSession-read-json-must-be-a-valid-json-object-per-line-tp27907.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>
>

Re: can mllib Logistic Regression package handle 10 million sparse features?

2016-10-19 Thread Yang
In my case, my model size is fairly small (100k training samples), though
the feature count is roughly 100k populated out of 10 million possible features.

In this case it does not help me to distribute the training process, since
the data size is so small. I just need a good core solver to train the model in
a serial manner. On the other hand, I have to train millions of such models
independently, so I have plenty of load-balancing opportunity.

On Tue, Oct 11, 2016 at 3:09 AM, Nick Pentreath 
wrote:

> That's a good point about shuffle data compression. Still, it would be
> good to benchmark the ideas behind https://github.com/
> apache/spark/pull/12761 I think.
>
> For many datasets, even within one partition the gradient sums etc can
> remain very sparse. For example Criteo DAC data is extremely sparse - and
> it has roughly 5% of active features per partition. However, you're correct
> that as the coefficients (and intermediate stats counters) get aggregated
> they will become more and more dense. But there is also the intermediate
> memory overhead of the dense structures, though that comes into play in the
> 100s - 1000s millions feature range.
>
> The situation in the PR above is actually different in that even the
> coefficient vector itself is truly sparse (through some encoding they did
> IRC). This is not an uncommon scenario however, as for high-dimensional
> features users may want to use feature hashing which may result in actually
> sparse coefficient vectors. With hashing often the feature dimension will
> be chosen as power of 2 and higher (in some cases significantly) than the
> true feature dimension to reduce collisions. So sparsity is critical here
> for storage efficiency.
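A hedged sketch of the hashing setup described above (column names, the input DataFrame rawDF, and the bucket count are assumptions, not from the thread):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.HashingTF

// hash raw terms into a power-of-two feature space, larger than the true
// dimension to keep collisions low
val hashed = new HashingTF()
  .setInputCol("terms")        // assumed column of Seq[String]
  .setOutputCol("features")
  .setNumFeatures(1 << 25)     // ~33.5M buckets, a power of two
  .transform(rawDF)            // rawDF: assumed DataFrame with "terms" and "label"

val model = new LogisticRegression().setMaxIter(20).fit(hashed)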
>
> Your result for the final stage does seem to indicate something can be
> improved - perhaps it is due to some level of fetch parallelism - so more
> partitions may fetch more data in parallel? Because with just default
> setting for `treeAggregate` I was seeing much faster times for the final
> stage with 34 million feature dimension (though the final shuffle size
> seems 50% of yours with 2x the features - this is with Spark 2.0.1, I
> haven't tested out master yet with this data).
>
> [image: Screen Shot 2016-10-11 at 12.03.55 PM.png]
>
>
>
> On Fri, 7 Oct 2016 at 08:11 DB Tsai  wrote:
>
>> Hi Nick,
>>
>>
>>
>> I'm also working on the benchmark of liner models in Spark. :)
>>
>>
>>
>> One thing I saw is that for sparse features, 14 million features, with
>>
>> multi-depth aggregation, the final aggregation to the driver is
>>
>> extremely slow. See the attachment. The amount of data being exchanged
>>
>> between executor and executor is significantly larger than collecting
>>
>> the data into driver, but the time for collecting the data back to
>>
>> driver takes 4mins while the aggregation between executors only takes
>>
>> 20secs. Seems that the code path is different, and I suspect that
>>
>> there may be something in the spark core that we can optimize.
>>
>>
>>
>> Regrading using sparse data structure for aggregation, I'm not so sure
>>
>> how much this will improve the performance. Since after computing the
>>
>> gradient sum for all the data in one partitions, the vector will be no
>>
>> longer to be very sparse. Even it's sparse, after couple depth of
>>
>> aggregation, it will be very dense. Also, we perform the compression
>>
>> in the shuffle phase, so if there are many zeros, even it's in dense
>>
>> vector representation, the vector should take around the same size as
>>
>> sparse representation. I can be wrong since I never do a study on
>>
>> this, and I wonder how much performance we can gain in practice by
>>
>> using sparse vector for aggregating the gradients.
>>
>>
>>
>> Sincerely,
>>
>>
>>
>> DB Tsai
>>
>> --
>>
>> Web: https://www.dbtsai.com
>>
>> PGP Key ID: 0xAF08DF8D
>>
>>
>>
>>
>>
>> On Thu, Oct 6, 2016 at 4:09 AM, Nick Pentreath 
>> wrote:
>>
>> > I'm currently working on various performance tests for large, sparse
>> feature
>>
>> > spaces.
>>
>> >
>>
>> > For the Criteo DAC data - 45.8 million rows, 34.3 million features
>>
>> > (categorical, extremely sparse), the time per iteration for
>>
>> > ml.LogisticRegression is about 20-30s.
>>
>> >
>>
>> > This is with 4x worker nodes, 48 cores & 120GB RAM each. I haven't yet
>> tuned
>>
>> > the tree aggregation depth. But the number of partitions can make a
>>
>> > difference - generally fewer is better since the cost is mostly
>>
>> > communication of the gradient (the gradient computation is < 10% of the
>>
>> > per-iteration time).
>>
>> >
>>
>> > Note that the current impl forces dense arrays for intermediate data
>>
>> > structures, increasing the communication cost significantly. See this
>> PR for
>>
>> > info: https://github.com/apache/spark/pull/12761. Once sparse data
>>
>> > structures are supported for this, 

Re: Why the json file used by sparkSession.read.json must be a valid json object per line

2016-10-19 Thread Jakob Odersky
Another reason I could imagine is that files are often read from HDFS,
which by default uses line terminators to separate records.

It is possible to implement your own hdfs delimiter finder, however
for arbitrary json data, finding that delimiter would require stateful
parsing of the file and would be difficult to parallelize across a
cluster.
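A hedged sketch of one workaround when each file holds a single (possibly multi-line) JSON document, at the cost of each file having to fit in memory (the path is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
val onePerFile = spark.sparkContext
  .wholeTextFiles("hdfs:///path/to/json-dir")   // (path, fullContent) per file
  .map { case (_, content) => content }
val df = spark.read.json(onePerFile)            // RDD[String] overload: one document per element
df.printSchema()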

On Tue, Oct 18, 2016 at 4:40 PM, Hyukjin Kwon  wrote:
> Regarding his recent PR[1], I guess he meant multiple line json.
>
> As far as I know, single-line json also complies with the standard. I left a
> comment with the RFC in the PR but please let me know if I am wrong at any
> point.
>
> Thanks!
>
> [1]https://github.com/apache/spark/pull/15511
>
>
> On 19 Oct 2016 7:00 a.m., "Daniel Barclay" 
> wrote:
>>
>> Koert,
>>
>> Koert Kuipers wrote:
>>
>> A single json object would mean for most parsers it needs to fit in memory
>> when reading or writing
>>
>> Note that codlife didn't seem to be asking about single-object JSON
>> files, but about standard-format JSON files.
>>
>>
>> On Oct 15, 2016 11:09, "codlife" <1004910...@qq.com> wrote:
>>>
>>> Hi:
>>> I'm in doubt about the design of spark.read.json: why is the json file
>>> not
>>> a standard json file? Who can tell me the internal reason? Any advice is
>>> appreciated.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-the-json-file-used-by-sparkSession-read-json-must-be-a-valid-json-object-per-line-tp27907.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Code review / sqlContext Scope

2016-10-19 Thread Ajay Chander
Can someone please shed some light on this? I wrote the code below in
Scala 2.10.5; can someone please tell me if this is the right way of doing
it?


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

object Test {

  def main(args: Array[String]): Unit = {

val conf = new SparkConf()
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)

sqlContext.sql("set spark.sql.shuffle.partitions=1000");
sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")

val dataElementsFile = "hdfs://nameservice/user/ajay/spark/flds.txt"

//deDF has only 61 rows
val deDF = 
sqlContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()

deDF.withColumn("ds_nm", lit("UDA")).withColumn("tabl_nm",
lit("TEST_DB.TEST_TABLE")).collect().filter(filterByDataset).map(calculateMetricsAtDELevel).foreach(persistResults)


// if ds_nm starts with 'RAW_' I dont want to process it
def filterByDataset(de: Row): Boolean = {
val datasetName = de.getAs[String]("ds_nm").trim
if (datasetName.startsWith("RAW_")) {
return false
}
else {
return true
}
}

def calculateMetricsAtDELevel(de: Row): DataFrame = {
val dataElement = de.getAs[String]("DataElement").trim
val datasetName = de.getAs[String]("ds_nm").trim
val tableName = de.getAs[String]("tabl_nm").trim

// udaDF holds 107,762,849 Rows * 412 Columns / 105 files in HDFS and 176.5
GB * 3 Replication Factor
val udaDF = sqlContext.sql("SELECT '" + datasetName + "' as ds_nm, cyc_dt,
supplier_proc_i, " +
" '" + dataElement + "' as data_elm, " + dataElement + " as data_elm_val
FROM " + tableName + "")

println("udaDF num Partitions: "+udaDF.toJavaRDD.getNumPartitions)
// udaDF.toJavaRDD.getNumPartitions = 1490

val calculatedMetrics = udaDF.groupBy("ds_nm", "cyc_dt", "supplier_proc_i",
"data_elm", "data_elm_val").count()
println("calculatedMetrics num Partitions: "
+calculatedMetrics.toJavaRDD.getNumPartitions)
// calculatedMetrics.toJavaRDD.getNumPartitions = 1000 since I set it to
sqlContext.sql("set spark.sql.shuffle.partitions=1000");

val adjustedSchemaDF = calculatedMetrics.withColumnRenamed("count",
"derx_val_cnt").withColumn("load_dt", current_timestamp())
println("adjustedSchemaDF num Partitions: "
+adjustedSchemaDF.toJavaRDD.getNumPartitions)
// adjustedSchemaDF.toJavaRDD.getNumPartitions = 1000 as well

return adjustedSchemaDF
}

def persistResults(adjustedSchemaDF: DataFrame) = {
// persist the results into a Hive table backed by PARQUET
adjustedSchemaDF.write.partitionBy("ds_nm", "cyc_dt").mode("Append"
).insertInto("devl_df2_spf_batch.spf_supplier_trans_metric_detl_base_1")
}

}
}

This is my cluster (Spark 1.6.0 on YARN, Cloudera 5.7.1) configuration:

Memory -> 4.10 TB

VCores -> 544

I am deploying the application in yarn client mode and the cluster is
set to use Dynamic Memory Allocation.

Any pointers are appreciated.

Thank you


On Sat, Oct 8, 2016 at 1:17 PM, Ajay Chander  wrote:

> Hi Everyone,
>
> Can anyone tell me if there is anything wrong with my code flow below ?
> Based on each element from the text file I would like to run a query
> against Hive table and persist results in another Hive table. I want to do
> this in parallel for each element in the file. I appreciate any of your
> inputs on this.
>
> $ cat /home/ajay/flds.txt
> PHARMY_NPI_ID
> ALT_SUPPLIER_STORE_NBR
> MAIL_SERV_NBR
>
> spark-shell  --name hivePersistTest --master yarn --deploy-mode client
>
> val dataElementsFile = "/home/ajay/flds.txt"
> val dataElements = Source.fromFile(dataElementsFile).getLines.toArray
>
> def calculateQuery (de: String)  : DataFrame = {
>   val calculatedQuery = sqlContext.sql("select 'UDA' as ds_nm, cyc_dt, 
> supplier_proc_i as supplier_proc_id, '" + de + "' as data_elm, " + de + " as 
> data_elm_val," +
> " count(1) as derx_val_cnt, current_timestamp as load_dt " +
> "from SPRINT2_TEST2 " +
> "group by 'UDA', cyc_dt, supplier_proc_i, '" + de + "' , " + de + " ")
>
>   return calculatedQuery
> }
>
> def persistResults (calculatedQuery: DataFrame) = {
>   calculatedQuery.write.insertInto("sprint2_stp1_test2")
> }
>
> dataElements.map(calculateQuery).foreach(persistResults)
>
>
> Thanks.
>
>


ApacheCon is now less than a month away!

2016-10-19 Thread Rich Bowen
Dear Apache Enthusiast,

ApacheCon Sevilla is now less than a month out, and we need your help
getting the word out. Please tell your colleagues, your friends, and
members of related technical communities, about this event. Rates go up
November 3rd, so register today!

ApacheCon, and Apache Big Data, are the official gatherings of the
Apache Software Foundation, and one of the best places in the world to
meet other members of your project community, gain deeper knowledge
about your favorite Apache projects, learn about the ASF. Your project
doesn't live in a vacuum - it's part of a larger family of projects that
have a shared set of values, as well as a shared governance model. And
many of our project have an overlap in developers, in communities, and
in subject matter, making ApacheCon a great place for cross-pollination
of ideas and of communities.

Some highlights of these events will be:

* Many of our board members and project chairs will be present
* The lightning talks are a great place to hear, and give, short
presentations about what you and other members of the community are
working on
* The key signing gets you linked into the web of trust, and better
able to verify our software releases
* Evening receptions and parties where you can meet community
members in a less formal setting
* The State of the Feather, where you can learn what the ASF has
done in the last year, and what's coming next year
* BarCampApache, an informal unconference-style event, is another
venue for discussing your projects at the ASF

We have a great schedule lined up, covering the wide range of ASF
projects, including:

* CI and CD at Scale: Scaling Jenkins with Docker and Apache Mesos -
Carlos Sanchez
* Inner sourcing 101 - Jim Jagielski
* Java Memory Leaks in Modular Environments - Mark Thomas

ApacheCon/Apache Big Data will be held in Sevilla, Spain, at the Melia
Sevilla, November 14th through 18th. You can find out more at
http://apachecon.com/  Other ways to stay up to date with ApacheCon are:

* Follow us on Twitter at @apachecon
* Join us on IRC, at #apachecon on the Freenode IRC network
* Join the apachecon-discuss mailing list by sending email to
apachecon-discuss-subscr...@apache.org
* Or contact me directly at rbo...@apache.org with questions,
comments, or to volunteer to help

See you in Sevilla!

-- 
Rich Bowen: VP, Conferences
rbo...@apache.org
http://apachecon.com/
@apachecon

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2.0 with Kafka 0.10 exception

2016-10-19 Thread Cody Koeninger
60 seconds for a batch is above the default settings in kafka related
to heartbeat timeouts, so that might be related.  Have you tried
tweaking session.timeout.ms, heartbeat.interval.ms, or related
configs?
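A hedged illustration of where those settings go with the 0.10 direct stream (broker, group id and the exact values are placeholders, not recommendations):

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "app2-consumer",
  "heartbeat.interval.ms" -> "20000",   // default is 3000
  "session.timeout.ms" -> "60000",      // must stay below the broker's group.max.session.timeout.ms
  "request.timeout.ms" -> "90000"       // must be larger than session.timeout.ms
)
// pass kafkaParams to KafkaUtils.createDirectStream as usual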

On Wed, Oct 19, 2016 at 12:22 PM, Srikanth  wrote:
> Bringing this thread back as I'm seeing this exception on a production kafka
> cluster.
>
> I have two Spark streaming apps reading the same topic. App1 has batch
> interval 2secs and app2 has 60secs.
> Both apps are running on the same cluster on similar hardware. I see this
> exception only in app2 and fairly consistently.
>
> Difference I see between the apps is
> App1
>   spark.streaming.kafka.maxRatePerPartition, 6000
>   batch interval 2 secs
> App2
>   spark.streaming.kafka.maxRatePerPartition, 1
>   batch interval 60 secs
>
> All other kafka/spark related configs are same for both apps.
>   spark.streaming.kafka.consumer.poll.ms = 4096
>   spark.streaming.backpressure.enabled = true
>
> Not sure if pre-fetching or caching is messing things up.
>
> 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID
> 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError: assertion
> failed: Failed to get records for spark-executor-StreamingEventSplitProd
> mt_event 6 49091480 after polling for 4096
> at scala.Predef$.assert(Predef.scala:170)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)
>
>
> On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger  wrote:
>>
>> That's not what I would have expected to happen with a lower cache
>> setting, but in general disabling the cache isn't something you want
>> to do with the new kafka consumer.
>>
>>
>> As far as the original issue, are you seeing those polling errors
>> intermittently, or consistently?  From your description, it sounds
>> like retry is working correctly.
>>
>>
>> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth  wrote:
>> > Setting those two results in below exception.
>> > No.of executors < no.of partitions. Could that be triggering this?
>> >
>> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0
>> > (TID 9)
>> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
>> > multi-threaded access
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
>> > at java.util.HashMap.putVal(Unknown Source)
>> > at java.util.HashMap.put(Unknown Source)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(KafkaRDD.scala:210)
>> > at
>> > org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> >
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>> > at
>> >
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
>> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> > at java.lang.Thread.run(Unknown Source)
>> >
>> >
>> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger 
>> > wrote:
>> >>
>> >> you could try setting
>> >>
>> >> 

Re: Spark 2.0 with Kafka 0.10 exception

2016-10-19 Thread Srikanth
Bringing this thread back as I'm seeing this exception on a production
kafka cluster.

I have two Spark streaming apps reading the same topic. App1 has batch
interval 2secs and app2 has 60secs.
Both apps are running on the same cluster on similar hardware. I see this
exception only in app2 and fairly consistently.

Difference I see between the apps is
App1
  spark.streaming.kafka.maxRatePerPartition, 6000
  batch interval 2 secs
App2
  spark.streaming.kafka.maxRatePerPartition, 1
  batch interval 60 secs

All other kafka/spark related configs are same for both apps.
  spark.streaming.kafka.consumer.poll.ms = 4096
  spark.streaming.backpressure.enabled = true

Not sure if pre-fetching or caching is messing things up.

16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID
12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError: assertion
failed: Failed to get records for spark-executor-StreamingEventSplitProd
mt_event 6 49091480 after polling for 4096
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$
KafkaRDDIterator.next(KafkaRDD.scala:227)
at org.apache.spark.streaming.kafka010.KafkaRDD$
KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)


On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger  wrote:

> That's not what I would have expected to happen with a lower cache
> setting, but in general disabling the cache isn't something you want
> to do with the new kafka consumer.
>
>
> As far as the original issue, are you seeing those polling errors
> intermittently, or consistently?  From your description, it sounds
> like retry is working correctly.
>
>
> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth  wrote:
> > Setting those two results in below exception.
> > No.of executors < no.of partitions. Could that be triggering this?
> >
> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0
> (TID 9)
> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> > multi-threaded access
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1430)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.close(
> KafkaConsumer.java:1360)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$
> anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> > at java.util.HashMap.putVal(Unknown Source)
> > at java.util.HashMap.put(Unknown Source)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.
> get(CachedKafkaConsumer.scala:158)
> > at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(
> KafkaRDD.scala:210)
> > at org.apache.spark.streaming.kafka010.KafkaRDD.compute(
> KafkaRDD.scala:185)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at
> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:79)
> > at
> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > at java.lang.Thread.run(Unknown Source)
> >
> >
> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger 
> wrote:
> >>
> >> you could try setting
> >>
> >> spark.streaming.kafka.consumer.cache.initialCapacity
> >>
> >> spark.streaming.kafka.consumer.cache.maxCapacity
> >>
> >> to 1
> >>
> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth  wrote:
> >> > I had a look at the executor logs and noticed that this exception
> >> > happens
> >> > only when using the cached consumer.
> >> > Every retry is successful. This is consistent.
> >> > One possibility is that the cached 

Re: How to make Mesos Cluster Dispatcher of Spark 1.6.1 load my config files?

2016-10-19 Thread Michael Gummelt
See https://issues.apache.org/jira/browse/SPARK-13258 for an explanation
and workaround.

On Wed, Oct 19, 2016 at 1:35 AM, Chanh Le  wrote:

> Thank you Daniel,
> Actually I tried this before, but this way is still not flexible if you
> are running multiple jobs at a time with different dependencies and
> configuration for each job, so I gave up.
>
> Another simple solution is to set the command below as a service, and I am
> using it.
>
> /build/analytics/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
>> --files /build/analytics/kafkajobs/prod.conf \
>> --conf 'spark.executor.extraJavaOptions=-Dconfig.fuction.conf' \
>> --conf 'spark.driver.extraJavaOptions=-Dconfig.file=/build/analytics/kafkajobs/prod.conf' \
>> --conf 'spark.driver.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar' \
>> --conf 'spark.executor.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar' \
>> --class com.ants.util.kafka.PersistenceData \
>> --master mesos://10.199.0.19:5050 \
>> --executor-memory 5G \
>> --driver-memory 2G \
>> --total-executor-cores 4 \
>> --jars /build/analytics/kafkajobs/spark-streaming-kafka_2.10-1.6.2.jar \
>> /build/analytics/kafkajobs/kafkajobs-prod.jar
>>
>
> [Unit]
> Description=Mesos Cluster Dispatcher
>
> [Service]
> ExecStart=/build/analytics/kafkajobs/persist-job.sh
> PIDFile=/var/run/spark-persist.pid
> [Install]
> WantedBy=multi-user.target
>
>
> Regards,
> Chanh
>
> On Oct 19, 2016, at 2:15 PM, Daniel Carroza  wrote:
>
> Hi Chanh,
>
> I found a workaround that works to me:
> http://stackoverflow.com/questions/29552799/spark-
> unable-to-find-jdbc-driver/40114125#40114125
>
> Regards,
> Daniel
>
> On Thu, Oct 6, 2016 at 6:26, Chanh Le ()
> wrote:
>
>> Hi everyone,
>> I have the same config in both mode and I really want to change config
>> whenever I run so I created a config file and run my application with it.
>> My problem is:
>> It’s works with these config without using Mesos Cluster Dispatcher.
>>
>> /build/analytics/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
>> --files /build/analytics/kafkajobs/prod.conf \
>> --conf 'spark.executor.extraJavaOptions=-Dconfig.fuction.conf' \
>> --conf 'spark.driver.extraJavaOptions=-Dconfig.file=/build/analytics/kafkajobs/prod.conf' \
>> --conf 'spark.driver.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar' \
>> --conf 'spark.executor.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar' \
>> --class com.ants.util.kafka.PersistenceData \
>> --master mesos://10.199.0.19:5050 \
>> --executor-memory 5G \
>> --driver-memory 2G \
>> --total-executor-cores 4 \
>> --jars /build/analytics/kafkajobs/spark-streaming-kafka_2.10-1.6.2.jar \
>> /build/analytics/kafkajobs/kafkajobs-prod.jar
>>
>>
>> And it’s didn't work with these:
>>
>> /build/analytics/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
>> --files /build/analytics/kafkajobs/prod.conf \
>> --conf 'spark.executor.extraJavaOptions=-Dconfig.fuction.conf' \
>> --conf 'spark.driver.extraJavaOptions=-Dconfig.file=/build/analytics/kafkajobs/prod.conf' \
>> --conf 'spark.driver.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar' \
>> --conf 'spark.executor.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar' \
>> --class com.ants.util.kafka.PersistenceData \
>> --master mesos://10.199.0.19:7077 \
>> --deploy-mode cluster \
>> --supervise \
>> --executor-memory 5G \
>> --driver-memory 2G \
>> --total-executor-cores 4 \
>> --jars /build/analytics/kafkajobs/spark-streaming-kafka_2.10-1.6.2.jar \
>> /build/analytics/kafkajobs/kafkajobs-prod.jar
>>
>> It threw me an error: Exception in thread "main" java.sql.SQLException:
>> No suitable driver found for jdbc:postgresql://psqlhost:5432/kafkajobs
>> which means my --conf didn't work and the config I put in
>> /build/analytics/kafkajobs/prod.conf
>> wasn't loaded. It only loaded what I put in application.conf (the default
>> config).
>>
>> How to make MCD load my config?
>>
>> Regards,
>> Chanh
>>
>> --
> Daniel Carroza Santana
> Vía de las Dos Castillas, 33, Ática 4, 3ª Planta.
> 28224 Pozuelo de Alarcón. Madrid.
> Tel: +34 91 828 64 73 // @stratiobd
>
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: Deep learning libraries for scala

2016-10-19 Thread janardhan shetty
Agreed. But as it states, deeper integration with Scala is yet to be
developed.
Any thoughts on how to use TensorFlow with Scala? I think we'd need to write
wrappers.
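A hedged sketch of the wrapper route via TensorFlow's Java bindings called from Scala (the org.tensorflow artifact, the graph file path and the tensor names are assumptions; the Java API at the time was still very young):

import java.nio.file.{Files, Paths}
import org.tensorflow.{Graph, Session, Tensor}

// load a frozen GraphDef exported from Python and run it from Scala
val graphDef = Files.readAllBytes(Paths.get("/models/frozen_graph.pb"))  // assumed path
val graph = new Graph()
graph.importGraphDef(graphDef)

val session = new Session(graph)
val input = Tensor.create(Array(Array(1.0f, 2.0f, 3.0f)))   // assumed input shape
val result = session.runner()
  .feed("input", input)    // assumed tensor names
  .fetch("output")
  .run()
  .get(0)
println(result)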

On Oct 19, 2016 7:56 AM, "Benjamin Kim"  wrote:

> On that note, here is an article that Databricks made regarding using
> Tensorflow in conjunction with Spark.
>
> https://databricks.com/blog/2016/01/25/deep-learning-with-
> apache-spark-and-tensorflow.html
>
> Cheers,
> Ben
>
>
> On Oct 19, 2016, at 3:09 AM, Gourav Sengupta 
> wrote:
>
> while using Deep Learning you might want to stay as close to tensorflow as
> possible. There is very less translation loss, you get to access stable,
> scalable and tested libraries from the best brains in the industry and as
> far as Scala goes, it helps a lot to think about using the language as a
> tool to access algorithms in this instance unless you want to start
> developing algorithms from grounds up ( and in which case you might not
> require any libraries at all).
>
> On Sat, Oct 1, 2016 at 3:30 AM, janardhan shetty 
> wrote:
>
>> Hi,
>>
>> Are there any good libraries which can be used for scala deep learning
>> models ?
>> How can we integrate tensorflow with scala ML ?
>>
>
>
>


Re: Deep learning libraries for scala

2016-10-19 Thread Benjamin Kim
On that note, here is an article that Databricks made regarding using 
Tensorflow in conjunction with Spark.

https://databricks.com/blog/2016/01/25/deep-learning-with-apache-spark-and-tensorflow.html

Cheers,
Ben


> On Oct 19, 2016, at 3:09 AM, Gourav Sengupta  
> wrote:
> 
> while using Deep Learning you might want to stay as close to tensorflow as 
> possible. There is very less translation loss, you get to access stable, 
> scalable and tested libraries from the best brains in the industry and as far 
> as Scala goes, it helps a lot to think about using the language as a tool to 
> access algorithms in this instance unless you want to start developing 
> algorithms from grounds up ( and in which case you might not require any 
> libraries at all).
> 
> On Sat, Oct 1, 2016 at 3:30 AM, janardhan shetty  > wrote:
> Hi,
> 
> Are there any good libraries which can be used for scala deep learning models 
> ?
> How can we integrate tensorflow with scala ML ?
> 



Re: LDA and Maximum Iterations

2016-10-19 Thread Richard Garris
Hi Frank,

Two suggestions

1. I would recommend caching the corpus prior to running LDA (a sketch follows below)

2. If you are using EM I would tweak the sample size using the
setMiniBatchFraction
parameter to decrease the sample per iteration.
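A hedged sketch combining suggestion 1 with checkpointing, which is the usual remedy for the StackOverflowError quoted further down (checkpointing is my addition, not part of the suggestions above; the checkpoint directory is a placeholder, and sc/parsedData reuse Frank's snippet below):

sc.setCheckpointDir("hdfs:///tmp/lda-checkpoints")   // any durable location

val corpus = parsedData.zipWithIndex.map(_.swap).cache()   // suggestion 1: cache the corpus
val ldaModel = new LDA()
  .setK(3)
  .setMaxIterations(500)
  .setCheckpointInterval(10)   // truncate the EM lineage every 10 iterations
  .run(corpus)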

-Richard

On Tue, Sep 20, 2016 at 10:27 AM, Frank Zhang <
dataminin...@yahoo.com.invalid> wrote:

> Hi Yuhao,
>
>Thank you so much for your great contribution to the LDA and other
> Spark modules!
>
> I use both Spark 1.6.2 and 2.0.0. The data I used originally is very
> large which has tens of millions of documents. But for test purpose, the
> data set I mentioned earlier ("/data/mllib/sample_lda_data.txt") is good
> enough.  Please change the path to where you install your Spark to point to
> the data set and run those lines:
>
> import org.apache.spark.mllib.clustering.LDA
> import org.apache.spark.mllib.linalg.Vectors
>
> *//please change the path for the data set below:*
> *val data = sc.textFile("/data/mllib/sample_lda_data.txt") *
> val parsedData = data.map(s => Vectors.dense(s.trim.split('
> ').map(_.toDouble)))
> val corpus = parsedData.zipWithIndex.map(_.swap).cache()
> val ldaModel = new LDA().setK(3).run(corpus)
>
>It should work. After that, please run:
> val ldaModel = new LDA().setK(3).setMaxIterations(500).run(corpus)
>
>When I ran it, at job #90, that iteration took relatively extremely
> long then it stopped with exception:
> Active Jobs (1)
> Job Id | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks (for all stages): Succeeded/Total
> 90 | fold at LDAOptimizer.scala:226 | 2016/09/20 10:18:30 | 22 s | 0/269 | 0/538
>
> Completed Jobs (90)
> Job Id | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks (for all stages): Succeeded/Total
> 89 | fold at LDAOptimizer.scala:226 | 2016/09/20 10:18:30 | 43 ms | 4/4 (262 skipped) | 8/8 (524 skipped)
> 88 | fold at LDAOptimizer.scala:226 | 2016/09/20 10:18:30 | 40 ms | 4/4 (259 skipped) | 8/8 (518 skipped)
> 87 | fold at LDAOptimizer.scala:226 | 2016/09/20 10:18:29 | 80 ms | 4/4 (256 skipped) | 8/8 (512 skipped)
> 86 | fold at LDAOptimizer.scala:226 | 2016/09/20 10:18:29 | 41 ms | 4/4 (253 skipped) | 8/8 (506 skipped)
>Part of the error message:
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1450)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1438)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1437)
>   at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1437)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:811)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> doOnReceive(DAGScheduler.scala:1659)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1618)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1607)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(
> DAGScheduler.scala:632)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
>   at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1046)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>   at org.apache.spark.rdd.RDD.fold(RDD.scala:1040)
>   at org.apache.spark.mllib.clustering.EMLDAOptimizer.
> computeGlobalTopicTotals(LDAOptimizer.scala:226)
>   at org.apache.spark.mllib.clustering.EMLDAOptimizer.
> next(LDAOptimizer.scala:213)
>   at org.apache.spark.mllib.clustering.EMLDAOptimizer.
> next(LDAOptimizer.scala:79)
>   at org.apache.spark.mllib.clustering.LDA.run(LDA.scala:334)
>   ... 48 elided
> Caused by: java.lang.StackOverflowError
>   at java.lang.reflect.InvocationTargetException.(
> InvocationTargetException.java:72)
>   at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>   at 

Re: K-Mean retrieving Cluster Members

2016-10-19 Thread Robin East

or alternatively this should work (assuming parsedData is an RDD[Vector]):

clusters.predict(parsedData)
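And a hedged sketch for getting the actual members per cluster, by pairing each point with its predicted cluster (reusing the clusters / parsedData names from the earlier mail):

val membership = clusters.predict(parsedData).zip(parsedData)   // RDD[(Int, Vector)]
val membersByCluster = membership.groupByKey()                  // RDD[(Int, Iterable[Vector])]
membersByCluster.collect().foreach { case (id, pts) =>
  println(s"cluster $id has ${pts.size} members")
}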





> On 18 Oct 2016, at 00:35, Reth RM  wrote:
> 
> I think I got it
>  
>   parsedData.foreach(
>       new VoidFunction<Vector>() {
>           @Override
>           public void call(Vector vector) throws Exception {
>               System.out.println(clusters.predict(vector));
>           }
>       }
>   );
> 
> 
> On Mon, Oct 17, 2016 at 10:56 AM, Reth RM  > wrote:
> Could you please point me to sample code to retrieve the cluster members of K 
> mean?
> 
> The below code prints cluster centers.  I needed cluster members belonging to 
> each center.
> 
>  
> val clusters = KMeans.train(parsedData, numClusters, numIterations) 
> clusters.clusterCenters.foreach(println)
> 



Joins of typed datasets

2016-10-19 Thread daunnc
Hi!
I am working with the new Spark 2 Datasets API. PR:
https://github.com/geotrellis/geotrellis/pull/1675

The idea is to use Dataset[(K, V)] and, for example, to join by a key of type
K.
The first problem was that there are no Encoders for custom types (non-products),
so the workaround was to use Kryo:
https://github.com/pomadchin/geotrellis/blob/4f417f3c5e99eacf2ca57b4e8405047d556beda0/spark/src/main/scala/geotrellis/spark/KryoEncoderImplicits.scala

But this has a limitation: we can't join on the K type (I suppose because
Spark represents everything as byte blobs when using the Kryo encoder):

def combineValues[R: ClassTag](other: Dataset[(K, V)])(f: (V, V) => R):
Dataset[(K, R)] = {
  self.toDF("_1", "_2").alias("self").join(other.toDF("_1",
"_2").alias("other"), $"self._1" === $"other._1").as[(K, (V,
V))].mapValues({ case (tile1, tile2) =>
f(tile1, tile2)
  })
}

What is the correct solution? K is important, as it is a geospatial key.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joins-of-typed-datasets-tp27924.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



how to see spark class variable values on variable explorer of spyder for python?

2016-10-19 Thread muhammet pakyürek

Is there any way to see Spark class variable values in the Variable Explorer of
Spyder for Python?



Re: spark with kerberos

2016-10-19 Thread Steve Loughran

On 19 Oct 2016, at 00:18, Michael Segel 
> wrote:

(Sorry sent reply via wrong account.. )

Steve,

Kinda hijacking the thread, but I promise it's still on topic to the OP's issue.. ;-)

Usually you will end up having a local Kerberos set up per cluster.
So your machine accounts (hive, yarn, hbase, etc …) are going to be local  to 
the cluster.


not necessarily...you can share a KDC. And in a land of active directory you'd 
need some trust



So you will have to set up some sort of realm trusts between the clusters.

If you’re going to be setting up security (Kerberos … ick! shivers… ;-) you’re 
going to want to keep the machine accounts isolated to the cluster.
And the OP said that he didn’t control the other cluster which makes me believe 
that they are separate.


good point; you may not be able to get the tickets for cluster C accounts. But 
if you can log in as a user for


I would also think that you would have trouble with the credential… isn’t is 
tied to a user at a specific machine?

there are two types of kerberos identity, simple "hdfs@REALM" and 
server-specific "hdfs/server@REALM". The simple ones work just as well in small 
clusters, it's just that in larger clusters your KDCs (especially AD) tend to 
interpret an attempt by 200 machines to log in as user "hdfs@REALM" in 30s as 
an attempt to brute force a password, and start rejecting logins. The 
separation into hdfs/_HOST_@REALM style avoids that, and may reduce the damage 
if the keytab leaks

If the user submitting work is logged into the KDC of cluster C, e.g:


kinit user@CLUSTERC


and spark is configured to ask for the extra namenode tokens,

spark.yarn.access.namenodes hdfs://cluster-c:8020


..then spark MAY ask for those tokens, pass them up to cluster B and so have 
them available for talking to cluster C. The submitted job is using the block 
tokens, so doesn't need to log in to kerberos itself, and if cluster B is 
insecure, doesn't need to worry about credentials and identity there. The HDFS 
client code just returns the block token to talk to cluster C when an attempt 
to talk to the DN of cluster C is rejected with an "authenticate yourself" 
response.

The main issue to me is: will that token get picked up and propagated to an 
insecure cluster, so as to support this operation? Because there's a risk that 
the ubiquitous static method, UserGroupInformation.isSecurityEnabled() is being 
checked in places, and as the cluster itself isn't secure 
(hadoop.security.authentication  in core-site.xml != "simple"). It looks like 
org.apache.spark.deploy.yarn.security.HDFSCredentialProvider is doing exactly 
that (as does HBase and Hive), meaning job submission doesn't fetch tokens 
unless the destination cluster is secure.

One thing that could be attempted, would be turning authentication on to 
kerberos just in the job launch config, and seeing if that will collect all 
required tokens *without* getting confused by the fact that YARN and HDFS don't 
need them.

spark.hadoop.hadoop.security.authentication

I have no idea if this works; you've have to try it and see

(Its been a while since I looked at this and I drank heavily to forget 
Kerberos… so I may be a bit fuzzy here.)


denying all knowledge of Kerberos is always a good tactic.


Re: About Error while reading large JSON file in Spark

2016-10-19 Thread Steve Loughran

On 18 Oct 2016, at 10:58, Chetan Khatri 
> wrote:

Dear Xi shen,

Thank you for getting back to my question.

The approach I am following is as below:
I have an MSSQL server as the enterprise data lake.

1. Run Java jobs and generate JSON files; every file is almost 6 GB.
Correct, Spark needs every JSON object on a separate line, so I did
sed -e 's/}/}\n/g' -s old-file.json > new-file.json
to get every json element on a separate line.
2. Upload to an S3 bucket and read from there using the sqlContext.read.json()
function, which is where I am getting the above error.

Note: If I run it on small files I do not get this error; the JSON elements
have almost the same structure.

Current approach:

  * splitting the large JSON (6 GB) into 1 GB pieces, then processing.

Note: Machine size is 1 master and 2 slaves, each 4 vcores, 26 GB RAM

I see what you are trying to do here: one JSON object per line, then splitting by
line so that you can parallelise JSON processing, as well as holding many JSON
objects in a single s3 file. This is a devious little trick. It just doesn't
work once the json file goes > 2^31 bytes long, as the code to split by line
breaks.

You could write your own input splitter which actually does basic Json parsing, 
splitting up by looking for the final } in a JSON clause (harder than you 
think, as you need to remember how many {} clauses you have entered and not 
include escaped "{" in strings.

a quick google shows some that may be a good starting point

https://github.com/Pivotal-Field-Engineering/pmr-common/blob/master/PivotalMRCommon/src/main/java/com/gopivotal/mapreduce/lib/input/JsonInputFormat.java
https://github.com/alexholmes/json-mapreduce



Re: Deep learning libraries for scala

2016-10-19 Thread Gourav Sengupta
While using deep learning you might want to stay as close to TensorFlow as
possible. There is very little translation loss, and you get access to stable,
scalable and tested libraries from the best brains in the industry. As
far as Scala goes, it helps a lot to think about using the language as a
tool to access algorithms in this instance, unless you want to start
developing algorithms from the ground up (in which case you might not
require any libraries at all).

On Sat, Oct 1, 2016 at 3:30 AM, janardhan shetty 
wrote:

> Hi,
>
> Are there any good libraries which can be used for scala deep learning
> models ?
> How can we integrate tensorflow with scala ML ?
>


Re: How to make Mesos Cluster Dispatcher of Spark 1.6.1 load my config files?

2016-10-19 Thread Chanh Le
Thank you Daniel,
Actually I tried this before, but this way is still not flexible if you are
running multiple jobs at a time with different dependencies and configuration
for each job, so I gave up.

Another simple solution is to set the command below as a service, and I am
using it.

> /build/analytics/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
> --files /build/analytics/kafkajobs/prod.conf \
> --conf 'spark.executor.extraJavaOptions=-Dconfig.fuction.conf' \
> --conf 
> 'spark.driver.extraJavaOptions=-Dconfig.file=/build/analytics/kafkajobs/prod.conf'
>  \
> --conf 
> 'spark.driver.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar'
>  \
> --conf 
> 'spark.executor.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar'
>  \
> --class com.ants.util.kafka.PersistenceData \
> --master mesos://10.199.0.19:5050 \
> --executor-memory 5G \
> --driver-memory 2G \
> --total-executor-cores 4 \
> --jars /build/analytics/kafkajobs/spark-streaming-kafka_2.10-1.6.2.jar \
> /build/analytics/kafkajobs/kafkajobs-prod.jar


[Unit]
Description=Mesos Cluster Dispatcher

[Service]
ExecStart=/build/analytics/kafkajobs/persist-job.sh
PIDFile=/var/run/spark-persist.pid
[Install]
WantedBy=multi-user.target
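
A different way to sidestep the -Dconfig.file propagation problem is to load the shipped file explicitly inside the job. A minimal Scala sketch, assuming Typesafe Config and assuming --files actually lands prod.conf in the driver's working directory in your deployment (untested; "db.url" is just a made-up key for illustration):

import java.io.File
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkFiles

// Prefer the file shipped with --files; fall back to the bundled
// application.conf so local runs keep working.
val shipped = new File(SparkFiles.get("prod.conf"))
val config: Config =
  if (shipped.exists()) ConfigFactory.parseFile(shipped).withFallback(ConfigFactory.load())
  else ConfigFactory.load()

val jdbcUrl = config.getString("db.url")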


Regards,
Chanh

> On Oct 19, 2016, at 2:15 PM, Daniel Carroza  wrote:
> 
> Hi Chanh,
> 
> I found a workaround that works to me:
> http://stackoverflow.com/questions/29552799/spark-unable-to-find-jdbc-driver/40114125#40114125
>  
> 
> 
> Regards,
> Daniel
> 
> On Thu, Oct 6, 2016 at 6:26, Chanh Le wrote:
> Hi everyone,
> I have the same config in both mode and I really want to change config 
> whenever I run so I created a config file and run my application with it.
> My problem is: 
> It’s works with these config without using Mesos Cluster Dispatcher.
> 
> /build/analytics/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
> --files /build/analytics/kafkajobs/prod.conf \
> --conf 'spark.executor.extraJavaOptions=-Dconfig.fuction.conf' \
> --conf 
> 'spark.driver.extraJavaOptions=-Dconfig.file=/build/analytics/kafkajobs/prod.conf'
>  \
> --conf 
> 'spark.driver.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar'
>  \
> --conf 
> 'spark.executor.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar'
>  \
> --class com.ants.util.kafka.PersistenceData \
> --master mesos://10.199.0.19:5050 \
> --executor-memory 5G \
> --driver-memory 2G \
> --total-executor-cores 4 \
> --jars /build/analytics/kafkajobs/spark-streaming-kafka_2.10-1.6.2.jar \
> /build/analytics/kafkajobs/kafkajobs-prod.jar
> 
> 
> And it’s didn't work with these:
> 
> /build/analytics/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
> --files /build/analytics/kafkajobs/prod.conf \
> --conf 'spark.executor.extraJavaOptions=-Dconfig.fuction.conf' \
> --conf 
> 'spark.driver.extraJavaOptions=-Dconfig.file=/build/analytics/kafkajobs/prod.conf'
>  \
> --conf 
> 'spark.driver.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar'
>  \
> --conf 
> 'spark.executor.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar'
>  \
> --class com.ants.util.kafka.PersistenceData \
> --master mesos://10.199.0.19:7077 \
> --deploy-mode cluster \
> --supervise \
> --executor-memory 5G \
> --driver-memory 2G \
> --total-executor-cores 4 \
> --jars /build/analytics/kafkajobs/spark-streaming-kafka_2.10-1.6.2.jar \
> /build/analytics/kafkajobs/kafkajobs-prod.jar
> 
> It threw me an error: Exception in thread "main" java.sql.SQLException: No 
> suitable driver found for jdbc:postgresql://psqlhost:5432/kafkajobs
> which means my --conf didn't work and the config I put in 
> /build/analytics/kafkajobs/prod.conf wasn't loaded. It only loaded what I 
> put in application.conf (the default config).
> 
> How to make MCD load my config?
> 
> Regards,
> Chanh
> 
> -- 
> Daniel Carroza Santana
> 
> Vía de las Dos Castillas, 33, Ática 4, 3ª Planta.
> 28224 Pozuelo de Alarcón. Madrid.
Tel: +34 91 828 64 73 // @stratiobd 


Re: Substitute Certain Rows a data Frame using SparkR

2016-10-19 Thread Felix Cheung
It's a bit less concise but this works:

> a <- as.DataFrame(cars)
> head(a)
  speed dist
1     4    2
2     4   10
3     7    4
4     7   22
5     8   16
6     9   10

> b <- withColumn(a, "speed", ifelse(a$speed > 15, a$speed, 3))
> head(b)
  speed dist
1     3    2
2     3   10
3     3    4
4     3   22
5     3   16
6     3   10

I think your example could be something we support though. Please feel free to 
open a JIRA for that.
_
From: shilp
Sent: Monday, October 17, 2016 7:38 AM
Subject: Substitute Certain Rows a data Frame using SparkR


I have a SparkR data frame and I want to replace certain rows of a column, which 
satisfy a certain condition, with some value. If it were a plain R data frame I 
would do something as follows: df$Column1[df$Column1 == "Value"] = "NewValue"
How would I perform a similar operation on a SparkR data frame?





Re: How to make Mesos Cluster Dispatcher of Spark 1.6.1 load my config files?

2016-10-19 Thread Daniel Carroza
Hi Chanh,

I found a workaround that works to me:
http://stackoverflow.com/questions/29552799/spark-unable-to-find-jdbc-driver/40114125#40114125

Regards,
Daniel

On Thu, Oct 6, 2016 at 6:26, Chanh Le wrote:

> Hi everyone,
> I have the same config in both mode and I really want to change config
> whenever I run so I created a config file and run my application with it.
> My problem is:
> It’s works with these config without using Mesos Cluster Dispatcher.
>
> /build/analytics/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
> --files /build/analytics/kafkajobs/prod.conf \
> --conf 'spark.executor.extraJavaOptions=-Dconfig.fuction.conf' \
> --conf 'spark.driver.extraJavaOptions=-Dconfig.file=/build/analytics/kafkajobs/prod.conf' \
> --conf 'spark.driver.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar' \
> --conf 'spark.executor.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar' \
> --class com.ants.util.kafka.PersistenceData \
>
> --master mesos://10.199.0.19:5050 \
> --executor-memory 5G \
> --driver-memory 2G \
> --total-executor-cores 4 \
> --jars /build/analytics/kafkajobs/spark-streaming-kafka_2.10-1.6.2.jar \
> /build/analytics/kafkajobs/kafkajobs-prod.jar
>
>
> And it’s didn't work with these:
>
> /build/analytics/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
> --files /build/analytics/kafkajobs/prod.conf \
> --conf 'spark.executor.extraJavaOptions=-Dconfig.fuction.conf' \
> --conf 'spark.driver.extraJavaOptions=-Dconfig.file=/build/analytics/kafkajobs/prod.conf' \
> --conf 'spark.driver.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar' \
> --conf 'spark.executor.extraClassPath=/build/analytics/spark-1.6.1-bin-hadoop2.6/lib/postgresql-9.3-1102.jdbc41.jar' \
> --class com.ants.util.kafka.PersistenceData \
> --master mesos://10.199.0.19:7077 \
> --deploy-mode cluster \
> --supervise \
> --executor-memory 5G \
> --driver-memory 2G \
> --total-executor-cores 4 \
> --jars /build/analytics/kafkajobs/spark-streaming-kafka_2.10-1.6.2.jar \
> /build/analytics/kafkajobs/kafkajobs-prod.jar
>
> It threw me an error: Exception in thread "main" java.sql.SQLException:
> No suitable driver found for jdbc:postgresql://psqlhost:5432/kafkajobs
> which means my --conf didn't work and the config I put in
> /build/analytics/kafkajobs/prod.conf wasn't loaded. It only loaded what I
> put in application.conf (the default config).
>
> How to make MCD load my config?
>
> Regards,
> Chanh
>
> --
Daniel Carroza Santana
Vía de las Dos Castillas, 33, Ática 4, 3ª Planta.
28224 Pozuelo de Alarcón. Madrid.
Tel: +34 91 828 64 73 // @stratiobd


Re: question about the new Dataset API

2016-10-19 Thread Yang
I even added a fake groupByKey on the entire DataSet:


scala> a_ds.groupByKey(k=>1).agg(typed.count[(Long,Long)](_._1)).show
+-----+------------------------+
|value|TypedCount(scala.Tuple2)|
+-----+------------------------+
|    1|                       2|
+-----+------------------------+




On Tue, Oct 18, 2016 at 11:30 PM, Yang  wrote:

> scala> val a = sc.parallelize(Array((1,2),(3,4)))
> a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[243] at
> parallelize at :38
>
> scala> val a_ds = hc.di.createDataFrame(a).as[(Long,Long)]
> a_ds: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: int, _2: int]
>
> scala> a_ds.agg(typed.count[(Long,Long)](x=>x._1))
> res34: org.apache.spark.sql.DataFrame = [TypedCount(org.apache.spark.sql.Row):
> bigint]
>
> scala> res34.show
>
> then it gave me the following error:
>
> Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.
> expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
>
> at $anonfun$1.apply(:46)at 
> org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:69)at
>  
> org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:66)at
>  
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
>  Source)at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)at
>  
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)at
>  scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)at
>  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)at 
> org.apache.spark.scheduler.Task.run(Task.scala:86)at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at
>  java.lang.Thread.run(Thread.java:745)
>
>
> I had to add a groupByKey()
> scala> a_ds.groupByKey(k=>k._1).agg(typed.count[(Long,Long)](_._1)).show
> +-----+------------------------+
> |value|TypedCount(scala.Tuple2)|
> +-----+------------------------+
> |    1|                       1|
> |    3|                       1|
> +-----+------------------------+
>
> but why does the groupByKey() make it any different? looks like a bug
>
>
>


question about the new Dataset API

2016-10-19 Thread Yang
scala> val a = sc.parallelize(Array((1,2),(3,4)))
a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[243] at
parallelize at :38

scala> val a_ds = hc.di.createDataFrame(a).as[(Long,Long)]
a_ds: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: int, _2: int]

scala> a_ds.agg(typed.count[(Long,Long)](x=>x._1))
res34: org.apache.spark.sql.DataFrame =
[TypedCount(org.apache.spark.sql.Row): bigint]

scala> res34.show

then it gave me the following error:

Caused by: java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be
cast to scala.Tuple2

at $anonfun$1.apply(:46)at
org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:69)at
org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:66)at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)at
org.apache.spark.scheduler.Task.run(Task.scala:86)at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at
java.lang.Thread.run(Thread.java:745)


I had to add a groupByKey()
scala> a_ds.groupByKey(k=>k._1).agg(typed.count[(Long,Long)](_._1)).show
+-----+------------------------+
|value|TypedCount(scala.Tuple2)|
+-----+------------------------+
|    1|                       1|
|    3|                       1|
+-----+------------------------+

but why does the groupByKey() make it any different? looks like a bug
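
My best guess (an assumption, not something confirmed in this thread): Dataset.agg without a groupByKey delegates to the untyped groupBy().agg(...) path, where the TypedColumn never gets bound to the Dataset's tuple encoder, so the aggregator is handed generic Rows and the cast to Tuple2 blows up; groupByKey goes through KeyValueGroupedDataset, which does bind the encoder. If that is right, the typed select path should also work without the fake grouping (untested sketch):

import org.apache.spark.sql.expressions.scalalang.typed

// select with a TypedColumn binds the (Long, Long) encoder to the aggregator,
// so no Row-to-Tuple2 cast should be needed; the result is a Dataset[Long].
val total = a_ds.select(typed.count[(Long, Long)](_._1))
total.show()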


Equivalent Parquet File Repartitioning Benefits for Join/Shuffle?

2016-10-19 Thread adam kramer
Hello All,

I’m trying to improve join efficiency within (self-join) and across
data sets loaded from different parquet files primarily due to a
multi-stage data ingestion environment.

Are there specific benefits to shuffling efficiency (e.g. no network
transmission) if the parquet files are written from equivalently
partitioned datasets (i.e. same partition columns and number of
partitions)?

A self-join and multi-join Scala shell example that uses the method in question:
% val df1 = 
sqlContext.read.parquet("hdfs://someserver:9010/default-partitioned-a-z-file-1")
% val df2 = 
sqlContext.read.parquet("hdfs://someserver:9010/default-partitioned-a-z-file-2")
% val df1_part = df1.repartition(500, $"a", $"b", $"c")
% val df2_part = df2.repartition(500, $"a", $"b", $"c")
% df1_part.write.format("parquet").mode(SaveMode.Overwrite).save("hdfs://someserver:9010/a-b-c-partitioned-file-1")
% df2_part.write.format("parquet").mode(SaveMode.Overwrite).save("hdfs://someserver:9010/a-b-c-partitioned-file-2")
% val reloaded_df1_part =
sqlContext.read.parquet("hdfs://someserver:9010/a-b-c-partitioned-file-1")
% val reloaded_df2_part =
sqlContext.read.parquet("hdfs://someserver:9010/a-b-c-partitioned-file-2")
% val superfast_self_join =
reloaded_df1_part.join(reloaded_df1_part.select($"a", $"b", $"c", $"d".as("right_d")),
Seq("a", "b", "c"))
% val superfast_multi_join =
reloaded_df1_part.join(reloaded_df2_part.select($"a", $"b", $"c", $"not_in_df1"),
Seq("a", "b", "c"))
% superfast_self_join.count
% superfast_multi_join.count

Ignoring the time necessary to repartition and assuming good
partitioning cardinality (while joining groups of rows), are there
performance benefits to this approach for joins ‘superfast_self_join'
and 'superfast_multi_join'? Or is there no benefit as the partitioner
information is lost upon persistence/write to parquet?

Note I am currently using Spark 1.6.3 and moving to 2.0.1 in the near future.
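
One way to check this empirically (my own sketch, not part of the original mail): look at the physical plans of the two joins after reloading and see whether an Exchange (shuffle) still appears above the parquet scans. As far as I recall, a plain parquet save does not persist the hash partitioner, so an Exchange is expected; the 2.x mechanism that does record this kind of layout is, as far as I know, bucketBy with saveAsTable.

// If the partitioning had survived the parquet round trip there would be no
// Exchange feeding the join; in practice expect to see one per side.
superfast_self_join.explain(true)
superfast_multi_join.explain(true)

// Spark 2.x sketch (assumption): identically bucketed tables let equi-joins
// on the bucket columns avoid the shuffle. The table name is made up.
df1_part.write.bucketBy(500, "a", "b", "c").sortBy("a", "b", "c")
  .format("parquet").saveAsTable("abc_bucketed_1")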

Thank you for any insights.

Adam
