Re: Static partitioning in partitionBy()

2019-05-07 Thread Felix Cheung
You could

df.filter(col("c") === "c1").write.partitionBy("c").save

It could run into some data skew problems, but it might work for you.
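
(For reference, a fuller sketch of that one-liner — the path is a placeholder,
and the column name c comes from the question below:)

  import org.apache.spark.sql.functions.col

  // Keep only the rows for partition c = "c1"; partitionBy("c") still lays the
  // output out under .../c=c1/ in the target directory.
  df.filter(col("c") === "c1")
    .write
    .partitionBy("c")
    .save("/path/to/table")   // placeholder path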




From: Burak Yavuz 
Sent: Tuesday, May 7, 2019 9:35:10 AM
To: Shubham Chaurasia
Cc: dev; u...@spark.apache.org
Subject: Re: Static partitioning in partitionBy()

It depends on the data source. Delta Lake (https://delta.io) allows you to do 
it with the .option("replaceWhere", "c = c1"). With other file formats, you can 
write directly into the partition directory (tablePath/c=c1), but you lose 
atomicity.
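
(A minimal sketch of the replaceWhere approach, assuming a Delta table at a
placeholder path and a string-typed partition column c:)

  import org.apache.spark.sql.functions.col

  // Overwrite only the c = 'c1' partition of the Delta table; rows outside the
  // predicate are left untouched, and the replacement is atomic.
  df.filter(col("c") === "c1")
    .write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", "c = 'c1'")
    .save("/delta/table/path")   // placeholder path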

On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia wrote:
Hi All,

Is there a way I can provide static partitions in partitionBy()?

Like:
df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save

Above code gives following error as it tries to find column `c=c1` in df.

org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found in 
schema struct;

Thanks,
Shubham


Need guidance on Spark Session Termination.

2019-05-07 Thread Nasrulla Khan Haris
Hi fellow Spark-devs,

I am pretty new to Spark core and I am looking for some answers for my use case.
I have a DataSource V2 API connector, and in this connector we create temporary
files on blob storage. Can you please suggest where I should look if I want to
delete those temporary files from storage at the end of the Spark session?

Thanks,
Nasrulla
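
(For reference, one place to hook such cleanup is a SparkListener's
onApplicationEnd callback — a sketch only; the listener, the tracked paths, and
the deleteBlobPath helper below are made-up names, not part of any connector:)

  import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

  // Hypothetical cleanup hook: delete the connector's temporary files once the
  // application ends.
  class TempFileCleanupListener(tempPaths: Seq[String]) extends SparkListener {
    override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
      tempPaths.foreach(deleteBlobPath)
    }
    // Placeholder for a storage-specific delete (e.g. via the Hadoop FileSystem API).
    private def deleteBlobPath(path: String): Unit = { /* delete `path` from blob storage */ }
  }

  // Register it on the SparkContext of the active session (spark and
  // trackedTempPaths are assumed to exist in the connector):
  spark.sparkContext.addSparkListener(new TempFileCleanupListener(trackedTempPaths))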



Re: Hive Hash in Spark

2019-05-07 Thread Bruce Robbins
Mildly off-topic:

From a *correctness* perspective only, it seems Spark can read bucketed
Hive tables just fine. I am ignoring the fact that Spark doesn't take
advantage of the bucketing.

Is that a fair assessment? Or is it more complicated than that?

Also, Spark has code to prevent an application from accidentally writing to
a bucketed Hive table (except that this check has a hole). Aside from that
hole, the write case is covered.

Spark apps reading bucketed Hive tables seems to be common, so I hope it
works (as it seems to).
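
(For context, a small sketch of Spark-native bucketing on the write side — table
and column names are made up. Spark buckets with Murmur3 rather than Hive's hash,
which is part of why the two bucketed layouts aren't interchangeable:)

  // Writes a Spark-bucketed (Murmur3-hashed) table; this is distinct from a
  // Hive-bucketed table, and bucketBy currently requires saveAsTable.
  df.write
    .bucketBy(8, "id")
    .sortBy("id")
    .saveAsTable("spark_bucketed_example")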


On Thu, Mar 7, 2019 at 12:58 PM  wrote:

> Thanks Ryan and Reynold for the information!
>
>
>
> Cheers,
>
> Tyson
>
>
>
> *From:* Ryan Blue 
> *Sent:* Wednesday, March 6, 2019 3:47 PM
> *To:* Reynold Xin 
> *Cc:* tcon...@gmail.com; Spark Dev List 
> *Subject:* Re: Hive Hash in Spark
>
>
>
> I think this was needed to add support for bucketed Hive tables. Like
> Tyson noted, if the other side of a join can be bucketed the same way, then
> Spark can use a bucketed join. I have long-term plans to support this in
> the DataSourceV2 API, but I don't think we are very close to implementing
> it yet.
>
>
>
> rb
>
>
>
> On Wed, Mar 6, 2019 at 1:57 PM Reynold Xin  wrote:
>
> I think they might be used in bucketing? Not 100% sure.
>
>
>
>
>
> On Wed, Mar 06, 2019 at 1:40 PM,  wrote:
>
> Hi,
>
>
>
> I noticed the existence of a Hive Hash partitioning implementation in
> Spark, but also noticed that it’s not being used, and that the Spark hash
> partitioning function is presently hardcoded to Murmur3. My question is
> whether Hive Hash is dead code or whether there are future plans to support
> reading and understanding data that has been partitioned using Hive Hash? By
> understanding, I mean that I’m able to avoid a full shuffle join on Table A
> (partitioned by Hive Hash) when joining with a Table B that I can shuffle
> via Hive Hash to match Table A.
>
>
>
> Thank you,
>
> Tyson
>
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>


Re: [VOTE][SPARK-27396] SPIP: Public APIs for extended Columnar Processing Support

2019-05-07 Thread Bobby Evans
I am +1.

On Tue, May 7, 2019 at 1:37 PM Thomas graves  wrote:

> Hi everyone,
>
> I'd like to call for another vote on SPARK-27396 - SPIP: Public APIs
> for extended Columnar Processing Support.  The proposal is to extend
> the support to allow for more columnar processing.  We had previous
> vote and discussion threads and have updated the SPIP based on the
> comments to clarify a few things and reduce the scope.
>
> You can find the updated proposal in the jira at:
> https://issues.apache.org/jira/browse/SPARK-27396.
>
> Please vote as early as you can, I will leave the vote open until next
> Monday (May 13th), 2pm CST to give people plenty of time.
>
> [ ] +1: Accept the proposal as an official SPIP
> [ ] +0
> [ ] -1: I don't think this is a good idea because ...
>
> Thanks!
> Tom Graves
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


[VOTE][SPARK-27396] SPIP: Public APIs for extended Columnar Processing Support

2019-05-07 Thread Thomas graves
Hi everyone,

I'd like to call for another vote on SPARK-27396 - SPIP: Public APIs
for extended Columnar Processing Support.  The proposal is to extend
the support to allow for more columnar processing.  We had previous
vote and discussion threads and have updated the SPIP based on the
comments to clarify a few things and reduce the scope.

You can find the updated proposal in the jira at:
https://issues.apache.org/jira/browse/SPARK-27396.

Please vote as early as you can, I will leave the vote open until next
Monday (May 13th), 2pm CST to give people plenty of time.

[ ] +1: Accept the proposal as an official SPIP
[ ] +0
[ ] -1: I don't think this is a good idea because ...

Thanks!
Tom Graves

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Static partitioning in partitionBy()

2019-05-07 Thread Burak Yavuz
It depends on the data source. Delta Lake (https://delta.io) allows you to
do it with the .option("replaceWhere", "c = c1"). With other file formats,
you can write directly into the partition directory (tablePath/c=c1), but
you lose atomicity.

On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia 
wrote:

> Hi All,
>
> Is there a way I can provide static partitions in partitionBy()?
>
> Like:
> df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save
>
> Above code gives following error as it tries to find column `c=c1` in df.
>
> org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
> in schema struct;
>
> Thanks,
> Shubham
>


Static partitioning in partitionBy()

2019-05-07 Thread Shubham Chaurasia
Hi All,

Is there a way I can provide static partitions in partitionBy()?

Like:
df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save

Above code gives following error as it tries to find column `c=c1` in df.

org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
in schema struct;

Thanks,
Shubham


Re: [METRICS] Metrics names inconsistent between executions

2019-05-07 Thread Stavros Kontopoulos
Hi,

With jmx_exporter and Prometheus you can always rewrite the metric name patterns
on the fly. Btw, if you use Grafana it's easy to filter things even without the
rewrite. If this is a custom dashboard you can always group metrics based on
spark.app.id as a prefix, no? Also, I think it is sometimes good to know whether
a specific executor failed and why, and to report execution metrics per
executor. For example, if you have skewed data and that caused JVM issues etc.
Stavros
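
(For reference, a minimal sketch of pinning spark.metrics.namespace, which the
original message below mentions — the app and namespace names are made up, and
the executor-id part of the metric names still varies between runs:)

  import org.apache.spark.sql.SparkSession

  // Fixes the application part of the metric names across executions; only the
  // executor-id part remains run-specific.
  val spark = SparkSession.builder()
    .appName("metrics-namespace-example")
    .config("spark.metrics.namespace", "my_streaming_job")
    .getOrCreate()
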
On Mon, May 6, 2019 at 11:29 PM Anton Kirillov 
wrote:

> Hi everyone!
>
> We are currently working on building a unified monitoring/alerting
> solution for Spark and would like to rely on Spark's own metrics to avoid
> divergence from the upstream. One of the challenges is to support metrics
> coming from multiple Spark applications running on a cluster: scheduled
> jobs, long-running streaming applications etc.
>
> Original problem:
> Spark assigns metric names using *spark.app.id* and *spark.executor.id* as a
> part of them.
> Thus the number of metrics is continuously growing because those IDs are
> unique between executions whereas the metrics themselves report the same
> thing. Another issue which arises here is how to use constantly changing
> metric names in dashboards.
>
> For example, *jvm_heap_used* reported by all Spark instances (components):
> - <app_id>_driver_jvm_heap_used (Driver)
> - <app_id>_<executor_id>_jvm_heap_used (Executors)
>
> While *spark.app.id* can be overridden with *spark.metrics.namespace*, there's
> no such option for *spark.executor.id*, which makes it impossible to build a
> reusable dashboard because (given the uniqueness of IDs) differently named
> metrics are emitted for each execution.
>
> One of the possible solutions would be to make executor metric names
> follow the driver's metric name pattern, e.g.:
> - <app_id>_driver_jvm_heap_used (Driver)
> - <app_id>_executor_jvm_heap_used (Executors)
>
> and distinguish executors based on tags (tags should be configured in
> metric reporters in this case). Not sure if this could potentially break
> Driver UI though.
>
> I'd really appreciate any feedback on this issue and would be happy to
> create a Jira issue/PR if this change looks sane for the community.
>
> Thanks in advance.
>
> --
> *Anton Kirillov*
> Senior Software Engineer, Mesosphere
>