[jira] [Updated] (SPARK-30926) Same SQL on CSV and on Parquet gives different result

2020-02-23 Thread Bozhidar Karaargirov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bozhidar Karaargirov updated SPARK-30926:
-----------------------------------------
Description: 
So I played around with a data set from here: 
[https://www.kaggle.com/hmavrodiev/sofia-air-quality-dataset]

I ran the same query against the base CSVs and against a Parquet version of them:

SELECT * FROM airQualityP WHERE P1 > 20

Here is the CSV code:

import session.sqlContext.implicits._

val df = session.read.option("header", "true").csv(originalDataset)

df.createTempView("airQuality")

val result = session.sql("SELECT * FROM airQuality WHERE P1 > 20")
  .map(ParticleAirQuality.mappingFunction)

println(result.count())
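
An aside that seems worth checking (my note, not part of the original report): without .option("inferSchema", "true") or an explicit schema, Spark's CSV reader types every column as string, so on this side the filter P1 > 20 runs against a string column through Spark's implicit casts, and empty P1 cells come back as null by default. A quick way to confirm what the view is filtering on:

df.printSchema()
// For the header-only CSV read above this should print P1 as a string column,
// e.g. |-- P1: string (nullable = true)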

 

Here is the Parquet code:

import session.sqlContext.implicits._

val df = session.read.option("header", "true").parquet(bigParquetDataset)

df.createTempView("airQualityP")

val result = session
  .sql("SELECT * FROM airQualityP WHERE P1 > 20")
  .map(ParticleAirQuality.namedMappingFunction)

println(result.count())
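
If the string typing on the CSV side is part of the gap, one way to probe it (a diagnostic I would try, not something from the report) is to make the cast explicit on both views and compare the counts again:

println(session.sql("SELECT * FROM airQuality WHERE CAST(P1 AS DOUBLE) > 20").count())
println(session.sql("SELECT * FROM airQualityP WHERE CAST(P1 AS DOUBLE) > 20").count())
// If these two numbers agree, the discrepancy sits in the column types,
// not in the readers themselves.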

 

And this is how I transform the CSVs into Parquet:

import session.sqlContext.implicits._

val df = session.read.option("header", "true")
  .csv(originalDataset)
  .map(ParticleAirQuality.mappingFunction)

df.write.parquet(bigParquetDataset)
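
One detail of this conversion worth flagging (the report does not dwell on it): rows whose P1 or P2 is missing in the CSV are written to Parquet as Double.NaN, so the two datasets do not hold the same values for those rows. If a like-for-like comparison is wanted, a hedged alternative is a variant of the case class with Option[Double] fields, so that nulls stay null in the Parquet output (ParticleAirQualityOpt is a hypothetical name, not from the report):

case class ParticleAirQualityOpt(
  sensorId: String, location: String, lat: String, lon: String,
  timestamp: String, p1: Option[Double], p2: Option[Double])
// Option[Double] maps to a nullable double column in the Dataset schema,
// so a missing P1 in the CSV stays null in the written Parquet files.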

 

These are the two mapping functions:

val mappingFunction = {
  r: Row => ParticleAirQuality(
    r.getString(1),
    r.getString(2),
    r.getString(3),
    r.getString(4),
    r.getString(5),
    {
      val p1 = r.getString(6)
      if (p1 == null) Double.NaN else p1.toDouble
    },
    {
      val p2 = r.getString(7)
      if (p2 == null) Double.NaN else p2.toDouble
    }
  )
}

val namedMappingFunction = {
  r: Row => ParticleAirQuality(
    r.getAs[String]("sensor_id"),
    r.getAs[String]("location"),
    r.getAs[String]("lat"),
    r.getAs[String]("lon"),
    r.getAs[String]("timestamp"),
    r.getAs[Double]("P1"),
    r.getAs[Double]("P2")
  )
}
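
The null-to-NaN replacement in mappingFunction matters more than it looks. Spark SQL's NaN semantics order NaN above every other double value, so on the Parquet side a row whose P1 was missing in the CSV (now NaN) can satisfy P1 > 20, while on the CSV side the same row yields null > 20, which is null, and the row is dropped. A minimal sketch of that behavior (my illustration, assuming a live session):

import session.sqlContext.implicits._

val probe = Seq(1.0, 25.0, Double.NaN).toDF("x")
probe.filter($"x" > 20).show()
// Both 25.0 and NaN survive the filter: in Spark SQL, NaN compares
// greater than any other double, unlike null, which never passes.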

 

If it matters, these are the paths (note that I actually use \\ instead of / since it is Windows, but that doesn't really matter):

val originalDataset = "D:/source/datasets/sofia-air-quality-dataset/*sds*.csv"

val bigParquetDataset = "D:/source/datasets/air-tests/all-parquet"

 

The count I get from the CSVs is: 33934609

While the count from the Parquet files is: 35739394
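
If the NaN ordering described above is in play, the gap between the two counts should be mostly explained by rows whose P1 is NaN on the Parquet side. A hedged diagnostic (isnan is a built-in Spark SQL function):

println(session.sql("SELECT * FROM airQualityP WHERE isnan(P1)").count())
println(session.sql("SELECT * FROM airQuality WHERE P1 IS NULL").count())
// If the first count is close to 35739394 - 33934609 = 1804785, the
// difference comes from the null-to-NaN conversion plus the string
// comparison on the CSV side, rather than from a reader bug.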

 

 


[jira] [Created] (SPARK-30926) Same SQL on CSV and on Parquet gives different result

2020-02-23 Thread Bozhidar Karaargirov (Jira)
Bozhidar Karaargirov created SPARK-30926:


 Summary: Same SQL on CSV and on Parquet gives different result
 Key: SPARK-30926
 URL: https://issues.apache.org/jira/browse/SPARK-30926
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.4
 Environment: I run this locally on a Windows 10 machine.

The Java runtime is:

openjdk 11.0.5 2019-10-15
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.5+10)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.5+10, mixed mode)
Reporter: Bozhidar Karaargirov



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org