[jira] [Commented] (FLINK-4927) Implement FLIP-6 YARN Resource Manager

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704581#comment-15704581
 ] 

ASF GitHub Bot commented on FLINK-4927:
---

Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/2808#discussion_r89955907
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+
+/**
+ * The yarn implementation of the resource manager. Used when the system is started
+ * via the resource framework YARN.
+ */
+public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
+   protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+   /** The process environment variables */
+   private final Map<String, String> ENV;
+
+   /** The heartbeat interval while the resource master is waiting for containers */
+   private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+   /** The default heartbeat interval during regular operation */
+   private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+   /** The maximum time that TaskExecutors 
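The two heartbeat constants quoted above follow a common AMRM-client pattern; a hypothetical Scala sketch (the PR's class is Java, and this is not its actual logic) of how such constants are typically applied:

{code:title=HeartbeatIntervalSketch.scala|borderStyle=solid}
// Hypothetical sketch: heartbeat at the fast interval while container
// requests are outstanding so new allocations are picked up promptly, and at
// the slower default interval during regular operation.
object HeartbeatIntervalSketch {
  val FastYarnHeartbeatIntervalMs    = 500
  val DefaultYarnHeartbeatIntervalMs = 5000

  def currentIntervalMs(pendingContainerRequests: Int): Int =
    if (pendingContainerRequests > 0) FastYarnHeartbeatIntervalMs
    else DefaultYarnHeartbeatIntervalMs
}
{code}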

[jira] [Commented] (FLINK-4927) Implement FLIP-6 YARN Resource Manager

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704565#comment-15704565
 ] 

ASF GitHub Bot commented on FLINK-4927:
---

Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/2808#discussion_r89955140
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+
+/**
+ * The yarn implementation of the resource manager. Used when the system is started
+ * via the resource framework YARN.
+ */
+public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
+   protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+   /** The process environment variables */
+   private final Map<String, String> ENV;
+
+   /** The heartbeat interval while the resource master is waiting for containers */
+   private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+   /** The default heartbeat interval during regular operation */
+   private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+   /** The maximum time that TaskExecutors 

[jira] [Commented] (FLINK-4927) Implement FLIP-6 YARN Resource Manager

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704562#comment-15704562
 ] 

ASF GitHub Bot commented on FLINK-4927:
---

Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/2808#discussion_r89955045
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+
+/**
+ * The yarn implementation of the resource manager. Used when the system is started
+ * via the resource framework YARN.
+ */
+public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
+   protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+   /** The process environment variables */
+   private final Map<String, String> ENV;
+
+   /** The heartbeat interval while the resource master is waiting for containers */
+   private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+   /** The default heartbeat interval during regular operation */
+   private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+   /** The maximum time that TaskExecutors 

[jira] [Updated] (FLINK-5185) Decouple BatchTableSourceScan from TableSourceTable

2016-11-28 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-5185:
--
Description: 
As the component relationships in this design doc show:
https://docs.google.com/document/d/1PBnEbOcFHlEF1qGGAUgJvINdEXzzFTIRElgvs4-Tdeo/
we found it annoying for {{BatchTableSourceScan}} to directly hold a 
{{TableSourceTable}} and refer to the {{TableSource}} through it. This is fine 
if the relationship is immutable, but when we want to replace the 
{{TableSource}} while applying optimizations, it causes conflicts and confusion. 

There is only one way to change the {{TableSource}}: create a new 
{{TableSourceTable}} to hold the new {{TableSource}}, and create a new 
{{BatchTableSourceScan}} pointing to the just-created {{TableSourceTable}}. The 
annoying part is that the {{RelOptTable}} inherited from the super class 
{{TableScan}} still holds a reference to the original {{TableSourceTable}} and 
{{TableSource}}. This causes confusion about which table the {{Scan}} should 
rely on and what the difference between these tables is. 

Besides, {{TableSourceTable}} is not very useful in {{BatchTableSourceScan}}; 
the only thing the {{Scan}} cares about is the {{RowType}} it returns, which is 
and should be decided by the {{TableSource}}. So we can let 
{{BatchTableSourceScan}} hold the {{TableSource}} directly instead of the 
{{TableSourceTable}}. If some of the original information is needed, the table 
can be found through the {{RelOptTable}}. 


  was:
As the components' relationship show in this design doc:
https://docs.google.com/document/d/1PBnEbOcFHlEF1qGGAUgJvINdEXzzFTIRElgvs4-Tdeo/
We found it's been annoying for {{BatchTableSourceScan}} directly holding 
{{TableSourceTable}}, and refer to {{TableSource}} further. It's ok if the 
relationship is immutable, but when we want to change the {{TableSource}} when 
applying optimizations, it will cause some conflicts and misunderstanding. 

Since there is only one way to change {{TableSource}}, which is creating a new 
{{TableSourceTable}} to hold the new {{TableSource}}, and create a new 
{{BatchTableSourceScan}} pointing to that {{TableSourceTable}} which just 
created. The annoying part is the {{RelOptTable}} comes from the super class 
{{TableScan}} still holds the connection to the original {{TableSourceTable}} 
and {{TableSource}}. It will cause some misunderstanding, which one should the 
{{Scan}} rely to, and what's difference between these tables. 

Besides, {{TableSourceTable}} is not very useful in {{BatchTableSourceScan}}, 
the only thing {{Scan}} cares is the {{RowType}} it returns, since this is and 
should be decided by {{TableSource}}. So we can let {{BatchTableSourceScan}} 
directly holding {{TableSource}} instead of holding {{TableSourceTable}}.If 
some original information are needed, find table through {{RelOptTable}}. 



> Decouple BatchTableSourceScan from TableSourceTable
> ---
>
> Key: FLINK-5185
> URL: https://issues.apache.org/jira/browse/FLINK-5185
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Kurt Young
>Assignee: zhangjing
>Priority: Minor
>
> As the component relationships in this design doc show:
> https://docs.google.com/document/d/1PBnEbOcFHlEF1qGGAUgJvINdEXzzFTIRElgvs4-Tdeo/
> we found it annoying for {{BatchTableSourceScan}} to directly hold a 
> {{TableSourceTable}} and refer to the {{TableSource}} through it. This is fine 
> if the relationship is immutable, but when we want to replace the 
> {{TableSource}} while applying optimizations, it causes conflicts and 
> confusion. 
> There is only one way to change the {{TableSource}}: create a new 
> {{TableSourceTable}} to hold the new {{TableSource}}, and create a new 
> {{BatchTableSourceScan}} pointing to the just-created {{TableSourceTable}}. The 
> annoying part is that the {{RelOptTable}} inherited from the super class 
> {{TableScan}} still holds a reference to the original {{TableSourceTable}} and 
> {{TableSource}}. This causes confusion about which table the {{Scan}} should 
> rely on and what the difference between these tables is. 
> Besides, {{TableSourceTable}} is not very useful in {{BatchTableSourceScan}}; 
> the only thing the {{Scan}} cares about is the {{RowType}} it returns, which 
> is and should be decided by the {{TableSource}}. So we can let 
> {{BatchTableSourceScan}} hold the {{TableSource}} directly instead of the 
> {{TableSourceTable}}. If some of the original information is needed, the table 
> can be found through the {{RelOptTable}}. 
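
To make the proposed shape concrete, here is a small hypothetical Scala sketch (stand-in types, not Flink's actual classes): the scan holds the {{TableSource}} directly and derives its row type from it.

{code:title=DecouplingSketch.scala|borderStyle=solid}
// Hypothetical, simplified model of the proposal (not Flink's real classes).
trait TableSource {
  def getRowType: String // stand-in for the real row type
}

// Before: the scan reaches the TableSource only through a TableSourceTable,
// while the RelOptTable from TableScan still points at the old table.
case class TableSourceTable(tableSource: TableSource)

// After: the scan holds the TableSource itself. Swapping the source during
// optimization is just a copy with a new TableSource; original table
// information can still be looked up through the RelOptTable if needed.
case class BatchTableSourceScan(tableSource: TableSource) {
  def rowType: String = tableSource.getRowType
  def copyWith(newSource: TableSource): BatchTableSourceScan =
    BatchTableSourceScan(newSource)
}
{code}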






[jira] [Created] (FLINK-5185) Decouple BatchTableSourceScan from TableSourceTable

2016-11-28 Thread Kurt Young (JIRA)
Kurt Young created FLINK-5185:
-

 Summary: Decouple BatchTableSourceScan from TableSourceTable
 Key: FLINK-5185
 URL: https://issues.apache.org/jira/browse/FLINK-5185
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.2.0
Reporter: Kurt Young
Assignee: zhangjing
Priority: Minor


As the component relationships in this design doc show:
https://docs.google.com/document/d/1PBnEbOcFHlEF1qGGAUgJvINdEXzzFTIRElgvs4-Tdeo/
we found it annoying for BatchTableSourceScan to directly hold a 
TableSourceTable and refer to the TableSource through it. This is fine if the 
relationship is immutable, but when we want to replace the TableSource while 
applying optimizations, it causes conflicts and confusion. 

There is only one way to change the TableSource: create a new TableSourceTable 
to hold the new TableSource, and create a new BatchTableSourceScan pointing to 
the just-created TableSourceTable. The annoying part is that the RelOptTable 
inherited from the super class TableScan still holds a reference to the 
original TableSourceTable and TableSource. This causes confusion about which 
table the Scan should rely on and what the difference between these tables is. 

Besides, TableSourceTable is not very useful in BatchTableSourceScan; the only 
thing the Scan cares about is the RowType it returns, which is and should be 
decided by the TableSource. So we can let BatchTableSourceScan hold the 
TableSource directly instead of the TableSourceTable. If some of the original 
information is needed, the table can be found through the RelOptTable. 






[jira] [Created] (FLINK-5184) Error result of compareSerialized in RowComparator class

2016-11-28 Thread godfrey he (JIRA)
godfrey he created FLINK-5184:
-

 Summary: Error result of compareSerialized in RowComparator class
 Key: FLINK-5184
 URL: https://issues.apache.org/jira/browse/FLINK-5184
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.2.0
Reporter: godfrey he


RowSerializer writes a null mask for all fields in a row before serializing the 
row data to the DataOutputView. 

{code:title=RowSerializer.scala|borderStyle=solid}
override def serialize(value: Row, target: DataOutputView) {
  val len = fieldSerializers.length

  if (value.productArity != len) {
    throw new RuntimeException("Row arity of value does not match serializers.")
  }

  // write a null mask
  writeNullMask(len, value, target)

  ..
}

{code}

RowComparator deserializes row data from the DataInputView when its 
compareSerialized method is called. However, the first argument passed to 
readIntoNullMask is wrong: it should be the count of all fields in the row, 
rather than the length of the serializers (which only cover the first n fields 
used for comparison).

{code:title=RowComparator.scala|borderStyle=solid}
override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
  val len = serializers.length
  val keyLen = keyPositions.length

  readIntoNullMask(len, firstSource, nullMask1)
  readIntoNullMask(len, secondSource, nullMask2)
  ..
}
{code}
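
To see why the length matters, here is a minimal, self-contained sketch (not Flink's actual code): the writer emits one mask bit per field, padded to whole bytes, so a reader that sizes the mask by the key serializers instead of the full row arity consumes too few bytes and reads all subsequent field data from the wrong offset.

{code:title=NullMaskDemo.scala|borderStyle=solid}
// Self-contained illustration (not Flink's implementation): a null mask packs
// one bit per field into bytes, so reader and writer must agree on the field
// count or the stream becomes misaligned.
object NullMaskDemo {
  // Mask bytes needed for `fields` fields, one bit each, padded to a byte.
  def maskBytes(fields: Int): Int = (fields + 7) / 8

  def main(args: Array[String]): Unit = {
    val rowArity = 10 // fields written by RowSerializer's writeNullMask
    val keyCount = 2  // serializers.length used by the buggy compareSerialized
    println(s"mask bytes written for the row:        ${maskBytes(rowArity)}") // 2
    println(s"mask bytes read with the buggy length: ${maskBytes(keyCount)}") // 1
    // One mask byte is left unread, so every later read of field data starts
    // at the wrong position and the comparison result is meaningless.
  }
}
{code}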





[jira] [Commented] (FLINK-5128) Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704428#comment-15704428
 ] 

ASF GitHub Bot commented on FLINK-5128:
---

GitHub user Renkai opened a pull request:

https://github.com/apache/flink/pull/2893

[FLINK-5128] Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Renkai/flink FLINK-5128

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2893.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2893


commit 7c78da6f1edca675ceced255eff2fa73044edef1
Author: renkai 
Date:   2016-11-29T06:26:00Z

Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set




> Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set
> ---
>
> Key: FLINK-5128
> URL: https://issues.apache.org/jira/browse/FLINK-5128
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Renkai Ge
>Priority: Minor
>
> The fetched partition list is only used when calling {{open(...)}} on a 
> user-supplied custom partitioner in {{FlinkKafkaProducer}}.
> Therefore, we only need to fetch the partition list if the user supplied a 
> partitioner (right now we always do the partition fetching).
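
A hypothetical sketch of that change (simplified stand-in types, not the real {{FlinkKafkaProducer}} internals):

{code:title=LazyPartitionFetchSketch.scala|borderStyle=solid}
// Hypothetical sketch: fetch the topic's partition metadata only when a
// custom partitioner was supplied, since the partition array is used solely
// to open that partitioner.
object LazyPartitionFetchSketch {
  trait Partitioner {
    def open(partitions: Array[Int]): Unit
  }

  def openProducer(customPartitioner: Option[Partitioner],
                   fetchPartitions: () => Array[Int]): Unit =
    customPartitioner match {
      case Some(p) => p.open(fetchPartitions()) // metadata fetched only here
      case None    => () // no partitioner: skip the metadata round-trip
    }
}
{code}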





[GitHub] flink pull request #2893: [FLINK-5128]Get Kafka partitions in FlinkKafkaProd...

2016-11-28 Thread Renkai
GitHub user Renkai opened a pull request:

https://github.com/apache/flink/pull/2893

[FLINK-5128]Get Kafka partitions in FlinkKafkaProducer only if a 
partitioner is set



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Renkai/flink FLINK-5128

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2893.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2893


commit 7c78da6f1edca675ceced255eff2fa73044edef1
Author: renkai 
Date:   2016-11-29T06:26:00Z

Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set






[GitHub] flink pull request #2828: [FLINK-5093] java.util.ConcurrentModificationExcep...

2016-11-28 Thread ifndef-SleePy
Github user ifndef-SleePy closed the pull request at:

https://github.com/apache/flink/pull/2828




[jira] [Commented] (FLINK-5093) java.util.ConcurrentModificationException is thrown when stopping TimerService

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704401#comment-15704401
 ] 

ASF GitHub Bot commented on FLINK-5093:
---

Github user ifndef-SleePy closed the pull request at:

https://github.com/apache/flink/pull/2828


> java.util.ConcurrentModificationException is thrown when stopping TimerService
> --
>
> Key: FLINK-5093
> URL: https://issues.apache.org/jira/browse/FLINK-5093
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
> Environment: FLIP-6 feature branch
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Minor
>
> In the stop method of TimerService, removing a Timeout instance while 
> iterating over the map causes a java.util.ConcurrentModificationException.
> Here is the stack trace:
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextEntry(HashMap.java:922)
>   at java.util.HashMap$KeyIterator.next(HashMap.java:956)
>   at 
> org.apache.flink.runtime.taskexecutor.slot.TimerService.stop(TimerService.java:63)
>   at 
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable.stop(TaskSlotTable.java:129)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.shutDown(TaskExecutor.java:224)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.shutDownInternally(TaskManagerRunner.java:135)
>   at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.shutDown(TaskManagerRunner.java:129)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.shutdownInternally(MiniCluster.java:319)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.shutdown(MiniCluster.java:274)
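
A sketch of the usual fix (illustrative, not the actual TimerService code): 
cancel the timeouts while iterating, which is not a structural modification, 
and clear the map in one step afterwards.

{code}
import java.util.{HashMap => JHashMap}
import java.util.concurrent.ScheduledFuture

class TimerServiceSketch[K] {
  private val timeouts = new JHashMap[K, ScheduledFuture[_]]()

  def stop(): Unit = {
    val it = timeouts.values().iterator()
    while (it.hasNext) {
      it.next().cancel(true) // cancelling a future does not modify the map
    }
    timeouts.clear() // remove all entries after the iteration has finished
  }
}
{code}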





[jira] [Commented] (FLINK-5076) Shutting down TM when shutting down new mini cluster

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704389#comment-15704389
 ] 

ASF GitHub Bot commented on FLINK-5076:
---

Github user ifndef-SleePy closed the pull request at:

https://github.com/apache/flink/pull/2817


> Shutting down TM when shutting down new mini cluster
> 
>
> Key: FLINK-5076
> URL: https://issues.apache.org/jira/browse/FLINK-5076
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
> Environment: FLIP-6 feature branch
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Minor
>
> Currently we don't shut down the task managers when shutting down the mini 
> cluster. This prevents the mini cluster from exiting normally.





[GitHub] flink pull request #2817: [FLINK-5076] Shutting down TM when shutting down m...

2016-11-28 Thread ifndef-SleePy
Github user ifndef-SleePy closed the pull request at:

https://github.com/apache/flink/pull/2817




[jira] [Commented] (FLINK-4669) scala api createLocalEnvironment() function add default Configuration parameter

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704328#comment-15704328
 ] 

ASF GitHub Bot commented on FLINK-4669:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2541
  
@StephanEwen review it again please.


> scala api createLocalEnvironment() function add default Configuration 
> parameter
> ---
>
> Key: FLINK-4669
> URL: https://issues.apache.org/jira/browse/FLINK-4669
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: shijinkui
>
> A Scala program can't directly use createLocalEnvironment with a custom 
> Configuration object.
> For example, to start the web server in local mode, I currently have to write:
> ```
> // set up execution environment
> val conf = new Configuration
> conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
>   ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)
> val env = new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(
>   org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(2, conf)
> )
> ```
> so the createLocalEnvironment function should take a config parameter.
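
A sketch of the proposed change (a suggestion, not necessarily the merged 
signature): a default {{Configuration}} parameter keeps existing call sites 
source-compatible while delegating to the Java factory, which already accepts 
a Configuration.

{code}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object LocalEnvSketch {
  def createLocalEnvironment(
      parallelism: Int = Runtime.getRuntime.availableProcessors(),
      configuration: Configuration = new Configuration()): StreamExecutionEnvironment = {
    // wrap the Java environment, as the snippet above already does manually
    new StreamExecutionEnvironment(
      org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
        .createLocalEnvironment(parallelism, configuration))
  }
}
{code}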





[GitHub] flink issue #2541: [FLINK-4669] scala api createLocalEnvironment() function ...

2016-11-28 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2541
  
@StephanEwen review it again please.




[GitHub] flink pull request #2880: [FLINK-5171] [runtime] fix wrong use of Preconditi...

2016-11-28 Thread shuai-xu
Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/2880




[jira] [Commented] (FLINK-5171) Wrong use of Preconditions.checkState in TaskManagerRunner

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704030#comment-15704030
 ] 

ASF GitHub Bot commented on FLINK-5171:
---

Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/2880


> Wrong use of Preconditions.checkState in TaskManagerRunner
> --
>
> Key: FLINK-5171
> URL: https://issues.apache.org/jira/browse/FLINK-5171
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Preconditions.checkState checks that its first parameter is true and throws 
> an exception if it is not. But in TaskManagerRunner the condition is 
> inverted, so an exception is thrown when the rpc port is valid.
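
A sketch of the inverted check and its fix (illustrative; the exact port 
expression in TaskManagerRunner may differ):

{code}
import org.apache.flink.util.Preconditions.checkState

object CheckStateSketch extends App {
  val rpcPort = 6122
  // Buggy shape: the condition describes the FAILURE case, so every valid
  // port makes checkState throw:
  //   checkState(rpcPort < 0 || rpcPort > 65535, "invalid port: " + rpcPort)
  // Fixed: checkState takes the condition that must hold to continue.
  checkState(rpcPort >= 0 && rpcPort <= 65535, "invalid port: " + rpcPort)
}
{code}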





[jira] [Commented] (FLINK-5170) getAkkaConfig will use localhost if hostname is specified

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703870#comment-15703870
 ] 

ASF GitHub Bot commented on FLINK-5170:
---

Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/2879


> getAkkaConfig will use localhost if hostname is specified 
> --
>
> Key: FLINK-5170
> URL: https://issues.apache.org/jira/browse/FLINK-5170
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> In AkkaUtils.scala:
> def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
>   getAkkaConfig(configuration, if (hostname == null) Some((hostname, port)) else None)
> }
> When a hostname is specified, it uses None (and therefore falls back to localhost).
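
The null check is inverted; a sketch of the corrected overload, mirroring the 
quoted fragment:

{code}
def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
  // bind to (hostname, port) only when a hostname IS given
  getAkkaConfig(configuration, if (hostname != null) Some((hostname, port)) else None)
}
{code}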





[GitHub] flink pull request #2879: [FLINK-5170] [runtime] fix mis judge of hostname i...

2016-11-28 Thread shuai-xu
Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/2879




[jira] [Comment Edited] (FLINK-4692) Add tumbling group-windows for batch tables

2016-11-28 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15698090#comment-15698090
 ] 

Jark Wu edited comment on FLINK-4692 at 11/29/16 1:49 AM:
--

Hi [~fhueske] [~twalthr], I have proposed a [design 
doc|https://docs.google.com/document/d/1lzpnNUmNzn9yuCGf1RSjHuHAWm-O_v2in7y90muXI2o/edit?usp=sharing]
 for this issue and made a prototype. Could you have a look at the design? Any 
feedback is welcome!


was (Author: jark):
Hi [~fhueske] [~twalthr], I have proposed a [design 
doc|https://docs.google.com/document/d/1lzpnNUmNzn9yuCGf1RSjHuHAWm-O_v2in7y90muXI2o/edit#]
 for this issue and made a prototype. Could you have a look at the design? Any 
feedback is welcome!



> Add tumbling group-windows for batch tables
> ---
>
> Key: FLINK-4692
> URL: https://issues.apache.org/jira/browse/FLINK-4692
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> Add Tumble group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  





[GitHub] flink pull request #2892: [FLINK-5109] fix invalid content-encoding header o...

2016-11-28 Thread Hapcy
GitHub user Hapcy opened a pull request:

https://github.com/apache/flink/pull/2892

[FLINK-5109] fix invalid content-encoding header of webmonitor

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation --- not touched
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed --- Travis build failed but I had nothing to do with those 
parts of the code.

I'm kind of a newbie, and some other people's commits showed up as committed 
by me, but I have nothing to do with them.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Hapcy/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2892.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2892


commit 7f1c76de5d2fb54ff4e9836a38bc537098649fb4
Author: tibor.moger 
Date:   2016-11-28T15:51:47Z

[FLINK-5109] fix invalid content-encoding header of webmonitor

commit 2a305c54dee1644fd787ec2d707b57c74a6b45f6
Author: Aljoscha Krettek 
Date:   2016-11-02T10:06:01Z

[FLINK-4993] Don't Allow Trigger.onMerge() to return TriggerResult

Allowing Trigger.onMerge() to return a TriggerResult is not necessary
since an onMerge() call will always be followed by an onElement() call
when adding the element that caused the merging to the merged window.
Having this complicates the internal logic of the WindowOperator and
makes writing Triggers more confusing than it has to be.

commit ad9f8dbd6aa0a460c2f763ec1041130f44beeaad
Author: Aljoscha Krettek 
Date:   2016-11-11T09:57:25Z

[FLINK-5026] Rename TimelyFlatMap to Process

commit ff3470991d0c620ae3ccedeb1326c331c9cfa443
Author: twalthr 
Date:   2016-11-17T13:17:23Z

[FLINK-4872] [types] Type erasure problem exclusively on cluster execution

This closes #2823.

commit 5d4ececb310d0d3d0d01934cb6015766bb17ca65
Author: sergey_sokur 
Date:   2016-11-17T16:28:20Z

[FLINK-5050] [build] Remove transitive JSON.org dependency

This transitive dependency has an incompatible license.

This closes #2824

commit 764739de663d1bc8de981bd7508f3aa03a4a6a0b
Author: Aljoscha Krettek 
Date:   2016-11-23T11:13:05Z

[FLINK-5181] Add Tests in StateBackendTestBase that verify Default-Value 
Behaviour

commit ba61d43c87b5228c689a10156950880ffbe23b2f
Author: Stephan Ewen 
Date:   2016-11-23T14:37:05Z

[hotfix] Flush in CsvOutputFormat before closing, to increase CI stability

commit 5cd7eb0042c71839c610f25765ac1834561bffb3
Author: Stephan Ewen 
Date:   2016-11-23T14:54:15Z

[hotfix] [tests] Harden timeout logic for TaskManager registration in 
AbstractTaskManagerProcessFailureRecoveryTest

commit dc51242a4cede4684614c86afa0d0724898343b7
Author: shijinkui 
Date:   2016-11-26T08:14:13Z

[FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}

This closes #2875

commit 3ce3e104641728930f7d53a3308b74dac5cd15e5
Author: Aljoscha Krettek 
Date:   2016-11-28T12:51:51Z

[FLINK-4993] Remove Unused Import in TriggerResult

commit bb2e2d0fc2eae2a65839e937cebe0034ab81a75d
Author: Stephan Ewen 
Date:   2016-11-28T13:55:31Z

[hotfix] [docs] Add a rough description about internal types of states and 
state backends






[jira] [Commented] (FLINK-5109) Invalid Content-Encoding Header in REST API responses

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703481#comment-15703481
 ] 

ASF GitHub Bot commented on FLINK-5109:
---

GitHub user Hapcy opened a pull request:

https://github.com/apache/flink/pull/2892

[FLINK-5109] fix invalid content-encoding header of webmonitor

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation --- not touched
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed --- Travis build failed but I had nothing to do with those 
parts of the code.

I'm kind of a newbie, and some other people's commits showed up as committed 
by me, but I have nothing to do with them.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Hapcy/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2892.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2892


commit 7f1c76de5d2fb54ff4e9836a38bc537098649fb4
Author: tibor.moger 
Date:   2016-11-28T15:51:47Z

[FLINK-5109] fix invalid content-encoding header of webmonitor

commit 2a305c54dee1644fd787ec2d707b57c74a6b45f6
Author: Aljoscha Krettek 
Date:   2016-11-02T10:06:01Z

[FLINK-4993] Don't Allow Trigger.onMerge() to return TriggerResult

Allowing Trigger.onMerge() to return a TriggerResult is not necessary
since an onMerge() call will always be followed by an onElement() call
when adding the element that caused the merging to the merged window.
Having this complicates the internal logic of the WindowOperator and
makes writing Triggers more confusing than it has to be.

commit ad9f8dbd6aa0a460c2f763ec1041130f44beeaad
Author: Aljoscha Krettek 
Date:   2016-11-11T09:57:25Z

[FLINK-5026] Rename TimelyFlatMap to Process

commit ff3470991d0c620ae3ccedeb1326c331c9cfa443
Author: twalthr 
Date:   2016-11-17T13:17:23Z

[FLINK-4872] [types] Type erasure problem exclusively on cluster execution

This closes #2823.

commit 5d4ececb310d0d3d0d01934cb6015766bb17ca65
Author: sergey_sokur 
Date:   2016-11-17T16:28:20Z

[FLINK-5050] [build] Remove transitive JSON.org dependency

This transitive dependency has an incompatible license.

This closes #2824

commit 764739de663d1bc8de981bd7508f3aa03a4a6a0b
Author: Aljoscha Krettek 
Date:   2016-11-23T11:13:05Z

[FLINK-5181] Add Tests in StateBackendTestBase that verify Default-Value 
Behaviour

commit ba61d43c87b5228c689a10156950880ffbe23b2f
Author: Stephan Ewen 
Date:   2016-11-23T14:37:05Z

[hotfix] Flush in CsvOutputFormat before closing, to increase CI stability

commit 5cd7eb0042c71839c610f25765ac1834561bffb3
Author: Stephan Ewen 
Date:   2016-11-23T14:54:15Z

[hotfix] [tests] Harden timeout logic for TaskManager registration in 
AbstractTaskManagerProcessFailureRecoveryTest

commit dc51242a4cede4684614c86afa0d0724898343b7
Author: shijinkui 
Date:   2016-11-26T08:14:13Z

[FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}

This closes #2875

commit 3ce3e104641728930f7d53a3308b74dac5cd15e5
Author: Aljoscha Krettek 
Date:   2016-11-28T12:51:51Z

[FLINK-4993] Remove Unused Import in TriggerResult

commit bb2e2d0fc2eae2a65839e937cebe0034ab81a75d
Author: Stephan Ewen 
Date:   2016-11-28T13:55:31Z

[hotfix] [docs] Add a rough description about internal types of states and 
state backends




> Invalid Content-Encoding Header in REST API responses
> -
>
> 

[jira] [Commented] (FLINK-5164) Hadoop-compat IOFormat tests fail on Windows

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703251#comment-15703251
 ] 

ASF GitHub Bot commented on FLINK-5164:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2889
  
Thanks @zentol. 
Maybe add comments with a link to the Hadoop wiki to the `assumeTrue` calls?

Looks good to merge otherwise.


> Hadoop-compat IOFormat tests fail on Windows
> 
>
> Key: FLINK-5164
> URL: https://issues.apache.org/jira/browse/FLINK-5164
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.1.3
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.2.0
>
>
> The HadoopMapredITCase fails on Windows with the following exception:
> {code}
> java.lang.NullPointerException
>   at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
>   at org.apache.hadoop.util.Shell.runCommand(Shell.java:445)
>   at org.apache.hadoop.util.Shell.run(Shell.java:418)
>   at 
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)
>   at org.apache.hadoop.util.Shell.execCommand(Shell.java:739)
>   at org.apache.hadoop.util.Shell.execCommand(Shell.java:722)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:631)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
>   at 
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
>   at 
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:800)
>   at 
> org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
>   at 
> org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:145)
>   at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:178)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I suggest disabling the test on Windows.
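
A sketch of the JUnit-assumption approach that fhueske's `assumeTrue` comment 
refers to (illustrative; the actual tests and the Hadoop wiki link live in 
PR #2889):

{code}
import org.junit.Assume.assumeTrue
import org.junit.Before

class HadoopCompatPlatformCheck {
  @Before
  def skipOnWindows(): Unit = {
    // Hadoop's RawLocalFileSystem shells out to set file permissions, which
    // fails on Windows without the native winutils binaries -- skip the test
    // instead of failing it.
    assumeTrue(!System.getProperty("os.name").toLowerCase.startsWith("windows"))
  }
}
{code}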





[GitHub] flink issue #2889: [FLINK-5164] Disable some Hadoop-compat tests on Windows

2016-11-28 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2889
  
Thanks @zentol. 
Maybe add comments with a link to the Hadoop wiki to the `assumeTrue` calls?

Looks good to merge otherwise.




[jira] [Commented] (FLINK-4825) Implement a RexExecutor that uses Flink's code generation

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703228#comment-15703228
 ] 

ASF GitHub Bot commented on FLINK-4825:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2884
  
+1 to merge


> Implement a RexExecutor that uses Flink's code generation
> -
>
> Key: FLINK-4825
> URL: https://issues.apache.org/jira/browse/FLINK-4825
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The added {{ReduceExpressionRule}} leads to inconsistent behavior, because 
> some parts of an expression are evaluated using Flink's code generation and 
> some parts use Calcite's code generation.
> A very simple example: boolean expressions cast to string are represented as 
> "TRUE/FALSE" by Calcite and as "true/false" by Flink.
> I propose to implement the RexExecutor interface and forward the calls to 
> Flink's code generation. Additional improvements to be more standards 
> compliant could be addressed in new Jira issues.
> I will disable the rule and the corresponding tests until this issue is fixed.





[GitHub] flink issue #2884: [FLINK-4825] [table] Implement a RexExecutor that uses Fl...

2016-11-28 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2884
  
+1 to merge




[jira] [Commented] (FLINK-5159) Improve performance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703157#comment-15703157
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884981
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatJoinFunction, 
RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
+name: String,
+code: String,
+@transient returnType: TypeInformation[OUT],
+broadcastSetName: String)
+  extends RichFlatMapFunction[MULTI_IN, OUT]
+with ResultTypeQueryable[OUT]
+with FunctionCompiler[FlatJoinFunction[IN1, IN2, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  protected var function: FlatJoinFunction[IN1, IN2, OUT] = null
--- End diff --

`= null` -> `= _`
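
For context, `= _` initializes a Scala `var` with the type's default value 
(`null` for reference types), which is the idiomatic spelling for a 
to-be-assigned field:

{code}
// equivalent to `= null` for an AnyRef field, but idiomatic Scala
protected var function: FlatJoinFunction[IN1, IN2, OUT] = _
{code}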


> Improve performance of inner joins with a single row input
> -
>
> Key: FLINK-5159
> URL: https://issues.apache.org/jira/browse/FLINK-5159
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Alexander Shoshin
>Assignee: Alexander Shoshin
>Priority: Minor
>
> All inner joins (including a cross join) can be implemented as a 
> {{MapFunction}} if one of their inputs is a single row. This row can be 
> passed to a {{MapFunction}} as a {{BroadcastSet}}.
> This approach is going to be more lightweight than the other current 
> strategies.
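
A sketch of the broadcast-set pattern on the DataSet API (illustrative; the PR 
generates equivalent code through MapJoinLeftRunner/MapJoinRightRunner):

{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Join a large input with a single-row input by broadcasting the single row,
// so only a flatMap over the large side is needed (no shuffle of both sides).
class SingleRowJoinSketch extends RichFlatMapFunction[(Int, String), (Int, String, Int)] {
  private var single: Int = _

  override def open(parameters: Configuration): Unit = {
    // the broadcast set is expected to contain exactly one element
    single = getRuntimeContext.getBroadcastVariable[Int]("singleRow").get(0)
  }

  override def flatMap(in: (Int, String), out: Collector[(Int, String, Int)]): Unit = {
    if (in._1 > single) out.collect((in._1, in._2, single)) // inner-join predicate
  }
}

// usage: large.flatMap(new SingleRowJoinSketch).withBroadcastSet(singleRowDs, "singleRow")
{code}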





[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89882087
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
 ---
@@ -32,6 +32,7 @@ import org.apache.flink.api.table.functions.ScalarFunction
 import org.apache.flink.api.table.{Row, TableEnvironment, 
ValidationException}
 import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
--- End diff --

Remove import




[jira] [Commented] (FLINK-5159) Improve performance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703161#comment-15703161
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883678
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, 
FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, 
MapJoinRightRunner}
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, 
TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  if (leftIsSingle && child.equals(right) ||
+  !leftIsSingle && child.equals(left)) {
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+  } else {
+cost
+  }
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+if (isConditionTypesCompatible(left.getRowType.getFieldList,
+   right.getRowType.getFieldList,
+   keyPairs)) {
+  createPlan(tableEnv, expectedType)
--- End diff --

We can move the logic of `createPlan` here.


> Improve performance of inner joins with a 

[jira] [Commented] (FLINK-5159) Improve performance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703162#comment-15703162
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883549
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, 
FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, 
MapJoinRightRunner}
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, 
TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  if (leftIsSingle && child.equals(right) ||
+  !leftIsSingle && child.equals(left)) {
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+  } else {
+cost
+  }
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

This check can be removed.


> Improve performance of inner joins with a single row input
> -
>
> Key: FLINK-5159
> URL: https://issues.apache.org/jira/browse/FLINK-5159
> 

[jira] [Commented] (FLINK-5159) Improve performance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703159#comment-15703159
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884314
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowJoin}
+
+import scala.collection.JavaConversions._
--- End diff --

Please remove import.


> Improve performance of inner joins with a single row input
> -
>
> Key: FLINK-5159
> URL: https://issues.apache.org/jira/browse/FLINK-5159
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Alexander Shoshin
>Assignee: Alexander Shoshin
>Priority: Minor
>
> All inner joins (including a cross join) can be implemented as a 
> {{MapFunction}} if one of their inputs is a single row. This row can be 
> passed to a {{MapFunction}} as a {{BroadcastSet}}.
> This approach is going to be more lightweight than the other current 
> strategies.





[jira] [Commented] (FLINK-5159) Improve performance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703158#comment-15703158
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884351
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowJoin}
+
+import scala.collection.JavaConversions._
+
+class DataSetSingleRowJoinRule
+  extends ConverterRule(
+  classOf[LogicalJoin],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+if (isInnerJoin(join)) {
+  
isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+} else {
+  false
+}
+  }
+
+  private def isInnerJoin(join: LogicalJoin) = {
+join.getJoinType == JoinRelType.INNER
+  }
+
+  private def isGlobalAggregation(node: RelNode) = {
+node.isInstanceOf[LogicalAggregate] &&
+  isSingleLine(node.asInstanceOf[LogicalAggregate])
+  }
+
+  private def isSingleLine(agg: LogicalAggregate) = {
+agg.getGroupSets == null ||
+  (agg.getGroupSets.size() == 1 &&
+   agg.getGroupSets.get(0).isEmpty &&
+   agg.getGroupSet.isEmpty)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val join = rel.asInstanceOf[LogicalJoin]
+val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+val dataSetLeftNode = RelOptRule.convert(join.getLeft, 
DataSetConvention.INSTANCE)
+val dataSetRightNode = RelOptRule.convert(join.getRight, 
DataSetConvention.INSTANCE)
+val leftIsSingle = 
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+
+new DataSetSingleRowJoin(
+  rel.getCluster,
+  traitSet,
+  dataSetLeftNode,
+  dataSetRightNode,
+  leftIsSingle,
+  rel.getRowType,
+  join.getCondition,
+  join.getRowType,
+  join.analyzeCondition.pairs.toList,
--- End diff --

This parameter can be removed. `joinCondition` includes the complete join 
predicate that we need to evaluate.


> Improve performance of inner joins with a single row input
> -
>
> Key: FLINK-5159
> URL: https://issues.apache.org/jira/browse/FLINK-5159
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Alexander Shoshin
>Assignee: Alexander Shoshin
>Priority: Minor
>
> All inner joins (including a cross join) can be implemented as a 
> {{MapFunction}} if one of their inputs is a single row. This row can be 
> passed to a {{MapFunction}} as a {{BroadcastSet}}.
> This approach is going to be more lightweight than the other current 
> strategies.





[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884351
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowJoin}
+
+import scala.collection.JavaConversions._
+
+class DataSetSingleRowJoinRule
+  extends ConverterRule(
+  classOf[LogicalJoin],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+if (isInnerJoin(join)) {
+  
isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+} else {
+  false
+}
+  }
+
+  private def isInnerJoin(join: LogicalJoin) = {
+join.getJoinType == JoinRelType.INNER
+  }
+
+  private def isGlobalAggregation(node: RelNode) = {
+node.isInstanceOf[LogicalAggregate] &&
+  isSingleLine(node.asInstanceOf[LogicalAggregate])
+  }
+
+  private def isSingleLine(agg: LogicalAggregate) = {
+agg.getGroupSets == null ||
+  (agg.getGroupSets.size() == 1 &&
+   agg.getGroupSets.get(0).isEmpty &&
+   agg.getGroupSet.isEmpty)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val join = rel.asInstanceOf[LogicalJoin]
+val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+val dataSetLeftNode = RelOptRule.convert(join.getLeft, 
DataSetConvention.INSTANCE)
+val dataSetRightNode = RelOptRule.convert(join.getRight, 
DataSetConvention.INSTANCE)
+val leftIsSingle = 
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+
+new DataSetSingleRowJoin(
+  rel.getCluster,
+  traitSet,
+  dataSetLeftNode,
+  dataSetRightNode,
+  leftIsSingle,
+  rel.getRowType,
+  join.getCondition,
+  join.getRowType,
+  join.analyzeCondition.pairs.toList,
--- End diff --

This parameter can be removed. `joinCondition` includes the complete join 
predicate that we need to evaluate.




[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883453
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, 
FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, 
MapJoinRightRunner}
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, 
TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  if (leftIsSingle && child.equals(right) ||
--- End diff --

We can access left and right input more easily with `this.getLeft()` and 
`this.getRight()`. No need to use `foldLeft` to aggregate the stats of the left 
and right input.
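
A sketch of the suggested simplification (same cost formula as the diff above, 
assuming BiRel's getLeft/getRight accessors):

{code}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
  // only the non-single input contributes to the cost, so pick it directly
  val child = if (leftIsSingle) this.getRight else this.getLeft
  val rowCnt = metadata.getRowCount(child)
  val rowSize = this.estimateRowSize(child.getRowType)
  planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
}
{code}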




[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, 
FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, 
MapJoinRightRunner}
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, 
TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
--- End diff --

We do not need the `keyPairs` parameter. These are included in the 
`joinCondition` and have been validated before.




[jira] [Commented] (FLINK-5159) Improve performance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703155#comment-15703155
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883453
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of the inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  if (leftIsSingle && child.equals(right) ||
--- End diff --

We can access left and right input more easily with `this.getLeft()` and 
`this.getRight()`. No need to use `foldLeft` to aggregate the stats of the left 
and right input.
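
A minimal sketch of the simplified method under that suggestion (illustrative only, written in the context of the class above; it keeps the same cost formula as the diff and assumes the cost should be charged for the non-single input):

    override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
      // pick the multi-row input directly instead of folding over all children
      val multiRowInput = if (leftIsSingle) this.getRight else this.getLeft
      val rowCnt = metadata.getRowCount(multiRowInput)
      val rowSize = this.estimateRowSize(multiRowInput.getRowType)
      planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
    }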


> Improve performance of inner joins with a single row input
> -
>
> Key: FLINK-5159
> URL: https://issues.apache.org/jira/browse/FLINK-5159
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Alexander Shoshin
>Assignee: Alexander Shoshin
>Priority: Minor
>
> All inner joins (including a cross join) can be implemented as a 
> {{MapFunction}} if one of their inputs is a single row. This row can be 
> passed to a {{MapFunction}} as a {{BroadcastSet}}.
> This approach is going to be more lightweight than the other current
> strategies.

[jira] [Commented] (FLINK-5159) Improve performance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703160#comment-15703160
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89811862
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
 ---
@@ -41,3 +41,5 @@ object GeneratedExpression {
 }
 
 case class GeneratedFunction[T](name: String, returnType: 
TypeInformation[Any], code: String)
+
+case class GeneratedField(fieldName: String, fieldType: String)
--- End diff --

This is not used anymore, right?
I think we should remove it.


> Improve performance of inner joins with a single row input
> -
>
> Key: FLINK-5159
> URL: https://issues.apache.org/jira/browse/FLINK-5159
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Alexander Shoshin
>Assignee: Alexander Shoshin
>Priority: Minor
>
> All inner joins (including a cross join) can be implemented as a 
> {{MapFunction}} if one of their inputs is a single row. This row can be 
> passed to a {{MapFunction}} as a {{BroadcastSet}}.
> This approach is going to be more lightweight than the other current 
> strategies.
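
For illustration, a self-contained sketch of that broadcast pattern on the DataSet API (names and element types are invented; this is not the code generated by the PR):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    object SingleRowJoinSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val single = env.fromElements(42)                   // single-row input
        val large  = env.fromElements((1, "a"), (42, "b"))  // multi-row input

        val joined = large
          .flatMap(new RichFlatMapFunction[(Int, String), String] {
            private var row: Int = _
            override def open(parameters: Configuration): Unit = {
              // the single row is shipped to every task as a broadcast set
              row = getRuntimeContext.getBroadcastVariable[Int]("singleRow").get(0)
            }
            override def flatMap(value: (Int, String), out: Collector[String]): Unit = {
              if (value._1 == row) out.collect(value._2)    // inner-join predicate
            }
          })
          .withBroadcastSet(single, "singleRow")

        joined.print()
      }
    }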



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5159) Improve performance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703156#comment-15703156
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89882087
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
 ---
@@ -32,6 +32,7 @@ import org.apache.flink.api.table.functions.ScalarFunction
 import org.apache.flink.api.table.{Row, TableEnvironment, 
ValidationException}
 import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
--- End diff --

Remove import


> Improve performance of inner joins with a single row input
> -
>
> Key: FLINK-5159
> URL: https://issues.apache.org/jira/browse/FLINK-5159
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Alexander Shoshin
>Assignee: Alexander Shoshin
>Priority: Minor
>
> All inner joins (including a cross join) can be implemented as a 
> {{MapFunction}} if one of their inputs is a single row. This row can be 
> passed to a {{MapFunction}} as a {{BroadcastSet}}.
> This approach is going to be more lightweight than the other current 
> strategies.





[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89811862
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
 ---
@@ -41,3 +41,5 @@ object GeneratedExpression {
 }
 
 case class GeneratedFunction[T](name: String, returnType: 
TypeInformation[Any], code: String)
+
+case class GeneratedField(fieldName: String, fieldType: String)
--- End diff --

This is not used anymore, right?
I think we should remove it.




[jira] [Commented] (FLINK-5159) Improve performance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703154#comment-15703154
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884157
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of the inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  if (leftIsSingle && child.equals(right) ||
+  !leftIsSingle && child.equals(left)) {
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+  } else {
+cost
+  }
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+if (isConditionTypesCompatible(left.getRowType.getFieldList,
+   right.getRowType.getFieldList,
+   keyPairs)) {
+  createPlan(tableEnv, expectedType)
+} else {
+  throw TableException(
+"Join predicate on incompatible types.\n" +
+ 

[jira] [Commented] (FLINK-5159) Improve performance of inner joins with a single row input

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15703163#comment-15703163
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of the inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
--- End diff --

We do not need the `keyPairs` parameter. These are included in the 
`joinCondition` and have been validated before.


> Improve performance of inner joins with a single row input
> -
>
> Key: FLINK-5159
> URL: https://issues.apache.org/jira/browse/FLINK-5159
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Alexander Shoshin
>Assignee: Alexander Shoshin
>Priority: Minor
>
> All inner joins (including a cross join) can be implemented as a 
> {{MapFunction}} if one of their inputs is a single row. This row can be 
> passed to a {{MapFunction}} as a {{BroadcastSet}}.
> This approach is going to be more lightweight than the other current 
> strategies.





[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884314
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowJoin}
+
+import scala.collection.JavaConversions._
--- End diff --

Please remove import.




[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883678
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of the inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  if (leftIsSingle && child.equals(right) ||
+  !leftIsSingle && child.equals(left)) {
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+  } else {
+cost
+  }
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+if (isConditionTypesCompatible(left.getRowType.getFieldList,
+   right.getRowType.getFieldList,
+   keyPairs)) {
+  createPlan(tableEnv, expectedType)
--- End diff --

We can move the logic of `createPlan` here.



[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884981
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatJoinFunction, 
RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
+name: String,
+code: String,
+@transient returnType: TypeInformation[OUT],
+broadcastSetName: String)
+  extends RichFlatMapFunction[MULTI_IN, OUT]
+with ResultTypeQueryable[OUT]
+with FunctionCompiler[FlatJoinFunction[IN1, IN2, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  protected var function: FlatJoinFunction[IN1, IN2, OUT] = null
--- End diff --

`= null` -> `= _`
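
For reference, what the suggested initializer does (generic Scala, not the PR's code):

    class Defaults {
      var ref: String   = _  // reference types default to null
      var num: Int      = _  // numeric types default to 0
      var flag: Boolean = _  // booleans default to false
    }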




[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89883549
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of the inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  if (leftIsSingle && child.equals(right) ||
+  !leftIsSingle && child.equals(left)) {
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+  } else {
+cost
+  }
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+if (isConditionTypesCompatible(left.getRowType.getFieldList,
--- End diff --

This check can be removed.




[GitHub] flink pull request #2811: [FLINK-5159] Improve performance of inner joins wit...

2016-11-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89884157
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of the inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinCondition: RexNode,
+joinRowType: RelDataType,
+keyPairs: List[IntPair],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinCondition,
+  joinRowType,
+  keyPairs,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  if (leftIsSingle && child.equals(right) ||
+  !leftIsSingle && child.equals(left)) {
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+  } else {
+cost
+  }
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+if (isConditionTypesCompatible(left.getRowType.getFieldList,
+   right.getRowType.getFieldList,
+   keyPairs)) {
+  createPlan(tableEnv, expectedType)
+} else {
+  throw TableException(
+"Join predicate on incompatible types.\n" +
+s"\tLeft: ${left.toString},\n" +
+s"\tRight: ${right.toString},\n" +
+s"\tCondition: ($joinConditionToString)")
+}
+  }
+
+  private def isConditionTypesCompatible(leftFields: 

[jira] [Commented] (FLINK-5169) Make consumption of input channels fair

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702953#comment-15702953
 ] 

ASF GitHub Bot commented on FLINK-5169:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2882
  
Thanks for the review, I'll address them and merge this.


> Make consumption of input channels fair
> ---
>
> Key: FLINK-5169
> URL: https://issues.apache.org/jira/browse/FLINK-5169
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Critical
> Fix For: 1.2.0, 1.1.4
>
>
> The input channels on the receiver side of the network stack queue incoming 
> data and notify the input gate about available data. These notifications 
> currently determine the order in which input channels are consumed, which can 
> lead to unfair consumption patterns where faster channels are favored over 
> slower ones.
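
To illustrate the fairness goal with a toy model (not the actual SingleInputGate code): a channel is enqueued at most once per pending notification and re-enqueued at the tail while it still has data, so channels are drained round-robin and a fast channel cannot starve the others.

    import scala.collection.mutable

    class FairChannelQueue[C] {
      private val queue    = mutable.Queue.empty[C]
      private val enqueued = mutable.Set.empty[C]

      def notifyAvailable(channel: C): Unit = synchronized {
        // enqueue at most once per pending notification
        if (enqueued.add(channel)) queue.enqueue(channel)
      }

      def pollNext(hasMoreData: C => Boolean): Option[C] = synchronized {
        if (queue.isEmpty) None
        else {
          val ch = queue.dequeue()
          enqueued.remove(ch)
          if (hasMoreData(ch)) notifyAvailable(ch) // back to the tail: round robin
          Some(ch)
        }
      }
    }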





[GitHub] flink issue #2882: [FLINK-5169] [network] Make consumption of InputChannels ...

2016-11-28 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2882
  
Thanks for the review, I'll address them and merge this.




[jira] [Commented] (FLINK-5169) Make consumption of input channels fair

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702943#comment-15702943
 ] 

ASF GitHub Bot commented on FLINK-5169:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2882#discussion_r89806640
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -52,12 +51,10 @@
 
--- End diff --

Remove the unused 'LOG'


> Make consumption of input channels fair
> ---
>
> Key: FLINK-5169
> URL: https://issues.apache.org/jira/browse/FLINK-5169
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Critical
> Fix For: 1.2.0, 1.1.4
>
>
> The input channels on the receiver side of the network stack queue incoming 
> data and notify the input gate about available data. These notifications 
> currently determine the order in which input channels are consumed, which can 
> lead to unfair consumption patterns where faster channels are favored over 
> slower ones.





[jira] [Commented] (FLINK-5169) Make consumption of input channels fair

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702944#comment-15702944
 ] 

ASF GitHub Bot commented on FLINK-5169:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2882#discussion_r89810526
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
 ---
@@ -53,10 +52,10 @@
private BufferPool bufferPool;
 
PartitionRequestServerHandler(
-   ResultPartitionProvider partitionProvider,
-   TaskEventDispatcher taskEventDispatcher,
-   PartitionRequestQueue outboundQueue,
-   NetworkBufferPool networkBufferPool) {
--- End diff --

I think this formatting change made the readability a little bit worse.


> Make consumption of input channels fair
> ---
>
> Key: FLINK-5169
> URL: https://issues.apache.org/jira/browse/FLINK-5169
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Critical
> Fix For: 1.2.0, 1.1.4
>
>
> The input channels on the receiver side of the network stack queue incoming 
> data and notify the input gate about available data. These notifications 
> currently determine the order in which input channels are consumed, which can 
> lead to unfair consumption patterns where faster channels are favored over 
> slower ones.





[jira] [Commented] (FLINK-5169) Make consumption of input channels fair

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702946#comment-15702946
 ] 

ASF GitHub Bot commented on FLINK-5169:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2882#discussion_r89805572
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -518,7 +518,8 @@ void triggerPartitionStateCheck(ResultPartitionID 
partitionId) {
partitionId);
}
 
-   private void queueChannel(InputChannel channel) {
+   @VisibleForTesting
+   void queueChannel(InputChannel channel) {
--- End diff --

I think we can undo this change


> Make consumption of input channels fair
> ---
>
> Key: FLINK-5169
> URL: https://issues.apache.org/jira/browse/FLINK-5169
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Critical
> Fix For: 1.2.0, 1.1.4
>
>
> The input channels on the receiver side of the network stack queue incoming 
> data and notify the input gate about available data. These notifications 
> currently determine the order in which input channels are consumed, which can 
> lead to unfair consumption patterns where faster channels are favored over 
> slower ones.





[GitHub] flink pull request #2882: [FLINK-5169] [network] Make consumption of InputCh...

2016-11-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2882#discussion_r89805572
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -518,7 +518,8 @@ void triggerPartitionStateCheck(ResultPartitionID 
partitionId) {
partitionId);
}
 
-   private void queueChannel(InputChannel channel) {
+   @VisibleForTesting
+   void queueChannel(InputChannel channel) {
--- End diff --

I think we can undo this change




[GitHub] flink pull request #2882: [FLINK-5169] [network] Make consumption of InputCh...

2016-11-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2882#discussion_r89810526
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
 ---
@@ -53,10 +52,10 @@
private BufferPool bufferPool;
 
PartitionRequestServerHandler(
-   ResultPartitionProvider partitionProvider,
-   TaskEventDispatcher taskEventDispatcher,
-   PartitionRequestQueue outboundQueue,
-   NetworkBufferPool networkBufferPool) {
--- End diff --

I think this formatting change made the readability a little bit worse.




[GitHub] flink pull request #2882: [FLINK-5169] [network] Make consumption of InputCh...

2016-11-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2882#discussion_r89806640
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -52,12 +51,10 @@
 
--- End diff --

Remove the unused 'LOG'




[jira] [Commented] (FLINK-4098) Iteration support in Python API

2016-11-28 Thread Geoffrey Mon (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702931#comment-15702931
 ] 

Geoffrey Mon commented on FLINK-4098:
-

A working implementation is available here: 
https://github.com/GEOFBOT/flink/tree/new-iterations
It probably needs more cleanup before it is production ready.

> Iteration support in Python API
> ---
>
> Key: FLINK-4098
> URL: https://issues.apache.org/jira/browse/FLINK-4098
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Affects Versions: 1.0.2
>Reporter: Geoffrey Mon
>Priority: Minor
>
> Bulk and delta iterations are not supported in the Python API. Currently 
> working on this at https://github.com/GEOFBOT/flink





[jira] [Commented] (FLINK-4861) Package optional project artifacts

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702837#comment-15702837
 ] 

ASF GitHub Bot commented on FLINK-4861:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2664
  
@StephanEwen @rmetzger, why would a user copy an optional fat jar rather 
than having it included in their uber jar?

By creating fat jars, do we not have the potential for duplicate 
dependencies if more than one fat jar is included on the classpath? I don't 
think we can shade since the user may be depending on the transitive 
dependencies.


> Package optional project artifacts
> --
>
> Key: FLINK-4861
> URL: https://issues.apache.org/jira/browse/FLINK-4861
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Per the mailing list 
> [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html],
>  package the Flink libraries and connectors into subdirectories of a new 
> {{opt}} directory in the release/snapshot tarballs.





[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts

2016-11-28 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2664
  
@StephanEwen @rmetzger, why would a user copy an optional fat jar rather 
than having it included in their uber jar?

By creating fat jars, do we not have the potential for duplicate 
dependencies if more than one fat jar is included on the classpath? I don't 
think we can shade since the user may be depending on the transitive 
dependencies.




[jira] [Updated] (FLINK-4897) Implement Dispatcher

2016-11-28 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-4897:

Component/s: Mesos

> Implement Dispatcher
> 
>
> Key: FLINK-4897
> URL: https://issues.apache.org/jira/browse/FLINK-4897
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
> Environment: FLIP-6 feature branch
>Reporter: Eron Wright 
> Fix For: 1.3.0
>
>
> This task is to implement the dispatcher service, which acts as a remote 
> frontend for job submission.





[GitHub] flink pull request #2891: [FLINK-5129] make the BlobServer use a distributed...

2016-11-28 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/2891

[FLINK-5129] make the BlobServer use a distributed file system

Previously, the BlobServer held a local copy and, when high availability (HA)
was enabled, also copied jar files to a distributed file system. Upon restore,
these files were copied to the local store, from which they were used.

This PR abstracts the BlobServer's backing file system and makes it use the
distributed file system directly in HA mode, i.e. without the local file 
system
copy. Other than that the behaviour should not change.

Secondly, BlobCache instances at the task managers also make use of this
distributed file system and download files from there instead of bothering
the blob server. As before, however, distributed files may only be deleted
by the blob server. If the distributed file system is not accessible at the 
blob
caches, the old behaviour is used.

* BlobServer: include the cluster id in the HA storage path for blobs
* make the BlobServer use the HA filesystem back-end properly:
* make the BlobCache also use a distributed file system in HA mode
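
As a schematic sketch of the lookup order described above (hypothetical types, not the PR's classes): caches try the shared distributed store first and fall back to the blob server, while deletes stay with the server.

    import java.io.File

    trait Store {
      def get(key: String, target: File): Boolean
    }

    class BlobCacheLookup(distributed: Option[Store], server: Store) {
      def fetch(key: String, target: File): Boolean =
        distributed.exists { d =>
          // fall back if the distributed file system is not accessible
          try d.get(key, target) catch { case _: Exception => false }
        } || server.get(key, target)
    }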

@uce can you have a look?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink FLINK-5129

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2891.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2891


commit b65e74dd92bdf74b2816a0d8a26a5ebaa25ca586
Author: Nico Kruber 
Date:   2016-11-22T11:49:03Z

[hotfix] remove unused package-private BlobUtils#copyFromRecoveryPath

This was actually the same implementation as
FileSystemBlobStore#get(java.lang.String, java.io.File) and either of the 
two
could have been removed but the implementation makes most sense at the
concrete file system abstraction layer, i.e. in FileSystemBlobStore.

commit 09bdd49e6282268fd9c1b2672f0ea6222e097ca2
Author: Nico Kruber 
Date:   2016-11-23T15:11:35Z

[hotfix] do not create intermediate strings inside String.format in 
BlobUtils

commit 93938ff97fef9e39c17ac795e1e89ca9de25e028
Author: Nico Kruber 
Date:   2016-11-24T16:11:19Z

[hotfix] properly shut down the BlobServer in BlobServerRangeTest

commit c0c9d2239a767154d6071171d4c33e762e01aa62
Author: Nico Kruber 
Date:   2016-11-24T17:50:43Z

[FLINK-5129] BlobServer: include the cluster id in the HA storage path for 
blobs

Also use JUnit's TemporaryFolder in BlobRecoveryITCase, too. This makes
cleaning up simpler.

commit 8b9c7d9fd6e1ab3c7f2175a31d0e29b41b01cc61
Author: Nico Kruber 
Date:   2016-11-23T18:50:52Z

[FLINK-5129] make the BlobCache use the HA filesystem back-end properly

Previously, the BlobServer holds a local copy and in case high availability 
(HA)
is set, it also copies jar files to a distributed file system. Upon restore,
these files are copied to local store from which they are used.

This commit abstracts the BlobServer's backing file system and makes it use 
the
distributed file system directly in HA mode, i.e. without the local file 
system
copy. Other than that the behaviour does not change.

commit 249b2ea48f19c54498faa56ad45d299efaad4521
Author: Nico Kruber 
Date:   2016-11-25T16:42:05Z

[FLINK-5129] make the BlobCache also use a distributed file system in HA 
mode

* re-factor the file system abstraction in FileSystemBlobStore so that it 
can
  be used by the task managers, too, which should not be able to delete 
files
  in a distributed file system shared among different nodes
* only download blobs from the blob server if not in HA mode or the 
distributed
  file system is not accessible by the BlobCache, e.g. at the task managers

commit dd69f65a47205eb55ac8cc2c8f3aa9f7232dc8ba
Author: Nico Kruber 
Date:   2016-11-28T10:42:13Z

[FLINK-5129] restore non-HA mode unique directory setup in the blob server 
and cache

If not in high availability mode, local (and now also distributed) file 
systems
again try to set up a unique directory structure so that other instances 
with
the same configuration file or storage path do not interfere.

This was lost in 8b9c7d9fd6.

commit 76ccc9ffaaa63d6e0bd55ba7f6c08f8c1cff98cb
Author: Nico Kruber 
Date:   2016-11-28T15:19:20Z

[hotfix] add a missing "'" to FileSystemBlobStore

commit 53702add38d1087062e84a7e804b08920dfc0c23
Author: Nico Kruber 
Date:   2016-11-28T15:41:11Z

[FLINK-5129] move path-related methods from BlobUtils to 
FileSystemBlobStore 

[jira] [Commented] (FLINK-5129) make the BlobServer use a distributed file system

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702692#comment-15702692
 ] 

ASF GitHub Bot commented on FLINK-5129:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/2891

[FLINK-5129] make the BlobServer use a distributed file system

Previously, the BlobServer held a local copy and, when high availability (HA)
was enabled, also copied jar files to a distributed file system. Upon restore,
these files were copied to the local store, from which they were used.

This PR abstracts the BlobServer's backing file system and makes it use the
distributed file system directly in HA mode, i.e. without the local file 
system
copy. Other than that the behaviour should not change.

Secondly, BlobCache instances at the task managers also make use of this
distributed file system and download files from there instead of bothering
the blob server. As before, however, distributed files may only be deleted
by the blob server. If the distributed file system is not accessible at the 
blob
caches, the old behaviour is used.

* BlobServer: include the cluster id in the HA storage path for blobs
* make the BlobServer use the HA filesystem back-end properly:
* make the BlobCache also use a distributed file system in HA mode

@uce can you have a look?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink FLINK-5129

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2891.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2891


commit b65e74dd92bdf74b2816a0d8a26a5ebaa25ca586
Author: Nico Kruber 
Date:   2016-11-22T11:49:03Z

[hotfix] remove unused package-private BlobUtils#copyFromRecoveryPath

This was actually the same implementation as
FileSystemBlobStore#get(java.lang.String, java.io.File) and either of the 
two
could have been removed but the implementation makes most sense at the
concrete file system abstraction layer, i.e. in FileSystemBlobStore.

commit 09bdd49e6282268fd9c1b2672f0ea6222e097ca2
Author: Nico Kruber 
Date:   2016-11-23T15:11:35Z

[hotfix] do not create intermediate strings inside String.format in 
BlobUtils

commit 93938ff97fef9e39c17ac795e1e89ca9de25e028
Author: Nico Kruber 
Date:   2016-11-24T16:11:19Z

[hotfix] properly shut down the BlobServer in BlobServerRangeTest

commit c0c9d2239a767154d6071171d4c33e762e01aa62
Author: Nico Kruber 
Date:   2016-11-24T17:50:43Z

[FLINK-5129] BlobServer: include the cluster id in the HA storage path for 
blobs

Also use JUnit's TemporaryFolder in BlobRecoveryITCase, too. This makes
cleaning up simpler.

commit 8b9c7d9fd6e1ab3c7f2175a31d0e29b41b01cc61
Author: Nico Kruber 
Date:   2016-11-23T18:50:52Z

[FLINK-5129] make the BlobCache use the HA filesystem back-end properly

Previously, the BlobServer holds a local copy and in case high availability 
(HA)
is set, it also copies jar files to a distributed file system. Upon restore,
these files are copied to local store from which they are used.

This commit abstracts the BlobServer's backing file system and makes it use 
the
distributed file system directly in HA mode, i.e. without the local file 
system
copy. Other than that the behaviour does not change.

commit 249b2ea48f19c54498faa56ad45d299efaad4521
Author: Nico Kruber 
Date:   2016-11-25T16:42:05Z

[FLINK-5129] make the BlobCache also use a distributed file system in HA 
mode

* re-factor the file system abstraction in FileSystemBlobStore so that it 
can
  be used by the task managers, too, which should not be able to delete 
files
  in a distributed file system shared among different nodes
* only download blobs from the blob server if not in HA mode or the 
distributed
  file system is not accessible by the BlobCache, e.g. at the task managers

commit dd69f65a47205eb55ac8cc2c8f3aa9f7232dc8ba
Author: Nico Kruber 
Date:   2016-11-28T10:42:13Z

[FLINK-5129] restore non-HA mode unique directory setup in the blob server 
and cache

If not in high availability mode, local (and now also distributed) file 
systems
again try to set up a unique directory structure so that other instances 
with
the same configuration file or storage path do not interfere.

This was lost in 8b9c7d9fd6.

commit 76ccc9ffaaa63d6e0bd55ba7f6c08f8c1cff98cb
Author: Nico Kruber 
Date:   2016-11-28T15:19:20Z

[hotfix] add a missing "'" to FileSystemBlobStore

[jira] [Created] (FLINK-5182) Implement SSL file-shipping

2016-11-28 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-5182:
---

 Summary: Implement SSL file-shipping
 Key: FLINK-5182
 URL: https://issues.apache.org/jira/browse/FLINK-5182
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 
Assignee: Eron Wright 


Currently, the keystore and truststore config entries are always treated as
local file paths, and the files aren't shipped automatically. This behavior is
problematic in YARN/Mesos deployments, where such an assumption doesn't always
hold.

Change the behavior to ship the files and update the config automatically,
consistent with how keytabs are handled, as sketched below.
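
A rough sketch of that behavior (hypothetical helper; "security.ssl.keystore" is used only as an example config key):

    import java.nio.file.{Files, Paths, StandardCopyOption}
    import scala.collection.mutable

    object ShipSslFiles {
      // copy the locally configured file next to the shipped artifacts and
      // rewrite the config entry to point at the shipped copy
      def shipAndRewrite(conf: mutable.Map[String, String], key: String): Unit = {
        val local   = Paths.get(conf(key))
        val shipped = Paths.get(local.getFileName.toString) // container working dir
        Files.copy(local, shipped, StandardCopyOption.REPLACE_EXISTING)
        conf(key) = shipped.toString
      }
    }

Usage would be along the lines of shipAndRewrite(conf, "security.ssl.keystore") before launching the container.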





[GitHub] flink pull request #2788: [FLINK-5026] Rename TimelyFlatMap to Process

2016-11-28 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/2788




[jira] [Closed] (FLINK-5026) Rename TimelyFlatMap to Process

2016-11-28 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-5026.
---
Resolution: Fixed

Changed in 910f733f5ec52d2dd1e9dcc4ec6a4844cae2f2b4

> Rename TimelyFlatMap to Process
> ---
>
> Key: FLINK-5026
> URL: https://issues.apache.org/jira/browse/FLINK-5026
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0
>
>
> The method on {{KeyedDataStream}} would be called {{process()}} and the 
> function itself would be called {{ProcessFunction}}.
> The reason for this is that {{TimelyFlatMapFunction}} is a bit of a mouthful 
> and with the additions to the timer API and state the {{ProcessFunction}} 
> could become the basic, low-level, user-facing API for cases where users 
> nowadays implement their own operator.
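
As a usage sketch of the renamed API (element types invented for illustration):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector

    object ProcessSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements(("a", 1), ("a", 2))
          .keyBy(_._1)
          // process() gives per-element access plus a Context for
          // timestamps and timer registration
          .process(new ProcessFunction[(String, Int), String] {
            override def processElement(
                value: (String, Int),
                ctx: ProcessFunction[(String, Int), String]#Context,
                out: Collector[String]): Unit = {
              out.collect(s"${value._1} -> ${value._2}")
            }
          })
          .print()
        env.execute("process sketch")
      }
    }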





[jira] [Commented] (FLINK-5026) Rename TimelyFlatMap to Process

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702624#comment-15702624
 ] 

ASF GitHub Bot commented on FLINK-5026:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2788
  
Manually merged


> Rename TimelyFlatMap to Process
> ---
>
> Key: FLINK-5026
> URL: https://issues.apache.org/jira/browse/FLINK-5026
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0
>
>
> The method on {{KeyedDataStream}} would be called {{process()}} and the 
> function itself would be called {{ProcessFunction}}.
> The reason for this is that {{TimelyFlatMapFunction}} is a bit of a mouthful 
> and with the additions to the timer API and state the {{ProcessFunction}} 
> could become the basic, low-level, user-facing API for cases where users 
> nowadays implement their own operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2788: [FLINK-5026] Rename TimelyFlatMap to Process

2016-11-28 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2788
  
Manually merged


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5026) Rename TimelyFlatMap to Process

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702622#comment-15702622
 ] 

ASF GitHub Bot commented on FLINK-5026:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/2788


> Rename TimelyFlatMap to Process
> ---
>
> Key: FLINK-5026
> URL: https://issues.apache.org/jira/browse/FLINK-5026
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0
>
>
> The method on {{KeyedDataStream}} would be called {{process()}} and the 
> function itself would be called {{ProcessFunction}}.
> The reason for this is that {{TimelyFlatMapFunction}} is a bit of a mouthful 
> and with the additions to the timer API and state the {{ProcessFunction}} 
> could become the basic, low-level, user-facing API for cases where users 
> nowadays implement their own operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4872) Type erasure problem exclusively on cluster execution

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702591#comment-15702591
 ] 

ASF GitHub Bot commented on FLINK-4872:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2823


> Type erasure problem exclusively on cluster execution
> -
>
> Key: FLINK-4872
> URL: https://issues.apache.org/jira/browse/FLINK-4872
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.2
>Reporter: Martin Junghanns
>Assignee: Timo Walther
>
> The following codes runs fine on local and collection execution environment 
> but fails when executed on a cluster.
> {code:title=Problem.java}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple1;
> import java.lang.reflect.Array;
> public class Problem {
>   public static class Pojo {
>   }
>   public static class Foo<T> extends Tuple1<T> {
>   }
>   public static class Bar<T> extends Tuple1<T[]> {
>   }
>   public static class UDF<T> implements MapFunction<Foo<T>, Bar<T>> {
>     private final Class<T> clazz;
>     public UDF(Class<T> clazz) {
>       this.clazz = clazz;
>     }
>     @Override
>     public Bar<T> map(Foo<T> value) throws Exception {
>       Bar<T> bar = new Bar<>();
>       //noinspection unchecked
>       bar.f0 = (T[]) Array.newInstance(clazz, 10);
>       return bar;
>     }
>   }
>   public static void main(String[] args) throws Exception {
>     // runs in local, collection and cluster execution
>     withLong();
>     // runs in local and collection execution, fails on cluster execution
>     withPojo();
>   }
>   public static void withLong() throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     Foo<Long> foo = new Foo<>();
>     foo.f0 = 42L;
>     DataSet<Foo<Long>> barDataSource = env.fromElements(foo);
>     DataSet<Bar<Long>> map = barDataSource.map(new UDF<>(Long.class));
>     map.print();
>   }
>   public static void withPojo() throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     Foo<Pojo> foo = new Foo<>();
>     foo.f0 = new Pojo();
>     DataSet<Foo<Pojo>> barDataSource = env.fromElements(foo);
>     DataSet<Bar<Pojo>> map = barDataSource.map(new UDF<>(Pojo.class));
>     map.print();
>   }
> }
> {code}
> {code:title=ProblemTest.java}
> import org.apache.flink.test.util.MultipleProgramsTestBase;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> @RunWith(Parameterized.class)
> public class ProblemTest extends MultipleProgramsTestBase {
>   public ProblemTest(TestExecutionMode mode) {
> super(mode);
>   }
>   @Test
>   public void testWithLong() throws Exception {
> Problem.withLong();
>   }
>   @Test
>   public void testWithPOJO() throws Exception {
> Problem.withPojo();
>   }
> }
> {code}
> Exception:
> {code}
> The return type of function 'withPojo(Problem.java:58)' could not be 
> determined automatically, due to type erasure. You can give type information 
> hints by using the returns(...) method on the result of the transformation 
> call, or by letting your function implement the 'ResultTypeQueryable' 
> interface.
> org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
> org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
> org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> Problem.withPojo(Problem.java:60)
> Problem.main(Problem.java:38) 
> {code}
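
For reference, one common workaround (a sketch, not taken from the PR that fixed
this) is to supply an explicit type hint so the return type does not have to be
extracted reflectively, assuming the returns(...) overload that accepts a
TypeInformation:

{code:java}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// inside withPojo(): give the mapper's return type explicitly
DataSet<Bar<Pojo>> map = barDataSource
    .map(new UDF<>(Pojo.class))
    .returns(TypeInformation.of(new TypeHint<Bar<Pojo>>() {}));
{code}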



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2823: [FLINK-4872] [types] Type erasure problem exclusiv...

2016-11-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2823


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-5142) Resource leak in CheckpointCoordinator

2016-11-28 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5142.
-
Resolution: Fixed

Fixed via e2c53cf85c1af73c040d96dbd24b9e2cf3e8cdf6

> Resource leak in CheckpointCoordinator
> --
>
> Key: FLINK-5142
> URL: https://issues.apache.org/jira/browse/FLINK-5142
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.1, 1.1.2, 1.1.3
>Reporter: Frank Lauterwald
>Assignee: Stephan Ewen
> Fix For: 1.1.4
>
>
> We run Flink 1.1.3 with a fairly aggressive time between checkpoints and a 
> minimum interval between checkpoints to make sure that some work gets done 
> between checkpoints.
> Over time, the JobManager uses more and more CPU time until it saturates the 
> available cores. It does not show heavy I/O load and the task managers seem 
> to work without problems.
> We see lots of log messages of the form "Trying to trigger another checkpoint 
> while one was queued already" - sometimes multiple in the same millisecond.
> It seems like checkpoints are triggered way too often.
> I suspect there is a resource leak in the CheckpointCoordinator which leads 
> to this behavior:
> {code}
> // in triggerCheckpoint(long timestamp, long nextCheckpointId), line 414ff
> // introduced as part of FLINK-3492
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
>     if (currentPeriodicTrigger != null) {
>         currentPeriodicTrigger.cancel();
>         currentPeriodicTrigger = null;
>     }
>     ScheduledTrigger trigger = new ScheduledTrigger();
>     timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval);
>     return false;
> }
> {code}
> The newly created trigger is not assigned to currentPeriodicTrigger, so it 
> cannot be cancelled whenever another rescheduling is required.
> If rescheduling is common (it happens several times per minute for us), the 
> running triggers accumulate until they overwhelm the JobManager.
> Versions up to Flink 1.0.x are unaffected because FLINK-3492 is a Flink 1.1 
> feature.
> The issue seems to be already fixed in master by commit 8854d75c due to 
> (unrelated) work on FLINK-4322.
> Let me know if there's anything else I can do to help.
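
A sketch of the missing assignment the report describes; names follow the quoted
snippet, and this is not the actual fix commit:

{code:java}
if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
    if (currentPeriodicTrigger != null) {
        currentPeriodicTrigger.cancel();
    }
    // assign the new trigger to the field so the next rescheduling can cancel it
    currentPeriodicTrigger = new ScheduledTrigger();
    timer.scheduleAtFixedRate(currentPeriodicTrigger, minPauseBetweenCheckpoints, baseInterval);
    return false;
}
{code}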



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5142) Resource leak in CheckpointCoordinator

2016-11-28 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5142.
---

> Resource leak in CheckpointCoordinator
> --
>
> Key: FLINK-5142
> URL: https://issues.apache.org/jira/browse/FLINK-5142
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.1, 1.1.2, 1.1.3
>Reporter: Frank Lauterwald
>Assignee: Stephan Ewen
> Fix For: 1.1.4
>
>
> We run Flink 1.1.3 with a fairly aggressive time between checkpoints and a 
> minimum interval between checkpoints to make sure that some work gets done 
> between checkpoints.
> Over time, the JobManager uses more and more CPU time until it saturates the 
> available cores. It does not show heavy I/O load and the task managers seem 
> to work without problems.
> We see lots of log messages of the form "Trying to trigger another checkpoint 
> while one was queued already" - sometimes multiple in the same millisecond.
> It seems like checkpoints are triggered way too often.
> I suspect there is a resource leak in the CheckpointCoordinator which leads 
> to this behavior:
> {code}
> // in triggerCheckpoint(long timestamp, long nextCheckpointId), line 414ff
> // introduced as part of FLINK-3492
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
>     if (currentPeriodicTrigger != null) {
>         currentPeriodicTrigger.cancel();
>         currentPeriodicTrigger = null;
>     }
>     ScheduledTrigger trigger = new ScheduledTrigger();
>     timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval);
>     return false;
> }
> {code}
> The newly created trigger is not assigned to currentPeriodicTrigger, so it 
> cannot be cancelled whenever another rescheduling is required.
> If rescheduling is common (it happens several times per minute for us), the 
> running triggers accumulate until they overwhelm the JobManager.
> Versions up to Flink 1.0.x are unaffected because FLINK-3492 is a Flink 1.1 
> feature.
> The issue seems to be already fixed in master by commit 8854d75c due to 
> (unrelated) work on FLINK-4322.
> Let me know if there's anything else I can do to help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3932) Implement State Backend Security

2016-11-28 Thread Vijay Srinivasaraghavan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vijay Srinivasaraghavan closed FLINK-3932.
--
Resolution: Fixed

> Implement State Backend Security
> 
>
> Key: FLINK-3932
> URL: https://issues.apache.org/jira/browse/FLINK-3932
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
> Fix For: 1.2.0
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Flink should protect its HA, checkpoint, and savepoint state against 
> unauthorized access.
> As described in the design doc, implement:
> - ZooKeeper authentication w/ Kerberos
> - ZooKeeper authorization (i.e. znode ACLs)
> - Checkpoint/savepoint data protection
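
For illustration, the kinds of settings involved could look like the sketch
below; the key names are taken from later Flink releases and are assumptions
here, not part of this issue:

{code:java}
import org.apache.flink.configuration.Configuration;

public class SecureStateSketch {
    static Configuration secureConfig() {
        Configuration config = new Configuration();
        config.setString("security.kerberos.login.keytab", "/path/to/flink.keytab");
        config.setString("security.kerberos.login.principal", "flink/host@EXAMPLE.COM");
        config.setString("security.kerberos.login.contexts", "Client");        // ZooKeeper SASL auth
        config.setString("high-availability.zookeeper.client.acl", "creator"); // znode ACLs
        return config;
    }
}
{code}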



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4918) Add SSL support to Mesos artifact server

2016-11-28 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-4918:

Issue Type: Sub-task  (was: Task)
Parent: FLINK-1984

> Add SSL support to Mesos artifact server
> 
>
> Key: FLINK-4918
> URL: https://issues.apache.org/jira/browse/FLINK-4918
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos, Security
>Reporter: Vijay Srinivasaraghavan
>Assignee: Vijay Srinivasaraghavan
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5181) Add Tests in StateBackendTestBase that verify Default-Value Behaviour

2016-11-28 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-5181.
---
Resolution: Fixed

Done in 60a4ab32e1662310da4633a97e02dca62431952e

> Add Tests in StateBackendTestBase that verify Default-Value Behaviour
> -
>
> Key: FLINK-5181
> URL: https://issues.apache.org/jira/browse/FLINK-5181
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2855: Add Tests in StateBackendTestBase that verify Defa...

2016-11-28 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/2855


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4826) Add keytab based kerberos support for Mesos environment

2016-11-28 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-4826:

Component/s: Mesos

> Add keytab based kerberos support for Mesos environment
> ---
>
> Key: FLINK-4826
> URL: https://issues.apache.org/jira/browse/FLINK-4826
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos, Security
>Reporter: Vijay Srinivasaraghavan
>Assignee: Vijay Srinivasaraghavan
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4826) Add keytab based kerberos support for Mesos environment

2016-11-28 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-4826:

Issue Type: Sub-task  (was: Task)
Parent: FLINK-1984

> Add keytab based kerberos support for Mesos environment
> ---
>
> Key: FLINK-4826
> URL: https://issues.apache.org/jira/browse/FLINK-4826
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos, Security
>Reporter: Vijay Srinivasaraghavan
>Assignee: Vijay Srinivasaraghavan
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2766: [FLINK-4898] Refactor HTTP handlers and Netty server/clie...

2016-11-28 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/2766
  
@mxm some duplication remains, yes - the KvState server/client still use a 
separate implementation (that was clearly cut/pasted).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4898) Refactor HTTP handlers and Netty server/client

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702467#comment-15702467
 ] 

ASF GitHub Bot commented on FLINK-4898:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/2766
  
@mxm some duplication remains, yes - the KvState server/client still use a 
separate implementation (that was clearly cut/pasted).


> Refactor HTTP handlers and Netty server/client
> --
>
> Key: FLINK-4898
> URL: https://issues.apache.org/jira/browse/FLINK-4898
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Minor
>
> The dispatcher requires an HTTP stack, ideally with a minimum of dependencies 
> and able to interoperate with Netty 4.0.28 (on which Flink currently 
> depends).  The `runtime-web` module has some home-grown HTTP handlers 
> already, and the `runtime` module has some low-level Netty code worth reusing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4898) Refactor HTTP handlers and Netty server/client

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702459#comment-15702459
 ] 

ASF GitHub Bot commented on FLINK-4898:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/2766
  
@uce happy to further discuss this PR and why it helps make the Netty
client/server code more reusable. It also provides some REST foundation code to
be used later for FLINK-4897.


> Refactor HTTP handlers and Netty server/client
> --
>
> Key: FLINK-4898
> URL: https://issues.apache.org/jira/browse/FLINK-4898
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Minor
>
> The dispatcher requires an HTTP stack, ideally with a minimum of dependencies 
> and able to interoperate with Netty 4.0.28 (on which Flink currently 
> depends).  The `runtime-web` module has some home-grown HTTP handlers 
> already, and the `runtime` module has some low-level Netty code worth reusing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2766: [FLINK-4898] Refactor HTTP handlers and Netty server/clie...

2016-11-28 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/2766
  
@uce happy to further discuss this PR and why it helps make the Netty
client/server code more reusable. It also provides some REST foundation code to
be used later for FLINK-4897.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2827: [FLINK-4921] Upgrade to Mesos 1.0.1

2016-11-28 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/2827
  
We do lose back-compat but gain an important feature: the ability to ship files
to sub-directories within the container. I've used this ability for FLINK-5091.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4921) Upgrade to Mesos 1.0.1

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702429#comment-15702429
 ] 

ASF GitHub Bot commented on FLINK-4921:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/2827
  
We do lose back-compat but gain an important feature: the ability to ship files
to sub-directories within the container. I've used this ability for FLINK-5091.


> Upgrade to Mesos 1.0.1
> --
>
> Key: FLINK-4921
> URL: https://issues.apache.org/jira/browse/FLINK-4921
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
> Fix For: 1.2.0
>
>
> Upgrade the client library to 1.0.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1984) Integrate Flink with Apache Mesos

2016-11-28 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-1984:

Component/s: Mesos

> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Fix For: 1.2.0
>
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Update (Oct '16): the core functionality is in the master branch.   New 
> sub-tasks track remaining work for a first release.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1984) Integrate Flink with Apache Mesos

2016-11-28 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-1984:

Fix Version/s: 1.2.0

> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Fix For: 1.2.0
>
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Update (Oct '16): the core functionality is in the master branch.   New 
> sub-tasks track remaining work for a first release.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5093) java.util.ConcurrentModificationException is thrown when stopping TimerService

2016-11-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-5093.
--
Resolution: Fixed

Fixed via 029db00c1d74222fd9f67b08213668fd0eea1e4d

> java.util.ConcurrentModificationException is thrown when stopping TimerService
> --
>
> Key: FLINK-5093
> URL: https://issues.apache.org/jira/browse/FLINK-5093
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
> Environment: FLIP-6 feature branch
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Minor
>
> In the stop() method of TimerService, removing a Timeout instance while
> iterating over the map causes a java.util.ConcurrentModificationException.
> Here is the stack trace:
> {code}
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextEntry(HashMap.java:922)
>   at java.util.HashMap$KeyIterator.next(HashMap.java:956)
>   at org.apache.flink.runtime.taskexecutor.slot.TimerService.stop(TimerService.java:63)
>   at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable.stop(TaskSlotTable.java:129)
>   at org.apache.flink.runtime.taskexecutor.TaskExecutor.shutDown(TaskExecutor.java:224)
>   at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.shutDownInternally(TaskManagerRunner.java:135)
>   at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.shutDown(TaskManagerRunner.java:129)
>   at org.apache.flink.runtime.minicluster.MiniCluster.shutdownInternally(MiniCluster.java:319)
>   at org.apache.flink.runtime.minicluster.MiniCluster.shutdown(MiniCluster.java:274)
> {code}


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2828: [FLINK-5093] java.util.ConcurrentModificationException is...

2016-11-28 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2828
  
I've merged your PR. Thanks for your contribution @ifndef-SleePy. You can 
close this PR now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-5076) Shutting down TM when shutting down new mini cluster

2016-11-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-5076.
--
Resolution: Fixed

Fixed via dc8254d4b3eb7333d1e3a2717e01bab051da33a1

> Shutting down TM when shutting down new mini cluster
> 
>
> Key: FLINK-5076
> URL: https://issues.apache.org/jira/browse/FLINK-5076
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
> Environment: FLIP-6 feature branch
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Minor
>
> Currently we don't shut down the task managers when shutting down the mini
> cluster. As a result, the mini cluster cannot exit normally.
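
A minimal sketch of the intended behavior; the loop below uses assumed field
names, not MiniCluster's actual internals (only TaskExecutor.shutDown() appears
in the FLINK-5093 stack trace above):

{code:java}
// inside a hypothetical MiniCluster.shutdownInternally():
for (TaskExecutor taskExecutor : taskManagers) {
    taskExecutor.shutDown(); // stop each TM so the mini cluster can exit cleanly
}
{code}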



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5093) java.util.ConcurrentModificationException is thrown when stopping TimerService

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702403#comment-15702403
 ] 

ASF GitHub Bot commented on FLINK-5093:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2828
  
I've merged your PR. Thanks for your contribution @ifndef-SleePy. You can 
close this PR now.


> java.util.ConcurrentModificationException is thrown when stopping TimerService
> --
>
> Key: FLINK-5093
> URL: https://issues.apache.org/jira/browse/FLINK-5093
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
> Environment: FLIP-6 feature branch
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Minor
>
> In the stop() method of TimerService, removing a Timeout instance while
> iterating over the map causes a java.util.ConcurrentModificationException.
> Here is the stack trace:
> {code}
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextEntry(HashMap.java:922)
>   at java.util.HashMap$KeyIterator.next(HashMap.java:956)
>   at org.apache.flink.runtime.taskexecutor.slot.TimerService.stop(TimerService.java:63)
>   at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable.stop(TaskSlotTable.java:129)
>   at org.apache.flink.runtime.taskexecutor.TaskExecutor.shutDown(TaskExecutor.java:224)
>   at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.shutDownInternally(TaskManagerRunner.java:135)
>   at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.shutDown(TaskManagerRunner.java:129)
>   at org.apache.flink.runtime.minicluster.MiniCluster.shutdownInternally(MiniCluster.java:319)
>   at org.apache.flink.runtime.minicluster.MiniCluster.shutdown(MiniCluster.java:274)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2817: [FLINK-5076] Shutting down TM when shutting down mini clu...

2016-11-28 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2817
  
I've merged your PR. You can close it now @ifndef-SleePy. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5076) Shutting down TM when shutting down new mini cluster

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702399#comment-15702399
 ] 

ASF GitHub Bot commented on FLINK-5076:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2817
  
I've merged your PR. You can close it now @ifndef-SleePy. 


> Shutting down TM when shutting down new mini cluster
> 
>
> Key: FLINK-5076
> URL: https://issues.apache.org/jira/browse/FLINK-5076
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
> Environment: FLIP-6 feature branch
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Minor
>
> Currently we don't shut down the task managers when shutting down the mini
> cluster. As a result, the mini cluster cannot exit normally.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5170) getAkkaConfig will use localhost if hostname is specified

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702393#comment-15702393
 ] 

ASF GitHub Bot commented on FLINK-5170:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2879
  
I've merged your PR. You can close it @shuai-xu now.


> getAkkaConfig will use localhost if hostname is specified 
> --
>
> Key: FLINK-5170
> URL: https://issues.apache.org/jira/browse/FLINK-5170
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> In AkkaUtils.scala:
> {code}
> def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
>   getAkkaConfig(configuration, if (hostname == null) Some((hostname, port)) else None)
> }
> {code}
> The condition is inverted: when a hostname is specified (non-null), the method
> passes None, so the configuration falls back to localhost.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5170) getAkkaConfig will use localhost if hostname is specified

2016-11-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-5170.
--
Resolution: Fixed

Fixed via 8265b545cff2fb5e6ca3771effd46a2fda488576

> getAkkaConfig will use localhost if hostname is specified 
> --
>
> Key: FLINK-5170
> URL: https://issues.apache.org/jira/browse/FLINK-5170
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> In AkkaUtils.scala:
> {code}
> def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
>   getAkkaConfig(configuration, if (hostname == null) Some((hostname, port)) else None)
> }
> {code}
> The condition is inverted: when a hostname is specified (non-null), the method
> passes None, so the configuration falls back to localhost.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2879: [FLINK-5170] [runtime] fix mis judge of hostname in AkkaU...

2016-11-28 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2879
  
I've merged your PR. You can close it @shuai-xu now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-5171) Wrong use of Preconditions.checkState in TaskManagerRunner

2016-11-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-5171.
--
Resolution: Fixed

Fixed via 09ec78a5bd5db1b74c267c8a69e182543135a161

> Wrong use of Preconditions.checkState in TaskManagerRunner
> --
>
> Key: FLINK-5171
> URL: https://issues.apache.org/jira/browse/FLINK-5171
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Preconditions.checkState checks that its first parameter is true and throws an
> exception otherwise. In TaskManagerRunner, however, the condition is inverted,
> so an exception is thrown when the RPC port is valid.
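
A hypothetical reconstruction of the inverted check (the actual condition and
valid port range in TaskManagerRunner may differ):

{code:java}
import org.apache.flink.util.Preconditions;

public class CheckStateSketch {
    static void validatePort(int rpcPort) {
        // Buggy (inverted): the condition holds only for *invalid* ports, so
        // every valid port makes checkState throw.
        // Preconditions.checkState(rpcPort < 0 || rpcPort > 65535, "Invalid RPC port: " + rpcPort);

        // Fixed: checkState must receive a condition that is true for valid input.
        Preconditions.checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid RPC port: " + rpcPort);
    }
}
{code}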



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

