This looks like it is somehow related to the API being unable to send data from the executor to the driver.
If I set the Spark master to local, I get these 6 files.

When spark.master is local:

& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/d
& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/a
& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/b
& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/g
& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/e
& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/f
& InputReportAndFileName fileName

If I set the Spark master to local[*], I get these file...

& InputReportAndFileName fileName
& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/b
& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/e
& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/d
& InputReportAndFileName fileName
& InputReportAndFileName fileName
& InputReportAndFileName fileName

On Tuesday, October 12, 2021, 05:23:45 AM PDT, Alchemist wrote:
Here is Spark's API definition. I am unable to understand what it means to
have an "unknown" file. We are processing files, so we should always have a
file name. I have 7 files; it prints 3 and misses the other 4.

    /**
     * Returns the holding file name or empty string if it is unknown.
     */
    def getInputFilePath: UTF8String = inputBlock.get().get().filePath

Can anyone help me understand what it means for the file name to be unknown,
which causes the above API to return a blank file name as shown below?
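
One way to picture an "unknown" file name: the double get() in the quoted definition suggests a per-thread holder wrapping a reference, which is filled in while a file is being read and is empty otherwise. The sketch below is a simplified model under that assumption, not Spark's code; the class InputFileHolderSketch and its methods set, unset and readFromOtherThread are hypothetical names made up for illustration. It only shows how such a holder yields an empty string when the lookup runs where no reader has recorded a file.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of a per-thread "current input file" holder.
// This is NOT Spark's implementation; all names are invented for illustration.
class InputFileHolderSketch {

    // Each thread starts with an empty path, which stands for "file unknown".
    private static final ThreadLocal<AtomicReference<String>> CURRENT_FILE =
            ThreadLocal.withInitial(() -> new AtomicReference<>(""));

    // Called by the (hypothetical) file reader when it starts reading a file.
    static void set(String path) {
        CURRENT_FILE.get().set(path);
    }

    // Called when the reader is done; the path becomes unknown again.
    static void unset() {
        CURRENT_FILE.remove();
    }

    // Same contract as the quoted doc comment: file name, or "" if unknown.
    static String getInputFilePath() {
        return CURRENT_FILE.get().get();
    }

    // Performs the lookup on a separate thread that never called set().
    static String readFromOtherThread() {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread t = new Thread(() -> seen.set(getInputFilePath()));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        set("file:///Users/abc/Desktop/test/Streaming/a");
        System.out.println("same thread:  '" + getInputFilePath() + "'");
        System.out.println("other thread: '" + readFromOtherThread() + "'");
        unset();
        System.out.println("after unset:  '" + getInputFilePath() + "'");
    }
}
```

In this model the name is only visible on the thread that recorded it and only until it is cleared. Whether that is what happens in the streaming job above is an open question; it may be worth checking whether input_file_name() is evaluated in the same stage as the file scan.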

On Monday, October 11, 2021, 08:43:42 PM PDT, Alchemist wrote:
Hello all,
I am trying to extract the file name as follows, but intermittently we are
getting an empty file name.

Step 1: Get the schema

    StructType jsonSchema = sparkSession.read()
        .option("multiLine", true)
        .json("src/main/resources/sample.json")
        .schema();

Step 2: Get the input Dataset

    Dataset<Row> inputDS = sparkSession
        .readStream()
        .format("text")
        .option("multiLine", true)
        .schema(jsonSchema)
        .json(inputPath + "/*");

Step 3: Add the fileName column

    Dataset<Row> inputDf = inputDS.select(functions.col("Report")).toJSON()
        .withColumn("FileName", org.apache.spark.sql.functions.input_file_name());

Step 4: Print fileName

    Dataset<InputReportAndFileName> inputDF = inputDf
        .as(ExpressionEncoder.javaBean(InputReportAndFileName.class))
        .map((MapFunction<InputReportAndFileName, InputReportAndFileName>) inputReportAndFileName -> {
            System.out.println("& InputReportAndFileName fileName " +
                inputReportAndFileName.getFileName());
            return inputReportAndFileName;
        }, ExpressionEncoder.javaBean(InputReportAndFileName.class));

Output: here we see the missing fileName values.

& InputReportAndFileName fileName
& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_1420254%202
& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_14202040
& InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_142720%202
& InputReportAndFileName fileName
& InputReportAndFileName fileName
& InputReportAndFileName fileName