Re: How can I read ftp
We solved this with an orchestrator that copied the data from FTP to HDFS. But of course, you can also use a Java FTP client to read the files, put them somewhere Spark can reach, and then read them with Spark; a sketch follows below.

Mon, Aug 9, 2021 at 06:39, Sean Owen:

> FTP is definitely not supported. Read the files to distributed storage
> first, then read from there.
> [...]
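A minimal sketch of the FTP-client approach described above, using Apache Commons Net's FTPClient together with the Hadoop FileSystem API. The host and credentials are the ones from the thread; the file name and HDFS target path are hypothetical:

    import org.apache.commons.net.ftp.{FTP, FTPClient}
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.IOUtils

    // Connect to the FTP server (host/credentials as in the thread).
    val ftp = new FTPClient()
    ftp.connect("10.3.87.51", 21)
    ftp.login("ftpuser", "ftpuser")
    ftp.enterLocalPassiveMode()
    ftp.setFileType(FTP.BINARY_FILE_TYPE)

    // Stream one remote file into HDFS (file name and target path
    // are hypothetical placeholders).
    val fs = FileSystem.get(new Configuration())
    val in = ftp.retrieveFileStream("/sparkftp/data.csv")
    val out = fs.create(new Path("/tmp/sparkftp/data.csv"))
    IOUtils.copyBytes(in, out, 4096, true) // closes both streams
    ftp.completePendingCommand()           // finalize the FTP transfer
    ftp.logout()
    ftp.disconnect()

Once the file lands in HDFS, the usual spark.read CSV path applies.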
Re: How can I read ftp
FTP is definitely not supported. Read the files to distributed storage first, then read from there.

On Sun, Aug 8, 2021, 10:18 PM igyu wrote:

> [...]
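A sketch of the "read from there" step, assuming an existing SparkSession and that the CSV file was first copied to a hypothetical HDFS path such as /tmp/sparkftp/data.csv:

    import org.apache.spark.sql.types._

    // Same schema as in the original post; the HDFS path is hypothetical.
    val schemas = StructType(List(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("remk", StringType, true)))

    val df = sparkSession.read.format("csv")
      .schema(schemas)
      .option("header", "true")
      .load("hdfs:///tmp/sparkftp/data.csv")

    df.show()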
How can I read ftp
val ftpUrl = "ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/"

val schemas = StructType(List(
  new StructField("name", DataTypes.StringType, true),
  new StructField("age", DataTypes.IntegerType, true),
  new StructField("remk", DataTypes.StringType, true)))

val DF = sparkSession.read.format("csv")
  .schema(schemas)
  .option("header", "true")
  .load(ftpUrl)
//  .filter("created<=1602864000")

DF.printSchema()
DF.show()

I get this error:

Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX
    at org.apache.commons.lang3.time.FastDatePrinter.parsePattern(FastDatePrinter.java:282)
    at org.apache.commons.lang3.time.FastDatePrinter.init(FastDatePrinter.java:149)
    at org.apache.commons.lang3.time.FastDatePrinter.<init>(FastDatePrinter.java:142)
    at org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:384)
    at org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:369)
    at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:91)
    at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:88)
    at org.apache.commons.lang3.time.FormatCache.getInstance(FormatCache.java:82)
    at org.apache.commons.lang3.time.FastDateFormat.getInstance(FastDateFormat.java:165)
    at org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:139)
    at org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:41)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:105)
    at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
    at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:312)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:310)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:330)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:615)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
    at com.join.ftp.reader.FtpReader.readFrom(FtpReader.scala:40)
    at com.join.synctool$.main(synctool.scala:41)
    at com.join.synctool.main(synctool.scala)
21/08/09 11:15:08 INFO SparkContext: Invoking stop() from shutdown hook

--
igyu
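On the stack trace itself: "Illegal pattern component: XXX" is thrown by commons-lang3's FastDateFormat, and this failure is commonly reported when an older commons-lang3 jar, one predating support for the XXX (ISO 8601 time zone) pattern, shadows the version Spark expects on the classpath. If that is the cause here, forcing a recent commons-lang3 may help; a hedged sbt sketch, with the version an assumption based on what Spark 2.4 ships:

    // build.sbt: pin commons-lang3 to a version that understands the
    // "XXX" time-zone pattern. 3.5 is what Spark 2.4 ships with; treat
    // the exact version as an assumption for your build.
    dependencyOverrides += "org.apache.commons" % "commons-lang3" % "3.5"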