Re: Avoiding collect but use foreach

2019-02-04 Thread
hi,
I think you can make your python code into an udf and call udf in
foreachpartition.

Aakash Basu  于2019年2月1日周五 下午3:37写道:

> Hi,
>
> This:
>
>
> *to_list = [list(row) for row in df.collect()]*
>
>
> Gives:
>
>
> [[5, 1, 1, 1, 2, 1, 3, 1, 1, 0], [5, 4, 4, 5, 7, 10, 3, 2, 1, 0], [3, 1,
> 1, 1, 2, 2, 3, 1, 1, 0], [6, 8, 8, 1, 3, 4, 3, 7, 1, 0], [4, 1, 1, 3, 2, 1,
> 3, 1, 1, 0]]
>
>
> I want to avoid collect operation, but still convert the dataframe to a
> python list of list just as above for downstream operations.
>
>
> Is there a way, I can do it, maybe a better performant code that using
> collect?
>
>
> Thanks,
>
> Aakash.
>


Re: Connect to postgresql with pyspark

2018-04-30 Thread
hi,
what's the problem you are facing ?

2018-04-30 6:15 GMT+08:00 dimitris plakas :

> I am new in pyspark and i am learning it in order to complete my Thesis
> project in university.
>
>
>
> I am trying to create a dataframe by reading from a postgresql database
> table, but i am facing a problem when i try to connect my pyspark
> application with postgresql db server. Could you please explain me the
> steps that are required in order to have a successfull  connection with the
> database? I am using python 2.7, spark-2.3.0-bin-hadoop2.7,  pycharm IDE
> and windows environmen.
>
>
>
> What i have done is that i have launched a pyspark shell with --jars /path
> to postgresql jar/ and the
>
> df = 
> sqlContext.read.jdbc(url='jdbc:postgresql://localhost:port/[database]?user='username'='paswd',
>  table='table name')
>
>
>
>
>
> Sent from Mail  for
> Windows 10
>
>
>


Re: spark dataframe jdbc Amazon RDS problem

2017-08-26 Thread
my code is here:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
mysql_jdbc_url = 'mydb/test'
table = "test"
props = {"user": "myname", "password": 'mypassword'}

df = spark.read.jdbc(mysql_jdbc_url,table,properties=props)
df.printSchema()
wtf = df.collect()
for i in wtf:print i

2017-08-27 1:00 GMT+08:00 刘虓 <ipf...@gmail.com>:

> hi,all
> I came across this problem yesterday:
> I was using data frame to read from a amazon rds mysql table ,and this
> exception came up:
>
> java.sql.SQLException: Invalid value for getLong() - 'id'
>
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964)
>
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:897)
>
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:886)
>
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860)
>
> at com.mysql.jdbc.ResultSetImpl.getLong(ResultSetImpl.java:2688)
>
> at com.mysql.jdbc.ResultSetImpl.getLong(ResultSetImpl.java:2650)
>
> at org.apache.spark.sql.execution.datasources.jdbc.
> JDBCRDD$$anon$1.getNext(JDBCRDD.scala:447)
>
> at org.apache.spark.sql.execution.datasources.jdbc.
> JDBCRDD$$anon$1.hasNext(JDBCRDD.scala:544)
>
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown Source)
>
> at org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
>
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.
> hasNext(SerDeUtil.scala:117)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.
> foreach(SerDeUtil.scala:112)
>
> at scala.collection.generic.Growable$class.$plus$plus$eq(
> Growable.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(
> ArrayBuffer.scala:104)
>
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(
> ArrayBuffer.scala:48)
>
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.
> to(SerDeUtil.scala:112)
>
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.
> scala:302)
>
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.
> toBuffer(SerDeUtil.scala:112)
>
> at scala.collection.TraversableOnce$class.toArray(
> TraversableOnce.scala:289)
>
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.
> toArray(SerDeUtil.scala:112)
>
> at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.
> apply(RDD.scala:912)
>
> at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.
> apply(RDD.scala:912)
>
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
> SparkContext.scala:1916)
>
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
> SparkContext.scala:1916)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:722)
>
>
> obviously there seems to be a column name 'a' in the results.
>
> Have anybody seen this before?
>


