Unsubscribe
How can I write data to ftp
DF.write.format("csv") .option("timestampFormat", "/MM/dd HH:mm:ss ZZ") .mode(SaveMode.Overwrite) .save("ftp://ftp:ftp@127.0.0.1:21/sparkftp/write/";) I get a error 21/08/10 08:32:04 WARN FileOutputCommitter: Could not delete ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/write/_temporary/0/_temporary/attempt_20210810082742__m_00_0 21/08/10 08:32:04 ERROR FileFormatWriter: Job job_20210810082742_ aborted. 21/08/10 08:32:04 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Failed to rename FileStatus{path=ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/write/_temporary/0/_temporary/attempt_20210810082742__m_00_0/part-0-e2a4ba4d-18a3-4e97-b7f1-b7a7a65b777e-c000.csv; isDirectory=false; length=39; replication=1; blocksize=4096; modification_time=162852648; access_time=0; owner=1027; group=1027; permission=rw-r--r--; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} to ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/write/part-0-e2a4ba4d-18a3-4e97-b7f1-b7a7a65b777e-c000.csv at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:462) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:475) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:586) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:549) at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50) at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225) at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1442) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248) ... 
10 more 21/08/10 08:32:04 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Failed to rename FileStatus{path=ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/write/_temporary/0/_temporary/attempt_20210810082742__m_00_0/part-0-e2a4ba4d-18a3-4e97-b7f1-b7a7a65b777e-c000.csv; isDirectory=false; length=39; replicati
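[Editor's note] The "Failed to rename" cause above comes from the commit step, where FileOutputCommitter moves files out of _temporary; that rename is what fails on the ftp:// path. One hedged workaround, in line with the advice in the "Re: How can I read ftp" thread later in this digest (write to supported storage, then move files with an FTP client), is sketched below in Scala. It assumes Apache Commons Net (org.apache.commons.net.ftp.FTPClient) is on the classpath and that the committed part files have been pulled to a local directory first (for example with hdfs dfs -get); the host, credentials, and all paths are placeholders, not values from the original post.

    import java.io.{File, FileInputStream}
    import org.apache.commons.net.ftp.{FTP, FTPClient}
    import org.apache.spark.sql.SaveMode

    // 1) Let Spark commit the CSV on storage that supports rename
    //    (HDFS here; a local path also works), not on ftp:// directly.
    val stagingDir = "hdfs:///tmp/sparkftp/write"   // placeholder path
    DF.write.format("csv")
      .option("header", "true")
      .mode(SaveMode.Overwrite)
      .save(stagingDir)

    // 2) Push the committed part files to the FTP server with a plain client.
    //    Assumes the staging output was copied to this local directory first,
    //    e.g. with `hdfs dfs -get`. Host, port, user and password: placeholders.
    val localDir = new File("/tmp/sparkftp/write")
    val ftp = new FTPClient()
    try {
      ftp.connect("127.0.0.1", 21)
      ftp.login("ftpuser", "ftpuser")
      ftp.enterLocalPassiveMode()
      ftp.setFileType(FTP.BINARY_FILE_TYPE)
      for (f <- localDir.listFiles() if f.getName.startsWith("part-")) {
        val in = new FileInputStream(f)
        try ftp.storeFile("/sparkftp/write/" + f.getName, in)
        finally in.close()
      }
      ftp.logout()
    } finally {
      if (ftp.isConnected) ftp.disconnect()
    }

Adding coalesce(1) before the write would keep the upload to a single part file, at the cost of funnelling the write through one task.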
Performance of PySpark jobs on the Kubernetes cluster
Hi,

I have a basic question to ask. I am running a Google k8s cluster (aka GKE) with three nodes, each with the configuration below: e2-standard-2 (2 vCPUs, 8 GB memory).

spark-submit is launched from another node (actually a Dataproc single node that I have just upgraded to e2-custom, 4 vCPUs, 8 GB memory). We call this the launch node.

OK, I know the cluster is not much, but Google was complaining about the launch node hitting 100% CPU, so I added two more CPUs to it. It appears that despite using k8s as the computational cluster, the burden falls upon the launch node! The CPU utilisation for the launch node is shown below:

[image: image.png]

The dip is when the 2 extra CPUs were added, so it had to reboot; usage is around 70%. The combined CPU usage for the GKE nodes is shown below:

[image: image.png]

It never goes above 20%! I can see the driver and executors as below:

k get pods -n spark
NAME                                         READY   STATUS    RESTARTS   AGE
pytest-c958c97b2c52b6ed-driver               1/1     Running   0          69s
randomdatabigquery-e68a8a7b2c52f468-exec-1   1/1     Running   0          51s
randomdatabigquery-e68a8a7b2c52f468-exec-2   1/1     Running   0          51s
randomdatabigquery-e68a8a7b2c52f468-exec-3   0/1     Pending   0          51s

It is a PySpark 3.1.1 image using Java 8, pushing randomly generated data into the Google BigQuery data warehouse. The last executor (exec-3) seems to be just pending. The spark-submit is as below:

spark-submit --verbose \
  --properties-file ${property_file} \
  --master k8s://https://$KUBERNETES_MASTER_IP:443 \
  --deploy-mode cluster \
  --name pytest \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
  --py-files $CODE_DIRECTORY/DSBQ.zip \
  --conf spark.kubernetes.namespace=$NAMESPACE \
  --conf spark.executor.memory=5000m \
  --conf spark.network.timeout=300 \
  --conf spark.executor.instances=3 \
  --conf spark.kubernetes.driver.limit.cores=1 \
  --conf spark.driver.cores=1 \
  --conf spark.executor.cores=1 \
  --conf spark.executor.memory=2000m \
  --conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
  --conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
  --conf spark.kubernetes.container.image=${IMAGEGCP} \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
  --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
  --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
  --conf spark.sql.execution.arrow.pyspark.enabled="true" \
  $CODE_DIRECTORY/${APPLICATION}

Aren't the driver and executors running on the k8s cluster? So why is the launch node heavily used while the k8s cluster is underutilised?

Thanks

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
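[Editor's note] On the Pending exec-3: one sanity check is to compare what the submission requests with what three e2-standard-2 nodes can actually allocate, since kubelet, system pods, and GKE reservations eat into the raw 2 vCPUs / 8 GB per node. The sketch below is only back-of-the-envelope arithmetic over the settings quoted above (which, as written, set spark.executor.memory twice, 5000m and then 2000m); it is not a diagnosis of either the pending pod or the busy launch node.

    // Rough resource math for the quoted spark-submit (illustrative only;
    // real allocatable capacity depends on GKE node reservations).
    val nodes             = 3
    val vcpusPerNode      = 2                 // e2-standard-2
    val memPerNodeMb      = 8 * 1024
    val driverCores       = 1                 // spark.driver.cores
    val executorCores     = 1                 // spark.executor.cores
    val executorInstances = 3                 // spark.executor.instances
    val executorMemMb     = 2000              // assuming the later --conf value wins
    // Spark adds a per-executor overhead of roughly max(384 MB, 10% of memory).
    val overheadMb        = math.max(384, (executorMemMb * 0.10).toInt)

    val requestedCores = driverCores + executorInstances * executorCores            // = 4
    val requestedMemMb = executorInstances * (executorMemMb + overheadMb)           // ~7152, plus the driver pod
    // 4 cores / ~7 GB fit within the raw 6 vCPUs / 24 GB, but after per-node
    // reservations and the driver pod's own request, a node may not have a full
    // core left for the third executor, which could leave it Pending.
    // `kubectl describe pod randomdatabigquery-e68a8a7b2c52f468-exec-3 -n spark`
    // would show the scheduler's actual reason.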
unsubscribe
unsubscribe

Regards,
Vijay Gharge
Is the pandas version in the doc on using pyarrow in Spark wrong?
The doc says that the minimum supported pandas version is 0.23.2, which is only supported in Python 2. IIRC, Python 2 has not been supported in PySpark for a long time now. Can anyone confirm whether the doc is wrong, and if so, what the right versions of pandas and pyarrow are?

https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#recommended-pandas-and-pyarrow-versions

--
Best Regards

Jeff Zhang
Re: How can I read ftp
We have solved it using an orchestrator, which copied the data from FTP to HDFS. But of course, you can just use a Java FTP client to read the files, put them somewhere, and then read them with Spark.

On Mon, 9 Aug 2021 at 06:39, Sean Owen wrote:

> FTP is definitely not supported. Read the files to distributed storage
> first, then read from there.
>
> On Sun, Aug 8, 2021, 10:18 PM igyu wrote:
>
>> val ftpUrl = "ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/"
>>
>> val schemas = StructType(List(
>>   new StructField("name", DataTypes.StringType, true),
>>   new StructField("age", DataTypes.IntegerType, true),
>>   new StructField("remk", DataTypes.StringType, true)))
>>
>> val DF = sparkSession.read.format("csv")
>>   .schema(schemas)
>>   .option("header", "true")
>>   .load(ftpUrl)
>>   // .filter("created<=1602864000")
>>
>> DF.printSchema()
>> DF.show()
>>
>> I get the error:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX
>>   at org.apache.commons.lang3.time.FastDatePrinter.parsePattern(FastDatePrinter.java:282)
>>   at org.apache.commons.lang3.time.FastDatePrinter.init(FastDatePrinter.java:149)
>>   at org.apache.commons.lang3.time.FastDatePrinter.(FastDatePrinter.java:142)
>>   at org.apache.commons.lang3.time.FastDateFormat.(FastDateFormat.java:384)
>>   at org.apache.commons.lang3.time.FastDateFormat.(FastDateFormat.java:369)
>>   at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:91)
>>   at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:88)
>>   at org.apache.commons.lang3.time.FormatCache.getInstance(FormatCache.java:82)
>>   at org.apache.commons.lang3.time.FastDateFormat.getInstance(FastDateFormat.java:165)
>>   at org.apache.spark.sql.execution.datasources.csv.CSVOptions.(CSVOptions.scala:139)
>>   at org.apache.spark.sql.execution.datasources.csv.CSVOptions.(CSVOptions.scala:41)
>>   at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:105)
>>   at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>   at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:312)
>>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:310)
>>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:330)
>>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:615)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>   at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
>>   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
>>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
>>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
>>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
>>   at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
>>   at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
>>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
>>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
>>   at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
>>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
>>   at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
>>   at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
>>   at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
>>   at com.join.ftp.reader.FtpReader.readFrom(FtpReader.scala:40)
>>   at com.join.synctool$.main(synctool.scala:41)
>>   at com.join.synctool.main(synctool.scala)
>> 21/08/09 11:15:08 INFO SparkContext: Invoking stop() from sh
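[Editor's note] Since reading ftp:// directly isn't supported, a hedged sketch of the "use a Java FTP client, put the files somewhere, then read with Spark" suggestion above might look like the following. It assumes Apache Commons Net on the classpath; the host, credentials, remote file name, and local staging path are placeholders, and on a multi-node cluster the staged copy would need to land on shared storage such as HDFS rather than a local path. Separately, the "Illegal pattern component: XXX" error is often associated with an older commons-lang3 (which does not know the XXX pattern) shadowing the version Spark expects, so checking the classpath for a conflicting commons-lang3 may be worthwhile; that is a general observation, not something confirmed in the thread.

    import java.io.FileOutputStream
    import org.apache.commons.net.ftp.{FTP, FTPClient}

    // Stage the CSV locally with a plain FTP client, then read the staged copy.
    val localPath = "/tmp/sparkftp/people.csv"   // placeholder staging path
    val ftp = new FTPClient()
    try {
      ftp.connect("10.3.87.51", 21)
      ftp.login("ftpuser", "ftpuser")
      ftp.enterLocalPassiveMode()
      ftp.setFileType(FTP.BINARY_FILE_TYPE)
      val out = new FileOutputStream(localPath)
      try ftp.retrieveFile("/sparkftp/people.csv", out)   // placeholder remote file
      finally out.close()
      ftp.logout()
    } finally {
      if (ftp.isConnected) ftp.disconnect()
    }

    // Then read the staged copy with Spark as usual.
    val df = sparkSession.read.format("csv")
      .schema(schemas)              // the StructType from the quoted snippet
      .option("header", "true")
      .load("file://" + localPath)
    df.show()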