[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14614723#comment-14614723 ]
ASF GitHub Bot commented on FLINK-1745: --------------------------------------- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/696#discussion_r33916484 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/KNN.scala --- @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.classification + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.DataSetUtils._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.{DistanceMetric, EuclideanDistanceMetric} +import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor} +import org.apache.flink.util.Collector + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** Implements a k-nearest neighbor join. + * + * This algorithm calculates `k` nearest neighbor points in training set for each points of + * testing set. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = ... 
+ * val testingDS: DataSet[Vector] = ... + * + * val knn = KNN() + * .setK(10) + * .setBlocks(5) + * .setDistanceMetric(EuclideanDistanceMetric()) + * + * knn.fit(trainingDS) + * + * val predictionDS: DataSet[(Vector, Array[Vector])] = knn.predict(testingDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.classification.KNN.K]] + * Sets the K which is the number of selected points as neighbors. (Default value: '''None''') + * + * - [[org.apache.flink.ml.classification.KNN.Blocks]] + * Sets the number of blocks into which the input data will be split. This number should be set + * at least to the degree of parallelism. If no value is specified, then the parallelism of the + * input [[DataSet]] is used as the number of blocks. (Default value: '''None''') + * + * - [[org.apache.flink.ml.classification.KNN.DistanceMetric]] + * Sets the distance metric to calculate distance between two points. If no metric is specified, + * then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. 
(Default value: + * '''EuclideanDistanceMetric()''') + * + */ +class KNN extends Predictor[KNN] { + + import KNN._ + + var trainingSet: Option[DataSet[Block[Vector]]] = None + + /** Sets K + * @param k the number of selected points as neighbors + */ + def setK(k: Int): KNN = { + require(k > 1, "K must be positive.") + parameters.add(K, k) + this + } + + /** Sets the distance metric + * @param metric the distance metric to calculate distance between two points + */ + def setDistanceMetric(metric: DistanceMetric): KNN = { + parameters.add(DistanceMetric, metric) + this + } + + /** Sets the number of data blocks/partitions + * @param n the number of data blocks + */ + def setBlocks(n: Int): KNN = { + require(n > 1, "Number of blocks must be positive.") + parameters.add(Blocks, n) + this + } +} + +object KNN { + + case object K extends Parameter[Int] { + val defaultValue: Option[Int] = None + } + + case object DistanceMetric extends Parameter[DistanceMetric] { + val defaultValue: Option[DistanceMetric] = Some(EuclideanDistanceMetric()) + } + + case object Blocks extends Parameter[Int] { + val defaultValue: Option[Int] = None + } + + def apply(): KNN = { + new KNN() + } + + /** [[FitOperation]] which trains a KNN based on the given training data set. + * @tparam T Subtype of [[Vector]] --- End diff -- Clarify we mean FlinkML Vector > Add exact k-nearest-neighbours algorithm to machine learning library > -------------------------------------------------------------------- > > Key: FLINK-1745 > URL: https://issues.apache.org/jira/browse/FLINK-1745 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Labels: ML, Starter > > Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial > it is still used as a means to classify data and to do regression. This issue > focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as > proposed in [2]. 
> Could be a starter task. > Resources: > [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm] > [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf] -- This message was sent by Atlassian JIRA (v6.3.4#6332)