spark dataframe jdbc Amazon RDS problem

2017-08-26 Thread
hi,all
I came across this problem yesterday:
I was using data frame to read from a amazon rds mysql table ,and this
exception came up:

java.sql.SQLException: Invalid value for getLong() - 'id'

at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964)

at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:897)

at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:886)

at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860)

at com.mysql.jdbc.ResultSetImpl.getLong(ResultSetImpl.java:2688)

at com.mysql.jdbc.ResultSetImpl.getLong(ResultSetImpl.java:2650)

at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.getNext(JDBCRDD.scala:447)

at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.hasNext(JDBCRDD.scala:544)

at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)

at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)

at
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)

at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)

at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)

at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)

at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)

at
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)

at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)

at
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)

at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)

at
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)

at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912)

at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)

at org.apache.spark.scheduler.Task.run(Task.scala:86)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:722)


obviously there seems to be a column name 'a' in the results.

Have anybody seen this before?


Re: Flatten JSON to multiple columns in Spark

2017-07-18 Thread
Hi,
have you tried to use explode?

Chetan Khatri 于2017年7月18日 周二下午2:06写道:

> Hello Spark Dev's,
>
> Can you please guide me, how to flatten JSON to multiple columns in Spark.
>
> *Example:*
>
> Sr No Title ISBN Info
> 1 Calculus Theory 1234567890 [{"cert":[{
> "authSbmtr":"009415da-c8cd-418d-869e-0a19601d79fa",
> 009415da-c8cd-418d-869e-0a19601d79fa
> "certUUID":"03ea5a1a-5530-4fa3-8871-9d1ebac627c4",
> "effDt":"2016-05-06T15:04:56.279Z",
> "fileFmt":"rjrCsv","status":"live"}],
>
> "expdCnt":"15",
> "mfgAcctNum":"531093",
> "oUUID":"23d07397-4fbe-4897-8a18-b79c9f64726c",
> "pgmRole":["RETAILER"],
> "pgmUUID":"1cb5dd63-817a-45bc-a15c-5660e4accd63",
> "regUUID":"cc1bd898-657d-40dc-af5d-4bf1569a1cc4",
> "rtlrsSbmtd":["009415da-c8cd-418d-869e-0a19601d79fa"]}]
>
> I want to get single row with 11 columns.
>
> Thanks.
>


Re: Scala Vs Python

2016-09-06 Thread
Hi,
I have been using spark-sql with python for more than one year from ver
1.5.0 to ver 2.0.0,
It works great so far,the performance is always great,though I have not
done the benchmark yet.
also I have skimmed through source code of python api,most of it only calls
scala api,nothing heavily is done using python.


2016-09-06 18:38 GMT+08:00 Leonard Cohen <3498363...@qq.com>:

> hi spark user,
>
> IMHO, I will use the language for application aligning with the language
> under which the system designed.
>
> If working on Spark, I choose Scala.
> If working on Hadoop, I choose Java.
> If working on nothing, I use Python.
> Why?
> Because it will save my life, just kidding.
>
>
> Best regards,
> Leonard
> -- Original --
> *From: * "Luciano Resende";;
> *Send time:* Tuesday, Sep 6, 2016 8:07 AM
> *To:* "darren";
> *Cc:* "Mich Talebzadeh"; "Jakob Odersky"<
> ja...@odersky.com>; "ayan guha"; "kant kodali"<
> kanth...@gmail.com>; "AssafMendelson"; "user"<
> user@spark.apache.org>;
> *Subject: * Re: Scala Vs Python
>
>
>
> On Thu, Sep 1, 2016 at 3:15 PM, darren  wrote:
>
>> This topic is a concern for us as well. In the data science world no one
>> uses native scala or java by choice. It's R and Python. And python is
>> growing. Yet in spark, python is 3rd in line for feature support, if at all.
>>
>> This is why we have decoupled from spark in our project. It's really
>> unfortunate spark team have invested so heavily in scale.
>>
>> As for speed it comes from horizontal scaling and throughout. When you
>> can scale outward, individual VM performance is less an issue. Basic HPC
>> principles.
>>
>>
> You could still try to get best of the both worlds, having your data
> scientists writing their algorithms using Python and/or R and have a
> compiler/optimizer handling the optimizations to run in a distributed
> fashion in a spark cluster leveraging some of the low level apis written in
> java/scala. Take a look at Apache SystemML http://systemml.apache.org/
> for more details.
>
>
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>


