Re: Spark Structured Streaming org.apache.spark.sql.functions.input_file_name Intermittently Missing FileName

2021-10-12 Thread Alchemist
This looks like it is somehow related to the API being unable to send data from the executor to the driver.

If I set the Spark master to local, I get these 6 files.

When spark.master is local:

    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/d
    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/a
    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/b
    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/g
    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/e
    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/f
    &  InputReportAndFileName fileName

If I set the Spark master to local[*], I get these file...

    &  InputReportAndFileName fileName
    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/b
    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/e
    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/d
    &  InputReportAndFileName fileName
    &  InputReportAndFileName fileName
    &  InputReportAndFileName fileName

On Tuesday, October 12, 2021, 05:23:45 AM PDT, Alchemist 
 wrote:  
 
Here is Spark's API definition. I am unable to understand what it means to have an "unknown" file. We are processing files, so we should always have a fileName. I have 7 files; it can print 3 and miss the other 4.

    /**
     * Returns the holding file name or empty string if it is unknown.
     */
    def getInputFilePath: UTF8String = inputBlock.get().get().filePath

Can anyone help me understand what "file name unknown" means here, and hence why the above API returns the blank file names shown below?
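
One possible reading of "unknown", sketched in plain Java rather than taken from Spark's source: the quoted getter reads `inputBlock.get().get()`, which has the shape of a per-thread holder wrapping a reference. The sketch below assumes such a holder defaults to an empty path until the reading thread sets it. The class `FileNameHolderSketch` and all of its methods are hypothetical names for illustration only, and it uses a plain `ThreadLocal`, which may differ from what Spark actually uses.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of a per-thread "current input file" holder.
// This is NOT Spark's implementation; every name here is illustrative.
final class FileNameHolderSketch {
    // Assumption: each thread starts with an empty path, standing in for "unknown".
    private static final ThreadLocal<AtomicReference<String>> CURRENT =
        ThreadLocal.withInitial(() -> new AtomicReference<>(""));

    // Called by whichever thread is currently reading a file.
    static void set(String path) { CURRENT.get().set(path); }

    // Drops this thread's value so the next read sees the empty default again.
    static void unset() { CURRENT.remove(); }

    // Mirrors the shape of the quoted getter: holder.get().get().
    static String getInputFilePath() { return CURRENT.get().get(); }

    // Reads the holder from a fresh thread that never called set().
    static String readFromOtherThread() {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread t = new Thread(() -> seen.set(getInputFilePath()));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        set("file:///Users/abc/Desktop/test/Streaming/a");
        System.out.println("same thread:  '" + getInputFilePath() + "'");
        System.out.println("other thread: '" + readFromOtherThread() + "'");
        unset();
        System.out.println("after unset:  '" + getInputFilePath() + "'");
    }
}
```

Under these assumptions, the file name is visible only on the thread that set it and only until it is unset. Any code that asks for it elsewhere, or later, gets the empty default. Whether this is what happens in the streaming job above is not established by this sketch.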
On Monday, October 11, 2021, 08:43:42 PM PDT, Alchemist 
 wrote:  
 
Hello all,

I am trying to extract the file name as follows, but intermittently we are getting an empty file name.

Step 1: Get Schema

    StructType jsonSchema = sparkSession.read()
        .option("multiLine", true)
        .json("src/main/resources/sample.json")
        .schema();

Step 2: Get Input DataSet

    Dataset inputDS = sparkSession
        .readStream()
        .format("text")
        .option("multiLine", true)
        .schema(jsonSchema)
        .json(inputPath + "/*");

Step 3: Add fileName column

    Dataset inputDf = inputDS.select(functions.col("Report")).toJSON()
        .withColumn("FileName", org.apache.spark.sql.functions.input_file_name());

Step 4: Print fileName

    Dataset inputDF = inputDf
        .as(ExpressionEncoder.javaBean(InputReportAndFileName.class))
        .map((MapFunction) inputReportAndFileName -> {
            System.out.println("&  InputReportAndFileName fileName " + inputReportAndFileName.getFileName());
            return inputReportAndFileName;
        }, ExpressionEncoder.javaBean(InputReportAndFileName.class));

Output: Here we see missing fileName

    &  InputReportAndFileName fileName
    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_1420254%202
    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_14202040
    &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_142720%202
    &  InputReportAndFileName fileName
    &  InputReportAndFileName fileName
    &  InputReportAndFileName fileName

