Global Sort + ZipWithIndex
Hello,

I'm trying to assign a unique (and deterministic) ID to a globally sorted DataSet.

Given a DataSet of String, I'm computing the frequency of each label as follows:

    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromCollection(List("a","b","c","a","a","d","a","a","a","b","b","c","a","c","b","c"))

    val mapping = data.map(s => (s,1))
      .groupBy(0)
      .reduce((a,b) => (a._1, a._2 + b._2))
      .partitionByRange(1)
      .sortPartition(1, Order.DESCENDING)

Then I want the most frequent label to get ID 0, the next one ID 1, and so on *in decreasing order* of frequency.

My idea was to use zipWithIndex:

    val result = mapping.zipWithIndex

But this does not guarantee that the global order will be preserved, right?

What can I do to get such a mapping?

Thanks

Regards
Thomas
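To make the target mapping concrete, here is a minimal sketch in plain Scala collections (no Flink) of the result being asked for. It is not a Flink solution: it assumes the aggregated (label, count) pairs are few enough to be ranked on a single machine, which in Flink terms would correspond to something like collecting the aggregated DataSet or running the sort with parallelism 1. The object name `GlobalRankSketch` and the method `rank` are hypothetical, introduced only for illustration. Note that "b" and "c" both occur 4 times in the example data, so a tie-break (here: the label itself) is needed for the IDs to be deterministic.

```scala
object GlobalRankSketch {
  // Counts each label, then assigns IDs by descending frequency.
  // Ties are broken by label (ascending) so the result is deterministic.
  // Assumption: the distinct labels fit in memory on one machine.
  def rank(labels: Seq[String]): Seq[(String, Int, Long)] = {
    val counts: Seq[(String, Int)] =
      labels.groupBy(identity).map { case (label, xs) => (label, xs.size) }.toSeq

    counts
      .sortBy { case (label, count) => (-count, label) }
      .zipWithIndex
      .map { case ((label, count), id) => (label, count, id.toLong) }
  }

  def main(args: Array[String]): Unit = {
    val data = List("a","b","c","a","a","d","a","a","a","b","b","c","a","c","b","c")
    // Prints (a,7,0), (b,4,1), (c,4,2), (d,1,3), one per line
    rank(data).foreach(println)
  }
}
```

In a distributed setting the same idea needs two extra guarantees: the partitions themselves must be ordered consistently with the sort direction, and each partition's starting ID must be the total number of elements in all partitions before it.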
Re: FlinkML - Fail to execute QuickStart example
Hi,

No problem I'm going to create a JIRA.

Regards
Thomas

2016-10-17 21:34 GMT+02:00 Theodore Vasiloudis <theodoros.vasilou...@gmail.com>:

> That is my bad, I must have been testing against a private branch when
> writing the guide, the SVM as it stands only has a predict operation for
> Vector not LabeledVector.
>
> IMHO I would like to have a predict operator for LabeledVector for all
> predictors (that would just call the existing Vector prediction
> internally), but IIRC we decided to go with an Evaluate operator instead as
> written in the evaluation PR <https://github.com/apache/flink/pull/1849>.
>
> I'll make a PR to fix the guide, any chance you can create a JIRA for this?
>
> Regards,
> Theodore
>
> On Mon, Oct 17, 2016 at 6:22 PM, Thomas FOURNIER <thomasfournier...@gmail.com> wrote:
>
>> Hi,
>>
>> Executing the following code (see QuickStart):
>>
>> val env = ExecutionEnvironment.getExecutionEnvironment
>> val survival = env.readCsvFile[(String, String, String, String)]("src/main/resources/haberman.data", ",")
>>
>> val survivalLV = survival
>>   .map { tuple =>
>>     val list = tuple.productIterator.toList
>>     val numList = list.map(_.asInstanceOf[String].toDouble)
>>     LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
>>   }
>>
>> val astroTrain = MLUtils.readLibSVM(env, "src/main/resources/svmguide1")
>> val astroTest = MLUtils.readLibSVM(env, "src/main/resources/svmguide1.t")
>>
>> val svm = SVM()
>>   .setBlocks(env.getParallelism)
>>   .setIterations(100)
>>   .setRegularization(0.001)
>>   .setStepsize(0.1)
>>   .setSeed(42)
>>
>> svm.fit(astroTrain)
>> svm.predict(astroTest)
>>
>> I encounter the following error:
>>
>> Exception in thread "main" java.lang.RuntimeException: There is no
>> PredictOperation defined for org.apache.flink.ml.classification.SVM which
>> takes a DataSet[org.apache.flink.ml.common.LabeledVector] as input.
>>
>> Any idea ?
>>
>> Thanks
>>
>> Thomas
FlinkML - Fail to execute QuickStart example
Hi,

Executing the following code (see QuickStart):

    val env = ExecutionEnvironment.getExecutionEnvironment
    val survival = env.readCsvFile[(String, String, String, String)]("src/main/resources/haberman.data", ",")

    val survivalLV = survival
      .map { tuple =>
        val list = tuple.productIterator.toList
        val numList = list.map(_.asInstanceOf[String].toDouble)
        LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
      }

    val astroTrain = MLUtils.readLibSVM(env, "src/main/resources/svmguide1")
    val astroTest = MLUtils.readLibSVM(env, "src/main/resources/svmguide1.t")

    val svm = SVM()
      .setBlocks(env.getParallelism)
      .setIterations(100)
      .setRegularization(0.001)
      .setStepsize(0.1)
      .setSeed(42)

    svm.fit(astroTrain)
    svm.predict(astroTest)

I encounter the following error:

    Exception in thread "main" java.lang.RuntimeException: There is no PredictOperation defined for org.apache.flink.ml.classification.SVM which takes a DataSet[org.apache.flink.ml.common.LabeledVector] as input.

Any idea?

Thanks

Thomas