Re: Why does spark take so much time for simple task without calculation?

2016-09-04 Thread
Hi,
I think you can refer to spark history server to figure out how the time
was spent.

2016-09-05 10:36 GMT+08:00 xiefeng :

> The spark context will be reused, so the spark context initialization won't
> affect the throughput test.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Why-does-spark-take-so-much-time-
> for-simple-task-without-calculation-tp27628p27657.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


spark-sql jdbc dataframe mysql data type issue

2016-06-25 Thread
Hi,
I came across this strange behavior of Apache Spark 1.6.1:
when I was reading mysql table into spark dataframe ,a column of data type
float got mapped into double.

dataframe schema:

root

 |-- id: long (nullable = true)

 |-- ctime: double (nullable = true)

 |-- atime: double (nullable = true)

mysql schema:

mysql> desc test.user_action_2;

+---+--+--+-+-+---+

| Field | Type | Null | Key | Default | Extra |

+---+--+--+-+-+---+

| id| int(10) unsigned | YES  | | NULL|   |

| ctime | float| YES  | | NULL|   |

| atime | double   | YES  | | NULL|   |

+---+--+--+-+-+---+
I wonder if anyone have seen this behavior before.


Re: DataFrame --> JSON objects, instead of un-named array of fields

2016-03-29 Thread
Hi,
Besides your solution ,yon can use df.write.format('json').save('a.json')

2016-03-29 4:11 GMT+08:00 Russell Jurney :

