Re: reading each JSON file from dataframe...
Hello Ayan,

Thank you for the suggestion. But I would lose the correlation of the JSON file with the other identifier fields. Also, would it be an issue if there are too many files? Plus, I may not have the same schema across all the files.

Hello Enrico,

> how does RDD's mapPartitions make a difference regarding

I guess, as in the question above, I do have to process row-wise, and RDD may be more efficient?

Thanks,
Muthu

On Tue, 12 Jul 2022 at 14:55, ayan guha wrote:

> Another option is:
>
> 1. collect the dataframe with the file paths
> 2. create a list of paths
> 3. create a new dataframe with spark.read.json and pass the list of paths
>
> This will save you lots of headaches
>
> Ayan
>
> On Wed, Jul 13, 2022 at 7:35 AM Enrico Minack wrote:
>
>> Hi,
>>
>> how does RDD's mapPartitions make a difference regarding 1. and 2.
>> compared to Dataset's mapPartitions / map function?
>>
>> Enrico
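A minimal, untested sketch of the row-wise mapPartitions approach discussed in this thread, assuming the `ds` Dataset and the RowWithJsonUri/RowWithJsonContent case classes from Enrico's example further down this digest. The hand-rolled Configuration wrapper stands in for Spark's internal SerializableConfiguration, which is not public API in all versions:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hadoop's Configuration is not java-serializable, so wrap it for broadcasting.
class SerializableConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)   // Configuration implements Writable
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

val confBc = spark.sparkContext.broadcast(
  new SerializableConf(spark.sparkContext.hadoopConfiguration))

val withContent = ds.rdd.mapPartitions { rows =>
  val conf = confBc.value.value
  rows.map { row =>
    val path = new Path(row.file_path)
    val fs = FileSystem.get(path.toUri, conf)  // GCS connector serves the gs:// scheme
    val in = fs.open(path)
    try RowWithJsonContent(row.entity_id,
      scala.io.Source.fromInputStream(in, "UTF-8").mkString,
      row.other_useful_id)
    finally in.close()
  }
}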
Spark streaming pending microbatches queue max length
Hello, Spark adds an entry to the pending microbatches queue at each batch interval. Is there a config to set the max size of the pending microbatches queue? Thanks
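As far as I can tell there is no documented setting that caps the length of the pending-batch queue itself; the usual approach is to bound how fast data enters, so the queue cannot grow without limit. A sketch of the documented rate-limiting settings for DStreams (values illustrative only):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")       // adapt ingestion rate to processing rate
  .set("spark.streaming.receiver.maxRate", "10000")          // hard cap, records/sec per receiver
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // equivalent cap for the direct Kafka API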
Re: reading each JSON file from dataframe...
Another option is:

1. collect the dataframe with the file paths
2. create a list of paths
3. create a new dataframe with spark.read.json and pass the list of paths

This will save you lots of headaches

Ayan

On Wed, Jul 13, 2022 at 7:35 AM Enrico Minack wrote:

> Hi,
>
> how does RDD's mapPartitions make a difference regarding 1. and 2.
> compared to Dataset's mapPartitions / map function?
>
> Enrico
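A sketch of these three steps (untested), where `df` stands for the dataframe with the file_path column from the original question:

import org.apache.spark.sql.functions.input_file_name

// 1.+2. pull the distinct paths down to the driver as a plain list
val paths = df.select("file_path").distinct().collect().map(_.getString(0))

// 3. let Spark read all the files in one go
val jsonDf = spark.read.json(paths: _*)

// input_file_name() tags each record with its source file, which allows joining
// the JSON content back to the identifier columns:
val joined = jsonDf.withColumn("file_path", input_file_name()).join(df, "file_path")

One caveat: input_file_name() returns the fully qualified URI, which may need normalizing before the join matches the stored paths, and spark.read.json will merge differing schemas across files, which relates to the schema concern raised elsewhere in the thread.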
Re: reading each JSON file from dataframe...
Hi,

how does RDD's mapPartitions make a difference regarding 1. and 2. compared to Dataset's mapPartitions / map function?

Enrico

Am 12.07.22 um 22:13 schrieb Muthu Jayakumar:

> Hello Enrico,
>
> Thanks for the reply. I found that I would have to use the `mapPartitions` API of RDD to perform this safely, as I have to
> 1. Read each file from GCS using the HDFS FileSystem API.
> 2. Parse each JSON record in a safe manner.
>
> For (1) to work, I do have to broadcast the HadoopConfiguration from the sparkContext. I did try to use the GCS Java API to read content, but ran into many JAR conflicts because the HDFS wrapper and the GCS library use different dependencies.
> Hope these findings help others as well.
>
> Thanks,
> Muthu
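A sketch of Enrico's point (untested): the same per-partition file reading can stay in the Dataset API. With `import spark.implicits._` in scope the case-class encoder keeps the result a typed Dataset; `confBc` is the broadcast configuration wrapper from the sketch earlier in this digest:

import org.apache.hadoop.fs.{FileSystem, Path}

val withContent = ds.mapPartitions { rows =>
  val conf = confBc.value.value
  rows.map { row =>
    val path = new Path(row.file_path)
    val in = FileSystem.get(path.toUri, conf).open(path)
    try RowWithJsonContent(row.entity_id,
      scala.io.Source.fromInputStream(in, "UTF-8").mkString,
      row.other_useful_id)
    finally in.close()
  }
}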
Re: reading each JSON file from dataframe...
Hello Enrico,

Thanks for the reply. I found that I would have to use the `mapPartitions` API of RDD to perform this safely, as I have to
1. Read each file from GCS using the HDFS FileSystem API.
2. Parse each JSON record in a safe manner.

For (1) to work, I do have to broadcast the HadoopConfiguration from the sparkContext. I did try to use the GCS Java API to read content, but ran into many JAR conflicts because the HDFS wrapper and the GCS library use different dependencies.
Hope these findings help others as well.

Thanks,
Muthu

On Mon, 11 Jul 2022 at 14:11, Enrico Minack wrote:

> All you need to do is implement a method readJson that reads a single
> file given its path. Then, you map the values of column file_path to the
> respective JSON content as a string. This can be done via a UDF or simply
> Dataset.map:
>
> case class RowWithJsonUri(entity_id: String, file_path: String, other_useful_id: String)
> case class RowWithJsonContent(entity_id: String, json_content: String, other_useful_id: String)
>
> val ds = Seq(
>   RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg", "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json", "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
>   RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf", "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json", "id-2-01g4he5cbh52che104rwy603sr"),
>   RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5", "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json", "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
>   RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc", "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json", "id-2-01g4he5cbx1kwhgvdme1s560dw")
> ).toDS()
>
> ds.show(false)
>
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |entity_id                    |file_path                                                          |other_useful_id                |
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>
> def readJson(uri: String): String = { s"content of $uri" }
>
> ds.map { row => RowWithJsonContent(row.entity_id, readJson(row.file_path), row.other_useful_id) }.show(false)
>
> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
> |entity_id                    |json_content                                                                  |other_useful_id                |
> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
> |id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
> |id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
> |id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
> |id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>
> Cheers,
> Enrico
>
> Am 10.07.22 um 09:11 schrieb Muthu Jayakumar:
>
>> Hello there,
>>
>> I have a dataframe with the following...
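A sketch of step (2) above, parsing each JSON record in a safe manner (untested): from_json yields null for records that do not match the given schema instead of failing the job, so malformed files can be filtered out or routed aside. The schema and the name `contentDs` (the output of the mapPartitions step) are assumptions for illustration:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

val schema = new StructType().add("status", StringType)  // hypothetical schema

// null in "parsed" marks a record that could not be parsed against the schema
val parsed = contentDs.toDF().withColumn("parsed", from_json(col("json_content"), schema))
val malformed = parsed.filter(col("parsed").isNull)      // inspect or quarantine these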
[Spark][Core] Resource Allocation
I have run into two problems and would like to know whether there is no solution for them (due to the current implementation) or whether there is a way that I am not aware of.

1) Currently, we can enable and configure dynamic resource allocation as described in the documentation below:
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

Based on the documentation, it is possible to start with an initial number of executors and acquire more when tasks are pending. Also, if executors become idle and there are no more tasks to run, they are released (so their resources can be used by others). My question concerns the case of 2 SparkContexts (separate applications). Here I would expect dynamic allocation to work as fairly as possible and distribute resources equally. But what I observe is that if SparkContext 1 occupies all of the executors with running tasks, it will not release them until it has no more tasks to run and the executors become idle. Spark could stop scheduling new tasks for SparkContext 1 (killing running tasks would not be reasonable) and instead free executors for SparkContext 2, but it does not do so, and I have not found any configuration for this. Have I understood correctly? And is there no way to achieve fair dynamic allocation between contexts? (A configuration sketch follows at the end of this message.)

2) With dynamic or even static resource allocation, Spark has to launch a number of executors on the resources of the cluster (the workers). The data on the cluster has little skew and is distributed across the whole cluster, so it is better for the executors to be spread across the cluster as much as possible in order to benefit from data locality. But what I observe is that Spark sometimes runs 2 or more executors on the same worker even when there are idle workers. Is this intentional, with other benefits that make it an improvement, or would spreading be better but is simply not supported by Spark at the moment?
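For reference, a sketch of the dynamic allocation settings from question (1), with illustrative values only (not a recommendation). Capping maxExecutors below cluster capacity is, as far as I know, the only blunt way today to leave headroom for a second application, since executors are released only when idle and never preempted mid-task:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") // or run the external shuffle service
  .set("spark.dynamicAllocation.initialExecutors", "2")
  .set("spark.dynamicAllocation.minExecutors", "0")
  .set("spark.dynamicAllocation.maxExecutors", "20")   // leave headroom for other applications
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")

Regarding (2): on the standalone master, spark.deploy.spreadOut (default true) controls whether an application's executors are spread across workers or consolidated onto as few as possible; if executors still co-locate despite idle workers, per-worker core and memory limits are a common cause.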