[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352342#comment-15352342 ]
ASF GitHub Bot commented on FLINK-3920: --------------------------------------- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/2152#discussion_r68696692 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala --- @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math.distributed + +import java.lang + +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.{createTypeInformation, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.SparseVector +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID +import org.apache.flink.util.Collector + +import scala.collection.JavaConversions._ + +/** + * Distributed Matrix represented as blocks. + * A BlockMapper instance is used to track blocks of the matrix. + * Every block in a BlockMatrix has an associated ID that also + * identifies its position in the BlockMatrix. + * @param data + * @param blockMapper + */ +class BlockMatrix( + data: DataSet[(BlockID, Block)], + blockMapper: BlockMapper +) + extends DistributedMatrix { + + val getDataset = data + + val numCols = blockMapper.numCols + val numRows = blockMapper.numRows + + + val getBlockCols = blockMapper.numBlockCols + val getBlockRows = blockMapper.numBlockRows + + val getRowsPerBlock = blockMapper.rowsPerBlock + val getColsPerBlock = blockMapper.colsPerBlock + + val getNumBlocks = blockMapper.numBlocks + + /** + * Compares the format of two block matrices + * @return + */ + def hasSameFormat(other: BlockMatrix): Boolean = + this.numRows == other.numRows && + this.numCols == other.numCols && + this.getRowsPerBlock == other.getRowsPerBlock && + this.getColsPerBlock == other.getColsPerBlock + + /** + * Perform an operation on pairs of block. Pairs are formed taking + * matching blocks from the two matrices that are placed in the same position. + * A function is then applied to the pair to return a new block. + * These blocks are then composed in a new block matrix. + */ + def blockPairOperation( + fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = { + require(hasSameFormat(other)) + + /*Full outer join on blocks. The full outer join is required because of + the sparse nature of the matrix. + Matching blocks may be missing and a block of zeros is used instead.*/ + val processedBlocks = + this.getDataset.fullOuterJoin(other.getDataset).where(0).equalTo(0) { + (left: (BlockID, Block), right: (BlockID, Block)) => + { + + val (id1, block1) = Option(left).getOrElse( + (right._1, Block.zero(right._2.getRows, right._2.getCols))) + + val (id2, block2) = Option(right).getOrElse( + (left._1, Block.zero(left._2.getRows, left._2.getCols))) --- End diff -- Same as above. > Distributed Linear Algebra: block-based matrix > ---------------------------------------------- > > Key: FLINK-3920 > URL: https://issues.apache.org/jira/browse/FLINK-3920 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library > Reporter: Simone Robutti > Assignee: Simone Robutti > -- This message was sent by Atlassian JIRA (v6.3.4#6332)