Another option is:
1. collect the dataframe with the file paths
2. create a list of paths
3. create a new dataframe with spark.read.json and pass it the list of paths
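Here is a minimal Scala sketch of these three steps. It assumes an existing SparkSession `spark` and a dataframe `df` that has the `file_path` column from the original question. The object and method names (`ReadJsonFromPaths`, `readJsonFromPaths`) are hypothetical. The sketch also assumes that each file holds one JSON document, which is why it sets `multiLine`. It tags each parsed row with `input_file_name()` so the result can be joined back. Whether that string matches the stored `file_path` exactly (scheme, URL-encoding) is an assumption worth checking.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.input_file_name

object ReadJsonFromPaths {
  // Hypothetical helper: reads every file listed in `pathCol` with one
  // spark.read.json call and records which file each parsed row came from.
  def readJsonFromPaths(spark: SparkSession, df: DataFrame, pathCol: String = "file_path"): DataFrame = {
    import spark.implicits._

    // Steps 1 and 2: collect the distinct paths to the driver as a local array.
    val paths: Array[String] = df.select(pathCol).as[String].distinct().collect()

    // Step 3: a single reader over all files. multiLine assumes each file is
    // one (possibly pretty-printed) JSON document rather than JSON Lines.
    spark.read
      .option("multiLine", "true")
      .json(paths: _*)
      // input_file_name() gives the source file of each row; its string form
      // may differ from the stored paths, so check before joining on it.
      .withColumn(pathCol, input_file_name())
  }
}
```

You could then recover the other columns with something like `df.join(ReadJsonFromPaths.readJsonFromPaths(spark, df), Seq("file_path"))`, subject to the path-format caveat above. If you want the raw text as a `json_content` string rather than parsed columns, `spark.read.option("wholetext", "true").text(paths: _*)` returns one row per file in a `value` column.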
This will save you a lot of headaches.

Ayan

On Wed, Jul 13, 2022 at 7:35 AM Enrico Minack <i...@enrico.minack.dev> wrote:

> Hi,
>
> How does RDD's mapPartitions make a difference regarding 1. and 2.
> compared to Dataset's mapPartitions / map function?
>
> Enrico
>
>
> On 12.07.22 at 22:13, Muthu Jayakumar wrote:
>
> Hello Enrico,
>
> Thanks for the reply. I found that I would have to use the `mapPartitions`
> API of RDD to perform this safely, as I have to
> 1. Read each file from GCS using the HDFS FileSystem API.
> 2. Parse each JSON record in a safe manner.
>
> For (1) to work, I do have to broadcast the HadoopConfiguration from
> sparkContext. I did try to use the GCS Java API to read the content, but ran
> into many JAR conflicts, as the HDFS wrapper and the JAR library use
> different dependencies.
> I hope these findings help others as well.
>
> Thanks,
> Muthu
>
>
> On Mon, 11 Jul 2022 at 14:11, Enrico Minack <i...@enrico.minack.dev> wrote:
>
>> All you need to do is implement a method readJson that reads a single
>> file given its path. Then, you map the values of column file_path to the
>> respective JSON content as a string.
>> This can be done via a UDF or simply Dataset.map:
>>
>> case class RowWithJsonUri(entity_id: String, file_path: String, other_useful_id: String)
>> case class RowWithJsonContent(entity_id: String, json_content: String, other_useful_id: String)
>>
>> val ds = Seq(
>>   RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg",
>>     "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json",
>>     "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
>>   RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf",
>>     "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json",
>>     "id-2-01g4he5cbh52che104rwy603sr"),
>>   RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5",
>>     "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json",
>>     "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
>>   RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc",
>>     "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json",
>>     "id-2-01g4he5cbx1kwhgvdme1s560dw")
>> ).toDS()
>>
>> ds.show(false)
>>
>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>> |entity_id                    |file_path                                                          |other_useful_id                |
>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>
>> def readJson(uri: String): String = { s"content of $uri" }
>>
>> ds.map { row =>
>>   RowWithJsonContent(row.entity_id, readJson(row.file_path), row.other_useful_id)
>> }.show(false)
>>
>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>> |entity_id                    |json_content                                                                  |other_useful_id                |
>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>> |id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>> |id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>> |id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>> |id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>
>> Cheers,
>> Enrico
>>
>>
>> On 10.07.22 at 09:11, Muthu Jayakumar wrote:
>>
>> Hello there,
>>
>> I have a dataframe with the following...
>>
>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>> |entity_id                    |file_path                                                          |other_useful_id                |
>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>
>> I would like to read the file referenced by `file_path` in each row and
>> write the result to another dataframe containing `entity_id`,
>> `other_useful_id`, `json_content`, and `file_path`.
>> Assume that I already have the required HDFS URL libraries on my
>> classpath.
>>
>> Please advise,
>> Muthu
>
-- 
Best Regards,
Ayan Guha
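Here is a minimal sketch of the per-file approach discussed in this thread. It replaces the readJson stub above with a version that reads each file through the Hadoop FileSystem API, and it records failures per row instead of failing the job. In line with Enrico's question, it uses Dataset.mapPartitions rather than the RDD API.

It rests on a few assumptions:
- Spark and the relevant Hadoop connector (e.g. for gs://) are on the classpath.
- `ReadJsonPerRow`, `readFile`, `readAll` and `RowWithJsonResult` are hypothetical names.

Hadoop's `Configuration` is not `java.io.Serializable`, so the sketch broadcasts its entries as a plain `Map` and rebuilds a `Configuration` once per partition. This is one way to ship the configuration to executors, not the only one.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Dataset, SparkSession}
import scala.io.Source
import scala.util.{Failure, Success, Try}

// Input rows, same shape as RowWithJsonUri in Enrico's example.
case class RowWithJsonUri(entity_id: String, file_path: String, other_useful_id: String)

// Hypothetical output type: a read failure is kept in `error`
// instead of failing the whole job.
case class RowWithJsonResult(
    entity_id: String,
    other_useful_id: String,
    file_path: String,
    json_content: Option[String],
    error: Option[String])

object ReadJsonPerRow extends Serializable {
  // Reads one whole file through the Hadoop FileSystem API, so any scheme
  // with a connector on the classpath (gs://, hdfs://, file://) can be used.
  def readFile(uri: String, conf: Configuration): String = {
    val path = new Path(uri)
    val in = path.getFileSystem(conf).open(path)
    try Source.fromInputStream(in, "UTF-8").mkString
    finally in.close()
  }

  def readAll(spark: SparkSession, ds: Dataset[RowWithJsonUri]): Dataset[RowWithJsonResult] = {
    import spark.implicits._

    // Copy the driver's Hadoop configuration entries into a serializable Map.
    val entries = Map.newBuilder[String, String]
    val it = spark.sparkContext.hadoopConfiguration.iterator()
    while (it.hasNext) {
      val e = it.next()
      entries += e.getKey -> e.getValue
    }
    val confBc = spark.sparkContext.broadcast(entries.result())

    ds.mapPartitions { rows =>
      // Rebuild the Configuration once per partition, on the executor.
      val conf = new Configuration(false)
      confBc.value.foreach { case (k, v) => conf.set(k, v) }
      rows.map { r =>
        Try(readFile(r.file_path, conf)) match {
          case Success(text) =>
            RowWithJsonResult(r.entity_id, r.other_useful_id, r.file_path, Some(text), None)
          case Failure(e) =>
            RowWithJsonResult(r.entity_id, r.other_useful_id, r.file_path, None, Some(e.toString))
        }
      }
    }
  }
}
```

Usage would look like `ReadJsonPerRow.readAll(spark, ds)`, with `ds` built as in Enrico's example. Parsing the collected `json_content` strings is left to a later step, for example Spark's `from_json` with an explicit schema.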