Re: Scheduling Time > Processing Time

2021-06-20 Thread Mohamadreza Rostami
Hi,
I think it’s because of the locality timeout. In streaming jobs you should
decrease the locality wait (spark.locality.wait, 3s by default).
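
A minimal sketch of lowering the locality timeout, assuming it refers to the spark.locality.wait setting and that the job is launched with spark-submit. The helper name locality_conf_args and the script name my_streaming_job.py are made up for illustration, and the right value (0s, 1s, ...) depends on the workload.

```python
def locality_conf_args(locality_wait="0s"):
    """Return spark-submit arguments that set the locality wait.

    Hypothetical helper for illustration; spark.locality.wait is the Spark
    setting assumed to be meant by "locality timeout".
    """
    settings = {"spark.locality.wait": locality_wait}
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Assemble an example command line; my_streaming_job.py is a placeholder name.
command = ["spark-submit"] + locality_conf_args("0s") + ["my_streaming_job.py"]
print(" ".join(command))
```

The same setting can also be applied in code with SparkConf().set("spark.locality.wait", "1s") before the streaming context is created.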



> On Jun 20, 2021, at 11:55 PM, Siva Tarun Ponnada  wrote:
> 
> 
> Hi Team,
> I have a Spark Streaming job that I am running on a single-node
> cluster. A few minutes after application startup, I often see
> schedulingTime > Processing Time in the streaming statistics. What does
> that mean? Should I increase the number of receivers?
> 
> 
> 
> Regards
> Taun

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Scheduling Time > Processing Time

2021-06-20 Thread Siva Tarun Ponnada
Hi Team,
I have a Spark Streaming job that I am running on a single-node
cluster. A few minutes after application startup, I often see
schedulingTime > Processing Time in the streaming statistics. What does
that mean? Should I increase the number of receivers?



Regards
Taun


Re: Unsubscribe

2021-06-20 Thread Alun ap Rhisiart
Guys, can we please stop asking other list members to unsubscribe you? You have 
to unsubscribe yourself from mailing lists. The address is actually in the 
header for every email you get from a list, but in case you miss it, it is this 
line:
List-Unsubscribe: 

If you are on a Mac, this actually puts an ‘unsubscribe’ button on the message.

> On 20 Jun 2021, at 17:34, yiling  wrote:
> 
>  
>  
> From: gvss [mailto:srikanthg...@gmail.com ] 
> Sent: Sunday, June 20, 2021 10:16 PM
> To: Arnaud Wolf
> Cc: user@spark.apache.org 
> Subject: Re: Unsubscribe
>  
>  
> On Sun, 20 Jun 2021, 19:44 Arnaud Wolf,  > wrote:




RE: Unsubscribe

2021-06-20 Thread yiling
 

 

From: gvss [mailto:srikanthg...@gmail.com] 
Sent: Sunday, June 20, 2021 10:16 PM
To: Arnaud Wolf
Cc: user@spark.apache.org
Subject: Re: Unsubscribe

 

 

On Sun, 20 Jun 2021, 19:44 Arnaud Wolf,  wrote:



Re: Unsubscribe

2021-06-20 Thread gvss
On Sun, 20 Jun 2021, 19:44 Arnaud Wolf,  wrote:

>


Unsubscribe

2021-06-20 Thread Arnaud Wolf



Re: Insert into table with one the value is derived from DB function using spark

2021-06-20 Thread Mich Talebzadeh
Actually I found a solution to this issue

*Challenge*

Insert data from a Spark dataframe when one or more columns in the Oracle
table are derived columns whose values depend on data in one or more
dataframe columns.

A standard JDBC write from Spark to Oracle does a batch insert of the dataframe
into Oracle, *so it cannot handle these derived columns*. See below:

# url, schema.tableName, user, password, driver and mode are placeholders
dataFrame. \
    write. \
    format("jdbc"). \
    option("url", url). \
    option("dbtable", "schema.tableName"). \
    option("user", user). \
    option("password", password). \
    option("driver", driver). \
    mode(mode). \
    save()

This writes the whole content of the dataframe to the Oracle table. You cannot
replace schema.tableName with an INSERT statement.

*Possible solution*


   1. We need a cursor-based solution: create a cursor from the Spark
   dataframe so that we can walk through every row and get the value of each
   column from the dataframe.
   2. Oracle provides the cx_Oracle package. cx_Oracle is a Python extension
   module that enables access to Oracle Database. It conforms to the Python
   database API 2.0 specification with a considerable number of additions
   and a couple of exclusions. It is maintained by Oracle.
   3. Using cx_Oracle we should be able to create a Connection to
   Oracle and use Connection.cursor() to deal with rows. See below.


This is an example.

Create a connection to Oracle. The cx_Oracle package needs to be installed in
the PySpark environment.


import cx_Oracle

def loadIntoOracleTableWithCursor(self, df):
    # set Oracle details
    tableName = "randomdata"
    fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
    user = self.config['OracleVariables']['oracle_user']
    password = self.config['OracleVariables']['oracle_password']
    serverName = self.config['OracleVariables']['oracleHost']
    port = self.config['OracleVariables']['oraclePort']
    serviceName = self.config['OracleVariables']['serviceName']
    dsn_tns = cx_Oracle.makedsn(serverName, port, service_name=serviceName)
    # create connection conn
    conn = cx_Oracle.connect(user, password, dsn_tns)
    cursor = conn.cursor()
    # df is the dataframe containing the data. Let us build a cursor on it.
    for row in df.rdd.collect():
        # get individual column values from the dataframe
        id = row[0]
        clustered = row[1]
        scattered = row[2]
        randomised = row[3]
        random_string = row[4]
        small_vc = row[5]
        padding = row[6]
        # Build the INSERT statement to be executed in Oracle. This is what we
        # send to the Oracle table for every row. The Oracle table has a column
        # called derived_col that the dataframe does not have. It is derived
        # from some value in the dataframe column(s). For example, here I
        # assign derived_col = cos(id) and pass it in sqlText. You need {} to
        # pass the value, and enclose it in single quotes if the column is a
        # character type.
        sqlText = f"""insert into {fullyQualifiedTableName}
        (id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col)
        values
        ({id},{clustered},{scattered},{randomised},'{random_string}','{small_vc}','{padding}',cos({id}))"""
        print(sqlText)
        cursor.execute(sqlText)
    # commit once after all rows have been inserted
    conn.commit()
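
As a variation on the loop above, here is a sketch that passes the values as named bind variables instead of formatting them into the SQL text. That avoids quoting problems when a string contains a single quote, and lets the driver send all rows in one executemany call. This is not the approach used in the original post. The function names, the p_ bind prefix and the generic conn argument are assumptions for illustration; conn is expected to be a cx_Oracle connection like the one created above.

```python
def build_insert_sql(table, columns, derived):
    """Build an INSERT statement that uses named bind variables.

    derived maps each extra target column to a SQL expression over the
    binds, for example {"derived_col": "cos(:p_id)"}.
    """
    target_cols = list(columns) + list(derived)
    values = [f":p_{c}" for c in columns] + list(derived.values())
    return (f"insert into {table} ({', '.join(target_cols)}) "
            f"values ({', '.join(values)})")


def rows_to_binds(rows, columns):
    """Turn positional rows (tuples or Spark Row objects) into bind dicts."""
    return [{f"p_{c}": row[i] for i, c in enumerate(columns)} for row in rows]


def load_with_binds(conn, table, columns, derived, rows):
    """Insert all rows with a single executemany call, then commit once."""
    sql = build_insert_sql(table, columns, derived)
    binds = rows_to_binds(rows, columns)
    if binds:  # skip the database call when there is nothing to insert
        cursor = conn.cursor()
        cursor.executemany(sql, binds)
    conn.commit()
    return sql


COLUMNS = ["id", "clustered", "scattered", "randomised",
           "random_string", "small_vc", "padding"]
# Show the statement that would be sent; the database evaluates cos(:p_id).
print(build_insert_sql("scratchpad.randomdata", COLUMNS,
                       {"derived_col": "cos(:p_id)"}))
```

With the connection from the example, a call could look like load_with_binds(conn, fullyQualifiedTableName, COLUMNS, {"derived_col": "cos(:p_id)"}, df.rdd.collect()).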

Our dataframe has 10 rows, and id in the Oracle table has been made the
primary key.


scratch...@orasource.mich.LOCAL> CREATE TABLE scratchpad.randomdata
  2  (
  3  "ID" NUMBER(*,0),
  4  "CLUSTERED" NUMBER(*,0),
  5  "SCATTERED" NUMBER(*,0),
  6  "RANDOMISED" NUMBER(*,0),
  7  "RANDOM_STRING" VARCHAR2(50 BYTE),
  8  "SMALL_VC" VARCHAR2(50 BYTE),
  9  "PADDING" VARCHAR2(4000 BYTE),
 10  "DERIVED_COL" FLOAT(126)
 11  );

Table created.
scratch...@orasource.mich.LOCAL> ALTER TABLE scratchpad.randomdata ADD
CONSTRAINT randomdata_PK PRIMARY KEY (ID);
Table altered.

Run it and see the output of  print(sqlText)

insert into SCRATCHPAD.randomdata
(id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col)
  values
(1,0.0,0.0,2.0,'KZWeqhFWCEPyYngFbyBMWXaSCrUZoLgubbbPIayRnBUbHoWCFJ','

 
1','xxx',cos(1))

This works fine. It creates the rows and does a commit


In Oracle, confirm that those 10 rows were added, starting with id = 1


scratch...@orasource.mich.LOCAL> select count(1) from scratchpad.randomdata;

  COUNT(1)