> To answer my own question, DataFrame.toJSON() does this, so there is no
> need to map and json.dump():
>
>
> on_time_dataframe.toJSON().saveAsTextFile('../data/On_Time_On_Time_Performance_2015.jsonl')
>
>
> Thanks!
>
> On Mon, Mar 28, 2016 at 12:54 PM, Russell Jurney  > wrote:
>
>> In PySpark, given a DataFrame, I am attempting to save it as JSON
>> Lines/ndjson. I run this code:
>>
>> json_lines = on_time_dataframe.map(lambda x: json.dumps(x))
>>
>> json_lines.saveAsTextFile('../data/On_Time_On_Time_Performance_2015.jsonl')
>>
>>
>> This results in simple arrays of fields, instead of JSON objects:
>>
>> [2015, 1, 1, 1, 4, "2015-01-01", "AA", 19805, "AA", "N787AA", 1, 12478,
>> 1247802, 31703, "JFK", "New York, NY", "NY", 36, "New York", 22, 12892,
>> 1289203, 32575, "LAX", "Los Angeles, CA", "CA", 6, "California", 91, 900,
>> 855, -5.0, 0.0, 0.0, -1, "0900-0959", 17.0, 912, 1230, 7.0, 1230, 1237,
>> 7.0, 7.0, 0.0, 0, "1200-1259", 0.0, "", 0.0, 390.0, 402.0, 378.0, 1.0,
>> 2475.0, 10, null, null, null, null, null, null, null, null, 0, null, null,
>> null, null, "", null, null, null, null, null, null, "", "", null, null,
>> null, null, null, null, "", "", null, null, null, null, null, "", "", "",
>> "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
>>
>> What I actually want is JSON objects, with a field name for each field:
>>
>> {"year": "2015", "month": 1, ...}
>>
>>
>> How can I achieve this in PySpark?
>>
>> Thanks!
>> --
>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>
>
>
>
> --
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>


Re: Restrictions on SQL operations on Spark temporary tables

2016-02-27 Thread
Hi,
For now Spark-sql does not support subquery,I guess that's the reason your
query fails

2016-02-27 20:01 GMT+08:00 Mich Talebzadeh :

> It appeas that certain SQL on Spark temporary tables do not support Hive
> SQL even when they are using HiveContext
>
> example
>
> scala> HiveContext.sql("select count(1) from tmp  where ID in (select
> max(id) from tmp)")
> org.apache.spark.sql.AnalysisException:
> Unsupported language features in query: select count(1) from tmp  where ID
> in (select max(id) from tmp)
>
> that tmp table is from d.registerTempTable("tmp")
> The same Query in Hive on the same underlying table works fine.
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: Re: spark dataframe jdbc read/write using dbcp connection pool

2016-01-20 Thread
Hi,
I think you can view the spark job ui to find out whether the partition
works or not,pay attention to the storage page to the partition size and
which stage / task fails

2016-01-20 16:25 GMT+08:00 fightf...@163.com <fightf...@163.com>:

> OK. I am trying to use the jdbc read datasource with predicate like the
> following :
>sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
> I can see that the task goes to 62 partitions. But I still get exception
> and the parquet
> file did not write successfully. Do I need to increase the partitions? Or
> is there any other
> alternatives I can choose to tune this ?
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>
>
> *From:* fightf...@163.com
> *Date:* 2016-01-20 15:06
> *To:* 刘虓 <ipf...@gmail.com>
> *CC:* user <user@spark.apache.org>
> *Subject:* Re: Re: spark dataframe jdbc read/write using dbcp connection
> pool
> Hi,
> Thanks a lot for your suggestion. I then tried the following code :
> val prop = new java.util.Properties
> prop.setProperty("user","test")
> prop.setProperty("password", "test")
> prop.setProperty("partitionColumn", "added_year")
> prop.setProperty("lowerBound", "1985")
> prop.setProperty("upperBound","2015")
> prop.setProperty("numPartitions", "200")
>
> val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
> val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
> val jdbcDF1 = sqlContext.read.jdbc(url1,"video3",prop)
> val jdbcDF2 = sqlContext.read.jdbc(url2,"video3",prop)
>
> val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
> jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf4
> ")
>
> The added_year column in mysql table contains range of (1985-2015), and I
> pass the numPartitions property
> to get the partition purpose. Is this what you recommend ? Can you advice
> a little more implementation on this ?
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>
>
> *From:* 刘虓 <ipf...@gmail.com>
> *Date:* 2016-01-20 11:26
> *To:* fightf...@163.com
> *CC:* user <user@spark.apache.org>
> *Subject:* Re: spark dataframe jdbc read/write using dbcp connection pool
> Hi,
> I suggest you partition the JDBC reading on a indexed column of the mysql
> table
>
> 2016-01-20 10:11 GMT+08:00 fightf...@163.com <fightf...@163.com>:
>
>> Hi ,
>> I want to load really large volumn datasets from mysql using spark
>> dataframe api. And then save as
>> parquet file or orc file to facilitate that with hive / Impala. The
>> datasets size is about 1 billion records and
>> when I am using the following naive code to run that , Error occurs and
>> executor lost failure.
>>
>> val prop = new java.util.Properties
>> prop.setProperty("user","test")
>> prop.setProperty("password", "test")
>>
>> val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
>> val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
>> val jdbcDF1 = sqlContext.read.jdbc(url1,"video",prop)
>> val jdbcDF2 = sqlContext.read.jdbc(url2,"video",prop)
>>
>> val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
>> jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf
>> ")
>>
>> I can see from the executor log and the message is like the following. I
>> can see from the log that the wait_timeout threshold reached
>> and there is no retry mechanism in the code process. So I am asking you
>> experts to help on tuning this. Or should I try to use a jdbc
>> connection pool to increase parallelism ?
>>
>>
>> 16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 
>> 0.0 (TID 0)
>>
>> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
>> failure
>>
>>
>> The last packet successfully received from the server was 377,769 
>> milliseconds ago.  The last packet sent successfully to the server was 
>> 377,790 milliseconds ago.
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>
>> Caused by:
>> java.io.EOFException: Can not read response from server. Expected to read 4 
>> bytes, read 1 bytes before connection was unexpectedly lost.
>> at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
>> at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996)
>> ... 22 more
>>
>> 16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
>> task 4
>> 16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection
>>
>> 16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 
>> 0.0 (TID 2)
>>
>> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
>> failure
>>
>>
>>
>> --
>> fightf...@163.com
>>
>
>


