New to Spark 2.2.1 - Problem finding tables between different metastore DBs
All,

I am new to Spark 2.2.1. I have a single-node cluster and have also enabled the Thrift server so that my Tableau application can connect to my persisted table. I suspect that the Spark cluster metastore is different from the Thrift server metastore. If this assumption is valid, what do I need to do to make it a single, shared metastore? Once I have fixed this issue, I will enable another node in my cluster.

Thanks,
Subhajit
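One way to guarantee both sides see the same metastore is to start the Thrift server inside the same Spark application, so it shares that session's catalog. A minimal sketch for Spark 2.2, assuming Hive support is enabled and the spark-hive-thriftserver module is on the classpath (app name is illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

    val spark = SparkSession.builder()
      .appName("shared-metastore")
      .enableHiveSupport()   // persist and resolve tables through the Hive metastore
      .getOrCreate()

    // Start the JDBC/ODBC (Thrift) server against this session's SQLContext,
    // so Tableau queries resolve the same tables the application writes.
    HiveThriftServer2.startWithContext(spark.sqlContext)

Alternatively, if you run the standalone sbin/start-thriftserver.sh, putting the same hive-site.xml (or pointing at the same spark.sql.warehouse.dir for the default Derby-backed metastore) under $SPARK_HOME/conf for both the cluster and the Thrift server should make them share one metastore.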
Spark DataFrame Join - performance issues
I am running my Spark (1.5.2) instance in a VirtualBox VM with 10 GB of memory allocated to it.

I have a fact table extract, with 1 rows:

    var glbalance_df_select = glbalance_df.select(
        "LEDGER_ID", "CODE_COMBINATION_ID", "CURRENCY_CODE",
        "PERIOD_TYPE", "TEMPLATE_ID", "PERIOD_NAME", "ACTUAL_FLAG",
        "BUDGET_VERSION_ID", "TRANSLATED_FLAG", "PERIOD_NET_DR",
        "PERIOD_NET_CR", "BEGIN_BALANCE_DR", "BEGIN_BALANCE_CR")
      .filter(
        not(glbalance_df("CURRENCY_CODE") === "STAT") and
        (glbalance_df("TEMPLATE_ID").isNull || glbalance_df("TEMPLATE_ID") === "None") and
        (glbalance_df("TRANSLATED_FLAG") === "Y" ||
         glbalance_df("TRANSLATED_FLAG").isNull ||
         glbalance_df("TRANSLATED_FLAG") === "None") and
        (glbalance_df("ACTUAL_FLAG") === "A" or glbalance_df("ACTUAL_FLAG") === "B"))

I am joining the fact table to the first dimension (with 100 rows):

    var glbalance_ledger_df = glbalance_df_select.join(ledger_df_select,
        glbalance_df_select("LEDGER_ID") <=> ledger_df_select("LEDGER_ID"),
        "inner")
      .drop(ledger_df_select("LEDGER_ID"))

When I save the DataFrame glbalance_ledger_df to a text file, it saves the data in about 1 minute.

The second dimension DataFrame:

    tableName = "w_gl_period_d"
    var period_df_select = msc.table(s"$dbName.$tableName")
    period_df_select = period_df_select.select("PERIOD_NAME", "PERIOD_TYPE",
        "PERIOD_SET_NAME", "START_DATE", "END_DATE").cache()

Now when I join the second dimension DF to the result of the fact join and save the data, it takes 2 hours:

    var glbalance_ledger_period_df = glbalance_ledger_df.join(period_df_select,
        glbalance_ledger_df("PERIOD_SET_NAME") <=> period_df_select("PERIOD_SET_NAME") &&
        glbalance_ledger_df("PERIOD_NAME") <=> period_df_select("PERIOD_NAME") &&
        glbalance_ledger_df("PERIOD_TYPE") <=> period_df_select("PERIOD_TYPE"),
        "inner")

How do I improve the performance of the join?

Thx,
Subhajit
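One plausible culprit is the null-safe equality operator (<=>) in the join condition: in Spark 1.5 the planner generally does not treat <=> as an equi-join key, so the join can fall back to a nested-loop style plan instead of a hash join. If the keys are never null (or after filtering nulls out), plain === plus broadcasting the small, cached dimension should help. A hedged sketch reusing the DataFrames above:

    import org.apache.spark.sql.functions.broadcast

    // Broadcast the small dimension so each task joins locally,
    // and use plain equality so the planner can pick a hash join.
    val glbalance_ledger_period_fast = glbalance_ledger_df.join(
      broadcast(period_df_select),
      glbalance_ledger_df("PERIOD_SET_NAME") === period_df_select("PERIOD_SET_NAME") &&
      glbalance_ledger_df("PERIOD_NAME") === period_df_select("PERIOD_NAME") &&
      glbalance_ledger_df("PERIOD_TYPE") === period_df_select("PERIOD_TYPE"),
      "inner")

broadcast() is available in org.apache.spark.sql.functions from Spark 1.5.0. If the keys can legitimately be null, coalescing them to a sentinel value on both sides before joining is another way to keep the equi-join plan.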
Spark SaveMode "Overwrite" - "Lock wait timeout exceeded; try restarting transaction" error
I am using Spark 1.5.2 with the MemSQL database as a persistent repository. I am trying to update rows (based on the primary key) if a key appears more than once, i.e. run the save as an upsert operation:

    val UpSertConf = SaveToMemSQLConf(msc.memSQLConf,
      Some(SaveMode.Overwrite), //Some(SaveMode.Append),
      Map("duplicate_key_behavior" -> "Replace"
          //, "insertBatchSize" -> "100"
      ))

When I set the SaveMode to "Overwrite", I get the following errors because of object locking:

    [cloudera@quickstart scala-2.10]$ spark-submit GLBalance-assembly-1.0.jar /home/cloudera/Downloads/cloud_code/output/FULL/2016-09-09_15-44-17
    (number of records: 204919)
    [Stage 7:> (0 + 4) / 7]16/09/09 21:26:01 ERROR Executor: Exception in task 1.0 in stage 7.0 (TID 22)
    java.sql.SQLException: Leaf Error (127.0.0.1:3308): Lock wait timeout exceeded; try restarting transaction
     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:996)
     at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
     at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
     at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:870)
     at com.mysql.jdbc.MysqlIO.sendFileToServer(MysqlIO.java:3790)
     at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:2995)
     at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2245)
     at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2638)
     at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2526)
     at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1618)
     at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1549)
     at org.apache.commons.dbcp2.DelegatingStatement.executeUpdate(DelegatingStatement.java:234)
     at org.apache.commons.dbcp2.DelegatingStatement.executeUpdate(DelegatingStatement.java:234)

If I change the SaveMode to "Append", the load completes but all the duplicate records get rejected. I was told that there is a known bug: https://issues.apache.org/jira/browse/SPARK-13699

Is there another way for me to save the data (in an upsert mode)?

Subhs
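For what it's worth, Overwrite in connectors like this typically drops or truncates the target table before writing, and that table-level operation is what contends with other transactions and hits the lock wait timeout. A hedged sketch of an alternative, assuming the connector applies duplicate_key_behavior (the option already used above) to Append writes as well, so rows whose primary key already exists are replaced instead of rejected:

    import org.apache.spark.sql.SaveMode
    import com.memsql.spark.connector._   // brings saveToMemSQL into scope in the 1.x connector

    val upsertConf = SaveToMemSQLConf(msc.memSQLConf,
      Some(SaveMode.Append),                        // avoids the table-level lock taken by Overwrite
      Map("duplicate_key_behavior" -> "Replace"))   // REPLACE semantics for duplicate primary keys

    df.saveToMemSQL(dbName, tableName, upsertConf)  // signature mirrors the style of this thread

The option name and the exact saveToMemSQL signature here are taken from the snippet above rather than a verified API, so check them against the memsql-spark-connector version in use.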
RE: Spark 2.0 - Insert/Update to a DataFrame
So the data in the FCST DataFrame is like this:

    Product  fcst_qty
    A        100
    B        50

The Sales DF has data like this:

    Order#  Item#  Sales qty
    101     A      10
    101     B      5
    102     A      5
    102     B      10

I want to update the FCST DF data based on Product = Item#, so the resultant FCST DF should have:

    Product  fcst_qty
    A        85
    B        35

Hope it helps. If I join the data between the 2 DFs (based on Product# and Item#), I will get a Cartesian join and my result will not be what I want.

Thanks for your help

From: Mike Metzger [mailto:m...@flexiblecreations.com]
Sent: Friday, August 26, 2016 2:12 PM
To: Subhajit Purkayastha <spurk...@p3si.net>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Spark 2.0 - Insert/Update to a DataFrame

Without seeing exactly what you were wanting to accomplish, it's hard to say. A join is still probably the method I'd suggest, using something like:

    select (FCST.quantity - SO.quantity) as quantity
    from FCST LEFT OUTER JOIN SO ON FCST.productid = SO.productid
    WHERE

with specifics depending on the layout and what language you're using.

Thanks
Mike

On Fri, Aug 26, 2016 at 3:29 PM, Subhajit Purkayastha <spurk...@p3si.net> wrote:

Mike,

The grains of the DataFrames are different. I need to reduce the forecast qty (which is in the FCST DF) based on the sales qty (coming from the sales order DF).

Hope it helps
Subhajit

From: Mike Metzger [mailto:m...@flexiblecreations.com]
Sent: Friday, August 26, 2016 1:13 PM
To: Subhajit Purkayastha <spurk...@p3si.net>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Spark 2.0 - Insert/Update to a DataFrame

Without seeing the makeup of the DataFrames nor what your logic is for updating them, I'd suggest doing a join of the Forecast DF with the appropriate columns from the SalesOrder DF.

Mike

On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha <spurk...@p3si.net> wrote:

I am using Spark 2.0 and have 2 DataFrames, SalesOrder and Forecast. I need to update the Forecast DataFrame record(s) based on the SalesOrder DF record(s). What is the best way to achieve this functionality?
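Because the Sales DF is at order-line grain while FCST is at product grain, aggregating sales per item first removes the grain mismatch, and a left outer join then avoids the row blow-up described above. A hedged sketch with assumed DataFrame and column names (fcstDF/salesDF, product, fcst_qty, item, sales_qty):

    import org.apache.spark.sql.functions.{sum, coalesce, lit}

    // Roll sales up to one row per item so the join is 1:1 with FCST.
    val salesByItem = salesDF.groupBy("item")
      .agg(sum("sales_qty").as("sold_qty"))

    // Left outer join keeps products with no sales; coalesce treats them as 0.
    val updatedFcst = fcstDF
      .join(salesByItem, fcstDF("product") === salesByItem("item"), "left_outer")
      .select(fcstDF("product"),
              (fcstDF("fcst_qty") - coalesce(salesByItem("sold_qty"), lit(0))).as("fcst_qty"))

On the example data this yields A -> 85 (100 - 15) and B -> 35 (50 - 15).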
RE: Spark 2.0 - Insert/Update to a DataFrame
Mike,

The grains of the DataFrames are different. I need to reduce the forecast qty (which is in the FCST DF) based on the sales qty (coming from the sales order DF).

Hope it helps
Subhajit

From: Mike Metzger [mailto:m...@flexiblecreations.com]
Sent: Friday, August 26, 2016 1:13 PM
To: Subhajit Purkayastha <spurk...@p3si.net>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Spark 2.0 - Insert/Update to a DataFrame

Without seeing the makeup of the DataFrames nor what your logic is for updating them, I'd suggest doing a join of the Forecast DF with the appropriate columns from the SalesOrder DF.

Mike

On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha <spurk...@p3si.net> wrote:

I am using Spark 2.0 and have 2 DataFrames, SalesOrder and Forecast. I need to update the Forecast DataFrame record(s) based on the SalesOrder DF record(s). What is the best way to achieve this functionality?
Spark 2.0 - Insert/Update to a DataFrame
I am using Spark 2.0 and have 2 DataFrames, SalesOrder and Forecast. I need to update the Forecast DataFrame record(s) based on the SalesOrder DF record(s). What is the best way to achieve this functionality?
DataFrame data manipulation based on a timestamp column - not working
Using spark 2.0 & scala 2.11.8, I have a DataFrame with a timestamp column root |-- ORG_ID: integer (nullable = true) |-- HEADER_ID: integer (nullable = true) |-- ORDER_NUMBER: integer (nullable = true) |-- LINE_ID: integer (nullable = true) |-- LINE_NUMBER: integer (nullable = true) |-- ITEM_TYPE_CODE: string (nullable = true) |-- ORGANIZATION_ID: integer (nullable = true) |-- INVENTORY_ITEM_ID: integer (nullable = true) |-- SCHEDULE_SHIP_DATE: timestamp (nullable = true) |-- ORDER_QUANTITY_UOM: string (nullable = true) |-- UNIT_SELLING_PRICE: double (nullable = true) |-- OPEN_QUANTITY: double (nullable = true) [204,94468,56721,197328,1,STANDARD,207,149,2004-01-08 23:59:59.0,Ea,1599.0,28.0] [204,94468,56721,197331,2,STANDARD,207,151,2004-01-08 23:59:59.0,Ea,1899.05,40.0] [204,94468,56721,197332,3,STANDARD,207,436,2004-01-08 23:59:59.0,Ea,300.0,24.0] [204,94468,56721,197335,4,STANDARD,207,3751,2004-01-08 23:59:59.0,Ea,380.0,24.0] I want to manipulate the dataframe data based on a parameter = demand_time_fence_date var demand_timefence_end_date_instance = new MutableDateTime(planning_start_date) var demand_timefence_days = demand_timefence_end_date_instance.addDays(demand_time_fence) val demand_timefence_end_date = ISODateTimeFormat.yearMonthDay().print(demand_timefence_end_date_instance) var filter_stmt = "from_unixtime(SCHEDULE_SHIP_DATE,'-MM-dd') >= "+ demand_timefence_end_date val sales_order_dataFrame = sales_order_base_dataFrame.filter(filter_stmt).limit(10) What is the correct syntax to pass the parameter value? The above filter statement is not working to restrict the dataset Thanks, Subhajit
Spark 2.0 - Join statement compile error
All,

I have the following DataFrames and the temp table. I am trying to create a new DF, but the following statement does not compile:

    val df = sales_demand.join(product_master,
      (sales_demand.INVENTORY_ITEM_ID == product_master.INVENTORY_ITEM_ID),
      joinType = "inner")

What am I doing wrong?

== Code ==

    var sales_order_sql_stmt = s"""SELECT ORDER_NUMBER, INVENTORY_ITEM_ID, ORGANIZATION_ID,
      from_unixtime(unix_timestamp(SCHEDULE_SHIP_DATE,'yyyy-MM-dd'), 'yyyy-MM-dd') AS schedule_date
      FROM sales_order_demand
      WHERE unix_timestamp(SCHEDULE_SHIP_DATE,'yyyy-MM-dd') >= $planning_start_date
      limit 10"""
    val sales_demand = spark.sql(sales_order_sql_stmt)

    // print the data
    sales_demand.collect().foreach { println }

    val product_sql_stmt = "select SEGMENT1, INVENTORY_ITEM_ID, ORGANIZATION_ID from product limit 10"
    val product_master = spark.sql(product_sql_stmt)

    // print the data
    product_master.collect().foreach { println }

    val df = sales_demand.join(product_master,
      (sales_demand.INVENTORY_ITEM_ID == product_master.INVENTORY_ITEM_ID),
      joinType = "inner")

    spark.stop()
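The compile error comes from sales_demand.INVENTORY_ITEM_ID: Scala DataFrames have no per-column fields (that dot syntax is PySpark style), so columns are referenced as sales_demand("INVENTORY_ITEM_ID"). Column equality also needs === (plain == is object equality and yields a Boolean where join expects a Column). A hedged sketch of the corrected join, with the join type as a plain third argument:

    val df = sales_demand.join(
      product_master,
      sales_demand("INVENTORY_ITEM_ID") === product_master("INVENTORY_ITEM_ID"),
      "inner")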
Getting error when I do df.show()
I am getting this error in the spark-shell when I do df.show(). Which jar file do I need to download to fix this error?

Error:

    scala> val df = msc.sql(query)
    df: org.apache.spark.sql.DataFrame = [id: int, name: string]

    scala> df.show()
    java.lang.NoClassDefFoundError: spray/json/JsonReader
     at com.memsql.spark.pushdown.MemSQLPhysicalRDD$.fromAbstractQueryTree(MemSQLPhysicalRDD.scala:95)
     at com.memsql.spark.pushdown.MemSQLPushdownStrategy.apply(MemSQLPushdownStrategy.scala:49)
     at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
     at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
     at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
     at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
     at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:374)
     at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
     at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
     at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
     at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
     at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
     at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
     at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
     at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
     at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
     at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
     at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
     at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
     at org.apache.spark.sql.DataFrame.show(DataFrame.scala:401)
     at org.apache.spark.sql.DataFrame.show(DataFrame.scala:362)
     at org.apache.spark.sql.DataFrame.show(DataFrame.scala:370)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
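NoClassDefFoundError: spray/json/JsonReader means the spray-json library that the MemSQL connector's pushdown code depends on is missing from the driver classpath. A hedged pointer, assuming Scala 2.10 and a spray-json version compatible with your memsql-spark-connector build (check its dependencies for the exact version):

    spark-shell --packages io.spray:spray-json_2.10:1.3.2

Or, if you have the jar locally, pass it with --jars spray-json_2.10-1.3.2.jar instead.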
Configure Spark to run with MemSQL DB Cluster
All,

Is it possible to integrate Spark 1.6.1 with a MemSQL cluster? Any pointers on how to start with the project will be appreciated.

Thx,
Subhajit
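For reference, MemSQL ships a Spark connector whose entry point in the 1.x line is a MemSQL-aware SQLContext; the msc handle used in other posts in this thread comes from it. A hedged sketch, with the package path and configuration keys to be verified against the connector version matching Spark 1.6:

    import org.apache.spark.{SparkConf, SparkContext}
    import com.memsql.spark.connector.MemSQLContext   // package path per the 1.x connector; verify

    val sparkConf = new SparkConf()
      .setAppName("memsql-integration")
      .set("memsql.host", "127.0.0.1")   // assumed option names; check the connector README
      .set("memsql.port", "3306")

    val sc = new SparkContext(sparkConf)
    val msc = new MemSQLContext(sc)

    // Read a MemSQL table as a DataFrame, as done elsewhere in this thread.
    val df = msc.table("mydb.mytable")   // hypothetical database and table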
Spark 1.5 - How to join 3 RDDs in a SQL DF?
Can I join 3 different RDDs together in a Spark SQL DF? I can find examples for 2 RDDs but not 3. Thanks
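Joins are binary, so three inputs are handled by chaining two joins; once the RDDs are converted to DataFrames the result is still a single DF. A hedged sketch for Spark 1.5 with hypothetical case classes sharing a "key" column:

    import org.apache.spark.sql.SQLContext

    // Hypothetical inputs: three RDDs of case classes with a common key field.
    case class A(key: Int, a: String)
    case class B(key: Int, b: String)
    case class C(key: Int, c: String)

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val dfA = sc.parallelize(Seq(A(1, "a1"), A(2, "a2"))).toDF()
    val dfB = sc.parallelize(Seq(B(1, "b1"), B(2, "b2"))).toDF()
    val dfC = sc.parallelize(Seq(C(1, "c1"))).toDF()

    // Chain two equi-joins on the shared column; the result is one DataFrame.
    val joined = dfA.join(dfB, "key").join(dfC, "key")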
Error - Calling a package (com.databricks:spark-csv_2.10:1.0.3) with spark-submit
I am on Spark 1.3.1. When I do the following with spark-shell, it works:

    spark-shell --packages com.databricks:spark-csv_2.10:1.0.3

Then I can create a DF using the spark-csv package:

    import sqlContext.implicits._
    import org.apache.spark.sql._

    // Return the dataset specified by data source as a DataFrame, use the header for column names
    val df = sqlContext.load("com.databricks.spark.csv",
      Map("path" -> "sfpd.csv", "header" -> "true"))

Now I want to do the above as part of my package using spark-submit:

    spark-submit --class "ServiceProject" --master local[4] --packages com.databricks:spark-csv_2.10:1.0.3 target/scala-2.10/serviceproject_2.10-1.0.jar

    Ivy Default Cache set to: /root/.ivy2/cache
    The jars for the packages stored in: /root/.ivy2/jars
    :: loading settings :: url = jar:file:/usr/hdp/2.3.0.0-2557/spark/lib/spark-assembly-1.3.1.2.3.0.0-2557-hadoop2.7.1.2.3.0.0-2557.jar!/org/apache/ivy/core/settings/ivysettings.xml
    com.databricks#spark-csv_2.10 added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
            confs: [default]
            found com.databricks#spark-csv_2.10;1.0.3 in central
            found org.apache.commons#commons-csv;1.1 in list
    :: resolution report :: resolve 366ms :: artifacts dl 11ms
            :: modules in use:
            com.databricks#spark-csv_2.10;1.0.3 from central in [default]
            org.apache.commons#commons-csv;1.1 from list in [default]
            ---------------------------------------------------------------------
            |                  |            modules            ||   artifacts   |
            |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
            ---------------------------------------------------------------------
            |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
            ---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent
            confs: [default]
            0 artifacts copied, 2 already retrieved (0kB/19ms)

    Error: application failed with exception
    java.lang.ArrayIndexOutOfBoundsException: 0
     at ServiceProject$.main(ServiceProject.scala:29)
     at ServiceProject.main(ServiceProject.scala)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:606)
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577)
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174)
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197)
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

What am I doing wrong?

Subhajit
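The package resolution itself succeeds; the failure is inside the application at ServiceProject.scala:29, and ArrayIndexOutOfBoundsException: 0 is the classic symptom of reading args(0) when no program arguments were passed. Anything meant for the application has to come after the jar path on the spark-submit line. A hedged sketch, assuming line 29 reads the input path from args:

    // Hypothetical guard around the argument access that likely sits at line 29.
    object ServiceProject {
      def main(args: Array[String]): Unit = {
        require(args.nonEmpty, "usage: ServiceProject <input-csv-path>")
        val inputPath = args(0)  // safe once the path is passed after the jar:
                                 // spark-submit ... serviceproject_2.10-1.0.jar sfpd.csv
        // ... rest of the job ...
      }
    }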