[ https://issues.apache.org/jira/browse/SPARK-27842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-27842: --------------------------------- Affects Version/s: 2.4.3 > Inconsistent results of Statistics.corr() and > PearsonCorrelation.computeCorrelationMatrix() > ------------------------------------------------------------------------------------------- > > Key: SPARK-27842 > URL: https://issues.apache.org/jira/browse/SPARK-27842 > Project: Spark > Issue Type: Bug > Components: ML, Spark Core, Windows > Affects Versions: 2.3.1, 2.4.3 > Reporter: Peter Nijem > Priority: Major > Attachments: vectorList.txt > > > Hi, > I am working with the Spark Java API in local mode (1 node, 8 cores). The Spark > versions in my pom.xml are as follows: > *MLLib* > _<artifactId>spark-mllib_2.11</artifactId>_ > _<version>2.3.1</version>_ > *Core* > _<artifactId>spark-core_2.11</artifactId>_ > _<version>2.3.1</version>_ > I am experiencing inconsistent correlation results when starting my Spark > application with 8 cores vs 1/2/3 cores. > I've created a Main class which reads a list of Vectors from a file: 240 > Vectors, each of length 226. > As you can see, I first initialize Spark with local[*], run > Correlation, save the Matrix and stop Spark. Then, I do the same but > for local[3]. > I am expecting to get the same matrices on both runs. But this is not the > case. The input file is attached. > I tried to compute the correlation using > PearsonCorrelation.computeCorrelationMatrix() but I faced the same issue here > as well. > > In my work, I am dependent on the resulting correlation matrix. Thus, I am > experiencing bad results in my application due to the inconsistent results I > am getting.
As a workaround, I am working now with local[1] > > > {code:java} > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaSparkContext; > import org.apache.spark.mllib.linalg.DenseVector; > import org.apache.spark.mllib.linalg.Matrix; > import org.apache.spark.mllib.linalg.Vector; > import org.apache.spark.mllib.stat.Statistics; > import org.apache.spark.rdd.RDD; > import org.junit.Assert; > import java.io.BufferedReader; > import java.io.FileReader; > import java.io.IOException; > import java.math.RoundingMode; > import java.text.DecimalFormat; > import java.util.ArrayList; > import java.util.Arrays; > import java.util.List; > import java.util.stream.Collectors; > public class TestSparkCorr { > private static JavaSparkContext ctx; > public static void main(String[] args) { > List<List<Double>> doublesLists = readInputFile(); > List<Vector> resultVectors = getVectorsList(doublesLists); > //=========================================================================== > initSpark("*"); > RDD<Vector> RDD_vector = ctx.parallelize(resultVectors).rdd(); > Matrix m = Statistics.corr(RDD_vector, "pearson"); > stopSpark(); > //=========================================================================== > initSpark("3"); > RDD<Vector> RDD_vector_3 = ctx.parallelize(resultVectors).rdd(); > Matrix m3 = Statistics.corr(RDD_vector_3, "pearson"); > stopSpark(); > //=========================================================================== > Assert.assertEquals(m3, m); > } > private static List<Vector> getVectorsList(List<List<Double>> doublesLists) { > List<Vector> resultVectors = new ArrayList<>(doublesLists.size()); > for (List<Double> vector : doublesLists) { > double[] x = new double[vector.size()]; > for(int i = 0; i < x.length; i++){ > x[i] = vector.get(i); > } > resultVectors.add(new DenseVector(x)); > } > return resultVectors; > } > private static List<List<Double>> readInputFile() { > List<List<Double>> doublesLists = new ArrayList<>(); > try 
(BufferedReader reader = new BufferedReader(new FileReader( > ".//output//vectorList.txt"))) { > String line = reader.readLine(); > while (line != null) { > String[] splitLine = line.substring(1, line.length() - 2).split(","); > List<Double> doubles = Arrays.stream(splitLine).map(x -> > Double.valueOf(x.trim())).collect(Collectors.toList()); > doublesLists.add(doubles); > // read next line > line = reader.readLine(); > } > } catch (IOException e) { > e.printStackTrace(); > } > return doublesLists; > } > private static void initSpark(String coresNum) { > final SparkConf sparkConf = new SparkConf().setAppName("Span"); > sparkConf.setMaster(String.format("local[%s]", coresNum)); > sparkConf.set("spark.ui.enabled", "false"); > ctx = new JavaSparkContext(sparkConf); > } > private static void stopSpark() { > ctx.stop(); > if(ctx.sc().isStopped()){ > ctx = null; > } > } > } > {code} > > > -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org