Re: spark dataframe jdbc read/write using dbcp connection pool

2016-01-19 Thread
Hi,
I suggest you partition the JDBC reading on a indexed column of the mysql
table

2016-01-20 10:11 GMT+08:00 fightf...@163.com :

> Hi ,
> I want to load really large volumn datasets from mysql using spark
> dataframe api. And then save as
> parquet file or orc file to facilitate that with hive / Impala. The
> datasets size is about 1 billion records and
> when I am using the following naive code to run that , Error occurs and
> executor lost failure.
>
> val prop = new java.util.Properties
> prop.setProperty("user","test")
> prop.setProperty("password", "test")
>
> val url1 = "jdbc:mysql://172.16.54.136:3306/db1"
> val url2 = "jdbc:mysql://172.16.54.138:3306/db1"
> val jdbcDF1 = sqlContext.read.jdbc(url1,"video",prop)
> val jdbcDF2 = sqlContext.read.jdbc(url2,"video",prop)
>
> val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2)
> jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf")
>
> I can see from the executor log and the message is like the following. I
> can see from the log that the wait_timeout threshold reached
> and there is no retry mechanism in the code process. So I am asking you
> experts to help on tuning this. Or should I try to use a jdbc
> connection pool to increase parallelism ?
>
>
> 16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 
> (TID 0)
>
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
> failure
>
>
> The last packet successfully received from the server was 377,769 
> milliseconds ago.  The last packet sent successfully to the server was 
> 377,790 milliseconds ago.
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>
> Caused by:
> java.io.EOFException: Can not read response from server. Expected to read 4 
> bytes, read 1 bytes before connection was unexpectedly lost.
> at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
> at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996)
> ... 22 more
>
> 16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
> task 4
> 16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection
>
> 16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 
> (TID 2)
>
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
> failure
>
>
>
> --
> fightf...@163.com
>


Re: spark yarn client mode

2016-01-19 Thread
Hi,
No,you don't need to.
However,when submitting jobs certain resources will be uploaded to
hdfs,which could be a performance issue
read the log and you will understand:

15/12/29 11:10:06 INFO Client: Uploading resource
file:/data/spark/spark152/lib/spark-assembly-1.5.2-hadoop2.6.0.jar -> hdfs

15/12/29 11:10:08 INFO Client: Uploading resource
file:/data/spark/spark152/python/lib/pyspark.zip -> hdfs

15/12/29 11:10:08 INFO Client: Uploading resource
file:/data/spark/spark152/python/lib/py4j-0.8.2.1-src.zip -> hdfs

15/12/29 11:10:08 INFO Client: Uploading resource
file:/data/tmp/spark-86791975-2cef-4663-aacd-5da95e58cd91/__spark_conf__6261788210225867171.zip
-> hdfs

2016-01-19 19:43 GMT+08:00 Sanjeev Verma :

> Hi
>
> Do I need to install spark on all the yarn cluster node if I want to
> submit the job to yarn client?
> is there any way exists in which I can spawn a spark job executors on the
> cluster nodes where I have not installed spark.
>
> Thanks
> Sanjeev
>