[
https://issues.apache.org/jira/browse/FLINK-337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212312#comment-14212312
]
ASF GitHub Bot commented on FLINK-337:
--------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/202#discussion_r20361477
--- Diff: flink-addons/flink-language-binding/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java ---
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.python;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashMap;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.SortedGrouping;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.languagebinding.api.java.common.PlanBinder;
+import org.apache.flink.languagebinding.api.java.common.OperationInfo;
+import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo;
+//CHECKSTYLE.OFF: AvoidStarImport - enum/function import
+import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo.*;
+import org.apache.flink.languagebinding.api.java.python.functions.*;
+//CHECKSTYLE.ON: AvoidStarImport
+import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
+import org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
+import org.apache.flink.runtime.filecache.FileCache;
+
+/**
+ * This class allows the execution of a Flink plan written in Python.
+ */
+public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
+ public static final String FLINK_PYTHON_ID = "flink";
+ public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py";
+ public static final String FLINK_PYTHON_EXECUTOR_NAME = "/executor.py";
+
+	private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + "/flink_plan";
+	private static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python";
+	private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
+
+ private Process process;
+
+ /**
+ * Entry point for the execution of a python plan.
+ *
+ * @param args planPath [package1[packageX[|parameter1[parameterX]]]]
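+	 *             e.g. {@code plan.py mypackage | param1 param2} (illustrative values; "|" separates shipped files from plan parameters)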
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ PythonPlanBinder binder = new PythonPlanBinder();
+ binder.go(args);
+ }
+
+ private void go(String[] args) throws Exception {
+ env = ExecutionEnvironment.getExecutionEnvironment();
+
+ int split = 0;
+ for (int x = 0; x < args.length; x++) {
+ if (args[x].compareTo("|") == 0) {
+ split = x;
+ }
+ }
+
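+		// everything before the "|" separator is a file to ship (plan + packages); everything after it is passed to the python plan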
+		prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
+		startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
+ receivePlan();
+ distributeFiles(env);
+
+ env.execute();
+ close();
+ }
+
+	//=====Setup========================================================================================================
+	private void prepareFiles(String... filePaths) throws IOException, URISyntaxException {
+ prepareFlinkPythonPackage();
+
+ String planPath = filePaths[0];
+ if (planPath.endsWith("/")) {
+ planPath = planPath.substring(0, planPath.length() - 1);
+ }
+		String tmpPlanPath = FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME;
+		clearPath(tmpPlanPath);
+		FileCache.copy(new Path(planPath), new Path(tmpPlanPath), false);
+
+ for (int x = 1; x < filePaths.length; x++) {
+ copyFile(filePaths[x]);
+ }
+ }
+
+ private void startPython(String[] args) throws IOException {
+ sets = new HashMap();
+ StringBuilder argsBuilder = new StringBuilder();
+ for (String arg : args) {
+ argsBuilder.append(" ").append(arg);
+ }
+ receiver = new Receiver(null);
+ receiver.open(null);
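+		// launch the python process that builds the plan; -B keeps python from writing .pyc files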
+		process = Runtime.getRuntime().exec("python -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
+
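+		// echo the python process's stdout and stderr (via StreamPrinter)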
+ new StreamPrinter(process.getInputStream()).start();
+ new StreamPrinter(process.getErrorStream()).start();
+ }
+
+ private void close() throws IOException, URISyntaxException {
+ FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH));
+ hdfs.delete(new Path(FLINK_HDFS_PATH), true);
+
+ FileSystem local = FileSystem.getLocalFileSystem();
+ local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
+ local.delete(new Path(FLINK_TMP_DATA_DIR), true);
+
+		try {
+			receiver.close();
+		} catch (NullPointerException npe) {
+			// ignored: the receiver may never have been (fully) opened
+		}
+ process.destroy();
+ }
+
+	public static void prepareFlinkPythonPackage() throws IOException, URISyntaxException {
+		String originalFilePath = FLINK_DIR.substring(0, FLINK_DIR.length() - 7) + FLINK_PYTHON_REL_LOCAL_PATH;
+ String tempFilePath = FLINK_PYTHON_FILE_PATH;
+ clearPath(tempFilePath);
+		FileCache.copy(new Path(originalFilePath), new Path(tempFilePath), false);
+ }
+
+	public static void prepareFlinkPythonPackage(String path) throws IOException {
+		FileCache.copy(new Path(path), new Path(FLINK_PYTHON_FILE_PATH), true);
+ }
+
+	public static void distributeFiles(ExecutionEnvironment env) throws IOException, URISyntaxException {
+		clearPath(FLINK_HDFS_PATH);
+		FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new Path(FLINK_HDFS_PATH), true);
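+		// register the copied package in the distributed cache so it is shipped to the workers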
+ env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_ID);
+ clearPath(FLINK_PYTHON_FILE_PATH);
+ }
+
+	private static void clearPath(String path) throws IOException, URISyntaxException {
+ FileSystem fs = FileSystem.get(new URI(path));
+ if (fs.exists(new Path(path))) {
+ fs.delete(new Path(path), true);
+ }
+ }
+
+	public static String copyFile(String path) throws IOException, URISyntaxException {
+ if (path.endsWith("/")) {
+ path = path.substring(0, path.length() - 1);
+ }
+ String identifier = path.substring(path.lastIndexOf("/"));
+ String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier;
+ clearPath(tmpFilePath);
+ FileCache.copy(new Path(path), new Path(tmpFilePath), true);
+ return identifier;
+ }
+
+	//=====Plan Binding=================================================================================================
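+	/**
+	 * Holds the parameters of a single plan operation. Which fields are read from
+	 * the python process is determined by the INFO_MODE passed to the constructor.
+	 */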
+ protected class PythonOperationInfo extends OperationInfo {
+		protected static final int INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED = -1;
+		protected static final int INFO_MODE_UDF_DOUBLE_KEYED_TYPED = 0;
+		protected static final int INFO_MODE_UDF_DOUBLE_TYPED = 1;
+		protected static final int INFO_MODE_UDF_SINGLE_TYPED = 2;
+		protected static final int INFO_MODE_UDF_SINGLE_TYPED_COMBINE = 9;
+		protected static final int INFO_MODE_UDF = 3;
+		protected static final int INFO_MODE_GROUP = 4;
+		protected static final int INFO_MODE_SORT = 5;
+		protected static final int INFO_MODE_UNION = 6;
+		protected static final int INFO_MODE_PROJECT = 7;
+		protected static final int INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED = 8;
+ protected String operator;
+ protected String meta;
+ protected boolean combine;
+
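+		// reads this operation's parameters from the python process in the order dictated by the given mode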
+		protected PythonOperationInfo(int mode) throws IOException {
+			parentID = (Integer) receiver.getRecord();
+			childID = (Integer) receiver.getRecord();
+			switch (mode) {
+				case INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED:
+					keys1 = (Tuple) receiver.getRecord();
+					keys2 = (Tuple) receiver.getRecord();
+					otherID = (Integer) receiver.getRecord();
+					types = receiver.getRecord();
+					operator = (String) receiver.getRecord();
+					meta = (String) receiver.getRecord();
+					projectionKeys1 = (Tuple) receiver.getRecord();
+					projectionKeys2 = (Tuple) receiver.getRecord();
+					break;
+				case INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED:
+					otherID = (Integer) receiver.getRecord();
+					types = receiver.getRecord();
+					operator = (String) receiver.getRecord();
+					meta = (String) receiver.getRecord();
+					projectionKeys1 = (Tuple) receiver.getRecord();
+					projectionKeys2 = (Tuple) receiver.getRecord();
+					break;
+				case INFO_MODE_UDF_DOUBLE_KEYED_TYPED:
+					keys1 = (Tuple) receiver.getRecord();
+					keys2 = (Tuple) receiver.getRecord();
+					otherID = (Integer) receiver.getRecord();
+					types = receiver.getRecord();
+					operator = (String) receiver.getRecord();
+					meta = (String) receiver.getRecord();
+					break;
+				case INFO_MODE_UDF_DOUBLE_TYPED:
+					otherID = (Integer) receiver.getRecord();
+					types = receiver.getRecord();
+					operator = (String) receiver.getRecord();
+					meta = (String) receiver.getRecord();
+					break;
+				case INFO_MODE_UDF_SINGLE_TYPED:
+					types = receiver.getRecord();
+					operator = (String) receiver.getRecord();
+					meta = (String) receiver.getRecord();
+					break;
+				case INFO_MODE_UDF_SINGLE_TYPED_COMBINE:
+					types = receiver.getRecord();
+					operator = (String) receiver.getRecord();
+					meta = (String) receiver.getRecord();
+					combine = (Boolean) receiver.getRecord();
+					break;
+				case INFO_MODE_UDF:
+					operator = (String) receiver.getRecord();
+					meta = (String) receiver.getRecord();
+					break;
+				case INFO_MODE_GROUP:
+					keys1 = (Tuple) receiver.getRecord();
+					break;
+				case INFO_MODE_SORT:
+					field = (Integer) receiver.getRecord();
+					order = (Integer) receiver.getRecord();
+					break;
+				case INFO_MODE_UNION:
+					otherID = (Integer) receiver.getRecord();
+					break;
+				case INFO_MODE_PROJECT:
+					keys1 = (Tuple) receiver.getRecord();
+					types = receiver.getRecord();
+					break;
+			}
+		}
+ }
+
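+	// maps the received operation identifier to the INFO_MODE describing which parameters follow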
+ @Override
+	protected PythonOperationInfo createOperationInfo(String identifier) throws IOException {
+		switch (Operations.valueOf(identifier.toUpperCase())) {
+			case COGROUP:
+				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_TYPED);
+			case CROSS:
+				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
+			case CROSS_H:
+				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
+			case CROSS_T:
+				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
+			case FILTER:
+				return new PythonOperationInfo(INFO_MODE_UDF);
+			case FLATMAP:
+				return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED);
+			case GROUPREDUCE:
+				return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED_COMBINE);
+			case JOIN:
+				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
+			case JOIN_H:
+				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
+			case JOIN_T:
+				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
+			case MAP:
+				return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED);
+			case PROJECTION:
+				return new PythonOperationInfo(INFO_MODE_PROJECT);
+			case REDUCE:
+				return new PythonOperationInfo(INFO_MODE_UDF);
+			case GROUPBY:
+				return new PythonOperationInfo(INFO_MODE_GROUP);
+			case SORT:
+				return new PythonOperationInfo(INFO_MODE_SORT);
+			case UNION:
+				return new PythonOperationInfo(INFO_MODE_UNION);
+		}
+		return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_TYPED);
+ }
+
+ @Override
+	protected DataSet applyCoGroupOperation(DataSet op1, DataSet op2, int[] firstKeys, int[] secondKeys, PythonOperationInfo info) {
+		return op1.coGroup(op2).where(firstKeys).equalTo(secondKeys).with(new PythonCoGroup(info.operator, info.types, info.meta));
+ }
+
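+	/**
+	 * KeySelector that maps every element to the constant key 0, so that a
+	 * subsequent join matches every pair of elements across the two inputs.
+	 */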
+	public static class PseudoKeySelector<X> implements KeySelector<X, Integer> {
+ @Override
+ public Integer getKey(X value) throws Exception {
+ return 0;
+ }
+ }
+
+ @Override
+	protected DataSet applyCrossOperation(DataSet op1, DataSet op2, int mode, PythonOperationInfo info) {
+		switch (mode) {
+			case 0:
+				return op1.join(op2).where(new PseudoKeySelector()).equalTo(new PseudoKeySelector()).with(new PythonCross(info.operator, info.types, info.meta));
--- End diff ---
A Cross is implemented as a join in which every pair matches, since both inputs are keyed to the same constant. I don't know the implications of doing it in such a hacky way. (The comparison overhead should be negligible considering the current performance.)
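
For illustration, a minimal, self-contained sketch of why this works (plain Java, independent of the Flink API; the class and method names here are made up for the example): keying both inputs by the constant 0 collapses a hash join's build side into a single bucket, so every probe-side element matches the entire other input, which is exactly the cross product.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CrossViaJoin {
	// emulates cross(left, right) as a hash join on a constant key
	static <A, B> List<Object[]> crossViaJoin(List<A> left, List<B> right) {
		// build phase: group the left input by its key; like the
		// PseudoKeySelector above, every element gets key 0, so all
		// elements end up in a single bucket
		Map<Integer, List<A>> buildSide = new HashMap<Integer, List<A>>();
		for (A a : left) {
			Integer key = 0;
			List<A> bucket = buildSide.get(key);
			if (bucket == null) {
				bucket = new ArrayList<A>();
				buildSide.put(key, bucket);
			}
			bucket.add(a);
		}
		// probe phase: every right element also has key 0 and therefore
		// matches the whole left input, yielding |left| * |right| pairs
		List<Object[]> result = new ArrayList<Object[]>();
		List<A> bucket0 = buildSide.get(0);
		if (bucket0 != null) {
			for (B b : right) {
				for (A a : bucket0) {
					result.add(new Object[]{a, b});
				}
			}
		}
		return result;
	}
}

Functionally this is equivalent to a cross; the open question is rather whether the runtime can still choose a sensible execution strategy when every record shares one key (e.g. everything may end up in a single partition).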
> Fixed all JavaDoc warnings in pact-common.
> ------------------------------------------
>
> Key: FLINK-337
> URL: https://issues.apache.org/jira/browse/FLINK-337
> Project: Flink
> Issue Type: Bug
> Reporter: GitHub Import
> Labels: github-import
> Fix For: pre-apache
>
> Attachments: pull-request-337-8997712741697485992.patch
>
>
> Fixed all JavaDoc warnings in pact-common.
> I focused on fixing the build warnings but also added and updated some
> JavaDoc on the way.
> Addresses issue
> ([#330|https://github.com/stratosphere/stratosphere/issues/330] |
> [FLINK-330|https://issues.apache.org/jira/browse/FLINK-330]).
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/pull/337
> Created by: [fhueske|https://github.com/fhueske]
> Labels:
> Created at: Wed Dec 11 17:50:03 CET 2013
> State: closed
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)