Andrew Crosby created SPARK-28062:
-------------------------------------

             Summary: HuberAggregator copies coefficients vector every time an instance is added
                 Key: SPARK-28062
                 URL: https://issues.apache.org/jira/browse/SPARK-28062
             Project: Spark
          Issue Type: Bug
          Components: ML
    Affects Versions: 3.0.0
            Reporter: Andrew Crosby

Every time an instance is added to the HuberAggregator, a copy of the coefficients vector is created (see the code snippet below). The copy costs time proportional to the number of features on every call, so the performance degradation is particularly severe when the instances have long sparse feature vectors, where the remaining per-instance work touches only the non-zero entries.

{code:scala}
def add(instance: Instance): HuberAggregator = {
  instance match { case Instance(label, weight, features) =>
    require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
      s" Expecting $numFeatures but got ${features.size}.")
    require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

    if (weight == 0.0) return this
    val localFeaturesStd = bcFeaturesStd.value
    // slice allocates a new array of numFeatures elements on every call
    val localCoefficients = bcParameters.value.toArray.slice(0, numFeatures)
    val localGradientSumArray = gradientSumArray

    // Snip
}
{code}

The LeastSquaresAggregator class avoids this performance issue by storing such reused values in transient lazy class variables. Applying a similar approach to HuberAggregator gives a significant speed boost.

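The difference between copying on every call and caching the copy once can be illustrated outside Spark. The following is a minimal Python sketch, not the Spark code: the class names, the `copies` counter, and the list-of-pairs stand-in for a sparse vector are all hypothetical, and `functools.cached_property` is used only as a rough analogue of a Scala lazy val.

```python
from functools import cached_property


class CopyingAggregator:
    """Slices the parameter list on every add(), like the snippet quoted above."""

    def __init__(self, parameters, num_features):
        self.parameters = parameters
        self.num_features = num_features
        self.copies = 0   # hypothetical counter, for illustration only
        self.total = 0.0

    def add(self, features):
        # features: list of (index, value) pairs standing in for a sparse vector
        coefficients = self.parameters[:self.num_features]  # O(num_features) copy per call
        self.copies += 1
        self.total += sum(coefficients[i] * v for i, v in features)
        return self


class CachingAggregator:
    """Slices the parameter list once, on first use, and reuses the result."""

    def __init__(self, parameters, num_features):
        self.parameters = parameters
        self.num_features = num_features
        self.copies = 0
        self.total = 0.0

    @cached_property
    def coefficients(self):
        # Evaluated on first access only; later accesses return the stored list.
        self.copies += 1
        return self.parameters[:self.num_features]

    def add(self, features):
        coefficients = self.coefficients  # no copy after the first call
        self.total += sum(coefficients[i] * v for i, v in features)
        return self


# 1000 coefficients followed by two extra trailing parameters (an assumed layout).
params = [0.5] * 1000 + [0.1, 1.0]
instances = [[(i, 1.0)] for i in range(100)]  # one non-zero entry per instance

slow = CopyingAggregator(params, 1000)
fast = CachingAggregator(params, 1000)
for inst in instances:
    slow.add(inst)
    fast.add(inst)

print("copies made:", slow.copies, "vs", fast.copies)
```

Both aggregators accumulate the same total; only the number of slices differs, which is the cost the lazy-variable approach is meant to remove.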
Running the script below locally on my machine gives the following timing results:

{noformat}
Current implementation:
    Time(s): 540.1439919471741
    Iterations: 26
    Intercept: 0.518109382890512
    Coefficients: [0.0, -0.2516936902000245, 0.0, 0.0, -0.19633887469839809, 0.0, -0.39565545053893925, 0.0, -0.18617574426698882, 0.0478922416670529]

Modified implementation to match LeastSquaresAggregator:
    Time(s): 46.82946586608887
    Iterations: 26
    Intercept: 0.5181093828893774
    Coefficients: [0.0, -0.25169369020031357, 0.0, 0.0, -0.1963388746927919, 0.0, -0.3956554505389966, 0.0, -0.18617574426702874, 0.04789224166878518]
{noformat}

{code:python}
from random import random, randint, seed
import time

from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession

seed(0)

spark = SparkSession.builder.appName('huber-speed-test').getOrCreate()

# 100,000 rows with a high-cardinality category, so one-hot encoding
# produces long sparse feature vectors.
df = spark.createDataFrame([[randint(0, 100000), random()] for _ in range(100000)], ["category", "target"])
ohe = OneHotEncoder(inputCols=["category"], outputCols=["encoded_category"]).fit(df)
lr = LinearRegression(featuresCol="encoded_category", labelCol="target", loss="huber", regParam=1.0)

# Time only the model fit.
start = time.time()
model = lr.fit(ohe.transform(df))
end = time.time()

print("Time(s): " + str(end - start))
print("Iterations: " + str(model.summary.totalIterations))
print("Intercept: " + str(model.intercept))
print("Coefficients: " + str(list(model.coefficients)[0:10]))
{code}
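As a quick check of the figures reported above, the short sketch below computes the speedup and the largest difference between the two sets of printed coefficients. The numbers are copied from the timing results in this report; the variable names are only illustrative.

```python
# Timings (seconds) copied from the results reported above.
current_time = 540.1439919471741
modified_time = 46.82946586608887

# First ten coefficients printed by each run.
current_coefs = [0.0, -0.2516936902000245, 0.0, 0.0, -0.19633887469839809,
                 0.0, -0.39565545053893925, 0.0, -0.18617574426698882,
                 0.0478922416670529]
modified_coefs = [0.0, -0.25169369020031357, 0.0, 0.0, -0.1963388746927919,
                  0.0, -0.3956554505389966, 0.0, -0.18617574426702874,
                  0.04789224166878518]

# Ratio of wall-clock times and largest element-wise coefficient difference.
speedup = current_time / modified_time
max_diff = max(abs(a - b) for a, b in zip(current_coefs, modified_coefs))

print("Speedup: %.1fx" % speedup)
print("Max coefficient difference: %.2e" % max_diff)
```

The speedup comes out at roughly 11.5x, and the coefficients of the two runs differ by less than 1e-11, consistent with both runs converging in the same 26 iterations.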