Github user myui commented on a diff in the pull request:

    https://github.com/apache/incubator-hivemall/pull/167#discussion_r226578817

  --- Diff: core/src/main/java/hivemall/mf/CofactorModel.java ---
    @@ -0,0 +1,715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.mf;
+
+import hivemall.annotations.VisibleForTesting;
+import hivemall.fm.Feature;
+import hivemall.utils.lang.Preconditions;
+import hivemall.utils.math.MathUtils;
+import it.unimi.dsi.fastutil.objects.Object2DoubleArrayMap;
+import it.unimi.dsi.fastutil.objects.Object2DoubleMap;
+import org.apache.commons.math3.linear.ArrayRealVector;
+import org.apache.commons.math3.linear.Array2DRowRealMatrix;
+import org.apache.commons.math3.linear.RealMatrix;
+import org.apache.commons.math3.linear.RealVector;
+import org.apache.commons.math3.linear.SingularValueDecomposition;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class CofactorModel {
+
+    public enum RankInitScheme {
+        random /* default */, gaussian;
+
+        @Nonnegative
+        private float maxInitValue;
+        @Nonnegative
+        private double initStdDev;
+
+        @Nonnull
+        public static CofactorModel.RankInitScheme resolve(@Nullable String opt) {
+            if (opt == null) {
+                return random;
+            } else if ("gaussian".equalsIgnoreCase(opt)) {
+                return gaussian;
+            } else if ("random".equalsIgnoreCase(opt)) {
+                return random;
+            }
+            return random;
+        }
+
+        public void setMaxInitValue(float maxInitValue) {
+            this.maxInitValue = maxInitValue;
+        }
+
+        public void setInitStdDev(double initStdDev) {
+            this.initStdDev = initStdDev;
+        }
+    }
+
+    @Nonnegative
+    private final int factor;
+
+    // rank matrix initialization
+    private final RankInitScheme initScheme;
+
+    private double globalBias;
+
+    // storing trainable latent factors and weights
+    private final Map<String, double[]> theta;
+    private final Map<String, double[]> beta;
+    private final Object2DoubleMap<String> betaBias;
+    private final Map<String, double[]> gamma;
+    private final Object2DoubleMap<String> gammaBias;
+
+    private final Random[] randU, randI;
+
+    // hyperparameters
+    private final float c0, c1;
+    private final float lambdaTheta, lambdaBeta, lambdaGamma;
+
+    // reusable buffers for the linear-system solver
+    private final RealMatrix B;
+    private final RealVector A;
+
+    // error message strings
+    private static final String ARRAY_NOT_SQUARE_ERR = "Array is not square";
+    private static final String DIFFERENT_DIMS_ERR = "Matrix, vector or array do not match in size";
+
+    public CofactorModel(@Nonnegative int factor, @Nonnull RankInitScheme initScheme,
+            float c0, float c1, float lambdaTheta, float lambdaBeta, float lambdaGamma) {
+        // the reference implementation initializes factors with a gaussian scheme:
+        // https://github.com/dawenl/cofactor/blob/master/src/cofacto.py#L98
+        this.factor = factor;
+        this.initScheme = initScheme;
+        this.globalBias = 0.d;
+        this.lambdaTheta = lambdaTheta;
+        this.lambdaBeta = lambdaBeta;
+        this.lambdaGamma = lambdaGamma;
+
+        this.theta = new HashMap<>();
+        this.beta = new HashMap<>();
+        this.betaBias = new Object2DoubleArrayMap<>();
+        this.betaBias.defaultReturnValue(0.d);
+        this.gamma = new HashMap<>();
+        this.gammaBias = new Object2DoubleArrayMap<>();
+        this.gammaBias.defaultReturnValue(0.d);
+
+        this.B = new Array2DRowRealMatrix(this.factor, this.factor);
+        this.A = new ArrayRealVector(this.factor);
+
+        this.randU = newRandoms(factor, 31L);
+        this.randI = newRandoms(factor, 41L);
+
+        Preconditions.checkArgument(c0 >= 0.f && c0 <= 1.f);
+        Preconditions.checkArgument(c1 >= 0.f && c1 <= 1.f);
+        this.c0 = c0;
+        this.c1 = c1;
+    }
+
+    private void initFactorVector(String key, Map<String, double[]> weights) {
+        if (weights.containsKey(key)) {
+            return;
+        }
+        double[] v = new double[factor];
+        switch (initScheme) {
+            case random:
+                uniformFill(v, randI[0], initScheme.maxInitValue);
+                break;
+            case gaussian:
+                gaussianFill(v, randI, initScheme.initStdDev);
+                break;
+            default:
+                throw new IllegalStateException(
+                    "Unsupported rank initialization scheme: " + initScheme);
+        }
+        weights.put(key, v);
+    }
+
+    private static double[] getFactorVector(String key, Map<String, double[]> weights) {
+        return weights.get(key);
+    }
+
+    private static void setFactorVector(String key, Map<String, double[]> weights, RealVector factorVector) throws HiveException {
+        double[] vec = weights.get(key);
+        if (vec == null) {
+            throw new HiveException("Factor vector is not initialized for key: " + key);
+        }
+        copyData(vec, factorVector);
+    }
+
+    private static double getBias(String key, Object2DoubleMap<String> biases) {
+        return biases.getDouble(key);
+    }
+
+    private static void setBias(String key, Object2DoubleMap<String> biases, double value) {
+        biases.put(key, value);
+    }
+
+    public void recordContext(String context, boolean isItem) {
+        if (isItem) {
+            initFactorVector(context, beta);
+            initFactorVector(context, gamma);
+        } else {
+            initFactorVector(context, theta);
+        }
+    }
+
+    public double[] getGammaVector(final String key) {
+        return getFactorVector(key, gamma);
+    }
+
+    public double getGammaBias(final String key) {
+        return getBias(key, gammaBias);
+    }
+
+    public void setGammaBias(final String key, final double value) {
+        setBias(key, gammaBias, value);
+    }
+
+    public double getGlobalBias() {
+        return globalBias;
+    }
+
+    public void setGlobalBias(final double value) {
+        globalBias = value;
+    }
+
+    public double[] getThetaVector(final String key) {
+        return getFactorVector(key, theta);
+    }
+
+    public double[] getBetaVector(final String key) {
+        return getFactorVector(key, beta);
+    }
+
+    public double getBetaBias(final String key) {
+        return getBias(key, betaBias);
+    }
+
+    public void setBetaBias(final String key, final double value) {
+        setBias(key, betaBias, value);
+    }
+
+    public Map<String, double[]> getTheta() {
+        return theta;
+    }
+
+    public Map<String, double[]> getBeta() {
+        return beta;
+    }
+
+    public Map<String, double[]> getGamma() {
+        return gamma;
+    }
+
+    public Object2DoubleMap<String> getBetaBiases() {
+        return betaBias;
+    }
+
+    public Object2DoubleMap<String> getGammaBiases() {
+        return gammaBias;
+    }
+
+    public void updateWithUsers(List<CofactorizationUDTF.TrainingSample> users) throws
HiveException { + updateTheta(users); + } + + public void updateWithItems(List<CofactorizationUDTF.TrainingSample> items) throws HiveException { + updateBeta(items); + updateGamma(items); + updateBetaBias(items); + updateGammaBias(items); + } + + /** + * Update latent factors of the users in the provided mini-batch. + */ + private void updateTheta(List<CofactorizationUDTF.TrainingSample> samples) throws HiveException { + // initialize item factors + // items should only be trainable if the dataset contains a major entry for that item (which it may not) + // variable names follow cofacto.py + double[][] BTBpR = calculateWTWpR(beta, factor, c0, lambdaTheta); + + for (CofactorizationUDTF.TrainingSample sample : samples) { + RealVector newThetaVec = calculateNewThetaVector(sample, beta, factor, B, A, BTBpR, c0, c1); + if (newThetaVec != null) { + setFactorVector(sample.context, theta, newThetaVec); + } + } + } + + @VisibleForTesting + protected static RealVector calculateNewThetaVector(CofactorizationUDTF.TrainingSample sample, Map<String, double[]> beta, + int numFactors, RealMatrix B, RealVector A, double[][] BTBpR, float c0, float c1) throws HiveException { + // filter for trainable items + List<Feature> trainableItems = filterTrainableFeatures(sample.features, beta); + // TODO: is this correct behaviour? + if (trainableItems.isEmpty()) { + return null; + } + + double[] a = calculateA(trainableItems, beta, numFactors, c1); + + double[][] delta = calculateWTWSubset(trainableItems, beta, numFactors, c1 - c0); + double[][] b = addInPlace(delta, BTBpR); + + // solve and update factors + return solve(B, b, A, a); + } + + /** + * Update latent factors of the items in the provided mini-batch. + */ + private void updateBeta(List<CofactorizationUDTF.TrainingSample> samples) throws HiveException { + // precomputed matrix + double[][] TTTpR = calculateWTWpR(theta, factor, c0, lambdaBeta); + + for (CofactorizationUDTF.TrainingSample sample : samples) { + RealVector newBetaVec = calculateNewBetaVector(sample, theta, gamma, gammaBias, betaBias, factor, B, A, TTTpR, c0, c1); + if (newBetaVec != null) { + setFactorVector(sample.context, beta, newBetaVec); + } + } + } + + @VisibleForTesting + protected static RealVector calculateNewBetaVector(CofactorizationUDTF.TrainingSample sample, Map<String, double[]> theta, + Map<String, double[]> gamma, Object2DoubleMap<String> gammaBias, + Object2DoubleMap<String> betaBias, int numFactors, RealMatrix B, RealVector A, + double[][] TTTpR, float c0, float c1) throws HiveException { + // filter for trainable users + List<Feature> trainableUsers = filterTrainableFeatures(sample.features, theta); + // TODO: is this correct behaviour? + if (trainableUsers.isEmpty()) { + return null; + } + + List<Feature> trainableCooccurringItems = filterTrainableFeatures(sample.sppmi, gamma); + double[] RSD = calculateRSD(sample.context, trainableCooccurringItems, numFactors, betaBias, gammaBias, gamma); + double[] ApRSD = addInPlace(calculateA(trainableUsers, theta, numFactors, c1), RSD, 1.f); + + double[][] GTG = calculateWTWSubset(trainableCooccurringItems, gamma, numFactors, 1.f); + double[][] delta = calculateWTWSubset(trainableUsers, theta, numFactors, c1 - c0); + // never add into the precomputed `TTTpR` array, only add into temporary arrays like `delta` and `GTG` + double[][] b = addInPlace(addInPlace(delta, GTG), TTTpR); + + // solve and update factors + return solve(B, b, A, ApRSD); + } + + /** + * Update latent factors of the items in the provided mini-batch. 
+ */ + private void updateGamma(List<CofactorizationUDTF.TrainingSample> samples) throws HiveException { + for (CofactorizationUDTF.TrainingSample sample : samples) { + RealVector newGammaVec = calculateNewGammaVector(sample, beta, gammaBias, betaBias, factor, B, A, lambdaGamma); + if (newGammaVec != null) { + setFactorVector(sample.context, gamma, newGammaVec); + } + } + } + + @VisibleForTesting + protected static RealVector calculateNewGammaVector(CofactorizationUDTF.TrainingSample sample, Map<String, double[]> beta, + Object2DoubleMap<String> gammaBias, Object2DoubleMap<String> betaBias, + int numFactors, RealMatrix B, RealVector A, float lambdaGamma) throws HiveException { + // filter for trainable items + List<Feature> trainableCooccurringItems = filterTrainableFeatures(sample.sppmi, beta); + // TODO: is this correct behaviour? + if (trainableCooccurringItems.isEmpty()) { + return null; + } + + double[][] b = regularize(calculateWTWSubset(trainableCooccurringItems, beta, numFactors, 1.f), lambdaGamma); + double[] rsd = calculateRSD(sample.context, trainableCooccurringItems, numFactors, gammaBias, betaBias, beta); + + // solve and update factors + return solve(B, b, A, rsd); + } + + private static double[][] regularize(double[][] A, float lambda) { + for (int i = 0; i < A.length; i++) { + A[i][i] += lambda; + } + return A; + } + + private void updateBetaBias(List<CofactorizationUDTF.TrainingSample> samples) { + for (CofactorizationUDTF.TrainingSample sample : samples) { + Double newBetaBias = calculateNewBias(sample, beta, gamma, gammaBias); + // TODO: is this correct behaviour? + if (newBetaBias != null) { + setBetaBias(sample.context, newBetaBias); + } + } + } + + public void updateGammaBias(List<CofactorizationUDTF.TrainingSample> samples) { + for (CofactorizationUDTF.TrainingSample sample : samples) { + Double newGammaBias = calculateNewBias(sample, gamma, beta, betaBias); + // TODO: is this correct behaviour? 
+ if (newGammaBias != null) { + setGammaBias(sample.context, newGammaBias); + } + } + } + + @VisibleForTesting + protected static Double calculateNewBias(CofactorizationUDTF.TrainingSample sample, Map<String, double[]> beta, + Map<String, double[]> gamma, Object2DoubleMap<String> biases) { + // filter for trainable items + List<Feature> trainableCooccurringItems = filterTrainableFeatures(sample.sppmi, beta); + if (trainableCooccurringItems.isEmpty()) { + return null; + } + + double rsd = calculateBiasRSD(sample.context, trainableCooccurringItems, beta, gamma, biases); + return rsd / trainableCooccurringItems.size(); + + } + + @VisibleForTesting + protected static double calculateBiasRSD(String thisItem, List<Feature> trainableItems, Map<String, double[]> beta, + Map<String, double[]> gamma, Object2DoubleMap<String> biases) { + double result = 0.d, cooccurBias; + double[] thisFactorVec = getFactorVector(thisItem, beta); + double[] cooccurVec; + + for (Feature cooccurrence : trainableItems) { + String j = cooccurrence.getFeature(); + cooccurVec = getFactorVector(j, gamma); + cooccurBias = getBias(j, biases); + double value = cooccurrence.getValue() - dotProduct(thisFactorVec, cooccurVec) - cooccurBias; + result += value; + } + return result; + } + + @VisibleForTesting + protected static double[] calculateRSD(String thisItem, List<Feature> trainableItems, int numFactors, + Object2DoubleMap<String> fixedBias, Object2DoubleMap<String> changingBias, + Map<String, double[]> weights) throws HiveException { + + double b = getBias(thisItem, fixedBias); + + double[] accumulator = new double[numFactors]; + + // m_ij is named the same as in cofacto.py + for (Feature cooccurrence : trainableItems) { + String j = cooccurrence.getFeature(); + double scale = cooccurrence.getValue() - b - getBias(j, changingBias); + double[] g = getFactorVector(j, weights); + addInPlace(accumulator, g, scale); + } + return accumulator; + } + + /** + * Calculate W' x W plus regularization matrix + */ + @VisibleForTesting + protected static double[][] calculateWTWpR(Map<String, double[]> W, int numFactors, float c0, float lambda) { + double[][] WTW = calculateWTW(W, numFactors, c0); + return regularize(WTW, lambda); + } + + private static void checkCondition(boolean condition, String errorMessage) throws HiveException { + if (!condition) { + throw new HiveException(errorMessage); + } + } + + @VisibleForTesting + protected static double[][] addInPlace(@Nonnull double[][] A, @Nonnull double[][] B) throws HiveException { + checkCondition(A.length == A[0].length && A.length == B.length && B.length == B[0].length, ARRAY_NOT_SQUARE_ERR); + for (int i = 0; i < A.length; i++) { + for (int j = 0; j < A[0].length; j++) { + A[i][j] += B[i][j]; + } + } + return A; + } + + @VisibleForTesting + protected static List<Feature> filterTrainableFeatures(Feature[] features, Map<String, double[]> weights) { + List<Feature> trainableFeatures = new ArrayList<>(); + String fName; + for (Feature f : features) { + fName = f.getFeature(); + if (isTrainable(fName, weights)) { + trainableFeatures.add(f); + } + } + return trainableFeatures; + } + + @VisibleForTesting + protected static RealVector solve(RealMatrix B, double[][] dataB, RealVector A, double[] dataA) throws HiveException { + // b * x = a + // solves for x + copyData(B, dataB); + copyData(A, dataA); + SingularValueDecomposition svd = new SingularValueDecomposition(B); + return svd.getSolver().solve(A); + } + + private static void copyData(RealMatrix dst, double[][] src) throws HiveException { 
--- End diff -- add `@Nonnull`
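
The suggestion presumably refers to the `copyData` helpers, whose parameters are currently unannotated; the diff above is truncated at the signature of the matrix variant. A minimal sketch of how the annotation could be applied there (the method body is an assumption, inferred from how `solve` uses `copyData` to fill the preallocated `B` and `A` buffers):

```java
private static void copyData(@Nonnull RealMatrix dst, @Nonnull double[][] src) throws HiveException {
    // fail fast if the preallocated buffer does not match the source dimensions
    checkCondition(dst.getRowDimension() == src.length
            && dst.getColumnDimension() == src[0].length, DIFFERENT_DIMS_ERR);
    for (int i = 0; i < src.length; i++) {
        for (int j = 0; j < src[i].length; j++) {
            dst.setEntry(i, j, src[i][j]);
        }
    }
}
```

`javax.annotation.Nonnull` is already imported in this file, so no new import is needed; the same annotation would apply to the other `copyData` overloads called from `setFactorVector` and `solve`.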