[ https://issues.apache.org/jira/browse/SPARK-30926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bozhidar Karaargirov updated SPARK-30926: ----------------------------------------- Description: So I played around with a data set from here: [https://www.kaggle.com/hmavrodiev/sofia-air-quality-dataset] I ran the same query against the base CSVs and against a parquet version of them: {color:#008000}SELECT * FROM airQualityP WHERE P1 > 20{color} Here is the csv code: {color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._ {color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}).csv({color:#660e7a}originalDataset{color}) df.createTempView({color:#008000}"airQuality"{color}) {color:#000080}val {color}result = {color:#660e7a}session{color}.sql({color:#008000}"SELECT * FROM airQuality WHERE P1 > 20"{color}) .map(ParticleAirQuality.{color:#660e7a}mappingFunction{color}) println(result.count()) Here is the parquet code: {color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._ {color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}).parquet({color:#660e7a}bigParquetDataset{color}) df.createTempView({color:#008000}"airQualityP"{color}) {color:#000080}val {color}result = {color:#660e7a}session{color} .sql({color:#008000}"SELECT * FROM airQualityP WHERE P1 > 20"{color}) .map(ParticleAirQuality.{color:#660e7a}namedMappingFunction{color}) println(result.count()) And this is how I transform the csv into parquets: {color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._ {color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}) .csv({color:#660e7a}originalDataset{color}) .map(ParticleAirQuality.{color:#660e7a}mappingFunction{color}) df.write.parquet({color:#660e7a}bigParquetDataset{color}) These are the two mapping 
functions: {color:#000080}val {color}{color:#660e7a}mappingFunction {color}= { r: Row => ParticleAirQuality( r.getString({color:#0000ff}1{color}), r.getString({color:#0000ff}2{color}), r.getString({color:#0000ff}3{color}), r.getString({color:#0000ff}4{color}), r.getString({color:#0000ff}5{color}), { {color:#000080}val {color}p1 = r.getString({color:#0000ff}6{color}) {color:#000080}if{color}(p1 == {color:#000080}null{color}) Double.{color:#660e7a}NaN{color} {color:#000080}else {color}p1.toDouble }, { {color:#000080}val {color}p2 = r.getString({color:#0000ff}7{color}) {color:#000080}if{color}(p2 == {color:#000080}null{color}) Double.{color:#660e7a}NaN{color} {color:#000080}else {color}p2.toDouble } ) } {color:#000080}val {color}{color:#660e7a}namedMappingFunction {color}= { r: Row => ParticleAirQuality( r.getAs[{color:#20999d}String{color}]({color:#008000}"sensor_id"{color}), r.getAs[{color:#20999d}String{color}]({color:#008000}"location"{color}), r.getAs[{color:#20999d}String{color}]({color:#008000}"lat"{color}), r.getAs[{color:#20999d}String{color}]({color:#008000}"lon"{color}), r.getAs[{color:#20999d}String{color}]({color:#008000}"timestamp"{color}), r.getAs[Double]({color:#008000}"P1"{color}), r.getAs[Double]({color:#008000}"P2"{color}) ) } If it matters, these are the paths (Note that I actually use double \ instead of / since it is Windows - but that doesn't really matter): {color:#000080}val {color}{color:#660e7a}originalDataset {color}= {color:#008000}"D:/source/datasets/sofia-air-quality-dataset/*{color}{color:#008000}*sds**.csv"{color} {color:#000080}val {color}{color:#660e7a}bigParquetDataset {color}= {color:#008000}"D:/source/datasets/air-tests/all-parquet"{color} The count from the csvs I get is: 33934609 While the count from the parquets is: 35739394 was: So I played around with a data set from here: [https://www.kaggle.com/hmavrodiev/sofia-air-quality-dataset] I ran the same query against the base CSVs and against a parquet version of them: 
{color:#008000}SELECT * FROM airQualityP WHERE P1 > 20{color} Here is the csv code: {color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._ {color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}).csv({color:#660e7a}originalDataset{color}) df.createTempView({color:#008000}"airQuality"{color}) {color:#000080}val {color}result = {color:#660e7a}session{color}.sql({color:#008000}"SELECT * FROM airQuality WHERE P1 > 20"{color}) .map(ParticleAirQuality.{color:#660e7a}mappingFunction{color}) println(result.count()) Here is the parquet code: {color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._ {color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}).parquet({color:#660e7a}bigParquetDataset{color}) df.createTempView({color:#008000}"airQualityP"{color}) {color:#000080}val {color}result = {color:#660e7a}session{color} .sql({color:#008000}"SELECT * FROM airQualityP WHERE P1 > 20"{color}) .map(ParticleAirQuality.{color:#660e7a}namedMappingFunction{color}) println(result.count()) And this is how I transform the csv into parquets: {color:#000080}import {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._ {color:#000080}val {color}df = {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, {color:#008000}"true"{color}) .csv({color:#660e7a}originalDataset{color}) .map(ParticleAirQuality.{color:#660e7a}mappingFunction{color}) df.write.parquet({color:#660e7a}bigParquetDataset{color}) These are the two mapping functions: {color:#000080}val {color}{color:#660e7a}mappingFunction {color}= { r: Row => ParticleAirQuality( r.getString({color:#0000ff}1{color}), r.getString({color:#0000ff}2{color}), r.getString({color:#0000ff}3{color}), r.getString({color:#0000ff}4{color}), 
r.getString({color:#0000ff}5{color}), { {color:#000080}val {color}p1 = r.getString({color:#0000ff}6{color}) {color:#000080}if{color}(p1 == {color:#000080}null{color}) Double.{color:#660e7a}NaN{color} {color:#000080}else {color}p1.toDouble }, { {color:#000080}val {color}p2 = r.getString({color:#0000ff}7{color}) {color:#000080}if{color}(p2 == {color:#000080}null{color}) Double.{color:#660e7a}NaN{color} {color:#000080}else {color}p2.toDouble } ) } {color:#000080}val {color}{color:#660e7a}namedMappingFunction {color}= { r: Row => ParticleAirQuality( r.getAs[{color:#20999d}String{color}]({color:#008000}"sensor_id"{color}), r.getAs[{color:#20999d}String{color}]({color:#008000}"location"{color}), r.getAs[{color:#20999d}String{color}]({color:#008000}"lat"{color}), r.getAs[{color:#20999d}String{color}]({color:#008000}"lon"{color}), r.getAs[{color:#20999d}String{color}]({color:#008000}"timestamp"{color}), r.getAs[Double]({color:#008000}"P1"{color}), r.getAs[Double]({color:#008000}"P2"{color}) ) } If it matters this is the paths (Note that I actually use \\ instead of / since it is windows - but that doesn't really matter): {color:#000080}val {color}{color:#660e7a}originalDataset {color}= {color:#008000}"D:/source/datasets/sofia-air-quality-dataset/*{color}{color:#008000}*sds**.csv"{color} {color:#000080}val {color}{color:#660e7a}bigParquetDataset {color}= {color:#008000}"D:/source/datasets/air-tests/all-parquet"{color} The count from the csvs I get is: 33934609 While the count from the parquets is: 35739394 > Same SQL on CSV and on Parquet gives different result > ----------------------------------------------------- > > Key: SPARK-30926 > URL: https://issues.apache.org/jira/browse/SPARK-30926 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.4.4 > Environment: I run this locally on a windows 10 machine. 
> The java runtime is: > {color:#cccccc}openjdk 11.0.5 2019-10-15 > OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.5+10) > OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.5+10, mixed mode){color} > Reporter: Bozhidar Karaargirov > Priority: Major > > SO I played around with a data set from here: > [https://www.kaggle.com/hmavrodiev/sofia-air-quality-dataset] > I ran the same query for the base CSVs and against a parquet version of them: > {color:#008000}SELECT * FROM airQualityP WHERE P1 > 20{color} > Here is the csv code: > {color:#000080}import > {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._ > {color:#000080}val {color}df = > {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, > {color:#008000}"true"{color}).csv({color:#660e7a}originalDataset{color}) > df.createTempView({color:#008000}"airQuality"{color}) > {color:#000080}val {color}result = > {color:#660e7a}session{color}.sql({color:#008000}"SELECT * FROM airQuality > WHERE P1 > 20"{color}) > .map(ParticleAirQuality.{color:#660e7a}mappingFunction{color}) > println(result.count()) > > Here is the parquet code: > > {color:#000080}import > {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._ > {color:#000080}val {color}df = > {color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, > {color:#008000}"true"{color}).parquet({color:#660e7a}bigParquetDataset{color}) > df.createTempView({color:#008000}"airQualityP"{color}) > {color:#000080}val {color}result = {color:#660e7a}session{color} > .sql({color:#008000}"SELECT * FROM airQualityP WHERE P1 > 20"{color}) > .map(ParticleAirQuality.{color:#660e7a}namedMappingFunction{color}) > println(result.count()) > > And this is how I transform the csv into parquets: > {color:#000080}import > {color}{color:#660e7a}session{color}.{color:#660e7a}sqlContext{color}.implicits._ > {color:#000080}val {color}df = > 
{color:#660e7a}session{color}.read.option({color:#008000}"header"{color}, > {color:#008000}"true"{color}) > .csv({color:#660e7a}originalDataset{color}) > .map(ParticleAirQuality.{color:#660e7a}mappingFunction{color}) > df.write.parquet({color:#660e7a}bigParquetDataset{color}) > > These are the two mapping functions: > {color:#000080}val {color}{color:#660e7a}mappingFunction {color}= { > r: Row => ParticleAirQuality( > r.getString({color:#0000ff}1{color}), > r.getString({color:#0000ff}2{color}), > r.getString({color:#0000ff}3{color}), > r.getString({color:#0000ff}4{color}), > r.getString({color:#0000ff}5{color}), > { > {color:#000080}val {color}p1 = r.getString({color:#0000ff}6{color}) > {color:#000080}if{color}(p1 == {color:#000080}null{color}) > Double.{color:#660e7a}NaN{color} {color:#000080}else {color}p1.toDouble > }, > { > {color:#000080}val {color}p2 = r.getString({color:#0000ff}7{color}) > {color:#000080}if{color}(p2 == {color:#000080}null{color}) > Double.{color:#660e7a}NaN{color} {color:#000080}else {color}p2.toDouble > } > ) } > {color:#000080}val {color}{color:#660e7a}namedMappingFunction {color}= { > r: Row => ParticleAirQuality( > r.getAs[{color:#20999d}String{color}]({color:#008000}"sensor_id"{color}), > r.getAs[{color:#20999d}String{color}]({color:#008000}"location"{color}), > r.getAs[{color:#20999d}String{color}]({color:#008000}"lat"{color}), > r.getAs[{color:#20999d}String{color}]({color:#008000}"lon"{color}), > r.getAs[{color:#20999d}String{color}]({color:#008000}"timestamp"{color}), > r.getAs[Double]({color:#008000}"P1"{color}), > r.getAs[Double]({color:#008000}"P2"{color}) > ) > } > > If it matters this is the paths (Note that I actually use double \ instead of > / since it is windows - but that doesn't really matter): > {color:#000080}val {color}{color:#660e7a}originalDataset {color}= > {color:#008000}"D:/source/datasets/sofia-air-quality-dataset/*{color}{color:#008000}*sds**.csv"{color} > {color:#000080}val 
{color}{color:#660e7a}bigParquetDataset {color}= > {color:#008000}"D:/source/datasets/air-tests/all-parquet"{color} > > The count from the csvs I get is: 33934609 > While the count from the parquets is: 35739394 > > -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org