Optimizing LIMIT in DSv2
Hello, Executing "SELECT Muon_Pt FROM rootDF LIMIT 10", where "rootDF" is a temp view backed by a DSv2 reader yields the attached plan [1]. It appears that the initial stage is run over every partition in rootDF, even though each partition has 200k rows (modulo the last partition which holds the remainder of rows in a file). Is there some sort of hinting that can done from the datasource side to better inform the optimizer or, alternately, am I missing an interface in the PushDown filters that would let me elide transferring/decompressing unnecessary partitions? Thanks! Andrew [1] == Parsed Logical Plan == 'GlobalLimit 10 +- 'LocalLimit 10 +- 'Project ['Muon_pt] +- 'UnresolvedRelation `rootDF` == Analyzed Logical Plan == Muon_pt: array GlobalLimit 10 +- LocalLimit 10 +- Project [Muon_pt#119] +- SubqueryAlias `rootdf` +- RelationV2 root[run#0L, luminosityBlock#1L, event#2L, CaloMET_phi#3, CaloMET_pt#4, CaloMET_sumEt#5, nElectron#6, Electron_deltaEtaSC#7, Electron_dr03EcalRecHitSumEt#8, Electron_dr03HcalDepth1TowerSumEt#9, Electron_dr03TkSumPt#10, Electron_dxy#11, Electron_dxyErr#12, Electron_dz#13, Electron_dzErr#14, Electron_eCorr#15, Electron_eInvMinusPInv#16, Electron_energyErr#17, Electron_eta#18, Electron_hoe#19, Electron_ip3d#20, Electron_mass#21, Electron_miniPFRelIso_all#22, Electron_miniPFRelIso_chg#23, ... 787 more fields] (Options: [tree=Events,paths=["hdfs://cmshdfs/store/data"]]) == Optimized Logical Plan == GlobalLimit 10 +- LocalLimit 10 +- Project [Muon_pt#119] +- RelationV2 root[run#0L, luminosityBlock#1L, event#2L, CaloMET_phi#3, CaloMET_pt#4, CaloMET_sumEt#5, nElectron#6, Electron_deltaEtaSC#7, Electron_dr03EcalRecHitSumEt#8, Electron_dr03HcalDepth1TowerSumEt#9, Electron_dr03TkSumPt#10, Electron_dxy#11, Electron_dxyErr#12, Electron_dz#13, Electron_dzErr#14, Electron_eCorr#15, Electron_eInvMinusPInv#16, Electron_energyErr#17, Electron_eta#18, Electron_hoe#19, Electron_ip3d#20, Electron_mass#21, Electron_miniPFRelIso_all#22, Electron_miniPFRelIso_chg#23, ... 787 more fields] (Options: [tree=Events,paths=["hdfs://cmshdfs/store/data"]]) == Physical Plan == CollectLimit 10 +- *(1) Project [Muon_pt#119] +- *(1) ScanV2 root[Muon_pt#119] (Options: [tree=Events,paths=["hdfs://cmshdfs/store/data"]])
Re: Data Source - State (SPARK-28190)
Hi Bryan, Thanks for the interest! Unfortunately there's lack of support on committers for SPARK-28190 (I have been struggling with lack of support on structured streaming contributions). I hope things will get better, but in the meantime, could you please try out my own project instead? https://github.com/HeartSaVioR/spark-state-tools It's not super convenient to use as of now, as structured streaming doesn't store schema for state. The schema should be provided manually, or from actual query. The improvement is being proposed via SPARK-27237 but this is also no activity right now due to lack of support as well. Thanks, Jungtaek Lim (HeartSaVioR) On Tue, Mar 31, 2020 at 4:50 AM Bryan Jeffrey wrote: > Hi, Jungtaek. > > We've been investigating the use of Spark Structured Streaming to replace > our Spark Streaming operations. We have several cases where we're using > mapWithState to maintain state across batches, often with high volumes of > data. We took a look at the Structured Streaming stateful processing. > Structured Streaming state processing looks great, but has some > shortcomings: > 1. State can only be hydrated from checkpoint, which means that > modification of the state is not possible. > 2. You cannot cleanup or normalize state data after it has been processed. > > These shortcomings appear to be potentially addressed by your > ticket SPARK-28190 - "Data Source - State". I see little activity on this > ticket. Can you help me to understand where this feature currently stands? > > Thank you, > > Bryan Jeffrey >
[Spark SQL]: How to deserailize column of ArrayType to java.util.List
Hello Apache Spark Support Team, I am writing Spark on Java now. I use Dataset API and I face with an issue, that I am doing something like that public Dataset> groupByKey(Dataset> consumers, Class kClass) { consumers.groupBy("_1").agg(collect_list(col("_2"))).printSchema(); return consumers.groupBy("_1").agg(collect_list(col("_2"))).as(Encoders.tuple(Encoders.bean(kClass), Encoders.bean(List.class))); } And I faced the issue that I can not deserialize collect_list part. https://spark.apache.org/docs/latest/sql-reference.html#data-types - mapping ArrayType to java.util.List Could you please give me any suggestions, wasted too much time trying to fix it? Best Regards, Dmytro
[no subject]
Hello Apache Spark Support Team, I am writing Spark on Java now. I use Dataset API and I face with an issue, that I am doing something like that public Dataset> groupByKey(Dataset> consumers, Class kClass) { consumers.groupBy("_1").agg(collect_list(col("_2"))).printSchema(); return consumers.groupBy("_1").agg(collect_list(col("_2"))).as(Encoders.tuple(Encoders.bean(kClass), Encoders.bean(List.class))); } And I faced the issue that I can not deserialize collect_list part. https://spark.apache.org/docs/latest/sql-reference.html#data-types - mapping ArrayType to java.util.List Could you please give me any suggestions, wasted too much time trying to fix it? Best Regards, Dmytro
Data Source - State (SPARK-28190)
Hi, Jungtaek. We've been investigating the use of Spark Structured Streaming to replace our Spark Streaming operations. We have several cases where we're using mapWithState to maintain state across batches, often with high volumes of data. We took a look at the Structured Streaming stateful processing. Structured Streaming state processing looks great, but has some shortcomings: 1. State can only be hydrated from checkpoint, which means that modification of the state is not possible. 2. You cannot cleanup or normalize state data after it has been processed. These shortcomings appear to be potentially addressed by your ticket SPARK-28190 - "Data Source - State". I see little activity on this ticket. Can you help me to understand where this feature currently stands? Thank you, Bryan Jeffrey
Building Spark + hadoop docker for openshift
Hello, I'm trying to build a spark+hadoop docker image compatible with Openshift. I've used oshinko Spark build script here https://github.com/radanalyticsio/openshift-spark to build something with Hadoop jar in classpath to allow usage of S3 storage. However I'm now stuk on the spark entrypoint.sh script. For reasons unknown, this script kubernetes/dockerfiles/spark/entrypoint.sh contains a reference to SPARK_JAVA_OPS which seems deprecated since 2.2 https://issues.apache.org/jira/browse/SPARK-24577 I'm using spark 2.4.5 and try to integrate hadoop 2.9.2, so far the image build but I fail all the time at submit with an error in entrypoint script. Is any of you manage to use spark-submit on K8S and how and is this entrypoint.sh file is relevant ? here's my spark-submit option: ./spark-submit \ --master k8s://https://wok.in2p3.fr \ --deploy-mode cluster \ --conf "spark.kubernetes.driverEnv.SPARK_DRIVER_CLASS=SparkPi" \ --conf "spark.kubernetes.driverEnv.SPARK_DRIVER_MEMORY=1024m" \ --conf "spark.kubernetes.driverEnv.SPARK_EXECUTOR_CORES=2" \ --conf "spark.kubernetes.driverEnv.SPARK_EXECUTOR_MEMORY=2048g" \ --name "test_$(date +'%m-%d-%y_%H:%m')" \ --conf "spark.kubernetes.container.image=private.repo/spark-docker:latest" \ --conf "spark.kubernetes.container.image.pullPolicy=Always" \ --conf "spark.kubernetes.container.image.pullSecrets=mysecret" \ --conf "spark.kubernetes.namespace=spark2" \ --conf "spark.executor.instances=4" \ --class SparkPi "local:///opt/jar/sparkpi_2.10-1.0.jar" 10 of course /opt/jar/sparkpi_2.10-1.0.jar is part of my docker build. Thank you in advance. Antoine DUBOIS CCIN2P3 smime.p7s Description: S/MIME Cryptographic Signature