[ https://issues.apache.org/jira/browse/SPARK-28062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-28062:
------------------------------------

    Assignee:     (was: Apache Spark)

> HuberAggregator copies coefficients vector every time an instance is added
> --------------------------------------------------------------------------
>
>                 Key: SPARK-28062
>                 URL: https://issues.apache.org/jira/browse/SPARK-28062
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 3.0.0
>            Reporter: Andrew Crosby
>            Priority: Major
>
> Every time an instance is added to the HuberAggregator, a copy of the
> coefficients vector is created (see code snippet below). This causes a
> performance degradation, which is particularly severe when the instances have
> long sparse feature vectors: the slice costs O(numFeatures) for every instance,
> even when only a handful of that instance's features are non-zero.
> {code:scala}
> def add(instance: Instance): HuberAggregator = {
>   instance match { case Instance(label, weight, features) =>
>     require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
>       s" Expecting $numFeatures but got ${features.size}.")
>     require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
>
>     if (weight == 0.0) return this
>     val localFeaturesStd = bcFeaturesStd.value
>     val localCoefficients = bcParameters.value.toArray.slice(0, numFeatures)
>     val localGradientSumArray = gradientSumArray
>
>     // Snip
>   }
> }
> {code}
> The LeastSquaresAggregator class avoids this performance issue by storing such
> reused values in transient lazy class variables. Applying a similar approach to
> HuberAggregator gives a significant speed boost.
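The fix described in the issue caches the sliced coefficients once per aggregator instead of re-slicing them in every `add()` call. The sketch below illustrates that idea in plain Python, outside Spark; it is not the Spark code. `NaiveAggregator` and `CachedAggregator` are hypothetical classes, the plain list `parameters` stands in for `bcParameters.value`, and the gradient is a simplified squared-error update rather than the Huber loss. `functools.cached_property` plays the role of Scala's `lazy val`; the `@transient` part (not serializing the cached copy when the aggregator is shipped to executors) has no counterpart in this single-process sketch.

```python
from functools import cached_property


class NaiveAggregator:
    """Hypothetical aggregator that slices the coefficients on every add()."""

    def __init__(self, parameters, num_features):
        self.parameters = parameters          # stand-in for bcParameters.value
        self.num_features = num_features
        self.gradient_sum = [0.0] * num_features
        self.copies = 0                       # number of O(num_features) slices made

    def _slice_coefficients(self):
        self.copies += 1
        return self.parameters[:self.num_features]  # list slicing allocates a copy

    def coefficients(self):
        # Naive behaviour: a fresh copy for every instance.
        return self._slice_coefficients()

    def add(self, indices, values, label):
        coef = self.coefficients()
        # Only the non-zero entries of the sparse instance are touched here,
        # so the slice above dominates the cost for long sparse vectors.
        margin = sum(coef[i] * v for i, v in zip(indices, values))
        residual = margin - label             # simplified squared-error residual
        for i, v in zip(indices, values):
            self.gradient_sum[i] += residual * v
        return self


class CachedAggregator(NaiveAggregator):
    """Hypothetical aggregator that slices once and reuses it, like a lazy val."""

    @cached_property
    def _cached_coefficients(self):
        return self._slice_coefficients()

    def coefficients(self):
        return self._cached_coefficients


# Hypothetical data: 4 coefficients followed by 2 extra trailing parameters
# that add() ignores, and three sparse instances as (indices, values, label).
params = [0.5, -0.25, 0.0, 1.0, 0.1, 1.3]
instances = [([0, 3], [1.0, 2.0], 1.0), ([1], [4.0], 0.0), ([2, 3], [1.0, 1.0], 0.5)]

naive = NaiveAggregator(params, 4)
cached = CachedAggregator(params, 4)
for idx, vals, y in instances:
    naive.add(idx, vals, y)
    cached.add(idx, vals, y)

print(naive.copies, cached.copies)                # 3 1
print(naive.gradient_sum == cached.gradient_sum)  # True
```

Both aggregators accumulate the same gradient, but the naive one makes one O(numFeatures) copy per instance while the cached one makes a single copy for its whole lifetime, which is why the saving grows with the number of instances and the length of the feature vectors.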
> Running the script below locally on my machine gives the following timing results
> (roughly an 11.5x speed-up, with the same number of iterations and results that
> agree to within about 1e-12):
> {noformat}
> Current implementation:
> Time(s): 540.1439919471741
> Iterations: 26
> Intercept: 0.518109382890512
> Coefficients: [0.0, -0.2516936902000245, 0.0, 0.0, -0.19633887469839809, 0.0, -0.39565545053893925, 0.0, -0.18617574426698882, 0.0478922416670529]
>
> Modified implementation to match LeastSquaresAggregator:
> Time(s): 46.82946586608887
> Iterations: 26
> Intercept: 0.5181093828893774
> Coefficients: [0.0, -0.25169369020031357, 0.0, 0.0, -0.1963388746927919, 0.0, -0.3956554505389966, 0.0, -0.18617574426702874, 0.04789224166878518]
> {noformat}
> {code:python}
> from random import random, randint, seed
> import time
>
> from pyspark.ml.feature import OneHotEncoder
> from pyspark.ml.regression import LinearRegression
> from pyspark.sql import SparkSession
>
> seed(0)
>
> spark = SparkSession.builder.appName('huber-speed-test').getOrCreate()
> # 100,000 rows with a high-cardinality category, giving long sparse one-hot vectors
> df = spark.createDataFrame([[randint(0, 100000), random()] for _ in range(100000)],
>                            ["category", "target"])
> ohe = OneHotEncoder(inputCols=["category"], outputCols=["encoded_category"]).fit(df)
> lr = LinearRegression(featuresCol="encoded_category", labelCol="target",
>                       loss="huber", regParam=1.0)
>
> start = time.time()
> model = lr.fit(ohe.transform(df))
> end = time.time()
>
> print("Time(s): " + str(end - start))
> print("Iterations: " + str(model.summary.totalIterations))
> print("Intercept: " + str(model.intercept))
> print("Coefficients: " + str(list(model.coefficients)[0:10]))
> {code}