New to Spark 2.2.1 - Problem with finding tables between different metastore DBs

2018-02-06 Thread Subhajit Purkayastha
All,

 

I am new to Spark 2.2.1. I have a single-node cluster and have also enabled the
Thrift server so that my Tableau application can connect to my persisted tables.

 

I suspect that the Spark cluster metastore is different from the Thrift server
metastore. If this assumption is valid, what do I need to do to make them share a
single, universal metastore?
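
For context, one common way to get a single shared metastore (my own suggestion, not something confirmed in this thread) is to run the Thrift server inside the same SparkSession that creates the tables, so Tableau queries exactly the catalog the application writes to. A minimal sketch, assuming Spark 2.2 built with Hive support and a hive-site.xml on the classpath:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val spark = SparkSession.builder()
  .appName("SharedMetastoreExample")
  .enableHiveSupport()          // use the Hive metastore configured in hive-site.xml
  .getOrCreate()

// Any table persisted here lands in the same metastore the Thrift server will serve.
spark.sql("CREATE TABLE IF NOT EXISTS demo_tbl (id INT, name STRING)")

// Expose this session's catalog over JDBC/ODBC (default port 10000) for Tableau.
HiveThriftServer2.startWithContext(spark.sqlContext)

Alternatively, a standalone Thrift server started with start-thriftserver.sh shares the metastore as long as it reads the same hive-site.xml (and the same spark.sql.warehouse.dir) as the application.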

 

Once I have fixed this issue, I will enable another node in my cluster

 

Thanks,

 

Subhajit 

 



Spark DataFrame Join - performance issues

2016-09-19 Thread Subhajit Purkayastha
I am running my Spark (1.5.2) instance in a VirtualBox VM with 10 GB of memory
allocated to it.

 

I have a fact table extract, with 1 rows

 

var glbalance_df_select = glbalance_df.select(
    "LEDGER_ID", "CODE_COMBINATION_ID", "CURRENCY_CODE",
    "PERIOD_TYPE", "TEMPLATE_ID",
    "PERIOD_NAME", "ACTUAL_FLAG", "BUDGET_VERSION_ID",
    "TRANSLATED_FLAG", "PERIOD_NET_DR", "PERIOD_NET_CR",
    "BEGIN_BALANCE_DR", "BEGIN_BALANCE_CR")
  .filter(
    not(glbalance_df("CURRENCY_CODE") === "STAT")
      and (glbalance_df("TEMPLATE_ID").isNull || glbalance_df("TEMPLATE_ID") === "None")
      and (glbalance_df("TRANSLATED_FLAG") === "Y" || glbalance_df("TRANSLATED_FLAG").isNull || glbalance_df("TRANSLATED_FLAG") === "None")
      and (glbalance_df("ACTUAL_FLAG") === "A" or glbalance_df("ACTUAL_FLAG") === "B")
  )

 

 

I am joining the fact table to the first dimension (with 100 rows).

 

var glbalance_ledger_df = glbalance_df_select.join(ledger_df_select,
    glbalance_df_select("LEDGER_ID") <=> ledger_df_select("LEDGER_ID"),
    "inner")
  .drop(ledger_df_select("LEDGER_ID"))

 

When I save the DataFrame "glbalance_ledger_df" to a text file, it saves the
data in 1 mins.

 

The 2nd dimension DataFrame:

 

tableName = "w_gl_period_d"

var period_df_select = msc.table(s"$dbName.$tableName")

period_df_select = period_df_select.select("PERIOD_NAME", "PERIOD_TYPE",
    "PERIOD_SET_NAME", "START_DATE", "END_DATE").cache()

 

When I join the 2nd dimension DF to the result of the fact join and save the
data, it takes 2 hrs.

 

var glbalance_ledger_period_df = glbalance_ledger_df.join(period_df_select,
    glbalance_ledger_df("PERIOD_SET_NAME") <=> period_df_select("PERIOD_SET_NAME")
      && glbalance_ledger_df("PERIOD_NAME") <=> period_df_select("PERIOD_NAME")
      && glbalance_ledger_df("PERIOD_TYPE") <=> period_df_select("PERIOD_TYPE"),
    "inner")

 

 

How do I improve the performance of the join?
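
One technique worth trying (a general Spark suggestion on my part, not something confirmed in this thread) is to broadcast the small dimension DataFrames, which Spark 1.5 supports via org.apache.spark.sql.functions.broadcast; this ships the dimension to every executor and avoids shuffling the large fact table. A minimal sketch, assuming the join keys are never null so plain === can replace <=>:

import org.apache.spark.sql.functions.broadcast

// Broadcast the 100-row ledger dimension and join it to the fact extract.
val glbalance_ledger_df = glbalance_df_select
  .join(broadcast(ledger_df_select),
        glbalance_df_select("LEDGER_ID") === ledger_df_select("LEDGER_ID"),
        "inner")
  .drop(ledger_df_select("LEDGER_ID"))

// Broadcast the period dimension as well for the second join.
val glbalance_ledger_period_df = glbalance_ledger_df
  .join(broadcast(period_df_select),
        glbalance_ledger_df("PERIOD_SET_NAME") === period_df_select("PERIOD_SET_NAME")
          && glbalance_ledger_df("PERIOD_NAME") === period_df_select("PERIOD_NAME")
          && glbalance_ledger_df("PERIOD_TYPE") === period_df_select("PERIOD_TYPE"),
        "inner")

Caching glbalance_ledger_df before the second join, and checking in the Spark UI whether the slow stage is the join itself or the final write, are also common follow-ups.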

 

Thx,

 

Subhajit



Spark Save mode "Overwrite" - Lock wait timeout exceeded; try restarting transaction Error

2016-09-11 Thread Subhajit Purkayastha
I am using Spark 1.5.2 with the MemSQL database as a persistent repository.

 

I am trying to update rows (based on the primary key) if they appear more than
once (basically run the save/load as an upsert operation).

 

val UpSertConf = SaveToMemSQLConf(msc.memSQLConf,
  Some(SaveMode.Overwrite),
  //Some(SaveMode.Append),
  Map(
    "duplicate_key_behavior" -> "Replace"
    //, "insertBatchSize" -> "100"
  )
)

 

 

When I set the SaveMode to "Overwrite", I get the following errors because of
object locking:

 

 

[cloudera@quickstart scala-2.10]$ spark-submit GLBalance-assembly-1.0.jar /home/cloudera/Downloads/cloud_code/output/FULL/2016-09-09_15-44-17

(number of records ,204919)

[Stage 7:>  (0 + 4) / 7]16/09/09 21:26:01 ERROR Executor: Exception in task 1.0 in stage 7.0 (TID 22)
java.sql.SQLException: Leaf Error (127.0.0.1:3308): Lock wait timeout exceeded; try restarting transaction
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:996)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:870)
        at com.mysql.jdbc.MysqlIO.sendFileToServer(MysqlIO.java:3790)
        at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:2995)
        at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2245)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2638)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2526)
        at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1618)
        at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1549)
        at org.apache.commons.dbcp2.DelegatingStatement.executeUpdate(DelegatingStatement.java:234)
        at org.apache.commons.dbcp2.DelegatingStatement.executeUpdate(DelegatingStatement.java:234)

 

 

If I change the SaveMode to "Append", the load completes but all the duplicate
records get rejected. I was told that there is a known bug:

 

https://issues.apache.org/jira/browse/SPARK-13699

 

Is there another way for me to save the data (in an upsert mode)?
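
One workaround I can think of (my own sketch, not something confirmed in this thread; the table, columns, and JDBC URL below are hypothetical placeholders): since MemSQL speaks the MySQL wire protocol, an explicit INSERT ... ON DUPLICATE KEY UPDATE issued over plain JDBC from each partition behaves as an upsert and sidesteps the Overwrite locking:

import java.sql.DriverManager

glbalance_df.foreachPartition { rows =>
  // One connection and one prepared upsert statement per partition.
  val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/mydb", "user", "password")
  val stmt = conn.prepareStatement(
    "INSERT INTO gl_balance (pk_id, period_net_dr) VALUES (?, ?) " +
    "ON DUPLICATE KEY UPDATE period_net_dr = VALUES(period_net_dr)")
  try {
    rows.foreach { row =>
      stmt.setInt(1, row.getInt(0))       // assumes column 0 is the integer primary key
      stmt.setDouble(2, row.getDouble(1)) // assumes column 1 is the value to upsert
      stmt.addBatch()
    }
    stmt.executeBatch()
  } finally {
    stmt.close()
    conn.close()
  }
}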

 

Subhs



RE: Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Subhajit Purkayastha
So the data in the FCST DataFrame is like this:

 

Product  fcst_qty
A        100
B        50

 

Sales DF has data like this

 

Order#  Item#  Sales qty
101     A      10
101     B      5
102     A      5
102     B      10

 

I want to update the FCST DF data, based on Product = Item#.

 

So the resultant FCST DF should have this data:

Product  fcst_qty
A        85
B        35

 

Hope it helps

 

If I join the data between the 2 DFs (based on Product# and Item#), I will get
a Cartesian join and my result will not be what I want.

 

Thanks for your help

 

 

From: Mike Metzger [mailto:m...@flexiblecreations.com] 
Sent: Friday, August 26, 2016 2:12 PM
To: Subhajit Purkayastha <spurk...@p3si.net>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Spark 2.0 - Insert/Update to a DataFrame

 

Without seeing exactly what you were wanting to accomplish, it's hard to say.
A join is still probably the method I'd suggest, using something like:

 

select (FCST.quantity - SO.quantity) as quantity
from FCST
LEFT OUTER JOIN SO ON FCST.productid = SO.productid
WHERE
 

with specifics depending on the layout and what language you're using.
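
A minimal Scala sketch of that approach (my own illustration, not Mike's exact code; the DataFrame and column names fcstDF/product/fcst_qty and salesDF/item/sales_qty are hypothetical stand-ins for the example data above): aggregating the sales to one row per item first keeps the join one-to-one, so there is no Cartesian blow-up, and the subtraction then gives A = 85 and B = 35 as desired.

import org.apache.spark.sql.functions.{coalesce, lit, sum}

// Collapse the sales orders to total quantity sold per item.
val salesByItem = salesDF
  .groupBy("item")
  .agg(sum("sales_qty").as("sold_qty"))

// Left join so forecast rows with no sales keep their original quantity.
val updatedFcst = fcstDF
  .join(salesByItem, fcstDF("product") === salesByItem("item"), "left_outer")
  .select(fcstDF("product"),
          (fcstDF("fcst_qty") - coalesce(salesByItem("sold_qty"), lit(0))).as("fcst_qty"))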

 

Thanks

 

Mike

 

On Fri, Aug 26, 2016 at 3:29 PM, Subhajit Purkayastha <spurk...@p3si.net> wrote:

Mike,

 

The grains of the dataFrame are different.

 

I need to reduce the forecast qty (which is in the FCST DF) based on the sales
qty (coming from the sales order DF).

 

Hope it helps

 

Subhajit

 

From: Mike Metzger [mailto:m...@flexiblecreations.com] 
Sent: Friday, August 26, 2016 1:13 PM
To: Subhajit Purkayastha <spurk...@p3si.net>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Spark 2.0 - Insert/Update to a DataFrame

 

Without seeing the makeup of the DataFrames or what your logic is for updating
them, I'd suggest doing a join of the Forecast DF with the appropriate columns
from the SalesOrder DF.

 

Mike

 

On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha <spurk...@p3si.net> wrote:

I am using Spark 2.0 and have 2 DataFrames, SalesOrder and Forecast. I need to
update the Forecast DataFrame record(s) based on the SalesOrder DF records. What
is the best way to achieve this functionality?

 

 



RE: Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Subhajit Purkayastha
Mike,

 

The grains of the dataFrame are different.

 

I need to reduce the forecast qty (which is in the FCST DF) based on the sales
qty (coming from the sales order DF).

 

Hope it helps

 

Subhajit

 

From: Mike Metzger [mailto:m...@flexiblecreations.com] 
Sent: Friday, August 26, 2016 1:13 PM
To: Subhajit Purkayastha <spurk...@p3si.net>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Spark 2.0 - Insert/Update to a DataFrame

 

Without seeing the makeup of the DataFrames or what your logic is for updating
them, I'd suggest doing a join of the Forecast DF with the appropriate columns
from the SalesOrder DF.

 

Mike

 

On Fri, Aug 26, 2016 at 11:53 AM, Subhajit Purkayastha <spurk...@p3si.net> wrote:

I am using Spark 2.0 and have 2 DataFrames, SalesOrder and Forecast. I need to
update the Forecast DataFrame record(s) based on the SalesOrder DF records. What
is the best way to achieve this functionality?

 



Spark 2.0 - Insert/Update to a DataFrame

2016-08-26 Thread Subhajit Purkayastha
I am using Spark 2.0 and have 2 DataFrames, SalesOrder and Forecast. I need to
update the Forecast DataFrame record(s) based on the SalesOrder DF records.
What is the best way to achieve this functionality?



DataFrame Data Manipulation - Based on a timestamp column Not Working

2016-08-23 Thread Subhajit Purkayastha
Using Spark 2.0 and Scala 2.11.8, I have a DataFrame with a timestamp column:

 

root

|-- ORG_ID: integer (nullable = true)

|-- HEADER_ID: integer (nullable = true)

|-- ORDER_NUMBER: integer (nullable = true)

|-- LINE_ID: integer (nullable = true)

|-- LINE_NUMBER: integer (nullable = true)

|-- ITEM_TYPE_CODE: string (nullable = true)

|-- ORGANIZATION_ID: integer (nullable = true)

|-- INVENTORY_ITEM_ID: integer (nullable = true)

|-- SCHEDULE_SHIP_DATE: timestamp (nullable = true)

|-- ORDER_QUANTITY_UOM: string (nullable = true)

|-- UNIT_SELLING_PRICE: double (nullable = true)

|-- OPEN_QUANTITY: double (nullable = true)

 

[204,94468,56721,197328,1,STANDARD,207,149,2004-01-08 23:59:59.0,Ea,1599.0,28.0]

[204,94468,56721,197331,2,STANDARD,207,151,2004-01-08 23:59:59.0,Ea,1899.05,40.0]

[204,94468,56721,197332,3,STANDARD,207,436,2004-01-08 23:59:59.0,Ea,300.0,24.0]

[204,94468,56721,197335,4,STANDARD,207,3751,2004-01-08 23:59:59.0,Ea,380.0,24.0]

 

I want to manipulate the DataFrame data based on a parameter,
demand_time_fence_date.

 

var demand_timefence_end_date_instance = new MutableDateTime(planning_start_date)

var demand_timefence_days = demand_timefence_end_date_instance.addDays(demand_time_fence)

val demand_timefence_end_date = ISODateTimeFormat.yearMonthDay().print(demand_timefence_end_date_instance)

var filter_stmt = "from_unixtime(SCHEDULE_SHIP_DATE,'yyyy-MM-dd') >= " + demand_timefence_end_date

val sales_order_dataFrame = sales_order_base_dataFrame.filter(filter_stmt).limit(10)

 

What is the correct syntax to pass the parameter value? 

 

The above filter statement is not working to restrict the dataset
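
For what it's worth, a sketch of one way to approach this (my own suggestion, assuming SCHEDULE_SHIP_DATE really is a timestamp column as the schema shows): from_unixtime expects seconds since the epoch rather than a timestamp, and the date value needs to be quoted inside the expression string, otherwise a value like 2016-08-23 is parsed as arithmetic. Comparing the column directly against a quoted date literal avoids both problems.

import org.apache.spark.sql.functions.{col, lit, to_date}

// Option 1: SQL expression string -- note the single quotes around the parameter value.
val sales_order_dataFrame = sales_order_base_dataFrame
  .filter(s"to_date(SCHEDULE_SHIP_DATE) >= '$demand_timefence_end_date'")
  .limit(10)

// Option 2: the Column API, which side-steps the quoting issue entirely.
val sales_order_dataFrame2 = sales_order_base_dataFrame
  .filter(to_date(col("SCHEDULE_SHIP_DATE")) >= lit(demand_timefence_end_date))
  .limit(10)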

 

Thanks,

 

Subhajit

 

 



Spark 2.0 - Join statement compile error

2016-08-22 Thread Subhajit Purkayastha
All,

 

I have the following DataFrames and the temp table.

 

I am trying to create a new DF; the following statement is not compiling:

 

val df = sales_demand.join(product_master, (sales_demand.INVENTORY_ITEM_ID == product_master.INVENTORY_ITEM_ID), joinType = "inner")



 



 

What am I doing wrong?

 

==Code===

 

var sales_order_sql_stmt = s"""SELECT ORDER_NUMBER, INVENTORY_ITEM_ID, ORGANIZATION_ID,
  from_unixtime(unix_timestamp(SCHEDULE_SHIP_DATE,'yyyy-MM-dd'), 'yyyy-MM-dd') AS schedule_date
  FROM sales_order_demand
  WHERE unix_timestamp(SCHEDULE_SHIP_DATE,'yyyy-MM-dd') >= $planning_start_date limit 10"""

val sales_demand = spark.sql(sales_order_sql_stmt)

//print the data
sales_demand.collect().foreach { println }

val product_sql_stmt = "select SEGMENT1, INVENTORY_ITEM_ID, ORGANIZATION_ID from product limit 10"

val product_master = spark.sql(product_sql_stmt)

//print the data
product_master.collect().foreach { println }

val df = sales_demand.join(product_master, (sales_demand.INVENTORY_ITEM_ID == product_master.INVENTORY_ITEM_ID), joinType = "inner")



 

 

   spark.stop()
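
For reference, a sketch of how this join is usually written with the Scala DataFrame API (my own illustration, not from this thread): a Scala DataFrame does not expose columns as fields the way PySpark does, so the condition has to be a Column expression built with ===, or the join can be expressed on the shared column name.

// Join condition as a Column expression (=== builds a Column; == does not compile here):
val df = sales_demand.join(product_master,
  sales_demand("INVENTORY_ITEM_ID") === product_master("INVENTORY_ITEM_ID"),
  "inner")

// Or join on the common column name, which also avoids a duplicated column in the result:
val df2 = sales_demand.join(product_master, Seq("INVENTORY_ITEM_ID"), "inner")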



Getting error, when I do df.show()

2016-08-01 Thread Subhajit Purkayastha
I am getting this error in the spark-shell when I do df.show(). Which jar file
do I need to download to fix this error?

 

Df.show()

 

Error

 

scala> val df = msc.sql(query)

df: org.apache.spark.sql.DataFrame = [id: int, name: string]

 

scala> df.show()

java.lang.NoClassDefFoundError: spray/json/JsonReader
        at com.memsql.spark.pushdown.MemSQLPhysicalRDD$.fromAbstractQueryTree(MemSQLPhysicalRDD.scala:95)
        at com.memsql.spark.pushdown.MemSQLPushdownStrategy.apply(MemSQLPushdownStrategy.scala:49)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:374)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:401)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:362)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:370)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
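
For what it's worth (my own reading of the error, not a confirmed answer): spray/json/JsonReader lives in the spray-json library, which the MemSQL connector's pushdown code apparently depends on, so the usual fix is to put that jar on the classpath. A hedged build.sbt sketch, with the version being an assumption to check against what your MemSQL connector expects:

// build.sbt -- assumed coordinates/version for the missing spray-json dependency
libraryDependencies += "io.spray" %% "spray-json" % "1.3.2"

Equivalently, the jar can be handed to spark-shell with --jars (or --packages io.spray:spray-json_2.10:1.3.2).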

 

 



Configure Spark to run with MemSQL DB Cluster

2016-07-26 Thread Subhajit Purkayastha
All,

 

Is it possible to integrate Spark 1.6.1 with a MemSQL cluster? Any pointers on
how to start with the project would be appreciated.

 

Thx,

 

Subhajit



Spark 1.5 - How to join 3 RDDs in a SQL DF?

2015-10-11 Thread Subhajit Purkayastha
Can I join 3 different RDDs together in a Spark SQL DF? I can find examples
for 2 RDDs but not 3.
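
A minimal sketch of the usual pattern (my own illustration; rdd1/rdd2/rdd3 and the shared id column are hypothetical): convert each RDD to a DataFrame and chain the joins, since each join returns a DataFrame that the next join can consume.

import sqlContext.implicits._   // assumes a SQLContext named sqlContext (Spark 1.5)

// Hypothetical RDDs of tuples sharing an "id" key.
val df1 = rdd1.toDF("id", "a")
val df2 = rdd2.toDF("id", "b")
val df3 = rdd3.toDF("id", "c")

// Joins compose left to right; three (or more) DataFrames are just repeated two-way joins.
val joined = df1
  .join(df2, "id")
  .join(df3, "id")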

 

Thanks

 



Error - Calling a package (com.databricks:spark-csv_2.10:1.0.3) with spark-submit

2015-09-11 Thread Subhajit Purkayastha
 

I am on Spark 1.3.1.

 

When I do the following with spark-shell, it works

 

spark-shell --packages com.databricks:spark-csv_2.10:1.0.3

 

Then I can create a DF using the spark-csv package

import sqlContext.implicits._

import org.apache.spark.sql._

 

// Return the dataset specified by data source as a DataFrame, use the header for column names
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "sfpd.csv", "header" -> "true"))

 

Now, I want to do the above as part of my package using spark-submit

spark-submit --class "ServiceProject" --master local[4] --packages com.databricks:spark-csv_2.10:1.0.3 target/scala-2.10/serviceproject_2.10-1.0.jar

 

Ivy Default Cache set to: /root/.ivy2/cache

The jars for the packages stored in: /root/.ivy2/jars

:: loading settings :: url = jar:file:/usr/hdp/2.3.0.0-2557/spark/lib/spark-assembly-1.3.1.2.3.0.0-2557-hadoop2.7.1.2.3.0.0-2557.jar!/org/apache/ivy/core/settings/ivysettings.xml

com.databricks#spark-csv_2.10 added as a dependency

:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0

confs: [default]

found com.databricks#spark-csv_2.10;1.0.3 in central

found org.apache.commons#commons-csv;1.1 in list

:: resolution report :: resolve 366ms :: artifacts dl 11ms

:: modules in use:

com.databricks#spark-csv_2.10;1.0.3 from central in [default]

org.apache.commons#commons-csv;1.1 from list in [default]

 
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------

:: retrieving :: org.apache.spark#spark-submit-parent

confs: [default]

0 artifacts copied, 2 already retrieved (0kB/19ms)

Error: application failed with exception

java.lang.ArrayIndexOutOfBoundsException: 0
        at ServiceProject$.main(ServiceProject.scala:29)
        at ServiceProject.main(ServiceProject.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 

 

 

What am I doing wrong?
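
One observation, offered as my own reading of the trace rather than a confirmed answer: the exception comes from ServiceProject.scala line 29, not from the spark-csv package, and ArrayIndexOutOfBoundsException: 0 is typically what args(0) throws when no application argument is passed after the jar on the spark-submit line. A hedged sketch of a guard, assuming the program expects the CSV path as its first argument:

object ServiceProject {
  def main(args: Array[String]): Unit = {
    // Assumed: the CSV path is the first application argument,
    // i.e. whatever follows the jar on the spark-submit command line.
    if (args.isEmpty) {
      System.err.println("Usage: ServiceProject <path-to-csv>")
      sys.exit(1)
    }
    val csvPath = args(0)
    // ... existing spark-csv loading logic using csvPath ...
  }
}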

 

Subhajit