[Spark][Core] Resource Allocation

2022-07-12 Thread Amin Borjian
I have a couple of questions, and I am trying to find out whether there is no 
solution for them (due to the current implementation) or whether there is a way 
that I am simply not aware of.

1)

Currently, we can enable and configure dynamic resource allocation based on 
the documentation below:
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

Based on the documentation, it is possible to start with an initial number of 
executors and, if tasks are pending, request more. Also, if some executors 
become idle and there are no more tasks, those executors are killed (so their 
resources can be used by others). My question is about the case where we have 
2 SparkContexts (separate applications). In such cases, I expect the dynamic 
mechanism to work as fairly as possible and distribute resources equally. But 
what I observe is that if SparkContext 1 uses all of the executors because it 
has running tasks, it will not release them until it has no more tasks to run 
and the executors become idle. Spark could avoid scheduling the new tasks of 
SparkContext 1 (because it is not reasonable to kill its running tasks) and 
instead free executors for SparkContext 2, but it does not do so, and I have 
not found any configuration for this. Have I understood correctly? And is 
there no way to achieve fair dynamic allocation between contexts?
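
For reference, this is roughly the kind of configuration I mean (the values 
here are only examples, not what we actually run):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("application-1")
  .config("spark.dynamicAllocation.enabled", "true")
  // dynamic allocation also needs either the external shuffle service
  // or shuffle tracking to be enabled
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.dynamicAllocation.initialExecutors", "2")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "20")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .getOrCreate()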

2)

In dynamic or even static resource allocation, Spark has to launch a number of 
executors on the resources of the cluster (workers). The data on the cluster 
has as little skew as possible and is distributed throughout the cluster, so 
it is better for executors to be spread across the cluster as much as possible 
in order to benefit from data locality. But what I observe is that Spark 
sometimes runs 2 or more executors on the same worker even when there are idle 
workers. Is this intentional, and are there other reasons behind it, or would 
spreading be better but is simply not supported by Spark at the moment?


Re: reading each JSON file from dataframe...

2022-07-12 Thread Muthu Jayakumar
Hello Enrico,

Thanks for the reply. I found that I would have to use the `mapPartitions` API
of RDD to perform this safely, as I have to
1. Read each file from GCS using the HDFS FileSystem API.
2. Parse each JSON record in a safe manner.

For (1) to work, I do have to broadcast the HadoopConfiguration from the
sparkContext. I did try to use the GCS Java API to read the content, but ran
into many JAR conflicts because the HDFS wrapper and that library use
different dependencies.
Hope these findings help others as well.
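
A rough sketch of what I ended up with (simplified and untested as pasted
here; it reuses the case classes from Enrico's example below and assumes the
GCS connector is on the executor classpath):

import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Configuration itself is not serializable, so ship its entries as a plain Map
// and rebuild it on the executors.
val confMap = spark.sparkContext.hadoopConfiguration.iterator().asScala
  .map(e => e.getKey -> e.getValue).toMap
val confBc = spark.sparkContext.broadcast(confMap)

val withContent = ds.rdd.mapPartitions { rows =>
  val hadoopConf = new Configuration(false)
  confBc.value.foreach { case (k, v) => hadoopConf.set(k, v) }
  rows.map { row =>
    // Try keeps a single bad file from failing the whole partition
    val content = Try {
      val path = new Path(row.file_path)
      val fs = path.getFileSystem(hadoopConf)
      val in = fs.open(path)
      try Source.fromInputStream(in, "UTF-8").mkString finally in.close()
    }.toOption.orNull
    RowWithJsonContent(row.entity_id, content, row.other_useful_id)
  }
}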

Thanks,
Muthu


On Mon, 11 Jul 2022 at 14:11, Enrico Minack  wrote:

> All you need to do is implement a method readJson that reads a single
> file given its path. Then, you map the values of column file_path to the
> respective JSON content as a string. This can be done via a UDF or simply
> Dataset.map:
>
> case class RowWithJsonUri(entity_id: String, file_path: String,
> other_useful_id: String)
> case class RowWithJsonContent(entity_id: String, json_content: String,
> other_useful_id: String)
>
> val ds = Seq(
>   RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg",
> "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json",
> "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
>   RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf",
> "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json",
> "id-2-01g4he5cbh52che104rwy603sr"),
>   RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5",
> "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json",
> "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
>   RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc",
> "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json",
> "id-2-01g4he5cbx1kwhgvdme1s560dw")
> ).toDS()
>
> ds.show(false)
>
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |entity_id                    |file_path                                                          |other_useful_id                |
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>
>
> def readJson(uri: String): String = { s"content of $uri" }
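>
> // In a real job, readJson would open the file, e.g. via Hadoop's FileSystem API
> // (only a sketch; as noted above, the Hadoop configuration may need to be made
> // available on the executors, for example by broadcasting it):
> //   val path = new org.apache.hadoop.fs.Path(uri)
> //   val fs = path.getFileSystem(new org.apache.hadoop.conf.Configuration())
> //   val in = fs.open(path)
> //   try scala.io.Source.fromInputStream(in, "UTF-8").mkString finally in.close()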
>
> ds.map { row => RowWithJsonContent(row.entity_id, readJson(row.file_path),
> row.other_useful_id) }.show(false)
>
> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
> |entity_id                    |json_content                                                                  |other_useful_id                |
> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
> |id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
> |id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
> |id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
> |id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
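>
> A UDF variant of the same idea would look roughly like this (untested sketch):
>
> import org.apache.spark.sql.functions.{col, udf}
>
> val readJsonUdf = udf(readJson _)
> ds.withColumn("json_content", readJsonUdf(col("file_path"))).drop("file_path").show(false)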
>
> Cheers,
> Enrico
>
>
>
>
> On 10.07.22 at 09:11, Muthu Jayakumar wrote:
>
> Hello there,
>
> I have a dataframe with the following...
>
>
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |entity_id                    |file_path                                                          |other_useful_id                |
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560d

Re: reading each JSON file from dataframe...

2022-07-12 Thread Enrico Minack

Hi,

how does RDD's mapPartitions make a difference regarding 1. and 2. 
compared to Dataset's mapPartitions / map function?
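
For instance, the same per-row logic can be expressed directly on the Dataset 
(a minimal sketch, reusing the case classes and readJson from the example 
below; it needs import spark.implicits._ for the encoder):

ds.mapPartitions { rows =>
  rows.map(row => RowWithJsonContent(row.entity_id, readJson(row.file_path), row.other_useful_id))
}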


Enrico


On 12.07.22 at 22:13, Muthu Jayakumar wrote:

Hello Enrico,

Thanks for the reply. I found that I would have to use the `mapPartitions` 
API of RDD to perform this safely, as I have to

1. Read each file from GCS using the HDFS FileSystem API.
2. Parse each JSON record in a safe manner.

For (1) to work, I do have to broadcast the HadoopConfiguration from the 
sparkContext. I did try to use the GCS Java API to read the content, but ran 
into many JAR conflicts because the HDFS wrapper and that library use 
different dependencies.

Hope these findings help others as well.

Thanks,
Muthu


On Mon, 11 Jul 2022 at 14:11, Enrico Minack  
wrote:


All you need to do is implement a method readJson that reads a
single file given its path. Then, you map the values of column
file_path to the respective JSON content as a string. This can be
done via a UDF or simply Dataset.map:

case class RowWithJsonUri(entity_id: String, file_path: String,
other_useful_id: String)
case class RowWithJsonContent(entity_id: String, json_content:
String, other_useful_id: String)

val ds = Seq(
  RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg",
"gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json",
"id-2-01g4he5cb4xqn6s1999k6y1vbd"),
  RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf",
"gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json",
"id-2-01g4he5cbh52che104rwy603sr"),
  RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5",
"gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json",
"id-2-01g4he5cbqmdv7dnx46sebs0gt"),
  RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc",
"gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json",
"id-2-01g4he5cbx1kwhgvdme1s560dw")
).toDS()

ds.show(false)

+-----------------------------+-------------------------------------------------------------------+-------------------------------+
|entity_id                    |file_path                                                          |other_useful_id                |
+-----------------------------+-------------------------------------------------------------------+-------------------------------+
|id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
|id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
|id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
|id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
+-----------------------------+-------------------------------------------------------------------+-------------------------------+


def readJson(uri: String): String = { s"content of $uri" }

ds.map { row => RowWithJsonContent(row.entity_id,
readJson(row.file_path), row.other_useful_id) }.show(false)

+-----------------------------+------------------------------------------------------------------------------+-------------------------------+
|entity_id                    |json_content                                                                  |other_useful_id                |
+-----------------------------+------------------------------------------------------------------------------+-------------------------------+
|id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
|id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
|id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
|id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
+-----------------------------+------------------------------------------------------------------------------+-------------------------------+

Cheers,
Enrico




On 10.07.22 at 09:11, Muthu Jayakumar wrote:

Hello there,

I have a dataframe with the following...


+-----------------------------+-------------------------------------------------------------------+-------------------------------+
|entity_id                    |file_path                                                          |other_useful_id                |
+-----------------------------+-------------------------------------------------------------------+-------------------------------+
|id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
|id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|i

Re: reading each JSON file from dataframe...

2022-07-12 Thread ayan guha
Another option is:

1. collect the dataframe with the file paths
2. create a list of paths
3. create a new dataframe with spark.read.json and pass the list of paths (a
sketch is below)

This will save you lots of headaches.
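
A minimal sketch of what I mean (the dataframe and column names are assumed 
from your example; needs import spark.implicits._):

val paths = df.select("file_path").as[String].collect().toSeq
val jsonDf = spark.read.json(paths: _*)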

Ayan


On Wed, Jul 13, 2022 at 7:35 AM Enrico Minack 
wrote:

> Hi,
>
> how does RDD's mapPartitions make a difference regarding 1. and 2.
> compared to Dataset's mapPartitions / map function?
>
> Enrico
>
>
> On 12.07.22 at 22:13, Muthu Jayakumar wrote:
>
> Hello Enrico,
>
> Thanks for the reply. I found that I would have to use the `mapPartitions` API
> of RDD to perform this safely, as I have to
> 1. Read each file from GCS using the HDFS FileSystem API.
> 2. Parse each JSON record in a safe manner.
>
> For (1) to work, I do have to broadcast the HadoopConfiguration from the
> sparkContext. I did try to use the GCS Java API to read the content, but ran
> into many JAR conflicts because the HDFS wrapper and that library use
> different dependencies.
> Hope these findings help others as well.
>
> Thanks,
> Muthu
>
>
> On Mon, 11 Jul 2022 at 14:11, Enrico Minack 
> wrote:
>
>> All you need to do is implement a method readJson that reads a single
>> file given its path. Then, you map the values of column file_path to the
>> respective JSON content as a string. This can be done via a UDF or simply
>> Dataset.map:
>>
>> case class RowWithJsonUri(entity_id: String, file_path: String,
>> other_useful_id: String)
>> case class RowWithJsonContent(entity_id: String, json_content: String,
>> other_useful_id: String)
>>
>> val ds = Seq(
>>   RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg",
>> "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json",
>> "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
>>   RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf",
>> "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json",
>> "id-2-01g4he5cbh52che104rwy603sr"),
>>   RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5",
>> "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json",
>> "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
>>   RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc",
>> "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json",
>> "id-2-01g4he5cbx1kwhgvdme1s560dw")
>> ).toDS()
>>
>> ds.show(false)
>>
>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>> |entity_id                    |file_path                                                          |other_useful_id                |
>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>
>>
>> def readJson(uri: String): String = { s"content of $uri" }
>>
>> ds.map { row => RowWithJsonContent(row.entity_id,
>> readJson(row.file_path), row.other_useful_id) }.show(false)
>>
>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>> |entity_id                    |json_content                                                                  |other_useful_id                |
>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>> |id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>> |id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>> |id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>> |id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>
>> Cheers,
>> Enrico
>>
>>
>>
>>
>> On 10.07.22 at 09:11, Muthu Jayakumar wrote:
>>
>> Hello there,
>>
>> I have a dataframe with the following...
>>
>>
>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>> |entity_id                    |file_path                                                          |other_useful_id                |
>> +-+-

Spark streaming pending microbatches queue max length

2022-07-12 Thread Anil Dasari
Hello,

Spark adds an entry to the pending microbatches queue at each batch interval. 
Is there a config to set the maximum size of the pending microbatches queue?

Thanks


Re: reading each JSON file from dataframe...

2022-07-12 Thread Muthu Jayakumar
Hello Ayan,

Thank you for the suggestion. But I would lose the correlation of the JSON
file with the other identifier fields. Also, if there are too many files,
will it be an issue? Plus, I may not have the same schema across all the
files.

Hello Enrico,

> how does RDD's mapPartitions make a difference regarding
I guess, for the question above, I do have to process row-wise, and RDD may be
more efficient?

Thanks,
Muthu

On Tue, 12 Jul 2022 at 14:55, ayan guha  wrote:

> Another option is:
>
> 1. collect the dataframe with the file paths
> 2. create a list of paths
> 3. create a new dataframe with spark.read.json and pass the list of paths
>
> This will save you lots of headaches
>
> Ayan
>
>
> On Wed, Jul 13, 2022 at 7:35 AM Enrico Minack 
> wrote:
>
>> Hi,
>>
>> how does RDD's mapPartitions make a difference regarding 1. and 2.
>> compared to Dataset's mapPartitions / map function?
>>
>> Enrico
>>
>>
>> On 12.07.22 at 22:13, Muthu Jayakumar wrote:
>>
>> Hello Enrico,
>>
>> Thanks for the reply. I found that I would have to use the `mapPartitions`
>> API of RDD to perform this safely, as I have to
>> 1. Read each file from GCS using the HDFS FileSystem API.
>> 2. Parse each JSON record in a safe manner.
>>
>> For (1) to work, I do have to broadcast the HadoopConfiguration from the
>> sparkContext. I did try to use the GCS Java API to read the content, but ran
>> into many JAR conflicts because the HDFS wrapper and that library use
>> different dependencies.
>> Hope these findings help others as well.
>>
>> Thanks,
>> Muthu
>>
>>
>> On Mon, 11 Jul 2022 at 14:11, Enrico Minack 
>> wrote:
>>
>>> All you need to do is implement a method readJson that reads a single
>>> file given its path. Then, you map the values of column file_path to
>>> the respective JSON content as a string. This can be done via a UDF or
>>> simply Dataset.map:
>>>
>>> case class RowWithJsonUri(entity_id: String, file_path: String,
>>> other_useful_id: String)
>>> case class RowWithJsonContent(entity_id: String, json_content: String,
>>> other_useful_id: String)
>>>
>>> val ds = Seq(
>>>   RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg",
>>> "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json",
>>> "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
>>>   RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf",
>>> "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json",
>>> "id-2-01g4he5cbh52che104rwy603sr"),
>>>   RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5",
>>> "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json",
>>> "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
>>>   RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc",
>>> "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json",
>>> "id-2-01g4he5cbx1kwhgvdme1s560dw")
>>> ).toDS()
>>>
>>> ds.show(false)
>>>
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>> |entity_id                    |file_path                                                          |other_useful_id                |
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
>>>
>>>
>>> def readJson(uri: String): String = { s"content of $uri" }
>>>
>>> ds.map { row => RowWithJsonContent(row.entity_id,
>>> readJson(row.file_path), row.other_useful_id) }.show(false)
>>>
>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>> |entity_id                    |json_content                                                                  |other_useful_id                |
>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
>>> |id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
>>> |id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
>>> |id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
>>> |id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
>>> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+

Re: How reading works?

2022-07-12 Thread Sid
Yeah, I understood that now.

Thanks for the explanation, Bjorn.

Sid

On Wed, Jul 6, 2022 at 1:46 AM Bjørn Jørgensen 
wrote:

> Ehh.. What is "duplicate column"? I don't think Spark supports that.
>
> duplicate column = duplicate rows
>
>
> On Tue, 5 Jul 2022 at 22:13, Bjørn Jørgensen <
> bjornjorgen...@gmail.com> wrote:
>
>> "*but I am getting the issue of the duplicate column which was present
>> in the old dataset.*"
>>
>> So you have answered your question!
>>
>> spark.read.option("multiline","true").json("path").filter(
>> col("edl_timestamp")>last_saved_timestamp) As you have figured out,
>> Spark reads all the JSON files in "path" and then filters.
>>
>> There are some file formats that can apply filters before reading the files.
>> The one that I know about is Parquet, as this link explains: Spark:
>> Understand the Basic of Pushed Filter and Partition Filter Using Parquet
>> File
>> 
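>>
>> For example (a small sketch; the path and column are just placeholders), a
>> pushed filter shows up in the physical plan:
>>
>> import org.apache.spark.sql.functions.col
>>
>> val df = spark.read.parquet("s3://bucket/table")
>>   .filter(col("edl_timestamp") > "2022070500")
>> df.explain()
>> // the scan node then lists it under PushedFilters, e.g.
>> // PushedFilters: [IsNotNull(edl_timestamp), GreaterThan(edl_timestamp,2022070500)]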
>>
>>
>>
>>
>>
>> On Tue, 5 Jul 2022 at 21:21, Sid  wrote:
>>
>>> Hi Team,
>>>
>>> I still need help in understanding how reading works exactly?
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Mon, Jun 20, 2022 at 2:23 PM Sid  wrote:
>>>
 Hi Team,

 Can somebody help?

 Thanks,
 Sid

 On Sun, Jun 19, 2022 at 3:51 PM Sid  wrote:

> Hi,
>
> I already have a partitioned JSON dataset in s3 like the below:
>
> edl_timestamp=2022090800
>
> Now, the problem is, in the earlier 10 days of data collection there
> was a duplicate columns issue due to which we couldn't read the data.
>
> Now the latest 10 days of data are proper. So, I am trying to do
> something like the below:
>
>
> spark.read.option("multiline","true").json("path").filter(col("edl_timestamp")>last_saved_timestamp)
>
> but I am getting the issue of the duplicate column which was present
> in the old dataset. So, I am trying to understand how Spark reads the
> data. Does it read the full dataset and then filter on the basis of the last
> saved timestamp, or does it read only what is required? If the second case is
> true, then it should have read the data, since the latest data is correct.
>
> So just trying to understand. Could anyone help here?
>
> Thanks,
> Sid
>
>
>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


How to use pattern matching in Spark

2022-07-12 Thread Sid
Hi Team,

I have a dataset like the below one in .dat file:

13/07/2022abc
PWJ   PWJABC 513213217ABC GM20 05. 6/20/39
#01000count

Now I want to extract the header and tail records, which I was able to do.
Next, from the header, I need to extract the date and match it against the
current system date. Also, for the tail record, I need to match the number of
actual rows (i.e. 1 in my case) with the value mentioned in the last row. It
is a kind of pattern matching: I want to find the '1' in the last row and
confirm that the actual record count and the value in the tail record match
each other.

How can I do this? Any links would be helpful. I think regex pattern
matching should help.
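
What I have in mind is roughly the following (only a sketch; the regexes for 
the header date and the tail count are placeholders that would need to match 
the real file spec):

import java.time.LocalDate
import java.time.format.DateTimeFormatter

// read the file as plain text; collect() is fine for validation-sized files
val lines  = spark.read.textFile("path/to/file.dat").collect()
val header = lines.head
val tail   = lines.last
val body   = lines.drop(1).dropRight(1)

// header: assume the date is the leading dd/MM/yyyy token
val headerDate = "^(\\d{2}/\\d{2}/\\d{4})".r.findFirstIn(header)
  .map(LocalDate.parse(_, DateTimeFormatter.ofPattern("dd/MM/yyyy")))
val dateOk = headerDate.contains(LocalDate.now())

// tail: assume the record count is the first group of digits in the tail record
val tailCount = "(\\d+)".r.findFirstIn(tail).map(_.toInt)
val countOk = tailCount.contains(body.length)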

Also, I will be getting 3 formats for now, i.e. CSV, .DAT, and .TXT files.

As far as I can tell, I could do the validation for all 3 file formats using
spark.read.text().rdd and performing the intended operations on RDDs, just for
the validation part.

Therefore, I wanted to understand: is there a better way to achieve this?

Thanks,
Sid