This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c9e7b2  [FLINK-17795][example] Add MatrixVectorMul example
0c9e7b2 is described below

commit 0c9e7b21897cc3b4258ff21b295c9e0e8d7cd13f
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Mon May 25 18:31:41 2020 +0800

    [FLINK-17795][example] Add MatrixVectorMul example
    
    This closes #12398.
---
 flink-dist/src/main/assemblies/bin.xml             |   1 +
 flink-examples/flink-examples-streaming/pom.xml    |  68 ++++++
 .../streaming/examples/gpu/MatrixVectorMul.java    | 244 +++++++++++++++++++++
 3 files changed, 313 insertions(+)

diff --git a/flink-dist/src/main/assemblies/bin.xml 
b/flink-dist/src/main/assemblies/bin.xml
index bc8bb4f..c68f9d6 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -235,6 +235,7 @@ under the License.
                        <excludes>
                                <exclude>flink-examples-streaming*.jar</exclude>
                                <exclude>original-*.jar</exclude>
+                               <exclude>MatrixVectorMul.jar</exclude>
                        </excludes>
                </fileSet>
 
diff --git a/flink-examples/flink-examples-streaming/pom.xml 
b/flink-examples/flink-examples-streaming/pom.xml
index a055d5e..0bd841e 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -34,6 +34,11 @@ under the License.
 
        <packaging>jar</packaging>
 
+       <!-- Allow users to pass custom jcuda versions -->
+       <properties>
+               <jcuda.version>10.0.0</jcuda.version>
+       </properties>
+
        <dependencies>
 
                <!-- core dependencies -->
@@ -88,6 +93,33 @@ under the License.
             <version>${project.version}</version>
         </dependency>
 
