Re: Question about installing Apache Spark [PySpark] computer requirements

2024-07-29 Thread mike Jadoo
856)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
at scala.Option.foreach(Option.scala:407)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
at 
org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
at 
org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Cannot run program
"C:\Users\mikej\AppData\Local\Programs\Python\Python312":
CreateProcess error=5, Access is denied
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181)
at 
org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
at 
org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more


On Mon, Jul 29, 2024 at 4:34 PM

Question about installing Apache Spark [PySpark] computer requirements

2024-07-29 Thread mike Jadoo
Hello,

I am trying to run PySpark on my computer without success.  I have followed
several different sets of directions from online sources, and it appears that I
need a faster computer.

I wanted to ask what are some recommendations for computer specifications
to run PySpark (Apache Spark).

Any help would be greatly appreciated.

Thank you,

Mike


Dark mode logo

2024-03-06 Thread Mike Drob
Hi Spark Community,

I see that y'all have a logo uploaded to https://www.apache.org/logos/#spark 
but it has black text. Is there an official, alternate logo with lighter text 
that would look good on a dark background?

Thanks,
Mike

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Parallelising JDBC reads in spark

2020-05-24 Thread Mike Artz
Does anything different happen when you set the isolationLevel to do
dirty reads, i.e. "READ_UNCOMMITTED"?

On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H 
wrote:

> Hi,
>
> We are writing an ETL pipeline using Spark that fetches data from SQL
> Server in batch mode (every 15 mins). The problem we are facing is how to
> parallelise single-table reads into multiple tasks without missing any
> data.
>
> We have tried this,
>
>
>- Use `ROW_NUMBER` window function in the SQL query
>- Then do
>-
>
>DataFrame df =
>hiveContext
>.read()
>.jdbc(
>**,
>query,
>"row_num",
>1,
>,
>noOfPartitions,
>jdbcOptions);
>
>
>
> The problem with this approach is that if our tables get updated in SQL
> Server while tasks are still running, then the `ROW_NUMBER` values will change and we
> may miss some records.
>
>
> Is there any approach to fix this issue? Any pointers would be helpful.
>
>
> *Note*: I am on spark 1.6
>
>
> Thanks
>
> Manjiunath Shetty
>
>
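
A minimal sketch of the partitioned JDBC read being discussed, written against the
Spark 2.x DataFrameReader options (the URL, table name, and bounds are hypothetical;
"isolationLevel" is the option behind the READ_UNCOMMITTED suggestion above):

val jdbcDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:sqlserver://dbhost:1433;databaseName=mydb")  // hypothetical URL
  .option("dbtable",
    "(SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS row_num FROM my_table) t")
  .option("partitionColumn", "row_num")
  .option("lowerBound", "1")
  .option("upperBound", "1000000")                 // hypothetical row count
  .option("numPartitions", "8")
  .option("isolationLevel", "READ_UNCOMMITTED")    // the "dirty reads" setting
  .load()

Note this still has the consistency caveat raised above if the table changes while
the partitioned tasks are running.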


Re: unsubscribe

2019-11-26 Thread Mike Dillion
Nandan,
Please send unsubscribe requests to user-unsubscr...@spark.apache.org

On Tue, Nov 26, 2019 at 6:02 AM @Nandan@ 
wrote:

> unsubscribe
>


spark.sql.hive.exec.dynamic.partition description

2019-04-29 Thread Mike Chan
Hi Guys,

Does anyone have detailed descriptions of the Hive parameters in Spark, like
spark.sql.hive.exec.dynamic.partition? I couldn't find any reference in my
Spark 2.3.2 configuration.

I'm looking into a problem where Spark does not seem to understand Hive
partitioning at all. My Hive table has 1,000 partitions; however, when I read the
same table in Spark, the underlying RDD has only 105 partitions when I query
df.rdd.getNumPartitions()

Because Spark creates 1 task per partition on read, reading is painfully
slow, with each task reading many Hive folders in sequential order. My goal is to
spin up more tasks to increase parallelism during read operations. Hope
this makes sense.

Thank you

Best Regards,
Mike
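
A small sketch of how one might inspect and widen read parallelism, assuming Spark
2.3 and a hypothetical table name; for file-based reads the number of scan tasks is
usually driven by file sizes and spark.sql.files.maxPartitionBytes rather than by the
Hive partition count:

val df = spark.table("db.my_table")          // hypothetical table name
println(df.rdd.getNumPartitions)             // the 105 observed above

// smaller file splits -> more scan tasks (must be set before the read is planned)
spark.conf.set("spark.sql.files.maxPartitionBytes", 32L * 1024 * 1024)  // 32MB

// or simply repartition after the scan so downstream stages run wider
val wider = df.repartition(1000)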


Fwd: autoBroadcastJoinThreshold not working as expected

2019-04-24 Thread Mike Chan
Dear all,

I'm working on a case where, when a certain table is used in a broadcast join, the
query eventually fails with a remote block error.

First, we set spark.sql.autoBroadcastJoinThreshold to 10MB, namely
10485760
[image: image.png]

Then we proceed to run the query. In the SQL plan, we found that one table
that is 25MB in size is broadcast as well.

[image: image.png]

Also, in desc extended, the table is 24452111 bytes. It is a Hive table. We
always run into an error when this table is broadcast. Below is a sample of the
error:

Caused by: java.io.IOException: org.apache.spark.SparkException:
corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625
!= -992055931
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)


I have also attached the physical plan if you're interested. One thing to
note: if I turn autoBroadcastJoinThreshold down to 5MB, this query
executes successfully and default.product is NOT broadcast.


However, when I switch to another query that selects even fewer columns
than the previous one, this table still gets broadcast even at 5MB and
fails with the same error. I even changed it to 1MB and still the same.


I would appreciate it if you can share any input. Thank you very much.


Best Regards,

MIke
== Physical Plan ==
*(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 
AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 
END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, 
vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, 
bu_name#273, principle_supplier_code#154 AS supplier_code#476, 
mother_company_name#150 AS supplier_name#477, brand_type_name#117, 
brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, 
coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, 
h1_l1_hierarchy_name#126 AS Category_Name#480, 
coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, 
coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 
more fields]
+- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight
   :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, 
gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, 
compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 41 more fields]
   :  +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], 
[fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight
   : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, 
gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, 
compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 35 more fields]
   : :  +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], 
LeftOuter, BuildRight
   : : :- *(10) Project [product_key#445, store_key#446, 
fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, 
fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, 
cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, 
adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 33 more fields]
   : : :  +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], 
[cast(store_key#155 as double)], LeftOuter, BuildRight
   : : : :- *(10) Project [product_key#445, store_key#446, 
fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, 
fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, 
cogs_amt_local#453, gross_profit_amt
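
A short sketch of the knobs involved, assuming Spark 2.x: the 24452111-byte figure
comes from catalog statistics, which can be stale for Hive tables, so refreshing the
statistics or disabling auto-broadcast for the session are the usual first steps.

// disable automatic broadcast joins entirely for this session
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)

// or refresh the statistics Spark uses to decide whether a table fits the threshold
spark.sql("ANALYZE TABLE default.product COMPUTE STATISTICS")
spark.sql("DESCRIBE EXTENDED default.product").show(200, truncate = false)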

Fwd: autoBroadcastJoinThreshold not working as expected

2019-04-23 Thread Mike Chan
Dear all,

I'm working on a case where, when a certain table is used in a broadcast join, the
query eventually fails with a remote block error.

First, we set spark.sql.autoBroadcastJoinThreshold to 10MB, namely
10485760
[image: image.png]

Then we proceed to run the query. In the SQL plan, we found that one table
that is 25MB in size is broadcast as well.

[image: image.png]

Also, in desc extended, the table is 24452111 bytes. It is a Hive table. We
always run into an error when this table is broadcast. Below is a sample of the
error:

Caused by: java.io.IOException: org.apache.spark.SparkException:
corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625
!= -992055931
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)


I have also attached the physical plan if you're interested. One thing to
note: if I turn autoBroadcastJoinThreshold down to 5MB, this query
executes successfully and default.product is NOT broadcast.


However, when I switch to another query that selects even fewer columns
than the previous one, this table still gets broadcast even at 5MB and
fails with the same error. I even changed it to 1MB and still the same.


I would appreciate it if you can share any input. Thank you very much.


Best Regards,

MIke
== Physical Plan ==
*(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 
AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 
END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, 
vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, 
bu_name#273, principle_supplier_code#154 AS supplier_code#476, 
mother_company_name#150 AS supplier_name#477, brand_type_name#117, 
brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, 
coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, 
h1_l1_hierarchy_name#126 AS Category_Name#480, 
coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, 
coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 
more fields]
+- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight
   :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, 
gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, 
compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 41 more fields]
   :  +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], 
[fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight
   : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, 
gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, 
compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 35 more fields]
   : :  +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], 
LeftOuter, BuildRight
   : : :- *(10) Project [product_key#445, store_key#446, 
fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, 
fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, 
cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, 
adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 33 more fields]
   : : :  +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], 
[cast(store_key#155 as double)], LeftOuter, BuildRight
   : : : :- *(10) Project [product_key#445, store_key#446, 
fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, 
fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, 
cogs_amt_local#453, gross_profit_amt

autoBroadcastJoinThreshold not working as expected

2019-04-18 Thread Mike Chan
Dear all,

I'm working on a case where, when a certain table is used in a broadcast join, the
query eventually fails with a remote block error.

First, we set spark.sql.autoBroadcastJoinThreshold to 10MB, namely
10485760
[image: image.png]

Then we proceed to run the query. In the SQL plan, we found that one table
that is 25MB in size is broadcast as well.

[image: image.png]

Also, in desc extended, the table is 24452111 bytes. It is a Hive table. We
always run into an error when this table is broadcast. Below is a sample of the
error:

Caused by: java.io.IOException: org.apache.spark.SparkException:
corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625
!= -992055931
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)


I have also attached the physical plan if you're interested. One thing to
note: if I turn autoBroadcastJoinThreshold down to 5MB, this query
executes successfully and default.product is NOT broadcast.


However, when I switch to another query that selects even fewer columns
than the previous one, this table still gets broadcast even at 5MB and
fails with the same error. I even changed it to 1MB and still the same.


I would appreciate it if you can share any input. Thank you very much.


Best Regards,

MIke
== Physical Plan ==
*(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 
AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 
END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, 
vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, 
bu_name#273, principle_supplier_code#154 AS supplier_code#476, 
mother_company_name#150 AS supplier_name#477, brand_type_name#117, 
brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, 
coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, 
h1_l1_hierarchy_name#126 AS Category_Name#480, 
coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, 
coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 
more fields]
+- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight
   :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, 
gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, 
compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 41 more fields]
   :  +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], 
[fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight
   : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, 
gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, 
compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 35 more fields]
   : :  +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], 
LeftOuter, BuildRight
   : : :- *(10) Project [product_key#445, store_key#446, 
fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, 
fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, 
cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, 
adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 33 more fields]
   : : :  +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], 
[cast(store_key#155 as double)], LeftOuter, BuildRight
   : : : :- *(10) Project [product_key#445, store_key#446, 
fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, 
fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, 
cogs_amt_local#453, gross_profit_amt

Question about Spark, Inner Join and Delegation to a Parquet Table

2018-07-02 Thread Mike Buck
I have a question about Spark and how it delegates filters to a Parquet-based 
table. I have two tables in Hive in Parquet format. Table1 has four 
columns of type double and table2 has two columns of type double. I am doing an 
INNER JOIN of the following:

SELECT table1.name FROM table1 INNER JOIN table2 ON table2.x BETWEEN 
table1.xmin AND table1.xmax AND table2.y BETWEEN table1.ymin AND table1.ymax

I noticed that the execution plan as reported by Spark only pushes the 
IsNotNull filters down to the tables and not any other filters:

== Physical Plan ==
*Project [name#0]
+- BroadcastNestedLoopJoin BuildRight, Inner, ((((x#36 >= xmin#13) && (x#36 <= 
xmax#15)) && (y#37 >= ymin#14)) && (y#37 <= ymax#16))
   :- *Project [name#0, xmin#13, ymin#14, xmax#15, ymax#16]
   :  +- *Filter (((isnotnull(xmin#13) && isnotnull(ymin#14)) && 
isnotnull(ymax#16)) && isnotnull(xmax#15))
   : +- *FileScan parquet [name#0,xmin#13,ymin#14,xmax#15,ymax#16] Batched: 
true, Format: Parquet, Location: 
InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table1, 
PartitionFilters: [], PushedFilters: [IsNotNull(xmin), IsNotNull(ymin), 
IsNotNull(ymax), IsNotNull(xmax)], ReadSchema: 
struct
   +- BroadcastExchange IdentityBroadcastMode
  +- *Project [x#36, y#37]
 +- *Filter (isnotnull(y#37) && isnotnull(x#36))
+- *FileScan parquet [x#36,y#37] Batched: true, Format: Parquet, 
Location: 
InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table2], 
PartitionFilters: [], PushedFilters: [IsNotNull(y), IsNotNull(x)], ReadSchema: 
struct

However, when doing a filter against a single table the filter is delegated to 
the table:

SELECT name FROM table1 where table1.xmin > -73.4454183678

== Physical Plan ==
CollectLimit 21
+- *Project [pbkey#150]
   +- *Filter (isnotnull(xmin#163) && (xmin#163 > -73.4454183678))
  +- *FileScan parquet [pbkey#150,xmin#163] Batched: true, Format: Parquet, 
Location: 
InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table1, 
PartitionFilters: [], PushedFilters: [IsNotNull(xmin), 
GreaterThan(xmin,-73.4454183678)], ReadSchema: struct

So the question is: does Spark delegate filters in a join condition to a 
Parquet table or is this an error in the "explain plan" output?

Thanks.
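
A small experiment sketch (Scala API, paths abbreviated) that makes the behaviour in
the two plans visible: predicates against literals can be pushed into the Parquet
scan, while predicates comparing columns of two different tables can only be
evaluated as the join condition, so only the derived IsNotNull filters reach the scans:

val t1 = spark.read.parquet("hdfs:///apps/hive/warehouse/table1")   // abbreviated path
val t2 = spark.read.parquet("hdfs:///apps/hive/warehouse/table2")

// literal comparison: appears under PushedFilters in the FileScan
t1.filter(t1("xmin") > -73.4454183678).explain()

// column-vs-column comparison: stays in the join condition, is not pushed to Parquet
t1.join(t2, t2("x").between(t1("xmin"), t1("xmax")) &&
            t2("y").between(t1("ymin"), t1("ymax"))).explain()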



Spark SQL within a DStream map function

2017-06-16 Thread Mike Hugo
Hello,

I have a web application that publishes JSON messages on to a messaging
queue that contain metadata and a link to a CSV document on S3.  I'd like
to iterate over these JSON messages, and for each one pull the CSV document
into spark SQL to transform it (based on the metadata in the JSON message)
and output the results to a search index.  Each file on S3 has different
headers, potentially different delimiters, and differing numbers of rows.

Basically what I'm trying to do is something like this:

JavaDStream<ParsedDocument> parsedMetadataAndRows =
queueStream.map(new Function<String, ParsedDocument>() {
    @Override
    ParsedDocument call(String metadata) throws Exception {
        Map gson = new Gson().fromJson(metadata, Map.class)

        // get metadata from gson
        String s3Url = gson.url
        String delimiter = gson.delimiter
        // etc...

        // read s3Url
        Dataset<Row> dataFrame = sqlContext.read()
            .format("com.databricks.spark.csv")
            .option("delimiter", delimiter)
            .option("header", true)
            .option("inferSchema", true)
            .load(s3Url)

        // process document,
        ParsedDocument docPlusRows = //...

        return docPlusRows
    }
})

JavaEsSparkStreaming.saveToEs(parsedMetadataAndRows,
"index/type" // ...


But it appears I cannot get access to the sqlContext when I run this on the
spark cluster because that code is executing in the executor not in the
driver.

Is there a way I can access or create a SqlContext to be able to pull the
file down from S3 in my map function?  Or do you have any recommendations
as to how I could set up a streaming job in a different way that would
allow me to accept metadata on the stream of records coming in and pull
each file down from s3 for processing?

Thanks in advance for your help!

Mike
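
One common workaround, sketched in Scala against a DStream[String] equivalent of the
stream above (the parseMetadata helper and its fields are hypothetical): the function
passed to foreachRDD runs on the driver, so the SQLContext is available there and each
message's CSV can be read as it arrives.

lines.foreachRDD { rdd =>
  // this closure runs on the driver; only the inner RDD operations run on executors
  rdd.collect().foreach { metadata =>
    val meta = parseMetadata(metadata)          // hypothetical JSON helper
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("delimiter", meta.delimiter)
      .option("header", "true")
      .option("inferSchema", "true")
      .load(meta.s3Url)
    // transform df based on the metadata and write it to the search index here
  }
}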


Parquet Read Speed: Spark SQL vs Parquet MR

2017-06-03 Thread Mike Wheeler
Hi Spark User,

I have run into some situation where Spark SQL is much slower than Parquet
MR for processing parquet files. Can you provide some guidance on
optimization?

Suppose I have a table "person" with columns: gender, age, name, address,
etc, which is stored in parquet files. I tried two ways to read the table
in spark:

1) define case class Person(gender: String, age: Int, etc), then I use
Spark SQL Dataset API:
val ds = spark.read.parquet("...").as[Person]

2) define an Avro record "record Person {string gender, int age, etc}". Then
use parquet-avro
<https://github.com/apache/parquet-mr/tree/master/parquet-avro> and
newAPIHadoopFile:

val rdd = sc.newAPIHadoopFile(
path,
classOf[ParquetInputFormat[avro.Person]],
classOf[Void],
classOf[avro.Person],
job.getConfiguration).values


Then I compare 3 actions (spark 2.1.1, parquet-avro 1.8.1):

a) ds.filter(" gender='female' and age > 50").count  // 1 min

b) ds.filter(" gender='female'").filter(_.age > 50).count// 15 min

c) rdd.filter(r => r.gender == "female" && r.age > 50).count // 7 min

I can understand that a) is faster than c) because a) is limited to a SQL
query, so Spark can do a lot of things to optimize (such as not fully
deserializing the objects). But I don't understand why b) is much slower
than c), because I assume both require full deserialization.

Is there anything I can try to improve b)?

Thanks,


Mike
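
A hedged note on b): once the predicate becomes an opaque Scala closure (_.age > 50),
Catalyst can no longer push it into the Parquet scan or skip full object
deserialization, so keeping the whole predicate in Column/SQL form is the usual fix.
A sketch against the ds defined above:

import spark.implicits._

val fast = ds.filter($"gender" === "female" && $"age" > 50)
fast.explain()   // look for PushedFilters such as EqualTo(gender,female), GreaterThan(age,50)
println(fast.count())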


Re: Convert camelCase to snake_case when saving Dataframe/Dataset to parquet?

2017-05-22 Thread Mike Wheeler
Cool. Thanks a lot in advance.

On Mon, May 22, 2017 at 2:12 PM, Bryan Jeffrey 
wrote:

> Mike,
>
> I have code to do that. I'll share it tomorrow.
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
>
>
>
> On Mon, May 22, 2017 at 4:53 PM -0400, "Mike Wheeler" <
> rotationsymmetr...@gmail.com> wrote:
>
> Hi Spark User,
>>
>> For Scala case classes, we usually use camelCase (carType) for member
>> fields. However, many data systems use snake_case (car_type) for column
>> names. When saving a Dataset of a case class to parquet, is there any way to
>> automatically convert camelCase to snake_case (carType -> car_type)?
>>
>> Thanks,
>>
>> Mike
>>
>>
>>


Convert camelCase to snake_case when saving Dataframe/Dataset to parquet?

2017-05-22 Thread Mike Wheeler
Hi Spark User,

For Scala case classes, we usually use camelCase (carType) for member fields.
However, many data systems use snake_case (car_type) for column names. When
saving a Dataset of a case class to parquet, is there any way to
automatically convert camelCase to snake_case (carType -> car_type)?

Thanks,

Mike
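
Not the code Bryan offered, but a minimal sketch of one way to do it: rename every
top-level column with a small camelCase-to-snake_case helper before writing (nested
struct fields would need extra handling):

import org.apache.spark.sql.DataFrame

def toSnakeCase(name: String): String =
  name.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase

def snakeCaseColumns(df: DataFrame): DataFrame =
  df.toDF(df.columns.map(toSnakeCase): _*)

// usage: snakeCaseColumns(ds.toDF).write.parquet("/tmp/cars")   // hypothetical path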


Best Practice for Enum in Spark SQL

2017-05-11 Thread Mike Wheeler
Hi Spark Users,

I want to store an enum type (such as vehicle type: Car, SUV, Wagon) in my
data. My storage format will be Parquet and I need to access the data from
spark-shell, the Spark SQL CLI, and Hive. My questions:

1) Should I store my enum type as a String or store it as a numeric encoding
(aka 1=Car, 2=SUV, 3=Wagon)?

2) If I choose String, is there any penalty in disk space or memory?

Thank you!

Mike


Re: Schema Evolution for nested Dataset[T]

2017-05-02 Thread Mike Wheeler
Hi Michael,

Thank you for the suggestions. I am wondering how I can make `withColumn`
handle a nested structure?

For example, below is my code to generate the data. I basically add the
`age` field to `Person2`, which is nested in an Array in `Course2`. Then I
want to fill in 0 for `age` where `age` is null.

case class Person1(name: String)
case class Person2(name: String, age: Int)
case class Course1(id: Int, students: Array[Person1])
case class Course2(id: Int, students: Array[Person2])
Seq(Course1(10, Array(Person1("a"),
Person1("b")))).toDF.write.parquet("data1")
Seq(Course2(20, Array(Person2("c",20),
Person2("d",10)))).toDF.write.parquet("data2")
val allData = spark.read.option("mergeSchema", "true").parquet("data1",
"data2")
allData.show

+---+--------------------+
| id|            students|
+---+--------------------+
| 20|    [[c,20], [d,10]]|
| 10|[[a,null], [b,null]]|
+---+--------------------+



*My first try:*

allData.withColumn("students.age", coalesce($"students.age", lit(0)))

It returns the exception:

org.apache.spark.sql.AnalysisException: cannot resolve
'coalesce(`students`.`age`, 0)' due to data type mismatch: input to
function coalesce should all be the same type, but it's [array, int];;



*My second try: *

allData.withColumn("students.age", coalesce($"students.age", array(lit(0),
lit(0)))).show


+---+--------------------+------------+
| id|            students|students.age|
+---+--------------------+------------+
| 20|    [[c,20], [d,10]]|    [20, 10]|
| 10|[[a,null], [b,null]]|[null, null]|
+---+--------------------+------------+

It creates a new column literally named "students.age" instead of imputing the
age value nested inside students.

Thank you very much in advance.

Mike




On Mon, May 1, 2017 at 10:31 AM, Michael Armbrust 
wrote:

> Oh, and if you want a default other than null:
>
> import org.apache.spark.sql.functions._
> df.withColumn("address", coalesce($"address", lit())
>
> On Mon, May 1, 2017 at 10:29 AM, Michael Armbrust 
> wrote:
>
>> The following should work:
>>
>> val schema = implicitly[org.apache.spark.sql.Encoder[Course]].schema
>> spark.read.schema(schema).parquet("data.parquet").as[Course]
>>
>> Note this will only work for nullable fields (i.e. if you add a primitive
>> like Int you need to make it an Option[Int])
>>
>> On Sun, Apr 30, 2017 at 9:12 PM, Mike Wheeler <
>> rotationsymmetr...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>>
>>> Suppose I have some data (stored in parquet for example) generated as
>>> below:
>>>
>>> package com.company.entity.old
>>> case class Course(id: Int, students: List[Student])
>>> case class Student(name: String)
>>>
>>> Then usually I can access the data by
>>>
>>> spark.read.parquet("data.parquet").as[Course]
>>>
>>> Now I want to add a new field `address` to Student:
>>>
>>> package com.company.entity.new
>>> case class Course(id: Int, students: List[Student])
>>> case class Student(name: String, address: String)
>>>
>>> Then obviously running `spark.read.parquet("data.parquet").as[Course]`
>>> on data generated by the old entity/schema will fail because `address`
>>> is missing.
>>>
>>> In this case, what is the best practice to read data generated with
>>> the old entity/schema to the new entity/schema, with the missing field
>>> set to some default value? I know I can manually write a function to
>>> do the transformation from the old to the new. But it is kind of
>>> tedious. Any automatic methods?
>>>
>>> Thanks,
>>>
>>> Mike
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>
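
A sketch of the nested imputation attempted above, assuming Spark 2.4+ where SQL
higher-order functions are available (older versions would need an explode/groupBy
round-trip or a map over a typed Dataset instead):

import org.apache.spark.sql.functions.expr

val filled = allData.withColumn(
  "students",
  expr("transform(students, s -> named_struct('name', s.name, 'age', coalesce(s.age, 0)))"))
filled.show(false)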


Schema Evolution for nested Dataset[T]

2017-04-30 Thread Mike Wheeler
Hi Spark Users,

Suppose I have some data (stored in parquet for example) generated as below:

package com.company.entity.old
case class Course(id: Int, students: List[Student])
case class Student(name: String)

Then usually I can access the data by

spark.read.parquet("data.parquet").as[Course]

Now I want to add a new field `address` to Student:

package com.company.entity.new
case class Course(id: Int, students: List[Student])
case class Student(name: String, address: String)

Then obviously running `spark.read.parquet("data.parquet").as[Course]`
on data generated by the old entity/schema will fail because `address`
is missing.

In this case, what is the best practice to read data generated with
the old entity/schema to the new entity/schema, with the missing field
set to some default value? I know I can manually write a function to
do the transformation from the old to the new. But it is kind of
tedious. Any automatic methods?

Thanks,

Mike

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Combining reading from Kafka and HDFS w/ Spark Streaming

2017-03-01 Thread Mike Thomsen
(Sorry if this is a duplicate. I got a strange error message when I first
tried to send it earlier)

I want to pull HDFS paths from Kafka and build text streams based on those
paths. I currently have:

val lines = KafkaUtils.createStream(/* params here */).map(_._2)
val buffer  = new ArrayBuffer[String]()

lines.foreachRDD(rdd => {
  if (!rdd.partitions.isEmpty) {
rdd.collect().foreach(line => { buffer += line })
  }
})

buffer.foreach(path => {
  streamingContext.textFileStream(path).foreachRDD(rdd => {
println(s"${path} => ${rdd.count()}")
  })
})

streamingContext.start
streamingContext.awaitTermination

It's not actually counting any of the files in the paths, and I know the
paths are valid.

Can someone tell me if this is possible and if so, give me a pointer on how
to fix this?

Thanks,

Mike
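
A hedged sketch of the usual fix: textFileStream only watches for files that appear
after the context starts, and new input streams cannot be registered once it is
running, so each path is instead read as a plain RDD inside foreachRDD (whose body
runs on the driver):

lines.foreachRDD { rdd =>
  rdd.collect().foreach { path =>
    val count = streamingContext.sparkContext.textFile(path).count()
    println(s"$path => $count")
  }
}

streamingContext.start()
streamingContext.awaitTermination()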


Combining reading from Kafka and HDFS w/ Spark Streaming

2017-03-01 Thread Mike Thomsen
I want to pull HDFS paths from Kafka and build text streams based on those
paths. I currently have:

val lines = KafkaUtils.createStream(/* params here */).map(_._2)
val buffer  = new ArrayBuffer[String]()

lines.foreachRDD(rdd => {
  if (!rdd.partitions.isEmpty) {
rdd.collect().foreach(line => { buffer += line })
  }
})

buffer.foreach(path => {
  streamingContext.textFileStream(path).foreachRDD(rdd => {
println(s"${path} => ${rdd.count()}")
  })
})

streamingContext.start
streamingContext.awaitTermination

It's not actually counting any of the files in the paths, and I know the
paths are valid.

Can someone tell me if this is possible and if so, give me a pointer on how
to fix this?

Thanks,

Mike


RE: Best way to process lookup ETL with Dataframes

2017-01-04 Thread Sesterhenn, Mike
Thanks a lot Nicholas.  RE: Upgrading, I was afraid someone would suggest that. 
 ☺  Yes we have an upgrade planned, but due to politics, we have to finish this 
first round of ETL before we can do the upgrade.  I can’t confirm for sure that 
this issue would be fixed in Spark >= 1.6 without doing the upgrade first, so I 
won’t be able to win the argument for upgrading yet…  You see the problem…   :(

Anyway, the good news is we just had a memory upgrade, so I should be able to 
do more persisting of the dataframes.  I am currently only persisting the join 
table (the table I am joining to, not the input data).  Although I do cache the 
input at some point before the join, it is not every time I do a split+merge.  
I’ll have to persist the input data better.

Thinking on it now, is it even necessary to cache the table I am joining to?  
Probably only if it is used more than once, right?

Thanks,
-Mike


From: Nicholas Hakobian [mailto:nicholas.hakob...@rallyhealth.com]
Sent: Friday, December 30, 2016 5:50 PM
To: Sesterhenn, Mike
Cc: ayan guha; user@spark.apache.org
Subject: Re: Best way to process lookup ETL with Dataframes

Yep, sequential joins is what I have done in the past with similar requirements.

Splitting and merging DataFrames is most likely killing performance if you do 
not cache the DataFrame pre-split. If you do, it will compute the lineage prior 
to the cache statement once (at first invocation), then use the cached result 
to perform the additional join, then union the results. Without the cache, you 
are most likely computing the full lineage twice, all the way back to the raw 
data import and having double the read I/O.

The most optimal path will most likely depend on the size of the tables you are 
joining to. If both are small (compared to the primary data source) and can be 
broadcasted, doing the sequential join will most likely be the easiest and most 
efficient approach. If one (or both) of the tables you are joining to are 
significantly large enough that they cannot be efficiently broadcasted, going 
through the join / cache / split / second join / union path is likely to be 
faster. It also depends on how much memory you can dedicate to caching...the 
possibilities are endless.

I tend to approach this type of problem by weighing the cost of extra 
development time for a more complex join vs the extra execution time vs 
frequency of execution. For something that will execute daily (or more 
frequently) the cost of more development to have faster execution time (even if 
its only 2x faster) might be worth it.

It might also be worth investigating if a newer version of Spark (1.6 at the 
least, or 2.0 if possible) is feasible to install. There are lots of 
performance improvements in those versions, if you have the option of upgrading.

-Nick

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Fri, Dec 30, 2016 at 3:35 PM, Sesterhenn, Mike <msesterh...@cars.com> wrote:

Thanks Nicholas.  It looks like for some of my use cases, I might be able to 
use do sequential joins, and then use coalesce() (or in combination with 
withColumn(when()...)) to sort out the results.



Splitting and merging dataframes seems to really kill my app performance.  I'm 
not sure if it's a spark 1.5 thing or what, but I just refactored one column to 
do one less split/merge, and it saved me almost half the time on my job.  But 
for some use cases I don't seem to be able to avoid them.  It is important in 
some cases to NOT do a join under certain conditions for a row because bad data 
will result.



Any other thoughts?


From: Nicholas Hakobian <nicholas.hakob...@rallyhealth.com>
Sent: Friday, December 30, 2016 2:12:40 PM
To: Sesterhenn, Mike
Cc: ayan guha; user@spark.apache.org

Subject: Re: Best way to process lookup ETL with Dataframes

It looks like Spark 1.5 has the coalesce function, which is like NVL, but a bit 
more flexible. From Ayan's example you should be able to use:
coalesce(b.col, c.col, 'some default')

If that doesn't have the flexibility you want, you can always use nested case 
or if statements, but its just harder to read.

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com



On Fri, Dec 30, 2016 at 7:46 AM, Sesterhenn, Mike <msesterh...@cars.com> wrote:

Thanks, but is nvl() in Spark 1.5?  I can't find it in spark.sql.functions 
(http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.functions$)



Reading about the Oracle nvl function, it seems it is similar to the na 
functions.  Not sure it will help though, because what I need is to join after 
the first join fails.


Fr

Re: Best way to process lookup ETL with Dataframes

2016-12-30 Thread Sesterhenn, Mike
Thanks Nicholas.  It looks like for some of my use cases, I might be able to 
use do sequential joins, and then use coalesce() (or in combination with 
withColumn(when()...)) to sort out the results.


Splitting and merging dataframes seems to really kill my app performance.  I'm 
not sure if it's a spark 1.5 thing or what, but I just refactored one column to 
do one less split/merge, and it saved me almost half the time on my job.  But 
for some use cases I don't seem to be able to avoid them.  It is important in 
some cases to NOT do a join under certain conditions for a row because bad data 
will result.


Any other thoughts?


From: Nicholas Hakobian 
Sent: Friday, December 30, 2016 2:12:40 PM
To: Sesterhenn, Mike
Cc: ayan guha; user@spark.apache.org
Subject: Re: Best way to process lookup ETL with Dataframes

It looks like Spark 1.5 has the coalesce function, which is like NVL, but a bit 
more flexible. From Ayan's example you should be able to use:
coalesce(b.col, c.col, 'some default')

If that doesn't have the flexibility you want, you can always use nested case 
or if statements, but its just harder to read.

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com



On Fri, Dec 30, 2016 at 7:46 AM, Sesterhenn, Mike <msesterh...@cars.com> wrote:

Thanks, but is nvl() in Spark 1.5?  I can't find it in spark.sql.functions 
(http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.functions$)


Reading about the Oracle nvl function, it seems it is similar to the na 
functions.  Not sure it will help though, because what I need is to join after 
the first join fails.


From: ayan guha <guha.a...@gmail.com>
Sent: Thursday, December 29, 2016 11:06 PM
To: Sesterhenn, Mike
Cc: user@spark.apache.org
Subject: Re: Best way to process lookup ETL with Dataframes

How about this -

select a.*, nvl(b.col,nvl(c.col,'some default'))
from driving_table a
left outer join lookup1 b on a.id = b.id
left outer join lookup2 c on a.id = c.id

?

On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike <msesterh...@cars.com> wrote:

Hi all,


I'm writing an ETL process with Spark 1.5, and I was wondering the best way to 
do something.


A lot of the fields I am processing require an algorithm similar to this:


Join input dataframe to a lookup table.

if (that lookup fails (the joined fields are null)) {

Lookup into some other table to join some other fields.

}


With Dataframes, it seems the only way to do this is to do something like this:


Join input dataframe to a lookup table.

if (that lookup fails (the joined fields are null)) {

   *SPLIT the dataframe into two DFs via DataFrame.filter(),

  one group with successful lookup, the other failed).*

   For failed lookup:  {

   Lookup into some other table to grab some other fields.

   }

   *MERGE the dataframe splits back together via DataFrame.unionAll().*

}


I'm seeing some really large execution plans as you might imagine in the Spark 
UI, and the processing time seems way out of proportion with the size of the 
dataset.  (~250GB in 9 hours).


Is this the best approach to implement an algorithm like this?  Note also that 
some fields I am implementing require multiple staged split/merge steps due to 
cascading lookup joins.


Thanks,


Michael Sesterhenn

msesterh...@cars.com




--
Best Regards,
Ayan Guha



Re: Best way to process lookup ETL with Dataframes

2016-12-30 Thread Sesterhenn, Mike
Thanks, but is nvl() in Spark 1.5?  I can't find it in spark.sql.functions 
(http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.functions$)


Reading about the Oracle nvl function, it seems it is similar to the na 
functions.  Not sure it will help though, because what I need is to join after 
the first join fails.


From: ayan guha 
Sent: Thursday, December 29, 2016 11:06 PM
To: Sesterhenn, Mike
Cc: user@spark.apache.org
Subject: Re: Best way to process lookup ETL with Dataframes

How about this -

select a.*, nvl(b.col,nvl(c.col,'some default'))
from driving_table a
left outer join lookup1 b on a.id = b.id
left outer join lookup2 c on a.id = c.id

?

On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike <msesterh...@cars.com> wrote:

Hi all,


I'm writing an ETL process with Spark 1.5, and I was wondering the best way to 
do something.


A lot of the fields I am processing require an algorithm similar to this:


Join input dataframe to a lookup table.

if (that lookup fails (the joined fields are null)) {

Lookup into some other table to join some other fields.

}


With Dataframes, it seems the only way to do this is to do something like this:


Join input dataframe to a lookup table.

if (that lookup fails (the joined fields are null)) {

   *SPLIT the dataframe into two DFs via DataFrame.filter(),

  one group with successful lookup, the other failed).*

   For failed lookup:  {

   Lookup into some other table to grab some other fields.

   }

   *MERGE the dataframe splits back together via DataFrame.unionAll().*

}


I'm seeing some really large execution plans as you might imagine in the Spark 
UI, and the processing time seems way out of proportion with the size of the 
dataset.  (~250GB in 9 hours).


Is this the best approach to implement an algorithm like this?  Note also that 
some fields I am implementing require multiple staged split/merge steps due to 
cascading lookup joins.


Thanks,


Michael Sesterhenn

msesterh...@cars.com




--
Best Regards,
Ayan Guha


Best way to process lookup ETL with Dataframes

2016-12-29 Thread Sesterhenn, Mike
Hi all,


I'm writing an ETL process with Spark 1.5, and I was wondering the best way to 
do something.


A lot of the fields I am processing require an algorithm similar to this:


Join input dataframe to a lookup table.

if (that lookup fails (the joined fields are null)) {

Lookup into some other table to join some other fields.

}


With Dataframes, it seems the only way to do this is to do something like this:


Join input dataframe to a lookup table.

if (that lookup fails (the joined fields are null)) {

   *SPLIT the dataframe into two DFs via DataFrame.filter(),

  one group with successful lookup, the other failed).*

   For failed lookup:  {

   Lookup into some other table to grab some other fields.

   }

   *MERGE the dataframe splits back together via DataFrame.unionAll().*

}


I'm seeing some really large execution plans as you might imagine in the Spark 
UI, and the processing time seems way out of proportion with the size of the 
dataset.  (~250GB in 9 hours).


Is this the best approach to implement an algorithm like this?  Note also that 
some fields I am implementing require multiple staged split/merge steps due to 
cascading lookup joins.


Thanks,


Michael Sesterhenn

msesterh...@cars.com
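
A DataFrame sketch of the sequential-join-plus-coalesce shape suggested in the replies
above, with hypothetical table and column names (written against Spark 2.x syntax; the
same idea works on 1.5 with explicit join expressions):

import org.apache.spark.sql.functions.{coalesce, lit}
import spark.implicits._

val result = driving                                         // hypothetical input DataFrame
  .join(lookup1.select($"id", $"col".as("col_1")), Seq("id"), "left_outer")
  .join(lookup2.select($"id", $"col".as("col_2")), Seq("id"), "left_outer")
  .withColumn("col", coalesce($"col_1", $"col_2", lit("some default")))
  .drop("col_1")
  .drop("col_2")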



Re: Is spark a right tool for updating a dataframe repeatedly

2016-10-17 Thread Mike Metzger
I've not done this in Scala yet, but in PySpark I've run into a similar
issue where having too many dataframes cached does cause memory issues.
Unpersist by itself did not clear the memory usage, but rather setting the
variable equal to None allowed all the references to be cleared and the
memory issues went away.

I do not fully understand Scala yet, but you may be able to set one of your
dataframes to null to accomplish the same.

Mike


On Mon, Oct 17, 2016 at 8:38 PM, Mungeol Heo  wrote:

> First of all, Thank you for your comments.
> Actually, what I mean by "update" is generating a new data frame with modified
> data.
> The more detailed while loop will be something like below.
>
> var continue = 1
> var dfA = "a data frame"
> dfA.persist
>
> while (continue > 0) {
>   val temp = "modified dfA"
>   temp.persist
>   temp.count
>   dfA.unpersist
>
>   dfA = "modified temp"
>   dfA.persist
>   dfA.count
>   temp.unperist
>
>   if ("dfA is not modifed") {
> continue = 0
>   }
> }
>
> The problem is it will cause OOM finally.
> And, the number of skipped stages will increase every time, even though
> I am not sure whether this is the reason causing OOM.
> Maybe, I need to check the source code of one of the spark ML algorithms.
> Again, thank you all.
>
>
> On Mon, Oct 17, 2016 at 10:54 PM, Thakrar, Jayesh
>  wrote:
> > Yes, iterating over a dataframe and making changes is not uncommon.
> >
> > Of course RDDs, dataframes and datasets are immutable, but there is some
> > optimization in the optimizer that can potentially help to dampen the
> > effect/impact of creating a new rdd, df or ds.
> >
> > Also, the use-case you cited is similar to what is done in regression,
> > clustering and other algorithms.
> >
> > I.e. you iterate making a change to a dataframe/dataset until the desired
> > condition.
> >
> > E.g. see this -
> > https://spark.apache.org/docs/1.6.1/ml-classification-
> regression.html#linear-regression
> > and the setting of the iteration ceiling
> >
> >
> >
> > // instantiate the base classifier
> >
> > val classifier = new LogisticRegression()
> >
> >   .setMaxIter(params.maxIter)
> >
> >   .setTol(params.tol)
> >
> >   .setFitIntercept(params.fitIntercept)
> >
> >
> >
> > Now the impact of that depends on a variety of things.
> >
> > E.g. if the data is completely contained in memory and there is no spill
> > over to disk, it might not be a big issue (ofcourse there will still be
> > memory, CPU and network overhead/latency).
> >
> > If you are looking at storing the data on disk (e.g. as part of a
> checkpoint
> > or explicit storage), then there can be substantial I/O activity.
> >
> >
> >
> >
> >
> >
> >
> > From: Xi Shen 
> > Date: Monday, October 17, 2016 at 2:54 AM
> > To: Divya Gehlot , Mungeol Heo
> > 
> > Cc: "user @spark" 
> > Subject: Re: Is spark a right tool for updating a dataframe repeatedly
> >
> >
> >
> > I think most of the "big data" tools, like Spark and Hive, are not
> designed
> > to edit data. They are only designed to query data. I wonder in what
> > scenario you need to update large volume of data repetitively.
> >
> >
> >
> >
> >
> > On Mon, Oct 17, 2016 at 2:00 PM Divya Gehlot 
> > wrote:
> >
> > If  my understanding is correct about your query
> >
> > In Spark, DataFrames are immutable; you can't update a dataframe.
> >
> > You have to create a new dataframe to update the current dataframe.
> >
> >
> >
> >
> >
> > Thanks,
> >
> > Divya
> >
> >
> >
> >
> >
> > On 17 October 2016 at 09:50, Mungeol Heo  wrote:
> >
> > Hello, everyone.
> >
> > As I mentioned in the title, I wonder whether Spark is the right tool for
> > updating a data frame repeatedly until there is no more data to
> > update.
> >
> > For example.
> >
> > while (if there was a updating) {
> > update a data frame A
> > }
> >
> > If it is the right tool, then what is the best practice for this kind of
> > work?
> > Thank you.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
> >
> > --
> >
> >
> > Thanks,
> > David S.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
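
A hedged sketch of the quoted loop with lineage truncation added, assuming Spark 2.1+
where Dataset.checkpoint is available; the modify step and the convergence test are
placeholders for whatever the real job does. Cutting the lineage each iteration is what
keeps the plan, and the growing count of skipped stages, from accumulating without bound:

spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")   // hypothetical path

var dfA = initialDf                        // hypothetical starting DataFrame
var changed = true
while (changed) {
  val updated = modify(dfA).checkpoint()   // eager by default: materializes and drops lineage
  changed = updated.except(dfA).limit(1).count() > 0
  dfA = updated
}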


Re: Time-unit of RDD.countApprox timeout parameter

2016-10-04 Thread Sesterhenn, Mike
It only exists in the latest docs, not in versions <= 1.6.


From: Sean Owen 
Sent: Tuesday, October 4, 2016 1:51:49 PM
To: Sesterhenn, Mike; user@spark.apache.org
Subject: Re: Time-unit of RDD.countApprox timeout parameter

The API docs already say: "maximum time to wait for the job, in milliseconds"

On Tue, Oct 4, 2016 at 7:14 PM Sesterhenn, Mike <msesterh...@cars.com> wrote:

Nevermind. Through testing it seems it is MILLISECONDS.  This should be added 
to the docs.


From: Sesterhenn, Mike <msesterh...@cars.com>
Sent: Tuesday, October 4, 2016 1:02:25 PM
To: user@spark.apache.org
Subject: Time-unit of RDD.countApprox timeout parameter


Hi all,


Does anyone know what the unit is on the 'timeout' parameter to the 
RDD.countApprox() function?
(ie. is that seconds, milliseconds, nanoseconds, ...?)


I was searching through the source but it got hairy pretty quickly.


Thanks
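
A tiny usage sketch for the record, assuming an rdd already in scope; the timeout is
indeed in milliseconds:

val approx = rdd.countApprox(timeout = 5000L, confidence = 0.90)   // wait at most 5 seconds
println(approx.initialValue)   // a BoundedDouble with a mean and a [low, high] interval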




Re: Time-unit of RDD.countApprox timeout parameter

2016-10-04 Thread Sesterhenn, Mike
Nevermind. Through testing it seems it is MILLISECONDS.  This should be added 
to the docs.


From: Sesterhenn, Mike 
Sent: Tuesday, October 4, 2016 1:02:25 PM
To: user@spark.apache.org
Subject: Time-unit of RDD.countApprox timeout parameter


Hi all,


Does anyone know what the unit is on the 'timeout' parameter to the 
RDD.countApprox() function?
(ie. is that seconds, milliseconds, nanoseconds, ...?)


I was searching through the source but it got hairy pretty quickly.


Thanks




Time-unit of RDD.countApprox timeout parameter

2016-10-04 Thread Sesterhenn, Mike
Hi all,


Does anyone know what the unit is on the 'timeout' parameter to the 
RDD.countApprox() function?
(ie. is that seconds, milliseconds, nanoseconds, ...?)


I was searching through the source but it got hairy pretty quickly.


Thanks




Re: Issue with rogue data in csv file used in Spark application

2016-09-27 Thread Mike Metzger
Hi Mich -

Can you run a filter command on df1 prior to your map to keep only rows where
p(3).toString != '-', and then run your map command?

Thanks

Mike
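
A sketch of that filter plus a defensive parse, assuming the CSV header names match the
case class fields in the quoted message and Spark 2.x column syntax:

import scala.util.Try
import spark.implicits._

val noRogue = df1.filter($"Open" =!= "-" && $"High" =!= "-" && $"Low" =!= "-")

// or keep the rows and turn "-" into a sentinel while mapping
def toFloatOrZero(s: String): Float = Try(s.toFloat).getOrElse(0.0f)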

On Tue, Sep 27, 2016 at 5:06 PM, Mich Talebzadeh 
wrote:

> Thanks guys
>
> Actually these are the 7 rogue rows. The column 0 is the Volume column,
> which means there were no trades on those days
>
>
> *cat stock.csv|grep ",0"*SAP SE,SAP, 23-Dec-11,-,-,-,40.56,0
> SAP SE,SAP, 21-Apr-11,-,-,-,45.85,0
> SAP SE,SAP, 30-Dec-10,-,-,-,38.10,0
> SAP SE,SAP, 23-Dec-10,-,-,-,38.36,0
> SAP SE,SAP, 30-Apr-08,-,-,-,32.39,0
> SAP SE,SAP, 29-Apr-08,-,-,-,33.05,0
> SAP SE,SAP, 28-Apr-08,-,-,-,32.60,0
>
> So one way would be to exclude the rows where there was no trade volume
> that day when cleaning up the csv file
>
> *cat stock.csv|grep -v **",0"*
>
> and that works. Bearing in mind that putting 0s in place of "-" will skew
> the price plot.
>
> BTW I am using Spark csv as well
>
> val df1 = spark.read.option("header", true).csv(location)
>
> This is the class and the mapping
>
>
> case class columns(Stock: String, Ticker: String, TradeDate: String, Open:
> Float, High: Float, Low: Float, Close: Float, Volume: Integer)
> val df2 = df1.map(p => columns(p(0).toString, p(1).toString,
> p(2).toString, p(3).toString.toFloat, p(4).toString.toFloat,
> p(5).toString.toFloat, p(6).toString.toFloat, p(7).toString.toInt))
>
>
> In here I have
>
> p(3).toString.toFloat
>
> How can one check for rogue data in p(3)?
>
>
> Thanks
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 27 September 2016 at 21:49, Mich Talebzadeh 
> wrote:
>
>>
>> I have historical prices for various stocks.
>>
>> Each csv file has 10 years trade one row per each day.
>>
>> These are the columns defined in the class
>>
>> case class columns(Stock: String, Ticker: String, TradeDate: String,
>> Open: Float, High: Float, Low: Float, Close: Float, Volume: Integer)
>>
>> The issue is with Open, High, Low, Close columns that all are defined as
>> Float.
>>
>> Most rows are OK like below but the red one with "-" defined as Float
>> causes issues
>>
>>   Date      Open  High  Low   Close Volume
>> 27-Sep-16 80.91 80.93 79.87 80.85 1873158
>> 23-Dec-11 -     -     -     40.56 0
>>
>> Because the prices are defined as Float, these rows cause the application
>> to crash
>> scala> val rs = df2.filter(changeToDate("TradeDate") >=
>> monthsago).select((changeToDate("TradeDate").as("TradeDate")
>> ),(('Close+'Open)/2).as("AverageDailyPrice"), 'Low.as("Day's Low"),
>> 'High.as("Day's High")).orderBy("TradeDate").collect
>> 16/09/27 21:48:53 ERROR Executor: Exception in task 0.0 in stage 61.0
>> (TID 260)
>> java.lang.NumberFormatException: For input string: "-"
>>
>>
>> One way is to define the prices as Strings but that is not
>> meaningful. Alternatively do the clean up before putting csv in HDFS but
>> that becomes tedious and error prone.
>>
>> Any ideas will be appreciated.
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>
>


Re: Total Shuffle Read and Write Size of Spark workload

2016-09-19 Thread Mike Metzger
While the SparkListener method is likely all around better, if you just
need this quickly you should be able to do an SSH local port redirection
over putty.  In the putty configuration:

- Go to Connection: SSH: Tunnels
- In the Source port field, enter 4040 (or another unused port on your
machine)
- In the Destination field, enter ipaddress:4040 where ipaddress is the IP
you'd normally use to access the spark server.  If it's the same server you're
SSH'ing to, it can be 127.0.0.1
- Make sure the "Local" and "Auto" radio buttons are checked and click "Add"
- Go back to the Session section and enter the IP / etc configuration
- If you're going to use this often, enter a name and save the
configuration.  Otherwise click open and login as normal.

Once the session is established, you should be able to open a web browser
to http://localhost:4040 which will redirect over the SSH session to the
remote server.  Note that any link that references a non-accessible IP
address can't be reached (though you can also setup putty / SSH as a proxy
to get around that if needed).
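
As for the SparkListener route, a minimal sketch (Scala; it assumes the Spark 2.x
task-metrics API, where some metric names differ slightly from 1.x) that totals
shuffle read/write bytes over all completed tasks:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Accumulates shuffle bytes as each task finishes; events arrive on the
// listener-bus thread, so plain vars are fine for a rough total.
class ShuffleTotalsListener extends SparkListener {
  var shuffleReadBytes = 0L
  var shuffleWriteBytes = 0L
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) {
      shuffleReadBytes += m.shuffleReadMetrics.totalBytesRead
      shuffleWriteBytes += m.shuffleWriteMetrics.bytesWritten
    }
  }
}

val listener = new ShuffleTotalsListener
sc.addSparkListener(listener)
// ... run the workload, then:
println(s"shuffle read:  ${listener.shuffleReadBytes} bytes")
println(s"shuffle write: ${listener.shuffleWriteBytes} bytes")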

Thanks

Mike


On Mon, Sep 19, 2016 at 4:43 AM, Cristina Rozee 
wrote:

> Hi Mich,
>
> I do not have access to UI as I am running jobs on remote system and I can
> access it using putty only so only console or logs files are available to
> me.
>
> Thanks
>
> On Mon, Sep 19, 2016 at 11:36 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Spark UI on port 4040 by default
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 19 September 2016 at 10:34, Cristina Rozee 
>> wrote:
>>
>>> Could you please explain a little bit?
>>>
>>>
>>> On Sun, Sep 18, 2016 at 10:19 PM, Jacek Laskowski 
>>> wrote:
>>>
>>>> SparkListener perhaps?
>>>>
>>>> Jacek
>>>>
>>>> On 15 Sep 2016 1:41 p.m., "Cristina Rozee" 
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am running a spark application and I would like to know the total
>>>>> amount of shuffle data (read + write ) so could anyone let me know how to
>>>>> get this information?
>>>>>
>>>>> Thank you
>>>>> Cristina.
>>>>>
>>>>
>>>
>>
>


Re: year out of range

2016-09-08 Thread Mike Metzger
My guess is there's some row that does not match up with the expected
data.  While slower, I've found RDDs to be easier to troubleshoot this kind
of thing until you sort out exactly what's happening.

Something like:

raw_data = sc.textFile("")
rowcounts = raw_data.map(lambda x: (len(x.split(",")),
1)).reduceByKey(lambda x,y: x+y)
rowcounts.take(5)

badrows = raw_data.filter(lambda x: len(x.split(",")) != )
if badrows.count() > 0:
badrows.saveAsTextFile("")


You should be able to tell if there are any rows with column counts that
don't match up (the thing that usually bites me with CSV conversions).
Assuming these all match to what you want, I'd try mapping the unparsed
date column out to separate fields and try to see if a year field isn't
matching the expected values.

Thanks

Mike


On Thu, Sep 8, 2016 at 8:15 AM, Daniel Lopes  wrote:

> Thanks,
>
> I *tested* the function offline and it works.
> I also tested with a select * after converting the data, and the new data looks
> good,
> *but* if I *register it as a temp table* to *join another table* it still shows *the
> same error*.
>
> ValueError: year out of range
>
> Best,
>
> *Daniel Lopes*
> Chief Data and Analytics Officer | OneMatch
> c: +55 (18) 99764-2733 | https://www.linkedin.com/in/dslopes
>
> www.onematch.com.br
> <http://www.onematch.com.br/?utm_source=EmailSignature&utm_term=daniel-lopes>
>
> On Thu, Sep 8, 2016 at 9:43 AM, Marco Mistroni 
> wrote:
>
>> Daniel
>> Test the parse date offline to make sure it returns what you expect.
>> If it does, create a df with 1 row only in the spark shell and run your UDF. You
>> should be able to see the issue.
>> If not, send me a reduced CSV file at my email and I'll give it a try this
>> evening; hopefully someone else will be able to assist in the meantime.
>> You don't need to run a full spark app to debug the issue.
>> Your problem is either in the parse date or in what gets passed to the UDF.
>> Hth
>>
>> On 8 Sep 2016 1:31 pm, "Daniel Lopes"  wrote:
>>
>>> Thanks Marco for your response.
>>>
>>> The field came encoded by SQL Server in locale pt_BR.
>>>
>>> The code that I am formating is:
>>>
>>> --
>>> def parse_date(argument, format_date='%Y-%m%d %H:%M:%S'):
>>> try:
>>> locale.setlocale(locale.LC_TIME, 'pt_BR.utf8')
>>> return datetime.strptime(argument, format_date)
>>> except:
>>> return None
>>>
>>> convert_date = funcspk.udf(lambda x: parse_date(x, '%b %d %Y %H:%M'),
>>> TimestampType())
>>>
>>> transacoes = transacoes.withColumn('tr_Vencimento',
>>> convert_date(transacoes.*tr_Vencimento*))
>>>
>>> --
>>>
>>> the sample is
>>>
>>> -
>>> [The wide .show() output was wrapped beyond recovery by the mail client. The
>>> columns are tr_NumeroContrato, tr_TipoDocumento, tr_Vencimento, tr_Valor,
>>> tr_DataRecebimento, tr_TaxaMora, tr_DescontoMaximo, tr_DescontoMaximoCorr,
>>> tr_ValorAtualizado, tr_ComGarantia, tr_ValorDesconto, tr_ValorJuros,
>>> tr_ValorMulta, tr_DataDevolucaoCheque, tr_ValorCorrigidoContratante,
>>> tr_DataNotificacao, tr_Banco, tr_Praca, tr_DescricaoAlinea, tr_Enquadramento,
>>> tr_Linha, tr_Arquivo, tr_DataImportacao, tr_Agencia. The visible sample row has
>>> tr_NumeroContrato=992600153001, tr_TipoDocumento empty,
>>> tr_Vencimento=*Jul 20 2015 12:00* (the value being parsed), tr_Valor=254.35,
>>> tr_ComGarantia=0, tr_ValorCorrigidoContratante=254.35,
>>> tr_DataNotificacao=2015-07-20 12:00:..., and the other visible columns null;
>>> the rest of the row was cut off.]

Re: Best ID Generator for ID field in parquet ?

2016-09-04 Thread Mike Metzger
Hi Kevin -

   There's not really a race condition as the 64 bit value is split into a
31 bit partition id (the upper portion) and a 33 bit incrementing id.  In
other words, as long as each partition contains fewer than 8 billion
entries there should be no overlap and there is not any communication
between executors to get the next id.

Depending on what you mean by duplication, there shouldn't be any within a
column as long as you maintain some sort of state (ie, the startval Mich
shows, a previous maxid, etc.)  While these ids are unique in that sense,
they are not the same as a uuid / guid which are generally unique across
all entries assuming enough randomness.  Think of the monotonically
increasing id as an auto-incrementing column (with potentially massive gaps
in ids) from a relational database.
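
To make the split concrete, a small sketch (Scala; the sample value is purely
illustrative) of pulling the two halves back out of a generated id:

val id = 8589934594L                          // an id produced on partition 1
val partitionId = (id >> 33).toInt            // upper 31 bits -> 1
val recordInPartition = id & ((1L << 33) - 1) // lower 33 bits -> 2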

Thanks

Mike


On Sun, Sep 4, 2016 at 6:41 PM, Kevin Tran  wrote:

> Hi Mich,
> Thank you for your input.
> Does monotonically_increasing_id guard against race conditions, and can it
> duplicate ids at some point with multiple threads, multiple instances, ...?
>
> Can even System.currentTimeMillis() still produce duplicates?
>
> Cheers,
> Kevin.
>
> On Mon, Sep 5, 2016 at 12:30 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> You can create a monotonically incrementing ID column on your table
>>
>> scala> val ll_18740868 = spark.table("accounts.ll_18740868")
>> scala> val startval = 1
>> scala> val df = ll_18740868.withColumn("id",
>> *monotonically_increasing_id()+* startval).show (2)
>> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
>> |transactiondate|transactiontype| sortcode|accountnumber|transactiondescription|debitamount|creditamount|balance| id|
>> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
>> |     2011-12-30|            DEB|'30-64-72|     18740868|  WWW.GFT.COM CD 4628 |       50.0|        null| 304.89|  1|
>> |     2011-12-30|            DEB|'30-64-72|     18740868|  TDA.CONFECC.D.FRE...|      19.01|        null| 354.89|  2|
>> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
>>
>>
>> Now you have a new ID column
>>
>> HTH
>>
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 4 September 2016 at 12:43, Kevin Tran  wrote:
>>
>>> Hi everyone,
>>> Please give me your opinions on what is the best ID Generator for ID
>>> field in parquet ?
>>>
>>> UUID.randomUUID();
>>> AtomicReference currentTime = new AtomicReference<>(System.curre
>>> ntTimeMillis());
>>> AtomicLong counter = new AtomicLong(0);
>>> 
>>>
>>> Thanks,
>>> Kevin.
>>>
>>>
>>> 
>>> https://issues.apache.org/jira/browse/SPARK-8406 (Race condition when
>>> writing Parquet files)
>>> https://github.com/apache/spark/pull/6864/files
>>>
>>
>>
>


Re: Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Mike Metzger
Pyspark example based on the data you provided (obviously your dataframes
will come from whatever source you have, not entered directly).  This uses
an intermediary dataframe with grouped data for clarity, but you could pull
this off in other ways.

-- Code --

from pyspark.sql.types import *
from pyspark.sql.functions import col

fcst_schema = StructType([
StructField('Product', StringType(), nullable=False),
StructField('fcst_qty', IntegerType(), nullable=False)
  ])

fcst_data = sc.parallelize([Row("A", 100),
 Row("B", 50)])

so_schema = StructType([
StructField('OrderNum', IntegerType(), nullable=False),
StructField('ItemNum', StringType(), nullable=False),
StructField('Sales_qty', IntegerType(), nullable=False)
  ])
so_data = sc.parallelize([
   Row(101, "A", 10),
   Row(101, "B", 5),
   Row(102, "A", 5),
   Row(102, "B", 10)])

fcst_df = sqlContext.createDataFrame(fcst_data, fcst_schema)
so_df = sqlContext.createDataFrame(so_data, so_schema)

fcst_df.show()
so_df.show()
orderTotals_df =
so_df.groupBy('ItemNum').sum('Sales_qty').select('ItemNum',col('sum(Sales_qty)').alias('Sales_qty'))
orderTotals_df.show()

fcst_df.join(orderTotals_df, fcst_df.Product == orderTotals_df.ItemNum,
'left_outer').select(fcst_df.Product, (fcst_df.fcst_qty -
orderTotals_df.Sales_qty).alias('fcst_qty')).show()


-- Output examples (fcst_df, so_df, orderTotals_df, and the resultant df) --

fcst_df:
+-------+--------+
|Product|fcst_qty|
+-------+--------+
|      A|     100|
|      B|      50|
+-------+--------+

so_df:
+--------+-------+---------+
|OrderNum|ItemNum|Sales_qty|
+--------+-------+---------+
|     101|      A|       10|
|     101|      B|        5|
|     102|      A|        5|
|     102|      B|       10|
+--------+-------+---------+

orderTotals_df:
+-------+---------+
|ItemNum|Sales_qty|
+-------+---------+
|      B|       15|
|      A|       15|
+-------+---------+

resulting df:
+-------+--------+
|Product|fcst_qty|
+-------+--------+
|      B|      35|
|      A|      85|
+-------+--------+

The other languages should work similarly. Honestly, I'd probably just
setup the dataframes and write it in SQL, possibly with a UDF, to keep
things a little more clear.
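
A rough sketch of that SQL route (written here in Scala against Spark 2.0 temp
views; the DataFrame and view names are just illustrative):

fcstDF.createOrReplaceTempView("fcst")
soDF.createOrReplaceTempView("so")

val result = spark.sql("""
  SELECT f.Product,
         f.fcst_qty - COALESCE(SUM(s.Sales_qty), 0) AS fcst_qty
  FROM fcst f
  LEFT OUTER JOIN so s ON f.Product = s.ItemNum
  GROUP BY f.Product, f.fcst_qty
""")
result.show()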

Thanks

Mike


On Fri, Aug 26, 2016 at 4:45 PM, Subhajit Purkayastha 
wrote:

> So the data in the fcst dataframe is like this
>
>
>
> Product, fcst_qty
>
> A 100
>
> B 50
>
>
>
> Sales DF has data like this
>
>
>
> Order# Item#Sales qty
>
> 101 A 10
>
> 101 B 5
>
> 102 A 5
>
> 102 B 10
>
>
>
> I want to update the FCSt DF data, based on Product=Item#
>
>
>
> So the resultant FCST DF should have data
>
> Product, fcst_qty
>
> A 85
>
> B 35
>
>
>
> Hope it helps
>
>
>
> If I join the data between the 2 DFs (based on Product# and item#), I will
> get a cartesion join and my result will not be what I want
>
>
>
> Thanks for your help
>
>
>
>
>
> *From:* Mike Metzger [mailto:m...@flexiblecreations.com]
> *Sent:* Friday, August 26, 2016 2:12 PM
>
> *To:* Subhajit Purkayastha 
> *Cc:* user @spark 
> *Subject:* Re: Spark 2.0 - Insert/Update to a DataFrame
>
>
>
> Without seeing exactly what you were wanting to accomplish, it's hard to
> say.  A Join is still probably the method I'd suggest using something like:
>
>
>
> select (FCST.quantity - SO.quantity) as quantity
>
> 
>
> from FCST
>
> LEFT OUTER JOIN
>
> SO ON FCST.productid = SO.productid
>
> WHERE
>
> 
>
>
>
> with specifics depending on the layout and what language you're using.
>
>
>
> Thanks
>
>
>
> Mike
>
>
>
> On Fri, Aug 26, 2016 at 3:29 PM, Subhajit Purkayastha 
> wrote:
>
> Mike,
>
>
>
> The grains of the dataFrame are different.
>
>
>
> I need to reduce the forecast qty (which is in the FCST DF)  based on the
> sales qty (coming from the sales  order DF)
>
>
>
> Hope it helps
>
>
>
> Subhajit
>
>
>
> *From:* Mike Metzger [mailto:m...@flexiblecreations.com]
> *Sent:* Friday, August 26, 2016 1:13 PM
> *To:* Subhajit Purkayastha 
> *Cc:* user @spark 
> *Subject:* Re: Spark 2.0 - Insert/Update to a DataFrame
>
>
>
> Without seeing the makeup of the Dataframes nor what your logic is for
> updating them, I'd suggest doing a join of the Forecast DF with the
> appropriate columns from the SalesOrder DF.
>
>
>
> Mike
>
>
>
> On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha 
> wrote:
>
> I am using spark 2.0, have 2 DataFrames, SalesOrder and Forecast. I need
> to update the Forecast Dataframe record(s), based on the SaleOrder DF
> record. What is the best way to achieve this functionality
>
>
>
>
>


Re: Please assist: Building Docker image containing spark 2.0

2016-08-26 Thread Mike Metzger
I would also suggest building the container manually first and setting up
everything you specifically need.  Once done, you can then grab the history
file, pull out the invalid commands and build out the completed
Dockerfile.  Trying to troubleshoot an installation via Dockerfile is often
an exercise in futility.

Thanks

Mike


On Fri, Aug 26, 2016 at 5:14 PM, Michael Gummelt 
wrote:

> Run with "-X -e" like the error message says. See what comes out.
>
> On Fri, Aug 26, 2016 at 2:23 PM, Tal Grynbaum 
> wrote:
>
>> Did you specify -Dscala-2.10
>> As in
>> ./dev/change-scala-version.sh 2.10 ./build/mvn -Pyarn -Phadoop-2.4
>> -Dscala-2.10 -DskipTests clean package
>> If you're building with scala 2.10
>>
>> On Sat, Aug 27, 2016, 00:18 Marco Mistroni  wrote:
>>
>>> Hello Michael
>>> uhm i celebrated too soon
>>> Compilation of spark on docker image went near the end and then it
>>> errored out with this message
>>>
>>> INFO] BUILD FAILURE
>>> [INFO] 
>>> 
>>> [INFO] Total time: 01:01 h
>>> [INFO] Finished at: 2016-08-26T21:12:25+00:00
>>> [INFO] Final Memory: 69M/324M
>>> [INFO] 
>>> 
>>> [ERROR] Failed to execute goal 
>>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile
>>> (scala-compile-first) on project spark-mllib_2.11: Execution
>>> scala-compile-first of goal 
>>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile
>>> failed. CompileFailed -> [Help 1]
>>> [ERROR]
>>> [ERROR] To see the full stack trace of the errors, re-run Maven with the
>>> -e switch.
>>> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>>> [ERROR]
>>> [ERROR] For more information about the errors and possible solutions,
>>> please read the following articles:
>>> [ERROR] [Help 1] http://cwiki.apache.org/conflu
>>> ence/display/MAVEN/PluginExecutionException
>>> [ERROR]
>>> [ERROR] After correcting the problems, you can resume the build with the
>>> command
>>> [ERROR]   mvn  -rf :spark-mllib_2.11
>>> The command '/bin/sh -c ./build/mvn -Pyarn -Phadoop-2.4
>>> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero code:
>>> 1
>>>
>>> what am i forgetting?
>>> once again, last command i launched on the docker file is
>>>
>>>
>>> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests
>>> clean package
>>>
>>> kr
>>>
>>>
>>>
>>> On Fri, Aug 26, 2016 at 6:18 PM, Michael Gummelt >> > wrote:
>>>
>>>> :)
>>>>
>>>> On Thu, Aug 25, 2016 at 2:29 PM, Marco Mistroni 
>>>> wrote:
>>>>
>>>>> No i wont accept that :)
>>>>> I can't believe i have wasted 3 hrs for a space!
>>>>>
>>>>> Many thanks MIchael!
>>>>>
>>>>> kr
>>>>>
>>>>> On Thu, Aug 25, 2016 at 10:01 PM, Michael Gummelt <
>>>>> mgumm...@mesosphere.io> wrote:
>>>>>
>>>>>> You have a space between "build" and "mvn"
>>>>>>
>>>>>> On Thu, Aug 25, 2016 at 1:31 PM, Marco Mistroni 
>>>>>> wrote:
>>>>>>
>>>>>>> HI all
>>>>>>>  sorry for the partially off-topic, i hope there's someone on the
>>>>>>> list who has tried the same and encountered similar issuse
>>>>>>>
>>>>>>> Ok so i have created a Docker file to build an ubuntu container
>>>>>>> which inlcudes spark 2.0, but somehow when it gets to the point where it
>>>>>>> has to kick off  ./build/mvn command, it errors out with the following
>>>>>>>
>>>>>>> ---> Running in 8c2aa6d59842
>>>>>>> /bin/sh: 1: ./build: Permission denied
>>>>>>> The command '/bin/sh -c ./build mvn -Pyarn -Phadoop-2.4
>>>>>>> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero 
>>>>>>> code:
>>>>>>> 126
>>>>>>>
>>>>>>> I am puzzled as i am root when i build the container, so i should
>>>>>>> not encounter this issue (btw, if instead of running mvn from the build
>>>>>>> directory  i use the mvn which i installed on the container, it works 
>>>>>>> fine
>>>>>>> but it's  painfully slow)
>>>>>>>
>>>>>>> here are the details of my Spark command( scala 2.10, java 1.7 , mvn
>>>>>>> 3.3.9 and git have already been installed)
>>>>>>>
>>>>>>> # Spark
>>>>>>> RUN echo "Installing Apache spark 2.0"
>>>>>>> RUN git clone git://github.com/apache/spark.git
>>>>>>> WORKDIR /spark
>>>>>>> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0
>>>>>>> -DskipTests clean package
>>>>>>>
>>>>>>>
>>>>>>> Could anyone assist pls?
>>>>>>>
>>>>>>> kindest regarsd
>>>>>>>  Marco
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Michael Gummelt
>>>>>> Software Engineer
>>>>>> Mesosphere
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Michael Gummelt
>>>> Software Engineer
>>>> Mesosphere
>>>>
>>>
>>>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


Re: Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Mike Metzger
Without seeing exactly what you were wanting to accomplish, it's hard to
say.  A Join is still probably the method I'd suggest using something like:

select (FCST.quantity - SO.quantity) as quantity

from FCST
LEFT OUTER JOIN
SO ON FCST.productid = SO.productid
WHERE


with specifics depending on the layout and what language you're using.

Thanks

Mike

On Fri, Aug 26, 2016 at 3:29 PM, Subhajit Purkayastha 
wrote:

> Mike,
>
>
>
> The grains of the dataFrame are different.
>
>
>
> I need to reduce the forecast qty (which is in the FCST DF)  based on the
> sales qty (coming from the sales  order DF)
>
>
>
> Hope it helps
>
>
>
> Subhajit
>
>
>
> *From:* Mike Metzger [mailto:m...@flexiblecreations.com]
> *Sent:* Friday, August 26, 2016 1:13 PM
> *To:* Subhajit Purkayastha 
> *Cc:* user @spark 
> *Subject:* Re: Spark 2.0 - Insert/Update to a DataFrame
>
>
>
> Without seeing the makeup of the Dataframes nor what your logic is for
> updating them, I'd suggest doing a join of the Forecast DF with the
> appropriate columns from the SalesOrder DF.
>
>
>
> Mike
>
>
>
> On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha 
> wrote:
>
> I am using spark 2.0, have 2 DataFrames, SalesOrder and Forecast. I need
> to update the Forecast Dataframe record(s), based on the SaleOrder DF
> record. What is the best way to achieve this functionality
>
>
>


Re: Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Mike Metzger
Without seeing the makeup of the Dataframes nor what your logic is for
updating them, I'd suggest doing a join of the Forecast DF with the
appropriate columns from the SalesOrder DF.

Mike

On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha 
wrote:

> I am using spark 2.0, have 2 DataFrames, SalesOrder and Forecast. I need
> to update the Forecast Dataframe record(s), based on the SaleOrder DF
> record. What is the best way to achieve this functionality
>


Re: UDF on lpad

2016-08-25 Thread Mike Metzger
Is this what you're after?

def padString(id: Int, chars: String, length: Int): String =
 chars * length + id.toString

padString(123, "0", 10)

res4: String = 0000000000123
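
If the padding is needed at the DataFrame level rather than on a plain value, the
built-in lpad column function can be used directly, or the same logic can be
wrapped in a udf. A sketch (Scala; the df/column names are illustrative, and the
udf assumes a single-character pad string):

import org.apache.spark.sql.functions.{col, lit, lpad, udf}

// Built-in column function, no UDF required:
df.select(lpad(col("amount"), 10, "0"))

// Or wrap plain Scala string padding in a UDF:
val lpadUDF = udf { (s: String, len: Int, pad: String) =>
  if (s.length >= len) s else pad * (len - s.length) + s
}
df.select(lpadUDF(col("amount"), lit(10), lit("0")))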

Mike

On Thu, Aug 25, 2016 at 12:39 PM, Mich Talebzadeh  wrote:

> Thanks Mike.
>
> Can one turn the first example into a generic UDF similar to the output
> from below where 10 "0" are padded to the left of 123
>
>   def padString(id: Int, chars: String, length: Int): String =
>  (0 until length).map(_ => chars(Random.nextInt(chars.length))).mkString
> + id.toString
>
> scala> padString(123, "0", 10)
> res6: String = 0000000000123
>
> Cheers
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 25 August 2016 at 17:29, Mike Metzger 
> wrote:
>
>> Are you trying to always add x numbers of digits / characters or are you
>> trying to pad to a specific length?  If the latter, try using format
>> strings:
>>
>> // Pad to 10 0 characters
>> val c = 123
>> f"$c%010d"
>>
>> // Result is 0000000123
>>
>>
>> // Pad to 10 total characters with 0's
>> val c = 123.87
>> f"$c%010.2f"
>>
>> // Result is 0000123.87
>>
>>
>> You can also do inline operations on the values before formatting.  I've
>> used this specifically to pad for hex digits from strings.
>>
>> val d = "100"
>> val hexstring = f"0x${d.toInt}%08X"
>>
>> // hexstring is 0x00000064
>>
>>
>> Thanks
>>
>> Mike
>>
>> On Thu, Aug 25, 2016 at 9:27 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Ok I tried this
>>>
>>> def padString(s: String, chars: String, length: Int): String =
>>>  |  (0 until length).map(_ => 
>>> chars(Random.nextInt(chars.length))).mkString
>>> + s
>>>
>>> padString: (s: String, chars: String, length: Int)String
>>> And use it like below:
>>>
>>> Example left pad the figure 12345.87 with 10 "0"s
>>>
>>> padString("12345.87", "0", 10)
>>> res79: String = 000000000012345.87
>>>
>>> Any better way?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 25 August 2016 at 12:06, Mich Talebzadeh 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> This UDF on substring works
>>>>
>>>> scala> val SubstrUDF = udf { (s: String, start: Int, end: Int) =>
>>>> s.substring(start, end) }
>>>> SubstrUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
>>>> UserDefinedFunction(,StringType,Some(List(StringType,
>>>> IntegerType, IntegerType)))
>>>>
>>>> I want something similar to this
>>>>
>>>> scala> sql("""select lpad("str", 10, "0")""").show
>>>> +----------------+
>>>> |lpad(str, 10, 0)|
>>>> +----------------+
>>>> |      0000000str|
>>>> +----------------+
>>>>
>>>> scala> val SubstrUDF = udf { (s: String, len: Int, chars: String) =>
>>>> lpad(s, len, chars) }
>>>> :40: error: type mismatch;
>>>>  found   : String
>>>>  required: org.apache.spark.sql.Column
>>>>val SubstrUDF = udf { (s: String, len: Int, chars: String) =>
>>>> lpad(s, len, chars) }
>>>>
>>>>
>>>> Any ideas?
>>>>
>>>> Thanks
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>
>>>
>>
>


Re: UDF on lpad

2016-08-25 Thread Mike Metzger
Are you trying to always add x numbers of digits / characters or are you
trying to pad to a specific length?  If the latter, try using format
strings:

// Pad to 10 0 characters
val c = 123
f"$c%010d"

// Result is 0000000123


// Pad to 10 total characters with 0's
val c = 123.87
f"$c%010.2f"

// Result is 0000123.87


You can also do inline operations on the values before formatting.  I've
used this specifically to pad for hex digits from strings.

val d = "100"
val hexstring = f"0x${d.toInt}%08X"

// hexstring is 0x00000064


Thanks

Mike

On Thu, Aug 25, 2016 at 9:27 AM, Mich Talebzadeh 
wrote:

> Ok I tried this
>
> def padString(s: String, chars: String, length: Int): String =
>  |  (0 until length).map(_ => 
> chars(Random.nextInt(chars.length))).mkString
> + s
>
> padString: (s: String, chars: String, length: Int)String
> And use it like below:
>
> Example left pad the figure 12345.87 with 10 "0"s
>
> padString("12345.87", "0", 10)
> res79: String = 000000000012345.87
>
> Any better way?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 25 August 2016 at 12:06, Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> This UDF on substring works
>>
>> scala> val SubstrUDF = udf { (s: String, start: Int, end: Int) =>
>> s.substring(start, end) }
>> SubstrUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
>> UserDefinedFunction(,StringType,Some(List(StringType,
>> IntegerType, IntegerType)))
>>
>> I want something similar to this
>>
>> scala> sql("""select lpad("str", 10, "0")""").show
>> +----------------+
>> |lpad(str, 10, 0)|
>> +----------------+
>> |      0000000str|
>> +----------------+
>>
>> scala> val SubstrUDF = udf { (s: String, len: Int, chars: String) =>
>> lpad(s, len, chars) }
>> :40: error: type mismatch;
>>  found   : String
>>  required: org.apache.spark.sql.Column
>>val SubstrUDF = udf { (s: String, len: Int, chars: String) =>
>> lpad(s, len, chars) }
>>
>>
>> Any ideas?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>
>


Re: Sum array values by row in new column

2016-08-15 Thread Mike Metzger
Assuming you know the number of elements in the list, this should work:

df.withColumn('total', df["_1"].getItem(0) + df["_1"].getItem(1) +
df["_1"].getItem(2))

Mike

On Mon, Aug 15, 2016 at 12:02 PM, Javier Rey  wrote:

> Hi everyone,
>
> I have one dataframe with one column; this column is an array of numbers.
> How can I sum each array by row and obtain a new column with the sum, in pyspark?
>
> Example:
>
> +------------+
> |     numbers|
> +------------+
> |[10, 20, 30]|
> |[40, 50, 60]|
> |[70, 80, 90]|
> +------------+
>
> The idea is to obtain the same df with a new column holding the totals:
>
> +------------+-----+
> |     numbers|total|
> +------------+-----+
> |[10, 20, 30]|   60|
> |[40, 50, 60]|  150|
> |[70, 80, 90]|  240|
> +------------+-----+
>
> Regards!
>
> Samir
>
>
>
>


Re: Generating unique id for a column in Row without breaking into RDD and joining back

2016-08-05 Thread Mike Metzger
If I understand your question correctly, the current implementation doesn't
allow a starting value, but it's easy enough to pull off with something
like:

val startval = 1
df.withColumn("id", monotonically_increasing_id() + startval)

Two points - your test shows what happens with a single partition. With
multiple partitions, the id values will inherently be much higher (due to
the partition id being the upper 31 bits of the value.)

The other note is that the startval in this case would need to be
communicated along with the job.  It may be worth defining it as a
broadcast variable and referencing it that way so there's less cluster
communication involved.  Honestly I doubt there's a lot of variance with
this small of a value but it's a good habit to get into.

Thanks

Mike

On Fri, Aug 5, 2016 at 11:33 AM, Mich Talebzadeh 
wrote:

> Thanks Mike for this.
>
> This is Scala. As expected it adds the id column to the end of the column
> list starting from 0 0
>
> scala> val df = ll_18740868.withColumn("id", 
> monotonically_increasing_id()).show
> (2)
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
> |transactiondate|transactiontype| sortcode|accountnumber|transactiondescription|debitamount|creditamount|balance| id|
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
> |     2009-12-31|            CPT|'30-64-72|     18740868|  LTSB STH KENSINGT...|       90.0|        null|  400.0|  0|
> |     2009-12-31|            CPT|'30-64-72|     18740868|  LTSB CHELSEA (309...|       10.0|        null|  490.0|  1|
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
>
> Can one provide the starting value say 1?
>
> Cheers
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 5 August 2016 at 16:45, Mike Metzger 
> wrote:
>
>> You can use the monotonically_increasing_id method to generate guaranteed
>> unique (but not necessarily consecutive) IDs.  Calling something like:
>>
>> df.withColumn("id", monotonically_increasing_id())
>>
>> You don't mention which language you're using but you'll need to pull in
>> the sql.functions library.
>>
>> Mike
>>
>> On Aug 5, 2016, at 9:11 AM, Tony Lane  wrote:
>>
>> Ayan - basically i have a dataset with structure, where bid are unique
>> string values
>>
>> bid: String
>> val : integer
>>
>> I need unique int values for these string bid''s to do some processing in
>> the dataset
>>
>> like
>>
>> id:int   (unique integer id for each bid)
>> bid:String
>> val:integer
>>
>>
>>
>> -Tony
>>
>> On Fri, Aug 5, 2016 at 6:35 PM, ayan guha  wrote:
>>
>>> Hi
>>>
>>> Can you explain a little further?
>>>
>>> best
>>> Ayan
>>>
>>> On Fri, Aug 5, 2016 at 10:14 PM, Tony Lane 
>>> wrote:
>>>
>>>> I have a row with structure like
>>>>
>>>> identifier: String
>>>> value: int
>>>>
>>>> All identifier are unique and I want to generate a unique long id for
>>>> the data and get a row object back for further processing.
>>>>
>>>> I understand using the zipWithUniqueId function on RDD, but that would
>>>> mean first converting to RDD and then joining back the RDD and dataset
>>>>
>>>> What is the best way to do this ?
>>>>
>>>> -Tony
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>


Re: Generating unique id for a column in Row without breaking into RDD and joining back

2016-08-05 Thread Mike Metzger
Should be pretty much the same code for Scala -

import java.util.UUID
UUID.randomUUID

If you need it as a UDF, just wrap it accordingly.

Mike

On Fri, Aug 5, 2016 at 11:38 AM, Mich Talebzadeh 
wrote:

> On the same token can one generate  a UUID like below in Hive
>
> hive> select reflect("java.util.UUID", "randomUUID");
> OK
> 587b1665-b578-4124-8bf9-8b17ccb01fe7
>
> thx
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 5 August 2016 at 17:34, Mike Metzger 
> wrote:
>
>> Tony -
>>
>>From my testing this is built with performance in mind.  It's a 64-bit
>> value split between the partition id (upper 31 bits ~1billion) and the id
>> counter within a partition (lower 33 bits ~8 billion).  There shouldn't be
>> any added communication between the executors and the driver for that.
>>
>> I've been toying with an implementation that allows you to specify the
>> split for better control along with a start value.
>>
>> Thanks
>>
>> Mike
>>
>> On Aug 5, 2016, at 11:07 AM, Tony Lane  wrote:
>>
>> Mike.
>>
>> I have figured how to do this .  Thanks for the suggestion. It works
>> great.  I am trying to figure out the performance impact of this.
>>
>> thanks again
>>
>>
>> On Fri, Aug 5, 2016 at 9:25 PM, Tony Lane  wrote:
>>
>>> @mike  - this looks great. How can i do this in java ?   what is the
>>> performance implication on a large dataset  ?
>>>
>>> @sonal  - I can't have a collision in the values.
>>>
>>> On Fri, Aug 5, 2016 at 9:15 PM, Mike Metzger >> > wrote:
>>>
>>>> You can use the monotonically_increasing_id method to generate
>>>> guaranteed unique (but not necessarily consecutive) IDs.  Calling something
>>>> like:
>>>>
>>>> df.withColumn("id", monotonically_increasing_id())
>>>>
>>>> You don't mention which language you're using but you'll need to pull
>>>> in the sql.functions library.
>>>>
>>>> Mike
>>>>
>>>> On Aug 5, 2016, at 9:11 AM, Tony Lane  wrote:
>>>>
>>>> Ayan - basically i have a dataset with structure, where bid are unique
>>>> string values
>>>>
>>>> bid: String
>>>> val : integer
>>>>
>>>> I need unique int values for these string bid''s to do some processing
>>>> in the dataset
>>>>
>>>> like
>>>>
>>>> id:int   (unique integer id for each bid)
>>>> bid:String
>>>> val:integer
>>>>
>>>>
>>>>
>>>> -Tony
>>>>
>>>> On Fri, Aug 5, 2016 at 6:35 PM, ayan guha  wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> Can you explain a little further?
>>>>>
>>>>> best
>>>>> Ayan
>>>>>
>>>>> On Fri, Aug 5, 2016 at 10:14 PM, Tony Lane 
>>>>> wrote:
>>>>>
>>>>>> I have a row with structure like
>>>>>>
>>>>>> identifier: String
>>>>>> value: int
>>>>>>
>>>>>> All identifier are unique and I want to generate a unique long id for
>>>>>> the data and get a row object back for further processing.
>>>>>>
>>>>>> I understand using the zipWithUniqueId function on RDD, but that
>>>>>> would mean first converting to RDD and then joining back the RDD and 
>>>>>> dataset
>>>>>>
>>>>>> What is the best way to do this ?
>>>>>>
>>>>>> -Tony
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Generating unique id for a column in Row without breaking into RDD and joining back

2016-08-05 Thread Mike Metzger
Not that I've seen, at least not in any worker independent way.  To guarantee 
consecutive values you'd have to create a udf or some such that provided a new 
row id.  This probably isn't an issue on small data sets but would cause a lot 
of added communication on larger clusters / datasets.
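
For consecutive ids specifically, one common workaround is zipWithIndex on the
underlying RDD and re-applying the schema. A sketch (Scala, Spark 2.x; it does
round-trip through the RDD API, which the original question wanted to avoid, but
it needs no join back):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Append a consecutive, 0-based "id" column to an existing DataFrame df.
val rowsWithIndex = df.rdd.zipWithIndex.map { case (row, idx) =>
  Row.fromSeq(row.toSeq :+ idx)
}
val schemaWithId = StructType(df.schema.fields :+ StructField("id", LongType, nullable = false))
val dfWithId = spark.createDataFrame(rowsWithIndex, schemaWithId)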

Mike

> On Aug 5, 2016, at 11:21 AM, janardhan shetty  wrote:
> 
> Mike,
> 
> Any suggestions on doing it for consequitive id's?
> 
>> On Aug 5, 2016 9:08 AM, "Tony Lane"  wrote:
>> Mike.
>> 
>> I have figured how to do this .  Thanks for the suggestion. It works great.  
>> I am trying to figure out the performance impact of this. 
>> 
>> thanks again
>> 
>> 
>>> On Fri, Aug 5, 2016 at 9:25 PM, Tony Lane  wrote:
>>> @mike  - this looks great. How can i do this in java ?   what is the 
>>> performance implication on a large dataset  ? 
>>> 
>>> @sonal  - I can't have a collision in the values. 
>>> 
>>>> On Fri, Aug 5, 2016 at 9:15 PM, Mike Metzger  
>>>> wrote:
>>>> You can use the monotonically_increasing_id method to generate guaranteed 
>>>> unique (but not necessarily consecutive) IDs.  Calling something like:
>>>> 
>>>> df.withColumn("id", monotonically_increasing_id())
>>>> 
>>>> You don't mention which language you're using but you'll need to pull in 
>>>> the sql.functions library.
>>>> 
>>>> Mike
>>>> 
>>>>> On Aug 5, 2016, at 9:11 AM, Tony Lane  wrote:
>>>>> 
>>>>> Ayan - basically i have a dataset with structure, where bid are unique 
>>>>> string values
>>>>> 
>>>>> bid: String
>>>>> val : integer
>>>>> 
>>>>> I need unique int values for these string bid''s to do some processing in 
>>>>> the dataset
>>>>> 
>>>>> like 
>>>>> 
>>>>> id:int   (unique integer id for each bid)
>>>>> bid:String
>>>>> val:integer
>>>>> 
>>>>> 
>>>>> 
>>>>> -Tony
>>>>> 
>>>>>> On Fri, Aug 5, 2016 at 6:35 PM, ayan guha  wrote:
>>>>>> Hi
>>>>>> 
>>>>>> Can you explain a little further? 
>>>>>> 
>>>>>> best
>>>>>> Ayan
>>>>>> 
>>>>>>> On Fri, Aug 5, 2016 at 10:14 PM, Tony Lane  
>>>>>>> wrote:
>>>>>>> I have a row with structure like
>>>>>>> 
>>>>>>> identifier: String
>>>>>>> value: int
>>>>>>> 
>>>>>>> All identifier are unique and I want to generate a unique long id for 
>>>>>>> the data and get a row object back for further processing. 
>>>>>>> 
>>>>>>> I understand using the zipWithUniqueId function on RDD, but that would 
>>>>>>> mean first converting to RDD and then joining back the RDD and dataset
>>>>>>> 
>>>>>>> What is the best way to do this ? 
>>>>>>> 
>>>>>>> -Tony 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> -- 
>>>>>> Best Regards,
>>>>>> Ayan Guha


Re: Generating unique id for a column in Row without breaking into RDD and joining back

2016-08-05 Thread Mike Metzger
Tony -

   From my testing this is built with performance in mind.  It's a 64-bit value 
split between the partition id (upper 31 bits ~1billion) and the id counter 
within a partition (lower 33 bits ~8 billion).  There shouldn't be any added 
communication between the executors and the driver for that.

I've been toying with an implementation that allows you to specify the split 
for better control along with a start value. 

Thanks

Mike

> On Aug 5, 2016, at 11:07 AM, Tony Lane  wrote:
> 
> Mike.
> 
> I have figured how to do this .  Thanks for the suggestion. It works great.  
> I am trying to figure out the performance impact of this. 
> 
> thanks again
> 
> 
>> On Fri, Aug 5, 2016 at 9:25 PM, Tony Lane  wrote:
>> @mike  - this looks great. How can i do this in java ?   what is the 
>> performance implication on a large dataset  ? 
>> 
>> @sonal  - I can't have a collision in the values. 
>> 
>>> On Fri, Aug 5, 2016 at 9:15 PM, Mike Metzger  
>>> wrote:
>>> You can use the monotonically_increasing_id method to generate guaranteed 
>>> unique (but not necessarily consecutive) IDs.  Calling something like:
>>> 
>>> df.withColumn("id", monotonically_increasing_id())
>>> 
>>> You don't mention which language you're using but you'll need to pull in 
>>> the sql.functions library.
>>> 
>>> Mike
>>> 
>>>> On Aug 5, 2016, at 9:11 AM, Tony Lane  wrote:
>>>> 
>>>> Ayan - basically i have a dataset with structure, where bid are unique 
>>>> string values
>>>> 
>>>> bid: String
>>>> val : integer
>>>> 
>>>> I need unique int values for these string bid''s to do some processing in 
>>>> the dataset
>>>> 
>>>> like 
>>>> 
>>>> id:int   (unique integer id for each bid)
>>>> bid:String
>>>> val:integer
>>>> 
>>>> 
>>>> 
>>>> -Tony
>>>> 
>>>>> On Fri, Aug 5, 2016 at 6:35 PM, ayan guha  wrote:
>>>>> Hi
>>>>> 
>>>>> Can you explain a little further? 
>>>>> 
>>>>> best
>>>>> Ayan
>>>>> 
>>>>>> On Fri, Aug 5, 2016 at 10:14 PM, Tony Lane  
>>>>>> wrote:
>>>>>> I have a row with structure like
>>>>>> 
>>>>>> identifier: String
>>>>>> value: int
>>>>>> 
>>>>>> All identifier are unique and I want to generate a unique long id for 
>>>>>> the data and get a row object back for further processing. 
>>>>>> 
>>>>>> I understand using the zipWithUniqueId function on RDD, but that would 
>>>>>> mean first converting to RDD and then joining back the RDD and dataset
>>>>>> 
>>>>>> What is the best way to do this ? 
>>>>>> 
>>>>>> -Tony 
>>>>> 
>>>>> 
>>>>> 
>>>>> -- 
>>>>> Best Regards,
>>>>> Ayan Guha
> 


Re: Generating unique id for a column in Row without breaking into RDD and joining back

2016-08-05 Thread Mike Metzger
You can use the monotonically_increasing_id method to generate guaranteed 
unique (but not necessarily consecutive) IDs.  Calling something like:

df.withColumn("id", monotonically_increasing_id())

You don't mention which language you're using but you'll need to pull in the 
sql.functions library.

Mike

> On Aug 5, 2016, at 9:11 AM, Tony Lane  wrote:
> 
> Ayan - basically i have a dataset with structure, where bid are unique string 
> values
> 
> bid: String
> val : integer
> 
> I need unique int values for these string bid''s to do some processing in the 
> dataset
> 
> like 
> 
> id:int   (unique integer id for each bid)
> bid:String
> val:integer
> 
> 
> 
> -Tony
> 
>> On Fri, Aug 5, 2016 at 6:35 PM, ayan guha  wrote:
>> Hi
>> 
>> Can you explain a little further? 
>> 
>> best
>> Ayan
>> 
>>> On Fri, Aug 5, 2016 at 10:14 PM, Tony Lane  wrote:
>>> I have a row with structure like
>>> 
>>> identifier: String
>>> value: int
>>> 
>>> All identifier are unique and I want to generate a unique long id for the 
>>> data and get a row object back for further processing. 
>>> 
>>> I understand using the zipWithUniqueId function on RDD, but that would mean 
>>> first converting to RDD and then joining back the RDD and dataset
>>> 
>>> What is the best way to do this ? 
>>> 
>>> -Tony 
>> 
>> 
>> 
>> -- 
>> Best Regards,
>> Ayan Guha
> 


Re: Add column sum as new column in PySpark dataframe

2016-08-04 Thread Mike Metzger
This is a little ugly, but it may do what you're after -

df.withColumn('total', expr("+".join([col for col in df.columns])))

I believe this will handle null values ok, but will likely error if there
are any string columns present.
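
A variant of the same idea in Scala that only touches numeric columns and treats
nulls as 0 (a sketch; it assumes df has at least one numeric column):

import org.apache.spark.sql.functions.{coalesce, col, lit}
import org.apache.spark.sql.types.NumericType

// Build one coalesce(column, 0) expression per numeric column and add them up.
val numericCols = df.schema.fields.collect {
  case f if f.dataType.isInstanceOf[NumericType] => coalesce(col(f.name), lit(0))
}
val withTotal = df.withColumn("total", numericCols.reduce(_ + _))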


Mike



On Thu, Aug 4, 2016 at 8:41 AM, Javier Rey  wrote:

> Hi everybody,
>
> Sorry, I sent last mesage it was imcomplete this is complete:
>
> I'm using PySpark and I have a Spark dataframe with a bunch of numeric
> columns. I want to add a column that is the sum of all the other columns.
>
> Suppose my dataframe had columns "a", "b", and "c". I know I can do this:
>
> df.withColumn('total_col', df.a + df.b + df.c)
>
> The problem is that I don't want to type out each column individually and
> add them, especially if I have a lot of columns. I want to be able to do
> this automatically or by specifying a list of column names that I want to
> add. Is there another way to do this?
>
> I find this solution:
>
> df.withColumn('total', sum(df[col] for col in df.columns))
>
> But I get this error:
>
> "AttributeError: 'generator' object has no attribute '_get_object_id"
>
> Additionally I want to sum onlt not nulls values.
>
> Thanks in advance,
>
> Samir
>


Python memory included YARN-monitored memory?

2016-05-27 Thread Mike Sukmanowsky
Hi everyone,

More of a YARN/OS question than a Spark one, but would be good to clarify
this on the docs somewhere once I get an answer.

We use PySpark for all our Spark applications running on EMR. Like many
users, we're accustomed to seeing the occasional ExecutorLostFailure after
YARN kills a container using more memory than it was allocated.

We're beginning to tune spark.yarn.executor.memoryOverhead, but before
messing around with that I wanted to check if YARN is monitoring the memory
usage of both the executor JVM and the spawned pyspark.daemon process or
just the JVM? Inspecting things on one of the YARN nodes would seem to
indicate this isn't the case since the spawned daemon gets a separate
process ID and process group, but I wanted to check to confirm as it could
make a big difference to pyspark users hoping to tune things.

Thanks,
Mike


RE: SparkR query

2016-05-17 Thread Mike Lewis
Thanks, I’m just using RStudio. Running locally is fine; the issue is just that the
cluster is on Linux and the workers are looking for a Windows path, which must be
being passed through by the driver, I guess. I checked the spark-env.sh on each
node and the appropriate SPARK_HOME is set correctly….


From: Sun Rui [mailto:sunrise_...@163.com]
Sent: 17 May 2016 11:32
To: Mike Lewis
Cc: user@spark.apache.org
Subject: Re: SparkR query

Lewis,
1. Could you check the values of “SPARK_HOME” environment on all of your worker 
nodes?
2. How did you start your SparkR shell?

On May 17, 2016, at 18:07, Mike Lewis 
mailto:mle...@nephilaadvisors.co.uk>> wrote:

Hi,

I have a SparkR driver process that connects to a master running on Linux,
I’ve tried to do a simple test, e.g.

sc <- sparkR.init(master="spark://my-linux-host.dev.local:7077",  
sparkEnvir=list(spark.cores.max="4"))
x <- SparkR:::parallelize(sc,1:100,2)
y <- count(x)

But I can see that the worker nodes are failing; they are looking for the
Windows (rather than Linux) path to
daemon.R


16/05/17 06:52:33 INFO BufferedStreamThread: Fatal error: cannot open file 
'E:/Dev/spark/spark-1.4.0-bin-hadoop2.6/R/lib/SparkR/worker/daemon.R': No such 
file or directory

Is this a configuration setting that I’m missing, the worker nodes (linux) 
shouldn’t be looking in the spark home of the driver (windows) ?
If so, I’d appreciate someone letting me know what I need to change/set.

Thanks,
Mike Lewis



SparkR query

2016-05-17 Thread Mike Lewis
Hi,

I have a SparkR driver process that connects to a master running on Linux,
I’ve tried to do a simple test, e.g.

sc <- sparkR.init(master="spark://my-linux-host.dev.local:7077",  
sparkEnvir=list(spark.cores.max="4"))
x <- SparkR:::parallelize(sc,1:100,2)
y <- count(x)

But I can see that the worker nodes are failing; they are looking for the
Windows (rather than Linux) path to
daemon.R


16/05/17 06:52:33 INFO BufferedStreamThread: Fatal error: cannot open file 
'E:/Dev/spark/spark-1.4.0-bin-hadoop2.6/R/lib/SparkR/worker/daemon.R': No such 
file or directory

Is this a configuration setting that I’m missing, the worker nodes (linux) 
shouldn’t be looking in the spark home of the driver (windows) ?
If so, I’d appreciate someone letting me know what I need to change/set.

Thanks,
Mike Lewis




Re: executor delay in Spark

2016-04-27 Thread Mike Hynes
Hi Raghava,

I'm terribly sorry about the end of my last email; that garbled
sentence was garbled because it wasn't meant to exist; I wrote it on
my phone, realized I wouldn't realistically have time to look into
another set of logs deeply enough, and then mistook myself for having
deleted it. Again, I'm very sorry for my error here.

I did peek at your code, though, and think you could try the following:
0. The actions in your main method are many, and it will be hard to
isolate a problem; I would recommend only examining *one* RDD at first,
rather than six.
1. There is a lot of repetition for reading RDDs from textfiles
sequentially; if you put those lines into two methods depending on RDD
type, you will at least have one entry point to work with once you
make a simplified test program.
2. In one part you persist, count, immediately unpersist, and then
count again an RDD. I'm not acquainted with this idiom, and I don't
understand what it is meant to achieve. It strikes me as suspect for
triggering unusual garbage collection, which would, I think, only
complicate your trace debugging.

I've attached a python script that dumps relevant info from the Spark
JSON logs into a CSV for easier analysis in your language of choice;
hopefully it can aid in finer grained debugging (the headers of the
fields it prints are listed in one of the functions).

Mike

On 4/25/16, Raghava Mutharaju  wrote:
> Mike,
>
> We ran our program with 16, 32 and 64 partitions. The behavior was same as
> before with 8 partitions. It was mixed -- for some RDDs we see an
> all-nothing skew, but for others we see them getting split across the 2
> worker nodes. In some cases, they start with even split and in later
> iterations it goes to all-nothing split. Please find the logs attached.
>
> our program source code:
> https://github.com/raghavam/sparkel/blob/275ecbb901a58592d8a70a8568dd95c839d46ecc/src/main/scala/org/daselab/sparkel/SparkELDAGAnalysis.scala
>
> We put in persist() statements for different RDDs just to check their skew.
>
> @Jeff, setting minRegisteredResourcesRatio did not help. Behavior was same
> as before.
>
> Thank you for your time.
>
> Regards,
> Raghava.
>
>
> On Sun, Apr 24, 2016 at 7:17 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Could you change numPartitions to {16, 32, 64} and run your program for
>> each to see how many partitions are allocated to each worker? Let's see
>> if
>> you experience an all-nothing imbalance that way; if so, my guess is that
>> something else is odd in your program logic or spark runtime environment,
>> but if not and your executors all receive at least *some* partitions,
>> then
>> I still wouldn't rule out effects of scheduling delay. It's a simple
>> test,
>> but it could give some insight.
>>
>> Mike
>>
>> his could still be a  scheduling  If only one has *all* partitions,  and
>> email me the log file? (If it's 10+ MB, just the first few thousand lines
>> are fine).
>> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" 
>> wrote:
>>
>>> Mike, All,
>>>
>>> It turns out that the second time we encountered the uneven-partition
>>> issue is not due to spark-submit. It was resolved with the change in
>>> placement of count().
>>>
>>> Case-1:
>>>
>>> val numPartitions = 8
>>> // read uAxioms from HDFS, use hash partitioner on it and persist it
>>> // read type1Axioms from HDFS, use hash partitioner on it and persist it
>>> currDeltaURule1 = type1Axioms.join(uAxioms)
>>>  .values
>>>  .distinct(numPartitions)
>>>  .partitionBy(hashPartitioner)
>>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>>
>>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>>.count()
>>>
>>> 
>>>
>>> currDeltaURule1 RDD results in all the data on one node (there are 2
>>> worker nodes). If we move count(), the uneven partition issue is
>>> resolved.
>>>
>>> Case-2:
>>>
>>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>>
>>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>>
>>>
>>> 
>>>
>>>  -- this rdd depends on currDeltaURule1 and it gets
>>> executed. This resolved the uneven partitioning issue.
>>>
>>> I don't see why the moving of an action to a later part in the code
>>> would
>&

Re: executor delay in Spark

2016-04-24 Thread Mike Hynes
Could you change numPartitions to {16, 32, 64} and run your program for
each to see how many partitions are allocated to each worker? Let's see if
you experience an all-nothing imbalance that way; if so, my guess is that
something else is odd in your program logic or spark runtime environment,
but if not and your executors all receive at least *some* partitions, then
I still wouldn't rule out effects of scheduling delay. It's a simple test,
but it could give some insight.
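
As one hypothetical way to collect those numbers programmatically (not from
this thread; it assumes a live SparkContext named sc), you could resolve the
hostname inside each task and count partitions per host:

    import java.net.InetAddress

    // Count how many partitions of an RDD are executed on each host.
    def partitionsPerHost[T](rdd: org.apache.spark.rdd.RDD[T]): Map[String, Long] =
      rdd.mapPartitions(_ => Iterator(InetAddress.getLocalHost.getHostName))
         .countByValue()
         .toMap

    for (numPartitions <- Seq(16, 32, 64)) {
      val probe = sc.parallelize(0 until numPartitions, numPartitions) // stand-in RDD
      println(s"numPartitions=$numPartitions -> ${partitionsPerHost(probe)}")
    }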

Mike

his could still be a  scheduling  If only one has *all* partitions,  and
email me the log file? (If it's 10+ MB, just the first few thousand lines
are fine).
On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" 
wrote:

> Mike, All,
>
> It turns out that the second time we encountered the uneven-partition
> issue is not due to spark-submit. It was resolved with the change in
> placement of count().
>
> Case-1:
>
> val numPartitions = 8
> // read uAxioms from HDFS, use hash partitioner on it and persist it
> // read type1Axioms from HDFS, use hash partitioner on it and persist it
> currDeltaURule1 = type1Axioms.join(uAxioms)
>  .values
>  .distinct(numPartitions)
>  .partitionBy(hashPartitioner)
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>
>  .persist(StorageLevel.MEMORY_AND_DISK)
>.count()
>
> 
>
> currDeltaURule1 RDD results in all the data on one node (there are 2
> worker nodes). If we move count(), the uneven partition issue is resolved.
>
> Case-2:
>
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>
>  .persist(StorageLevel.MEMORY_AND_DISK)
>
>
> 
>
>  -- this rdd depends on currDeltaURule1 and it gets executed.
> This resolved the uneven partitioning issue.
>
> I don't see why the moving of an action to a later part in the code would
> affect the partitioning. Are there other factors at play here that affect
> the partitioning?
>
> (Inconsistent) uneven partitioning leads to one machine getting over
> burdened (memory and number of tasks). We see a clear improvement in
> runtime when the partitioning is even (happens when count is moved).
>
> Any pointers in figuring out this issue is much appreciated.
>
> Regards,
> Raghava.
>
>
>
>
> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Glad to hear that the problem was solvable! I have not seen delays of
>> this type for later stages in jobs run by spark-submit, but I do not think
>> it impossible if your stage has no lineage dependence on other RDDs.
>>
>> I'm CC'ing the dev list to report of other users observing load imbalance
>> caused by unusual initial task scheduling. I don't know of ways to avoid
>> this other than creating a dummy task to synchronize the executors, but
>> hopefully someone from there can suggest other possibilities.
>>
>> Mike
>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" 
>> wrote:
>>
>>> Mike,
>>>
>>> It turns out the executor delay, as you mentioned, is the cause. After
>>> we introduced a dummy stage, partitioning was working fine. Does this delay
>>> happen during later stages as well? We noticed the same behavior
>>> (partitioning happens on spark-shell but not through spark-submit) at a
>>> later stage also.
>>>
>>> Apart from introducing a dummy stage or running it from spark-shell, is
>>> there any other option to fix this?
>>>
>>> Regards,
>>> Raghava.
>>>
>>>
>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>>
>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>> receive tasks in the first stage. The delay does not persist once the
>>>> executors have been synchronized.
>>>>
>>>> When the tasks are very short, as may be your case (relatively small
>>>> data and a simple map task like you have described), the 8 tasks in
>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>> the second executor won't have responded to the master before the
>>>> first 4 tasks on the first executor have completed.
>>>>
>>>> To see if this is the cause in your particular case, you could try the
>>>> following to confirm:
>>>&g

Re: executor delay in Spark

2016-04-22 Thread Mike Hynes
Glad to hear that the problem was solvable! I have not seen delays of this
type for later stages in jobs run by spark-submit, but I do not think it
impossible if your stage has no lineage dependence on other RDDs.

I'm CC'ing the dev list to report of other users observing load imbalance
caused by unusual initial task scheduling. I don't know of ways to avoid
this other than creating a dummy task to synchronize the executors, but
hopefully someone from there can suggest other possibilities.

Mike
On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" 
wrote:

> Mike,
>
> It turns out the executor delay, as you mentioned, is the cause. After we
> introduced a dummy stage, partitioning was working fine. Does this delay
> happen during later stages as well? We noticed the same behavior
> (partitioning happens on spark-shell but not through spark-submit) at a
> later stage also.
>
> Apart from introducing a dummy stage or running it from spark-shell, is
> there any other option to fix this?
>
> Regards,
> Raghava.
>
>
> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> When submitting a job with spark-submit, I've observed delays (up to
>> 1--2 seconds) for the executors to respond to the driver in order to
>> receive tasks in the first stage. The delay does not persist once the
>> executors have been synchronized.
>>
>> When the tasks are very short, as may be your case (relatively small
>> data and a simple map task like you have described), the 8 tasks in
>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>> the second executor won't have responded to the master before the
>> first 4 tasks on the first executor have completed.
>>
>> To see if this is the cause in your particular case, you could try the
>> following to confirm:
>> 1. Examine the starting times of the tasks alongside their
>> executor
>> 2. Make a "dummy" stage execute before your real stages to
>> synchronize the executors by creating and materializing any random RDD
>> 3. Make the tasks longer, i.e. with some silly computational work.
>>
>> Mike
>>
>>
>> On 4/17/16, Raghava Mutharaju  wrote:
>> > Yes its the same data.
>> >
>> > 1) The number of partitions are the same (8, which is an argument to the
>> > HashPartitioner). In the first case, these partitions are spread across
>> > both the worker nodes. In the second case, all the partitions are on the
>> > same node.
>> > 2) What resources would be of interest here? Scala shell takes the
>> default
>> > parameters since we use "bin/spark-shell --master " to run
>> the
>> > scala-shell. For the scala program, we do set some configuration options
>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>> > serializer.
>> >
>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>> > RAM.1 executor runs on each worker node. Following configuration options
>> > are set for the scala program -- perhaps we should move it to the spark
>> > config file.
>> >
>> > Driver memory and executor memory are set to 12GB
>> > parallelism is set to 8
>> > Kryo serializer is used
>> > Number of retainedJobs and retainedStages has been increased to check
>> them
>> > in the UI.
>> >
>> > What information regarding Spark Context would be of interest here?
>> >
>> > Regards,
>> > Raghava.
>> >
>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar 
>> wrote:
>> >
>> >> If the data file is same then it should have similar distribution of
>> >> keys.
>> >> Few queries-
>> >>
>> >> 1. Did you compare the number of partitions in both the cases?
>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> >> Program being submitted?
>> >>
>> >> Also, can you please share the details of Spark Context, Environment
>> and
>> >> Executors when you run via Scala program?
>> >>
>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> >> m.vijayaragh...@gmail.com> wrote:
>> >>
>> >>> Hello All,
>> >>>
>> >>> We are using HashPartitioner in the following way on a 3 node cluster
>> (1
>> >>> master and 2 worker nodes).
>> >>>
>> >>> val u =
>> >>> sc.textFile("hdfs://x.x.x.x:8020/us

Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Mike Hynes
A HashPartitioner will indeed partition based on the key, but you
cannot know on *which* node that key will appear. Again, the RDD
partitions will not necessarily be distributed evenly across your
nodes because of the greedy scheduling of the first wave of tasks,
particularly if those tasks have durations less than the initial
executor delay. I recommend you look at your logs to verify if this is
happening to you.

Mike

On 4/18/16, Anuj Kumar  wrote:
> Good point Mike +1
>
> On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> When submitting a job with spark-submit, I've observed delays (up to
>> 1--2 seconds) for the executors to respond to the driver in order to
>> receive tasks in the first stage. The delay does not persist once the
>> executors have been synchronized.
>>
>> When the tasks are very short, as may be your case (relatively small
>> data and a simple map task like you have described), the 8 tasks in
>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>> the second executor won't have responded to the master before the
>> first 4 tasks on the first executor have completed.
>>
>> To see if this is the cause in your particular case, you could try the
>> following to confirm:
>> 1. Examine the starting times of the tasks alongside their
>> executor
>> 2. Make a "dummy" stage execute before your real stages to
>> synchronize the executors by creating and materializing any random RDD
>> 3. Make the tasks longer, i.e. with some silly computational
>> work.
>>
>> Mike
>>
>>
>> On 4/17/16, Raghava Mutharaju  wrote:
>> > Yes its the same data.
>> >
>> > 1) The number of partitions are the same (8, which is an argument to
>> > the
>> > HashPartitioner). In the first case, these partitions are spread across
>> > both the worker nodes. In the second case, all the partitions are on
>> > the
>> > same node.
>> > 2) What resources would be of interest here? Scala shell takes the
>> default
>> > parameters since we use "bin/spark-shell --master " to run
>> the
>> > scala-shell. For the scala program, we do set some configuration
>> > options
>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>> > serializer.
>> >
>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>> > RAM.1 executor runs on each worker node. Following configuration
>> > options
>> > are set for the scala program -- perhaps we should move it to the spark
>> > config file.
>> >
>> > Driver memory and executor memory are set to 12GB
>> > parallelism is set to 8
>> > Kryo serializer is used
>> > Number of retainedJobs and retainedStages has been increased to check
>> them
>> > in the UI.
>> >
>> > What information regarding Spark Context would be of interest here?
>> >
>> > Regards,
>> > Raghava.
>> >
>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar 
>> > wrote:
>> >
>> >> If the data file is same then it should have similar distribution of
>> >> keys.
>> >> Few queries-
>> >>
>> >> 1. Did you compare the number of partitions in both the cases?
>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> >> Program being submitted?
>> >>
>> >> Also, can you please share the details of Spark Context, Environment
>> >> and
>> >> Executors when you run via Scala program?
>> >>
>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> >> m.vijayaragh...@gmail.com> wrote:
>> >>
>> >>> Hello All,
>> >>>
>> >>> We are using HashPartitioner in the following way on a 3 node cluster
>> (1
>> >>> master and 2 worker nodes).
>> >>>
>> >>> val u =
>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>> >>> (y.toInt,
>> >>> x.toInt) } }).partitionBy(new
>> HashPartitioner(8)).setName("u").persist()
>> >>>
>> >>> u.count()
>> >>>
>> >>> If we run this from the spark shell, the data (52 MB) is split across
>> >>> the
>> >>> two worker nodes. But if we put this in a scala program and run it,
>> then
>> >>> all the data goes to only one node. We have run it multiple times,
>> >>> but
>> >>> this
>> >>> behavior does not change. This seems strange.
>> >>>
>> >>> Is there some problem with the way we use HashPartitioner?
>> >>>
>> >>> Thanks in advance.
>> >>>
>> >>> Regards,
>> >>> Raghava.
>> >>>
>> >>
>> >>
>> >
>> >
>> > --
>> > Regards,
>> > Raghava
>> > http://raghavam.github.io
>> >
>>
>>
>> --
>> Thanks,
>> Mike
>>
>


-- 
Thanks,
Mike

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Mike Hynes
When submitting a job with spark-submit, I've observed delays (up to
1--2 seconds) for the executors to respond to the driver in order to
receive tasks in the first stage. The delay does not persist once the
executors have been synchronized.

When the tasks are very short, as may be your case (relatively small
data and a simple map task like you have described), the 8 tasks in
your stage may be allocated to only 1 executor in 2 waves of 4, since
the second executor won't have responded to the master before the
first 4 tasks on the first executor have completed.

To see if this is the cause in your particular case, you could try the
following to confirm:
1. Examine the starting times of the tasks alongside their executor
2. Make a "dummy" stage execute before your real stages to
synchronize the executors by creating and materializing any random RDD
3. Make the tasks longer, i.e. with some silly computational work.
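
For suggestion 2, a minimal sketch of such a dummy synchronization stage;
the task count is an assumption, and any cheap materialization will do
(assumes a live SparkContext named sc):

    // Warm-up stage: run one trivial task per core so that every executor has
    // registered with the driver before the real, short-lived stages start.
    val warmupTasks = 100 // assumed; anything comfortably above the total core count
    sc.parallelize(1 to warmupTasks, warmupTasks).map(i => i * i).count()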

Mike


On 4/17/16, Raghava Mutharaju  wrote:
> Yes its the same data.
>
> 1) The number of partitions are the same (8, which is an argument to the
> HashPartitioner). In the first case, these partitions are spread across
> both the worker nodes. In the second case, all the partitions are on the
> same node.
> 2) What resources would be of interest here? Scala shell takes the default
> parameters since we use "bin/spark-shell --master " to run the
> scala-shell. For the scala program, we do set some configuration options
> such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> serializer.
>
> We are running this on Azure D3-v2 machines which have 4 cores and 14GB
> RAM.1 executor runs on each worker node. Following configuration options
> are set for the scala program -- perhaps we should move it to the spark
> config file.
>
> Driver memory and executor memory are set to 12GB
> parallelism is set to 8
> Kryo serializer is used
> Number of retainedJobs and retainedStages has been increased to check them
> in the UI.
>
> What information regarding Spark Context would be of interest here?
>
> Regards,
> Raghava.
>
> On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar  wrote:
>
>> If the data file is same then it should have similar distribution of
>> keys.
>> Few queries-
>>
>> 1. Did you compare the number of partitions in both the cases?
>> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> Program being submitted?
>>
>> Also, can you please share the details of Spark Context, Environment and
>> Executors when you run via Scala program?
>>
>> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> m.vijayaragh...@gmail.com> wrote:
>>
>>> Hello All,
>>>
>>> We are using HashPartitioner in the following way on a 3 node cluster (1
>>> master and 2 worker nodes).
>>>
>>> val u =
>>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
>>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>>>
>>> u.count()
>>>
>>> If we run this from the spark shell, the data (52 MB) is split across
>>> the
>>> two worker nodes. But if we put this in a scala program and run it, then
>>> all the data goes to only one node. We have run it multiple times, but
>>> this
>>> behavior does not change. This seems strange.
>>>
>>> Is there some problem with the way we use HashPartitioner?
>>>
>>> Thanks in advance.
>>>
>>> Regards,
>>> Raghava.
>>>
>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>


-- 
Thanks,
Mike

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD Partitions not distributed evenly to executors

2016-04-06 Thread Mike Hynes
mbalance arises due to
the greedy nature of the scheduler when data locality is not a factor?

2a. Why are the task times in plot 1 decreasing so dramatically, but
not in plot 2?
2b. Could the decrease in time be due to just-in-time compilation?
2c. If so, Why would the JIT occur only for the first case with many
partitions when the same amount of computational work is to be done in
both cases?

3. If an RDD is to be created in such a manner (i.e. initialized for,
say, an iterative algorithm, rather than by reading data from disk or
hdfs), what is the best practice to promote good load balancing? My
first idea would be to create the full RDD with 2x as many partitions
but then coalesce it down to half the number of partitions with the
shuffle flag set to true. Would that be reasonable?
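
A minimal sketch of that idea, with made-up sizes (assuming a live
SparkContext sc and Breeze on the classpath):

    import breeze.linalg.DenseVector

    val targetPartitions = 64 // assumed
    val blockSize = 1000      // assumed
    // Build the RDD with 2x the target number of partitions...
    val oversized = sc.parallelize(0 until 2 * targetPartitions, 2 * targetPartitions)
      .map(i => (i, DenseVector.zeros[Double](blockSize)))
    // ...then shuffle-coalesce down, which forces a redistribution of partitions.
    val balanced = oversized.coalesce(targetPartitions, shuffle = true).persist()
    balanced.count()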

Thank you very much for your time, and I very much hope that someone
from the dev community who is familiar with the scheduler may be able
to clarify the above observations and questions.

Thanks,
Mike

P.S. Koert Kuipers: neither spark-defaults.sh setting impacted the
observed behaviour, but thank you kindly for your suggestions.


On 4/5/16, Khaled Ammar  wrote:
> I have a similar experience.
>
> Using 32 machines, I can see than number of tasks (partitions) assigned to
> executors (machines) is not even. Moreover, the distribution change every
> stage (iteration).
>
> I wonder why Spark needs to move partitions around any way, should not the
> scheduler reduce network (and other IO) overhead by reducing such
> relocation.
>
> Thanks,
> -Khaled
>
>
>
>
> On Mon, Apr 4, 2016 at 10:57 PM, Koert Kuipers  wrote:
>
>> can you try:
>> spark.shuffle.reduceLocality.enabled=false
>>
>> On Mon, Apr 4, 2016 at 8:17 PM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> Dear all,
>>>
>>> Thank you for your responses.
>>>
>>> Michael Slavitch:
>>> > Just to be sure:  Has spark-env.sh and spark-defaults.conf been
>>> correctly propagated to all nodes?  Are they identical?
>>> Yes; these files are stored on a shared memory directory accessible to
>>> all nodes.
>>>
>>> Koert Kuipers:
>>> > we ran into similar issues and it seems related to the new memory
>>> > management. can you try:
>>> > spark.memory.useLegacyMode = true
>>> I reran the exact same code with a restarted cluster using this
>>> modification, and did not observe any difference. The partitioning is
>>> still imbalanced.
>>>
>>> Ted Yu:
>>> > If the changes can be ported over to 1.6.1, do you mind reproducing
>>> > the
>>> issue there ?
>>> Since the spark.memory.useLegacyMode setting did not impact my code
>>> execution, I will have to change the Spark dependency back to earlier
>>> versions to see if the issue persists and get back to you.
>>>
>>> Meanwhile, if anyone else has any other ideas or experience, please let
>>> me know.
>>>
>>> Mike
>>>
>>> On 4/4/16, Koert Kuipers  wrote:
>>> > we ran into similar issues and it seems related to the new memory
>>> > management. can you try:
>>> > spark.memory.useLegacyMode = true
>>> >
>>> > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91m...@gmail.com> wrote:
>>> >
>>> >> [ CC'ing dev list since nearly identical questions have occurred in
>>> >> user list recently w/o resolution;
>>> >> c.f.:
>>> >>
>>> >>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>>> >>
>>> >>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
>>> >> ]
>>> >>
>>> >> Hello,
>>> >>
>>> >> In short, I'm reporting a problem concerning load imbalance of RDD
>>> >> partitions across a standalone cluster. Though there are 16 cores
>>> >> available per node, certain nodes will have >16 partitions, and some
>>> >> will correspondingly have <16 (and even 0).
>>> >>
>>> >> In more detail: I am running some scalability/performance tests for
>>> >> vector-type operations. The RDDs I'm considering are simple block
>>> >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
>>> >> are generated with a fixed number of elements given by some multiple
>>> >> of the available cores, and subsequently hash-partitioned by their
>>>

Re: RDD Partitions not distributed evenly to executors

2016-04-04 Thread Mike Hynes
Dear all,

Thank you for your responses.

Michael Slavitch:
> Just to be sure:  Has spark-env.sh and spark-defaults.conf been correctly 
> propagated to all nodes?  Are they identical?
Yes; these files are stored on a shared memory directory accessible to
all nodes.

Koert Kuipers:
> we ran into similar issues and it seems related to the new memory
> management. can you try:
> spark.memory.useLegacyMode = true
I reran the exact same code with a restarted cluster using this
modification, and did not observe any difference. The partitioning is
still imbalanced.

Ted Yu:
> If the changes can be ported over to 1.6.1, do you mind reproducing the issue 
> there ?
Since the spark.memory.useLegacyMode setting did not impact my code
execution, I will have to change the Spark dependency back to earlier
versions to see if the issue persists and get back to you.

Meanwhile, if anyone else has any other ideas or experience, please let me know.

Mike

On 4/4/16, Koert Kuipers  wrote:
> we ran into similar issues and it seems related to the new memory
> management. can you try:
> spark.memory.useLegacyMode = true
>
> On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> [ CC'ing dev list since nearly identical questions have occurred in
>> user list recently w/o resolution;
>> c.f.:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
>> ]
>>
>> Hello,
>>
>> In short, I'm reporting a problem concerning load imbalance of RDD
>> partitions across a standalone cluster. Though there are 16 cores
>> available per node, certain nodes will have >16 partitions, and some
>> will correspondingly have <16 (and even 0).
>>
>> In more detail: I am running some scalability/performance tests for
>> vector-type operations. The RDDs I'm considering are simple block
>> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
>> are generated with a fixed number of elements given by some multiple
>> of the available cores, and subsequently hash-partitioned by their
>> integer block index.
>>
>> I have verified that the hash partitioning key distribution, as well
>> as the keys themselves, are both correct; the problem is truly that
>> the partitions are *not* evenly distributed across the nodes.
>>
>> For instance, here is a representative output for some stages and
>> tasks in an iterative program. This is a very simple test with 2
>> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
>> examples stages from the stderr log are stages 7 and 9:
>> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
>> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639
>>
>> When counting the location of the partitions on the compute nodes from
>> the stderr logs, however, you can clearly see the imbalance. Examples
>> lines are:
>> 13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196,
>> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
>> 13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197,
>> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
>> 13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198,
>> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
>>
>> Grep'ing the full set of above lines for each hostname, himrod-?,
>> shows the problem occurs in each stage. Below is the output, where the
>> number of partitions stored on each node is given alongside its
>> hostname as in (himrod-?,num_partitions):
>> Stage 7: (himrod-1,0) (himrod-2,64)
>> Stage 9: (himrod-1,16) (himrod-2,48)
>> Stage 12: (himrod-1,0) (himrod-2,64)
>> Stage 14: (himrod-1,16) (himrod-2,48)
>> The imbalance is also visible when the executor ID is used to count
>> the partitions operated on by executors.
>>
>> I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch
>> (but the modifications do not touch the scheduler, and are irrelevant
>> for these particular tests). Has something changed radically in 1.6+
>> that would make a previously (<=1.5) correct configuration go haywire?
>> Have new configuration settings been added of which I'm unaware that
>> could lead to this problem?
>>
>> Please let me know if others in the community have observed this, and
>> thank you for your time,
>> Mike
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Thanks,
Mike

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RDD Partitions not distributed evenly to executors

2016-04-04 Thread Mike Hynes
[ CC'ing dev list since nearly identical questions have occurred in
user list recently w/o resolution;
c.f.:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
]

Hello,

In short, I'm reporting a problem concerning load imbalance of RDD
partitions across a standalone cluster. Though there are 16 cores
available per node, certain nodes will have >16 partitions, and some
will correspondingly have <16 (and even 0).

In more detail: I am running some scalability/performance tests for
vector-type operations. The RDDs I'm considering are simple block
vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
are generated with a fixed number of elements given by some multiple
of the available cores, and subsequently hash-partitioned by their
integer block index.
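
For concreteness, a minimal sketch of how such a block vector might be
constructed; the sizes and names are assumptions rather than the actual test
code (assumes a live SparkContext sc and Breeze on the classpath):

    import breeze.linalg.DenseVector
    import org.apache.spark.HashPartitioner

    val numBlocks = 64     // assumed: a multiple of the available cores
    val blockSize = 1000   // assumed length of each block
    val v = sc.parallelize(0 until numBlocks, numBlocks)
      .map(i => (i, DenseVector.fill(blockSize)(1.0))) // block index -> block contents
      .partitionBy(new HashPartitioner(numBlocks))
      .setName("v")
      .persist()
    v.count() // materialize; partition placement then shows up in the logs/UI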

I have verified that the hash partitioning key distribution, as well
as the keys themselves, are both correct; the problem is truly that
the partitions are *not* evenly distributed across the nodes.

For instance, here is a representative output for some stages and
tasks in an iterative program. This is a very simple test with 2
nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
example stages from the stderr log are stages 7 and 9:
7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639

When counting the location of the partitions on the compute nodes from
the stderr logs, however, you can clearly see the imbalance. Example
lines are:
13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196,
himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197,
himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198,
himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&

Grep'ing the full set of above lines for each hostname, himrod-?,
shows the problem occurs in each stage. Below is the output, where the
number of partitions stored on each node is given alongside its
hostname as in (himrod-?,num_partitions):
Stage 7: (himrod-1,0) (himrod-2,64)
Stage 9: (himrod-1,16) (himrod-2,48)
Stage 12: (himrod-1,0) (himrod-2,64)
Stage 14: (himrod-1,16) (himrod-2,48)
The imbalance is also visible when the executor ID is used to count
the partitions operated on by executors.

I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch
(but the modifications do not touch the scheduler, and are irrelevant
for these particular tests). Has something changed radically in 1.6+
that would make a previously (<=1.5) correct configuration go haywire?
Have new configuration settings been added of which I'm unaware that
could lead to this problem?

Please let me know if others in the community have observed this, and
thank you for your time,
Mike

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Metrics Framework?

2016-04-01 Thread Mike Sukmanowsky
Thanks Silvio, JIRA submitted
https://issues.apache.org/jira/browse/SPARK-14332.

On Fri, 25 Mar 2016 at 12:46 Silvio Fiorito 
wrote:

> Hi Mike,
>
> Sorry got swamped with work and didn’t get a chance to reply.
>
> I misunderstood what you were trying to do. I thought you were just
> looking to create custom metrics vs looking for the existing Hadoop Output
> Format counters.
>
> I’m not familiar enough with the Hadoop APIs but I think it would require
> a change to the SparkHadoopWriter
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala>
> class since it generates the JobContext which is required to read the
> counters. Then it could publish the counters to the Spark metrics system.
>
> I would suggest going ahead and submitting a JIRA request if there isn’t
> one already.
>
> Thanks,
> Silvio
>
> From: Mike Sukmanowsky 
> Date: Friday, March 25, 2016 at 10:48 AM
>
> To: Silvio Fiorito , "user@spark.apache.org"
> 
> Subject: Re: Spark Metrics Framework?
>
> Pinging again - any thoughts?
>
> On Wed, 23 Mar 2016 at 09:17 Mike Sukmanowsky 
> wrote:
>
>> Thanks Ted and Silvio. I think I'll need a bit more hand holding here,
>> sorry. The way we use ES Hadoop is in pyspark via
>> org.elasticsearch.hadoop.mr.EsOutputFormat in a saveAsNewAPIHadoopFile
>> call. Given the Hadoop interop, I wouldn't assume that the EsOutputFormat
>> class
>> <https://github.com/elastic/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java>
>> could be modified to define a new Source and register it via
>> MetricsSystem.createMetricsSystem. This feels like a good feature request
>> for Spark actually: "Support Hadoop Counters in Input/OutputFormats as
>> Spark metrics" but I wanted some feedback first to see if that makes sense.
>>
>> That said, some of the custom RDD classes
>> <https://github.com/elastic/elasticsearch-hadoop/tree/master/spark/core/main/scala/org/elasticsearch/spark/rdd>
>>  could
>> probably be modified to register a new Source when they perform
>> reading/writing from/to Elasticsearch.
>>
>> On Tue, 22 Mar 2016 at 15:17 Silvio Fiorito <
>> silvio.fior...@granturing.com> wrote:
>>
>>> Hi Mike,
>>>
>>> It’s been a while since I worked on a custom Source but I think all you
>>> need to do is make your Source in the org.apache.spark package.
>>>
>>> Thanks,
>>> Silvio
>>>
>>> From: Mike Sukmanowsky 
>>> Date: Tuesday, March 22, 2016 at 3:13 PM
>>> To: Silvio Fiorito , "
>>> user@spark.apache.org" 
>>> Subject: Re: Spark Metrics Framework?
>>>
>>> The Source class is private
>>> <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/source/Source.scala#L22-L25>
>>> to the spark package and any new Sources added to the metrics registry must
>>> be of type Source
>>> <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L144-L152>.
>>> So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1
>>> code, but the same is true in 1.6.1.
>>>
>>> On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito <
>>> silvio.fior...@granturing.com> wrote:
>>>
>>>> You could use the metric sources and sinks described here:
>>>> http://spark.apache.org/docs/latest/monitoring.html#metrics
>>>>
>>>> If you want to push the metrics to another system you can define a
>>>> custom sink. You can also extend the metrics by defining a custom source.
>>>>
>>>> From: Mike Sukmanowsky 
>>>> Date: Monday, March 21, 2016 at 11:54 AM
>>>> To: "user@spark.apache.org" 
>>>> Subject: Spark Metrics Framework?
>>>>
>>>> We make extensive use of the elasticsearch-hadoop library for
>>>> Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be
>>>> very handy to have access to some of the many metrics
>>>> <https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html>
>>>> that the library makes available when running in map reduce mode. The 
>>>> library's
>>>> author noted
>>>> <https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913>
>>>> that Spark doesn't offer any kind of a similar metrics API where by these
>>>> metrics could be reported or aggregated on.
>>>>
>>>> Are there any plans to bring a metrics framework similar to Hadoop's
>>>> Counter system to Spark or is there an alternative means for us to grab
>>>> metrics exposed when using Hadoop APIs to load/save RDDs?
>>>>
>>>> Thanks,
>>>> Mike
>>>>
>>>


Re: Spark Metrics Framework?

2016-03-25 Thread Mike Sukmanowsky
Pinging again - any thoughts?

On Wed, 23 Mar 2016 at 09:17 Mike Sukmanowsky 
wrote:

> Thanks Ted and Silvio. I think I'll need a bit more hand holding here,
> sorry. The way we use ES Hadoop is in pyspark via
> org.elasticsearch.hadoop.mr.EsOutputFormat in a saveAsNewAPIHadoopFile
> call. Given the Hadoop interop, I wouldn't assume that the EsOutputFormat
> class
> <https://github.com/elastic/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java>
> could be modified to define a new Source and register it via
> MetricsSystem.createMetricsSystem. This feels like a good feature request
> for Spark actually: "Support Hadoop Counters in Input/OutputFormats as
> Spark metrics" but I wanted some feedback first to see if that makes sense.
>
> That said, some of the custom RDD classes
> <https://github.com/elastic/elasticsearch-hadoop/tree/master/spark/core/main/scala/org/elasticsearch/spark/rdd>
>  could
> probably be modified to register a new Source when they perform
> reading/writing from/to Elasticsearch.
>
> On Tue, 22 Mar 2016 at 15:17 Silvio Fiorito 
> wrote:
>
>> Hi Mike,
>>
>> It’s been a while since I worked on a custom Source but I think all you
>> need to do is make your Source in the org.apache.spark package.
>>
>> Thanks,
>> Silvio
>>
>> From: Mike Sukmanowsky 
>> Date: Tuesday, March 22, 2016 at 3:13 PM
>> To: Silvio Fiorito , "
>> user@spark.apache.org" 
>> Subject: Re: Spark Metrics Framework?
>>
>> The Source class is private
>> <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/source/Source.scala#L22-L25>
>> to the spark package and any new Sources added to the metrics registry must
>> be of type Source
>> <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L144-L152>.
>> So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1
>> code, but the same is true in 1.6.1.
>>
>> On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito <
>> silvio.fior...@granturing.com> wrote:
>>
>>> You could use the metric sources and sinks described here:
>>> http://spark.apache.org/docs/latest/monitoring.html#metrics
>>>
>>> If you want to push the metrics to another system you can define a
>>> custom sink. You can also extend the metrics by defining a custom source.
>>>
>>> From: Mike Sukmanowsky 
>>> Date: Monday, March 21, 2016 at 11:54 AM
>>> To: "user@spark.apache.org" 
>>> Subject: Spark Metrics Framework?
>>>
>>> We make extensive use of the elasticsearch-hadoop library for
>>> Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be
>>> very handy to have access to some of the many metrics
>>> <https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html>
>>> that the library makes available when running in map reduce mode. The 
>>> library's
>>> author noted
>>> <https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913>
>>> that Spark doesn't offer any kind of a similar metrics API where by these
>>> metrics could be reported or aggregated on.
>>>
>>> Are there any plans to bring a metrics framework similar to Hadoop's
>>> Counter system to Spark or is there an alternative means for us to grab
>>> metrics exposed when using Hadoop APIs to load/save RDDs?
>>>
>>> Thanks,
>>> Mike
>>>
>>


Re: Spark Metrics Framework?

2016-03-23 Thread Mike Sukmanowsky
Thanks Ted and Silvio. I think I'll need a bit more hand holding here,
sorry. The way we use ES Hadoop is in pyspark via
org.elasticsearch.hadoop.mr.EsOutputFormat
in a saveAsNewAPIHadoopFile call. Given the Hadoop interop, I wouldn't
assume that the EsOutputFormat class
<https://github.com/elastic/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java>
could be modified to define a new Source and register it via
MetricsSystem.createMetricsSystem. This feels like a good feature request
for Spark actually: "Support Hadoop Counters in Input/OutputFormats as
Spark metrics" but I wanted some feedback first to see if that makes sense.

That said, some of the custom RDD classes
<https://github.com/elastic/elasticsearch-hadoop/tree/master/spark/core/main/scala/org/elasticsearch/spark/rdd>
could
probably be modified to register a new Source when they perform
reading/writing from/to Elasticsearch.

On Tue, 22 Mar 2016 at 15:17 Silvio Fiorito 
wrote:

> Hi Mike,
>
> It’s been a while since I worked on a custom Source but I think all you
> need to do is make your Source in the org.apache.spark package.
>
> Thanks,
> Silvio
>
> From: Mike Sukmanowsky 
> Date: Tuesday, March 22, 2016 at 3:13 PM
> To: Silvio Fiorito , "user@spark.apache.org"
> 
> Subject: Re: Spark Metrics Framework?
>
> The Source class is private
> <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/source/Source.scala#L22-L25>
> to the spark package and any new Sources added to the metrics registry must
> be of type Source
> <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L144-L152>.
> So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1
> code, but the same is true in 1.6.1.
>
> On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito 
> wrote:
>
>> You could use the metric sources and sinks described here:
>> http://spark.apache.org/docs/latest/monitoring.html#metrics
>>
>> If you want to push the metrics to another system you can define a custom
>> sink. You can also extend the metrics by defining a custom source.
>>
>> From: Mike Sukmanowsky 
>> Date: Monday, March 21, 2016 at 11:54 AM
>> To: "user@spark.apache.org" 
>> Subject: Spark Metrics Framework?
>>
>> We make extensive use of the elasticsearch-hadoop library for
>> Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be
>> very handy to have access to some of the many metrics
>> <https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html>
>> that the library makes available when running in map reduce mode. The 
>> library's
>> author noted
>> <https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913>
>> that Spark doesn't offer any kind of a similar metrics API where by these
>> metrics could be reported or aggregated on.
>>
>> Are there any plans to bring a metrics framework similar to Hadoop's
>> Counter system to Spark or is there an alternative means for us to grab
>> metrics exposed when using Hadoop APIs to load/save RDDs?
>>
>> Thanks,
>> Mike
>>
>


Re: Spark Metrics Framework?

2016-03-22 Thread Mike Sukmanowsky
The Source class is private
<https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/source/Source.scala#L22-L25>
to the spark package and any new Sources added to the metrics registry must
be of type Source
<https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L144-L152>.
So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1
code, but the same is true in 1.6.1.
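
For what it's worth, a purely hypothetical sketch of the work-around
suggested in this thread (declaring the source inside the org.apache.spark
namespace so it can see the package-private trait); the class and metric
names are made up and this is not an existing ES-Hadoop integration:

    package org.apache.spark.metrics.source

    import com.codahale.metrics.{Counter, MetricRegistry}

    class EsHadoopSource extends Source {
      override val sourceName: String = "esHadoop"
      override val metricRegistry: MetricRegistry = new MetricRegistry
      // Example counter; the actual ES-Hadoop counter values would still have
      // to be copied into this registry somewhere in the write path.
      val docsWritten: Counter = metricRegistry.counter("docsWritten")
    }

    object EsHadoopSource {
      // Registration also has to happen from code compiled under org.apache.spark,
      // since the metrics system classes are internal as well.
      def register(): EsHadoopSource = {
        val source = new EsHadoopSource
        org.apache.spark.SparkEnv.get.metricsSystem.registerSource(source)
        source
      }
    }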

On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito 
wrote:

> You could use the metric sources and sinks described here:
> http://spark.apache.org/docs/latest/monitoring.html#metrics
>
> If you want to push the metrics to another system you can define a custom
> sink. You can also extend the metrics by defining a custom source.
>
> From: Mike Sukmanowsky 
> Date: Monday, March 21, 2016 at 11:54 AM
> To: "user@spark.apache.org" 
> Subject: Spark Metrics Framework?
>
> We make extensive use of the elasticsearch-hadoop library for
> Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be
> very handy to have access to some of the many metrics
> <https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html>
> that the library makes available when running in map reduce mode. The 
> library's
> author noted
> <https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913>
> that Spark doesn't offer any kind of a similar metrics API where by these
> metrics could be reported or aggregated on.
>
> Are there any plans to bring a metrics framework similar to Hadoop's
> Counter system to Spark or is there an alternative means for us to grab
> metrics exposed when using Hadoop APIs to load/save RDDs?
>
> Thanks,
> Mike
>


Spark Metrics Framework?

2016-03-21 Thread Mike Sukmanowsky
We make extensive use of the elasticsearch-hadoop library for Hadoop/Spark.
In trying to troubleshoot our Spark applications, it'd be very handy to
have access to some of the many metrics
<https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html>
that the library makes available when running in map reduce mode. The library's
author noted
<https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913> that
Spark doesn't offer any kind of similar metrics API whereby these
metrics could be reported or aggregated.

Are there any plans to bring a metrics framework similar to Hadoop's
Counter system to Spark or is there an alternative means for us to grab
metrics exposed when using Hadoop APIs to load/save RDDs?

Thanks,
Mike


Spark Job Server with Yarn and Kerberos

2016-01-04 Thread Mike Wright
Has anyone used Spark Job Server on a "kerberized" cluster in YARN-Client
mode? When Job Server contacts the YARN resource manager, we see a "Cannot
impersonate root" error and am not sure what we have misconfigured.

Thanks.

___

*Mike Wright*
Principal Architect, Software Engineering
S&P Capital IQ and SNL

434-951-7816 *p*
434-244-4466 *f*
540-470-0119 *m*

mwri...@snl.com


Re: Questions on Kerberos usage with YARN and JDBC

2015-12-13 Thread Mike Wright
Kerberos seems to be working otherwise ... for example, we're using it
successfully to control access to HDFS and it's linked to AD ... we're
using Ranger if that helps. I'm not a systems admin guy so this is really
not my area of expertise.


___

*Mike Wright*
Principal Architect, Software Engineering
S&P Capital IQ and SNL

434-951-7816 *p*
434-244-4466 *f*
540-470-0119 *m*

mwri...@snl.com



On Fri, Dec 11, 2015 at 4:36 PM, Todd Simmer  wrote:

> hey Mike,
>
> Are these part of an Active Directory Domain? If so are they pointed at
> the AD domain controllers that hosts the Kerberos server? Windows AD create
> SRV records in DNS to help windows clients find the Kerberos server for
> their domain. If you look you can see if you have a kdc record in Windows
> DNS and what it's pointing at. Can you do a
>
> kinit *username *
>
> on that host? It should tell you if it can find the KDC.
>
> Let me know if that's helpful at all.
>
> Todd
>
> On Fri, Dec 11, 2015 at 1:50 PM, Mike Wright  wrote:
>
>> As part of our implementation, we are utilizing a full "Kerberized"
>> cluster built on the Hortonworks suite. We're using Job Server as the front
>> end to initiate short-run jobs directly from our client-facing product
>> suite.
>>
>> 1) We believe we have configured the job server to start with the
>> appropriate credentials, specifying a principal and keytab. We switch to
>> YARN-CLIENT mode and can see Job Server attempt to connect to the resource
>> manager, and the result is that whatever the principal name is, it "cannot
>> impersonate root."  We have been unable to solve this.
>>
>> 2) We are primarily a Windows shop, hence our cluelessness here. That
>> said, we're using the JDBC driver version 4.2 and want to use JavaKerberos
>> authentication to connect to SQL Server. The queries performed by the job
>> are done in the driver, and hence would be running on the Job Server, which
>> we confirmed is running as the principal we have designated. However, when
>> attempting to connect with this option enabled I receive a "Unable to
>> obtain Principal Name for authentication" exception.
>>
>> Reading this:
>>
>> https://msdn.microsoft.com/en-us/library/ms378428.aspx
>>
>> We have Kerberos working on the machine and thus have krb5.conf setup
>> correctly. However the section, "
>> Enabling the Domain Configuration File and the Login Module Configuration
>> File" seems to indicate we've missed a step somewhere.
>>
>> Forgive my ignorance here ... I've been on Windows for 20 years and this
>> is all new to.
>>
>> Thanks for any guidance you can provide.
>>
>
>


Re: is Multiple Spark Contexts is supported in spark 1.5.0 ?

2015-12-11 Thread Mike Wright
Thanks for the insight!


___

*Mike Wright*
Principal Architect, Software Engineering
S&P Capital IQ and SNL

434-951-7816 *p*
434-244-4466 *f*
540-470-0119 *m*

mwri...@snl.com



On Fri, Dec 11, 2015 at 2:38 PM, Michael Armbrust 
wrote:

> The way that we do this is to have a single context with a server in front
> that multiplexes jobs that use that shared context.  Even if you aren't
> sharing data this is going to give you the best fine grained sharing of the
> resources that the context is managing.
>
> On Fri, Dec 11, 2015 at 10:55 AM, Mike Wright  wrote:
>
>> Somewhat related - What's the correct implementation when you have a
>> single cluster to support multiple jobs that are unrelated and NOT sharing
>> data? I was directed to figure out, via job server, to support "multiple
>> contexts" and explained that multiple contexts per JVM is not really
>> supported. So, via job server, how does one support multiple contexts in
>> DIFFERENT JVM's? I specify multiple contexts in the conf file and the
>> initialization of the subsequent contexts fail.
>>
>>
>>
>> On Fri, Dec 4, 2015 at 3:37 PM, Michael Armbrust 
>> wrote:
>>
>>> On Fri, Dec 4, 2015 at 11:24 AM, Anfernee Xu 
>>> wrote:
>>>
>>>> If multiple users are looking at the same data set, then it's good
>>>> choice to share the SparkContext.
>>>>
>>>> But my usercases are different, users are looking at different data(I
>>>> use custom Hadoop InputFormat to load data from my data source based on the
>>>> user input), the data might not have any overlap. For now I'm taking below
>>>> approach
>>>>
>>>
>>> Still if you want fine grained sharing of compute resources as well, you
>>> want to using single SparkContext.
>>>
>>
>>
>


Re: is Multiple Spark Contexts is supported in spark 1.5.0 ?

2015-12-11 Thread Mike Wright
Somewhat related - What's the correct implementation when you have a single
cluster to support multiple jobs that are unrelated and NOT sharing data? I
was directed to figure out, via job server, how to support "multiple
contexts", and it was explained that multiple contexts per JVM are not
really supported. So, via job server, how does one support multiple contexts
in DIFFERENT JVMs? I specify multiple contexts in the conf file and the
initialization of the subsequent contexts fails.



On Fri, Dec 4, 2015 at 3:37 PM, Michael Armbrust 
wrote:

> On Fri, Dec 4, 2015 at 11:24 AM, Anfernee Xu 
> wrote:
>
>> If multiple users are looking at the same data set, then it's good choice
>> to share the SparkContext.
>>
>> But my usercases are different, users are looking at different data(I use
>> custom Hadoop InputFormat to load data from my data source based on the
>> user input), the data might not have any overlap. For now I'm taking below
>> approach
>>
>
> Still if you want fine grained sharing of compute resources as well, you
> want to using single SparkContext.
>


Questions on Kerberos usage with YARN and JDBC

2015-12-11 Thread Mike Wright
As part of our implementation, we are utilizing a full "Kerberized" cluster
built on the Hortonworks suite. We're using Job Server as the front end to
initiate short-run jobs directly from our client-facing product suite.

1) We believe we have configured the job server to start with the
appropriate credentials, specifying a principal and keytab. We switch to
YARN-CLIENT mode and can see Job Server attempt to connect to the resource
manager, and the result is that whatever the principal name is, it "cannot
impersonate root."  We have been unable to solve this.

2) We are primarily a Windows shop, hence our cluelessness here. That said,
we're using the JDBC driver version 4.2 and want to use JavaKerberos
authentication to connect to SQL Server. The queries performed by the job
are done in the driver, and hence would be running on the Job Server, which
we confirmed is running as the principal we have designated. However, when
attempting to connect with this option enabled, I receive an "Unable to
obtain Principal Name for authentication" exception.

Reading this:

https://msdn.microsoft.com/en-us/library/ms378428.aspx

We have Kerberos working on the machine and thus have krb5.conf setup
correctly. However, the section "Enabling the Domain Configuration File and
the Login Module Configuration File" seems to indicate we've missed a step
somewhere.

Forgive my ignorance here ... I've been on Windows for 20 years and this is
all new to me.

Thanks for any guidance you can provide.


Re: New to Spark - Paritioning Question

2015-09-08 Thread Mike Wright
Thanks for the response!

Well, in retrospect each partition doesn't need to be restricted to a
single key. But, I cannot have values associated with a key span partitions
since they all need to be processed together for a key to facilitate
cumulative calcs. So provided an individual key has all its values in a
single partition, I'm OK.

Additionally, the values will be written to the database, and from what I
have read, doing this at the partition level is the best compromise between
1) writing the calculated values for each key (lots of connects/disconnects)
and 2) collecting them all at the end and writing them all at once.

I am using a groupBy against the filtered RDD to get the grouping I want,
but apparently this may not be the most efficient way, and it seems that
everything is always in a single partition under this scenario.
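
As a minimal sketch of the partition-per-write idea under those constraints
(the record type, connection helper, and partition count are assumptions,
not the actual job):

    import org.apache.spark.HashPartitioner

    case class MineRow(mineId: Int, year: Int, value: Double)

    val numPartitions = 32        // assumed
    val byMine = filteredRows     // RDD[MineRow], assumed to already exist
      .map(r => (r.mineId, r))
      .groupByKey(new HashPartitioner(numPartitions)) // all rows for a mine land together

    byMine.foreachPartition { mines =>
      // val conn = openDbConnection() // hypothetical helper: one connection per partition
      mines.foreach { case (mineId, rows) =>
        val ordered = rows.toSeq.sortBy(_.year) // cumulative calcs need year order
        // compute the cumulative "flexed" values for this mine, then batch the writes
      }
      // conn.close()
    }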


___

*Mike Wright*
Principal Architect, Software Engineering

SNL Financial LC
434-951-7816 *p*
434-244-4466 *f*
540-470-0119 *m*

mwri...@snl.com

On Tue, Sep 8, 2015 at 5:38 PM, Richard Marscher 
wrote:

> That seems like it could work, although I don't think `partitionByKey` is
> a thing, at least for RDD. You might be able to merge step #2 and step #3
> into one step by using the `reduceByKey` function signature that takes in a
> Partitioner implementation.
>
> def reduceByKey(partitioner: Partitioner
> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/Partitioner.html>
> , func: (V, V) ⇒ V): RDD
> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html>
> [(K, V)]
>
> Merge the values for each key using an associative reduce function. This
> will also perform the merging locally on each mapper before sending results
> to a reducer, similarly to a "combiner" in MapReduce.
>
> The tricky part might be getting the partitioner to know about the number
> of partitions, which I think it needs to know upfront in `abstract def
> numPartitions: Int`. The `HashPartitioner` for example takes in the
> number as a constructor argument, maybe you could use that with an upper
> bound size if you don't mind empty partitions. Otherwise you might have to
> mess around to extract the exact number of keys if it's not readily
> available.
>
> Aside: what is the requirement to have each partition only contain the
> data related to one key?
>
> On Fri, Sep 4, 2015 at 11:06 AM, mmike87  wrote:
>
>> Hello, I am new to Apache Spark and this is my company's first Spark
>> project.
>> Essentially, we are calculating models dealing with Mining data using
>> Spark.
>>
>> I am holding all the source data in a persisted RDD that we will refresh
>> periodically. When a "scenario" is passed to the Spark job (we're using
>> Job
>> Server) the persisted RDD is filtered to the relevant mines. For example,
>> we
>> may want all mines in Chile and the 1990-2015 data for each.
>>
>> Many of the calculations are cumulative, that is when we apply user-input
>> "adjustment factors" to a value, we also need the "flexed" value we
>> calculated for that mine previously.
>>
>> To ensure that this works, the idea if to:
>>
>> 1) Filter the superset to relevant mines (done)
>> 2) Group the subset by the unique identifier for the mine. So, a group may
>> be all the rows for mine "A" for 1990-2015
>> 3) I then want to ensure that the RDD is partitioned by the Mine
>> Identifier
>> (and Integer).
>>
>> It's step 3 that is confusing me. I suspect it's very easy ... do I simply
>> use PartitionByKey?
>>
>> We're using Java if that makes any difference.
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/New-to-Spark-Paritioning-Question-tp24580.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>


Re: How to unit test HiveContext without OutOfMemoryError (using sbt)

2015-08-26 Thread Mike Trienis
Thanks for your response Yana,

I can increase the MaxPermSize parameter and it will allow me to run the
unit test a few more times before I run out of memory.

However, the primary issue is that running the same unit test multiple times
in the same JVM increases memory usage with each run, and I believe it has
something to do with the HiveContext not reclaiming memory after it is
finished (or with my not shutting it down properly).

It could very well be related to sbt, however, it's not clear to me.
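For reference, the sbt-side equivalent I am aware of looks roughly like this (a
sketch from a build.sbt; the javaOptions only take effect when the tests are
forked, which is the slow workaround I mentioned):

fork in Test := true
javaOptions in Test ++= Seq("-Xmx2g", "-XX:MaxPermSize=512m", "-XX:ReservedCodeCacheSize=256m")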


On Tue, Aug 25, 2015 at 1:12 PM, Yana Kadiyska 
wrote:

> The PermGen space error is controlled with MaxPermSize parameter. I run
> with this in my pom, I think copied pretty literally from Spark's own
> tests... I don't know what the sbt equivalent is but you should be able to
> pass it...possibly via SBT_OPTS?
>
>
>  
>   org.scalatest
>   scalatest-maven-plugin
>   1.0
>   
>
> ${project.build.directory}/surefire-reports
>   false
>   .
>   SparkTestSuite.txt
>   -Xmx3g -XX:MaxPermSize=256m
> -XX:ReservedCodeCacheSize=512m
>   
>   
>   true
>   1
>   false
>
> true
>   
>   
>   
>   
>   test
>   
>   test
>   
>   
>   
>   
>   
>
>
> On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis 
> wrote:
>
>> Hello,
>>
>> I am using sbt and created a unit test where I create a `HiveContext` and
>> execute some query and then return. Each time I run the unit test the JVM
>> will increase its memory usage until I get the error:
>>
>> Internal error when running tests: java.lang.OutOfMemoryError: PermGen
>> space
>> Exception in thread "Thread-2" java.io.EOFException
>>
>> As a work-around, I can fork a new JVM each time I run the unit test,
>> however, it seems like a bad solution as it takes a while to run the unit
>> test.
>>
>> By the way, I tried importing the TestHiveContext:
>>
>>- import org.apache.spark.sql.hive.test.TestHiveContext
>>
>> However, it suffers from the same memory issue. Has anyone else suffered
>> from the same problem? Note that I am running these unit tests on my mac.
>>
>> Cheers, Mike.
>>
>>
>


How to unit test HiveContext without OutOfMemoryError (using sbt)

2015-08-25 Thread Mike Trienis
Hello,

I am using sbt and created a unit test where I create a `HiveContext` and
execute some query and then return. Each time I run the unit test the JVM
will increase its memory usage until I get the error:

Internal error when running tests: java.lang.OutOfMemoryError: PermGen space
Exception in thread "Thread-2" java.io.EOFException

As a work-around, I can fork a new JVM each time I run the unit test,
however, it seems like a bad solution as it takes a while to run the unit
test.

By the way, I tried importing the TestHiveContext:

   - import org.apache.spark.sql.hive.test.TestHiveContext

However, it suffers from the same memory issue. Has anyone else suffered
from the same problem? Note that I am running these unit tests on my mac.

Cheers, Mike.


Spark SQL window functions (RowsBetween)

2015-08-20 Thread Mike Trienis
Hi All,

I would like some clarification regarding window functions for Apache Spark
1.4.0

   -
   
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

In particular, the "rowsBetween"

   * {{{
   *   val w = Window.partitionBy("name").orderBy("id")
   *   df.select(
   * sum("price").over(w.rangeBetween(Long.MinValue, 2)),
   * avg("price").over(w.rowsBetween(0, 4))
   *   )
   * }}}


Are any of the window functions available without a hive context? If the
answer is no, then is there any other way to accomplish this without using
hive?

I need to compare the i[th] row with the [i-1]th row of col2 (sorted by
col1). If item_i of the i[th] row and item_[i-1] of the [i-1]th row are
different then I need to increment the count of item_[i-1] by 1. (A rough
sketch of what I mean follows the expected output below.)


col1| col2
--
1| item_1
2| item_1
3| item_2
4| item_1
5| item_2
6| item_1

In the above example, if we scan two rows at a time downwards,  we see that
row 2 and row 3 are different therefore we add one to item_1. Next, we see
that row 3 is different from row 4, then add one to item_2. Continue until
we end up with:

 col2  | col3
---
 item_1  | 2
 item_2  | 2
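If it helps, this is roughly the shape I have in mind, assuming the window
functions are usable (a sketch; df is the input DataFrame with columns col1 and
col2, and a HiveContext is assumed to be needed on 1.4.0):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead}

// no partitionBy here, so the whole data set collapses into one partition
val w = Window.orderBy("col1")

// pair each row with the item of the following row, then count the rows
// whose item differs from the next one
val withNext = df.withColumn("next_item", lead("col2", 1).over(w))
val counts = withNext
  .filter(col("next_item").isNotNull)
  .filter(col("col2") !== col("next_item"))
  .groupBy("col2")
  .count()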

Thanks, Mike.


PySpark concurrent jobs using single SparkContext

2015-08-20 Thread Mike Sukmanowsky
Hi all,

We're using Spark 1.3.0 via a small YARN cluster to do some log processing.
The jobs are pretty simple: for a number of customers and a number of days,
fetch some event log data, build aggregates and store those aggregates in
a data store.

The way our script is written right now does something akin to:

with SparkContext() as sc:
    for customer in customers:
        for day in days:
            logs = sc.textFile(get_logs(customer, day))
            aggregate = make_aggregate(logs)
            # This function contains the action saveAsNewAPIHadoopFile which
            # triggers a save
            save_aggregate(aggregate)

So we have a Spark job per customer, per day.

I tried doing some parallel job submission with something similar to:

from concurrent import futures

def make_and_save_aggregate(customer, day, spark_context):
    # Without a separate threading.Lock() here or better yet, one guarding the
    # Spark context, multiple customer/day transformations and actions could
    # be interleaved
    sc = spark_context
    logs = sc.textFile(get_logs(customer, day))
    aggregate = make_aggregate(logs)
    save_aggregate(aggregate)

with SparkContext() as sc, futures.ThreadPoolExecutor(4) as executor:
    for customer in customers:
        for day in days:
            executor.submit(make_and_save_aggregate, customer, day, sc)

The problem is, with no locks on a SparkContext except during initialization
<https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L214-L241>
and
shutdown
<https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L296-L307>,
operations on the context could (if I understand correctly) be interleaved,
leading to a DAG which contains transformations out of order and from
different customer/day periods.

One solution is instead to launch multiple Spark jobs via spark-submit and
let YARN/Spark's dynamic executor allocation take care of fair scheduling.
In practice, this doesn't seem to yield very fast computation perhaps due
to some additional overhead with YARN.

Is there any safe way to launch concurrent jobs like this using a single
PySpark context?

-- 
Mike Sukmanowsky
Aspiring Digital Carpenter

*e*: mike.sukmanow...@gmail.com

LinkedIn <http://www.linkedin.com/profile/view?id=10897143> | github
<https://github.com/msukmanowsky>


Optimal way to implement a small lookup table for identifiers in an RDD

2015-08-10 Thread Mike Trienis
Hi All,

I have an RDD of case class objects.

scala> case class Entity(
 | value: String,
 | identifier: String
 | )
defined class Entity

scala> Entity("hello", "id1")
res25: Entity = Entity(hello,id1)

During a map operation, I'd like to return a new RDD that contains all of
the data of the original RDD with the addition of new data that was looked
up based on the identifiers provided.

The lookup table in Cassandra looks something like...

id|   type
-+-
id1 |  action
id2 |  view

The end result would be an RDD of EntityExtended

case class EntityExtended(
  value: String,
  identifier: String,
  `type`: String
)

I believe that it would make sense to use a broadcast variable. However,
I'm not sure what the best way would be to incorporate it during a map
operation.

rdd.map(MyObject.extendEntity)

object MyObject {
   def extendEntity(entity: Entity): EntityExtended = {
   val id = entity.identifier

   // lookup identifier in broadcast variable?
   }
}
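For concreteness, this is roughly the shape I am considering (a sketch; the
Map below stands in for the Cassandra lookup table and assumes it is small
enough to collect to the driver, and the case classes are restated to keep the
sketch self-contained):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

case class Entity(value: String, identifier: String)
case class EntityExtended(value: String, identifier: String, `type`: String)

object MyObject {
  // passing the Broadcast handle in explicitly keeps the closure small
  def extendEntity(lookup: Broadcast[Map[String, String]])(entity: Entity): EntityExtended =
    EntityExtended(entity.value, entity.identifier,
      lookup.value.getOrElse(entity.identifier, "unknown"))
}

val sc = new SparkContext(new SparkConf().setAppName("lookup-sketch"))
// stand-in for reading the small lookup table out of Cassandra
val idToType = Map("id1" -> "action", "id2" -> "view")
val lookupBc = sc.broadcast(idToType)

val rdd = sc.parallelize(Seq(Entity("hello", "id1"), Entity("hi", "id2")))
val extended = rdd.map(MyObject.extendEntity(lookupBc))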

Thanks, Mike.


control the number of reducers for groupby in data frame

2015-08-04 Thread Fang, Mike
Hi,

Does anyone know how I could control the number of reducers when we do an operation
such as groupBy for data frames?
I could set spark.sql.shuffle.partitions in SQL, but I am not sure how to do it with the
df.groupBy("XX") API.

Thanks,
Mike


streamingContext.stop(true,true) doesn't end the job

2015-07-29 Thread mike
Hi all,

I have a streaming job which reads messages from Kafka via directStream,
transforms them and writes them out to Cassandra.
I also use a large broadcast variable (~300MB).

I tried to implement a stopping mechanism for this job, something like this:


When I test it in local mode, the job doesn't finish.
All I see in the log is this : 


Can someone point me , to what I'm doing wrong ?
Thanks,
Mike 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/streamingContext-stop-true-true-doesn-t-end-the-job-tp24064.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Data frames select and where clause dependency

2015-07-20 Thread Mike Trienis
Definitely, thanks Mohammed.

On Mon, Jul 20, 2015 at 5:47 PM, Mohammed Guller 
wrote:

>  Thanks, Harish.
>
>
>
> Mike – this would be a cleaner version for your use case:
>
> df.filter(df("filter_field") === "value").select("field1").show()
>
>
>
> Mohammed
>
>
>
> *From:* Harish Butani [mailto:rhbutani.sp...@gmail.com]
> *Sent:* Monday, July 20, 2015 5:37 PM
> *To:* Mohammed Guller
> *Cc:* Michael Armbrust; Mike Trienis; user@spark.apache.org
>
> *Subject:* Re: Data frames select and where clause dependency
>
>
>
> Yes via:  org.apache.spark.sql.catalyst.optimizer.ColumnPruning
>
> See DefaultOptimizer.batches for list of logical rewrites.
>
>
>
> You can see the optimized plan by printing: df.queryExecution.optimizedPlan
>
>
>
> On Mon, Jul 20, 2015 at 5:22 PM, Mohammed Guller 
> wrote:
>
> Michael,
>
> How would the Catalyst optimizer optimize this version?
>
> df.filter(df("filter_field") === "value").select("field1").show()
>
> Would it still read all the columns in df or would it read only
> “filter_field” and “field1” since only two columns are used (assuming other
> columns from df are not used anywhere else)?
>
>
>
> Mohammed
>
>
>
> *From:* Michael Armbrust [mailto:mich...@databricks.com]
> *Sent:* Friday, July 17, 2015 1:39 PM
> *To:* Mike Trienis
> *Cc:* user@spark.apache.org
> *Subject:* Re: Data frames select and where clause dependency
>
>
>
> Each operation on a dataframe is completely independent and doesn't know
> what operations happened before it.  When you do a selection, you are
> removing other columns from the dataframe and so the filter has nothing to
> operate on.
>
>
>
> On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis 
> wrote:
>
> I'd like to understand why the where field must exist in the select
> clause.
>
>
>
> For example, the following select statement works fine
>
>- df.select("field1", "filter_field").filter(df("filter_field") ===
>"value").show()
>
>  However, the next one fails with the error "in operator !Filter
> (filter_field#60 = value);"
>
>- df.select("field1").filter(df("filter_field") === "value").show()
>
>  As a work-around, it seems that I can do the following
>
>- df.select("field1", "filter_field").filter(df("filter_field") ===
>"value").drop("filter_field").show()
>
>
>
> Thanks, Mike.
>
>
>
>
>


[General Question] [Hadoop + Spark at scale] Spark Rack Awareness ?

2015-07-18 Thread Mike Frampton
I wanted to ask a general question about Hadoop/Yarn and Apache Spark
integration. I know that Hadoop on a physical cluster has rack awareness,
i.e. it attempts to minimise network traffic by saving replicated blocks
within a rack.

I wondered whether, when Spark is configured to use Yarn as a cluster manager,
it is able to use this feature to also minimise network traffic to a degree.

Sorry if this question is not quite accurate, but I think you can generally see
what I mean?
  

Data frames select and where clause dependency

2015-07-17 Thread Mike Trienis
I'd like to understand why the where field must exist in the select clause.

For example, the following select statement works fine

   - df.select("field1", "filter_field").filter(df("filter_field") ===
   "value").show()

However, the next one fails with the error "in operator !Filter
(filter_field#60 = value);"

   - df.select("field1").filter(df("filter_field") === "value").show()

As a work-around, it seems that I can do the following

   - df.select("field1", "filter_field").filter(df("filter_field") ===
   "value").drop("filter_field").show()


Thanks, Mike.


Splitting dataframe using Spark 1.4 for nested json input

2015-07-04 Thread Mike Tracy
Hello, 

I am having issues with splitting contents of a dataframe column using Spark
1.4. The dataframe was created by reading a nested complex json file. I used
df.explode but keep getting an error message.

scala> val df = sqlContext.read.json("/Users/xx/target/statsfile.json")
scala> df.show() 
+++
|  mi|neid|
+++
|[900,["pmEs","pmS...|[SubNetwork=ONRM_...|
|[900,["pmIcmpInEr...|[SubNetwork=ONRM_...|
|[900,pmUnsuccessf...|[SubNetwork=ONRM_...|

+++
scala> df.printSchema()
root 
 |-- mi: struct (nullable = true)
 ||-- gp: long (nullable = true)
 ||-- mt: string (nullable = true)
 ||-- mts: string (nullable = true)
 ||-- mv: string (nullable = true)
 |-- neid: struct (nullable = true)
 ||-- nedn: string (nullable = true)
 ||-- nesw: string (nullable = true)
 ||-- neun: string (nullable = true)

scala> val df1=df.select("mi.mv")
df1: org.apache.spark.sql.DataFrame = [mv: string]

scala> val df1=df.select("mi.mv").show()
++
|  mv|
++
|[{"r":[0,0,0],"mo...|
|{"r":[0,4,0,4],"m...|
|{"r":5,"moid":"Ma...|

++


scala> df1.explode("mv","mvnew")(mv => mv.split(","))

<console>:28: error: value split is not a member of Nothing
df1.explode("mv","mvnew")(mv => mv.split(","))

The json file format looks like

[   
{   
"neid":{  },
"mi":{   
"mts":"20100609071500Z",
"gp":"900",
"tMOID":"Aal2Ap",
"mt":[  ],
"mv":[ 
{  
   
"moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1552q",
"r":
   [ ... ]
}, 
{ 
   
"moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1556q",
"r":
   [ ... ]
} 
] 
} 
}
] 


Am I doing something wrong? I need to extract the data under mi.mv into separate
columns so I can apply some transformations.

Regards

Mike




Error with splitting contents of a dataframe column using Spark 1.4 for nested complex json file

2015-07-01 Thread Mike Tracy
Hello, 

I am having issues with splitting contents of a dataframe column using Spark
1.4. The dataframe was created by reading a nested complex json file. I used
df.explode but keep getting an error message. The json file format looks like

[   
{   
"neid":{  },
"mi":{   
"mts":"20100609071500Z",
"gp":"900",
"tMOID":"Aal2Ap",
"mt":[  ],
"mv":[ 
{  
   
"moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1552q",
"r":
[
 1,
 2,
 5
 ]
}, 
{ 
   
"moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1542q",
"r":
[
 1,
 2,
 5
 ]
 } 
] 
} 
}, 
{   
"neid":{   
"neun":"RC003",

"nedn":"SubNetwork=ONRM_RootMo_R,SubNetwork=RC003,MeContext=RC003",
"nesw":"CP90831_R9YC/11"
}, 
"mi":{   
"mts":"20100609071500Z",
"gp":"900",
"tMOID":"PlugInUnit",
"mt":"pmProcessorLoad",
"mv":[ 
{  
   
"moid":"ManagedElement=1,Equipment=1,Subrack=MS,Slot=6,PlugInUnit=1",
   "r":
 [ 1, 2, 5
 ]
}, 
{  
   
"moid":"ManagedElement=1,Equipment=1,Subrack=ES-1,Slot=1,PlugInUnit=1",
   "r":
  [ 1, 2, 5
 ]
} 
] 
} 
} 
] 


scala> val df = sqlContext.read.json("/Users/xx/target/statsfile.json")

scala> df.show() 
+++
|  mi|neid|
+++
|[900,["pmEs","pmS...|[SubNetwork=ONRM_...|
|[900,["pmIcmpInEr...|[SubNetwork=ONRM_...|
|[900,pmUnsuccessf...|[SubNetwork=ONRM_...|
|[900,["pmBwErrBlo...|[SubNetwork=ONRM_...|
|[900,["pmSctpStat...|[SubNetwork=ONRM_...|
|[900,["pmLinkInSe...|[SubNetwork=ONRM_...|
|[900,["pmGrFc","p...|[SubNetwork=ONRM_...|
|[900,["pmReceived...|[SubNetwork=ONRM_...|
|[900,["pmIvIma","...|[SubNetwork=ONRM_...|
|[900,["pmEs","pmS...|[SubNetwork=ONRM_...|
|[900,["pmEs","pmS...|[SubNetwork=ONRM_...|
|[900,["pmExisOrig...|[SubNetwork=ONRM_...|
|[900,["pmHDelayVa...|[SubNetwork=ONRM_...|
|[900,["pmReceived...|[SubNetwork=ONRM_...|
|[900,["pmReceived...|[SubNetwork=ONRM_...|
|[900,["pmAverageR...|[SubNetwork=ONRM_...|
|[900,["pmDchFrame...|[SubNetwork=ONRM_...|
|[900,["pmReceived...|[SubNetwork=ONRM_...|
|[900,["pmNegative...|[SubNetwork=ONRM_...|
|[900,["pmUsedTbsQ...|[SubNetwork=ONRM_...|
+++
scala> df.printSchema()
root 
 |-- mi: struct (nullable = true)
 ||-- gp: long (nullable = true)
 ||-- mt: string (nullable = true)
 ||-- mts: string (nullable = true)
 ||-- mv: string (nullable = true)
 |-- neid: struct (nullable = true)
 ||-- nedn: string (nullable = true)
 ||-- nesw: string (nullable = true)
 ||-- neun: string (nullable = true)

scala> val df1=df.select("mi.mv")
df1: org.apache.spark.sql.DataFrame = [mv: string]

scala> val df1=df.select("mi.mv").show()
++
|  mv|
++
|[{"r":[0,0,0],"mo...|
|{"r":[0,4,0,4],"m...|
|{"r":5,"moid":"Ma...|
|[{"r":[2147483647...|
|{"r":[225,1112986...|
|[{"r":[83250,0,0,...|
|[{"r":[1,2,529982...|
|[{"r":[26998564,0...|
|[{"r":[0,0,0,0,0,...|
|[{"r":[0,0,0],"mo...|
|[{"r":[0,0,0],"mo...|
|{"r":[0,0,0,0,0,0...|
|{"r":[0,0,1],"moi...|
|{"r":[4587,4587],...|
|[{"r":[180,180],"...|
|[{"r":["0,0,0,0,0...|
|{"r":[0,35101,0,0...|
|[{"r":["0,0,0,0,0...|
|[{"r":[0,1558],"m...|
|[{"r":["7484,4870...|
++


scala> df1.explode("mv","mvnew")(mv => mv.split(","))

<console>:28: error: value split is not a member of Nothing
df1.explode("mv","mvnew")(mv => mv.split(","))

Am I doing something wrong? I need to extract the data under mi.mv into separate
columns so I can apply some transformations.

Regards

Mike




[Spark 1.3.1] Spark HiveQL -> CDH 5.3 Hive 0.13 UDF's

2015-06-26 Thread Mike Frampton
Hi 

I have a five node CDH 5.3 cluster running on CentOS 6.5; I also have a
separate install of Spark 1.3.1. (The CDH 5.3 install has Spark 1.2 but I
wanted a newer version.)

I managed to write some Scala-based code using a HiveContext to connect to
Hive and create/populate tables etc. I compiled my application using sbt and
ran it with spark-submit in local mode.

My question concerns UDF's, specifically the function row_sequence function in 
the hive-contrib 
jar file i.e.  

hiveContext.sql("""

ADD JAR 
/opt/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/jars/hive-contrib-0.13.1-cdh5.3.3.jar

  """)

hiveContext.sql("""

CREATE TEMPORARY FUNCTION row_sequence as 
'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';

  """)


 val resRDD = hiveContext.sql("""

  SELECT row_sequence(),t1.edu FROM
( SELECT DISTINCT education AS edu FROM adult3 ) t1
  ORDER BY t1.edu

""")

This seems to generate its sequence in the map (?) phase of execution because,
no matter how I fiddle with the main SQL, I cannot get an ascending index for
the dimension data, i.e. I always get

1  val1
1  val2
1  val3

instead of 

1  val1
2  val2
3  val3

I'm well aware that I can work around this issue in Scala (and I have), but I
wondered whether others have come across this and solved it?
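For reference, the Scala workaround I have in mind looks roughly like this (a
sketch only, using zipWithIndex on the distinct values instead of the UDF):

val indexedEdu = hiveContext.sql("SELECT DISTINCT education AS edu FROM adult3")
  .rdd                                       // DataFrame -> RDD[Row] in Spark 1.3.x
  .map(_.getString(0))
  .sortBy(identity)                          // ascending order of the education values
  .zipWithIndex()
  .map { case (edu, idx) => (idx + 1, edu) } // 1-based ascending index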

cheers

Mike F

Aggregating metrics using Cassandra and Spark streaming

2015-06-24 Thread Mike Trienis
Hello,

I'd like to understand how other people have been aggregating metrics
using Spark Streaming and a Cassandra database. Currently I have designed
some data models that will store the rolled-up metrics. There are two
models that I am considering:

CREATE TABLE rollup_using_counters (
    metric_1 text PRIMARY KEY,
    metric_1_value counter
);

The model above is nice because I only need to write and execute a
single query when updating the counter value. The problem is that I
need these counter values to be fairly accurate and based on some
discussions from the Cassandra folks it sounds like there is some
potential for over and under counting if the database is under load.

CREATE TABLE rollup_using_update (
    metric_1 text PRIMARY KEY,
    metric_1_value int
);

This model, on the other hand, will require the metric values to be
updated explicitly and therefore the operation is idempotent. The problem
is that I will need to read the current metrics into the Spark streaming
application and perform the addition prior to writing the result back to
Cassandra. I believe that ensures the metrics are accurate, but it also
introduces a lot of complexity and possibly latency into my Spark
streaming application.
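The shape I have in mind for the second model is roughly the following (a
sketch; readCurrent and writeTotal are hypothetical stand-ins for the
Cassandra read and write calls):

import org.apache.spark.streaming.dstream.DStream

// hypothetical Cassandra accessors, shown only to illustrate the
// read-add-write step
def readCurrent(metric: String): Long = 0L
def writeTotal(metric: String, total: Long): Unit = ()

def rollUp(metrics: DStream[(String, Long)]): Unit =
  metrics.reduceByKey(_ + _).foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      partition.foreach { case (metric, delta) =>
        writeTotal(metric, readCurrent(metric) + delta)
      }
    }
  }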

Has anyone else run into this problem before, and how did you solve it?

Thanks, Mike.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: understanding on the "waiting batches" and "scheduling delay" in Streaming UI

2015-06-22 Thread Fang, Mike
Hi Das,

Thanks for your reply. Somehow I missed it..
I am using Spark 1.3. The data source is from kafka.
Yeah, not sure why the delay is 0. I'll run against 1.4 and give a screenshot.

Thanks,
Mike

From: Akhil Das <ak...@sigmoidanalytics.com>
Date: Thursday, June 18, 2015 at 6:05 PM
To: Mike Fang <chyfan...@gmail.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: understanding on the "waiting batches" and "scheduling delay" in 
Streaming UI

Which version of Spark, and what is your data source? For some reason, your
processing delay is exceeding the batch duration, and it's strange that you are
not seeing any scheduling delay.

Thanks
Best Regards

On Thu, Jun 18, 2015 at 7:29 AM, Mike Fang <chyfan...@gmail.com> wrote:
Hi,

I have a spark streaming program running for ~ 25hrs. When I check the 
Streaming UI tab. I found the "Waiting batches" is 144. But the "scheduling 
delay" is 0. I am a bit confused.
If the "waiting batches" is 144, that means many batches are waiting in the 
queue to be processed? If this is the case, the scheduling delay should be high 
rather than 0. Am I missing anything?

Thanks,
Mike



[Spark 1.3.1 SQL] Using Hive

2015-06-21 Thread Mike Frampton
Hi 

Is it true that if I want to use Spark SQL (for Spark 1.3.1) against Apache
Hive I need to build Spark from source?

I'm using CDH 5.3 on CentOS Linux 6.5, which uses Hive 0.13.0 (I think).

cheers

Mike F

  

understanding on the "waiting batches" and "scheduling delay" in Streaming UI

2015-06-17 Thread Mike Fang
Hi,



I have a spark streaming program running for ~ 25hrs. When I check the
Streaming UI tab. I found the “Waiting batches” is 144. But the “scheduling
delay” is 0. I am a bit confused.

If the “waiting batches” is 144, that means many batches are waiting in the
queue to be processed? If this is the case, the scheduling delay should be
high rather than 0. Am I missing anything?



Thanks,

Mike


questions on the "waiting batches" and "scheduling delay" in Streaming UI

2015-06-16 Thread Fang, Mike
Hi,

I have a spark streaming program running for ~ 25hrs. When I check the 
Streaming UI tab. I found the "Waiting batches" is 144. But the "scheduling 
delay" is 0. I am a bit confused.
If the "waiting batches" is 144, that means many batches are waiting in the 
queue to be processed? If this is the case, the scheduling delay should be high 
rather than 0. Am I missing anything?

Thanks,
Mike



spark stream twitter question ..

2015-06-13 Thread Mike Frampton
Hi 

I have a question about Spark Twitter stream processing in Spark 1.3.1. The
code sample below just opens up a twitter stream, uses auth keys, splits out
hash tags and creates a temp table. However, when I try to compile it using
sbt (CentOS 6.5) I get the error

[error] /home/hadoop/spark/twitter1/src/main/scala/twitter1.scala:54: value 
toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error]   val dfHashTags = rdd.map(hashT => Row(hashT) ).toDF()

I know that I need to "import sqlContext.implicits._", which is what I've tried,
but I still get the error. Can anyone advise?

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType,StructField,StringType}


object twitter1 {

  def main(args: Array[String]) {

// create  a spark conf and context

val appName = "Twitter example 1"
val conf= new SparkConf()

conf.setAppName(appName)

val sc = new SparkContext(conf)

// set twitter auth key values

val consumerKey = "QQxxx"
val consumerSecret = "0HFxxx"
val accessToken   = "32394xxx"
val accessTokenSecret = "IlQvscxxx"

// set twitter auth properties
//   https://apps.twitter.com/

System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

val ssc= new StreamingContext(sc, Seconds(5) )
val stream = TwitterUtils.createStream(ssc,None)

val hashTags = stream.flatMap( status => status.getText.split(" 
").filter(_.startsWith("#")))

  //  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._

hashTags.foreachRDD{ rdd =>
  val dfHashTags = rdd.map(hashT => Row(hashT) ).toDF()
  dfHashTags.registerTempTable("tweets")
}

// extra stuff here

ssc.start()
ssc.awaitTermination()

  } // end main

} // end twitter1

cheers

Mike F

  

Re: Managing spark processes via supervisord

2015-06-05 Thread Mike Trienis
Thanks Ignor,

I managed to find a fairly simple solution. It seems that the shell scripts
(e.g. .start-master.sh, start-slave.sh) end up executing /bin/spark-class
which is always run in the foreground.

Here is a solution I provided on stackoverflow:

   -
   
http://stackoverflow.com/questions/30672648/how-to-autostart-an-apache-spark-cluster-using-supervisord/30676844#30676844


Cheers Mike


On Wed, Jun 3, 2015 at 12:29 PM, Igor Berman  wrote:

> assuming you are talking about standalone cluster
> imho, with workers you won't get any problems and it's straightforward
> since they are usually foreground processes
> with master it's a bit more complicated, ./sbin/start-master.sh goes
> background, which is not good for supervisor, but anyway I think it's
> doable (going to set it up too in a few days)
>
> On 3 June 2015 at 21:46, Mike Trienis  wrote:
>
>> Hi All,
>>
>> I am curious to know if anyone has successfully deployed a spark cluster
>> using supervisord?
>>
>>- http://supervisord.org/
>>
>> Currently I am using the cluster launch scripts, which are working
>> great; however, every time I reboot my VM or development environment I
>> need to re-launch the cluster.
>>
>> I am considering using supervisord to control all the processes (worker,
>> master, etc.) in order to have the cluster up and running after boot-up;
>> although I'd like to understand if it will cause more issues than it
>> solves.
>>
>> Thanks, Mike.
>>
>>
>


Managing spark processes via supervisord

2015-06-03 Thread Mike Trienis
Hi All,

I am curious to know if anyone has successfully deployed a spark cluster
using supervisord?

   - http://supervisord.org/

Currently I am using the cluster launch scripts, which are working great;
however, every time I reboot my VM or development environment I need to
re-launch the cluster.

I am considering using supervisord to control all the processes (worker,
master, etc.) in order to have the cluster up and running after boot-up;
although I'd like to understand if it will cause more issues than it
solves.

Thanks, Mike.


Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-23 Thread Mike Trienis
Yup, and since I have only one core per executor it explains why there was
only one executor utilized. I'll need to investigate which EC2 instance
type is going to be the best fit.

Thanks Evo.

On Fri, May 22, 2015 at 3:47 PM, Evo Eftimov  wrote:

> A receiver occupies a CPU core; an executor is simply a JVM instance and
> as such it can be granted any number of cores and any amount of RAM.
>
> So check how many cores you have per executor
>
>
> Sent from Samsung Mobile
>
>
>  Original message 
> From: Mike Trienis
> Date:2015/05/22 21:51 (GMT+00:00)
> To: user@spark.apache.org
> Subject: Re: Spark Streaming: all tasks running on one executor (Kinesis +
> Mongodb)
>
> I guess each receiver occupies an executor. So there was only one executor
> available for processing the job.
>
> On Fri, May 22, 2015 at 1:24 PM, Mike Trienis 
> wrote:
>
>> Hi All,
>>
>> I have a cluster of four nodes (three workers and one master, with one core
>> each) which consumes data from Kinesis at 15 second intervals using two
>> streams (i.e. receivers). The job simply grabs the latest batch and pushes
>> it to MongoDB. I believe that the problem is that all tasks are executed on
>> a single worker node and never distributed to the others. This is true even
>> after I set the number of concurrentJobs to 3. Overall, I would really like
>> to increase throughput (i.e. more than 500 records / second) and understand
>> why all executors are not being utilized.
>>
>> Here are some parameters I have set:
>>
>>-
>>- spark.streaming.blockInterval   200
>>- spark.locality.wait 500
>>- spark.streaming.concurrentJobs  3
>>
>> This is the code that's actually doing the writing:
>>
>> def write(rdd: RDD[Data], time:Time) : Unit = {
>> val result = doSomething(rdd, time)
>> result.foreachPartition { i =>
>>     i.foreach(record => connection.insert(record))
>> }
>> }
>>
>> def doSomething(rdd: RDD[Data]) : RDD[MyObject] = {
>> rdd.flatMap(MyObject)
>> }
>>
>> Any ideas as to how to improve the throughput?
>>
>> Thanks, Mike.
>>
>
>


Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-22 Thread Mike Trienis
I guess each receiver occupies an executor. So there was only one executor
available for processing the job.

On Fri, May 22, 2015 at 1:24 PM, Mike Trienis 
wrote:

> Hi All,
>
> I have a cluster of four nodes (three workers and one master, with one core
> each) which consumes data from Kinesis at 15 second intervals using two
> streams (i.e. receivers). The job simply grabs the latest batch and pushes
> it to MongoDB. I believe that the problem is that all tasks are executed on
> a single worker node and never distributed to the others. This is true even
> after I set the number of concurrentJobs to 3. Overall, I would really like
> to increase throughput (i.e. more than 500 records / second) and understand
> why all executors are not being utilized.
>
> Here are some parameters I have set:
>
>-
>- spark.streaming.blockInterval   200
>- spark.locality.wait 500
>- spark.streaming.concurrentJobs  3
>
> This is the code that's actually doing the writing:
>
> def write(rdd: RDD[Data], time:Time) : Unit = {
> val result = doSomething(rdd, time)
> result.foreachPartition { i =>
> i.foreach(record => connection.insert(record))
> }
> }
>
> def doSomething(rdd: RDD[Data]) : RDD[MyObject] = {
> rdd.flatMap(MyObject)
> }
>
> Any ideas as to how to improve the throughput?
>
> Thanks, Mike.
>


Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-22 Thread Mike Trienis
Hi All,

I have a cluster of four nodes (three workers and one master, with one core
each) which consumes data from Kinesis at 15 second intervals using two
streams (i.e. receivers). The job simply grabs the latest batch and pushes
it to MongoDB. I believe that the problem is that all tasks are executed on
a single worker node and never distributed to the others. This is true even
after I set the number of concurrentJobs to 3. Overall, I would really like
to increase throughput (i.e. more than 500 records / second) and understand
why all executors are not being utilized.

Here are some parameters I have set:

   -
   - spark.streaming.blockInterval   200
   - spark.locality.wait 500
   - spark.streaming.concurrentJobs  3

This is the code that's actually doing the writing:

def write(rdd: RDD[Data], time: Time): Unit = {
  val result = doSomething(rdd, time)
  result.foreachPartition { i =>
    i.foreach(record => connection.insert(record))
  }
}

def doSomething(rdd: RDD[Data]): RDD[MyObject] = {
  rdd.flatMap(MyObject)
}

Any ideas as to how to improve the throughput?

Thanks, Mike.


Spark sql and csv data processing question

2015-05-15 Thread Mike Frampton
Hi 

I'm getting the following error when trying to process a CSV-based data file.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 1 in stage 10.0 failed 4 times, most recent failure: Lost 
task 1.3 in stage 10.0 (TID 262, hc2r1m3.semtech-solutions.co.nz): 
java.lang.ArrayIndexOutOfBoundsException: 0
at 
org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
at 
org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

I have made sure that none of my data rows are empty and that they all have 15
fields. I have also physically checked the data. The error occurs when I run the
actual Spark SQL on the last line. The script is as follows.

  val server= "hdfs://hc2nn.semtech-solutions.co.nz:8020"
  val path  = "/data/spark/h2o/"

  val train_csv =  server + path + "adult.train.data" // 32,562 rows
  val test_csv  =  server + path + "adult.test.data"  // 16,283 rows

  // load the data

  val rawTrainData = sparkCxt.textFile(train_csv)
  val rawTestData  = sparkCxt.textFile(test_csv)

  // create a spark sql schema for the row

  val schemaString = "age workclass fnlwgt education educationalnum 
maritalstatus" +
 " occupation relationship race gender capitalgain 
capitalloss" +
 " hoursperweek nativecountry income"

  val schema = StructType( schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, false)))

  // create an RDD from the raw training data

  val trainRDD  = rawTrainData
 .filter(!_.isEmpty)
 .map(rawRow => Row.fromSeq(rawRow.split(",")
 .filter(_.length == 15)
 .map(_.toString).map(_.trim) ))

  println( "> Raw Training Data Count = " + trainRDD.count() )

  val testRDD   = rawTestData
 .filter(!_.isEmpty)
 .map(rawRow  => Row.fromSeq(rawRow.split(",")
 .filter(_.length == 15)
 .map(_.toString).map(_.trim) ))

  println( "> Raw Testing Data Count = " + testRDD.count() )

  // create a schema RDD

  val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
  val testSchemaRDD  = sqlContext.applySchema(testRDD,  schema)

  // register schema RDD as a table

  trainSchemaRDD.registerTempTable("trainingTable")
  testSchemaRDD.registerTempTable("testingTable")

  println( "> Schema RDD Training Data Count = " + trainSchemaRDD.count() )
  println( "> Schema RDD Testing Data Count  = " + testSchemaRDD.count() )

  // now run sql against the table to filter the data

  val schemaRddTrain = sqlContext.sql(
"SELECT "+
   "age,workclass,education,maritalstatus,occupation,relationship,race,"+
   "gender,hoursperweek,nativecountry,income "+
"FROM trainingTable LIMIT 5000")

  println( "> Training Data Count = " + schemaRddTrain.count() )

Any advice is appreciated :)

  

Spark SQL ArrayIndexOutOfBoundsException

2015-05-11 Thread Mike Frampton
Sorry if this email is a duplicate, I realised that I was not registered with 
the mailing list  ...


I am having a problem with a spark sql script which is running on a spark 1.2 

CentOS CDH 5.3 mini 5 node cluster. The script processes some image csv data 

each record/line of which has 28x28 integer elements ending with an integer 
label

value which describes the record.


I have checked the data file rows to ensure that they contain the correct 
number of 

elements. 



[hadoop@hc2r1m2 h2o_spark_1_2]$ cat chk_csv.bash

#!/bin/bash


MYFILE=$1


echo ""

echo "check $MYFILE"

echo ""


cat $MYFILE | while read line

do

  echo -n "Line records = "

  echo $line | tr "," "\n" | wc -l

done



[hadoop@hc2r1m2 h2o_spark_1_2]$ ./chk_csv.bash mnist_train_1x.csv


check mnist_train_1x.csv


Line records = 785

Line records = 785

Line records = 785

Line records = 785

Line records = 785

Line records = 785

Line records = 785

Line records = 785

Line records = 785

Line records = 785



The basic script looks like this .. 



  // create  a spark conf and context


  val sparkMaster = "spark://hc2nn.semtech-solutions.co.nz:7077"

  val appName = "Spark ex3"

  val conf = new SparkConf()


  conf.setMaster(sparkMaster)

  conf.setAppName(appName)


  val sparkCxt = new SparkContext(conf)



  // Initialize SQL context


  import org.apache.spark.sql._

  implicit val sqlContext = new SQLContext(sparkCxt)


  // prep the hdfs based data



  val server= "hdfs://hc2nn.semtech-solutions.co.nz:8020"

  val path  = "/data/spark/h2o/"

  val train_csv =  server + path + "mnist_train_1x.csv"



  val schemaString = getSchema() // string representing schema 28 x 28 int plus 
label int


  val schema = StructType( schemaString.split(" ")

 .map(fieldName => StructField(fieldName, IntegerType, false)))


  val rawTrainData = sparkCxt.textFile(train_csv)


  val trainRDD  = rawTrainData.map( rawRow => Row( rawRow.split(",") 
.map(_.toInt) ) )


  val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)


  trainSchemaRDD.registerTempTable("trainingTable")


  val resSchemaRddTrain = sqlContext.sql("""SELECT P1,Label FROM 
trainingTable""".stripMargin)


  println( " > sqlResult rows = " + resSchemaRddTrain.count() )


  val resArray = resSchemaRddTrain.collect()


  // collect results in  java.lang.ArrayIndexOutOfBoundsException




No matter what I try I get an ArrayIndexOutOfBoundsException when I try to 
examine the result

of the select even though I know that there are 10 result rows. 


I don't think it's a data issue as both the schema and all data rows have 28x28+1 
= 785 elements. 

Advice appreciated



  

Re: Spark + Kinesis + Stream Name + Cache?

2015-05-08 Thread Mike Trienis
Hey Chris!

I was happy to see the documentation outlining that issue :-) However, I
must have got into a pretty terrible state because I had to delete and
recreate the kinesis streams as well as the DynamoDB tables.

Thanks for the reply, everything is sorted.

Mike




On Fri, May 8, 2015 at 7:55 PM, Chris Fregly  wrote:

> hey mike-
>
> as you pointed out here from my docs, changing the stream name is
> sometimes problematic due to the way the Kinesis Client Library manages
> leases and checkpoints, etc in DynamoDB.
>
> I noticed this directly while developing the Kinesis connector which is
> why I highlighted the issue here.
>
> is wiping out the DynamoDB table a suitable workaround for now?  usually
> in production, you wouldn't be changing stream names often, so hopefully
> that's ok for dev.
>
> otherwise, can you share the relevant spark streaming logs that are
> generated when you do this?
>
> I saw a lot of "lease not owned by this Kinesis Client" type of errors,
> from what I remember.
>
> lemme know!
>
> -Chris
>
> On May 8, 2015, at 4:36 PM, Mike Trienis  wrote:
>
>
>- [Kinesis stream name]: The Kinesis stream that this streaming
>application receives from
>   - The application name used in the streaming context becomes the
>   Kinesis application name
>   - The application name must be unique for a given account and
>   region.
>   - The Kinesis backend automatically associates the application name
>   to the Kinesis stream using a DynamoDB table (always in the us-east-1
>   region) created during Kinesis Client Library initialization.
>   - *Changing the application name or stream name can lead to Kinesis
>   errors in some cases. If you see errors, you may need to manually delete
>   the DynamoDB table.*
>
>
> On Fri, May 8, 2015 at 2:06 PM, Mike Trienis 
> wrote:
>
>> Hi All,
>>
>> I am submitting the assembled fat jar file by the command:
>>
>> bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar
>> --class com.xxx.Consumer -0.1-SNAPSHOT.jar
>>
>> It reads the data file from kinesis using the stream name defined in a
>> configuration file. It turns out that it reads the data perfectly fine for
>> one stream name (i.e. the default), however, if I switch the stream name
>> and re-submit the jar, it no longer reads the data. From CloudWatch, I can
>> see that there is data put into the stream and spark is actually sending
>> get requests as well. However, it doesn't seem to be outputting the data.
>>
>> Has anyone else encountered a similar issue? Does spark cache the stream
>> name somewhere? I also have checkpointing enabled as well.
>>
>> Thanks, Mike.
>>
>>
>>
>>
>>
>>
>


Re: Spark + Kinesis + Stream Name + Cache?

2015-05-08 Thread Mike Trienis
   - [Kinesis stream name]: The Kinesis stream that this streaming
   application receives from
  - The application name used in the streaming context becomes the
  Kinesis application name
  - The application name must be unique for a given account and region.
  - The Kinesis backend automatically associates the application name
  to the Kinesis stream using a DynamoDB table (always in the us-east-1
  region) created during Kinesis Client Library initialization.
  - *Changing the application name or stream name can lead to Kinesis
  errors in some cases. If you see errors, you may need to manually delete
  the DynamoDB table.*


On Fri, May 8, 2015 at 2:06 PM, Mike Trienis 
wrote:

> Hi All,
>
> I am submitting the assembled fat jar file by the command:
>
> bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar
> --class com.xxx.Consumer -0.1-SNAPSHOT.jar
>
> It reads the data file from kinesis using the stream name defined in a
> configuration file. It turns out that it reads the data perfectly fine for
> one stream name (i.e. the default), however, if I switch the stream name
> and re-submit the jar, it no longer reads the data. From CloudWatch, I can
> see that there is data put into the stream and spark is actually sending
> get requests as well. However, it doesn't seem to be outputting the data.
>
> Has anyone else encountered a similar issue? Does spark cache the stream
> name somewhere? I also have checkpointing enabled as well.
>
> Thanks, Mike.
>
>
>
>
>
>


Spark + Kinesis + Stream Name + Cache?

2015-05-08 Thread Mike Trienis
Hi All,

I am submitting the assembled fat jar file by the command:

bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar --class
com.xxx.Consumer -0.1-SNAPSHOT.jar

It reads the data file from kinesis using the stream name defined in a
configuration file. It turns out that it reads the data perfectly fine for
one stream name (i.e. the default), however, if I switch the stream name
and re-submit the jar, it no longer reads the data. From CloudWatch, I can
see that there is data put into the stream and spark is actually sending
get requests as well. However, it doesn't seem to be outputting the data.

Has anyone else encountered a similar issue? Does spark cache the stream
name somewhere? I also have checkpointing enabled as well.

Thanks, Mike.


Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-14 Thread Mike Trienis
Richard,

Your response was very helpful and actually resolved my issue. In case
others run into a similar issue, I followed this procedure:

   - Upgraded to Spark 1.3.0
   - Marked all Spark-related libraries as "provided"
   - Included the Spark transitive library dependencies

where my build.sbt file contains:

libraryDependencies ++= {
  Seq(
"org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",
"org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" %
"provided",
"joda-time" % "joda-time" % "2.2",
"org.joda" % "joda-convert" % "1.2",
"com.amazonaws" % "aws-java-sdk" % "1.8.3",
"com.amazonaws" % "amazon-kinesis-client" % "1.2.0")

and submitting a spark job can done via

sh ./spark-1.3.0-bin-cdh4/bin/spark-submit --jars
spark-streaming-kinesis-asl_2.10-1.3.0.jar --verbose --class
com.xxx.MyClass target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar

Thanks again Richard!

Cheers Mike.


On Tue, Apr 14, 2015 at 11:01 AM, Richard Marscher  wrote:

> Hi,
>
> I've gotten an application working with sbt-assembly and spark, thought
> I'd present an option. In my experience, trying to bundle any of the Spark
> libraries in your uber jar is going to be a major pain. There will be a lot
> of deduplication to work through and even if you resolve them it can be
> easy to do it incorrectly. I considered it an intractable problem. So the
> alternative is to not include those jars in your uber jar. For this to work
> you will need the same libraries on the classpath of your Spark cluster and
> your driver program (if you are running that as an application and not just
> using spark-submit).
>
> As for your NoClassDefFoundError, you either are missing Joda Time in your
> runtime classpath or have conflicting versions. It looks like something
> related to AWS wants to use it. Check your uber jar to see if its including
> the org/joda/time as well as the classpath of your spark cluster. For
> example: I use the Spark 1.3.0 on Hadoop 1.x, which in the 'lib' directory
> has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one point in Spark
> 1.2 I found a conflict between httpclient versions that my uber jar pulled
> in for AWS libraries and the one bundled in the spark uber jar. I hand
> patched the spark uber jar to remove the offending httpclient bytecode to
> resolve the issue. You may be facing a similar situation.
>
> I hope that gives some ideas for resolving your issue.
>
> Regards,
> Rich
>
> On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis 
> wrote:
>
>> Hi Vadim,
>>
>> After removing "provided" from "org.apache.spark" %%
>> "spark-streaming-kinesis-asl" I ended up with huge number of deduplicate
>> errors:
>>
>> https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a
>>
>> It would be nice if you could share some pieces of your mergeStrategy
>> code for reference.
>>
>> Also, after adding "provided" back to "spark-streaming-kinesis-asl" and I
>> submit the spark job with the spark-streaming-kinesis-asl jar file
>>
>> sh /usr/lib/spark/bin/spark-submit --verbose --jars
>> lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer
>> target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar
>>
>> I still end up with the following error...
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/joda/time/format/DateTimeFormat
> at com.amazonaws.auth.AWS4Signer.<init>(AWS4Signer.java:44)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>> at java.lang.Class.newInstance(Class.java:379)
>>
>> Has anyone else run into this issue?
>>
>>
>>
>> On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy <
>> vadim.bichuts...@gmail.com> wrote:
>>
>>> I don't believe the Kinesis asl should be provided. I used mergeStrategy
>>> successfully to produce an "uber jar."
>>>
>>> Fyi, I've been having trouble consuming data out of Kinesis with Spark
>>> with no success :(
>>> Would be curio

Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-14 Thread Mike Trienis
Hi Vadim,

After removing "provided" from "org.apache.spark" %%
"spark-streaming-kinesis-asl" I ended up with huge number of deduplicate
errors:

https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a

It would be nice if you could share some pieces of your mergeStrategy code
for reference.

Also, after adding "provided" back to "spark-streaming-kinesis-asl" and I
submit the spark job with the spark-streaming-kinesis-asl jar file

sh /usr/lib/spark/bin/spark-submit --verbose --jars
lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer
target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar

I still end up with the following error...

Exception in thread "main" java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeFormat
at com.amazonaws.auth.AWS4Signer.<init>(AWS4Signer.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)

Has anyone else run into this issue?



On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy <
vadim.bichuts...@gmail.com> wrote:

> I don't believe the Kinesis asl should be provided. I used mergeStrategy
> successfully to produce an "uber jar."
>
> Fyi, I've been having trouble consuming data out of Kinesis with Spark
> with no success :(
> Would be curious to know if you got it working.
>
> Vadim
>
> On Apr 13, 2015, at 9:36 PM, Mike Trienis  wrote:
>
> Hi All,
>
> I am having trouble building a fat jar file through sbt-assembly.
>
> [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
> [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
> [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
> [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
> [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
> [warn] Merging
> 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
> strategy 'discard'
> [warn] Merging
> 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy
> 'discard'
> [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties'
> with strategy 'discard'
> [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties'
> with strategy 'discard'
> [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy
> 'discard'
> [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy
> 'discard'
> [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
> [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy
> 'discard'
> [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties'
> with strategy 'discard'
> [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with
> strategy 'discard'
> [warn] Merging 'META-INF/services/java.sql.Driver' with strategy
> 'filterDistinctLines'
> [warn] Merging 'rootdoc.txt' with strategy 'concat'
> [warn] Strategy 'concat' was applied to a file
> [warn] Strategy 'discard' was applied to 17 files
> [warn] Strategy 'filterDistinctLines' was applied to a file
> [warn] Strategy 'rename' was applied to 4 files
>
> When submitting the spark application through the command
>
> sh /usr/lib/spark/bin/spark-submit -class com.xxx.ExampleClassName
> target/scala-2.10/-snapshot.jar
>
> I end up the the following error,
>
> Exception in thread "main" java.lang.NoClassDefFoundErr

Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Mike Trienis
Thanks Vadim, I can certainly consume data from a Kinesis stream when
running locally. I'm currently in the process of extending my work to a
proper cluster (i.e. using a spark-submit job via uber jar). Feel free to
add me to gmail chat and maybe we can help each other.

On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy <
vadim.bichuts...@gmail.com> wrote:

> I don't believe the Kinesis asl should be provided. I used mergeStrategy
> successfully to produce an "uber jar."
>
> Fyi, I've been having trouble consuming data out of Kinesis with Spark
> with no success :(
> Would be curious to know if you got it working.
>
> Vadim
>
> On Apr 13, 2015, at 9:36 PM, Mike Trienis  wrote:
>
> Hi All,
>
> I am having trouble building a fat jar file through sbt-assembly.
>
> [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
> [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
> [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
> [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
> [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
> [warn] Merging
> 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
> strategy 'discard'
> [warn] Merging
> 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy
> 'discard'
> [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties'
> with strategy 'discard'
> [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties'
> with strategy 'discard'
> [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy
> 'discard'
> [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy
> 'discard'
> [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
> [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with
> strategy 'discard'
> [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy
> 'discard'
> [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties'
> with strategy 'discard'
> [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with
> strategy 'discard'
> [warn] Merging 'META-INF/services/java.sql.Driver' with strategy
> 'filterDistinctLines'
> [warn] Merging 'rootdoc.txt' with strategy 'concat'
> [warn] Strategy 'concat' was applied to a file
> [warn] Strategy 'discard' was applied to 17 files
> [warn] Strategy 'filterDistinctLines' was applied to a file
> [warn] Strategy 'rename' was applied to 4 files
>
> When submitting the spark application through the command
>
> sh /usr/lib/spark/bin/spark-submit -class com.xxx.ExampleClassName
> target/scala-2.10/-snapshot.jar
>
> I end up the the following error,
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/joda/time/format/DateTimeFormat
> at com.amazonaws.auth.AWS4Signer.<init>(AWS4Signer.java:44)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at java.lang.Class.newInstance(Class.java:379)
> at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
> at
> com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
> at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
> at
> com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
> at
> com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
> at
> com.amazonaws.Amazo

sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Mike Trienis
Hi All,

I am having trouble building a fat jar file through sbt-assembly.

[warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Merging
'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
strategy 'discard'
[warn] Merging
'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties'
with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with
strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties'
with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with
strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy
'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with
strategy 'discard'
[warn] Merging 'META-INF/services/java.sql.Driver' with strategy
'filterDistinctLines'
[warn] Merging 'rootdoc.txt' with strategy 'concat'
[warn] Strategy 'concat' was applied to a file
[warn] Strategy 'discard' was applied to 17 files
[warn] Strategy 'filterDistinctLines' was applied to a file
[warn] Strategy 'rename' was applied to 4 files

When submitting the spark application through the command

sh /usr/lib/spark/bin/spark-submit --class com.xxx.ExampleClassName target/scala-2.10/-snapshot.jar

I end up with the following error,

Exception in thread "main" java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeFormat
at com.amazonaws.auth.AWS4Signer.(AWS4Signer.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
at
com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
at
com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
at
com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
at
com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:216)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.(AmazonKinesisClient.java:202)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.(AmazonKinesisClient.java:175)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.(AmazonKinesisClient.java:155)
at com.quickstatsengine.aws.AwsProvider$.(AwsProvider.scala:20)
at com.quickstatsengine.aws.AwsProvider$.(AwsProvider.scala)

The snippet from my build.sbt file is:

"org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
"com.datastax.spark" %% "spark-cassandra-connector" %
"1.2.0-alpha1" % "provided",
"org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0" %
"provided",

And the error is originating from:

val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())

Am I correct to set spark-streaming-kinesis-asl as a *provided* dependency?
Also, is there a merge strategy I need to apply?

Any help would be appreciated, Mike.
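
A minimal build.sbt sketch of one common fix (not confirmed in this thread): drop the "provided" scope from spark-streaming-kinesis-asl so that the Kinesis ASL module and its transitive joda-time dependency end up inside the assembly, since they are not bundled with the stock Spark distribution. Artifact names and versions are taken from the snippet above; everything else is hypothetical.

// build.sbt sketch -- hypothetical project, versions as quoted above
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
  // not "provided": must be packaged into the fat jar together with joda-time
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0"
)

The [warn] merge messages above are typically informational; a custom merge strategy is usually only needed when the assembly task actually fails on a conflict.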


Re: MLlib : Gradient Boosted Trees classification confidence

2015-04-13 Thread mike
Thank you Peter.

I just want to be sure: even if I use the "classification" setting, does GBT
use regression trees rather than classification trees?

I know the difference between the two (theoretically) is only in the loss
and impurity functions, so if it does use classification trees, doing what you
proposed would already yield the classification itself.

Also, looking at the Scala API, I found that each Node holds a Predict object
which contains the "probability of the label (classification only)" (
https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.mllib.tree.model.Predict
). This is what I called confidence.


So, to sum up my questions and confusion:
1. Does GBT use classification trees when it is set to classification, or does
it always use regression trees?
2. If it does use classification trees, how could I efficiently get to the
confidence = Node.Predict.prob?

Thanks again,
Michael



On Mon, Apr 13, 2015 at 10:13 AM, pprett [via Apache Spark User List] <
ml-node+s1001560n22470...@n3.nabble.com> wrote:

> Hi Mike,
>
> Gradient Boosted Trees (or gradient boosted regression trees) don't store
> probabilities in each leaf node but rather model a continuous function
> which is then transformed via a logistic sigmoid (i.e. like in a Logistic
> Regression model).
> If you are just interested in a confidence, you can use this continuous
> function directly: it's just the (weighted) sum of the predictions of the
> individual regression trees. Use the absolute value as the confidence and the
> sign to determine the class label.
> Here is an example:
>
> def score(features: Vector): Double = {
>   val treePredictions = gbdt.trees.map(_.predict(features))
>   blas.ddot(gbdt.numTrees, treePredictions, 1, gbdt.treeWeights, 1)
> }
>
> If you are instead interested in probabilities, just pass the function
> value through a logistic sigmoid.
>
> best,
>  Peter
>
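
A short sketch of that last suggestion, assuming the gbdt model and the score helper from the quoted snippet above, the org.apache.spark.mllib.linalg.Vector type, and a factor-of-2 logistic transform as used for binomial log-loss boosting (the exact scaling is an assumption, not something stated in this thread):

import org.apache.spark.mllib.linalg.Vector

// sketch: map the raw GBT margin from score(...) to a pseudo-probability in (0, 1)
def probability(features: Vector): Double = {
  val margin = score(features)             // weighted sum of the individual tree predictions
  1.0 / (1.0 + math.exp(-2.0 * margin))    // logistic sigmoid applied to the margin
}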




