Can you show us the full exception stacktrace? Intuitively I would think your cluster configuration contains an invalid value for some memory configuration option.

On 3/4/2021 4:45 PM, Avi Levi wrote:
Hi ,
I am pretty new. I keep struggling to read a file from S3 but keep getting this weird exception: Caused by: java.lang.NumberFormatException: For input string: "64M". (If anyone can link me to a working GitHub example, that would be awesome.) What am I doing wrong?

This is what my code looks like:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory

/**
 * Flink source that opens a Parquet input on S3 through the Hadoop `s3a`
 * filesystem and emits the first value of the `field_name` column of every
 * record as a `String`.
 *
 * The reader is always closed when `run` exits, and `cancel()` makes `run`
 * stop emitting at the next record boundary.
 */
class ParquetSourceFunction extends SourceFunction[String] {

  // Cleared by cancel(); @volatile because cancel() is expected to be invoked
  // from a different thread than the one executing run().
  @volatile private var isRunning = true

  /**
   * Reads every row group of the input and forwards each record's
   * `field_name` value to `ctx`.
   *
   * @param ctx the Flink source context that receives the emitted strings
   */
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // NOTE(review): this path looks like a partition directory rather than a
    // single Parquet file — confirm that it resolves to a readable file.
    val inputPath = "s3a://foo/year=2000/month=02/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
    // One reader serves both the footer lookup and the row-group scan, so
    // there is a single resource to release.
    val reader = ParquetFileReader.open(hadoopFile)
    try {
      val schema = reader.getFileMetaData.getSchema
      // Depends only on the schema, so build it once rather than per row group.
      val columnIO = new ColumnIOFactory().getColumnIO(schema)

      Iterator
        .continually(reader.readNextRowGroup())
        // readNextRowGroup returns null once all row groups are consumed.
        .takeWhile(rowGroup => isRunning && rowGroup != null)
        .foreach { rowGroup =>
          val recordReader =
            columnIO.getRecordReader(rowGroup, new GroupRecordConverter(schema))
          (0L until rowGroup.getRowCount).iterator
            .takeWhile(_ => isRunning)
            .foreach { _ =>
              val group = recordReader.read()
              ctx.collect(group.getString("field_name", 0))
            }
        }
    } finally {
      reader.close()
    }
  }

  /** Requests that `run` stop emitting records and return. */
  override def cancel(): Unit = {
    isRunning = false
  }
}

/** Entry point: wires [[ParquetSourceFunction]] into a streaming job that prints every emitted record. */
object Job {

  /**
   * Builds and executes the streaming pipeline.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Both values are used immediately, so plain vals suffice (no lazy initialisation needed).
    val stream = env.addSource(new ParquetSourceFunction)
    stream.print()
    env.execute()
  }
}
sbt dependencies :

ThisBuild / scalaVersion := "2.12.1"

val flinkVersion = "1.12.1"
val awsSdkVersion = "1.7.4"
// Single Hadoop version shared by hadoop-aws and hadoop-common (see `root` below).
val hadoopVersion = "2.7.3"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")
  "org.apache.flink" %% "flink-parquet" % flinkVersion,
  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion
)

val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)

// NOTE(review): parquet-avro 1.8.1 is a different Parquet release than the
// parquet-hadoop 1.11.1 added below — consider aligning them.
val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1")
)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= s3Dependencies,
    libraryDependencies ++= serializationDependencies,
    // hadoop-common must be the same release as hadoop-aws. Mixing
    // hadoop-common 3.x with hadoop-aws 2.7.x presumably triggers the reported
    // NumberFormatException: For input string: "64M" — the 3.x defaults use
    // size suffixes that the 2.7.x S3A code parses as a plain long.
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion //% "provided"
  )


Reply via email to