+               <!-- Dependencies for MatrixVectorMul. We exclude the native libraries
+               because they are not available for all operating systems and architectures. Moreover,
+               we also want to enable users to compile and run MatrixVectorMul in different runtime environments. -->
+               <dependency>
+                       <groupId>org.jcuda</groupId>
+                       <artifactId>jcuda</artifactId>
+                       <version>${jcuda.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.jcuda</groupId>
+                                       <artifactId>jcuda-natives</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.jcuda</groupId>
+                       <artifactId>jcublas</artifactId>
+                       <version>${jcuda.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.jcuda</groupId>
+                                       <artifactId>jcublas-natives</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
     </dependencies>
 
        <build>
@@ -365,6 +397,42 @@ under the License.
                                </executions>
                        </plugin>
 
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>MatrixVectorMul</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<shadeTestJar>false</shadeTestJar>
+                                                       
<finalName>MatrixVectorMul</finalName>
+                                                       <artifactSet>
+                                                               <includes>
+                                                                       
<include>org.jcuda:*</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <filters>
+                                                               <filter>
+                                                                       
<artifact>org.apache.flink:*</artifact>
+                                                                       
<includes>
+                                                                               
<include>org/apache/flink/streaming/examples/gpu/MatrixVectorMul.class</include>
+                                                                               
<include>org/apache/flink/streaming/examples/gpu/MatrixVectorMul$*.class</include>
+                                                                       
</includes>
+                                                               </filter>
+                                                       </filters>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.streaming.examples.gpu.MatrixVectorMul</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
 
                        <!-- Scala Compiler -->
                        <plugin>
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java
new file mode 100644
index 0000000..a0177c2
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gpu;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Preconditions;
+
+import jcuda.Pointer;
+import jcuda.Sizeof;
+import jcuda.jcublas.JCublas;
+import jcuda.runtime.JCuda;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Implements the matrix-vector multiplication program that shows how to use 
GPU resources in Flink.
+ *
+ * <p>The input is a vector stream from a {@link RandomVectorSource}, which 
will generate random vectors with specified
+ * dimension. The data size of the vector stream could be specified by user. 
Each vector will be multiplied with a random
+ * dimension * dimension matrix in {@link Multiplier} and the result would be 
emitted to output.
+ *
+ * <p>Usage: MatrixVectorMul [--output &lt;path&gt;] [--dimension &lt;dimension&gt; --data-size &lt;data_size&gt;] [--resource-name &lt;resource_name&gt;]
+ *
+ * <p>If no parameters are provided, the program is run with default vector 
dimension 10 and data size 100.
+ *
+ * <p>This example shows how to:
+ * <ul>
+ * <li>leverage external resource in operators,
+ * <li>accelerate complex calculation with GPU resources.
+ * </ul>
+ *
+ * <p>Notice that you need to add the JCuda native libraries to your Flink distribution by the following steps:
+ * <ul>
+ * <li>download the JCuda native libraries bundle for your CUDA version from 
http://www.jcuda.org/downloads/
+ * <li>copy the native libraries jcuda-natives and jcublas-natives for your 
CUDA version, operating system and architecture
+ * to the "lib/" folder of your Flink distribution
+ * </ul>
+ */
+public class MatrixVectorMul {
+
+       private static final int DEFAULT_DIM = 10;
+       private static final int DEFAULT_DATA_SIZE = 100;
+       private static final String DEFAULT_RESOURCE_NAME = "gpu";
+
+       /**
+        * Builds and runs the matrix-vector multiplication job.
+        *
+        * <p>Recognized options are {@code --output}, {@code --dimension}, {@code --data-size} and
+        * {@code --resource-name}; options that are absent fall back to the defaults declared in this class.
+        *
+        * @param args command line arguments, parsed with {@link ParameterTool}
+        * @throws Exception if the job execution fails
+        */
+       public static void main(String[] args) throws Exception {
+
+               // Parse the command line and always show the supported options
+               final ParameterTool parameters = ParameterTool.fromArgs(args);
+               System.out.println("Usage: MatrixVectorMul [--output <path>] [--dimension <dimension> --data-size <data_size>] [--resource-name <resource_name>]");
+
+               // Create the streaming environment and make the parameters available in the web interface
+               final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+               environment.getConfig().setGlobalJobParameters(parameters);
+
+               // Resolve the job settings
+               final int dimension = parameters.getInt("dimension", DEFAULT_DIM);
+               final int dataSize = parameters.getInt("data-size", DEFAULT_DATA_SIZE);
+               final String resourceName = parameters.get("resource-name", DEFAULT_RESOURCE_NAME);
+
+               // Pipeline: random vectors -> multiplication with a random matrix
+               final DataStream<List<Float>> vectors = environment.addSource(new RandomVectorSource(dimension, dataSize));
+               final DataStream<List<Float>> products = vectors.map(new Multiplier(dimension, resourceName));
+
+               // Products go to stdout unless an output path was requested
+               if (!parameters.has("output")) {
+                       System.out.println("Printing result to stdout. Use --output to specify output path.");
+                       products.print();
+               } else {
+                       final Path outputPath = new Path(parameters.get("output"));
+                       final StreamingFileSink<List<Float>> fileSink = StreamingFileSink
+                               .forRowFormat(outputPath, new SimpleStringEncoder<List<Float>>())
+                               .build();
+                       products.addSink(fileSink);
+               }
+
+               // Launch the job
+               environment.execute("Matrix-Vector Multiplication");
+       }
+
+       // 
*************************************************************************
+       // USER FUNCTIONS
+       // 
*************************************************************************
+
+       /**
+        * Bounded source of random vectors: emits {@code dataSize} records, each a list of {@code dimension}
+        * floats drawn from {@link Math#random()}, and stops early once {@link #cancel()} is called.
+        */
+       private static final class RandomVectorSource extends RichSourceFunction<List<Float>> {
+
+               private final int dimension;
+               private final int dataSize;
+
+               // Set in open() rather than at construction, hence transient.
+               private transient volatile boolean running;
+
+               RandomVectorSource(int dimension, int dataSize) {
+                       this.dimension = dimension;
+                       this.dataSize = dataSize;
+               }
+
+               @Override
+               public void open(Configuration parameters) {
+                       running = true;
+               }
+
+               @Override
+               public void run(SourceContext<List<Float>> ctx) {
+                       // Emit until the requested amount is reached or the source is cancelled
+                       for (int emitted = 0; running && emitted < dataSize; emitted++) {
+                               final List<Float> vector = new ArrayList<>();
+                               int remaining = dimension;
+                               while (remaining-- > 0) {
+                                       vector.add((float) Math.random());
+                               }
+                               ctx.collect(vector);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+
+       /**
+        * Matrix-Vector multiplier using CUBLAS library.
+        *
+        * <p>{@link #open(Configuration)} selects the GPU reported for the configured external resource, generates a
+        * random dimension * dimension matrix and copies it to the device. {@link #map(List)} multiplies every incoming
+        * vector with that matrix, and {@link #close()} releases whatever {@code open} managed to set up.
+        */
+       private static final class Multiplier extends RichMapFunction<List<Float>, List<Float>> {
+               private final int dimension;
+               private final String resourceName;
+
+               // Device-side state is created in open() and must not be part of the serialized function.
+               private transient Pointer matrixPointer;
+               private transient boolean cublasInitialized;
+
+               Multiplier(int dimension, String resourceName) {
+                       this.dimension = dimension;
+                       this.resourceName = resourceName;
+               }
+
+               /**
+                * Picks the GPU device, initializes JCublas and uploads a random matrix to the device.
+                *
+                * @param parameters unused
+                * @throws IllegalStateException if no external resource is registered under the configured name or
+                *     the first one does not expose an "index" property
+                */
+               @Override
+               public void open(Configuration parameters) {
+                       // When multiple instances of this class and JCuda exist in different class loaders, then we will get UnsatisfiedLinkError.
+                       // To avoid that, we need to temporarily override the java.io.tmpdir, where the JCuda store its native library, with a random path.
+                       // For more details please refer to https://issues.apache.org/jira/browse/FLINK-5408 and the discussion in
+                       // http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Classloader-and-removal-of-native-libraries-td14808.html
+                       final String originTempDir = System.getProperty("java.io.tmpdir");
+                       final String newTempDir = originTempDir + "/jcuda-" + UUID.randomUUID();
+                       System.setProperty("java.io.tmpdir", newTempDir);
+
+                       try {
+                               final Set<ExternalResourceInfo> externalResourceInfos = getRuntimeContext().getExternalResourceInfos(resourceName);
+                               Preconditions.checkState(!externalResourceInfos.isEmpty(), "The MatrixVectorMul needs at least one GPU device while finding 0 GPU.");
+                               final Optional<String> firstIndexOptional = externalResourceInfos.iterator().next().getProperty("index");
+                               Preconditions.checkState(firstIndexOptional.isPresent(), "The GPU resource information does not contain the \"index\" property.");
+
+                               // Initialize a random matrix
+                               final int matrixSize = dimension * dimension;
+                               final float[] matrix = new float[matrixSize];
+                               for (int i = 0; i < matrixSize; ++i) {
+                                       matrix[i] = (float) Math.random();
+                               }
+
+                               // Set the CUDA device
+                               JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get()));
+
+                               // Initialize JCublas; remembered so that close() only shuts down what was started
+                               JCublas.cublasInit();
+                               cublasInitialized = true;
+
+                               // Allocate device memory for the matrix and copy the matrix to the device
+                               matrixPointer = new Pointer();
+                               JCublas.cublasAlloc(matrixSize, Sizeof.FLOAT, matrixPointer);
+                               JCublas.cublasSetVector(matrixSize, Sizeof.FLOAT, Pointer.to(matrix), 1, matrixPointer, 1);
+                       } finally {
+                               // java.io.tmpdir is JVM-wide: change it back to its original value even if the setup above failed.
+                               System.setProperty("java.io.tmpdir", originTempDir);
+                       }
+               }
+
+               /**
+                * Multiplies the matrix uploaded in {@link #open(Configuration)} with the given vector.
+                *
+                * @param value input vector; its first {@code dimension} elements are read
+                * @return a new list with the {@code dimension} elements of the product
+                */
+               @Override
+               public List<Float> map(List<Float> value) {
+                       final float[] input = new float[dimension];
+                       // A freshly created Java array is already zero-filled, so the output needs no explicit reset.
+                       final float[] output = new float[dimension];
+                       for (int i = 0; i < dimension; i++) {
+                               input[i] = value.get(i);
+                       }
+
+                       final Pointer inputPointer = new Pointer();
+                       final Pointer outputPointer = new Pointer();
+                       try {
+                               // Allocate device memory for the input and output
+                               JCublas.cublasAlloc(dimension, Sizeof.FLOAT, inputPointer);
+                               JCublas.cublasAlloc(dimension, Sizeof.FLOAT, outputPointer);
+
+                               // Initialize the device vectors
+                               JCublas.cublasSetVector(dimension, Sizeof.FLOAT, Pointer.to(input), 1, inputPointer, 1);
+                               JCublas.cublasSetVector(dimension, Sizeof.FLOAT, Pointer.to(output), 1, outputPointer, 1);
+
+                               // Performs operation using JCublas
+                               JCublas.cublasSgemv('n', dimension, dimension, 1.0f,
+                                       matrixPointer, dimension, inputPointer, 1, 0.0f, outputPointer, 1);
+
+                               // Read the result back
+                               JCublas.cublasGetVector(dimension, Sizeof.FLOAT, outputPointer, 1, Pointer.to(output), 1);
+                       } finally {
+                               // Memory clean up, also when one of the steps above throws
+                               JCublas.cublasFree(inputPointer);
+                               JCublas.cublasFree(outputPointer);
+                       }
+
+                       final List<Float> outputList = new ArrayList<>(dimension);
+                       for (int i = 0; i < dimension; ++i) {
+                               outputList.add(output[i]);
+                       }
+
+                       return outputList;
+               }
+
+               /**
+                * Frees the device matrix and shuts down JCublas. Each step is skipped when
+                * {@link #open(Configuration)} did not get far enough to set it up.
+                */
+               @Override
+               public void close() {
+                       // Memory clean up
+                       if (matrixPointer != null) {
+                               JCublas.cublasFree(matrixPointer);
+                               matrixPointer = null;
+                       }
+
+                       // Shutdown cublas
+                       if (cublasInitialized) {
+                               JCublas.cublasShutdown();
+                               cublasInitialized = false;
+                       }
+               }
+       }
+}

Reply via email to