Optimizing LIMIT in DSv2

2020-03-30 Thread Andrew Melo
Hello,

Executing "SELECT Muon_Pt FROM rootDF LIMIT 10", where "rootDF" is a temp
view backed by a DSv2 reader yields the attached plan [1]. It appears that
the initial stage is run over every partition in rootDF, even though each
partition has 200k rows (modulo the last partition which holds the
remainder of rows in a file).

Is there some sort of hinting that can be done from the data source side to
better inform the optimizer, or, alternately, am I missing an interface in
the pushdown filters that would let me avoid transferring and decompressing
unnecessary partitions?

Thanks!
Andrew

[1]

== Parsed Logical Plan ==
'GlobalLimit 10
+- 'LocalLimit 10
   +- 'Project ['Muon_pt]
      +- 'UnresolvedRelation `rootDF`

== Analyzed Logical Plan ==
Muon_pt: array
GlobalLimit 10
+- LocalLimit 10
   +- Project [Muon_pt#119]
      +- SubqueryAlias `rootdf`
         +- RelationV2 root[run#0L, luminosityBlock#1L, event#2L,
CaloMET_phi#3, CaloMET_pt#4, CaloMET_sumEt#5, nElectron#6,
Electron_deltaEtaSC#7, Electron_dr03EcalRecHitSumEt#8,
Electron_dr03HcalDepth1TowerSumEt#9, Electron_dr03TkSumPt#10,
Electron_dxy#11, Electron_dxyErr#12, Electron_dz#13, Electron_dzErr#14,
Electron_eCorr#15, Electron_eInvMinusPInv#16, Electron_energyErr#17,
Electron_eta#18, Electron_hoe#19, Electron_ip3d#20, Electron_mass#21,
Electron_miniPFRelIso_all#22, Electron_miniPFRelIso_chg#23, ... 787 more
fields] (Options: [tree=Events,paths=["hdfs://cmshdfs/store/data"]])

== Optimized Logical Plan ==
GlobalLimit 10
+- LocalLimit 10
   +- Project [Muon_pt#119]
      +- RelationV2 root[run#0L, luminosityBlock#1L, event#2L,
CaloMET_phi#3, CaloMET_pt#4, CaloMET_sumEt#5, nElectron#6,
Electron_deltaEtaSC#7, Electron_dr03EcalRecHitSumEt#8,
Electron_dr03HcalDepth1TowerSumEt#9, Electron_dr03TkSumPt#10,
Electron_dxy#11, Electron_dxyErr#12, Electron_dz#13, Electron_dzErr#14,
Electron_eCorr#15, Electron_eInvMinusPInv#16, Electron_energyErr#17,
Electron_eta#18, Electron_hoe#19, Electron_ip3d#20, Electron_mass#21,
Electron_miniPFRelIso_all#22, Electron_miniPFRelIso_chg#23, ... 787 more
fields] (Options: [tree=Events,paths=["hdfs://cmshdfs/store/data"]])

== Physical Plan ==
CollectLimit 10
+- *(1) Project [Muon_pt#119]
   +- *(1) ScanV2 root[Muon_pt#119] (Options:
[tree=Events,paths=["hdfs://cmshdfs/store/data"]])
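
Aside: if upgrading is an option, later DSv2 revisions (Spark 3.3+) add a
SupportsPushDownLimit mix-in so the scan builder is told about the LIMIT
before input partitions are planned. A minimal sketch, assuming that
interface is available; RootScanBuilder and the ROOT-specific details are
hypothetical:

import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownLimit;
import org.apache.spark.sql.types.StructType;

public class RootScanBuilder implements ScanBuilder, SupportsPushDownLimit {
  private final StructType schema;  // (pruned) schema of the ROOT tree
  private int limit = -1;           // -1 means "no limit pushed"

  public RootScanBuilder(StructType schema) {
    this.schema = schema;
  }

  @Override
  public boolean pushLimit(int limit) {
    // Remember the limit; returning true tells Spark the source will honor
    // it. Spark still applies its own CollectLimit on top, so over-reading
    // a partition stays correct.
    this.limit = limit;
    return true;
  }

  @Override
  public Scan build() {
    return new Scan() {
      @Override
      public StructType readSchema() {
        return schema;
      }
      // toBatch()/planInputPartitions() (not shown) would consult the pushed
      // limit and plan only enough partitions to cover it, instead of
      // planning every partition in the source.
    };
  }
}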


Re: Data Source - State (SPARK-28190)

2020-03-30 Thread Jungtaek Lim
Hi Bryan,

Thanks for the interest! Unfortunately there's a lack of committer support
for SPARK-28190 (I have been struggling with a lack of support for
Structured Streaming contributions in general). I hope things will get
better, but in the meantime, could you please try out my own project
instead?

https://github.com/HeartSaVioR/spark-state-tools

It's not super convenient to use as of now, because Structured Streaming
doesn't store a schema for the state. The schema has to be provided
manually, or derived from the actual query. An improvement is proposed in
SPARK-27237, but there is no activity on it right now, also due to lack of
support.
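
Aside: a minimal sketch of what building the state schema by hand might look
like, using Spark's StructType API. The field names and types are
illustrative, and how the schema is passed to spark-state-tools is defined
by that project's README rather than shown here:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Illustrative example: a streaming aggregation keyed by a string "groupKey"
// whose state value holds a running long "count".
StructType keySchema = new StructType().add("groupKey", DataTypes.StringType);
StructType valueSchema = new StructType().add("count", DataTypes.LongType);

// State rows are modeled here as a (key, value) pair of structs.
StructType stateSchema = new StructType()
    .add("key", keySchema)
    .add("value", valueSchema);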

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, Mar 31, 2020 at 4:50 AM Bryan Jeffrey wrote:

> Hi, Jungtaek.
>
> We've been investigating the use of Spark Structured Streaming to replace
> our Spark Streaming operations.  We have several cases where we're using
> mapWithState to maintain state across batches, often with high volumes of
> data.  We took a look at Structured Streaming's stateful processing; it
> looks great, but has some shortcomings:
> 1. State can only be hydrated from checkpoint, which means that
> modification of the state is not possible.
> 2. You cannot clean up or normalize state data after it has been processed.
>
> These shortcomings appear to be potentially addressed by your
> ticket SPARK-28190 - "Data Source - State".  I see little activity on this
> ticket. Can you help me to understand where this feature currently stands?
>
> Thank you,
>
> Bryan Jeffrey
>


[Spark SQL]: How to deserialize a column of ArrayType to java.util.List

2020-03-30 Thread Dima Pavlyshyn
Hello Apache Spark Support Team,
I am writing Spark in Java. I use the Dataset API and I am facing an issue.
I am doing something like this:

public <K, V> Dataset<Tuple2<K, List<V>>> groupByKey(Dataset<Tuple2<K, V>> consumers, Class<K> kClass) {
    consumers.groupBy("_1").agg(collect_list(col("_2"))).printSchema();
    return consumers.groupBy("_1").agg(collect_list(col("_2")))
        .as(Encoders.tuple(Encoders.bean(kClass), Encoders.bean(List.class)));
}

And I am facing the issue that I cannot deserialize the collect_list part.
According to
https://spark.apache.org/docs/latest/sql-reference.html#data-types, an
ArrayType column should map to java.util.List.
Could you please give me any suggestions? I have wasted too much time trying
to fix it.
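
One workaround sketch, assuming the goal is just to get the collected values
back as a java.util.List: keep the aggregated result as a Dataset<Row> and
read the ArrayType column with Row.getList(), instead of asking for a bean
encoder for List. The "values" alias below is an illustrative addition:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;

import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Keep the result untyped and pull the array column out of each Row;
// Row.getList() returns a java.util.List.
Dataset<Row> grouped = consumers
    .groupBy(col("_1"))
    .agg(collect_list(col("_2")).alias("values"));

for (Row row : grouped.collectAsList()) {
  Object key = row.get(0);                                      // the "_1" key
  List<Object> values = row.getList(row.fieldIndex("values"));  // collected "_2" values
  // ... use key and values ...
}

If a fully typed result is required, the typed groupByKey/mapGroups API is
another option and avoids collect_list entirely.
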
Best Regards,
Dmytro


Data Source - State (SPARK-28190)

2020-03-30 Thread Bryan Jeffrey
Hi, Jungtaek.

We've been investigating the use of Spark Structured Streaming to replace
our Spark Streaming operations.  We have several cases where we're using
mapWithState to maintain state across batches, often with high volumes of
data.  We took a look at Structured Streaming's stateful processing; it
looks great, but has some shortcomings:
1. State can only be hydrated from checkpoint, which means that
modification of the state is not possible.
2. You cannot clean up or normalize state data after it has been processed.

These shortcomings appear to be potentially addressed by your
ticket SPARK-28190 - "Data Source - State".  I see little activity on this
ticket. Can you help me to understand where this feature currently stands?
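
Aside: in Structured Streaming the closest analogue to mapWithState is
mapGroupsWithState (or flatMapGroupsWithState) on a grouped Dataset. A
minimal Java sketch, where Event, Summary, and their fields are hypothetical
beans:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.GroupStateTimeout;

// Count events per key, keeping the running count in Spark-managed state.
// The state itself lives only in the checkpoint, which is shortcoming (1).
Dataset<Summary> summaries = events
    .groupByKey((MapFunction<Event, String>) Event::getKey, Encoders.STRING())
    .mapGroupsWithState(
        (MapGroupsWithStateFunction<String, Event, Long, Summary>) (key, values, state) -> {
          long count = state.exists() ? state.get() : 0L;
          while (values.hasNext()) { values.next(); count++; }
          state.update(count);
          return new Summary(key, count);
        },
        Encoders.LONG(),               // state encoder
        Encoders.bean(Summary.class),  // output encoder
        GroupStateTimeout.NoTimeout());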

Thank you,

Bryan Jeffrey


Building Spark + hadoop docker for openshift

2020-03-30 Thread Antoine DUBOIS
Hello, 
I'm trying to build a Spark + Hadoop docker image compatible with OpenShift.
I've used the oshinko Spark build scripts from
https://github.com/radanalyticsio/openshift-spark
to build an image with the Hadoop jars on the classpath, to allow using S3 storage.
However, I'm now stuck on the Spark entrypoint.sh script.
For reasons unknown, this script (kubernetes/dockerfiles/spark/entrypoint.sh)
contains a reference to SPARK_JAVA_OPTS, which seems to be deprecated since 2.2
(https://issues.apache.org/jira/browse/SPARK-24577).
I'm using Spark 2.4.5 and trying to integrate Hadoop 2.9.2. So far the image
builds, but submission always fails with an error in the entrypoint script.

Have any of you managed to use spark-submit on K8s, and if so, how? And is
this entrypoint.sh file still relevant?
Here are my spark-submit options:
./spark-submit \ 
--master k8s://https://wok.in2p3.fr \ 
--deploy-mode cluster \ 
--conf "spark.kubernetes.driverEnv.SPARK_DRIVER_CLASS=SparkPi" \ 
--conf "spark.kubernetes.driverEnv.SPARK_DRIVER_MEMORY=1024m" \ 
--conf "spark.kubernetes.driverEnv.SPARK_EXECUTOR_CORES=2" \ 
--conf "spark.kubernetes.driverEnv.SPARK_EXECUTOR_MEMORY=2048g" \ 
--name "test_$(date +'%m-%d-%y_%H:%m')" \ 
--conf "spark.kubernetes.container.image=private.repo/spark-docker:latest" \ 
--conf "spark.kubernetes.container.image.pullPolicy=Always" \ 
--conf "spark.kubernetes.container.image.pullSecrets=mysecret" \ 
--conf "spark.kubernetes.namespace=spark2" \ 
--conf "spark.executor.instances=4" \ 
--class SparkPi "local:///opt/jar/sparkpi_2.10-1.0.jar" 10 

Of course, /opt/jar/sparkpi_2.10-1.0.jar is part of my docker build.
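
Aside: a sketch of the same submit with the driver/executor resources passed
as regular Spark confs instead of spark.kubernetes.driverEnv.* variables.
This is not a fix for the entrypoint.sh error; the application name and the
memory values below are illustrative:

./spark-submit \
  --master k8s://https://wok.in2p3.fr \
  --deploy-mode cluster \
  --name sparkpi-test \
  --class SparkPi \
  --conf spark.driver.memory=1024m \
  --conf spark.executor.memory=2048m \
  --conf spark.executor.cores=2 \
  --conf spark.executor.instances=4 \
  --conf spark.kubernetes.namespace=spark2 \
  --conf spark.kubernetes.container.image=private.repo/spark-docker:latest \
  --conf spark.kubernetes.container.image.pullPolicy=Always \
  --conf spark.kubernetes.container.image.pullSecrets=mysecret \
  local:///opt/jar/sparkpi_2.10-1.0.jar 10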

Thank you in advance. 


Antoine DUBOIS 
CCIN2P3 

