Hello Enrico,

Thanks for the reply. I found that I would have to use the `mapPartitions` API
of RDD to perform this safely, as I have to:
1. Read each file from GCS using the HDFS FileSystem API.
2. Parse each JSON record in a safe manner.

For (1) to work, I do have to broadcast the Hadoop Configuration from the
SparkContext. I did try to use the GCS Java API to read the content directly,
but ran into many JAR conflicts because the HDFS wrapper and the GCS client
library use different dependencies.
Hope these findings help others as well.

Thanks,
Muthu


On Mon, 11 Jul 2022 at 14:11, Enrico Minack <i...@enrico.minack.dev> wrote:

> All you need to do is implement a method readJson that reads a single
> file given its path. Then, you map the values of column file_path to the
> respective JSON content as a string. This can be done via a UDF or simply
> Dataset.map:
>
> case class RowWithJsonUri(entity_id: String, file_path: String,
> other_useful_id: String)
> case class RowWithJsonContent(entity_id: String, json_content: String,
> other_useful_id: String)
>
> val ds = Seq(
>   RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg",
> "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json",
> "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
>   RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf",
> "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json",
> "id-2-01g4he5cbh52che104rwy603sr"),
>   RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5",
> "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json",
> "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
>   RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc",
> "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json",
> "id-2-01g4he5cbx1kwhgvdme1s560dw")
> ).toDS()
>
> ds.show(false)
>
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |entity_id                    |file_path                                                          |other_useful_id                |
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>
>
> def readJson(uri: String): String = { s"content of $uri" }
>
> ds.map { row => RowWithJsonContent(row.entity_id, readJson(row.file_path),
> row.other_useful_id) }.show(false)
>
> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
> |entity_id                    |json_content                                                                  |other_useful_id                |
> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
> |id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
> |id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
> |id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
> |id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>
> Cheers,
> Enrico
>
>
>
>
> On 10.07.22 at 09:11, Muthu Jayakumar wrote:
>
> Hello there,
>
> I have a dataframe with the following...
>
>
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |entity_id                    |file_path                                                          |other_useful_id                |
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>
> I would like to read the file at `file_path` for each row and write the
> result to another dataframe containing `entity_id`, `other_useful_id`,
> `json_content`, `file_path`.
> Assume that I already have the required HDFS URL libraries in my classpath.
>
> Please advise,
> Muthu
>
>
>
>
