[k8s] Fail to expose custom port on executor container specified in my executor pod template

2023-06-26 Thread James Yu
Hi Team,


I have had no luck trying to expose port 5005 (for remote debugging purposes) on 
my executor container using the following pod template and Spark configuration:

s3a://mybucket/pod-template-executor-debug.yaml

apiVersion: v1
kind: Pod
spec:
  containers:
  - name: spark-kubernetes-executor
    ports:
    - containerPort: 5005
      name: debug
      protocol: TCP

--conf spark.kubernetes.executor.podTemplateFile=s3a://mybucket/pod-template-executor-debug.yaml


The resulting executor container only exposes the default 7079/TCP port, not 
the 5005/TCP port that I wanted it to expose.
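
For reference, I checked the exposed container ports with something along 
these lines (the pod name is a placeholder):

kubectl get pod <executor-pod-name> -o jsonpath='{.spec.containers[0].ports}'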

It works just fine for the driver container with similar settings, where I can 
see all the expected ports exposed (5005/TCP, 7078/TCP, 7079/TCP, 4040/TCP).

Did I miss anything, or is this a known bug where the executor pod template is 
not respected when it comes to exposing ports?

Thanks in advance for your help.

James


[Spark-SQL] Dataframe write saveAsTable failed

2023-06-26 Thread Anil Dasari
Hi,

We recently upgraded Spark from 2.4.x to 3.3.1, and managed table creation
while writing a dataframe with saveAsTable now fails with the error below.

Can not create the managed table(``) The associated
location('hdfs:') already exists.

At a high level, our code does the following before writing the dataframe as a table:

sparkSession.sql(s"DROP TABLE IF EXISTS $hiveTableName PURGE")
mydataframe.write.mode(SaveMode.Overwrite).saveAsTable(hiveTableName)

The above code works with Spark 2 because of
spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation, which was
removed in Spark 3.

The table is dropped and purged before writing the dataframe, so I expected the
dataframe write not to complain that the path already exists.

After digging further, I noticed there is a `_temporary` folder present in the
HDFS table path:

dfs -ls /apps/hive/warehouse//
Found 1 items
drwxr-xr-x   - hadoop hdfsadmingroup  0 2023-06-23 04:45
/apps/hive/warehouse//_temporary

[root@ip-10-121-107-90 bin]# hdfs dfs -ls
/apps/hive/warehouse//_temporary
Found 1 items
drwxr-xr-x   - hadoop hdfsadmingroup  0 2023-06-23 04:45
/apps/hive/warehouse//_temporary/0

[root@ip-10-121-107-90 bin]# hdfs dfs -ls
/apps/hive/warehouse//_temporary/0
Found 1 items
drwxr-xr-x   - hadoop hdfsadmingroup  0 2023-06-23 04:45
/apps/hive/warehouse//_temporary/0/_temporary
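
One interim option I am considering is to delete the leftover location
explicitly before the write. A rough sketch using the Hadoop FileSystem API
(the table path below is a placeholder for the actual warehouse directory):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

val tablePath = new Path("/apps/hive/warehouse/<table dir>")  // placeholder
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)

sparkSession.sql(s"DROP TABLE IF EXISTS $hiveTableName PURGE")
if (fs.exists(tablePath)) {
  fs.delete(tablePath, true)  // recursive: also removes any stale _temporary dir
}
mydataframe.write.mode(SaveMode.Overwrite).saveAsTable(hiveTableName)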

Is it because of task failures? Is there a recommended way to work around this issue?

Thanks


Re: Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Mich Talebzadeh
OK, good news. You have made some progress here :)

bzip2 works (it is splittable) because it is block-oriented, whereas gzip is
stream-oriented. I also noticed that you are creating a managed ORC table. You
can bucket and partition an ORC (Optimized Row Columnar) table. An example
below:


DROP TABLE IF EXISTS dummy;

CREATE TABLE dummy (
     ID INT
   , CLUSTERED INT
   , SCATTERED INT
   , RANDOMISED INT
   , RANDOM_STRING VARCHAR(50)
   , SMALL_VC VARCHAR(10)
   , PADDING VARCHAR(10)
)
CLUSTERED BY (ID) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  "orc.create.index"="true",
  "orc.bloom.filter.columns"="ID",
  "orc.bloom.filter.fpp"="0.05",
  "orc.compress"="SNAPPY",
  "orc.stripe.size"="16777216",
  "orc.row.index.stride"="1"
);
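
Once the table is populated, you can gather statistics on it in the same way as
for the staging table, for example:

ANALYZE TABLE dummy COMPUTE STATISTICS FOR ALL COLUMNS;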

HTH

Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 26 Jun 2023 at 19:35, Patrick Tucci  wrote:

> Hi Mich,
>
> Thanks for the reply. I started running ANALYZE TABLE on the external
> table, but the progress was very slow. The stage had only read about 275MB
> in 10 minutes. That equates to about 5.5 hours just to analyze the table.
>
> This might just be the reality of trying to process a 240m record file
> with 80+ columns, unless there's an obvious issue with my setup that
> someone sees. The solution is likely going to involve increasing
> parallelization.
>
> To that end, I extracted and re-zipped this file in bzip. Since bzip is
> splittable and gzip is not, Spark can process the bzip file in parallel.
> The same CTAS query only took about 45 minutes. This is still a bit slower
> than I had hoped, but the import from bzip fully utilized all available
> cores. So we can give the cluster more resources if we need the process to
> go faster.
>
> Patrick
>
> On Mon, Jun 26, 2023 at 12:52 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> OK for now have you analyzed statistics in Hive external table
>>
>> spark-sql (default)> ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL
>> COLUMNS;
>> spark-sql (default)> DESC EXTENDED test.stg_t2;
>>
>> Hive external tables have little optimization
>>
>> HTH
>>
>>
>>
>> Mich Talebzadeh,
>> Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 26 Jun 2023 at 16:33, Patrick Tucci 
>> wrote:
>>
>>> Hello,
>>>
>>> I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master
>>> node has 2 cores and 8GB of RAM. There is a single worker node with 8 cores
>>> and 64GB of RAM.
>>>
>>> I'm trying to process a large pipe delimited file that has been
>>> compressed with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85
>>> columns). I uploaded the gzipped file to HDFS and created an external table
>>> using the attached script. I tried two simpler queries on the same table,
>>> and they finished in ~5 and ~10 minutes respectively:
>>>
>>> SELECT COUNT(*) FROM ClaimsImport;
>>> SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;
>>>
>>> However, when I tried to create a table stored as ORC using this table
>>> as the input, the query ran for almost 4 hours:
>>>
>>> CREATE TABLE Claims STORED AS ORC
>>> AS
>>> SELECT *
>>> FROM ClaimsImport
>>> --Exclude the header record
>>> WHERE ClaimID <> 'ClaimID';
>>>
>>> [image: image.png]
>>>
>>> Why is there such a speed disparity between these different operations?
>>> I understand that this job cannot be parallelized because the file is
>>> compressed with gzip. I also understand that creating an ORC table from the
>>> input will take more time than a simple COUNT(*). But it doesn't feel like
>>> the CREATE TABLE operation should take more than 24x longer than a simple
>>> SELECT COUNT(*) statement.
>>>
>>> Thanks for any help. Please let me know if I can provide any additional
>>> information.
>>>
>>> Patrick
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Patrick Tucci
Hi Mich,

Thanks for the reply. I started running ANALYZE TABLE on the external
table, but the progress was very slow. The stage had only read about 275MB
in 10 minutes. That equates to about 5.5 hours just to analyze the table.

This might just be the reality of trying to process a 240m record file with
80+ columns, unless there's an obvious issue with my setup that someone
sees. The solution is likely going to involve increasing parallelization.

To that end, I extracted and recompressed this file with bzip2. Since bzip2 is
splittable and gzip is not, Spark can process the bzip2 file in parallel.
The same CTAS query only took about 45 minutes. This is still a bit slower
than I had hoped, but the import from bzip fully utilized all available
cores. So we can give the cluster more resources if we need the process to
go faster.

Patrick

On Mon, Jun 26, 2023 at 12:52 PM Mich Talebzadeh 
wrote:

> OK for now have you analyzed statistics in Hive external table
>
> spark-sql (default)> ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL
> COLUMNS;
> spark-sql (default)> DESC EXTENDED test.stg_t2;
>
> Hive external tables have little optimization
>
> HTH
>
>
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 26 Jun 2023 at 16:33, Patrick Tucci 
> wrote:
>
>> Hello,
>>
>> I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master
>> node has 2 cores and 8GB of RAM. There is a single worker node with 8 cores
>> and 64GB of RAM.
>>
>> I'm trying to process a large pipe delimited file that has been
>> compressed with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85
>> columns). I uploaded the gzipped file to HDFS and created an external table
>> using the attached script. I tried two simpler queries on the same table,
>> and they finished in ~5 and ~10 minutes respectively:
>>
>> SELECT COUNT(*) FROM ClaimsImport;
>> SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;
>>
>> However, when I tried to create a table stored as ORC using this table as
>> the input, the query ran for almost 4 hours:
>>
>> CREATE TABLE Claims STORED AS ORC
>> AS
>> SELECT *
>> FROM ClaimsImport
>> --Exclude the header record
>> WHERE ClaimID <> 'ClaimID';
>>
>> [image: image.png]
>>
>> Why is there such a speed disparity between these different operations? I
>> understand that this job cannot be parallelized because the file is
>> compressed with gzip. I also understand that creating an ORC table from the
>> input will take more time than a simple COUNT(*). But it doesn't feel like
>> the CREATE TABLE operation should take more than 24x longer than a simple
>> SELECT COUNT(*) statement.
>>
>> Thanks for any help. Please let me know if I can provide any additional
>> information.
>>
>> Patrick
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Unable to populate spark metrics using custom metrics API

2023-06-26 Thread Surya Soma
Hello,

I am trying to publish custom metrics using the Spark CustomMetric API,
supported since Spark 3.2 (https://github.com/apache/spark/pull/31476):

https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric/CustomMetric.html

I have created a custom metric implementing `CustomMetric` with a default
constructor, overriding name and description, and I return a new instance of it
from the `supportedCustomMetrics` method of `spark.sql.connector.read.Scan`.

I also created a custom task metric implementing `CustomTaskMetric` with the
same name as the CustomMetric class, and I return it from
`currentMetricsValues` of the PartitionReader.
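
A simplified sketch of this wiring (the metric name and values here are
placeholders, not the real connector code):

import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}

// Driver side: aggregates the per-task values shown in the SQL UI.
class Spark32CustomMetric extends CustomMetric {
  override def name(): String = "myCustomMetric"
  override def description(): String = "example custom metric"
  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
    s"sum:${taskMetrics.sum}"
}

// Executor side: reported by the PartitionReader; name() must match exactly.
class Spark32CustomTaskMetric extends CustomTaskMetric {
  override def name(): String = "myCustomMetric"
  override def value(): Long = 1234L  // static value for testing
}

// In the Scan implementation:
//   override def supportedCustomMetrics(): Array[CustomMetric] =
//     Array(new Spark32CustomMetric)
// In the PartitionReader implementation:
//   override def currentMetricsValues(): Array[CustomTaskMetric] =
//     Array(new Spark32CustomTaskMetric)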

I am returning static values for now, but when I run the application, the Spark
History Server page shows the metric value as N/A. I have added logs in
`aggregateTaskMetrics` and the flow does reach it: SQLAppStatusListener.aggregateMetrics
loads my class and calls `aggregateTaskMetrics`, yet I still see N/A on the
Spark UI page.


Driver log:

```

23/06/23 19:23:53 INFO Spark32CustomMetric: Spark32CustomMetric in aggregateTaskMetrics start
23/06/23 19:23:53 INFO Spark32CustomMetric: Spark32CustomMetric in aggregateTaskMetrics sum:1234end
+------+----------+-------+-----------+
|  word|word_count| corpus|corpus_date|
+------+----------+-------+-----------+
|  LVII|         1|sonnets|          0|
|augurs|         1|sonnets|          0|
|dimm'd|         1|sonnets|          0|
```


Attaching the Spark UI page screenshot.

Am I missing something? Any help is really appreciated.

Thanks.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Mich Talebzadeh
OK, for now, have you analyzed statistics on the Hive external table?

spark-sql (default)> ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL
COLUMNS;
spark-sql (default)> DESC EXTENDED test.stg_t2;

Hive external tables have little optimization

HTH



Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 26 Jun 2023 at 16:33, Patrick Tucci  wrote:

> Hello,
>
> I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master
> node has 2 cores and 8GB of RAM. There is a single worker node with 8 cores
> and 64GB of RAM.
>
> I'm trying to process a large pipe delimited file that has been compressed
> with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85 columns). I
> uploaded the gzipped file to HDFS and created an external table using the
> attached script. I tried two simpler queries on the same table, and they
> finished in ~5 and ~10 minutes respectively:
>
> SELECT COUNT(*) FROM ClaimsImport;
> SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;
>
> However, when I tried to create a table stored as ORC using this table as
> the input, the query ran for almost 4 hours:
>
> CREATE TABLE Claims STORED AS ORC
> AS
> SELECT *
> FROM ClaimsImport
> --Exclude the header record
> WHERE ClaimID <> 'ClaimID';
>
> [image: image.png]
>
> Why is there such a speed disparity between these different operations? I
> understand that this job cannot be parallelized because the file is
> compressed with gzip. I also understand that creating an ORC table from the
> input will take more time than a simple COUNT(*). But it doesn't feel like
> the CREATE TABLE operation should take more than 24x longer than a simple
> SELECT COUNT(*) statement.
>
> Thanks for any help. Please let me know if I can provide any additional
> information.
>
> Patrick
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Unsubscribe

2023-06-26 Thread Ghazi Naceur
Unsubscribe


Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Patrick Tucci
Hello,

I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master node
has 2 cores and 8GB of RAM. There is a single worker node with 8 cores and
64GB of RAM.

I'm trying to process a large pipe delimited file that has been compressed
with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85 columns). I
uploaded the gzipped file to HDFS and created an external table using the
attached script. I tried two simpler queries on the same table, and they
finished in ~5 and ~10 minutes respectively:

SELECT COUNT(*) FROM ClaimsImport;
SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;

However, when I tried to create a table stored as ORC using this table as
the input, the query ran for almost 4 hours:

CREATE TABLE Claims STORED AS ORC
AS
SELECT *
FROM ClaimsImport
--Exclude the header record
WHERE ClaimID <> 'ClaimID';

[image: image.png]

Why is there such a speed disparity between these different operations? I
understand that this job cannot be parallelized because the file is
compressed with gzip. I also understand that creating an ORC table from the
input will take more time than a simple COUNT(*). But it doesn't feel like
the CREATE TABLE operation should take more than 24x longer than a simple
SELECT COUNT(*) statement.

Thanks for any help. Please let me know if I can provide any additional
information.

Patrick


Create Table.sql
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: [Spark streaming]: Microbatch id in logs

2023-06-26 Thread Mich Talebzadeh
In Spark Structured Streaming (SSS):

writeStream. \
    outputMode('append'). \
    option("truncate", "false"). \
    foreachBatch(SendToBigQuery). \
    option('checkpointLocation', checkpoint_path). \
    start()

so this writeStream will call foreachBatch().

foreachBatch performs custom write logic on each micro-batch through the
SendToBigQuery function. foreachBatch(SendToBigQuery) expects two parameters:
first, the micro-batch as a DataFrame or Dataset, and second, a unique id for
each batch. Using foreachBatch, we write each micro-batch to the storage
defined in our custom logic; in this case, we store the output of our streaming
application in a Google BigQuery table.

SendToBigQuery does this:

def SendToBigQuery(df, batchId):
    if len(df.take(1)) > 0:
        print(batchId)
        # do your logic
    else:
        print("DataFrame is empty")

You should also make sure the checkpoint location is set:

   option('checkpointLocation', checkpoint_path)
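
If the aim is to stamp your own application log lines with the micro-batch id
(this covers your application logs only, not Spark's internal logs), a minimal
sketch along the same lines (the logging setup is just an illustration):

import logging

logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("streaming_app")
log.setLevel(logging.INFO)

def SendToBigQuery(df, batchId):
    if len(df.take(1)) > 0:
        log.info("batchId=%s starting custom write", batchId)
        # do your logic
        log.info("batchId=%s finished custom write", batchId)
    else:
        log.info("batchId=%s DataFrame is empty", batchId)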

See this article of mine:
Processing Change Data Capture with Spark Structured Streaming


HTH

Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 26 Jun 2023 at 06:01, Anil Dasari  wrote:

> Hi,
> I am using spark 3.3.1 distribution and spark stream in my application. Is
> there a way to add a microbatch id to all logs generated by spark and spark
> applications ?
>
> Thanks.
>