Re: Question about installing Apache Spark [PySpark] computer requirements
856) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463) at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:410) at org.apache.spark.rdd.RDD.collect(RDD.scala:1048) at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195) at 
org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.io.IOException: Cannot run program "C:\Users\mikej\AppData\Local\Programs\Python\Python312": CreateProcess error=5, Access is denied at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128) at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071) at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:181) at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109) at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124) at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166) at org.apache.spark.scheduler.Task.run(Task.scala:141) at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 1 more On Mon, Jul 29, 2024 at 4:34 PM
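The "Caused by" line above is the key: Spark was told to run `C:\Users\mikej\AppData\Local\Programs\Python\Python312`, which is the Python *installation directory*, not the interpreter executable — hence "Access is denied". A minimal sketch of the likely fix, pointing PYSPARK_PYTHON at the actual executable before the session is created (the `\python.exe` suffix is my assumption based on the directory in the trace; adjust for your machine):

```python
import os

# The trace shows Spark was handed the Python install directory.
# Point it at the interpreter executable instead (assumed path; the
# directory comes from the stack trace above, python.exe is the guess).
os.environ["PYSPARK_PYTHON"] = (
    r"C:\Users\mikej\AppData\Local\Programs\Python\Python312\python.exe"
)
os.environ["PYSPARK_DRIVER_PYTHON"] = os.environ["PYSPARK_PYTHON"]

# These must be set BEFORE the SparkSession/SparkContext is created:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local[*]").getOrCreate()
```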
Question about installing Apache Spark [PySpark] computer requirements
Hello, I am trying to run PySpark on my computer without success. I have followed several different sets of directions from online sources, and it appears that I may need a faster computer. I wanted to ask what some recommended computer specifications are for running PySpark (Apache Spark). Any help would be greatly appreciated. Thank you, Mike
Dark mode logo
Hi Spark Community, I see that y'all have a logo uploaded to https://www.apache.org/logos/#spark but it has black text. Is there an official, alternate logo with lighter text that would look good on a dark background? Thanks, Mike - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Parallelising JDBC reads in spark
Does anything different happen when you set the isolationLevel to do dirty reads, i.e. "READ_UNCOMMITTED"? On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H wrote: > Hi, > > We are writing an ETL pipeline using Spark that fetches data from SQL > server in batch mode (every 15 mins). The problem we are facing is parallelising single-table reads into multiple tasks without missing any > data. > > We have tried this: > > - Use the `ROW_NUMBER` window function in the SQL query > - Then do: > > DataFrame df = hiveContext.read().jdbc(**, query, "row_num", 1, , noOfPartitions, jdbcOptions); > > The problem with this approach is that if our tables get updated in SQL > Server while tasks are still running, then the `ROW_NUMBER` values will change and we > may miss some records. > > Any approach to fixing this issue? Any pointers will be helpful. > > *Note*: I am on Spark 1.6 > > Thanks > > Manjunath Shetty > >
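For context on what those jdbc() arguments do: Spark turns (partitionColumn, lowerBound, upperBound, numPartitions) into one WHERE clause per partition. A simplified plain-Python sketch of that stride logic (not Spark's exact code, which lives in JDBCRelation.columnPartition) — note it does nothing to fix the ROW_NUMBER drift problem, since each partition is still a separate query against a moving table; for that you generally need a stable, immutable partition column:

```python
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    """Simplified version of the per-partition WHERE clauses Spark
    generates for a partitioned JDBC read."""
    stride = (upper - lower) // num_partitions
    preds = []
    current = lower
    for i in range(num_partitions):
        if i == 0:
            # first partition also picks up NULLs and values below lowerBound
            preds.append(f"{column} < {current + stride} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # last partition picks up everything at or above its lower edge
            preds.append(f"{column} >= {current}")
        else:
            preds.append(f"{column} >= {current} AND {column} < {current + stride}")
        current += stride
    return preds

# e.g. the read above with bounds 1..1000 over 4 partitions:
for p in jdbc_partition_predicates("row_num", 1, 1000, 4):
    print(p)
```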
Re: unsubscribe
Nandan, Please send unsubscribe requests to user-unsubscr...@spark.apache.org On Tue, Nov 26, 2019 at 6:02 AM @Nandan@ wrote: > unsubscribe >
spark.sql.hive.exec.dynamic.partition description
Hi guys, Does anyone have detailed descriptions of the Hive parameters in Spark, like spark.sql.hive.exec.dynamic.partition? I couldn't find any reference in my Spark 2.3.2 configuration. I'm looking into a problem where Spark cannot understand Hive partitioning at all. My Hive table is partitioned into 1,000 partitions; however, when I read the same table in Spark, the RDD has only 105 partitions according to df.rdd.getNumPartitions(). Because Spark creates one task per partition on read, reading is painfully slow, as each task reads many Hive folders in sequential order. My goal is to spin up more tasks to increase parallelism during read operations. Hope this makes sense. Thank you. Best Regards, Mike
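One common reason for this: when Spark plans the read as a plain file scan, it sizes input splits by bytes (spark.sql.files.maxPartitionBytes, default 128 MB), not by Hive partition count, so 1,000 Hive partitions can collapse into ~105 tasks. A sketch of the arithmetic (the ~14 GB total table size is a hypothetical number chosen to land near the observed task count; lowering maxPartitionBytes raises the task count):

```python
import math

# Simplified split math for file sources (the real logic also adds
# spark.sql.files.openCostInBytes per file).
max_partition_bytes = 128 * 1024 * 1024          # default 128 MB
total_input_bytes = 14 * 1024 * 1024 * 1024      # hypothetical ~14 GB table

num_tasks = math.ceil(total_input_bytes / max_partition_bytes)
print(num_tasks)  # 112 -- same ballpark as the 105 observed,
                  # regardless of the 1,000 Hive partitions
```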
Fwd: autoBroadcastJoinThreshold not working as expected
Dear all, I'm looking at a case where, when a certain table is used in a broadcast join, the query eventually fails with a remote-block error. First, we set spark.sql.autoBroadcastJoinThreshold to 10MB, namely 10485760. Then we ran the query. In the SQL plan, we found that a table that is 25MB in size is broadcast as well. Also, in `desc extended` the table is 24452111 bytes. It is a Hive table. We always run into an error when this table is broadcast. Below is a sample error: Caused by: java.io.IOException: org.apache.spark.SparkException: corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625 != -992055931 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) The physical plan is also attached if you're interested. One thing to note: if I turn autoBroadcastJoinThreshold down to 5MB, this query executes successfully and default.product is NOT broadcast. However, with another query that selects even fewer columns than the previous one, this table still gets broadcast even at 5MB and fails with the same error. I even changed it to 1MB, with the same result. I'd appreciate any input you can share. Thank you very much. 
Best Regards, MIke == Physical Plan == *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, bu_name#273, principle_supplier_code#154 AS supplier_code#476, mother_company_name#150 AS supplier_name#477, brand_type_name#117, brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, h1_l1_hierarchy_name#126 AS Category_Name#480, coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 more fields] +- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 
41 more fields] : +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], [fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 35 more fields] : : +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], LeftOuter, BuildRight : : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 33 more fields] : : : +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], [cast(store_key#155 as double)], LeftOuter, BuildRight : : : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt
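One plausible explanation for the behaviour above: the threshold is compared against Spark's *estimated* plan size (the sizeInBytes statistic), not the on-disk figure reported by `desc extended`. For Hive tables with stale or missing statistics the estimate can undershoot badly, which would let a 24 MB table slip under a 10 MB (or even 1 MB) threshold. A quick sanity check of the units, using the numbers from the report:

```python
# autoBroadcastJoinThreshold is specified in bytes; "10MB" == 10485760
threshold = 10 * 1024 * 1024
table_size_on_disk = 24452111  # from `desc extended` in the report

print(threshold)                       # 10485760, matching the setting used
print(table_size_on_disk > threshold)  # True: the on-disk size alone would
                                       # NOT qualify for broadcast
# => the planner must be working from a smaller *estimated* size.
#    Possible remedies: refresh stats (ANALYZE TABLE ... COMPUTE STATISTICS)
#    or disable auto-broadcast with spark.sql.autoBroadcastJoinThreshold=-1.
```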
Question about Spark, Inner Join and Delegation to a Parquet Table
I have a question about Spark and how it delegates filters to a Parquet-based table. I have two tables in Hive in Parquet format. Table1 has four columns of type double and table2 has two columns of type double. I am doing an INNER JOIN as follows: SELECT table1.name FROM table1 INNER JOIN table2 ON table2.x BETWEEN table1.xmin AND table1.xmax AND table2.y BETWEEN table1.ymin AND table1.ymax I noticed that the execution plan as reported by Spark only delegates the IsNotNull filters to the tables and no other filters: == Physical Plan == *Project [name#0] +- BroadcastNestedLoopJoin BuildRight, Inner, ((((x#36 >= xmin#13) && (x#36 <= xmax#15)) && (y#37 >= ymin#14)) && (y#37 <= ymax#16)) :- *Project [name#0, xmin#13, ymin#14, xmax#15, ymax#16] : +- *Filter (((isnotnull(xmin#13) && isnotnull(ymin#14)) && isnotnull(ymax#16)) && isnotnull(xmax#15)) : +- *FileScan parquet [name#0,xmin#13,ymin#14,xmax#15,ymax#16] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(xmin), IsNotNull(ymin), IsNotNull(ymax), IsNotNull(xmax)], ReadSchema: struct +- BroadcastExchange IdentityBroadcastMode +- *Project [x#36, y#37] +- *Filter (isnotnull(y#37) && isnotnull(x#36)) +- *FileScan parquet [x#36,y#37] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(y), IsNotNull(x)], ReadSchema: struct However, when filtering against a single table, the filter IS delegated to the table: SELECT name FROM table1 WHERE table1.xmin > -73.4454183678 == Physical Plan == CollectLimit 21 +- *Project [pbkey#150] +- *Filter (isnotnull(xmin#163) && (xmin#163 > -73.4454183678)) +- *FileScan parquet [pbkey#150,xmin#163] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://...:8020/apps/hive/warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(xmin), 
GreaterThan(xmin,-73.4454183678)], ReadSchema: struct So the question is: does Spark delegate filters in a join condition to a Parquet table, or is this an error in the "explain plan" output? Thanks.
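Both plans are consistent with how Parquet pushdown works: only predicates that compare a single table's column against a literal can be pushed into that table's scan. A join condition like `table2.x >= table1.xmin` references columns from both sides, so all the sources can push is the implied IsNotNull constraints. An illustrative rule-of-thumb classifier (this is not Spark's actual code, just the shape of the rule):

```python
def pushable_to_scan(predicate_columns, scan_columns, compares_to_literal):
    """Illustrative rule: a predicate can be pushed to one table's Parquet
    scan only if every column it references belongs to that scan and the
    comparison is against a literal value."""
    return set(predicate_columns) <= set(scan_columns) and compares_to_literal

table1_cols = {"name", "xmin", "xmax", "ymin", "ymax"}

# join condition x >= xmin references both scans -> not pushable to either
print(pushable_to_scan({"x", "xmin"}, table1_cols, False))  # False
# single-table literal filter xmin > -73.4454183678 -> pushable
print(pushable_to_scan({"xmin"}, table1_cols, True))        # True
```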
Spark SQL within a DStream map function
Hello, I have a web application that publishes JSON messages onto a messaging queue; each message contains metadata and a link to a CSV document on S3. I'd like to iterate over these JSON messages and, for each one, pull the CSV document into Spark SQL to transform it (based on the metadata in the JSON message) and output the results to a search index. Each file on S3 has different headers, potentially different delimiters, and differing numbers of rows. Basically what I'm trying to do is something like this:

JavaDStream<ParsedDocument> parsedMetadataAndRows = queueStream.map(new Function<String, ParsedDocument>() {
  @Override
  ParsedDocument call(String metadata) throws Exception {
    Map gson = new Gson().fromJson(metadata, Map.class)
    // get metadata from gson
    String s3Url = gson.url
    String delimiter = gson.delimiter
    // etc...
    // read s3Url
    Dataset dataFrame = sqlContext.read()
      .format("com.databricks.spark.csv")
      .option("delimiter", delimiter)
      .option("header", true)
      .option("inferSchema", true)
      .load(url)
    // process document
    ParsedDocument docPlusRows = //...
    return docPlusRows
  }
})

JavaEsSparkStreaming.saveToEs(parsedMetadataAndRows, "index/type" // ...

But it appears I cannot access the sqlContext when I run this on the Spark cluster, because that code executes on the executors, not in the driver. Is there a way I can access or create a SQLContext in my map function so that I can pull the file down from S3? Or do you have any recommendations for setting up the streaming job differently, so that I could accept metadata on the incoming stream of records and pull each file down from S3 for processing? Thanks in advance for your help! Mike
Parquet Read Speed: Spark SQL vs Parquet MR
Hi Spark User, I have run into a situation where Spark SQL is much slower than Parquet MR for processing parquet files. Can you provide some guidance on optimization? Suppose I have a table "person" with columns gender, age, name, address, etc., which is stored in parquet files. I tried two ways to read the table in Spark: 1) define case class Person(gender: String, age: Int, etc), then use the Spark SQL Dataset API: val ds = spark.read.parquet("...").as[Person] 2) define an avro record "record Person {string gender, int age, etc}", then use parquet-avro <https://github.com/apache/parquet-mr/tree/master/parquet-avro> and newAPIHadoopFile: val rdd = sc.newAPIHadoopFile( path, classOf[ParquetInputFormat[avro.Person]], classOf[Void], classOf[avro.Person], job.getConfiguration).values Then I compared 3 actions (Spark 2.1.1, parquet-avro 1.8.1): a) ds.filter("gender='female' and age > 50").count // 1 min b) ds.filter("gender='female'").filter(_.age > 50).count // 15 min c) rdd.filter(r => r.gender == "female" && r.age > 50).count // 7 min I can understand that a) is faster than c), because a) is limited to a SQL query, so Spark can do a lot to optimize it (such as not fully deserializing the objects). But I don't understand why b) is much slower than c), because I assume both require full deserialization. Is there anything I can try to improve b)? Thanks, Mike
Re: Convert camelCase to snake_case when saving Dataframe/Dataset to parquet?
Cool. Thanks a lot in advance. On Mon, May 22, 2017 at 2:12 PM, Bryan Jeffrey wrote: > Mike, > > I have code to do that. I'll share it tomorrow. > > Get Outlook for Android <https://aka.ms/ghei36> > > > > > On Mon, May 22, 2017 at 4:53 PM -0400, "Mike Wheeler" < > rotationsymmetr...@gmail.com> wrote: > > Hi Spark User, >> >> For Scala case class, we usually use camelCase (carType) for member >> fields. However, many data system use snake_case (car_type) for column >> names. When saving a Dataset of case class to parquet, is there any way to >> automatically convert camelCase to snake_case (carType -> car_type)? >> >> Thanks, >> >> Mike >> >> >>
Convert camelCase to snake_case when saving Dataframe/Dataset to parquet?
Hi Spark User, For Scala case classes, we usually use camelCase (carType) for member fields. However, many data systems use snake_case (car_type) for column names. When saving a Dataset of a case class to parquet, is there any way to automatically convert camelCase to snake_case (carType -> car_type)? Thanks, Mike
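One common recipe is a two-regex conversion applied to every column name before writing. A sketch (the conversion itself is plain Python; the commented Spark lines assume a DataFrame `df` and are illustrative only):

```python
import re

def camel_to_snake(name: str) -> str:
    """carType -> car_type; also handles runs of capitals (HTTPCode -> http_code)."""
    s = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s).lower()

print(camel_to_snake("carType"))  # car_type

# Applied to a DataFrame before writing (df and F.col are assumed):
# renamed = df.select([F.col(c).alias(camel_to_snake(c)) for c in df.columns])
# renamed.write.parquet("out")
```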
Best Practice for Enum in Spark SQL
Hi Spark Users, I want to store Enum type (such as Vehicle Type: Car, SUV, Wagon) in my data. My storage format will be parquet and I need to access the data from Spark-shell, Spark SQL CLI, and hive. My questions: 1) Should I store my Enum type as String or store it as numeric encoding (aka 1=Car, 2=SUV, 3=Wagon)? 2) If I choose String, any penalty in hard drive space or memory? Thank you! Mike
Re: Schema Evolution for nested Dataset[T]
Hi Michael, Thank you for the suggestions. I am wondering how I can make `withColumn` handle a nested structure? For example, below is my code to generate the data. I basically add the `age` field to `Person2`, which is nested in an Array for Course2. Then I want to fill in 0 for age when age is null.

case class Person1(name: String)
case class Person2(name: String, age: Int)
case class Course1(id: Int, students: Array[Person1])
case class Course2(id: Int, students: Array[Person2])

Seq(Course1(10, Array(Person1("a"), Person1("b")))).toDF.write.parquet("data1")
Seq(Course2(20, Array(Person2("c",20), Person2("d",10)))).toDF.write.parquet("data2")

val allData = spark.read.option("mergeSchema", "true").parquet("data1", "data2")
allData.show

+---+--------------------+
| id|            students|
+---+--------------------+
| 20|    [[c,20], [d,10]]|
| 10|[[a,null], [b,null]]|
+---+--------------------+

*My first try:*

allData.withColumn("students.age", coalesce($"students.age", lit(0)))

It returns the exception: org.apache.spark.sql.AnalysisException: cannot resolve 'coalesce(`students`.`age`, 0)' due to data type mismatch: input to function coalesce should all be the same type, but it's [array, int];;

*My second try:*

allData.withColumn("students.age", coalesce($"students.age", array(lit(0), lit(0)))).show

+---+--------------------+------------+
| id|            students|students.age|
+---+--------------------+------------+
| 20|    [[c,20], [d,10]]|    [20, 10]|
| 10|[[a,null], [b,null]]|[null, null]|
+---+--------------------+------------+

It creates a new column "students.age" instead of imputing the age value nested in students. Thank you very much in advance. Mike On Mon, May 1, 2017 at 10:31 AM, Michael Armbrust wrote: > Oh, and if you want a default other than null: > > import org.apache.spark.sql.functions._ > df.withColumn("address", coalesce($"address", lit()) > > On Mon, May 1, 2017 at 10:29 AM, Michael Armbrust > wrote: > >> The following should work: >> >> val schema = implicitly[org.apache.spark.sql.Encoder[Course]].schema >> spark.read.schema(schema).parquet("data.parquet").as[Course] >> >> Note this will only work for nullable fields (i.e. 
if you add a primitive >> like Int you need to make it an Option[Int]) >> >> On Sun, Apr 30, 2017 at 9:12 PM, Mike Wheeler < >> rotationsymmetr...@gmail.com> wrote: >> >>> Hi Spark Users, >>> >>> Suppose I have some data (stored in parquet for example) generated as >>> below: >>> >>> package com.company.entity.old >>> case class Course(id: Int, students: List[Student]) >>> case class Student(name: String) >>> >>> Then usually I can access the data by >>> >>> spark.read.parquet("data.parquet").as[Course] >>> >>> Now I want to add a new field `address` to Student: >>> >>> package com.company.entity.new >>> case class Course(id: Int, students: List[Student]) >>> case class Student(name: String, address: String) >>> >>> Then obviously running `spark.read.parquet("data.parquet").as[Course]` >>> on data generated by the old entity/schema will fail because `address` >>> is missing. >>> >>> In this case, what is the best practice to read data generated with >>> the old entity/schema to the new entity/schema, with the missing field >>> set to some default value? I know I can manually write a function to >>> do the transformation from the old to the new. But it is kind of >>> tedious. Any automatic methods? >>> >>> Thanks, >>> >>> Mike >>> >>> - >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>> >>> >> >
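On the nested-array question above: `withColumn` can't reach inside an array element, which is why "students.age" is created as a new top-level column. In Spark 2.x the usual workarounds are a typed `map` over `Dataset[Course]` or an explode/re-group. Stripped of Spark, the imputation itself is just a map over the nested records; a plain-Python sketch of the same default-filling (records modeled as dicts, purely illustrative):

```python
def fill_student_ages(course, default_age=0):
    """course is a dict like {"id": 10, "students": [{"name": "a", "age": None}, ...]};
    returns a copy with missing ages replaced by default_age."""
    return {
        "id": course["id"],
        "students": [
            {"name": s["name"],
             "age": s["age"] if s.get("age") is not None else default_age}
            for s in course["students"]
        ],
    }

old = {"id": 10, "students": [{"name": "a", "age": None}, {"name": "b", "age": None}]}
print(fill_student_ages(old))
```

(For what it's worth, Spark 2.4+ also offers the `transform` higher-order SQL function, which can rebuild each array element in place.)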
Schema Evolution for nested Dataset[T]
Hi Spark Users, Suppose I have some data (stored in parquet for example) generated as below: package com.company.entity.old case class Course(id: Int, students: List[Student]) case class Student(name: String) Then usually I can access the data by spark.read.parquet("data.parquet").as[Course] Now I want to add a new field `address` to Student: package com.company.entity.new case class Course(id: Int, students: List[Student]) case class Student(name: String, address: String) Then obviously running `spark.read.parquet("data.parquet").as[Course]` on data generated by the old entity/schema will fail because `address` is missing. In this case, what is the best practice to read data generated with the old entity/schema to the new entity/schema, with the missing field set to some default value? I know I can manually write a function to do the transformation from the old to the new. But it is kind of tedious. Any automatic methods? Thanks, Mike - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
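The schema-evolution step Mike asks about can be pictured outside Spark as merging each old-schema record over a dict of defaults for the newly added fields. A hedged plain-Python sketch of that semantics (the names and the default value are illustrative, not Spark API):

```python
# Old-schema records lack the new `address` field; merge defaults into each
# student so the upgraded records satisfy the new schema. Illustrative only.
DEFAULTS = {"address": ""}  # hypothetical default for the new field

old_rows = [{"id": 1, "students": [{"name": "a"}, {"name": "b"}]}]

def upgrade(row, defaults):
    # {**defaults, **s}: existing values win, missing fields get the default.
    return dict(row, students=[{**defaults, **s} for s in row["students"]])

new_rows = [upgrade(r, DEFAULTS) for r in old_rows]
```

Michael's reply above achieves the same effect inside Spark by reading with the new schema (missing fields come back null) and then applying coalesce with a literal default.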
Combining reading from Kafka and HDFS w/ Spark Streaming
(Sorry if this is a duplicate. I got a strange error message when I first tried to send it earlier) I want to pull HDFS paths from Kafka and build text streams based on those paths. I currently have: val lines = KafkaUtils.createStream(/* params here */).map(_._2) val buffer = new ArrayBuffer[String]() lines.foreachRDD(rdd => { if (!rdd.partitions.isEmpty) { rdd.collect().foreach(line => { buffer += line }) } }) buffer.foreach(path => { streamingContext.textFileStream(path).foreachRDD(rdd => { println(s"${path} => ${rdd.count()}") }) }) streamingContext.start streamingContext.awaitTermination It's not actually counting any of the files in the paths, and I know the paths are valid. Can someone tell me if this is possible and if so, give me a pointer on how to fix this? Thanks, Mike
RE: Best way to process lookup ETL with Dataframes
Thanks a lot Nicholas. RE: Upgrading, I was afraid someone would suggest that. ☺ Yes we have an upgrade planned, but due to politics, we have to finish this first round of ETL before we can do the upgrade. I can’t confirm for sure that this issue would be fixed in Spark >= 1.6 without doing the upgrade first, so I won’t be able to win the argument for upgrading yet… You see the problem… :( Anyway, the good news is we just had a memory upgrade, so I should be able to do more persisting of the dataframes. I am currently only persisting the join table (the table I am joining to, not the input data). Although I do cache the input at some point before the join, it is not every time I do a split+merge. I’ll have to persist the input data better. Thinking on it now, is it even necessary to cache the table I am joining to? Probably only if it is used more than once, right? Thanks, -Mike From: Nicholas Hakobian [mailto:nicholas.hakob...@rallyhealth.com] Sent: Friday, December 30, 2016 5:50 PM To: Sesterhenn, Mike Cc: ayan guha; user@spark.apache.org Subject: Re: Best way to process lookup ETL with Dataframes Yep, sequential joins is what I have done in the past with similar requirements. Splitting and merging DataFrames is most likely killing performance if you do not cache the DataFrame pre-split. If you do, it will compute the lineage prior to the cache statement once (at first invocation), then use the cached result to perform the additional join, then union the results. Without the cache, you are most likely computing the full lineage twice, all the way back to the raw data import and having double the read I/O. The most optimal path will most likely depend on the size of the tables you are joining to. If both are small (compared to the primary data source) and can be broadcasted, doing the sequential join will most likely be the easiest and most efficient approach. 
If one (or both) of the tables you are joining to are significantly large enough that they cannot be efficiently broadcasted, going through the join / cache / split / second join / union path is likely to be faster. It also depends on how much memory you can dedicate to caching...the possibilities are endless. I tend to approach this type of problem by weighing the cost of extra development time for a more complex join vs the extra execution time vs frequency of execution. For something that will execute daily (or more frequently) the cost of more development to have faster execution time (even if its only 2x faster) might be worth it. It might also be worth investigating if a newer version of Spark (1.6 at the least, or 2.0 if possible) is feasible to install. There are lots of performance improvements in those versions, if you have the option of upgrading. -Nick Nicholas Szandor Hakobian, Ph.D. Senior Data Scientist Rally Health nicholas.hakob...@rallyhealth.com<mailto:nicholas.hakob...@rallyhealth.com> On Fri, Dec 30, 2016 at 3:35 PM, Sesterhenn, Mike mailto:msesterh...@cars.com>> wrote: Thanks Nicholas. It looks like for some of my use cases, I might be able to use do sequential joins, and then use coalesce() (or in combination with withColumn(when()...)) to sort out the results. Splitting and merging dataframes seems to really kills my app performance. I'm not sure if it's a spark 1.5 thing or what, but I just refactored one column to do one less split/merge, and it saved me almost half the time on my job. But for some use cases I don't seem to be able to avoid them. It is important in some cases to NOT do a join under certain conditions for a row because bad data will result. Any other thoughts? 
From: Nicholas Hakobian mailto:nicholas.hakob...@rallyhealth.com>> Sent: Friday, December 30, 2016 2:12:40 PM To: Sesterhenn, Mike Cc: ayan guha; user@spark.apache.org<mailto:user@spark.apache.org> Subject: Re: Best way to process lookup ETL with Dataframes It looks like Spark 1.5 has the coalesce function, which is like NVL, but a bit more flexible. From Ayan's example you should be able to use: coalesce(b.col, c.col, 'some default') If that doesn't have the flexibility you want, you can always use nested case or if statements, but its just harder to read. Nicholas Szandor Hakobian, Ph.D. Senior Data Scientist Rally Health nicholas.hakob...@rallyhealth.com<mailto:nicholas.hakob...@rallyhealth.com> On Fri, Dec 30, 2016 at 7:46 AM, Sesterhenn, Mike mailto:msesterh...@cars.com>> wrote: Thanks, but is nvl() in Spark 1.5? I can't find it in spark.sql.functions (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.functions$) Reading about the Oracle nvl function, it seems it is similar to the na functions. Not sure it will help though, because what I need is to join after the first join fails. Fr
Re: Best way to process lookup ETL with Dataframes
Thanks Nicholas. It looks like for some of my use cases, I might be able to do sequential joins, and then use coalesce() (or in combination with withColumn(when()...)) to sort out the results.

Splitting and merging dataframes seems to really kill my app performance. I'm not sure if it's a Spark 1.5 thing or what, but I just refactored one column to do one less split/merge, and it saved me almost half the time on my job. But for some use cases I don't seem to be able to avoid them. It is important in some cases NOT to do a join under certain conditions for a row, because bad data will result.

Any other thoughts?

From: Nicholas Hakobian
Sent: Friday, December 30, 2016 2:12:40 PM
To: Sesterhenn, Mike
Cc: ayan guha; user@spark.apache.org
Subject: Re: Best way to process lookup ETL with Dataframes

It looks like Spark 1.5 has the coalesce function, which is like NVL, but a bit more flexible. From Ayan's example you should be able to use:

coalesce(b.col, c.col, 'some default')

If that doesn't have the flexibility you want, you can always use nested case or if statements, but it's just harder to read.

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com

On Fri, Dec 30, 2016 at 7:46 AM, Sesterhenn, Mike <msesterh...@cars.com> wrote:

Thanks, but is nvl() in Spark 1.5? I can't find it in spark.sql.functions (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.functions$)

Reading about the Oracle nvl function, it seems it is similar to the na functions. Not sure it will help though, because what I need is to join after the first join fails.
From: ayan guha mailto:guha.a...@gmail.com>> Sent: Thursday, December 29, 2016 11:06 PM To: Sesterhenn, Mike Cc: user@spark.apache.org<mailto:user@spark.apache.org> Subject: Re: Best way to process lookup ETL with Dataframes How about this - select a.*, nvl(b.col,nvl(c.col,'some default')) from driving_table a left outer join lookup1 b on a.id<http://a.id>=b.id<http://b.id> left outer join lookup2 c on a.id<http://a.id>=c,id ? On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike mailto:msesterh...@cars.com>> wrote: Hi all, I'm writing an ETL process with Spark 1.5, and I was wondering the best way to do something. A lot of the fields I am processing require an algorithm similar to this: Join input dataframe to a lookup table. if (that lookup fails (the joined fields are null)) { Lookup into some other table to join some other fields. } With Dataframes, it seems the only way to do this is to do something like this: Join input dataframe to a lookup table. if (that lookup fails (the joined fields are null)) { *SPLIT the dataframe into two DFs via DataFrame.filter(), one group with successful lookup, the other failed).* For failed lookup: { Lookup into some other table to grab some other fields. } *MERGE the dataframe splits back together via DataFrame.unionAll().* } I'm seeing some really large execution plans as you might imagine in the Spark Ui, and the processing time seems way out of proportion with the size of the dataset. (~250GB in 9 hours). Is this the best approach to implement an algorithm like this? Note also that some fields I am implementing require multiple staged split/merge steps due to cascading lookup joins. Thanks, Michael Sesterhenn msesterh...@cars.com<mailto:msesterh...@cars.com> -- Best Regards, Ayan Guha
Re: Best way to process lookup ETL with Dataframes
Thanks, but is nvl() in Spark 1.5? I can't find it in spark.sql.functions (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.functions$) Reading about the Oracle nvl function, it seems it is similar to the na functions. Not sure it will help though, because what I need is to join after the first join fails. From: ayan guha Sent: Thursday, December 29, 2016 11:06 PM To: Sesterhenn, Mike Cc: user@spark.apache.org Subject: Re: Best way to process lookup ETL with Dataframes How about this - select a.*, nvl(b.col,nvl(c.col,'some default')) from driving_table a left outer join lookup1 b on a.id<http://a.id>=b.id<http://b.id> left outer join lookup2 c on a.id<http://a.id>=c,id ? On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike mailto:msesterh...@cars.com>> wrote: Hi all, I'm writing an ETL process with Spark 1.5, and I was wondering the best way to do something. A lot of the fields I am processing require an algorithm similar to this: Join input dataframe to a lookup table. if (that lookup fails (the joined fields are null)) { Lookup into some other table to join some other fields. } With Dataframes, it seems the only way to do this is to do something like this: Join input dataframe to a lookup table. if (that lookup fails (the joined fields are null)) { *SPLIT the dataframe into two DFs via DataFrame.filter(), one group with successful lookup, the other failed).* For failed lookup: { Lookup into some other table to grab some other fields. } *MERGE the dataframe splits back together via DataFrame.unionAll().* } I'm seeing some really large execution plans as you might imagine in the Spark Ui, and the processing time seems way out of proportion with the size of the dataset. (~250GB in 9 hours). Is this the best approach to implement an algorithm like this? Note also that some fields I am implementing require multiple staged split/merge steps due to cascading lookup joins. 
Thanks, Michael Sesterhenn msesterh...@cars.com<mailto:msesterh...@cars.com> -- Best Regards, Ayan Guha
Best way to process lookup ETL with Dataframes
Hi all, I'm writing an ETL process with Spark 1.5, and I was wondering the best way to do something. A lot of the fields I am processing require an algorithm similar to this: Join input dataframe to a lookup table. if (that lookup fails (the joined fields are null)) { Lookup into some other table to join some other fields. } With Dataframes, it seems the only way to do this is to do something like this: Join input dataframe to a lookup table. if (that lookup fails (the joined fields are null)) { *SPLIT the dataframe into two DFs via DataFrame.filter(), one group with successful lookup, the other failed).* For failed lookup: { Lookup into some other table to grab some other fields. } *MERGE the dataframe splits back together via DataFrame.unionAll().* } I'm seeing some really large execution plans as you might imagine in the Spark Ui, and the processing time seems way out of proportion with the size of the dataset. (~250GB in 9 hours). Is this the best approach to implement an algorithm like this? Note also that some fields I am implementing require multiple staged split/merge steps due to cascading lookup joins. Thanks, Michael Sesterhenn msesterh...@cars.com
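The nvl/coalesce chain suggested later in this thread boils down to "first non-null wins" across the lookups. A minimal Python sketch of that semantics (illustrative only; Spark's coalesce operates on columns, not scalars):

```python
def coalesce(*values):
    # Return the first value that is not None, like SQL COALESCE / Oracle NVL.
    for v in values:
        if v is not None:
            return v
    return None

# Simulating the two-lookup fallback: lookup1 failed (None), lookup2 hit.
resolved = coalesce(None, "lookup2_value", "some default")
```

Applied after two left outer joins, this picks lookup1's value when the first join succeeded, falls through to lookup2's, and finally to the default, without any split/merge.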
Re: Is spark a right tool for updating a dataframe repeatedly
I've not done this in Scala yet, but in PySpark I've run into a similar issue where having too many dataframes cached does cause memory issues. Unpersist by itself did not clear the memory usage, but rather setting the variable equal to None allowed all the references to be cleared and the memory issues went away. I do not fully understand Scala yet, but you may be able to set one of your dataframes to null to accomplish the same.

Mike

On Mon, Oct 17, 2016 at 8:38 PM, Mungeol Heo wrote:
> First of all, thank you for your comments.
> Actually, what I mean by "update" is generate a new data frame with modified
> data.
> The more detailed while loop will be something like below.
>
> var continue = 1
> var dfA = "a data frame"
> dfA.persist
>
> while (continue > 0) {
>   val temp = "modified dfA"
>   temp.persist
>   temp.count
>   dfA.unpersist
>
>   dfA = "modified temp"
>   dfA.persist
>   dfA.count
>   temp.unpersist
>
>   if ("dfA is not modified") {
>     continue = 0
>   }
> }
>
> The problem is it will cause OOM finally.
> And, the number of skipped stages will increase every time, even though
> I am not sure whether this is the reason causing OOM.
> Maybe, I need to check the source code of one of the spark ML algorithms.
> Again, thank you all.
>
> On Mon, Oct 17, 2016 at 10:54 PM, Thakrar, Jayesh wrote:
> > Yes, iterating over a dataframe and making changes is not uncommon.
> >
> > Of course RDDs, dataframes and datasets are immutable, but there is some
> > optimization in the optimizer that can potentially help to dampen the
> > effect/impact of creating a new rdd, df or ds.
> >
> > Also, the use-case you cited is similar to what is done in regression,
> > clustering and other algorithms.
> >
> > I.e. you iterate making a change to a dataframe/dataset until the desired
> > condition.
> >
> > E.g.
see this -
> > https://spark.apache.org/docs/1.6.1/ml-classification-regression.html#linear-regression
> > and the setting of the iteration ceiling
> >
> > // instantiate the base classifier
> > val classifier = new LogisticRegression()
> >   .setMaxIter(params.maxIter)
> >   .setTol(params.tol)
> >   .setFitIntercept(params.fitIntercept)
> >
> > Now the impact of that depends on a variety of things.
> >
> > E.g. if the data is completely contained in memory and there is no spill
> > over to disk, it might not be a big issue (of course there will still be
> > memory, CPU and network overhead/latency).
> >
> > If you are looking at storing the data on disk (e.g. as part of a checkpoint
> > or explicit storage), then there can be substantial I/O activity.
> >
> > From: Xi Shen
> > Date: Monday, October 17, 2016 at 2:54 AM
> > To: Divya Gehlot , Mungeol Heo
> > Cc: "user @spark"
> > Subject: Re: Is spark a right tool for updating a dataframe repeatedly
> >
> > I think most of the "big data" tools, like Spark and Hive, are not designed
> > to edit data. They are only designed to query data. I wonder in what
> > scenario you need to update a large volume of data repetitively.
> >
> > On Mon, Oct 17, 2016 at 2:00 PM Divya Gehlot wrote:
> >
> > If my understanding is correct about your query:
> >
> > In Spark, Dataframes are immutable; you can't update a dataframe.
> > You have to create a new dataframe to update the current dataframe.
> >
> > Thanks,
> > Divya
> >
> > On 17 October 2016 at 09:50, Mungeol Heo wrote:
> >
> > Hello, everyone.
> >
> > As I mentioned in the title, I wonder whether spark is a right tool for
> > updating a data frame repeatedly until there is no more data to
> > update.
> >
> > For example.
> >
> > while (there was an update) {
> >   update a data frame A
> > }
> >
> > If it is the right tool, then what is the best practice for this kind of
> > work?
> > Thank you.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> > --
> >
> > Thanks,
> > David S.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
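The update-until-stable loop Mungeol describes is a fixed-point iteration; the Spark-specific part is only the persist/unpersist bookkeeping. The control flow itself, sketched in plain Python (illustrative, no Spark):

```python
def iterate_to_fixpoint(data, step, max_iter=1000):
    # Apply `step` until the data stops changing (the "no more data to
    # update" condition), with a ceiling to guard against non-convergence.
    for _ in range(max_iter):
        updated = step(data)
        if updated == data:
            return data
        data = updated
    raise RuntimeError("no fixed point reached within max_iter iterations")

# Example step: clamp negative values to zero; stabilizes after one pass.
result = iterate_to_fixpoint([3, -1, 4, -5], lambda xs: [max(x, 0) for x in xs])
```

In Spark the equality test would be replaced by something cheap to compute (e.g. a count of rows still needing changes), since comparing whole dataframes per iteration is expensive.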
Re: Time-unit of RDD.countApprox timeout parameter
It only exists in the latest docs, not in versions <= 1.6. From: Sean Owen Sent: Tuesday, October 4, 2016 1:51:49 PM To: Sesterhenn, Mike; user@spark.apache.org Subject: Re: Time-unit of RDD.countApprox timeout parameter The API docs already say: "maximum time to wait for the job, in milliseconds" On Tue, Oct 4, 2016 at 7:14 PM Sesterhenn, Mike mailto:msesterh...@cars.com>> wrote: Nevermind. Through testing it seems it is MILLISECONDS. This should be added to the docs. From: Sesterhenn, Mike mailto:msesterh...@cars.com>> Sent: Tuesday, October 4, 2016 1:02:25 PM To: user@spark.apache.org<mailto:user@spark.apache.org> Subject: Time-unit of RDD.countApprox timeout parameter Hi all, Does anyone know what the unit is on the 'timeout' parameter to the RDD.countApprox() function? (ie. is that seconds, milliseconds, nanoseconds, ...?) I was searching through the source but it got hairy pretty quickly. Thanks
Re: Time-unit of RDD.countApprox timeout parameter
Nevermind. Through testing it seems it is MILLISECONDS. This should be added to the docs. From: Sesterhenn, Mike Sent: Tuesday, October 4, 2016 1:02:25 PM To: user@spark.apache.org Subject: Time-unit of RDD.countApprox timeout parameter Hi all, Does anyone know what the unit is on the 'timeout' parameter to the RDD.countApprox() function? (ie. is that seconds, milliseconds, nanoseconds, ...?) I was searching through the source but it got hairy pretty quickly. Thanks
Time-unit of RDD.countApprox timeout parameter
Hi all, Does anyone know what the unit is on the 'timeout' parameter to the RDD.countApprox() function? (ie. is that seconds, milliseconds, nanoseconds, ...?) I was searching through the source but it got hairy pretty quickly. Thanks
Re: Issue with rogue data in csv file used in Spark application
Hi Mich - Can you run a filter command on df1 prior to your map for any rows where p(3).toString != '-' then run your map command? Thanks Mike On Tue, Sep 27, 2016 at 5:06 PM, Mich Talebzadeh wrote: > Thanks guys > > Actually these are the 7 rogue rows. The column 0 is the Volume column > which means there was no trades on those days > > > *cat stock.csv|grep ",0"*SAP SE,SAP, 23-Dec-11,-,-,-,40.56,0 > SAP SE,SAP, 21-Apr-11,-,-,-,45.85,0 > SAP SE,SAP, 30-Dec-10,-,-,-,38.10,0 > SAP SE,SAP, 23-Dec-10,-,-,-,38.36,0 > SAP SE,SAP, 30-Apr-08,-,-,-,32.39,0 > SAP SE,SAP, 29-Apr-08,-,-,-,33.05,0 > SAP SE,SAP, 28-Apr-08,-,-,-,32.60,0 > > So one way would be to exclude the rows that there was no volume of trade > that day when cleaning up the csv file > > *cat stock.csv|grep -v **",0"* > > and that works. Bearing in mind that putting 0s in place of "-" will skew > the price plot. > > BTW I am using Spark csv as well > > val df1 = spark.read.option("header", true).csv(location) > > This is the class and the mapping > > > case class columns(Stock: String, Ticker: String, TradeDate: String, Open: > Float, High: Float, Low: Float, Close: Float, Volume: Integer) > val df2 = df1.map(p => columns(p(0).toString, p(1).toString, > p(2).toString, p(3).toString.toFloat, p(4).toString.toFloat, > p(5).toString.toFloat, p(6).toString.toFloat, p(7).toString.toInt)) > > > In here I have > > p(3).toString.toFloat > > How can one check for rogue data in p(3)? > > > Thanks > > > > > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. 
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On 27 September 2016 at 21:49, Mich Talebzadeh wrote:
>
>> I have historical prices for various stocks.
>>
>> Each csv file has 10 years of trades, one row per day.
>>
>> These are the columns defined in the class
>>
>> case class columns(Stock: String, Ticker: String, TradeDate: String,
>> Open: Float, High: Float, Low: Float, Close: Float, Volume: Integer)
>>
>> The issue is with the Open, High, Low, Close columns, which are all defined as
>> Float.
>>
>> Most rows are OK like below, but the one with "-" defined as Float
>> causes issues
>>
>> Date       Open  High  Low   Close Volume
>> 27-Sep-16  80.91 80.93 79.87 80.85 1873158
>> 23-Dec-11  -     -     -     40.56 0
>>
>> Because the prices are defined as Float, these rows cause the application
>> to crash
>>
>> scala> val rs = df2.filter(changeToDate("TradeDate") >=
>> monthsago).select((changeToDate("TradeDate").as("TradeDate")
>> ),(('Close+'Open)/2).as("AverageDailyPrice"), 'Low.as("Day's Low"),
>> 'High.as("Day's High")).orderBy("TradeDate").collect
>> 16/09/27 21:48:53 ERROR Executor: Exception in task 0.0 in stage 61.0
>> (TID 260)
>> java.lang.NumberFormatException: For input string: "-"
>>
>> One way is to define the prices as Strings, but that is not
>> meaningful. Alternatively, do the clean up before putting the csv in HDFS, but
>> that becomes tedious and error prone.
>>
>> Any ideas will be appreciated.
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed.
The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> > >
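One way to answer Mich's "how can one check for rogue data in p(3)?" without pre-filtering the file is a tolerant parse that maps "-" (or anything non-numeric) to a default. A hedged Python sketch of that cleanup; the Scala analogue would use Try(s.toFloat).getOrElse(default):

```python
def to_float(s, default=0.0):
    # "-" and other non-numeric strings become the default instead of
    # crashing the job with a NumberFormatException-style error.
    try:
        return float(s)
    except (TypeError, ValueError):
        return default

# Sample values from the rogue rows discussed above.
cleaned = [to_float(v) for v in ["40.56", "-", "32.39", ""]]
```

As Mich notes, substituting 0 for "-" will skew a price plot, so dropping those zero-volume rows entirely may still be the better choice for charting.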
Re: Total Shuffle Read and Write Size of Spark workload
While the SparkListener method is likely all around better, if you just need this quickly you should be able to do a SSH local port redirection over putty. In the putty configuration: - Go to Connection: SSH: Tunnels - In the Source port field, enter 4040 (or another unused port on your machine) - In the Destination field, enter ipaddress:4040 where ipaddress is the IP you'd normally access of the spark server. If it's the same server you're SSH'ing to it can be 127.0.0.1 - Make sure the "Local" and "Auto" radio buttons are checked and click "Add" - Go back to the Session section and enter the IP / etc configuration - If you're going to use this often, enter a name and save the configuration. Otherwise click open and login as normal. Once the session is established, you should be able to open a web browser to http://localhost:4040 which will redirect over the SSH session to the remote server. Note that any link that references a non-accessible IP address can't be reached (though you can also setup putty / SSH as a proxy to get around that if needed). Thanks Mike On Mon, Sep 19, 2016 at 4:43 AM, Cristina Rozee wrote: > I Mich, > > I do not have access to UI as I am running jobs on remote system and I can > access it using putty only so only console or logs files are available to > me. > > Thanks > > On Mon, Sep 19, 2016 at 11:36 AM, Mich Talebzadeh < > mich.talebza...@gmail.com> wrote: > >> Spark UI on port 4040 by default >> >> HTH >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. 
The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >> On 19 September 2016 at 10:34, Cristina Rozee >> wrote: >> >>> Could you please explain a little bit? >>> >>> >>> On Sun, Sep 18, 2016 at 10:19 PM, Jacek Laskowski >>> wrote: >>> >>>> SparkListener perhaps? >>>> >>>> Jacek >>>> >>>> On 15 Sep 2016 1:41 p.m., "Cristina Rozee" >>>> wrote: >>>> >>>>> Hello, >>>>> >>>>> I am running a spark application and I would like to know the total >>>>> amount of shuffle data (read + write ) so could anyone let me know how to >>>>> get this information? >>>>> >>>>> Thank you >>>>> Cristina. >>>>> >>>> >>> >> >
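For anyone on a plain OpenSSH client rather than PuTTY, the tunnel Mike describes is a one-liner; host and user names below are placeholders for your own (this is a connection fragment, not a runnable script):

```shell
# Forward local port 4040 to port 4040 on the Spark server over SSH,
# the same effect as the PuTTY "Tunnels" configuration above.
# -N: no remote command, just the forward.
ssh -N -L 4040:127.0.0.1:4040 user@spark-server
# then browse http://localhost:4040 on your local machine
```

The same caveat applies: links in the UI that point at non-routable addresses will still be unreachable unless you also set up a proxy.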
Re: year out of range
My guess is there's some row that does not match up with the expected data. While slower, I've found RDDs to be easier to troubleshoot this kind of thing until you sort out exactly what's happening. Something like: raw_data = sc.textFile("") rowcounts = raw_data.map(lambda x: (len(x.split(",")), 1)).reduceByKey(lambda x,y: x+y) rowcounts.take(5) badrows = raw_data.filter(lambda x: len(x.split(",")) != ) if badrows.count() > 0: badrows.saveAsTextFile("") You should be able to tell if there are any rows with column counts that don't match up (the thing that usually bites me with CSV conversions). Assuming these all match to what you want, I'd try mapping the unparsed date column out to separate fields and try to see if a year field isn't matching the expected values. Thanks Mike On Thu, Sep 8, 2016 at 8:15 AM, Daniel Lopes wrote: > Thanks, > > I *tested* the function offline and works > Tested too with select * from after convert the data and see the new data > good > *but* if I *register as temp table* to *join other table* stilll shows *the > same error*. > > ValueError: year out of range > > Best, > > *Daniel Lopes* > Chief Data and Analytics Officer | OneMatch > c: +55 (18) 99764-2733 | https://www.linkedin.com/in/dslopes > > www.onematch.com.br > <http://www.onematch.com.br/?utm_source=EmailSignature&utm_term=daniel-lopes> > > On Thu, Sep 8, 2016 at 9:43 AM, Marco Mistroni > wrote: > >> Daniel >> Test the parse date offline to make sure it returns what you expect >> If it does in spark shell create a df with 1 row only and run ur UDF. U >> should b able to see issue >> If not send me a reduced CSV file at my email and I give it a try this >> eve hopefully someone else will b able to assist in meantime >> U don't need to run a full spark app to debug issue >> Ur problem. Is either in the parse date or in what gets passed to the UDF >> Hth >> >> On 8 Sep 2016 1:31 pm, "Daniel Lopes" wrote: >> >>> Thanks Marco for your response. 
>>>
>>> The field came encoded by SQL Server in locale pt_BR.
>>>
>>> The code that I am formatting is:
>>>
>>> def parse_date(argument, format_date='%Y-%m-%d %H:%M:%S'):
>>>     try:
>>>         locale.setlocale(locale.LC_TIME, 'pt_BR.utf8')
>>>         return datetime.strptime(argument, format_date)
>>>     except:
>>>         return None
>>>
>>> convert_date = funcspk.udf(lambda x: parse_date(x, '%b %d %Y %H:%M'), TimestampType())
>>>
>>> transacoes = transacoes.withColumn('tr_Vencimento', convert_date(transacoes.tr_Vencimento))
>>>
>>> The sample is a wide table (truncated and garbled in the archive); its columns
>>> include tr_NumeroContrato, tr_TipoDocumento, tr_Vencimento, tr_Valor,
>>> tr_DataRecebimento, tr_TaxaMora, tr_DescontoMaximo, tr_ValorAtualizado,
>>> tr_ComGarantia, tr_Banco, tr_Agencia and others, with tr_Vencimento values
>>> such as "Jul 20 2015 12:00" and tr_Valor values such as 254.35.
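Daniel's "year out of range" typically means some rows reach datetime.strptime with values that do not match the format, or the UDF is applied to an already-converted column. A hedged, locale-free Python sketch that parses the English-month sample shown above and surfaces bad rows as None instead of failing the whole job (pt_BR month names would still need the locale handling from his original code):

```python
from datetime import datetime

def parse_date(argument, format_date="%b %d %Y %H:%M"):
    # Returns None instead of raising, so rogue rows become nulls that can
    # be counted and inspected. Assumes English month abbreviations
    # (C locale); narrow the except clauses rather than a bare except.
    try:
        return datetime.strptime(argument, format_date)
    except (TypeError, ValueError):
        return None

parsed = parse_date("Jul 20 2015 12:00")
bad = parse_date("not a date")
```

Counting the Nones after conversion (before registering the temp table and joining) is a quick way to see whether the failure comes from the data or from the join itself.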
Re: Best ID Generator for ID field in parquet ?
Hi Kevin - There's not really a race condition as the 64 bit value is split into a 31 bit partition id (the upper portion) and a 33 bit incrementing id. In other words, as long as each partition contains fewer than 8 billion entries there should be no overlap and there is not any communication between executors to get the next id. Depending on what you mean by duplication, there shouldn't be any within a column as long as you maintain some sort of state (ie, the startval Mich shows, a previous maxid, etc.) While these ids are unique in that sense, they are not the same as a uuid / guid which are generally unique across all entries assuming enough randomness. Think of the monotonically increasing id as an auto-incrementing column (with potentially massive gaps in ids) from a relational database. Thanks Mike On Sun, Sep 4, 2016 at 6:41 PM, Kevin Tran wrote: > Hi Mich, > Thank you for your input. > Does monotonically incremental ensure about race condition and does it > duplicates the ids at some points with multi threads, multi instances, ... ? > > Even System.currentTimeMillis() still has duplication? > > Cheers, > Kevin. 
> > On Mon, Sep 5, 2016 at 12:30 AM, Mich Talebzadeh < > mich.talebza...@gmail.com> wrote: > >> You can create a monotonically incrementing ID column on your table >> >> scala> val ll_18740868 = spark.table("accounts.ll_18740868") >> scala> val startval = 1 >> scala> val df = ll_18740868.withColumn("id", >> *monotonically_increasing_id()+* startval).show (2) >> +---+---+-+-+--- >> ---+---++---+---+ >> |transactiondate|transactiontype| sortcode|accountnumber|transac >> tiondescription|debitamount|creditamount|balance| id| >> +---+---+-+-+--- >> ---+---++---+---+ >> | 2011-12-30|DEB|'30-64-72| 18740868| WWW.GFT.COM >> CD 4628 | 50.0|null| 304.89| 1| >> | 2011-12-30|DEB|'30-64-72| 18740868| >> TDA.CONFECC.D.FRE...| 19.01|null| 354.89| 2| >> +---+---+-+-+--- >> ---+---++---+---+ >> >> >> Now you have a new ID column >> >> HTH >> >> >> >> >> >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >> On 4 September 2016 at 12:43, Kevin Tran wrote: >> >>> Hi everyone, >>> Please give me your opinions on what is the best ID Generator for ID >>> field in parquet ? >>> >>> UUID.randomUUID(); >>> AtomicReference currentTime = new AtomicReference<>(System.curre >>> ntTimeMillis()); >>> AtomicLong counter = new AtomicLong(0); >>> >>> >>> Thanks, >>> Kevin. 
>>> >>> >>> >>> https://issues.apache.org/jira/browse/SPARK-8406 (Race condition when >>> writing Parquet files) >>> https://github.com/apache/spark/pull/6864/files >>> >> >> >
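[Editor's note] The bit layout described above can be sketched in plain Python (no Spark required). This mirrors the documented layout of monotonically_increasing_id — partition id in the upper 31 bits, a per-partition counter in the lower 33 bits — so ids never collide across partitions without any executor coordination; the function name here is illustrative:

```python
def monotonic_id(partition_id, row_in_partition):
    """Compose a Spark-style monotonically increasing id from its two parts."""
    assert 0 <= partition_id < 2 ** 31      # partition id fits in 31 bits
    assert 0 <= row_in_partition < 2 ** 33  # ~8.6 billion rows per partition max
    return (partition_id << 33) | row_in_partition

# Partition 0 counts 0, 1, 2, ...; partition 1 starts at 2**33,
# which is the "massive gap" mentioned above.
print(monotonic_id(0, 2))  # 2
print(monotonic_id(1, 0))  # 8589934592
```

Because each partition owns a disjoint 33-bit range, uniqueness holds as long as no partition exceeds 2**33 rows — exactly the "fewer than about 8 billion entries" condition.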
Re: Spark 2.0 - Insert/Update to a DataFrame
Pyspark example based on the data you provided (obviously your dataframes will come from whatever source you have, not entered directly). This uses an intermediary dataframe with grouped data for clarity, but you could pull this off in other ways.

-- Code --

from pyspark.sql.types import *
from pyspark.sql.functions import col

fcst_schema = StructType([
    StructField('Product', StringType(), nullable=False),
    StructField('fcst_qty', IntegerType(), nullable=False)
])
fcst_data = sc.parallelize([Row("A", 100), Row("B", 50)])

so_schema = StructType([
    StructField('OrderNum', IntegerType(), nullable=False),
    StructField('ItemNum', StringType(), nullable=False),
    StructField('Sales_qty', IntegerType(), nullable=False)
])
so_data = sc.parallelize([
    Row(101, "A", 10),
    Row(101, "B", 5),
    Row(102, "A", 5),
    Row(102, "B", 10)])

fcst_df = sqlContext.createDataFrame(fcst_data, fcst_schema)
so_df = sqlContext.createDataFrame(so_data, so_schema)

fcst_df.show()
so_df.show()

orderTotals_df = so_df.groupBy('ItemNum').sum('Sales_qty').select('ItemNum', col('sum(Sales_qty)').alias('Sales_qty'))
orderTotals_df.show()

fcst_df.join(orderTotals_df, fcst_df.Product == orderTotals_df.ItemNum, 'left_outer').select(fcst_df.Product, (fcst_df.fcst_qty - orderTotals_df.Sales_qty).alias('fcst_qty')).show()

-- Output examples (fcst_df, so_df, orderTotals_df, and the resultant df) --

+-------+--------+
|Product|fcst_qty|
+-------+--------+
|      A|     100|
|      B|      50|
+-------+--------+

+--------+-------+---------+
|OrderNum|ItemNum|Sales_qty|
+--------+-------+---------+
|     101|      A|       10|
|     101|      B|        5|
|     102|      A|        5|
|     102|      B|       10|
+--------+-------+---------+

+-------+---------+
|ItemNum|Sales_qty|
+-------+---------+
|      B|       15|
|      A|       15|
+-------+---------+

+-------+--------+
|Product|fcst_qty|
+-------+--------+
|      B|      35|
|      A|      85|
+-------+--------+

The other languages should work similarly. Honestly, I'd probably just set up the dataframes and write it in SQL, possibly with a UDF, to keep things a little more clear. 
Thanks Mike On Fri, Aug 26, 2016 at 4:45 PM, Subhajit Purkayastha wrote: > So the data in the fcst dataframe is like this > > > > Product, fcst_qty > > A 100 > > B 50 > > > > Sales DF has data like this > > > > Order# Item#Sales qty > > 101 A 10 > > 101 B 5 > > 102 A 5 > > 102 B 10 > > > > I want to update the FCSt DF data, based on Product=Item# > > > > So the resultant FCST DF should have data > > Product, fcst_qty > > A 85 > > B 35 > > > > Hope it helps > > > > If I join the data between the 2 DFs (based on Product# and item#), I will > get a cartesion join and my result will not be what I want > > > > Thanks for your help > > > > > > *From:* Mike Metzger [mailto:m...@flexiblecreations.com] > *Sent:* Friday, August 26, 2016 2:12 PM > > *To:* Subhajit Purkayastha > *Cc:* user @spark > *Subject:* Re: Spark 2.0 - Insert/Update to a DataFrame > > > > Without seeing exactly what you were wanting to accomplish, it's hard to > say. A Join is still probably the method I'd suggest using something like: > > > > select (FCST.quantity - SO.quantity) as quantity > > > > from FCST > > LEFT OUTER JOIN > > SO ON FCST.productid = SO.productid > > WHERE > > > > > > with specifics depending on the layout and what language you're using. > > > > Thanks > > > > Mike > > > > On Fri, Aug 26, 2016 at 3:29 PM, Subhajit Purkayastha > wrote: > > Mike, > > > > The grains of the dataFrame are different. > > > > I need to reduce the forecast qty (which is in the FCST DF) based on the > sales qty (coming from the sales order DF) > > > > Hope it helps > > > > Subhajit > > > > *From:* Mike Metzger [mailto:m...@flexiblecreations.com] > *Sent:* Friday, August 26, 2016 1:13 PM > *To:* Subhajit Purkayastha > *Cc:* user @spark > *Subject:* Re: Spark 2.0 - Insert/Update to a DataFrame > > > > Without seeing the makeup of the Dataframes nor what your logic is for > updating them, I'd suggest doing a join of the Forecast DF with the > appropriate columns from the SalesOrder DF. 
> > > > Mike > > > > On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha > wrote: > > I am using spark 2.0, have 2 DataFrames, SalesOrder and Forecast. I need > to update the Forecast Dataframe record(s), based on the SaleOrder DF > record. What is the best way to achieve this functionality > > > > >
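[Editor's note] The groupBy-then-left-join logic above boils down to a per-item subtraction. A plain-Python sketch of the same arithmetic (no Spark needed; the variable names are illustrative) makes the expected result easy to verify:

```python
from collections import defaultdict

fcst = {"A": 100, "B": 50}
sales = [(101, "A", 10), (101, "B", 5), (102, "A", 5), (102, "B", 10)]

# Equivalent of orderTotals_df: total sales quantity per item.
totals = defaultdict(int)
for _order_num, item, qty in sales:
    totals[item] += qty

# Equivalent of the left outer join plus subtraction; items with no
# sales keep their full forecast (totals.get defaults to 0).
result = {prod: qty - totals.get(prod, 0) for prod, qty in fcst.items()}
print(result)  # {'A': 85, 'B': 35}
```

This matches the resultant dataframe in the email: A 100-15=85, B 50-15=35.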
Re: Please assist: Building Docker image containing spark 2.0
I would also suggest building the container manually first and setup everything you specifically need. Once done, you can then grab the history file, pull out the invalid commands and build out the completed Dockerfile. Trying to troubleshoot an installation via Dockerfile is often an exercise in futility. Thanks Mike On Fri, Aug 26, 2016 at 5:14 PM, Michael Gummelt wrote: > Run with "-X -e" like the error message says. See what comes out. > > On Fri, Aug 26, 2016 at 2:23 PM, Tal Grynbaum > wrote: > >> Did you specify -Dscala-2.10 >> As in >> ./dev/change-scala-version.sh 2.10 ./build/mvn -Pyarn -Phadoop-2.4 >> -Dscala-2.10 -DskipTests clean package >> If you're building with scala 2.10 >> >> On Sat, Aug 27, 2016, 00:18 Marco Mistroni wrote: >> >>> Hello Michael >>> uhm i celebrated too soon >>> Compilation of spark on docker image went near the end and then it >>> errored out with this message >>> >>> INFO] BUILD FAILURE >>> [INFO] >>> >>> [INFO] Total time: 01:01 h >>> [INFO] Finished at: 2016-08-26T21:12:25+00:00 >>> [INFO] Final Memory: 69M/324M >>> [INFO] >>> >>> [ERROR] Failed to execute goal >>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile >>> (scala-compile-first) on project spark-mllib_2.11: Execution >>> scala-compile-first of goal >>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile >>> failed. CompileFailed -> [Help 1] >>> [ERROR] >>> [ERROR] To see the full stack trace of the errors, re-run Maven with the >>> -e switch. >>> [ERROR] Re-run Maven using the -X switch to enable full debug logging. 
>>> [ERROR] >>> [ERROR] For more information about the errors and possible solutions, >>> please read the following articles: >>> [ERROR] [Help 1] http://cwiki.apache.org/conflu >>> ence/display/MAVEN/PluginExecutionException >>> [ERROR] >>> [ERROR] After correcting the problems, you can resume the build with the >>> command >>> [ERROR] mvn -rf :spark-mllib_2.11 >>> The command '/bin/sh -c ./build/mvn -Pyarn -Phadoop-2.4 >>> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero code: >>> 1 >>> >>> what am i forgetting? >>> once again, last command i launched on the docker file is >>> >>> >>> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests >>> clean package >>> >>> kr >>> >>> >>> >>> On Fri, Aug 26, 2016 at 6:18 PM, Michael Gummelt >> > wrote: >>> >>>> :) >>>> >>>> On Thu, Aug 25, 2016 at 2:29 PM, Marco Mistroni >>>> wrote: >>>> >>>>> No i wont accept that :) >>>>> I can't believe i have wasted 3 hrs for a space! >>>>> >>>>> Many thanks MIchael! >>>>> >>>>> kr >>>>> >>>>> On Thu, Aug 25, 2016 at 10:01 PM, Michael Gummelt < >>>>> mgumm...@mesosphere.io> wrote: >>>>> >>>>>> You have a space between "build" and "mvn" >>>>>> >>>>>> On Thu, Aug 25, 2016 at 1:31 PM, Marco Mistroni >>>>>> wrote: >>>>>> >>>>>>> HI all >>>>>>> sorry for the partially off-topic, i hope there's someone on the >>>>>>> list who has tried the same and encountered similar issuse >>>>>>> >>>>>>> Ok so i have created a Docker file to build an ubuntu container >>>>>>> which inlcudes spark 2.0, but somehow when it gets to the point where it >>>>>>> has to kick off ./build/mvn command, it errors out with the following >>>>>>> >>>>>>> ---> Running in 8c2aa6d59842 >>>>>>> /bin/sh: 1: ./build: Permission denied >>>>>>> The command '/bin/sh -c ./build mvn -Pyarn -Phadoop-2.4 >>>>>>> -Dhadoop.version=2.4.0 -DskipTests clean package' returned a non-zero >>>>>>> code: >>>>>>> 126 >>>>>>> >>>>>>> I am puzzled as i am root when i build the container, so i should >>>>>>> 
not encounter this issue (btw, if instead of running mvn from the build >>>>>>> directory i use the mvn which i installed on the container, it works >>>>>>> fine >>>>>>> but it's painfully slow) >>>>>>> >>>>>>> here are the details of my Spark command( scala 2.10, java 1.7 , mvn >>>>>>> 3.3.9 and git have already been installed) >>>>>>> >>>>>>> # Spark >>>>>>> RUN echo "Installing Apache spark 2.0" >>>>>>> RUN git clone git://github.com/apache/spark.git >>>>>>> WORKDIR /spark >>>>>>> RUN ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 >>>>>>> -DskipTests clean package >>>>>>> >>>>>>> >>>>>>> Could anyone assist pls? >>>>>>> >>>>>>> kindest regarsd >>>>>>> Marco >>>>>>> >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Michael Gummelt >>>>>> Software Engineer >>>>>> Mesosphere >>>>>> >>>>> >>>>> >>>> >>>> >>>> -- >>>> Michael Gummelt >>>> Software Engineer >>>> Mesosphere >>>> >>> >>> > > > -- > Michael Gummelt > Software Engineer > Mesosphere >
Re: Spark 2.0 - Insert/Update to a DataFrame
Without seeing exactly what you were wanting to accomplish, it's hard to say. A Join is still probably the method I'd suggest using something like: select (FCST.quantity - SO.quantity) as quantity from FCST LEFT OUTER JOIN SO ON FCST.productid = SO.productid WHERE with specifics depending on the layout and what language you're using. Thanks Mike On Fri, Aug 26, 2016 at 3:29 PM, Subhajit Purkayastha wrote: > Mike, > > > > The grains of the dataFrame are different. > > > > I need to reduce the forecast qty (which is in the FCST DF) based on the > sales qty (coming from the sales order DF) > > > > Hope it helps > > > > Subhajit > > > > *From:* Mike Metzger [mailto:m...@flexiblecreations.com] > *Sent:* Friday, August 26, 2016 1:13 PM > *To:* Subhajit Purkayastha > *Cc:* user @spark > *Subject:* Re: Spark 2.0 - Insert/Update to a DataFrame > > > > Without seeing the makeup of the Dataframes nor what your logic is for > updating them, I'd suggest doing a join of the Forecast DF with the > appropriate columns from the SalesOrder DF. > > > > Mike > > > > On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha > wrote: > > I am using spark 2.0, have 2 DataFrames, SalesOrder and Forecast. I need > to update the Forecast Dataframe record(s), based on the SaleOrder DF > record. What is the best way to achieve this functionality > > >
Re: Spark 2.0 - Insert/Update to a DataFrame
Without seeing the makeup of the Dataframes nor what your logic is for updating them, I'd suggest doing a join of the Forecast DF with the appropriate columns from the SalesOrder DF. Mike On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha wrote: > I am using spark 2.0, have 2 DataFrames, SalesOrder and Forecast. I need > to update the Forecast Dataframe record(s), based on the SaleOrder DF > record. What is the best way to achieve this functionality >
Re: UDF on lpad
Is this what you're after? def padString(id: Int, chars: String, length: Int): String = chars * length + id.toString padString(123, "0", 10) res4: String = 0000000000123 Mike On Thu, Aug 25, 2016 at 12:39 PM, Mich Talebzadeh wrote: > Thanks Mike. > > Can one turn the first example into a generic UDF similar to the output > from below where 10 "0" are padded to the left of 123 > > def padString(id: Int, chars: String, length: Int): String = > (0 until length).map(_ => chars(Random.nextInt(chars.length))).mkString > + id.toString > > scala> padString(123, "0", 10) > res6: String = 0000000000123 > > Cheers > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > On 25 August 2016 at 17:29, Mike Metzger > wrote: > >> Are you trying to always add x numbers of digits / characters or are you >> trying to pad to a specific length? If the latter, try using format >> strings: >> >> // Pad to 10 0 characters >> val c = 123 >> f"$c%010d" >> >> // Result is 0000000123 >> >> >> // Pad to 10 total characters with 0's >> val c = 123.87 >> f"$c%010.2f" >> >> // Result is 0000123.87 >> >> >> You can also do inline operations on the values before formatting. I've >> used this specifically to pad for hex digits from strings. 
>> >> val d = "100" >> val hexstring = f"0x${d.toInt}%08X" >> >> // hexstring is 0x00000064 >> >> >> Thanks >> >> Mike >> >> On Thu, Aug 25, 2016 at 9:27 AM, Mich Talebzadeh < >> mich.talebza...@gmail.com> wrote: >> >>> Ok I tried this >>> >>> def padString(s: String, chars: String, length: Int): String = >>> | (0 until length).map(_ => >>> chars(Random.nextInt(chars.length))).mkString >>> + s >>> >>> padString: (s: String, chars: String, length: Int)String >>> And use it like below: >>> >>> Example left pad the figure 12345.87 with 10 "0"s >>> >>> padString("12345.87", "0", 10) >>> res79: String = 000000000012345.87 >>> >>> Any better way? >>> >>> Thanks >>> >>> Dr Mich Talebzadeh >>> >>> >>> >>> LinkedIn * >>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>> >>> >>> >>> http://talebzadehmich.wordpress.com >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >>> arise from relying on this email's technical content is explicitly >>> disclaimed. The author will in no case be liable for any monetary damages >>> arising from such loss, damage or destruction. 
>>> >>> >>> >>> On 25 August 2016 at 12:06, Mich Talebzadeh >>> wrote: >>> >>>> Hi, >>>> >>>> This UDF on substring works >>>> >>>> scala> val SubstrUDF = udf { (s: String, start: Int, end: Int) => >>>> s.substring(start, end) } >>>> SubstrUDF: org.apache.spark.sql.expressions.UserDefinedFunction = >>>> UserDefinedFunction(,StringType,Some(List(StringType, >>>> IntegerType, IntegerType))) >>>> >>>> I want something similar to this >>>> >>>> scala> sql("""select lpad("str", 10, "0")""").show >>>> ++ >>>> |lpad(str, 10, 0)| >>>> ++ >>>> | 000str| >>>> ++ >>>> >>>> scala> val SubstrUDF = udf { (s: String, len: Int, chars: String) => >>>> lpad(s, len, chars) } >>>> :40: error: type mismatch; >>>> found : String >>>> required: org.apache.spark.sql.Column >>>>val SubstrUDF = udf { (s: String, len: Int, chars: String) => >>>> lpad(s, len, chars) } >>>> >>>> >>>> Any ideas? >>>> >>>> Thanks >>>> >>>> Dr Mich Talebzadeh >>>> >>>> >>>> >>>> LinkedIn * >>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>> >>>> >>>> >>>> http://talebzadehmich.wordpress.com >>>> >>>> >>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>>> any loss, damage or destruction of data or any other property which may >>>> arise from relying on this email's technical content is explicitly >>>> disclaimed. The author will in no case be liable for any monetary damages >>>> arising from such loss, damage or destruction. >>>> >>>> >>>> >>> >>> >> >
Re: UDF on lpad
Are you trying to always add x numbers of digits / characters or are you trying to pad to a specific length? If the latter, try using format strings: // Pad to 10 0 characters val c = 123 f"$c%010d" // Result is 0000000123 // Pad to 10 total characters with 0's val c = 123.87 f"$c%010.2f" // Result is 0000123.87 You can also do inline operations on the values before formatting. I've used this specifically to pad for hex digits from strings. val d = "100" val hexstring = f"0x${d.toInt}%08X" // hexstring is 0x00000064 Thanks Mike On Thu, Aug 25, 2016 at 9:27 AM, Mich Talebzadeh wrote: > Ok I tried this > > def padString(s: String, chars: String, length: Int): String = > | (0 until length).map(_ => > chars(Random.nextInt(chars.length))).mkString > + s > > padString: (s: String, chars: String, length: Int)String > And use it like below: > > Example left pad the figure 12345.87 with 10 "0"s > > padString("12345.87", "0", 10) > res79: String = 000000000012345.87 > > Any better way? > > Thanks > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. 
> > > > On 25 August 2016 at 12:06, Mich Talebzadeh > wrote: > >> Hi, >> >> This UDF on substring works >> >> scala> val SubstrUDF = udf { (s: String, start: Int, end: Int) => >> s.substring(start, end) } >> SubstrUDF: org.apache.spark.sql.expressions.UserDefinedFunction = >> UserDefinedFunction(,StringType,Some(List(StringType, >> IntegerType, IntegerType))) >> >> I want something similar to this >> >> scala> sql("""select lpad("str", 10, "0")""").show >> ++ >> |lpad(str, 10, 0)| >> ++ >> | 000str| >> ++ >> >> scala> val SubstrUDF = udf { (s: String, len: Int, chars: String) => >> lpad(s, len, chars) } >> :40: error: type mismatch; >> found : String >> required: org.apache.spark.sql.Column >>val SubstrUDF = udf { (s: String, len: Int, chars: String) => >> lpad(s, len, chars) } >> >> >> Any ideas? >> >> Thanks >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> > >
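[Editor's note] The Scala format strings above carry over almost directly to Python, which may help PySpark readers of this thread. A quick sketch in plain Python (no Spark needed):

```python
c = 123
print(f"{c:010d}")       # 0000000123 -- pad to width 10 with zeros
print(str(c).zfill(10))  # 0000000123 -- equivalent via str.zfill

x = 123.87
print(f"{x:010.2f}")     # 0000123.87 -- 10 characters total, 2 decimal places

d = "100"
print(f"0x{int(d):08X}") # 0x00000064 -- hex padding, as in the Scala example
```

As in Scala, the width counts the whole rendered value (sign, digits, and decimal point), not just the integer part.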
Re: Sum array values by row in new column
Assuming you know the number of elements in the list, this should work: df.withColumn('total', df["_1"].getItem(0) + df["_1"].getItem(1) + df["_1"].getItem(2)) Mike On Mon, Aug 15, 2016 at 12:02 PM, Javier Rey wrote: > Hi everyone, > > I have one dataframe with one column this column is an array of numbers, > how can I sum each array by row a obtain a new column with sum? in pyspark. > > Example: > > ++ > | numbers| > ++ > |[10, 20, 30]| > |[40, 50, 60]| > |[70, 80, 90]| > ++ > > The idea is obtain the same df with a new column with totals: > > ++-- > | numbers| | > ++-- > |[10, 20, 30]|60 | > |[40, 50, 60]|150 | > |[70, 80, 90]|240 | > ++-- > > Regards! > > Samir > > > >
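[Editor's note] The getItem approach above requires a known, fixed array length. When the length varies, the usual fallback is a UDF that sums each row's list; the per-row logic is shown here in plain Python so it runs without a cluster, with the (hypothetical) PySpark wrapping sketched in comments:

```python
# In PySpark this per-row sum could be wrapped roughly as:
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import IntegerType
#   sum_udf = udf(lambda xs: sum(xs), IntegerType())
#   df.withColumn('total', sum_udf(df['numbers']))
rows = [[10, 20, 30], [40, 50, 60], [70, 80, 90]]
totals = [sum(r) for r in rows]
print(totals)  # [60, 150, 240]
```

These totals match the example output the original poster asked for (60, 150, 240).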
Re: Generating unique id for a column in Row without breaking into RDD and joining back
If I understand your question correctly, the current implementation doesn't allow a starting value, but it's easy enough to pull off with something like: val startval = 1 df.withColumn("id", monotonically_increasing_id() + startval) Two points - your test shows what happens with a single partition. With multiple partitions, the id values will inherently be much higher (due to the partition id being the upper 31 bits of the value.) The other note is that the startval in this case would need to be communicated along with the job. It may be worth defining it as a broadcast variable and referencing it that way so there's less cluster communication involved. Honestly I doubt there's a lot of variance with this small of a value but it's a good habit to get into. Thanks Mike On Fri, Aug 5, 2016 at 11:33 AM, Mich Talebzadeh wrote: > Thanks Mike for this. > > This is Scala. As expected it adds the id column to the end of the column > list starting from 0
>
> scala> val df = ll_18740868.withColumn("id", monotonically_increasing_id()).show(2)
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
> |transactiondate|transactiontype| sortcode|accountnumber|transactiondescription|debitamount|creditamount|balance| id|
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
> |     2009-12-31|            CPT|'30-64-72|     18740868|  LTSB STH KENSINGT...|       90.0|        null|  400.0|  0|
> |     2009-12-31|            CPT|'30-64-72|     18740868|  LTSB CHELSEA (309...|       10.0|        null|  490.0|  1|
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
>
> Can one provide the starting value say 1? > > Cheers > > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. 
> The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > On 5 August 2016 at 16:45, Mike Metzger > wrote: > >> You can use the monotonically_increasing_id method to generate guaranteed >> unique (but not necessarily consecutive) IDs. Calling something like: >> >> df.withColumn("id", monotonically_increasing_id()) >> >> You don't mention which language you're using but you'll need to pull in >> the sql.functions library. >> >> Mike >> >> On Aug 5, 2016, at 9:11 AM, Tony Lane wrote: >> >> Ayan - basically i have a dataset with structure, where bid are unique >> string values >> >> bid: String >> val : integer >> >> I need unique int values for these string bid''s to do some processing in >> the dataset >> >> like >> >> id:int (unique integer id for each bid) >> bid:String >> val:integer >> >> >> >> -Tony >> >> On Fri, Aug 5, 2016 at 6:35 PM, ayan guha wrote: >> >>> Hi >>> >>> Can you explain a little further? >>> >>> best >>> Ayan >>> >>> On Fri, Aug 5, 2016 at 10:14 PM, Tony Lane >>> wrote: >>> >>>> I have a row with structure like >>>> >>>> identifier: String >>>> value: int >>>> >>>> All identifier are unique and I want to generate a unique long id for >>>> the data and get a row object back for further processing. >>>> >>>> I understand using the zipWithUniqueId function on RDD, but that would >>>> mean first converting to RDD and then joining back the RDD and dataset >>>> >>>> What is the best way to do this ? >>>> >>>> -Tony >>>> >>>> >>> >>> >>> -- >>> Best Regards, >>> Ayan Guha >>> >> >> >
Re: Generating unique id for a column in Row without breaking into RDD and joining back
Should be pretty much the same code for Scala - import java.util.UUID UUID.randomUUID If you need it as a UDF, just wrap it accordingly. Mike On Fri, Aug 5, 2016 at 11:38 AM, Mich Talebzadeh wrote: > On the same token can one generate a UUID like below in Hive > > hive> select reflect("java.util.UUID", "randomUUID"); > OK > 587b1665-b578-4124-8bf9-8b17ccb01fe7 > > thx > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > On 5 August 2016 at 17:34, Mike Metzger > wrote: > >> Tony - >> >>From my testing this is built with performance in mind. It's a 64-bit >> value split between the partition id (upper 31 bits ~1billion) and the id >> counter within a partition (lower 33 bits ~8 billion). There shouldn't be >> any added communication between the executors and the driver for that. >> >> I've been toying with an implementation that allows you to specify the >> split for better control along with a start value. >> >> Thanks >> >> Mike >> >> On Aug 5, 2016, at 11:07 AM, Tony Lane wrote: >> >> Mike. >> >> I have figured how to do this . Thanks for the suggestion. It works >> great. I am trying to figure out the performance impact of this. >> >> thanks again >> >> >> On Fri, Aug 5, 2016 at 9:25 PM, Tony Lane wrote: >> >>> @mike - this looks great. How can i do this in java ? what is the >>> performance implication on a large dataset ? >>> >>> @sonal - I can't have a collision in the values. 
>>> >>> On Fri, Aug 5, 2016 at 9:15 PM, Mike Metzger >> > wrote: >>> >>>> You can use the monotonically_increasing_id method to generate >>>> guaranteed unique (but not necessarily consecutive) IDs. Calling something >>>> like: >>>> >>>> df.withColumn("id", monotonically_increasing_id()) >>>> >>>> You don't mention which language you're using but you'll need to pull >>>> in the sql.functions library. >>>> >>>> Mike >>>> >>>> On Aug 5, 2016, at 9:11 AM, Tony Lane wrote: >>>> >>>> Ayan - basically i have a dataset with structure, where bid are unique >>>> string values >>>> >>>> bid: String >>>> val : integer >>>> >>>> I need unique int values for these string bid''s to do some processing >>>> in the dataset >>>> >>>> like >>>> >>>> id:int (unique integer id for each bid) >>>> bid:String >>>> val:integer >>>> >>>> >>>> >>>> -Tony >>>> >>>> On Fri, Aug 5, 2016 at 6:35 PM, ayan guha wrote: >>>> >>>>> Hi >>>>> >>>>> Can you explain a little further? >>>>> >>>>> best >>>>> Ayan >>>>> >>>>> On Fri, Aug 5, 2016 at 10:14 PM, Tony Lane >>>>> wrote: >>>>> >>>>>> I have a row with structure like >>>>>> >>>>>> identifier: String >>>>>> value: int >>>>>> >>>>>> All identifier are unique and I want to generate a unique long id for >>>>>> the data and get a row object back for further processing. >>>>>> >>>>>> I understand using the zipWithUniqueId function on RDD, but that >>>>>> would mean first converting to RDD and then joining back the RDD and >>>>>> dataset >>>>>> >>>>>> What is the best way to do this ? >>>>>> >>>>>> -Tony >>>>>> >>>>>> >>>>> >>>>> >>>>> -- >>>>> Best Regards, >>>>> Ayan Guha >>>>> >>>> >>>> >>> >> >
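[Editor's note] The Hive reflect() call and the Scala snippet above both produce random (version-4) UUIDs; the plain-Python equivalent is a one-liner from the standard library:

```python
import uuid

u = uuid.uuid4()  # random (version 4) UUID, e.g. 587b1665-b578-4124-...
print(u)

# The canonical string form is always 36 characters: 32 hex digits
# plus 4 hyphens, grouped 8-4-4-4-12.
print(len(str(u)))  # 36
```

Unlike monotonically_increasing_id, these carry no ordering and no partition structure — they are effectively collision-free labels, not counters.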
Re: Generating unique id for a column in Row without breaking into RDD and joining back
Not that I've seen, at least not in any worker independent way. To guarantee consecutive values you'd have to create a udf or some such that provided a new row id. This probably isn't an issue on small data sets but would cause a lot of added communication on larger clusters / datasets. Mike > On Aug 5, 2016, at 11:21 AM, janardhan shetty wrote: > > Mike, > > Any suggestions on doing it for consequitive id's? > >> On Aug 5, 2016 9:08 AM, "Tony Lane" wrote: >> Mike. >> >> I have figured how to do this . Thanks for the suggestion. It works great. >> I am trying to figure out the performance impact of this. >> >> thanks again >> >> >>> On Fri, Aug 5, 2016 at 9:25 PM, Tony Lane wrote: >>> @mike - this looks great. How can i do this in java ? what is the >>> performance implication on a large dataset ? >>> >>> @sonal - I can't have a collision in the values. >>> >>>> On Fri, Aug 5, 2016 at 9:15 PM, Mike Metzger >>>> wrote: >>>> You can use the monotonically_increasing_id method to generate guaranteed >>>> unique (but not necessarily consecutive) IDs. Calling something like: >>>> >>>> df.withColumn("id", monotonically_increasing_id()) >>>> >>>> You don't mention which language you're using but you'll need to pull in >>>> the sql.functions library. >>>> >>>> Mike >>>> >>>>> On Aug 5, 2016, at 9:11 AM, Tony Lane wrote: >>>>> >>>>> Ayan - basically i have a dataset with structure, where bid are unique >>>>> string values >>>>> >>>>> bid: String >>>>> val : integer >>>>> >>>>> I need unique int values for these string bid''s to do some processing in >>>>> the dataset >>>>> >>>>> like >>>>> >>>>> id:int (unique integer id for each bid) >>>>> bid:String >>>>> val:integer >>>>> >>>>> >>>>> >>>>> -Tony >>>>> >>>>>> On Fri, Aug 5, 2016 at 6:35 PM, ayan guha wrote: >>>>>> Hi >>>>>> >>>>>> Can you explain a little further? 
>>>>>> >>>>>> best >>>>>> Ayan >>>>>> >>>>>>> On Fri, Aug 5, 2016 at 10:14 PM, Tony Lane >>>>>>> wrote: >>>>>>> I have a row with structure like >>>>>>> >>>>>>> identifier: String >>>>>>> value: int >>>>>>> >>>>>>> All identifier are unique and I want to generate a unique long id for >>>>>>> the data and get a row object back for further processing. >>>>>>> >>>>>>> I understand using the zipWithUniqueId function on RDD, but that would >>>>>>> mean first converting to RDD and then joining back the RDD and dataset >>>>>>> >>>>>>> What is the best way to do this ? >>>>>>> >>>>>>> -Tony >>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Best Regards, >>>>>> Ayan Guha
Re: Generating unique id for a column in Row without breaking into RDD and joining back
Tony - From my testing this is built with performance in mind. It's a 64-bit value split between the partition id (upper 31 bits ~1billion) and the id counter within a partition (lower 33 bits ~8 billion). There shouldn't be any added communication between the executors and the driver for that. I've been toying with an implementation that allows you to specify the split for better control along with a start value. Thanks Mike > On Aug 5, 2016, at 11:07 AM, Tony Lane wrote: > > Mike. > > I have figured how to do this . Thanks for the suggestion. It works great. > I am trying to figure out the performance impact of this. > > thanks again > > >> On Fri, Aug 5, 2016 at 9:25 PM, Tony Lane wrote: >> @mike - this looks great. How can i do this in java ? what is the >> performance implication on a large dataset ? >> >> @sonal - I can't have a collision in the values. >> >>> On Fri, Aug 5, 2016 at 9:15 PM, Mike Metzger >>> wrote: >>> You can use the monotonically_increasing_id method to generate guaranteed >>> unique (but not necessarily consecutive) IDs. Calling something like: >>> >>> df.withColumn("id", monotonically_increasing_id()) >>> >>> You don't mention which language you're using but you'll need to pull in >>> the sql.functions library. >>> >>> Mike >>> >>>> On Aug 5, 2016, at 9:11 AM, Tony Lane wrote: >>>> >>>> Ayan - basically i have a dataset with structure, where bid are unique >>>> string values >>>> >>>> bid: String >>>> val : integer >>>> >>>> I need unique int values for these string bid''s to do some processing in >>>> the dataset >>>> >>>> like >>>> >>>> id:int (unique integer id for each bid) >>>> bid:String >>>> val:integer >>>> >>>> >>>> >>>> -Tony >>>> >>>>> On Fri, Aug 5, 2016 at 6:35 PM, ayan guha wrote: >>>>> Hi >>>>> >>>>> Can you explain a little further? 
>>>>> >>>>> best >>>>> Ayan >>>>> >>>>>> On Fri, Aug 5, 2016 at 10:14 PM, Tony Lane >>>>>> wrote: >>>>>> I have a row with structure like >>>>>> >>>>>> identifier: String >>>>>> value: int >>>>>> >>>>>> All identifier are unique and I want to generate a unique long id for >>>>>> the data and get a row object back for further processing. >>>>>> >>>>>> I understand using the zipWithUniqueId function on RDD, but that would >>>>>> mean first converting to RDD and then joining back the RDD and dataset >>>>>> >>>>>> What is the best way to do this ? >>>>>> >>>>>> -Tony >>>>> >>>>> >>>>> >>>>> -- >>>>> Best Regards, >>>>> Ayan Guha >
Re: Generating unique id for a column in Row without breaking into RDD and joining back
You can use the monotonically_increasing_id method to generate guaranteed unique (but not necessarily consecutive) IDs. Calling something like: df.withColumn("id", monotonically_increasing_id()) You don't mention which language you're using but you'll need to pull in the sql.functions library. Mike > On Aug 5, 2016, at 9:11 AM, Tony Lane wrote: > > Ayan - basically i have a dataset with structure, where bid are unique string > values > > bid: String > val : integer > > I need unique int values for these string bid''s to do some processing in the > dataset > > like > > id:int (unique integer id for each bid) > bid:String > val:integer > > > > -Tony > >> On Fri, Aug 5, 2016 at 6:35 PM, ayan guha wrote: >> Hi >> >> Can you explain a little further? >> >> best >> Ayan >> >>> On Fri, Aug 5, 2016 at 10:14 PM, Tony Lane wrote: >>> I have a row with structure like >>> >>> identifier: String >>> value: int >>> >>> All identifier are unique and I want to generate a unique long id for the >>> data and get a row object back for further processing. >>> >>> I understand using the zipWithUniqueId function on RDD, but that would mean >>> first converting to RDD and then joining back the RDD and dataset >>> >>> What is the best way to do this ? >>> >>> -Tony >> >> >> >> -- >> Best Regards, >> Ayan Guha >
Re: Add column sum as new column in PySpark dataframe
This is a little ugly, but it may do what you're after - df.withColumn('total', expr("+".join([col for col in df.columns]))) I believe this will handle null values ok, but will likely error if there are any string columns present. Mike On Thu, Aug 4, 2016 at 8:41 AM, Javier Rey wrote: > Hi everybody, > > Sorry, I sent the last message incomplete; this is the complete one: > > I'm using PySpark and I have a Spark dataframe with a bunch of numeric > columns. I want to add a column that is the sum of all the other columns. > > Suppose my dataframe had columns "a", "b", and "c". I know I can do this: > > df.withColumn('total_col', df.a + df.b + df.c) > > The problem is that I don't want to type out each column individually and > add them, especially if I have a lot of columns. I want to be able to do > this automatically or by specifying a list of column names that I want to > add. Is there another way to do this? > > I found this solution: > > df.withColumn('total', sum(df[col] for col in df.columns)) > > But I get this error: > > "AttributeError: 'generator' object has no attribute '_get_object_id'" > > Additionally, I want to sum only non-null values. > > Thanks in advance, > > Samir >
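On the null question: a raw `a + b + c` expression in Spark SQL yields null if any operand is null, so "sum only non-null values" amounts to treating each null as zero. A plain-Python sketch of the intended per-row semantics (a stand-in for the DataFrame version, just to pin down the behavior):

```python
# Null-safe row total: skip None values instead of letting one None
# nullify the whole sum, mirroring coalesce(col, 0) in SQL terms.

def row_total(row, columns):
    """Sum the named fields of a dict-like row, skipping None values."""
    return sum(row[c] for c in columns if row[c] is not None)

rows = [{"a": 1, "b": 2, "c": None},
        {"a": 4, "b": None, "c": 6}]
print([row_total(r, ["a", "b", "c"]) for r in rows])  # [3, 10]
```

In PySpark itself, something along the lines of `df.withColumn('total', sum(coalesce(df[c], lit(0)) for c in df.columns))`, with `coalesce` and `lit` from `pyspark.sql.functions`, should give the same behavior, assuming all columns are numeric.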
Python memory included YARN-monitored memory?
Hi everyone, This is more of a YARN/OS question than a Spark one, but it would be good to clarify it in the docs somewhere once I get an answer. We use PySpark for all our Spark applications running on EMR. Like many users, we're accustomed to seeing the occasional ExecutorLostFailure after YARN kills a container for using more memory than it was allocated. We're beginning to tune spark.yarn.executor.memoryOverhead, but before messing around with that I wanted to check: is YARN monitoring the memory usage of both the executor JVM and the spawned pyspark.daemon process, or just the JVM? Inspecting things on one of the YARN nodes would seem to indicate that only the JVM is monitored, since the spawned daemon gets a separate process ID and process group, but I wanted to confirm, as it could make a big difference to PySpark users hoping to tune things. Thanks, Mike
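One relevant number while waiting on an answer: in this era of Spark on YARN, the documented default for spark.yarn.executor.memoryOverhead was max(384 MB, 10% of executor memory), and the container YARN polices is executor memory plus that overhead. A back-of-envelope sketch (the figures below are illustrative only):

```python
# Rough size of the YARN container Spark requests per executor, using the
# documented default overhead of max(384 MB, 10% of executor memory).
# Any pyspark.daemon memory must fit inside this overhead if YARN's
# container limit is not to be exceeded.

def yarn_container_mb(executor_memory_mb, overhead_mb=None):
    """Container size in MB; overhead defaults to max(384, 10%)."""
    if overhead_mb is None:
        overhead_mb = max(384, int(executor_memory_mb * 0.10))
    return executor_memory_mb + overhead_mb

print(yarn_container_mb(8192))        # 8192 + 819 = 9011
print(yarn_container_mb(8192, 2048))  # 8192 + 2048 = 10240 with explicit tuning
```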
RE: SparkR query
Thanks, I’m just using RStudio. Running locally is fine; the issue is that the cluster is on Linux and the workers are looking for a Windows path, which must be passed through by the driver, I guess. I checked spark-env.sh on each node and the appropriate SPARK_HOME is set correctly…. From: Sun Rui [mailto:sunrise_...@163.com] Sent: 17 May 2016 11:32 To: Mike Lewis Cc: user@spark.apache.org Subject: Re: SparkR query Lewis, 1. Could you check the values of the “SPARK_HOME” environment variable on all of your worker nodes? 2. How did you start your SparkR shell? On May 17, 2016, at 18:07, Mike Lewis <mle...@nephilaadvisors.co.uk> wrote: Hi, I have a SparkR driver process that connects to a master running on Linux. I’ve tried to do a simple test, e.g. sc <- sparkR.init(master="spark://my-linux-host.dev.local:7077", sparkEnvir=list(spark.cores.max="4")) x <- SparkR:::parallelize(sc,1:100,2) y <- count(x) But I can see that the worker nodes are failing; they are looking for the Windows (rather than Linux) path to daemon.R 16/05/17 06:52:33 INFO BufferedStreamThread: Fatal error: cannot open file 'E:/Dev/spark/spark-1.4.0-bin-hadoop2.6/R/lib/SparkR/worker/daemon.R': No such file or directory Is this a configuration setting that I’m missing? The worker nodes (Linux) shouldn’t be looking in the Spark home of the driver (Windows)? If so, I’d appreciate someone letting me know what I need to change/set. Thanks, Mike Lewis -- This email has been sent to you on behalf of Nephila Advisors LLC (“Advisors”). Advisors provides consultancy services to Nephila Capital Ltd. (“Capital”), an investment advisor managed and carrying on business in Bermuda. Advisors and its employees do not act as agents for Capital or the funds it advises and do not have the authority to bind Capital or such funds to any transaction or agreement. The information in this e-mail, and any attachment therein, is confidential and for use by the addressee only. 
Any use, disclosure, reproduction, modification or distribution of the contents of this e-mail, or any part thereof, other than by the intended recipient, is strictly prohibited. If you are not the intended recipient, please return the e-mail to the sender and delete it from your computer. This email is for information purposes only, nothing contained herein constitutes an offer to sell or buy securities, as such an offer may only be made from a properly authorized offering document. Although Nephila attempts to sweep e-mail and attachments for viruses, it does not guarantee that either are virus-free and accepts no liability for any damage sustained as a result of viruses.
SparkR query
Hi, I have a SparkR driver process that connects to a master running on Linux, I’ve tried to do a simple test, e.g. sc <- sparkR.init(master="spark://my-linux-host.dev.local:7077", sparkEnvir=list(spark.cores.max="4")) x <- SparkR:::parallelize(sc,1:100,2) y <- count(x) But I can see that the worker nodes are failing, they are looking for the Windows (rather than linux path) to Daemon.R 16/05/17 06:52:33 INFO BufferedStreamThread: Fatal error: cannot open file 'E:/Dev/spark/spark-1.4.0-bin-hadoop2.6/R/lib/SparkR/worker/daemon.R': No such file or directory Is this a configuration setting that I’m missing, the worker nodes (linux) shouldn’t be looking in the spark home of the driver (windows) ? If so, I’d appreciate someone letting me know what I need to change/set. Thanks, Mike Lewis
Re: executor delay in Spark
Hi Raghava, I'm terribly sorry about the end of my last email; that garbled sentence was garbled because it wasn't meant to exist; I wrote it on my phone, realized I wouldn't realistically have time to look into another set of logs deeply enough, and then mistook myself for having deleted it. Again, I'm very sorry for my error here. I did peek at your code, though, and think you could try the following: 0. The actions in your main method are many, and it will be hard to isolate a problem; I would recommend only examining *one* RDD at first, rather than six. 1. There is a lot of repetition for reading RDDs from textfiles sequentially; if you put those lines into two methods depending on RDD type, you will at least have one entry point to work with once you make a simplified test program. 2. In one part you persist, count, immediately unpersist, and then count again an RDD. I'm not acquainted with this idiom, and I don't understand what it is meant to achieve. It strikes me as suspect for triggering unusual garbage collection, which would, I think, only complicate your trace debugging. I've attached a python script that dumps relevant info from the Spark JSON logs into a CSV for easier analysis in your language of choice; hopefully it can aid in finer-grained debugging (the headers of the fields it prints are listed in one of the functions). Mike On 4/25/16, Raghava Mutharaju wrote: > Mike, > > We ran our program with 16, 32 and 64 partitions. The behavior was the same as > before with 8 partitions. It was mixed -- for some RDDs we see an > all-or-nothing skew, but for others we see them getting split across the 2 > worker nodes. In some cases, they start with an even split and in later > iterations it goes to an all-or-nothing split. Please find the logs attached. 
> > our program source code: > https://github.com/raghavam/sparkel/blob/275ecbb901a58592d8a70a8568dd95c839d46ecc/src/main/scala/org/daselab/sparkel/SparkELDAGAnalysis.scala > > We put in persist() statements for different RDDs just to check their skew. > > @Jeff, setting minRegisteredResourcesRatio did not help. Behavior was same > as before. > > Thank you for your time. > > Regards, > Raghava. > > > On Sun, Apr 24, 2016 at 7:17 PM, Mike Hynes <91m...@gmail.com> wrote: > >> Could you change numPartitions to {16, 32, 64} and run your program for >> each to see how many partitions are allocated to each worker? Let's see >> if >> you experience an all-nothing imbalance that way; if so, my guess is that >> something else is odd in your program logic or spark runtime environment, >> but if not and your executors all receive at least *some* partitions, >> then >> I still wouldn't rule out effects of scheduling delay. It's a simple >> test, >> but it could give some insight. >> >> Mike >> >> his could still be a scheduling If only one has *all* partitions, and >> email me the log file? (If it's 10+ MB, just the first few thousand lines >> are fine). >> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" >> wrote: >> >>> Mike, All, >>> >>> It turns out that the second time we encountered the uneven-partition >>> issue is not due to spark-submit. It was resolved with the change in >>> placement of count(). >>> >>> Case-1: >>> >>> val numPartitions = 8 >>> // read uAxioms from HDFS, use hash partitioner on it and persist it >>> // read type1Axioms from HDFS, use hash partitioner on it and persist it >>> currDeltaURule1 = type1Axioms.join(uAxioms) >>> .values >>> .distinct(numPartitions) >>> .partitionBy(hashPartitioner) >>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter) >>> >>> .persist(StorageLevel.MEMORY_AND_DISK) >>>.count() >>> >>> >>> >>> currDeltaURule1 RDD results in all the data on one node (there are 2 >>> worker nodes). 
If we move count(), the uneven partition issue is >>> resolved. >>> >>> Case-2: >>> >>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter) >>> >>> .persist(StorageLevel.MEMORY_AND_DISK) >>> >>> >>> >>> >>> -- this rdd depends on currDeltaURule1 and it gets >>> executed. This resolved the uneven partitioning issue. >>> >>> I don't see why the moving of an action to a later part in the code >>> would >&
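The log-dumping script Mike mentions attaching above isn't preserved in the archive. A minimal sketch of the idea, assuming the standard JSON event-log field names of that Spark era ("SparkListenerTaskEnd", "Task Info", and so on):

```python
import csv
import io
import json

# Dump per-task info from a Spark JSON event log into CSV rows, so task
# start times and executor assignments can be compared in a spreadsheet.
# Field names are the standard Spark event-log ones; adjust if your
# Spark version differs.

def tasks_to_csv(event_log_lines, out):
    """Write one CSV row per completed task found in the event log."""
    writer = csv.writer(out)
    writer.writerow(["task_id", "executor", "launch_time", "finish_time"])
    for line in event_log_lines:
        event = json.loads(line)
        if event.get("Event") != "SparkListenerTaskEnd":
            continue
        info = event["Task Info"]
        writer.writerow([info["Task ID"], info["Executor ID"],
                         info["Launch Time"], info["Finish Time"]])

# Tiny synthetic example standing in for a real event-log file:
sample = [json.dumps({"Event": "SparkListenerTaskEnd",
                      "Task Info": {"Task ID": 0, "Executor ID": "1",
                                    "Launch Time": 100, "Finish Time": 150}})]
buf = io.StringIO()
tasks_to_csv(sample, buf)
print(buf.getvalue())
```

With a real log, pass `open("/path/to/eventlog")` as the first argument and a CSV file opened for writing as the second.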
Re: executor delay in Spark
Could you change numPartitions to {16, 32, 64} and run your program for each to see how many partitions are allocated to each worker? Let's see if you experience an all-nothing imbalance that way; if so, my guess is that something else is odd in your program logic or spark runtime environment, but if not and your executors all receive at least *some* partitions, then I still wouldn't rule out effects of scheduling delay. It's a simple test, but it could give some insight. Mike his could still be a scheduling If only one has *all* partitions, and email me the log file? (If it's 10+ MB, just the first few thousand lines are fine). On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" wrote: > Mike, All, > > It turns out that the second time we encountered the uneven-partition > issue is not due to spark-submit. It was resolved with the change in > placement of count(). > > Case-1: > > val numPartitions = 8 > // read uAxioms from HDFS, use hash partitioner on it and persist it > // read type1Axioms from HDFS, use hash partitioner on it and persist it > currDeltaURule1 = type1Axioms.join(uAxioms) > .values > .distinct(numPartitions) > .partitionBy(hashPartitioner) > currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter) > > .persist(StorageLevel.MEMORY_AND_DISK) >.count() > > > > currDeltaURule1 RDD results in all the data on one node (there are 2 > worker nodes). If we move count(), the uneven partition issue is resolved. > > Case-2: > > currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter) > > .persist(StorageLevel.MEMORY_AND_DISK) > > > > > -- this rdd depends on currDeltaURule1 and it gets executed. > This resolved the uneven partitioning issue. > > I don't see why the moving of an action to a later part in the code would > affect the partitioning. Are there other factors at play here that affect > the partitioning? > > (Inconsistent) uneven partitioning leads to one machine getting over > burdened (memory and number of tasks). 
We see a clear improvement in > runtime when the partitioning is even (happens when count is moved). > > Any pointers in figuring out this issue is much appreciated. > > Regards, > Raghava. > > > > > On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote: > >> Glad to hear that the problem was solvable! I have not seen delays of >> this type for later stages in jobs run by spark-submit, but I do not think >> it impossible if your stage has no lineage dependence on other RDDs. >> >> I'm CC'ing the dev list to report of other users observing load imbalance >> caused by unusual initial task scheduling. I don't know of ways to avoid >> this other than creating a dummy task to synchronize the executors, but >> hopefully someone from there can suggest other possibilities. >> >> Mike >> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" >> wrote: >> >>> Mike, >>> >>> It turns out the executor delay, as you mentioned, is the cause. After >>> we introduced a dummy stage, partitioning was working fine. Does this delay >>> happen during later stages as well? We noticed the same behavior >>> (partitioning happens on spark-shell but not through spark-submit) at a >>> later stage also. >>> >>> Apart from introducing a dummy stage or running it from spark-shell, is >>> there any other option to fix this? >>> >>> Regards, >>> Raghava. >>> >>> >>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote: >>> >>>> When submitting a job with spark-submit, I've observed delays (up to >>>> 1--2 seconds) for the executors to respond to the driver in order to >>>> receive tasks in the first stage. The delay does not persist once the >>>> executors have been synchronized. 
>>>> >>>> When the tasks are very short, as may be your case (relatively small >>>> data and a simple map task like you have described), the 8 tasks in >>>> your stage may be allocated to only 1 executor in 2 waves of 4, since >>>> the second executor won't have responded to the master before the >>>> first 4 tasks on the first executor have completed. >>>> >>>> To see if this is the cause in your particular case, you could try the >>>> following to confirm: >>>&g
Re: executor delay in Spark
Glad to hear that the problem was solvable! I have not seen delays of this type for later stages in jobs run by spark-submit, but I do not think it impossible if your stage has no lineage dependence on other RDDs. I'm CC'ing the dev list to report of other users observing load imbalance caused by unusual initial task scheduling. I don't know of ways to avoid this other than creating a dummy task to synchronize the executors, but hopefully someone from there can suggest other possibilities. Mike On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" wrote: > Mike, > > It turns out the executor delay, as you mentioned, is the cause. After we > introduced a dummy stage, partitioning was working fine. Does this delay > happen during later stages as well? We noticed the same behavior > (partitioning happens on spark-shell but not through spark-submit) at a > later stage also. > > Apart from introducing a dummy stage or running it from spark-shell, is > there any other option to fix this? > > Regards, > Raghava. > > > On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote: > >> When submitting a job with spark-submit, I've observed delays (up to >> 1--2 seconds) for the executors to respond to the driver in order to >> receive tasks in the first stage. The delay does not persist once the >> executors have been synchronized. >> >> When the tasks are very short, as may be your case (relatively small >> data and a simple map task like you have described), the 8 tasks in >> your stage may be allocated to only 1 executor in 2 waves of 4, since >> the second executor won't have responded to the master before the >> first 4 tasks on the first executor have completed. >> >> To see if this is the cause in your particular case, you could try the >> following to confirm: >> 1. Examine the starting times of the tasks alongside their >> executor >> 2. 
Make a "dummy" stage execute before your real stages to >> synchronize the executors by creating and materializing any random RDD >> 3. Make the tasks longer, i.e. with some silly computational work. >> >> Mike >> >> >> On 4/17/16, Raghava Mutharaju wrote: >> > Yes its the same data. >> > >> > 1) The number of partitions are the same (8, which is an argument to the >> > HashPartitioner). In the first case, these partitions are spread across >> > both the worker nodes. In the second case, all the partitions are on the >> > same node. >> > 2) What resources would be of interest here? Scala shell takes the >> default >> > parameters since we use "bin/spark-shell --master " to run >> the >> > scala-shell. For the scala program, we do set some configuration options >> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo >> > serializer. >> > >> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB >> > RAM.1 executor runs on each worker node. Following configuration options >> > are set for the scala program -- perhaps we should move it to the spark >> > config file. >> > >> > Driver memory and executor memory are set to 12GB >> > parallelism is set to 8 >> > Kryo serializer is used >> > Number of retainedJobs and retainedStages has been increased to check >> them >> > in the UI. >> > >> > What information regarding Spark Context would be of interest here? >> > >> > Regards, >> > Raghava. >> > >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar >> wrote: >> > >> >> If the data file is same then it should have similar distribution of >> >> keys. >> >> Few queries- >> >> >> >> 1. Did you compare the number of partitions in both the cases? >> >> 2. Did you compare the resource allocation for Spark Shell vs Scala >> >> Program being submitted? >> >> >> >> Also, can you please share the details of Spark Context, Environment >> and >> >> Executors when you run via Scala program? 
>> >> >> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju < >> >> m.vijayaragh...@gmail.com> wrote: >> >> >> >>> Hello All, >> >>> >> >>> We are using HashPartitioner in the following way on a 3 node cluster >> (1 >> >>> master and 2 worker nodes). >> >>> >> >>> val u = >> >>> sc.textFile("hdfs://x.x.x.x:8020/us
Re: strange HashPartitioner behavior in Spark
A HashPartitioner will indeed partition based on the key, but you cannot know on *which* node that key will appear. Again, the RDD partitions will not necessarily be distributed evenly across your nodes because of the greedy scheduling of the first wave of tasks, particularly if those tasks have durations less than the initial executor delay. I recommend you look at your logs to verify if this is happening to you. Mike On 4/18/16, Anuj Kumar wrote: > Good point Mike +1 > > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote: > >> When submitting a job with spark-submit, I've observed delays (up to >> 1--2 seconds) for the executors to respond to the driver in order to >> receive tasks in the first stage. The delay does not persist once the >> executors have been synchronized. >> >> When the tasks are very short, as may be your case (relatively small >> data and a simple map task like you have described), the 8 tasks in >> your stage may be allocated to only 1 executor in 2 waves of 4, since >> the second executor won't have responded to the master before the >> first 4 tasks on the first executor have completed. >> >> To see if this is the cause in your particular case, you could try the >> following to confirm: >> 1. Examine the starting times of the tasks alongside their >> executor >> 2. Make a "dummy" stage execute before your real stages to >> synchronize the executors by creating and materializing any random RDD >> 3. Make the tasks longer, i.e. with some silly computational >> work. >> >> Mike >> >> >> On 4/17/16, Raghava Mutharaju wrote: >> > Yes its the same data. >> > >> > 1) The number of partitions are the same (8, which is an argument to >> > the >> > HashPartitioner). In the first case, these partitions are spread across >> > both the worker nodes. In the second case, all the partitions are on >> > the >> > same node. >> > 2) What resources would be of interest here? 
Scala shell takes the >> default >> > parameters since we use "bin/spark-shell --master " to run >> the >> > scala-shell. For the scala program, we do set some configuration >> > options >> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo >> > serializer. >> > >> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB >> > RAM.1 executor runs on each worker node. Following configuration >> > options >> > are set for the scala program -- perhaps we should move it to the spark >> > config file. >> > >> > Driver memory and executor memory are set to 12GB >> > parallelism is set to 8 >> > Kryo serializer is used >> > Number of retainedJobs and retainedStages has been increased to check >> them >> > in the UI. >> > >> > What information regarding Spark Context would be of interest here? >> > >> > Regards, >> > Raghava. >> > >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar >> > wrote: >> > >> >> If the data file is same then it should have similar distribution of >> >> keys. >> >> Few queries- >> >> >> >> 1. Did you compare the number of partitions in both the cases? >> >> 2. Did you compare the resource allocation for Spark Shell vs Scala >> >> Program being submitted? >> >> >> >> Also, can you please share the details of Spark Context, Environment >> >> and >> >> Executors when you run via Scala program? >> >> >> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju < >> >> m.vijayaragh...@gmail.com> wrote: >> >> >> >>> Hello All, >> >>> >> >>> We are using HashPartitioner in the following way on a 3 node cluster >> (1 >> >>> master and 2 worker nodes). 
>> >>> >> >>> val u = >> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int, >> >>> Int)](line => { line.split("\\|") match { case Array(x, y) => >> >>> (y.toInt, >> >>> x.toInt) } }).partitionBy(new >> HashPartitioner(8)).setName("u").persist() >> >>> >> >>> u.count() >> >>> >> >>> If we run this from the spark shell, the data (52 MB) is split across >> >>> the >> >>> two worker nodes. But if we put this in a scala program and run it, >> then >> >>> all the data goes to only one node. We have run it multiple times, >> >>> but >> >>> this >> >>> behavior does not change. This seems strange. >> >>> >> >>> Is there some problem with the way we use HashPartitioner? >> >>> >> >>> Thanks in advance. >> >>> >> >>> Regards, >> >>> Raghava. >> >>> >> >> >> >> >> > >> > >> > -- >> > Regards, >> > Raghava >> > http://raghavam.github.io >> > >> >> >> -- >> Thanks, >> Mike >> > -- Thanks, Mike - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
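To make Mike's partition-versus-node distinction concrete: the partition for a key is a pure function of its hash (Spark's HashPartitioner uses nonNegativeMod(key.hashCode, numPartitions)), whereas which node holds each partition is a scheduling decision. A small Python sketch, using Python's hash() as a stand-in for Scala's hashCode:

```python
# HashPartitioner's placement rule, sketched in Python. Which *partition*
# a key lands in is deterministic; which *node* holds that partition is not.

def partition_for(key, num_partitions):
    # Spark uses nonNegativeMod(key.hashCode, numPartitions); Python's %
    # already returns a non-negative result for a positive modulus.
    return hash(key) % num_partitions

keys = [5, 6, 5, 42]
parts = [partition_for(k, 8) for k in keys]
print(parts)  # [5, 6, 5, 2] under CPython's integer hash; equal keys, equal partition
print(partition_for(-3, 8))  # 5 -- the result is never negative
```

Equal keys always hash to the same partition, so a skewed key distribution can explain a skewed partition, but it cannot explain all partitions landing on one node, which is the scheduling effect discussed in this thread.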
Re: strange HashPartitioner behavior in Spark
When submitting a job with spark-submit, I've observed delays (up to 1--2 seconds) for the executors to respond to the driver in order to receive tasks in the first stage. The delay does not persist once the executors have been synchronized. When the tasks are very short, as may be your case (relatively small data and a simple map task like you have described), the 8 tasks in your stage may be allocated to only 1 executor in 2 waves of 4, since the second executor won't have responded to the master before the first 4 tasks on the first executor have completed. To see if this is the cause in your particular case, you could try the following to confirm: 1. Examine the starting times of the tasks alongside their executor 2. Make a "dummy" stage execute before your real stages to synchronize the executors by creating and materializing any random RDD 3. Make the tasks longer, i.e. with some silly computational work. Mike On 4/17/16, Raghava Mutharaju wrote: > Yes its the same data. > > 1) The number of partitions are the same (8, which is an argument to the > HashPartitioner). In the first case, these partitions are spread across > both the worker nodes. In the second case, all the partitions are on the > same node. > 2) What resources would be of interest here? Scala shell takes the default > parameters since we use "bin/spark-shell --master " to run the > scala-shell. For the scala program, we do set some configuration options > such as driver memory (12GB), parallelism is set to 8 and we use Kryo > serializer. > > We are running this on Azure D3-v2 machines which have 4 cores and 14GB > RAM.1 executor runs on each worker node. Following configuration options > are set for the scala program -- perhaps we should move it to the spark > config file. > > Driver memory and executor memory are set to 12GB > parallelism is set to 8 > Kryo serializer is used > Number of retainedJobs and retainedStages has been increased to check them > in the UI. 
> > What information regarding Spark Context would be of interest here? > > Regards, > Raghava. > > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar wrote: > >> If the data file is same then it should have similar distribution of >> keys. >> Few queries- >> >> 1. Did you compare the number of partitions in both the cases? >> 2. Did you compare the resource allocation for Spark Shell vs Scala >> Program being submitted? >> >> Also, can you please share the details of Spark Context, Environment and >> Executors when you run via Scala program? >> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju < >> m.vijayaragh...@gmail.com> wrote: >> >>> Hello All, >>> >>> We are using HashPartitioner in the following way on a 3 node cluster (1 >>> master and 2 worker nodes). >>> >>> val u = >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int, >>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt, >>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist() >>> >>> u.count() >>> >>> If we run this from the spark shell, the data (52 MB) is split across >>> the >>> two worker nodes. But if we put this in a scala program and run it, then >>> all the data goes to only one node. We have run it multiple times, but >>> this >>> behavior does not change. This seems strange. >>> >>> Is there some problem with the way we use HashPartitioner? >>> >>> Thanks in advance. >>> >>> Regards, >>> Raghava. >>> >> >> > > > -- > Regards, > Raghava > http://raghavam.github.io > -- Thanks, Mike - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
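The two-wave effect described in this thread is easy to reproduce with a toy greedy scheduler. The sketch below is a simplification of the real scheduler, and the numbers are illustrative rather than from any actual Spark run:

```python
# Toy model of greedy task assignment: 8 short tasks, two executors with
# 4 cores each, but executor "B" registers with the driver after a delay.
# If each task finishes before B shows up, A takes all 8 tasks in 2 waves.

def assign_tasks(num_tasks, task_ms, cores, register_ms):
    """Give each freed core the next task; return task counts per executor."""
    counts = {e: 0 for e in register_ms}
    # (time this core is next free, executor) for every core
    free = [(t, e) for e, t in register_ms.items() for _ in range(cores)]
    for _ in range(num_tasks):
        free.sort()                 # earliest-available core wins the task
        t, e = free.pop(0)
        counts[e] += 1
        free.append((t + task_ms, e))
    return counts

print(assign_tasks(8, task_ms=50, cores=4, register_ms={"A": 0, "B": 2000}))
# -> {'A': 8, 'B': 0}: two waves of 4 on A before B ever registers
print(assign_tasks(8, task_ms=5000, cores=4, register_ms={"A": 0, "B": 2000}))
# -> {'A': 4, 'B': 4}: longer tasks let B join for the second wave
```

This matches suggestion 3 above: lengthening the tasks (or inserting a dummy stage so both executors are registered first) evens out the placement.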
Re: RDD Partitions not distributed evenly to executors
mbalance arises due to the greedy nature of the scheduler when data locality is not a factor? 2a. Why are the task times in plot 1 decreasing so dramatically, but not in plot 2? 2b. Could the decrease in time be due to just-in-time compilation? 2c. If so, Why would the JIT occur only for the first case with many partitions when the same amount of computational work is to be done in both cases? 3. If an RDD is to be created in such a manner (i.e. initialized for, say, an iterative algorithm, rather than by reading data from disk or hdfs), what is the best practice to promote good load balancing? My first idea would be to create the full RDD with 2x as many partitions but then coalesce it down to half the number of partitions with the shuffle flag set to true. Would that be reasonable? Thank you very much for your time, and I very much hope that someone from the dev community who is familiar with the scheduler may be able to clarify the above observations and questions. Thanks, Mike P.S. Koert Kuipers: neither spark-defaults.sh setting impacted the observed behaviour, but thank you kindly for your suggestions. On 4/5/16, Khaled Ammar wrote: > I have a similar experience. > > Using 32 machines, I can see than number of tasks (partitions) assigned to > executors (machines) is not even. Moreover, the distribution change every > stage (iteration). > > I wonder why Spark needs to move partitions around any way, should not the > scheduler reduce network (and other IO) overhead by reducing such > relocation. > > Thanks, > -Khaled > > > > > On Mon, Apr 4, 2016 at 10:57 PM, Koert Kuipers wrote: > >> can you try: >> spark.shuffle.reduceLocality.enabled=false >> >> On Mon, Apr 4, 2016 at 8:17 PM, Mike Hynes <91m...@gmail.com> wrote: >> >>> Dear all, >>> >>> Thank you for your responses. >>> >>> Michael Slavitch: >>> > Just to be sure: Has spark-env.sh and spark-defaults.conf been >>> correctly propagated to all nodes? Are they identical? 
>>> Yes; these files are stored on a shared memory directory accessible to >>> all nodes. >>> >>> Koert Kuipers: >>> > we ran into similar issues and it seems related to the new memory >>> > management. can you try: >>> > spark.memory.useLegacyMode = true >>> I reran the exact same code with a restarted cluster using this >>> modification, and did not observe any difference. The partitioning is >>> still imbalanced. >>> >>> Ted Yu: >>> > If the changes can be ported over to 1.6.1, do you mind reproducing >>> > the >>> issue there ? >>> Since the spark.memory.useLegacyMode setting did not impact my code >>> execution, I will have to change the Spark dependency back to earlier >>> versions to see if the issue persists and get back to you. >>> >>> Meanwhile, if anyone else has any other ideas or experience, please let >>> me know. >>> >>> Mike >>> >>> On 4/4/16, Koert Kuipers wrote: >>> > we ran into similar issues and it seems related to the new memory >>> > management. can you try: >>> > spark.memory.useLegacyMode = true >>> > >>> > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91m...@gmail.com> wrote: >>> > >>> >> [ CC'ing dev list since nearly identical questions have occurred in >>> >> user list recently w/o resolution; >>> >> c.f.: >>> >> >>> >> >>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html >>> >> >>> >> >>> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html >>> >> ] >>> >> >>> >> Hello, >>> >> >>> >> In short, I'm reporting a problem concerning load imbalance of RDD >>> >> partitions across a standalone cluster. Though there are 16 cores >>> >> available per node, certain nodes will have >16 partitions, and some >>> >> will correspondingly have <16 (and even 0). >>> >> >>> >> In more detail: I am running some scalability/performance tests for >>> >> vector-type operations. 
The RDDs I'm considering are simple block >>> >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs >>> >> are generated with a fixed number of elements given by some multiple >>> >> of the available cores, and subsequently hash-partitioned by their >>>
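An aside on question 1 in the thread above: the skew pattern reported later in the thread (64/0 in some stages, 48/16 in others) is consistent with a purely greedy, locality-unaware assignment in which tasks simply go to whichever cores free up first. The following is a toy simulation of that idea only — NOT Spark's actual scheduler — with invented executor names and timings:

```python
import heapq
from collections import Counter

def toy_schedule(num_tasks, cores_per_exec, task_time, exec_start):
    """Toy model (NOT Spark's real scheduler): each task goes to
    whichever core frees up first; ties break by executor name.
    exec_start maps executor -> the time its cores first become
    available (e.g. a slightly late executor registration)."""
    heap = [(t0, ex, c) for ex, t0 in exec_start.items()
            for c in range(cores_per_exec)]
    heapq.heapify(heap)
    assigned = Counter()
    for _ in range(num_tasks):
        free_at, ex, core = heapq.heappop(heap)
        assigned[ex] += 1
        # The core becomes free again one task-length later.
        heapq.heappush(heap, (free_at + task_time, ex, core))
    return assigned

# 64 partitions, 16 cores per node, unit task time; exec-1's cores
# become available one task-length after exec-0's.
print(toy_schedule(64, 16, 1.0, {"exec-0": 0.0, "exec-1": 1.0}))
```

In this toy model, a one-task-length head start already gives the early executor 48 of the 64 tasks, and a larger head start gives it all 64 — mirroring the (16,48) and (0,64) splits reported for stages 9/14 and 7/12.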
Re: RDD Partitions not distributed evenly to executors
Dear all, Thank you for your responses. Michael Slavitch: > Just to be sure: Has spark-env.sh and spark-defaults.conf been correctly > propagated to all nodes? Are they identical? Yes; these files are stored on a shared memory directory accessible to all nodes. Koert Kuipers: > we ran into similar issues and it seems related to the new memory > management. can you try: > spark.memory.useLegacyMode = true I reran the exact same code with a restarted cluster using this modification, and did not observe any difference. The partitioning is still imbalanced. Ted Yu: > If the changes can be ported over to 1.6.1, do you mind reproducing the issue > there ? Since the spark.memory.useLegacyMode setting did not impact my code execution, I will have to change the Spark dependency back to earlier versions to see if the issue persists and get back to you. Meanwhile, if anyone else has any other ideas or experience, please let me know. Mike On 4/4/16, Koert Kuipers wrote: > we ran into similar issues and it seems related to the new memory > management. can you try: > spark.memory.useLegacyMode = true > > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91m...@gmail.com> wrote: > >> [ CC'ing dev list since nearly identical questions have occurred in >> user list recently w/o resolution; >> c.f.: >> >> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html >> >> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html >> ] >> >> Hello, >> >> In short, I'm reporting a problem concerning load imbalance of RDD >> partitions across a standalone cluster. Though there are 16 cores >> available per node, certain nodes will have >16 partitions, and some >> will correspondingly have <16 (and even 0). >> >> In more detail: I am running some scalability/performance tests for >> vector-type operations. 
The RDDs I'm considering are simple block >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs >> are generated with a fixed number of elements given by some multiple >> of the available cores, and subsequently hash-partitioned by their >> integer block index. >> >> I have verified that the hash partitioning key distribution, as well >> as the keys themselves, are both correct; the problem is truly that >> the partitions are *not* evenly distributed across the nodes. >> >> For instance, here is a representative output for some stages and >> tasks in an iterative program. This is a very simple test with 2 >> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two >> examples stages from the stderr log are stages 7 and 9: >> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272 >> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639 >> >> When counting the location of the partitions on the compute nodes from >> the stderr logs, however, you can clearly see the imbalance. Examples >> lines are: >> 13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196, >> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)& >> 13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197, >> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)& >> 13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198, >> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)& >> >> Grep'ing the full set of above lines for each hostname, himrod-?, >> shows the problem occurs in each stage. Below is the output, where the >> number of partitions stored on each node is given alongside its >> hostname as in (himrod-?,num_partitions): >> Stage 7: (himrod-1,0) (himrod-2,64) >> Stage 9: (himrod-1,16) (himrod-2,48) >> Stage 12: (himrod-1,0) (himrod-2,64) >> Stage 14: (himrod-1,16) (himrod-2,48) >> The imbalance is also visible when the executor ID is used to count >> the partitions operated on by executors. 
>> >> I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch >> (but the modifications do not touch the scheduler, and are irrelevant >> for these particular tests). Has something changed radically in 1.6+ >> that would make a previously (<=1.5) correct configuration go haywire? >> Have new configuration settings been added of which I'm unaware that >> could lead to this problem? >> >> Please let me know if others in the community have observed this, and >> thank you for your time, >> Mike >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> > -- Thanks, Mike - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RDD Partitions not distributed evenly to executors
[ CC'ing dev list since nearly identical questions have occurred in user list recently w/o resolution; c.f.: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html ]

Hello,

In short, I'm reporting a problem concerning load imbalance of RDD partitions across a standalone cluster. Though there are 16 cores available per node, certain nodes will have >16 partitions, and some will correspondingly have <16 (and even 0).

In more detail: I am running some scalability/performance tests for vector-type operations. The RDDs I'm considering are simple block vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs are generated with a fixed number of elements given by some multiple of the available cores, and subsequently hash-partitioned by their integer block index.

I have verified that the hash partitioning key distribution, as well as the keys themselves, are both correct; the problem is truly that the partitions are *not* evenly distributed across the nodes.

For instance, here is a representative output for some stages and tasks in an iterative program. This is a very simple test with 2 nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two example stages from the stderr log are stages 7 and 9:

7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639

When counting the location of the partitions on the compute nodes from the stderr logs, however, you can clearly see the imbalance.
Example lines are:

13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196, himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197, himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198, himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&

Grep'ing the full set of above lines for each hostname, himrod-?, shows the problem occurs in each stage. Below is the output, where the number of partitions stored on each node is given alongside its hostname as in (himrod-?,num_partitions):

Stage 7: (himrod-1,0) (himrod-2,64)
Stage 9: (himrod-1,16) (himrod-2,48)
Stage 12: (himrod-1,0) (himrod-2,64)
Stage 14: (himrod-1,16) (himrod-2,48)

The imbalance is also visible when the executor ID is used to count the partitions operated on by executors.

I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch (but the modifications do not touch the scheduler, and are irrelevant for these particular tests). Has something changed radically in 1.6+ that would make a previously (<=1.5) correct configuration go haywire? Have new configuration settings been added of which I'm unaware that could lead to this problem?

Please let me know if others in the community have observed this, and thank you for your time,
Mike

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
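The per-host tallies above come from grep'ing TaskSetManager lines by hand; the same count is easy to script. A minimal sketch, assuming the '&'-delimited stderr layout shown in the post (the three sample lines here are abbreviated from it, with one host changed so the count is mixed):

```python
import re
from collections import Counter

# Sample TaskSetManager lines in the format quoted in the post.
log_lines = [
    "13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196, himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&",
    "13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197, himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&",
    "13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198, himrod-1, partition 2,PROCESS_LOCAL, 3987 bytes)&",
]

# Capture (stage, host) from each "Starting task" line.
pattern = re.compile(r"in stage (\d+)\.\d+ \(TID \d+, ([\w.-]+), partition")

counts = Counter()
for line in log_lines:
    m = pattern.search(line)
    if m:
        stage, host = m.groups()
        counts[(int(stage), host)] += 1

# Print one line per stage in the "(host,num_partitions)" style above.
hosts = sorted({h for _, h in counts})
for stage in sorted({s for s, _ in counts}):
    parts = " ".join(f"({h},{counts[(stage, h)]})" for h in hosts)
    print(f"Stage {stage}: {parts}")
```

Pointing `log_lines` at the real stderr file (one `counts` update per line) reproduces the per-stage tables without manual grep'ing.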
Re: Spark Metrics Framework?
Thanks Silvio, JIRA submitted https://issues.apache.org/jira/browse/SPARK-14332. On Fri, 25 Mar 2016 at 12:46 Silvio Fiorito wrote: > Hi Mike, > > Sorry got swamped with work and didn’t get a chance to reply. > > I misunderstood what you were trying to do. I thought you were just > looking to create custom metrics vs looking for the existing Hadoop Output > Format counters. > > I’m not familiar enough with the Hadoop APIs but I think it would require > a change to the SparkHadoopWriter > <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala> > class since it generates the JobContext which is required to read the > counters. Then it could publish the counters to the Spark metrics system. > > I would suggest going ahead and submitting a JIRA request if there isn’t > one already. > > Thanks, > Silvio > > From: Mike Sukmanowsky > Date: Friday, March 25, 2016 at 10:48 AM > > To: Silvio Fiorito , "user@spark.apache.org" > > Subject: Re: Spark Metrics Framework? > > Pinging again - any thoughts? > > On Wed, 23 Mar 2016 at 09:17 Mike Sukmanowsky > wrote: > >> Thanks Ted and Silvio. I think I'll need a bit more hand holding here, >> sorry. The way we use ES Hadoop is in pyspark via >> org.elasticsearch.hadoop.mr.EsOutputFormat in a saveAsNewAPIHadoopFile >> call. Given the Hadoop interop, I wouldn't assume that the EsOutputFormat >> class >> <https://github.com/elastic/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java> >> could be modified to define a new Source and register it via >> MetricsSystem.createMetricsSystem. This feels like a good feature request >> for Spark actually: "Support Hadoop Counters in Input/OutputFormats as >> Spark metrics" but I wanted some feedback first to see if that makes sense. 
>> >> That said, some of the custom RDD classes >> <https://github.com/elastic/elasticsearch-hadoop/tree/master/spark/core/main/scala/org/elasticsearch/spark/rdd> >> could >> probably be modified to register a new Source when they perform >> reading/writing from/to Elasticsearch. >> >> On Tue, 22 Mar 2016 at 15:17 Silvio Fiorito < >> silvio.fior...@granturing.com> wrote: >> >>> Hi Mike, >>> >>> It’s been a while since I worked on a custom Source but I think all you >>> need to do is make your Source in the org.apache.spark package. >>> >>> Thanks, >>> Silvio >>> >>> From: Mike Sukmanowsky >>> Date: Tuesday, March 22, 2016 at 3:13 PM >>> To: Silvio Fiorito , " >>> user@spark.apache.org" >>> Subject: Re: Spark Metrics Framework? >>> >>> The Source class is private >>> <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/source/Source.scala#L22-L25> >>> to the spark package and any new Sources added to the metrics registry must >>> be of type Source >>> <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L144-L152>. >>> So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1 >>> code, but the same is true in 1.6.1. >>> >>> On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito < >>> silvio.fior...@granturing.com> wrote: >>> >>>> You could use the metric sources and sinks described here: >>>> http://spark.apache.org/docs/latest/monitoring.html#metrics >>>> >>>> If you want to push the metrics to another system you can define a >>>> custom sink. You can also extend the metrics by defining a custom source. >>>> >>>> From: Mike Sukmanowsky >>>> Date: Monday, March 21, 2016 at 11:54 AM >>>> To: "user@spark.apache.org" >>>> Subject: Spark Metrics Framework? >>>> >>>> We make extensive use of the elasticsearch-hadoop library for >>>> Hadoop/Spark. 
In trying to troubleshoot our Spark applications, it'd be >>>> very handy to have access to some of the many metrics >>>> <https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html> >>>> that the library makes available when running in map reduce mode. The >>>> library's >>>> author noted >>>> <https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913> >>>> that Spark doesn't offer any kind of a similar metrics API where by these >>>> metrics could be reported or aggregated on. >>>> >>>> Are there any plans to bring a metrics framework similar to Hadoop's >>>> Counter system to Spark or is there an alternative means for us to grab >>>> metrics exposed when using Hadoop APIs to load/save RDDs? >>>> >>>> Thanks, >>>> Mike >>>> >>>
Re: Spark Metrics Framework?
Pinging again - any thoughts? On Wed, 23 Mar 2016 at 09:17 Mike Sukmanowsky wrote: > Thanks Ted and Silvio. I think I'll need a bit more hand holding here, > sorry. The way we use ES Hadoop is in pyspark via > org.elasticsearch.hadoop.mr.EsOutputFormat in a saveAsNewAPIHadoopFile > call. Given the Hadoop interop, I wouldn't assume that the EsOutputFormat > class > <https://github.com/elastic/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java> > could be modified to define a new Source and register it via > MetricsSystem.createMetricsSystem. This feels like a good feature request > for Spark actually: "Support Hadoop Counters in Input/OutputFormats as > Spark metrics" but I wanted some feedback first to see if that makes sense. > > That said, some of the custom RDD classes > <https://github.com/elastic/elasticsearch-hadoop/tree/master/spark/core/main/scala/org/elasticsearch/spark/rdd> > could > probably be modified to register a new Source when they perform > reading/writing from/to Elasticsearch. > > On Tue, 22 Mar 2016 at 15:17 Silvio Fiorito > wrote: > >> Hi Mike, >> >> It’s been a while since I worked on a custom Source but I think all you >> need to do is make your Source in the org.apache.spark package. >> >> Thanks, >> Silvio >> >> From: Mike Sukmanowsky >> Date: Tuesday, March 22, 2016 at 3:13 PM >> To: Silvio Fiorito , " >> user@spark.apache.org" >> Subject: Re: Spark Metrics Framework? >> >> The Source class is private >> <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/source/Source.scala#L22-L25> >> to the spark package and any new Sources added to the metrics registry must >> be of type Source >> <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L144-L152>. >> So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1 >> code, but the same is true in 1.6.1. 
>> >> On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito < >> silvio.fior...@granturing.com> wrote: >> >>> You could use the metric sources and sinks described here: >>> http://spark.apache.org/docs/latest/monitoring.html#metrics >>> >>> If you want to push the metrics to another system you can define a >>> custom sink. You can also extend the metrics by defining a custom source. >>> >>> From: Mike Sukmanowsky >>> Date: Monday, March 21, 2016 at 11:54 AM >>> To: "user@spark.apache.org" >>> Subject: Spark Metrics Framework? >>> >>> We make extensive use of the elasticsearch-hadoop library for >>> Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be >>> very handy to have access to some of the many metrics >>> <https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html> >>> that the library makes available when running in map reduce mode. The >>> library's >>> author noted >>> <https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913> >>> that Spark doesn't offer any kind of a similar metrics API where by these >>> metrics could be reported or aggregated on. >>> >>> Are there any plans to bring a metrics framework similar to Hadoop's >>> Counter system to Spark or is there an alternative means for us to grab >>> metrics exposed when using Hadoop APIs to load/save RDDs? >>> >>> Thanks, >>> Mike >>> >>
Re: Spark Metrics Framework?
Thanks Ted and Silvio. I think I'll need a bit more hand holding here, sorry. The way we use ES Hadoop is in pyspark via org.elasticsearch.hadoop.mr.EsOutputFormat in a saveAsNewAPIHadoopFile call. Given the Hadoop interop, I wouldn't assume that the EsOutputFormat class <https://github.com/elastic/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java> could be modified to define a new Source and register it via MetricsSystem.createMetricsSystem. This feels like a good feature request for Spark actually: "Support Hadoop Counters in Input/OutputFormats as Spark metrics" but I wanted some feedback first to see if that makes sense.

That said, some of the custom RDD classes <https://github.com/elastic/elasticsearch-hadoop/tree/master/spark/core/main/scala/org/elasticsearch/spark/rdd> could probably be modified to register a new Source when they perform reading/writing from/to Elasticsearch.

On Tue, 22 Mar 2016 at 15:17 Silvio Fiorito wrote: > Hi Mike, > > It’s been a while since I worked on a custom Source but I think all you > need to do is make your Source in the org.apache.spark package. > > Thanks, > Silvio > > From: Mike Sukmanowsky > Date: Tuesday, March 22, 2016 at 3:13 PM > To: Silvio Fiorito , "user@spark.apache.org" > > Subject: Re: Spark Metrics Framework? > > The Source class is private > <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/source/Source.scala#L22-L25> > to the spark package and any new Sources added to the metrics registry must > be of type Source > <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L144-L152>. > So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1 > code, but the same is true in 1.6.1. 
> > On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito > wrote: > >> You could use the metric sources and sinks described here: >> http://spark.apache.org/docs/latest/monitoring.html#metrics >> >> If you want to push the metrics to another system you can define a custom >> sink. You can also extend the metrics by defining a custom source. >> >> From: Mike Sukmanowsky >> Date: Monday, March 21, 2016 at 11:54 AM >> To: "user@spark.apache.org" >> Subject: Spark Metrics Framework? >> >> We make extensive use of the elasticsearch-hadoop library for >> Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be >> very handy to have access to some of the many metrics >> <https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html> >> that the library makes available when running in map reduce mode. The >> library's >> author noted >> <https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913> >> that Spark doesn't offer any kind of a similar metrics API where by these >> metrics could be reported or aggregated on. >> >> Are there any plans to bring a metrics framework similar to Hadoop's >> Counter system to Spark or is there an alternative means for us to grab >> metrics exposed when using Hadoop APIs to load/save RDDs? >> >> Thanks, >> Mike >> >
Re: Spark Metrics Framework?
The Source class is private <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/source/Source.scala#L22-L25> to the spark package and any new Sources added to the metrics registry must be of type Source <https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L144-L152>. So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1 code, but the same is true in 1.6.1. On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito wrote: > You could use the metric sources and sinks described here: > http://spark.apache.org/docs/latest/monitoring.html#metrics > > If you want to push the metrics to another system you can define a custom > sink. You can also extend the metrics by defining a custom source. > > From: Mike Sukmanowsky > Date: Monday, March 21, 2016 at 11:54 AM > To: "user@spark.apache.org" > Subject: Spark Metrics Framework? > > We make extensive use of the elasticsearch-hadoop library for > Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be > very handy to have access to some of the many metrics > <https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html> > that the library makes available when running in map reduce mode. The > library's > author noted > <https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913> > that Spark doesn't offer any kind of a similar metrics API where by these > metrics could be reported or aggregated on. > > Are there any plans to bring a metrics framework similar to Hadoop's > Counter system to Spark or is there an alternative means for us to grab > metrics exposed when using Hadoop APIs to load/save RDDs? > > Thanks, > Mike >
Spark Metrics Framework?
We make extensive use of the elasticsearch-hadoop library for Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be very handy to have access to some of the many metrics <https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html> that the library makes available when running in map reduce mode. The library's author noted <https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913> that Spark doesn't offer any similar metrics API whereby these metrics could be reported or aggregated.

Are there any plans to bring a metrics framework similar to Hadoop's Counter system to Spark, or is there an alternative means for us to grab metrics exposed when using Hadoop APIs to load/save RDDs?

Thanks,
Mike
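Absent a first-class counter API, one common workaround is to collect per-task counters yourself and merge them on the driver — roughly what an `rdd.mapPartitions(...).collect()` round trip would give you. A pure-Python sketch of just that merge pattern (`write_partition` is a hypothetical stand-in for one task's writes, not part of elasticsearch-hadoop):

```python
from collections import Counter

def write_partition(records):
    """Hypothetical stand-in for a per-partition writer (e.g. one
    task's writes to Elasticsearch); returns the counters that task
    observed alongside doing its real work."""
    stats = Counter()
    for rec in records:
        stats["docs.written"] += 1
        stats["bytes.written"] += len(str(rec))
    return stats

# Driver-side merge: in Spark this list would come back from
# rdd.mapPartitions(lambda it: [write_partition(it)]).collect().
per_task_stats = [
    write_partition([{"id": 1}, {"id": 2}]),
    write_partition([{"id": 3}]),
]
totals = sum(per_task_stats, Counter())
print(totals)
```

This only captures counters you compute yourself; it does not surface the counters EsOutputFormat already maintains internally, which is what the feature request in this thread is about.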
Spark Job Server with Yarn and Kerberos
Has anyone used Spark Job Server on a "kerberized" cluster in YARN-Client mode? When Job Server contacts the YARN resource manager, we see a "Cannot impersonate root" error and are not sure what we have misconfigured. Thanks.

___ *Mike Wright* Principal Architect, Software Engineering S&P Capital IQ and SNL 434-951-7816 *p* 434-244-4466 *f* 540-470-0119 *m* mwri...@snl.com
Re: Questions on Kerberos usage with YARN and JDBC
Kerberos seems to be working otherwise ... for example, we're using it successfully to control access to HDFS and it's linked to AD ... we're using Ranger if that helps. I'm not a systems admin guy so this is really not my area of expertise. ___ *Mike Wright* Principal Architect, Software Engineering S&P Capital IQ and SNL 434-951-7816 *p* 434-244-4466 *f* 540-470-0119 *m* mwri...@snl.com On Fri, Dec 11, 2015 at 4:36 PM, Todd Simmer wrote: > hey Mike, > > Are these part of an Active Directory Domain? If so are they pointed at > the AD domain controllers that hosts the Kerberos server? Windows AD create > SRV records in DNS to help windows clients find the Kerberos server for > their domain. If you look you can see if you have a kdc record in Windows > DNS and what it's pointing at. Can you do a > > kinit *username * > > on that host? It should tell you if it can find the KDC. > > Let me know if that's helpful at all. > > Todd > > On Fri, Dec 11, 2015 at 1:50 PM, Mike Wright wrote: > >> As part of our implementation, we are utilizing a full "Kerberized" >> cluster built on the Hortonworks suite. We're using Job Server as the front >> end to initiate short-run jobs directly from our client-facing product >> suite. >> >> 1) We believe we have configured the job server to start with the >> appropriate credentials, specifying a principal and keytab. We switch to >> YARN-CLIENT mode and can see Job Server attempt to connect to the resource >> manager, and the result is that whatever the principal name is, it "cannot >> impersonate root." We have been unable to solve this. >> >> 2) We are primarily a Windows shop, hence our cluelessness here. That >> said, we're using the JDBC driver version 4.2 and want to use JavaKerberos >> authentication to connect to SQL Server. The queries performed by the job >> are done in the driver, and hence would be running on the Job Server, which >> we confirmed is running as the principal we have designated. 
However, when >> attempting to connect with this option enabled I receive a "Unable to >> obtain Principal Name for authentication" exception. >> >> Reading this: >> >> https://msdn.microsoft.com/en-us/library/ms378428.aspx >> >> We have Kerberos working on the machine and thus have krb5.conf setup >> correctly. However the section, " >> >> Enabling the Domain Configuration File and the Login Module Configuration >> File" seems to indicate we've missed a step somewhere. >> >> Forgive my ignorance here ... I've been on Windows for 20 years and this >> is all new to. >> >> Thanks for any guidance you can provide. >> > >
Re: is Multiple Spark Contexts is supported in spark 1.5.0 ?
Thanks for the insight! ___ *Mike Wright* Principal Architect, Software Engineering S&P Capital IQ and SNL 434-951-7816 *p* 434-244-4466 *f* 540-470-0119 *m* mwri...@snl.com On Fri, Dec 11, 2015 at 2:38 PM, Michael Armbrust wrote: > The way that we do this is to have a single context with a server in front > that multiplexes jobs that use that shared context. Even if you aren't > sharing data this is going to give you the best fine grained sharing of the > resources that the context is managing. > > On Fri, Dec 11, 2015 at 10:55 AM, Mike Wright wrote: > >> Somewhat related - What's the correct implementation when you have a >> single cluster to support multiple jobs that are unrelated and NOT sharing >> data? I was directed to figure out, via job server, to support "multiple >> contexts" and explained that multiple contexts per JVM is not really >> supported. So, via job server, how does one support multiple contexts in >> DIFFERENT JVM's? I specify multiple contexts in the conf file and the >> initialization of the subsequent contexts fail. >> >> >> >> On Fri, Dec 4, 2015 at 3:37 PM, Michael Armbrust >> wrote: >> >>> On Fri, Dec 4, 2015 at 11:24 AM, Anfernee Xu >>> wrote: >>> >>>> If multiple users are looking at the same data set, then it's good >>>> choice to share the SparkContext. >>>> >>>> But my usercases are different, users are looking at different data(I >>>> use custom Hadoop InputFormat to load data from my data source based on the >>>> user input), the data might not have any overlap. For now I'm taking below >>>> approach >>>> >>> >>> Still if you want fine grained sharing of compute resources as well, you >>> want to using single SparkContext. >>> >> >> >
Re: is Multiple Spark Contexts is supported in spark 1.5.0 ?
Somewhat related - What's the correct implementation when you have a single cluster to support multiple jobs that are unrelated and NOT sharing data? I was directed to figure out, via job server, to support "multiple contexts" and explained that multiple contexts per JVM is not really supported. So, via job server, how does one support multiple contexts in DIFFERENT JVM's? I specify multiple contexts in the conf file and the initialization of the subsequent contexts fail. On Fri, Dec 4, 2015 at 3:37 PM, Michael Armbrust wrote: > On Fri, Dec 4, 2015 at 11:24 AM, Anfernee Xu > wrote: > >> If multiple users are looking at the same data set, then it's good choice >> to share the SparkContext. >> >> But my usercases are different, users are looking at different data(I use >> custom Hadoop InputFormat to load data from my data source based on the >> user input), the data might not have any overlap. For now I'm taking below >> approach >> > > Still if you want fine grained sharing of compute resources as well, you > want to using single SparkContext. >
Questions on Kerberos usage with YARN and JDBC
As part of our implementation, we are utilizing a full "Kerberized" cluster built on the Hortonworks suite. We're using Job Server as the front end to initiate short-run jobs directly from our client-facing product suite.

1) We believe we have configured the job server to start with the appropriate credentials, specifying a principal and keytab. We switch to YARN-CLIENT mode and can see Job Server attempt to connect to the resource manager, and the result is that whatever the principal name is, it "cannot impersonate root." We have been unable to solve this.

2) We are primarily a Windows shop, hence our cluelessness here. That said, we're using the JDBC driver version 4.2 and want to use JavaKerberos authentication to connect to SQL Server. The queries performed by the job are done in the driver, and hence would be running on the Job Server, which we confirmed is running as the principal we have designated. However, when attempting to connect with this option enabled I receive an "Unable to obtain Principal Name for authentication" exception.

Reading this: https://msdn.microsoft.com/en-us/library/ms378428.aspx

We have Kerberos working on the machine and thus have krb5.conf set up correctly. However, the section "Enabling the Domain Configuration File and the Login Module Configuration File" seems to indicate we've missed a step somewhere.

Forgive my ignorance here ... I've been on Windows for 20 years and this is all new to me.

Thanks for any guidance you can provide.
Re: New to Spark - Paritioning Question
Thanks for the response! Well, in retrospect each partition doesn't need to be restricted to a single key. But I cannot have values associated with a key span partitions, since they all need to be processed together for a key to facilitate cumulative calcs. So provided an individual key has all its values in a single partition, I'm OK.

Additionally, the values will be written to the database, and from what I have read, doing this at the partition level is the best compromise between 1) writing the calculated values for each key (lots of connects/disconnects) and 2) collecting them all at the end and writing them all at once.

I am using a groupBy against the filtered RDD to get the grouping I want, but apparently this may not be the most efficient way, and it seems that everything is always in a single partition under this scenario.

___ *Mike Wright* Principal Architect, Software Engineering SNL Financial LC 434-951-7816 *p* 434-244-4466 *f* 540-470-0119 *m* mwri...@snl.com

On Tue, Sep 8, 2015 at 5:38 PM, Richard Marscher wrote: > That seems like it could work, although I don't think `partitionByKey` is > a thing, at least for RDD. You might be able to merge step #2 and step #3 > into one step by using the `reduceByKey` function signature that takes in a > Partitioner implementation. > > def reduceByKey(partitioner: Partitioner > <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/Partitioner.html> > , func: (V, V) ⇒ V): RDD > <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html> > [(K, V)] > > Merge the values for each key using an associative reduce function. This > will also perform the merging locally on each mapper before sending results > to a reducer, similarly to a "combiner" in MapReduce. > > The tricky part might be getting the partitioner to know about the number > of partitions, which I think it needs to know upfront in `abstract def > numPartitions: Int`. 
The `HashPartitioner` for example takes in the > number as a constructor argument, maybe you could use that with an upper > bound size if you don't mind empty partitions. Otherwise you might have to > mess around to extract the exact number of keys if it's not readily > available. > > Aside: what is the requirement to have each partition only contain the > data related to one key? > > On Fri, Sep 4, 2015 at 11:06 AM, mmike87 wrote: > >> Hello, I am new to Apache Spark and this is my company's first Spark >> project. >> Essentially, we are calculating models dealing with Mining data using >> Spark. >> >> I am holding all the source data in a persisted RDD that we will refresh >> periodically. When a "scenario" is passed to the Spark job (we're using >> Job >> Server) the persisted RDD is filtered to the relevant mines. For example, >> we >> may want all mines in Chile and the 1990-2015 data for each. >> >> Many of the calculations are cumulative, that is when we apply user-input >> "adjustment factors" to a value, we also need the "flexed" value we >> calculated for that mine previously. >> >> To ensure that this works, the idea if to: >> >> 1) Filter the superset to relevant mines (done) >> 2) Group the subset by the unique identifier for the mine. So, a group may >> be all the rows for mine "A" for 1990-2015 >> 3) I then want to ensure that the RDD is partitioned by the Mine >> Identifier >> (and Integer). >> >> It's step 3 that is confusing me. I suspect it's very easy ... do I simply >> use PartitionByKey? >> >> We're using Java if that makes any difference. >> >> Thanks! >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/New-to-Spark-Paritioning-Question-tp24580.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. 
>> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> > > > -- > *Richard Marscher* > Software Engineer > Localytics > Localytics.com <http://localytics.com/> | Our Blog > <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | > Facebook <http://facebook.com/localytics> | LinkedIn > <http://www.linkedin.com/company/1148792?trk=tyah> >
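[Editor's note] The Partitioner contract Richard describes — `numPartitions` fixed up front plus a key-to-partition function — can be sketched outside Spark. A minimal plain-Python stand-in (class and function names are hypothetical, not Spark API), showing that every value for a given mine ID lands in the same partition, which is what the cumulative calcs need:

```python
class KeyPartitioner:
    """Mimics Spark's Partitioner contract: numPartitions is known
    up front, get_partition maps a key to a partition index."""
    def __init__(self, num_partitions):
        self.num_partitions = num_partitions

    def get_partition(self, key):
        # Python's % on a positive modulus is non-negative, as required
        return hash(key) % self.num_partitions

def partition_records(records, partitioner):
    """Group (key, value) pairs into partitions; all values for a
    given key end up together in one partition."""
    parts = [[] for _ in range(partitioner.num_partitions)]
    for key, value in records:
        parts[partitioner.get_partition(key)].append((key, value))
    return parts

p = KeyPartitioner(4)
data = [("mineA", 1990), ("mineB", 1991), ("mineA", 1995), ("mineB", 2000)]
parts = partition_records(data, p)
```

A partition may hold several keys (or none), which is fine for the stated requirement: only co-location of each key's values matters, not one-key-per-partition.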
Re: How to unit test HiveContext without OutOfMemoryError (using sbt)
Thanks for your response Yana, I can increase the MaxPermSize parameter and it will allow me to run the unit test a few more times before I run out of memory. However, the primary issue is that running the same unit test in the same JVM (multiple times) results in increased memory usage with each run of the unit test, and I believe it has something to do with HiveContext not reclaiming memory after it is finished (or I'm not shutting it down properly). It could very well be related to sbt; however, it's not clear to me. On Tue, Aug 25, 2015 at 1:12 PM, Yana Kadiyska wrote: > The PermGen space error is controlled with MaxPermSize parameter. I run > with this in my pom, I think copied pretty literally from Spark's own > tests... I don't know what the sbt equivalent is but you should be able to > pass it...possibly via SBT_OPTS? > > > > org.scalatest > scalatest-maven-plugin > 1.0 > > > ${project.build.directory}/surefire-reports > false > . > SparkTestSuite.txt > -Xmx3g -XX:MaxPermSize=256m > -XX:ReservedCodeCacheSize=512m > > > true > 1 > false > > true > > > > > test > > test > > > > > > > > On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis > wrote: > >> Hello, >> >> I am using sbt and created a unit test where I create a `HiveContext` and >> execute some query and then return. Each time I run the unit test the JVM >> will increase it's memory usage until I get the error: >> >> Internal error when running tests: java.lang.OutOfMemoryError: PermGen >> space >> Exception in thread "Thread-2" java.io.EOFException >> >> As a work-around, I can fork a new JVM each time I run the unit test, >> however, it seems like a bad solution as takes a while to run the unit >> test. >> >> By the way, I tried to importing the TestHiveContext: >> >>- import org.apache.spark.sql.hive.test.TestHiveContext >> >> However, it suffers from the same memory issue. Has anyone else suffered >> from the same problem? Note that I am running these unit tests on my mac. >> >> Cheers, Mike. >> >> >
How to unit test HiveContext without OutOfMemoryError (using sbt)
Hello, I am using sbt and created a unit test where I create a `HiveContext` and execute some query and then return. Each time I run the unit test the JVM will increase its memory usage until I get the error: Internal error when running tests: java.lang.OutOfMemoryError: PermGen space Exception in thread "Thread-2" java.io.EOFException As a work-around, I can fork a new JVM each time I run the unit test; however, it seems like a bad solution as it takes a while to run the unit test. By the way, I tried importing the TestHiveContext: - import org.apache.spark.sql.hive.test.TestHiveContext However, it suffers from the same memory issue. Has anyone else suffered from the same problem? Note that I am running these unit tests on my mac. Cheers, Mike.
Spark SQL window functions (RowsBetween)
Hi All, I would like some clarification regarding window functions for Apache Spark 1.4.0 - https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html In particular, the "rowsBetween":

val w = Window.partitionBy("name").orderBy("id")
df.select(
  sum("price").over(w.rangeBetween(Long.MinValue, 2)),
  avg("price").over(w.rowsBetween(0, 4))
)

Are any of the window functions available without a Hive context? If the answer is no, then is there any other way to accomplish this without using Hive? I need to compare the i-th row with the (i-1)-th row of col2 (sorted by col1). If the item of the i-th row and the item of the (i-1)-th row are different, then I need to increment the count of the (i-1)-th item by 1.

col1 | col2
-----+-------
1 | item_1
2 | item_1
3 | item_2
4 | item_1
5 | item_2
6 | item_1

In the above example, if we scan two rows at a time downwards, we see that row 2 and row 3 are different, therefore we add one to item_1. Next, we see that row 3 is different from row 4, so we add one to item_2. Continue until we end up with:

col2 | col3
-------+------
item_1 | 2
item_2 | 2

Thanks, Mike.
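[Editor's note] The pairwise rule described is what a lag-by-one comparison computes (in Spark SQL that would be the `lag` window function, which as I understand it needs a HiveContext in 1.4). The counting logic itself is easy to check in plain Python first — a minimal sketch over rows already sorted by col1:

```python
from collections import Counter

def count_changes(items):
    """For each adjacent pair of rows (i-1, i), if the items differ,
    credit one change to the item in row i-1."""
    counts = Counter()
    for prev, cur in zip(items, items[1:]):
        if prev != cur:
            counts[prev] += 1
    return counts

# col2 values from the example, already sorted by col1
rows = ["item_1", "item_1", "item_2", "item_1", "item_2", "item_1"]
result = count_changes(rows)  # item_1 -> 2, item_2 -> 2, as in the example
```

This reproduces the expected table: item_1 = 2, item_2 = 2.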
PySpark concurrent jobs using single SparkContext
Hi all, We're using Spark 1.3.0 via a small YARN cluster to do some log processing. The jobs are pretty simple: for a number of customers and a number of days, fetch some event log data, build aggregates and store those aggregates into a data store. The way our script is written right now does something akin to:

with SparkContext() as sc:
    for customer in customers:
        for day in days:
            logs = sc.textFile(get_logs(customer, day))
            aggregate = make_aggregate(logs)
            # This function contains the action saveAsNewAPIHadoopFile which
            # triggers a save
            save_aggregate(aggregate)

So we have a Spark job per customer, per day. I tried doing some parallel job submission with something similar to:

def make_and_save_aggregate(customer, day, spark_context):
    # Without a separate threading.Lock() here or better yet, one guarding the
    # Spark context, multiple customer/day transformations and actions could
    # be interleaved
    sc = spark_context
    logs = sc.textFile(get_logs(customer, day))
    aggregate = make_aggregate(logs)
    save_aggregate(aggregate)

with SparkContext() as sc, futures.ThreadPoolExecutor(4) as executor:
    for customer in customers:
        for day in days:
            executor.submit(make_and_save_aggregate, customer, day, sc)

The problem is, with no locks on a SparkContext except during initialization <https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L214-L241> and shutdown <https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L296-L307>, operations on the context could (if I understand correctly) be interleaved, leading to a DAG which contains transformations out of order and from different customer/day periods. One solution is instead to launch multiple Spark jobs via spark-submit and let YARN/Spark's dynamic executor allocation take care of fair scheduling. In practice, this doesn't seem to yield very fast computation, perhaps due to some additional overhead with YARN. Is there any safe way to launch concurrent jobs like this using a single PySpark context? 
-- Mike Sukmanowsky Aspiring Digital Carpenter *e*: mike.sukmanow...@gmail.com LinkedIn <http://www.linkedin.com/profile/view?id=10897143> | github <https://github.com/msukmanowsky>
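[Editor's note] One way to make the threaded version above safe is to serialize every transformation-plus-action sequence on the shared context behind a single lock, so each customer/day's work on the driver is contiguous and never interleaved — at the cost of driver-side parallelism. A minimal plain-Python sketch with a stand-in context (`FakeContext` and the path scheme are hypothetical, not PySpark API):

```python
import threading
from concurrent import futures

context_lock = threading.Lock()

class FakeContext:
    """Stand-in for the shared SparkContext: records each call so we
    can see afterwards that all work completed."""
    def __init__(self):
        self.calls = []
    def text_file(self, path):
        self.calls.append(path)
        return [path]

def make_and_save_aggregate(customer, day, sc, results):
    # Hold the lock for the whole transform+action sequence, so the
    # job graph built on the shared context is never interleaved.
    with context_lock:
        logs = sc.text_file("logs/%s/%s" % (customer, day))
        results.append((customer, day, len(logs)))

sc = FakeContext()
results = []
with futures.ThreadPoolExecutor(4) as executor:
    fs = [executor.submit(make_and_save_aggregate, c, d, sc, results)
          for c in ["acme", "globex"] for d in ["2015-01-01", "2015-01-02"]]
    futures.wait(fs)
```

Since the lock serializes the jobs, this buys correctness rather than throughput; for genuine concurrency, separate spark-submit invocations (as the poster tried) or Spark's fair scheduler pools are the usual routes.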
Optimal way to implement a small lookup table for identifiers in an RDD
Hi All, I have an RDD of case class objects.

scala> case class Entity(
     |   value: String,
     |   identifier: String
     | )
defined class Entity

scala> Entity("hello", "id1")
res25: Entity = Entity(hello,id1)

During a map operation, I'd like to return a new RDD that contains all of the data of the original RDD with the addition of new data that was looked up based on the identifiers provided. The lookup table in Cassandra looks something like...

id  | type
----+-------
id1 | action
id2 | view

The end result would be an RDD of EntityExtended:

case class EntityExtended(
  value: String,
  identifier: String,
  type: String
)

I believe that it would make sense to use a broadcast variable. However, I'm not sure what the best way would be to incorporate it during a map operation.

rdd.map(MyObject.extendEntity)

object MyObject {
  def extendEntity(entity: Entity): EntityExtended = {
    val id = entity.identifier
    // lookup identifier in broadcast variable?
  }
}

Thanks, Mike.
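[Editor's note] A broadcast variable is a good fit when the lookup table is small: collect the id-to-type map once on the driver, broadcast it (in Spark, `sc.broadcast(map)` on the driver and `.value` inside the map function), and consult it per record. A minimal plain-Python sketch of the lookup logic itself, with the Cassandra fetch replaced by a literal dict (record shapes mirror the case classes above):

```python
from collections import namedtuple

Entity = namedtuple("Entity", ["value", "identifier"])
EntityExtended = namedtuple("EntityExtended", ["value", "identifier", "type"])

# In Spark, this dict would be read from Cassandra once on the driver
# and shipped to workers via sc.broadcast(lookup).
lookup = {"id1": "action", "id2": "view"}

def extend_entity(entity, table):
    # .get() so an unknown identifier yields None instead of failing
    return EntityExtended(entity.value, entity.identifier,
                          table.get(entity.identifier))

extended = [extend_entity(e, lookup)
            for e in [Entity("hello", "id1"), Entity("world", "id2")]]
```

The key design point is that the lookup happens against an in-memory copy on each worker, so the map stage never opens a per-record connection to Cassandra.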
control the number of reducers for groupby in data frame
Hi, Does anyone know how I could control the number of reducers when we do an operation such as groupBy for DataFrames? I could set spark.sql.shuffle.partitions in SQL, but I'm not sure how to do it with the df.groupBy("XX") API. Thanks, Mike
streamingContext.stop(true,true) doesn't end the job
Hi all, I have a streaming job which reads messages from kafka via directStream, transforms them and writes them out to Cassandra. I also use a large broadcast variable (~300MB). I tried to implement a stopping mechanism for this job, something like this: When I test it in local mode, the job doesn't finish. All I see in the log is this: Can someone point me to what I'm doing wrong? Thanks, Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streamingContext-stop-true-true-doesn-t-end-the-job-tp24064.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Data frames select and where clause dependency
Definitely, thanks Mohammed. On Mon, Jul 20, 2015 at 5:47 PM, Mohammed Guller wrote: > Thanks, Harish. > > > > Mike – this would be a cleaner version for your use case: > > df.filter(df("filter_field") === "value").select("field1").show() > > > > Mohammed > > > > *From:* Harish Butani [mailto:rhbutani.sp...@gmail.com] > *Sent:* Monday, July 20, 2015 5:37 PM > *To:* Mohammed Guller > *Cc:* Michael Armbrust; Mike Trienis; user@spark.apache.org > > *Subject:* Re: Data frames select and where clause dependency > > > > Yes via: org.apache.spark.sql.catalyst.optimizer.ColumnPruning > > See DefaultOptimizer.batches for list of logical rewrites. > > > > You can see the optimized plan by printing: df.queryExecution.optimizedPlan > > > > On Mon, Jul 20, 2015 at 5:22 PM, Mohammed Guller > wrote: > > Michael, > > How would the Catalyst optimizer optimize this version? > > df.filter(df("filter_field") === "value").select("field1").show() > > Would it still read all the columns in df or would it read only > “filter_field” and “field1” since only two columns are used (assuming other > columns from df are not used anywhere else)? > > > > Mohammed > > > > *From:* Michael Armbrust [mailto:mich...@databricks.com] > *Sent:* Friday, July 17, 2015 1:39 PM > *To:* Mike Trienis > *Cc:* user@spark.apache.org > *Subject:* Re: Data frames select and where clause dependency > > > > Each operation on a dataframe is completely independent and doesn't know > what operations happened before it. When you do a selection, you are > removing other columns from the dataframe and so the filter has nothing to > operate on. > > > > On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis > wrote: > > I'd like to understand why the where field must exist in the select > clause. 
> > > > For example, the following select statement works fine > >- df.select("field1", "filter_field").filter(df("filter_field") === >"value").show() > > However, the next one fails with the error "in operator !Filter > (filter_field#60 = value);" > >- df.select("field1").filter(df("filter_field") === "value").show() > > As a work-around, it seems that I can do the following > >- df.select("field1", "filter_field").filter(df("filter_field") === >"value").drop("filter_field").show() > > > > Thanks, Mike. > > > > >
[General Question] [Hadoop + Spark at scale] Spark Rack Awareness ?
I wanted to ask a general question about Hadoop/Yarn and Apache Spark integration. I know that Hadoop on a physical cluster has rack awareness, i.e. it attempts to minimise network traffic by saving replicated blocks within a rack. I wondered whether, when Spark is configured to use Yarn as a cluster manager, it is able to use this feature to also minimise network traffic to a degree. Sorry if this question is not quite accurate but I think you can generally see what I mean?
Data frames select and where clause dependency
I'd like to understand why the where field must exist in the select clause. For example, the following select statement works fine - df.select("field1", "filter_field").filter(df("filter_field") === "value").show() However, the next one fails with the error "in operator !Filter (filter_field#60 = value);" - df.select("field1").filter(df("filter_field") === "value").show() As a work-around, it seems that I can do the following - df.select("field1", "filter_field").filter(df("filter_field") === "value").drop("filter_field").show() Thanks, Mike.
Splitting dataframe using Spark 1.4 for nested json input
Hello, I am having issues with splitting contents of a dataframe column using Spark 1.4. The dataframe was created by reading a nested complex json file. I used df.explode but keep getting an error message.

scala> val df = sqlContext.read.json("/Users/xx/target/statsfile.json")

scala> df.show()
| mi|neid|
|[900,["pmEs","pmS...|[SubNetwork=ONRM_...|
|[900,["pmIcmpInEr...|[SubNetwork=ONRM_...|
|[900,pmUnsuccessf...|[SubNetwork=ONRM_...|

scala> df.printSchema()
root
|-- mi: struct (nullable = true)
||-- gp: long (nullable = true)
||-- mt: string (nullable = true)
||-- mts: string (nullable = true)
||-- mv: string (nullable = true)
|-- neid: struct (nullable = true)
||-- nedn: string (nullable = true)
||-- nesw: string (nullable = true)
||-- neun: string (nullable = true)

scala> val df1=df.select("mi.mv")
df1: org.apache.spark.sql.DataFrame = [mv: string]

scala> val df1=df.select("mi.mv").show()
| mv|
|[{"r":[0,0,0],"mo...|
|{"r":[0,4,0,4],"m...|
|{"r":5,"moid":"Ma...|

scala> df1.explode("mv","mvnew")(mv => mv.split(","))
:28: error: value split is not a member of Nothing
df1.explode("mv","mvnew")(mv => mv.split(","))

The json file format looks like

[ { "neid":{ }, "mi":{ "mts":"20100609071500Z", "gp":"900", "tMOID":"Aal2Ap", "mt":[ ], "mv":[ { "moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1552q", "r": [ .] }, { "moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1556q", "r": [ .] } ] } } ]

Am I doing something wrong? I need to extract the data under mi.mv into separate columns so I can apply some transformations. Regards Mike
Error with splitting contents of a dataframe column using Spark 1.4 for nested complex json file
Hello, I am having issues with splitting contents of a dataframe column using Spark 1.4. The dataframe was created by reading a nested complex json file. I used df.explode but keep getting error message. The json file format looks like [ { "neid":{ }, "mi":{ "mts":"20100609071500Z", "gp":"900", "tMOID":"Aal2Ap", "mt":[ ], "mv":[ { "moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1552q", "r": [ 1, 2, 5 ] }, { "moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1542q", "r": [ 1, 2, 5 ] } ] } }, { "neid":{ "neun":"RC003", "nedn":"SubNetwork=ONRM_RootMo_R,SubNetwork=RC003,MeContext=RC003", "nesw":"CP90831_R9YC/11" }, "mi":{ "mts":"20100609071500Z", "gp":"900", "tMOID":"PlugInUnit", "mt":"pmProcessorLoad", "mv":[ { "moid":"ManagedElement=1,Equipment=1,Subrack=MS,Slot=6,PlugInUnit=1", "r": [ 1, 2, 5 ] }, { "moid":"ManagedElement=1,Equipment=1,Subrack=ES-1,Slot=1,PlugInUnit=1", "r": [ 1, 2, 5 ] } ] } } ] scala> val df = sqlContext.read.json("/Users/xx/target/statsfile.json") scala> df.show() +++ | mi|neid| +++ |[900,["pmEs","pmS...|[SubNetwork=ONRM_...| |[900,["pmIcmpInEr...|[SubNetwork=ONRM_...| |[900,pmUnsuccessf...|[SubNetwork=ONRM_...| |[900,["pmBwErrBlo...|[SubNetwork=ONRM_...| |[900,["pmSctpStat...|[SubNetwork=ONRM_...| |[900,["pmLinkInSe...|[SubNetwork=ONRM_...| |[900,["pmGrFc","p...|[SubNetwork=ONRM_...| |[900,["pmReceived...|[SubNetwork=ONRM_...| |[900,["pmIvIma","...|[SubNetwork=ONRM_...| |[900,["pmEs","pmS...|[SubNetwork=ONRM_...| |[900,["pmEs","pmS...|[SubNetwork=ONRM_...| |[900,["pmExisOrig...|[SubNetwork=ONRM_...| |[900,["pmHDelayVa...|[SubNetwork=ONRM_...| |[900,["pmReceived...|[SubNetwork=ONRM_...| |[900,["pmReceived...|[SubNetwork=ONRM_...| |[900,["pmAverageR...|[SubNetwork=ONRM_...| |[900,["pmDchFrame...|[SubNetwork=ONRM_...| |[900,["pmReceived...|[SubNetwork=ONRM_...| |[900,["pmNegative...|[SubNetwork=ONRM_...| |[900,["pmUsedTbsQ...|[SubNetwork=ONRM_...| +++ scala> df.printSchema() root |-- mi: struct (nullable = true) ||-- gp: long 
(nullable = true) ||-- mt: string (nullable = true) ||-- mts: string (nullable = true) ||-- mv: string (nullable = true) |-- neid: struct (nullable = true) ||-- nedn: string (nullable = true) ||-- nesw: string (nullable = true) ||-- neun: string (nullable = true) scala> val df1=df.select("mi.mv") df1: org.apache.spark.sql.DataFrame = [mv: string] scala> val df1=df.select("mi.mv").show() ++ | mv| ++ |[{"r":[0,0,0],"mo...| |{"r":[0,4,0,4],"m...| |{"r":5,"moid":"Ma...| |[{"r":[2147483647...| |{"r":[225,1112986...| |[{"r":[83250,0,0,...| |[{"r":[1,2,529982...| |[{"r":[26998564,0...| |[{"r":[0,0,0,0,0,...| |[{"r":[0,0,0],"mo...| |[{"r":[0,0,0],"mo...| |{"r":[0,0,0,0,0,0...| |{"r":[0,0,1],"moi...| |{"r":[4587,4587],...| |[{"r":[180,180],"...| |[{"r":["0,0,0,0,0...| |{"r":[0,35101,0,0...| |[{"r":["0,0,0,0,0...| |[{"r":[0,1558],"m...| |[{"r":["7484,4870...| ++ scala> df1.explode("mv","mvnew")(mv => mv.split(",")) :28: error: value split is not a member of Nothing df1.explode("mv","mvnew")(mv => mv.split(",")) Am I doing something wrong? I need to extract the data under mi.mv into separate columns so I can apply some transformations. Regards Mike
[Spark 1.3.1] Spark HiveQL -> CDH 5.3 Hive 0.13 UDF's
Hi I have a five node CDH 5.3 cluster running on CentOS 6.5, and I also have a separate install of Spark 1.3.1. (The CDH 5.3 install has Spark 1.2 but I wanted a newer version.) I managed to write some Scala-based code using a Hive Context to connect to Hive and create/populate tables etc. I compiled my application using sbt and ran it with spark-submit in local mode. My question concerns UDFs, specifically the row_sequence function in the hive-contrib jar file, i.e.

hiveContext.sql("""
  ADD JAR /opt/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/jars/hive-contrib-0.13.1-cdh5.3.3.jar
""")

hiveContext.sql("""
  CREATE TEMPORARY FUNCTION row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';
""")

val resRDD = hiveContext.sql("""
  SELECT row_sequence(),t1.edu FROM
  ( SELECT DISTINCT education AS edu FROM adult3 ) t1
  ORDER BY t1.edu
""")

This seems to generate its sequence in the map (?) phase of execution, because no matter how I fiddle with the main SQL I could not get an ascending index for the dimension data, i.e. I always get

1 val1
1 val2
1 val3

instead of

1 val1
2 val2
3 val3

I'm well aware that I can play with Scala to get around this issue, and I have, but I wondered whether others have come across this and solved it? cheers Mike F
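[Editor's note] An ascending index over distinct dimension values doesn't have to come from the Hive UDF at all — in Spark, collecting or zipping the sorted distinct values with `zipWithIndex` gives a stable sequence regardless of how tasks are mapped. A plain-Python sketch of the intended output (data values are illustrative):

```python
def index_distinct(values):
    """Assign an ascending 1-based index to the sorted distinct
    values - what the row_sequence() call was meant to produce."""
    return [(i + 1, v) for i, v in enumerate(sorted(set(values)))]

raw = ["Bachelors", "11th", "Bachelors", "Doctorate", "11th"]
indexed = index_distinct(raw)
```

The reason the UDF yields all 1s is consistent with it being evaluated per-task before the final ordering, so each task restarts the sequence; computing the index after the distinct+sort step avoids that entirely.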
Aggregating metrics using Cassandra and Spark streaming
Hello, I'd like to understand how other people have been aggregating metrics using Spark Streaming and a Cassandra database. Currently I have designed some data models that will store the rolled-up metrics. There are two models that I am considering:

CREATE TABLE rollup_using_counters (
  metric_1 text,
  metric_1_value counter
);

The model above is nice because I only need to write and execute a single query when updating the counter value. The problem is that I need these counter values to be fairly accurate, and based on some discussions from the Cassandra folks it sounds like there is some potential for over- and under-counting if the database is under load.

CREATE TABLE rollup_using_update (
  metric_1 text,
  metric_1_value int
);

This model on the other hand will require the metric values to be updated, and therefore the operation is idempotent. The problem is that I will need to read the metrics into the Spark streaming application and perform the addition prior to writing the result to Cassandra. I believe that ensures the metrics are accurate, but I believe it also introduces a lot of complexity and possibly latency into my Spark streaming application. Has anyone else run into this problem before, and how did you solve it? Thanks, Mike.
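[Editor's note] One common way to get idempotence without a read-before-write in the hot path is to key each write by the batch that produced it, so a replayed batch overwrites its own previous row instead of double-counting, and the rollup is derived from the per-batch rows. The table layout here is an assumption for illustration, not from the thread — a minimal sketch with a dict standing in for the store:

```python
def record_batch(store, metric, batch_id, value):
    """Idempotent write: storing the same (metric, batch_id) twice
    has no extra effect, unlike a counter increment."""
    store[(metric, batch_id)] = value

def rollup(store, metric):
    # The rolled-up value is recomputed from the per-batch rows.
    return sum(v for (m, _), v in store.items() if m == metric)

store = {}
record_batch(store, "metric_1", "batch-42", 10)
record_batch(store, "metric_1", "batch-42", 10)  # replayed batch: no double count
record_batch(store, "metric_1", "batch-43", 5)
total = rollup(store, "metric_1")  # 15
```

The trade-off is an extra aggregation at read time (or a periodic compaction job), in exchange for writes that are safe to retry.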
Re: understanding on the "waiting batches" and "scheduling delay" in Streaming UI
Hi Das, Thanks for your reply. Somehow I missed it.. I am using Spark 1.3. The data source is from kafka. Yeah, not sure why the delay is 0. I'll run against 1.4 and give a screenshot. Thanks, Mike From: Akhil Das mailto:ak...@sigmoidanalytics.com>> Date: Thursday, June 18, 2015 at 6:05 PM To: Mike Fang mailto:chyfan...@gmail.com>> Cc: "user@spark.apache.org<mailto:user@spark.apache.org>" mailto:user@spark.apache.org>> Subject: Re: understanding on the "waiting batches" and "scheduling delay" in Streaming UI Which version of spark? and what is your data source? For some reason, your processing delay is exceeding the batch duration. And its strange that you are not seeing any scheduling delay. Thanks Best Regards On Thu, Jun 18, 2015 at 7:29 AM, Mike Fang mailto:chyfan...@gmail.com>> wrote: Hi, I have a spark streaming program running for ~ 25hrs. When I check the Streaming UI tab. I found the "Waiting batches" is 144. But the "scheduling delay" is 0. I am a bit confused. If the "waiting batches" is 144, that means many batches are waiting in the queue to be processed? If this is the case, the scheduling delay should be high rather than 0. Am I missing anything? Thanks, Mike
[Spark 1.3.1 SQL] Using Hive
Hi Is it true that if I want to use Spark SQL (for Spark 1.3.1) against Apache Hive I need to build a source version of Spark? I'm using CDH 5.3 on CentOS Linux 6.5, which uses Hive 0.13.0 (I think). cheers Mike F
understanding on the "waiting batches" and "scheduling delay" in Streaming UI
Hi, I have a spark streaming program running for ~ 25hrs. When I check the Streaming UI tab. I found the “Waiting batches” is 144. But the “scheduling delay” is 0. I am a bit confused. If the “waiting batches” is 144, that means many batches are waiting in the queue to be processed? If this is the case, the scheduling delay should be high rather than 0. Am I missing anything? Thanks, Mike
questions on the "waiting batches" and "scheduling delay" in Streaming UI
Hi, I have a spark streaming program running for ~ 25hrs. When I check the Streaming UI tab. I found the "Waiting batches" is 144. But the "scheduling delay" is 0. I am a bit confused. If the "waiting batches" is 144, that means many batches are waiting in the queue to be processed? If this is the case, the scheduling delay should be high rather than 0. Am I missing anything? Thanks, Mike
spark stream twitter question ..
Hi I have a question about Spark Twitter stream processing in Spark 1.3.1. The code sample below just opens up a twitter stream, uses auth keys, splits out hash tags and creates a temp table. However, when I try to compile it using sbt (CentOS 6.5) I get the error

[error] /home/hadoop/spark/twitter1/src/main/scala/twitter1.scala:54: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error] val dfHashTags = rdd.map(hashT => Row(hashT) ).toDF()

I know that I need to "import sqlContext.implicits._" which is what I've tried but I still get the error. Can anyone advise?

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType,StructField,StringType}

object twitter1 {

  def main(args: Array[String]) {

    // create a spark conf and context
    val appName = "Twitter example 1"
    val conf = new SparkConf()
    conf.setAppName(appName)
    val sc = new SparkContext(conf)

    // set twitter auth key values
    val consumerKey = "QQxxx"
    val consumerSecret = "0HFxxx"
    val accessToken = "32394xxx"
    val accessTokenSecret = "IlQvscxxx"

    // set twitter auth properties
    // https://apps.twitter.com/
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val ssc = new StreamingContext(sc, Seconds(5))
    val stream = TwitterUtils.createStream(ssc, None)

    val hashTags = stream.flatMap( status => status.getText.split(" ").filter(_.startsWith("#")))

    // val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    hashTags.foreachRDD{ rdd =>
      val dfHashTags = rdd.map(hashT => Row(hashT) ).toDF()
      dfHashTags.registerTempTable("tweets")
    }

    // extra stuff here

    ssc.start()
    ssc.awaitTermination()

  } // end main

} // end twitter1

cheers Mike F
Re: Managing spark processes via supervisord
Thanks Igor, I managed to find a fairly simple solution. It seems that the shell scripts (e.g. start-master.sh, start-slave.sh) end up executing /bin/spark-class, which is always run in the foreground. Here is a solution I provided on stackoverflow: - http://stackoverflow.com/questions/30672648/how-to-autostart-an-apache-spark-cluster-using-supervisord/30676844#30676844 Cheers Mike On Wed, Jun 3, 2015 at 12:29 PM, Igor Berman wrote: > assuming you are talking about standalone cluster > imho, with workers you won't get any problems and it's straightforward > since they are usually foreground processes > with master it's a bit more complicated, ./sbin/start-master.sh goes > background which is not good for supervisor, but anyway I think it's > doable(going to setup it too in a few days) > > On 3 June 2015 at 21:46, Mike Trienis wrote: > >> Hi All, >> >> I am curious to know if anyone has successfully deployed a spark cluster >> using supervisord? >> >>- http://supervisord.org/ >> >> Currently I am using the cluster launch scripts which are working >> greater, however, every time I reboot my VM or development environment I >> need to re-launch the cluster. >> >> I am considering using supervisord to control all the processes (worker, >> master, ect.. ) in order to have the cluster up an running after boot-up; >> although I'd like to understand if it will cause more issues than it >> solves. >> >> Thanks, Mike. >> >> >
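[Editor's note] The shape of the resulting supervisord entries is to run `spark-class` directly with the deploy classes (it stays in the foreground), rather than the `start-*.sh` wrappers, which daemonize. A sketch along the lines of the linked answer — the install path, host, port and user are placeholders for your environment:

```
; /etc/supervisor/conf.d/spark.conf
[program:spark-master]
command=/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master --host 127.0.0.1
user=spark
autostart=true
autorestart=true

[program:spark-worker]
command=/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://127.0.0.1:7077
user=spark
autostart=true
autorestart=true
```

With this in place, `supervisorctl start spark-master spark-worker` brings the cluster up, and supervisord restarts the processes after a reboot or crash.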
Managing spark processes via supervisord
Hi All, I am curious to know if anyone has successfully deployed a spark cluster using supervisord? - http://supervisord.org/ Currently I am using the cluster launch scripts, which are working great; however, every time I reboot my VM or development environment I need to re-launch the cluster. I am considering using supervisord to control all the processes (worker, master, etc.) in order to have the cluster up and running after boot-up, although I'd like to understand if it will cause more issues than it solves. Thanks, Mike.
Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)
Yup, and since I have only one core per executor it explains why there was only one executor utilized. I'll need to investigate which EC2 instance type is going to be the best fit. Thanks Evo. On Fri, May 22, 2015 at 3:47 PM, Evo Eftimov wrote: > A receiver occupies a cpu core, an executor is simply a jvm instance and > as such it can be granted any number of cores and ram > > So check how many cores you have per executor > > > Sent from Samsung Mobile > > > Original message > From: Mike Trienis > Date:2015/05/22 21:51 (GMT+00:00) > To: user@spark.apache.org > Subject: Re: Spark Streaming: all tasks running on one executor (Kinesis + > Mongodb) > > I guess each receiver occupies a executor. So there was only one executor > available for processing the job. > > On Fri, May 22, 2015 at 1:24 PM, Mike Trienis > wrote: > >> Hi All, >> >> I have cluster of four nodes (three workers and one master, with one core >> each) which consumes data from Kinesis at 15 second intervals using two >> streams (i.e. receivers). The job simply grabs the latest batch and pushes >> it to MongoDB. I believe that the problem is that all tasks are executed on >> a single worker node and never distributed to the others. This is true even >> after I set the number of concurrentJobs to 3. Overall, I would really like >> to increase throughput (i.e. more than 500 records / second) and understand >> why all executors are not being utilized. >> >> Here are some parameters I have set: >> >>- >>- spark.streaming.blockInterval 200 >>- spark.locality.wait 500 >>- spark.streaming.concurrentJobs 3 >> >> This is the code that's actually doing the writing: >> >> def write(rdd: RDD[Data], time:Time) : Unit = { >> val result = doSomething(rdd, time) >> result.foreachPartition { i => >> i.foreach(record => connection.insert(record)) >> } >> } >> >> def doSomething(rdd: RDD[Data]) : RDD[MyObject] = { >> rdd.flatMap(MyObject) >> } >> >> Any ideas as to how to improve the throughput? >> >> Thanks, Mike. >> > >
Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)
I guess each receiver occupies an executor. So there was only one executor available for processing the job. On Fri, May 22, 2015 at 1:24 PM, Mike Trienis wrote: > Hi All, > > I have cluster of four nodes (three workers and one master, with one core > each) which consumes data from Kinesis at 15 second intervals using two > streams (i.e. receivers). The job simply grabs the latest batch and pushes > it to MongoDB. I believe that the problem is that all tasks are executed on > a single worker node and never distributed to the others. This is true even > after I set the number of concurrentJobs to 3. Overall, I would really like > to increase throughput (i.e. more than 500 records / second) and understand > why all executors are not being utilized. > > Here are some parameters I have set: > >- >- spark.streaming.blockInterval 200 >- spark.locality.wait 500 >- spark.streaming.concurrentJobs 3 > > This is the code that's actually doing the writing: > > def write(rdd: RDD[Data], time:Time) : Unit = { > val result = doSomething(rdd, time) > result.foreachPartition { i => > i.foreach(record => connection.insert(record)) > } > } > > def doSomething(rdd: RDD[Data]) : RDD[MyObject] = { > rdd.flatMap(MyObject) > } > > Any ideas as to how to improve the throughput? > > Thanks, Mike. >
Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)
Hi All, I have a cluster of four nodes (three workers and one master, with one core each) which consumes data from Kinesis at 15 second intervals using two streams (i.e. receivers). The job simply grabs the latest batch and pushes it to MongoDB. I believe that the problem is that all tasks are executed on a single worker node and never distributed to the others. This is true even after I set the number of concurrentJobs to 3. Overall, I would really like to increase throughput (i.e. more than 500 records / second) and understand why all executors are not being utilized.

Here are some parameters I have set:

- spark.streaming.blockInterval 200
- spark.locality.wait 500
- spark.streaming.concurrentJobs 3

This is the code that's actually doing the writing:

def write(rdd: RDD[Data], time: Time) : Unit = {
  val result = doSomething(rdd, time)
  result.foreachPartition { i =>
    i.foreach(record => connection.insert(record))
  }
}

def doSomething(rdd: RDD[Data]) : RDD[MyObject] = {
  rdd.flatMap(MyObject)
}

Any ideas as to how to improve the throughput? Thanks, Mike.
Spark sql and csv data processing question
Hi, I'm getting the following error when trying to process a CSV-based data file.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 10.0 failed 4 times, most recent failure: Lost task 1.3 in stage 10.0 (TID 262, hc2r1m3.semtech-solutions.co.nz): java.lang.ArrayIndexOutOfBoundsException: 0
  at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
  at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
  at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
  at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

I have made sure that none of my data rows are empty and that they all have 15 fields. I have also physically checked the data. The error occurs when I run the actual Spark SQL on the last line. The script is as follows.
val server = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
val path = "/data/spark/h2o/"

val train_csv = server + path + "adult.train.data" // 32,562 rows
val test_csv  = server + path + "adult.test.data"  // 16,283 rows

// load the data
val rawTrainData = sparkCxt.textFile(train_csv)
val rawTestData  = sparkCxt.textFile(test_csv)

// create a spark sql schema for the row
val schemaString = "age workclass fnlwgt education educationalnum maritalstatus" +
  " occupation relationship race gender capitalgain capitalloss" +
  " hoursperweek nativecountry income"

val schema = StructType(schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, false)))

// create an RDD from the raw training data
val trainRDD = rawTrainData
  .filter(!_.isEmpty)
  .map(rawRow => Row.fromSeq(rawRow.split(",")
    .filter(_.length == 15)
    .map(_.toString).map(_.trim)))

println("> Raw Training Data Count = " + trainRDD.count())

val testRDD = rawTestData
  .filter(!_.isEmpty)
  .map(rawRow => Row.fromSeq(rawRow.split(",")
    .filter(_.length == 15)
    .map(_.toString).map(_.trim)))

println("> Raw Testing Data Count = " + testRDD.count())

// create a schema RDD
val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
val testSchemaRDD  = sqlContext.applySchema(testRDD, schema)

// register schema RDD as a table
trainSchemaRDD.registerTempTable("trainingTable")
testSchemaRDD.registerTempTable("testingTable")

println("> Schema RDD Training Data Count = " + trainSchemaRDD.count())
println("> Schema RDD Testing Data Count = " + testSchemaRDD.count())

// now run sql against the table to filter the data
val schemaRddTrain = sqlContext.sql(
  "SELECT " +
  "age,workclass,education,maritalstatus,occupation,relationship,race," +
  "gender,hoursperweek,nativecountry,income " +
  "FROM trainingTable LIMIT 5000")

println("> Training Data Count = " + schemaRddTrain.count())

Any advice is appreciated :)
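A likely culprit in the script above: `.filter(_.length == 15)` is applied to the *array of fields* produced by `split(",")`, so it keeps only individual fields whose string length is 15 characters, rather than keeping rows that have 15 fields. That can yield Rows with fewer columns than the schema expects, which matches the ArrayIndexOutOfBoundsException. A sketch of the difference (in Python for brevity, on a hypothetical CSV line):

```python
# Hypothetical adult-dataset-style line with 15 comma-separated fields.
row = ("39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical,"
       " Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K")

fields = [f.strip() for f in row.split(",")]

# Buggy: filters individual fields by their character count,
# mirroring rawRow.split(",").filter(_.length == 15).
buggy = [f for f in fields if len(f) == 15]

# Intended: keep the whole row only when it has exactly 15 fields.
intended = fields if len(fields) == 15 else None

print(len(buggy))     # almost certainly not 15 -- columns go missing
print(len(intended))  # -> 15
```

In the Scala, the row-count check belongs before building the Row, e.g. `rawTrainData.filter(_.split(",").length == 15)`, rather than inside `Row.fromSeq`.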
Spark SQL ArrayIndexOutOfBoundsException
Sorry if this email is a duplicate, I realised that I was not registered with the mailing list ...

I am having a problem with a Spark SQL script which is running on a Spark 1.2 CentOS CDH 5.3 mini 5 node cluster. The script processes some image CSV data, each record/line of which has 28x28 integer elements ending with an integer label value which describes the record. I have checked the data file rows to ensure that they contain the correct number of elements.

[hadoop@hc2r1m2 h2o_spark_1_2]$ cat chk_csv.bash
#!/bin/bash

MYFILE=$1

echo ""
echo "check $MYFILE"
echo ""

cat $MYFILE | while read line
do
  echo -n "Line records = "
  echo $line | tr "," "\n" | wc -l
done

[hadoop@hc2r1m2 h2o_spark_1_2]$ ./chk_csv.bash mnist_train_1x.csv

check mnist_train_1x.csv

Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785
Line records = 785

The basic script looks like this:

// create a spark conf and context
val sparkMaster = "spark://hc2nn.semtech-solutions.co.nz:7077"
val appName = "Spark ex3"
val conf = new SparkConf()
conf.setMaster(sparkMaster)
conf.setAppName(appName)
val sparkCxt = new SparkContext(conf)

// Initialize SQL context
import org.apache.spark.sql._
implicit val sqlContext = new SQLContext(sparkCxt)

// prep the hdfs based data
val server = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
val path = "/data/spark/h2o/"
val train_csv = server + path + "mnist_train_1x.csv"

val schemaString = getSchema() // string representing schema: 28 x 28 int plus label int

val schema = StructType(schemaString.split(" ")
  .map(fieldName => StructField(fieldName, IntegerType, false)))

val rawTrainData = sparkCxt.textFile(train_csv)

val trainRDD = rawTrainData.map(rawRow => Row(rawRow.split(",").map(_.toInt)))

val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
trainSchemaRDD.registerTempTable("trainingTable")

val resSchemaRddTrain = sqlContext.sql("""SELECT P1,Label FROM trainingTable""".stripMargin)
println(" > sqlResult rows = " + resSchemaRddTrain.count())
val resArray = resSchemaRddTrain.collect() // collect fails with java.lang.ArrayIndexOutOfBoundsException

No matter what I try, I get an ArrayIndexOutOfBoundsException when I try to examine the result of the select, even though I know that there are 10 result rows. I don't think it's a data issue as both the schema and all data rows have 28x28 + 1 = 785 elements. Advice appreciated.
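One thing to check in the script above: `Row(rawRow.split(",").map(_.toInt))` builds a Row with a *single* element (the whole integer array), not 785 columns, so any column lookup beyond index 0 can throw ArrayIndexOutOfBoundsException. `Row.fromSeq(arr)` (or `Row(arr: _*)`) splats the array into one column per value. A Python analogy with tuples, using made-up values:

```python
values = [7, 2, 0, 1]       # hypothetical parsed integers from one CSV line

# Like Row(array): a single element that happens to hold a list.
wrapped = (values,)

# Like Row.fromSeq(array) / Row(array: _*): one element per value.
splatted = tuple(values)

print(len(wrapped))   # -> 1   (only column 0 exists)
print(len(splatted))  # -> 4   (one column per parsed value)
```

With `wrapped`, asking for the second column is exactly the out-of-bounds access the schema lookup performs.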
Re: Spark + Kinesis + Stream Name + Cache?
Hey Chris! I was happy to see the documentation outlining that issue :-) However, I must have got into a pretty terrible state because I had to delete and recreate the Kinesis streams as well as the DynamoDB tables. Thanks for the reply, everything is sorted. Mike

On Fri, May 8, 2015 at 7:55 PM, Chris Fregly wrote:
> hey mike-
>
> as you pointed out here from my docs, changing the stream name is
> sometimes problematic due to the way the Kinesis Client Library manages
> leases and checkpoints, etc in DynamoDB.
>
> I noticed this directly while developing the Kinesis connector, which is
> why I highlighted the issue here.
>
> is wiping out the DynamoDB table a suitable workaround for now? usually
> in production, you wouldn't be changing stream names often, so hopefully
> that's ok for dev.
>
> otherwise, can you share the relevant spark streaming logs that are
> generated when you do this?
>
> I saw a lot of "lease not owned by this Kinesis Client" type of errors,
> from what I remember.
>
> lemme know!
>
> -Chris
>
> On May 8, 2015, at 4:36 PM, Mike Trienis wrote:
>
> - [Kinesis stream name]: The Kinesis stream that this streaming application receives from
> - The application name used in the streaming context becomes the Kinesis application name
> - The application name must be unique for a given account and region.
> - The Kinesis backend automatically associates the application name to the Kinesis stream
>   using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client
>   Library initialization.
> - *Changing the application name or stream name can lead to Kinesis errors in some cases.
>   If you see errors, you may need to manually delete the DynamoDB table.*
>
> On Fri, May 8, 2015 at 2:06 PM, Mike Trienis wrote:
>> Hi All,
>>
>> I am submitting the assembled fat jar file by the command:
>>
>> bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar --class com.xxx.Consumer -0.1-SNAPSHOT.jar
>>
>> It reads the data file from kinesis using the stream name defined in a
>> configuration file. It turns out that it reads the data perfectly fine for
>> one stream name (i.e. the default), however, if I switch the stream name
>> and re-submit the jar, it no longer reads the data. From CloudWatch, I can
>> see that there is data put into the stream and spark is actually sending
>> get requests as well. However, it doesn't seem to be outputting the data.
>>
>> Has anyone else encountered a similar issue? Does spark cache the stream
>> name somewhere? I also have checkpointing enabled as well.
>>
>> Thanks, Mike.
Re: Spark + Kinesis + Stream Name + Cache?
- [Kinesis stream name]: The Kinesis stream that this streaming application receives from
- The application name used in the streaming context becomes the Kinesis application name
- The application name must be unique for a given account and region.
- The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client Library initialization.
- *Changing the application name or stream name can lead to Kinesis errors in some cases. If you see errors, you may need to manually delete the DynamoDB table.*

On Fri, May 8, 2015 at 2:06 PM, Mike Trienis wrote:
> Hi All,
>
> I am submitting the assembled fat jar file by the command:
>
> bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar --class com.xxx.Consumer -0.1-SNAPSHOT.jar
>
> It reads the data file from kinesis using the stream name defined in a
> configuration file. It turns out that it reads the data perfectly fine for
> one stream name (i.e. the default), however, if I switch the stream name
> and re-submit the jar, it no longer reads the data. From CloudWatch, I can
> see that there is data put into the stream and spark is actually sending
> get requests as well. However, it doesn't seem to be outputting the data.
>
> Has anyone else encountered a similar issue? Does spark cache the stream
> name somewhere? I also have checkpointing enabled as well.
>
> Thanks, Mike.
Spark + Kinesis + Stream Name + Cache?
Hi All,

I am submitting the assembled fat jar file by the command:

bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar --class com.xxx.Consumer -0.1-SNAPSHOT.jar

It reads the data from Kinesis using the stream name defined in a configuration file. It turns out that it reads the data perfectly fine for one stream name (i.e. the default); however, if I switch the stream name and re-submit the jar, it no longer reads the data. From CloudWatch, I can see that there is data put into the stream and Spark is actually sending get requests as well. However, it doesn't seem to be outputting the data.

Has anyone else encountered a similar issue? Does Spark cache the stream name somewhere? I also have checkpointing enabled.

Thanks, Mike.
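As the replies in this thread explain, the apparent "caching" is the Kinesis Client Library's lease/checkpoint state: the KCL stores it in a DynamoDB table named after the streaming application name, so stale checkpoints survive a stream-name change. A tiny sketch of that naming convention (function name hypothetical; the delete command is shown only as a comment, not executed):

```python
def kcl_lease_table(application_name: str) -> str:
    """The Kinesis Client Library keeps its leases and checkpoints in a
    DynamoDB table whose name is the KCL application name itself."""
    return application_name

# In dev, after changing stream names, stale checkpoints can be cleared
# by dropping that table, e.g. (not run here; region per the docs quoted above):
#   aws dynamodb delete-table --table-name <application-name> --region us-east-1

print(kcl_lease_table("my-streaming-app"))  # -> my-streaming-app
```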
Re: sbt-assembly spark-streaming-kinesis-asl error
Richard,

Your response was very helpful and actually resolved my issue. In case others run into a similar issue, I followed this procedure:

- Upgraded to Spark 1.3.0
- Marked all Spark-related libraries as "provided"
- Included the transitive library dependencies, so that my build.sbt file contains:

libraryDependencies ++= {
  Seq(
    "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
    "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",
    "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided",
    "joda-time" % "joda-time" % "2.2",
    "org.joda" % "joda-convert" % "1.2",
    "com.amazonaws" % "aws-java-sdk" % "1.8.3",
    "com.amazonaws" % "amazon-kinesis-client" % "1.2.0")
}

Submitting a spark job can then be done via:

sh ./spark-1.3.0-bin-cdh4/bin/spark-submit --jars spark-streaming-kinesis-asl_2.10-1.3.0.jar --verbose --class com.xxx.MyClass target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar

Thanks again Richard! Cheers, Mike.

On Tue, Apr 14, 2015 at 11:01 AM, Richard Marscher wrote:
> Hi,
>
> I've gotten an application working with sbt-assembly and spark, thought
> I'd present an option. In my experience, trying to bundle any of the Spark
> libraries in your uber jar is going to be a major pain. There will be a lot
> of deduplication to work through, and even if you resolve them it can be
> easy to do it incorrectly. I considered it an intractable problem. So the
> alternative is to not include those jars in your uber jar. For this to work
> you will need the same libraries on the classpath of your Spark cluster and
> your driver program (if you are running that as an application and not just
> using spark-submit).
>
> As for your NoClassDefFoundError, you either are missing Joda Time in your
> runtime classpath or have conflicting versions. It looks like something
> related to AWS wants to use it. Check your uber jar to see if it's including
> org/joda/time as well as the classpath of your spark cluster.
For > example: I use the Spark 1.3.0 on Hadoop 1.x, which in the 'lib' directory > has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one point in Spark > 1.2 I found a conflict between httpclient versions that my uber jar pulled > in for AWS libraries and the one bundled in the spark uber jar. I hand > patched the spark uber jar to remove the offending httpclient bytecode to > resolve the issue. You may be facing a similar situation. > > I hope that gives some ideas for resolving your issue. > > Regards, > Rich > > On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis > wrote: > >> Hi Vadim, >> >> After removing "provided" from "org.apache.spark" %% >> "spark-streaming-kinesis-asl" I ended up with huge number of deduplicate >> errors: >> >> https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a >> >> It would be nice if you could share some pieces of your mergeStrategy >> code for reference. >> >> Also, after adding "provided" back to "spark-streaming-kinesis-asl" and I >> submit the spark job with the spark-streaming-kinesis-asl jar file >> >> sh /usr/lib/spark/bin/spark-submit --verbose --jars >> lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer >> target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar >> >> I still end up with the following error... >> >> Exception in thread "main" java.lang.NoClassDefFoundError: >> org/joda/time/format/DateTimeFormat >> at com.amazonaws.auth.AWS4Signer.(AWS4Signer.java:44) >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) >> at >> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) >> at >> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >> at java.lang.reflect.Constructor.newInstance(Constructor.java:526) >> at java.lang.Class.newInstance(Class.java:379) >> >> Has anyone else run into this issue? 
>> >> >> >> On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy < >> vadim.bichuts...@gmail.com> wrote: >> >>> I don't believe the Kinesis asl should be provided. I used mergeStrategy >>> successfully to produce an "uber jar." >>> >>> Fyi, I've been having trouble consuming data out of Kinesis with Spark >>> with no success :( >>> Would be curio
Re: sbt-assembly spark-streaming-kinesis-asl error
Hi Vadim,

After removing "provided" from "org.apache.spark" %% "spark-streaming-kinesis-asl" I ended up with a huge number of deduplicate errors:

https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a

It would be nice if you could share some pieces of your mergeStrategy code for reference.

Also, after adding "provided" back to "spark-streaming-kinesis-asl", when I submit the spark job with the spark-streaming-kinesis-asl jar file

sh /usr/lib/spark/bin/spark-submit --verbose --jars lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar

I still end up with the following error...

Exception in thread "main" java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat
  at com.amazonaws.auth.AWS4Signer.(AWS4Signer.java:44)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
  at java.lang.Class.newInstance(Class.java:379)

Has anyone else run into this issue?

On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
> I don't believe the Kinesis asl should be provided. I used mergeStrategy
> successfully to produce an "uber jar."
>
> Fyi, I've been having trouble consuming data out of Kinesis with Spark
> with no success :(
> Would be curious to know if you got it working.
>
> Vadim
>
> On Apr 13, 2015, at 9:36 PM, Mike Trienis wrote:
>
> Hi All,
>
> I am having trouble building a fat jar file through sbt-assembly.
> > [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' > [warn] Merging 'META-INF/NOTICE' with strategy 'rename' > [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' > [warn] Merging 'META-INF/LICENSE' with strategy 'rename' > [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' > [warn] Merging > 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with > strategy 'discard' > [warn] Merging > 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy > 'discard' > [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' > with strategy 'discard' > [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with > strategy 'discard' > [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties' > with strategy 'discard' > [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with > strategy 'discard' > [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with > strategy 'discard' > [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy > 'discard' > [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy > 'discard' > [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard' > [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with > strategy 'discard' > [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with > strategy 'discard' > [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with > strategy 'discard' > [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy > 'discard' > [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' > with strategy 'discard' > [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with > strategy 'discard' > [warn] Merging 'META-INF/services/java.sql.Driver' with strategy > 'filterDistinctLines' > [warn] Merging 'rootdoc.txt' with strategy 'concat' > [warn] Strategy 'concat' 
was applied to a file > [warn] Strategy 'discard' was applied to 17 files > [warn] Strategy 'filterDistinctLines' was applied to a file > [warn] Strategy 'rename' was applied to 4 files > > When submitting the spark application through the command > > sh /usr/lib/spark/bin/spark-submit -class com.xxx.ExampleClassName > target/scala-2.10/-snapshot.jar > > I end up the the following error, > > Exception in thread "main" java.lang.NoClassDefFoundErr
Re: sbt-assembly spark-streaming-kinesis-asl error
Thanks Vadim, I can certainly consume data from a Kinesis stream when running locally. I'm currently in the processes of extending my work to a proper cluster (i.e. using a spark-submit job via uber jar). Feel free to add me to gmail chat and maybe we can help each other. On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy < vadim.bichuts...@gmail.com> wrote: > I don't believe the Kinesis asl should be provided. I used mergeStrategy > successfully to produce an "uber jar." > > Fyi, I've been having trouble consuming data out of Kinesis with Spark > with no success :( > Would be curious to know if you got it working. > > Vadim > > On Apr 13, 2015, at 9:36 PM, Mike Trienis wrote: > > Hi All, > > I have having trouble building a fat jar file through sbt-assembly. > > [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' > [warn] Merging 'META-INF/NOTICE' with strategy 'rename' > [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' > [warn] Merging 'META-INF/LICENSE' with strategy 'rename' > [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' > [warn] Merging > 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with > strategy 'discard' > [warn] Merging > 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy > 'discard' > [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' > with strategy 'discard' > [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with > strategy 'discard' > [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties' > with strategy 'discard' > [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with > strategy 'discard' > [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with > strategy 'discard' > [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy > 'discard' > [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy > 'discard' > [warn] Merging 
'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard' > [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with > strategy 'discard' > [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with > strategy 'discard' > [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with > strategy 'discard' > [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy > 'discard' > [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' > with strategy 'discard' > [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with > strategy 'discard' > [warn] Merging 'META-INF/services/java.sql.Driver' with strategy > 'filterDistinctLines' > [warn] Merging 'rootdoc.txt' with strategy 'concat' > [warn] Strategy 'concat' was applied to a file > [warn] Strategy 'discard' was applied to 17 files > [warn] Strategy 'filterDistinctLines' was applied to a file > [warn] Strategy 'rename' was applied to 4 files > > When submitting the spark application through the command > > sh /usr/lib/spark/bin/spark-submit -class com.xxx.ExampleClassName > target/scala-2.10/-snapshot.jar > > I end up the the following error, > > Exception in thread "main" java.lang.NoClassDefFoundError: > org/joda/time/format/DateTimeFormat > at com.amazonaws.auth.AWS4Signer.(AWS4Signer.java:44) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:526) > at java.lang.Class.newInstance(Class.java:379) > at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119) > at > com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105) > at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78) > at > 
com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307) > at > com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280) > at > com.amazonaws.Amazo
sbt-assembly spark-streaming-kinesis-asl error
Hi All,

I am having trouble building a fat jar file through sbt-assembly.

[warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/services/java.sql.Driver' with strategy 'filterDistinctLines'
[warn] Merging 'rootdoc.txt' with strategy 'concat'
[warn] Strategy 'concat' was applied to a file
[warn] Strategy 'discard' was applied to 17 files
[warn] Strategy 'filterDistinctLines' was applied to a file
[warn] Strategy 'rename' was applied to 4 files

When submitting the spark application through the command

sh /usr/lib/spark/bin/spark-submit --class com.xxx.ExampleClassName target/scala-2.10/-snapshot.jar

I end up with the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat
  at com.amazonaws.auth.AWS4Signer.(AWS4Signer.java:44)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
  at java.lang.Class.newInstance(Class.java:379)
  at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
  at com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
  at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
  at com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
  at com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
  at com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
  at com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
  at com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:216)
  at com.amazonaws.services.kinesis.AmazonKinesisClient.(AmazonKinesisClient.java:202)
  at com.amazonaws.services.kinesis.AmazonKinesisClient.(AmazonKinesisClient.java:175)
  at com.amazonaws.services.kinesis.AmazonKinesisClient.(AmazonKinesisClient.java:155)
  at com.quickstatsengine.aws.AwsProvider$.(AwsProvider.scala:20)
  at com.quickstatsengine.aws.AwsProvider$.(AwsProvider.scala)

The snippet from my build.sbt file is:

"org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha1" % "provided",
"org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0" % "provided",

And the error is originating from:

val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())

Am I correct to set spark-streaming-kinesis-asl as a *provided* dependency? Also, is there a merge strategy I need to apply?

Any help would be appreciated, Mike.
Re: MLlib : Gradient Boosted Trees classification confidence
Thank you Peter. I just want to be sure: even if I use the "classification" setting, does the GBT use regression trees and not classification trees? I know the difference between the two (theoretically) is only in the loss and impurity functions; thus, in case it uses classification trees, doing what you proposed will result in the classification itself. Also, by looking in the Scala API I found that each Node holds a Predict object which contains "probability of the label (classification only)" (https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.mllib.tree.model.Predict) ** This is what I called confidence.

So to sum up my questions and confusion:

1. Does GBT use classification trees when setting it to classification, or does it always use regression trees?
2. In case it uses classification trees, how could I efficiently get to the confidence = Node.Predict.prob?

Thanks again,
Michael

On Mon, Apr 13, 2015 at 10:13 AM, pprett [via Apache Spark User List] <ml-node+s1001560n22470...@n3.nabble.com> wrote:
> Hi Mike,
>
> Gradient Boosted Trees (or gradient boosted regression trees) don't store
> probabilities in each leaf node but rather model a continuous function
> which is then transformed via a logistic sigmoid (i.e. like in a Logistic
> Regression model).
> If you are just interested in a confidence, you can use this continuous
> function directly: it's just the (weighted) sum of the predictions of the
> individual regression trees. Use the absolute value for confidence and the
> sign to determine which class label.
> Here is an example:
>
> def score(features: Vector): Double = {
>   val treePredictions = gbdt.trees.map(_.predict(features))
>   blas.ddot(gbdt.numTrees, treePredictions, 1, gbdt.treeWeights, 1)
> }
>
> If you are rather interested in probabilities, just pass the function
> value to a logistic sigmoid.
> best,
> Peter

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Gradient-Boosted-Trees-classification-confidence-tp22466p22476.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
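Peter's Scala `score` function and the sigmoid step restate cleanly in a few lines (shown in Python for brevity; tree outputs and weights are made-up numbers for illustration, not the MLlib API):

```python
import math

def score(tree_predictions, tree_weights):
    """Raw GBT margin: the weighted sum of per-tree predictions,
    mirroring the blas.ddot call in Peter's Scala example."""
    return sum(p * w for p, w in zip(tree_predictions, tree_weights))

def probability(margin):
    """Logistic sigmoid: maps the raw margin to a confidence that the
    example belongs to the positive class."""
    return 1.0 / (1.0 + math.exp(-margin))

preds = [0.8, -0.2, 0.5]     # hypothetical per-tree outputs for one example
weights = [1.0, 0.6, 0.4]    # hypothetical tree weights

m = score(preds, weights)    # 0.8 - 0.12 + 0.2 = 0.88
print(round(probability(m), 3))  # -> 0.707
```

The sign of the margin gives the predicted class and its magnitude the confidence; the sigmoid is only needed when a calibrated-looking probability is wanted.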