Re: How can I read ftp

2021-08-09 Thread Паша
We solved it with an orchestrator that copied the data from FTP to HDFS. But
of course you can simply use a Java FTP client to read the files, put them
somewhere Spark can reach, and then read them with Spark.
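
For example, a minimal sketch of that approach (not from the original thread)
using Apache Commons Net's FTPClient and the Hadoop FileSystem API; the host,
credentials and paths below are placeholders, and commons-net plus the Hadoop
client are assumed to be on the classpath:

import java.io.FileOutputStream
import org.apache.commons.net.ftp.FTPClient
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

// 1. Pull the file from the FTP server to local disk with a plain FTP client.
val ftp = new FTPClient()
ftp.connect("10.3.87.51", 21)                 // placeholder host/port
ftp.login("ftpuser", "ftpuser")               // placeholder credentials
ftp.enterLocalPassiveMode()
val out = new FileOutputStream("/tmp/people.csv")
ftp.retrieveFile("/sparkftp/people.csv", out) // placeholder remote path
out.close()
ftp.logout()
ftp.disconnect()

// 2. Copy it to distributed storage (HDFS here) so every executor can reach it.
val fs = FileSystem.get(new Configuration())
fs.copyFromLocalFile(new Path("/tmp/people.csv"), new Path("/data/people.csv"))

// 3. Read it with Spark as usual.
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.option("header", "true").csv("/data/people.csv")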

Mon, 9 Aug 2021 at 06:39, Sean Owen :

> FTP is definitely not supported. Read the files to distributed storage
> first then read from there.
>
> On Sun, Aug 8, 2021, 10:18 PM igyu  wrote:
>
>> val ftpUrl = "ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/";
>>
>> val schemas = StructType(List(
>> new StructField("name", DataTypes.StringType, true),
>> new StructField("age", DataTypes.IntegerType, true),
>> new StructField("remk", DataTypes.StringType, true)))
>>
>>val DF = sparkSession.read.format("csv")
>>   .schema(schemas)
>>   .option("header","true")
>>   .load(ftpUrl)
>> //  .filter("created<=1602864000")
>>
>> DF.printSchema()
>> DF.show()
>>
>> I get an error
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Illegal
>> pattern component: XXX
>> at
>> org.apache.commons.lang3.time.FastDatePrinter.parsePattern(FastDatePrinter.java:282)
>> at
>> org.apache.commons.lang3.time.FastDatePrinter.init(FastDatePrinter.java:149)
>> at
>> org.apache.commons.lang3.time.FastDatePrinter.(FastDatePrinter.java:142)
>> at
>> org.apache.commons.lang3.time.FastDateFormat.(FastDateFormat.java:384)
>> at
>> org.apache.commons.lang3.time.FastDateFormat.(FastDateFormat.java:369)
>> at
>> org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:91)
>> at
>> org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:88)
>> at
>> org.apache.commons.lang3.time.FormatCache.getInstance(FormatCache.java:82)
>> at
>> org.apache.commons.lang3.time.FastDateFormat.getInstance(FastDateFormat.java:165)
>> at
>> org.apache.spark.sql.execution.datasources.csv.CSVOptions.(CSVOptions.scala:139)
>> at
>> org.apache.spark.sql.execution.datasources.csv.CSVOptions.(CSVOptions.scala:41)
>> at
>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:105)
>> at
>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>> at
>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>> at
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:312)
>> at
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:310)
>> at
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:330)
>> at
>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>> at
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:615)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>> at
>> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
>> at
>> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
>> at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
>> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
>> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
>> at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
>> at
>> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
>> at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
>> at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
>> at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
>> at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
>> at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
>> at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
>> at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
>> at com.join.ftp.reader.FtpReader.readFrom(FtpReader.scala:40)
>> at com.join.synctool$.main(synctool.scala:41)
>> at com.join.synctool.main(synctool.scala)
>> 21/08/09 11:15:08 INFO SparkContext: Invoking stop() from sh

Is the pandas version in doc of using pyarrow in spark wrong

2021-08-09 Thread Jeff Zhang
The doc says that the minimum supported pandas version is 0.23.2, which is
only supported on Python 2.
IIRC, Python 2 has not been supported in PySpark for a long time. Can anyone
confirm whether the doc is wrong, and what the right versions of pandas and
pyarrow are?

https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#recommended-pandas-and-pyarrow-versions

-- 
Best Regards

Jeff Zhang


unsubscribe

2021-08-09 Thread Vijay Gharge
unsubscribe

Regards,
Vijay Gharge


Performance of PySpark jobs on the Kubernetes cluster

2021-08-09 Thread Mich Talebzadeh
Hi,

I have a basic question to ask.

I am running a Google k8s cluster (AKA GKE) with three nodes, each with the
configuration below:

e2-standard-2 (2 vCPUs, 8 GB memory)


spark-submit is launched from another node (actually a Dataproc single node
that I have just upgraded to e2-custom, 4 vCPUs, 8 GB memory). We call this
the launch node.

OK, I know the cluster is not much, but Google was complaining about the
launch node hitting 100% CPU, so I added two more vCPUs to it.

It appears that despite using k8s as the computational cluster, the burden
falls upon the launch node!

The CPU utilisation for the launch node is shown below:

[image: image.png]
The dip is where the two extra vCPUs were added and the node rebooted; usage
sits at around 70%.

The combined CPU usage for the GKE nodes is shown below:

[image: image.png]

It never goes above 20%!

I can see the driver and executors as below:

k get pods -n spark
NAME                                         READY   STATUS    RESTARTS   AGE
pytest-c958c97b2c52b6ed-driver               1/1     Running   0          69s
randomdatabigquery-e68a8a7b2c52f468-exec-1   1/1     Running   0          51s
randomdatabigquery-e68a8a7b2c52f468-exec-2   1/1     Running   0          51s
randomdatabigquery-e68a8a7b2c52f468-exec-3   0/1     Pending   0          51s

It is a PySpark 3.1.1 image using Java 8, pushing randomly generated data
into the Google BigQuery data warehouse. The last executor (exec-3) seems to
be stuck in Pending. The spark-submit is as below:

spark-submit --verbose \
   --properties-file ${property_file} \
   --master k8s://https://$KUBERNETES_MASTER_IP:443 \
   --deploy-mode cluster \
   --name pytest \
   --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
   --py-files $CODE_DIRECTORY/DSBQ.zip \
   --conf spark.kubernetes.namespace=$NAMESPACE \
   --conf spark.executor.memory=5000m \
   --conf spark.network.timeout=300 \
   --conf spark.executor.instances=3 \
   --conf spark.kubernetes.driver.limit.cores=1 \
   --conf spark.driver.cores=1 \
   --conf spark.executor.cores=1 \
   --conf spark.executor.memory=2000m \
   --conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
   --conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
   --conf spark.kubernetes.container.image=${IMAGEGCP} \
   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
   --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
   --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
   --conf spark.sql.execution.arrow.pyspark.enabled="true" \
   $CODE_DIRECTORY/${APPLICATION}

Aren't the driver and executors running on the K8s cluster? So why is the
launch node so heavily used while the K8s cluster is underutilised?

Thanks

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


How can I write data to ftp

2021-08-09 Thread igyu
DF.write.format("csv")
  .option("timestampFormat", "/MM/dd HH:mm:ss ZZ")
  .mode(SaveMode.Overwrite)
  .save("ftp://ftp:ftp@127.0.0.1:21/sparkftp/write/")

I get an error

21/08/10 08:32:04 WARN FileOutputCommitter: Could not delete 
ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/write/_temporary/0/_temporary/attempt_20210810082742__m_00_0
21/08/10 08:32:04 ERROR FileFormatWriter: Job job_20210810082742_ aborted.
21/08/10 08:32:04 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to rename 
FileStatus{path=ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/write/_temporary/0/_temporary/attempt_20210810082742__m_00_0/part-0-e2a4ba4d-18a3-4e97-b7f1-b7a7a65b777e-c000.csv;
 isDirectory=false; length=39; replication=1; blocksize=4096; 
modification_time=162852648; access_time=0; owner=1027; group=1027; 
permission=rw-r--r--; isSymlink=false; hasAcl=false; isEncrypted=false; 
isErasureCoded=false} to 
ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/write/part-0-e2a4ba4d-18a3-4e97-b7f1-b7a7a65b777e-c000.csv
at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:462)
at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:475)
at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:586)
at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:549)
at 
org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at 
org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
at 
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1442)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
... 10 more
21/08/10 08:32:04 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
localhost, executor driver): org.apache.spark.SparkException: Task failed while 
writing rows.
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to rename 
FileStatus{path=ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/write/_temporary/0/_temporary/attempt_20210810082742__m_00_0/part-0-e2a4ba4d-18a3-4e97-b7f1-b7a7a65b777e-c000.csv;
 isDirectory=false; length=39; replicati
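
The "Failed to rename" comes from the Hadoop output committer, which renames
temporary files on commit, and the FTP filesystem handles that poorly. In line
with the advice earlier in this digest (use distributed storage rather than
FTP directly), here is a hedged sketch of one workaround: write the CSV
locally or to HDFS first, then push the part files to the FTP server with a
plain FTP client. The host, credentials, and paths are placeholders, and
commons-net is assumed to be on the classpath.

import java.io.FileInputStream
import org.apache.commons.net.ftp.FTPClient
import org.apache.spark.sql.SaveMode

// 1. Write the CSV to storage where Spark's commit protocol works (local disk
//    here; HDFS would also do). DF is the DataFrame from the snippet above.
DF.write.format("csv")
  .mode(SaveMode.Overwrite)
  .save("/tmp/sparkftp_out")                            // placeholder staging path

// 2. Push the part files to the FTP server with a plain FTP client.
val ftp = new FTPClient()
ftp.connect("127.0.0.1", 21)                            // placeholder host/port
ftp.login("ftp", "ftp")                                 // placeholder credentials
ftp.enterLocalPassiveMode()
new java.io.File("/tmp/sparkftp_out")
  .listFiles()
  .filter(f => f.isFile && f.getName.startsWith("part-"))
  .foreach { f =>
    val in = new FileInputStream(f)
    ftp.storeFile(s"/sparkftp/write/${f.getName}", in)  // placeholder remote dir
    in.close()
  }
ftp.logout()
ftp.disconnect()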

Unsubscribe

2021-08-09 Thread Sandeep Patra
Unsubscribe