Can you show us the full exception stack trace? Intuitively, I would
think your cluster configuration contains an invalid value for some
memory configuration option.
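
For reference, that message is exactly what the JDK's plain numeric
parsers produce when they are handed a unit-suffixed string, so some
option that expects a bare number is probably receiving a value like
"64M". A minimal standalone sketch that reproduces the failure shape
(for illustration only):

object ParseRepro {
  def main(args: Array[String]): Unit = {
    // Throws java.lang.NumberFormatException: For input string: "64M",
    // the same message as in the reported exception.
    val bytes = java.lang.Long.parseLong("64M")
    println(bytes)
  }
}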
On 3/4/2021 4:45 PM, Avi Levi wrote:
Hi,
I am pretty new to Flink. I keep struggling to read a file from S3, but
I get this weird exception:
Caused by: java.lang.NumberFormatException: For input string: "64M"
(If anyone can link me to a working GitHub example, that would be
awesome.) What am I doing wrong?
This is what my code looks like:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory
class ParquetSourceFunction extends SourceFunction[String] {
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val inputPath = "s3a://foo/year=2000/month=02/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
    val readFooter = ParquetFileReader.open(hadoopFile)
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(conf, metadata,
      new Path(inputPath), readFooter.getRowGroups, schema.getColumns)

    var pages: PageReadStore = null
    try {
      // Read row groups until the reader is exhausted.
      while ({ pages = parquetFileReader.readNextRowGroup(); pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader =
          columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group = recordReader.read()
          val myString = group.getString("field_name", 0)
          ctx.collect(myString)
        }
      }
    } finally {
      parquetFileReader.close() // avoid leaking the reader if a row group fails
    }
  }

  override def cancel(): Unit = ???
}
object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment
    lazy val stream = env.addSource(new ParquetSourceFunction)

    stream.print()
    env.execute()
  }
}
sbt dependencies:

ThisBuild / scalaVersion := "2.12.1"

val flinkVersion = "1.12.1"
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients"              % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-scala"                % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-streaming-scala"      % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-parquet"              % flinkVersion,
  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion
)

val s3Dependencies = Seq(
  "com.amazonaws"     % "aws-java-sdk" % awsSdkVersion,
  "org.apache.hadoop" % "hadoop-aws"   % hadoopVersion
)

val serializationDependencies = Seq(
  "org.apache.avro"    % "avro"         % "1.7.7",
  ("org.apache.avro"   % "avro-mapred"  % "1.7.7").classifier("hadoop2"),
  "org.apache.parquet" % "parquet-avro" % "1.8.1"
)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= s3Dependencies,
    libraryDependencies ++= serializationDependencies,
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0",
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" // % "provided"
  )