[jira] [Assigned] (FLINK-12623) Flink on yarn encountered AMRMClientImpl does not update AMRM token properly

2019-05-24 Thread dalongliu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu reassigned FLINK-12623:
-

Assignee: dalongliu

> Flink on yarn encountered AMRMClientImpl does not update AMRM token properly
> 
>
> Key: FLINK-12623
> URL: https://issues.apache.org/jira/browse/FLINK-12623
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Affects Versions: 1.9.0, 2.0.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: patch
> Fix For: 1.9.0, 2.0.0
>
>
> Hi all! When my task runs on YARN with Flink 1.7.1 as the dependency version, I
> encountered the following problem:
> {panel}
> [org.apache.hadoop.ipc.Client ] >>> msg=Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1547520251214_0582_01sg
> [2019-05-25 03:31:04.163] [WARN] [AMRM Heartbeater thread] [org.apache.hadoop.ipc.Client ] >>> msg=Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1547520251214_0582_01sg
> [2019-05-25 03:31:04.168] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm1sg
> [2019-05-25 03:31:04.168] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm1sg
> [2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm2sg
> [2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm2
> {panel}
> Referring to the YARN issue [AMRMClientImpl does not update AMRM token 
> properly|https://issues.apache.org/jira/browse/YARN-3103], I learned that this 
> is a bug in YARN 2.4.1 that has been fixed in YARN 2.7.0. However, the Hadoop 
> version shaded by Flink is 2.4.1, so I think Flink should shade Hadoop 2.7.0 or 
> a higher version. Is my idea correct?
>  
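
For background, the fix for YARN-3103 essentially makes the AM-side client install the rolled-over AMRM token into the current UGI whenever an allocate response carries a new one; otherwise every subsequent heartbeat is rejected with SecretManager$InvalidToken and the client keeps failing over, as in the log above. The following is only an illustrative sketch of that idea, not the actual Hadoop source; the class and method names are made up.

{code:java}
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;

public class AmrmTokenRefreshSketch {

    /**
     * Installs a rolled-over AMRM token into the current user's credentials so that
     * later AMRM heartbeats authenticate with the new token instead of the expired one.
     */
    static void updateAmrmToken(org.apache.hadoop.yarn.api.records.Token newToken) throws IOException {
        // Convert the protobuf token from the allocate response into a security Token.
        // (The real client also rewrites the token's service to point at the RM address.)
        Token<AMRMTokenIdentifier> amrmToken = new Token<>(
                newToken.getIdentifier().array(),
                newToken.getPassword().array(),
                new Text(newToken.getKind()),
                new Text(newToken.getService()));

        // Tokens are keyed by their service name, so adding the new token
        // replaces the stale one for subsequent RPC connections.
        UserGroupInformation.getCurrentUser().addToken(amrmToken);
    }
}
{code}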



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Assigned] (FLINK-12623) Flink on yarn encountered AMRMClientImpl does not update AMRM token properly

2019-05-24 Thread dalongliu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu reassigned FLINK-12623:
-

Assignee: (was: dalongliu)

> Flink on yarn encountered AMRMClientImpl does not update AMRM token properly
> 
>
> Key: FLINK-12623
> URL: https://issues.apache.org/jira/browse/FLINK-12623
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Affects Versions: 1.9.0, 2.0.0
>Reporter: dalongliu
>Priority: Major
>  Labels: patch
> Fix For: 1.9.0, 2.0.0
>
>
> Hi all! When my task runs on YARN with Flink 1.7.1 as the dependency version, I
> encountered the following problem:
> {panel}
> [org.apache.hadoop.ipc.Client ] >>> msg=Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1547520251214_0582_01sg
> [2019-05-25 03:31:04.163] [WARN] [AMRM Heartbeater thread] [org.apache.hadoop.ipc.Client ] >>> msg=Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1547520251214_0582_01sg
> [2019-05-25 03:31:04.168] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm1sg
> [2019-05-25 03:31:04.168] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm1sg
> [2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm2sg
> [2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm2
> {panel}
> Referring to the YARN issue [AMRMClientImpl does not update AMRM token 
> properly|https://issues.apache.org/jira/browse/YARN-3103], I learned that this 
> is a bug in YARN 2.4.1 that has been fixed in YARN 2.7.0. However, the Hadoop 
> version shaded by Flink is 2.4.1, so I think Flink should shade Hadoop 2.7.0 or 
> a higher version. Is my idea correct?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12623) Flink on yarn encountered AMRMClientImpl does not update AMRM token properly

2019-05-24 Thread dalongliu (JIRA)
dalongliu created FLINK-12623:
-

 Summary: Flink on yarn encountered AMRMClientImpl does not update 
AMRM token properly
 Key: FLINK-12623
 URL: https://issues.apache.org/jira/browse/FLINK-12623
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hadoop Compatibility
Affects Versions: 1.9.0, 2.0.0
Reporter: dalongliu
 Fix For: 1.9.0, 2.0.0


Hi all! When my task runs on YARN with Flink 1.7.1 as the dependency version, I
encountered the following problem:
{panel}
[org.apache.hadoop.ipc.Client ] >>> msg=Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1547520251214_0582_01sg
[2019-05-25 03:31:04.163] [WARN] [AMRM Heartbeater thread] [org.apache.hadoop.ipc.Client ] >>> msg=Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1547520251214_0582_01sg
[2019-05-25 03:31:04.168] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm1sg
[2019-05-25 03:31:04.168] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm1sg
[2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm2sg
[2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm2
{panel}
Referring to the YARN issue [AMRMClientImpl does not update AMRM token properly| 
https://issues.apache.org/jira/browse/YARN-3103], I learned that this is a bug in 
YARN 2.4.1 that has been fixed in YARN 2.7.0. However, the Hadoop version shaded 
by Flink is 2.4.1, so I think Flink should shade Hadoop 2.7.0 or a higher 
version. Is my idea correct?

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] danny0405 commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-24 Thread GitBox
danny0405 commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-495854327
 
 
   CALCITE-3055 is a critical regression in the VolcanoPlanner and will be fixed in 
1.20.0. I think we should skip Calcite 1.19.0 and use 1.20.0 directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548639
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+/**
+ * HiveTableOutputFormat used to write data to hive table, including 
non-partition and partitioned table.
+ */
+public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase implements InitializeOnMaster,
+   FinalizeOnMaster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableOutputFormat.class);
+
+   private static final long serialVersionUID = 1L;
+
+   private transient JobConf jobConf;
+   private transient String dbName;
+   private transient String tableName;
+   private transient List partitionCols;
+   private

[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548207
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
 
 Review comment:
   Try to avoid using Guava here; Flink has its own `Preconditions`.
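   For illustration, the suggested swap could look like the sketch below; the class and field names are made up, only `org.apache.flink.util.Preconditions` is real.

```java
import org.apache.flink.util.Preconditions;

// Sketch: same null/argument checks, but via Flink's own utility instead of shaded Guava.
public class PreconditionsExample {

    private final String dbName;
    private final String tableName;

    public PreconditionsExample(String dbName, String tableName) {
        // checkNotNull returns its argument, so it can be used inline in assignments.
        this.dbName = Preconditions.checkNotNull(dbName, "dbName cannot be null");
        this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
        Preconditions.checkArgument(!tableName.isEmpty(), "tableName cannot be empty");
    }
}
```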


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548439
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+/**
+ * HiveTableOutputFormat used to write data to hive table, including 
non-partition and partitioned table.
+ */
+public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase implements InitializeOnMaster,
+   FinalizeOnMaster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableOutputFormat.class);
+
+   private static final long serialVersionUID = 1L;
+
+   private transient JobConf jobConf;
+   private transient String dbName;
+   private transient String tableName;
+   private transient List partitionCols;
+   private

[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548474
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+/**
+ * HiveTableOutputFormat used to write data to hive table, including 
non-partition and partitioned table.
+ */
+public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase implements InitializeOnMaster,
+   FinalizeOnMaster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableOutputFormat.class);
+
+   private static final long serialVersionUID = 1L;
+
+   private transient JobConf jobConf;
+   private transient String dbName;
+   private transient String tableName;
+   private transient List partitionCols;
+   private

[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287526502
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -239,13 +253,10 @@ under the License.


 
-   
-

org.apache.hive
hive-exec
${hive.version}
-   test
 
 Review comment:
   Can you open an immediate follow-up PR to add the new libraries packaged in 
flink-connector-hive to the license NOTICE file? 
https://github.com/apache/flink/pull/8205 is a good example.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548246
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+/**
+ * HiveTableOutputFormat used to write data to hive table, including 
non-partition and partitioned table.
+ */
+public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase implements InitializeOnMaster,
+   FinalizeOnMaster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableOutputFormat.class);
+
+   private static final long serialVersionUID = 1L;
 
 Review comment:
   Use a more meaningful serialVersionUID if this class needs to be serialized/deserialized?
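   For illustration, an explicit serialVersionUID could be declared as below; the value shown is made up, and a real one would typically be generated with `serialver` or by the IDE.

```java
import java.io.Serializable;

// Sketch: an explicit serialVersionUID instead of the placeholder 1L, so that
// incompatible changes to the serialized form fail fast at deserialization time.
public class SerialVersionExample implements Serializable {

    private static final long serialVersionUID = 6874297375963023471L; // hypothetical value

    private String tableName;
}
```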


This is an automated message from the Apache Git Service.

[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548780
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -590,6 +578,17 @@ private  static Table instantiateHiveTable(ObjectPath 
tablePath, CatalogBaseTabl
return hiveTable;
}
 
+   private static void setStorageFormat(StorageDescriptor sd, Map properties) {
+   // TODO: simply use text format for now
+   String storageFormatName = DEFAULT_HIVE_TABLE_STORAGE_FORMAT;
+   StorageFormatDescriptor sfDescriptor = 
storageFormatFactory.get(storageFormatName);
+   checkArgument(sfDescriptor != null, "Unknown storage format " + 
storageFormatName);
 
 Review comment:
   move input checks to the top?
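   A rough sketch of the suggested ordering, with the argument checks done before any other work. The surrounding class, the factory field, and the default format constant are assumptions for illustration; only the `StorageFormatFactory#get` lookup mirrors the snippet above.

```java
import java.util.Map;

import org.apache.flink.util.Preconditions;

import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import org.apache.hadoop.hive.ql.io.StorageFormatFactory;

class StorageFormatSketch {

    private static final StorageFormatFactory STORAGE_FORMAT_FACTORY = new StorageFormatFactory();
    private static final String DEFAULT_HIVE_TABLE_STORAGE_FORMAT = "TextFile"; // assumed; must be a name known to the factory

    static void setStorageFormat(StorageDescriptor sd, Map<String, String> properties) {
        // Input checks first, before any lookup or mutation.
        Preconditions.checkNotNull(sd, "StorageDescriptor cannot be null");
        Preconditions.checkNotNull(properties, "properties cannot be null");

        // TODO: simply use text format for now
        String storageFormatName = DEFAULT_HIVE_TABLE_STORAGE_FORMAT;
        StorageFormatDescriptor storageFormatDescriptor = STORAGE_FORMAT_FACTORY.get(storageFormatName);
        Preconditions.checkArgument(storageFormatDescriptor != null, "Unknown storage format " + storageFormatName);

        sd.setInputFormat(storageFormatDescriptor.getInputFormat());
        sd.setOutputFormat(storageFormatDescriptor.getOutputFormat());
        // The serde class would be applied to sd.getSerdeInfo() here, as in the original method.
    }
}
```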


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548230
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
 
 Review comment:
   let's not static import a method
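   If the helper stays, a qualified call keeps its origin visible at the call site. A small sketch follows; the wrapper class and the printout are made up, while `HadoopInputFormatCommonBase#getCredentialsFromUGI` is the method static-imported in the diff above.

```java
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;

import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

// Sketch: call the helper through its declaring class rather than via a static import.
public class QualifiedCallExample {

    public static void main(String[] args) throws Exception {
        Credentials credentials =
                HadoopInputFormatCommonBase.getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
        System.out.println("Number of delegation tokens: " + credentials.getAllTokens().size());
    }
}
```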


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548916
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
 ##
 @@ -50,6 +54,7 @@ private static HiveConf getHiveConf() throws IOException {
HiveConf hiveConf = new HiveConf();
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, 
TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, 
warehouseUri);
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "");
 
 Review comment:
   why set it to empty string? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548573
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+/**
+ * HiveTableOutputFormat used to write data to hive table, including 
non-partition and partitioned table.
+ */
+public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase implements InitializeOnMaster,
+   FinalizeOnMaster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableOutputFormat.class);
+
+   private static final long serialVersionUID = 1L;
+
+   private transient JobConf jobConf;
+   private transient String dbName;
+   private transient String tableName;
+   private transient List partitionCols;
+   private

[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548805
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -590,6 +578,17 @@ private  static Table instantiateHiveTable(ObjectPath 
tablePath, CatalogBaseTabl
return hiveTable;
}
 
+   private static void setStorageFormat(StorageDescriptor sd, Map properties) {
+   // TODO: simply use text format for now
+   String storageFormatName = DEFAULT_HIVE_TABLE_STORAGE_FORMAT;
+   StorageFormatDescriptor sfDescriptor = 
storageFormatFactory.get(storageFormatName);
 
 Review comment:
   nit: can we use full names for variables rather than abbreviations?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548672
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+/**
+ * HiveTableOutputFormat used to write data to hive table, including 
non-partition and partitioned table.
+ */
+public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase implements InitializeOnMaster,
+   FinalizeOnMaster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableOutputFormat.class);
+
+   private static final long serialVersionUID = 1L;
+
+   private transient JobConf jobConf;
+   private transient String dbName;
+   private transient String tableName;
+   private transient List partitionCols;
+   private

[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548343
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+/**
+ * HiveTableOutputFormat used to write data to hive table, including 
non-partition and partitioned table.
+ */
+public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase implements InitializeOnMaster,
+   FinalizeOnMaster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableOutputFormat.class);
+
+   private static final long serialVersionUID = 1L;
+
+   private transient JobConf jobConf;
+   private transient String dbName;
+   private transient String tableName;
+   private transient List partitionCols;
+   private

[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548859
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.IOException;
+
+/**
+ * Util class for accessing Hive tables.
+ */
+public class HiveTableUtil {
+
+   private HiveTableUtil() {
+   }
+
+   /**
+* Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
+*/
+   public static ObjectInspector getObjectInspector(TypeInformation 
flinkType) throws IOException {
+   return getObjectInspector(toHiveTypeInfo(flinkType));
+   }
+
+   // TODO: reuse Hive's TypeInfoUtils?
+   private static ObjectInspector getObjectInspector(TypeInfo type) throws 
IOException {
+   switch (type.getCategory()) {
+
+   case PRIMITIVE:
+   PrimitiveTypeInfo primitiveType = 
(PrimitiveTypeInfo) type;
+   return 
PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
+
+   // TODO: support complex types
+   default:
+   throw new IOException("Unsupported Hive type 
category " + type.getCategory());
+   }
+   }
+
+   /**
+* Converts a Flink {@link TypeInformation} to corresponding Hive 
{@link TypeInfo}.
+*/
+   public static TypeInfo toHiveTypeInfo(TypeInformation flinkType) {
 
 Review comment:
   isn't this dup of `HiveTypeUtil.toHiveType()`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287548881
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfigOptions;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveCatalogTable;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.runtime.utils.BatchTableEnvUtil;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import scala.collection.JavaConversions;
+
+/**
+ * Tests {@link HiveTableSink}.
+ */
+public class HiveTableSinkTest {
+
+   private static HiveCatalog hiveCatalog;
+   private static HiveConf hiveConf;
+
+   @BeforeClass
+   public static void createCatalog() throws IOException {
+   hiveConf = HiveTestUtils.getHiveConf();
+   hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
+   hiveCatalog.open();
+   }
+
+   @AfterClass
+   public static void closeCatalog() {
+   if (hiveCatalog != null) {
+   hiveCatalog.close();
+   }
+   }
+
+   @Test
+   public void testInsertIntoNonPartitionTable() throws Exception {
+   final String dbName = "default";
+   final String tblName = "dest";
+   ObjectPath tablePath = new ObjectPath(dbName, tblName);
+   TableSchema tableSchema = new TableSchema(
+   new String[]{"a", "b", "c", "d", "e"},
+   new TypeInformation[]{
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.LONG_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO}
+   );
+   HiveCatalogTable catalogTable = new 
HiveCatalogTable(tableSchema, new HashMap<>(), "");
+   hiveCatalog.createTable(tablePath, catalogTable, false);
+   BatchTableEnvironment tableEnv = createTableEnv(1);
+   Table table = getSmall5TupleDataSet(tableEnv);
+   HiveTableSink hiveTableSink = new HiveTableSink(new 
JobConf(hiveConf),
+   new RowTypeInfo(tableSchema.getFieldTypes(), 
tableSchema.getFieldNames()), dbName, tblName, Collections.emptyList());
+   tableEnv.writeToSink(table, hiveTableSink, null);
+   tableEnv.execute();
+   // TODO: verify data is written properly
+   }
+
+   private BatchTableEnvironment createTableEnv(int parallelism) {
+   Configuration config = new Configuration();
+   
config.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1);
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(parallelism, config);
+   return BatchTableEnviro

[GitHub] [flink] yanghua commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN

2019-05-24 Thread GitBox
yanghua commented on a change in pull request #8438: [FLINK-12152] Make the 
vcore that Application Master used configurable for Flink on YARN
URL: https://github.com/apache/flink/pull/8438#discussion_r287548190
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 ##
 @@ -49,6 +49,14 @@
.defaultValue(-1)
.withDescription("The port where the application master 
RPC system is listening.");
 
+   /**
+* The vcores used by YARN application master.
+*/
+   public static final ConfigOption APP_MASTER_VCORES =
+   key("yarn.appmaster.vcores")
+   .defaultValue(1)
+   .withDescription("The number of virtual cores (vcores) used by 
YARN application master.");
 
 Review comment:
   I have tried, but it does not take effect. In addition, it seems the 
generated HTML documentation is not tracked by git.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
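
Editor's note: `YarnConfigOptions.APP_MASTER_VCORES` is the option added in the diff 
above; the snippet below is only a minimal sketch, assuming Flink's standard 
`Configuration` API, of how a client-side caller might read it. It is not code from 
this PR, and the class name is made up.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

public class AmVcoresExample {
    public static void main(String[] args) {
        // Load flink-conf.yaml from FLINK_CONF_DIR.
        Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
        // Falls back to the declared default of 1 when yarn.appmaster.vcores is unset.
        int amVCores = flinkConfig.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
        System.out.println("AM vcores: " + amVCores);
    }
}
```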


[GitHub] [flink] flinkbot commented on issue #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file

2019-05-24 Thread GitBox
flinkbot commented on issue #8541: [FLINK-9172][sql-client] Support catalogs in 
SQL-Client yaml config file
URL: https://github.com/apache/flink/pull/8541#issuecomment-495825993
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 opened a new pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file

2019-05-24 Thread GitBox
bowenli86 opened a new pull request #8541: [FLINK-9172][sql-client] Support 
catalogs in SQL-Client yaml config file
URL: https://github.com/apache/flink/pull/8541
 
 
   ## What is the purpose of the change
   
   This PR adds support for **basic** catalog entries in SQL-Client yaml config 
file, adds `CatalogFactory` and `CatalogDescriptor`, and hooks them up with the 
SQL Client through the table factory discovery service.
   
   Please refer to the design in 
[doc](https://docs.google.com/document/d/1ALxfiGZBaZ8KUNJtoT443hReoPoJEdt9Db2wiJ2LuWA/edit).
   
   Note that more fields of catalog entries and factories for existing catalogs 
will be added in the next few PRs. 
   
   ## Brief change log
   
   - Created `CatalogFactory`, `CatalogDescriptor`, 
`CatalogDescriptorValidator` in flink table modules
   - Created `CatalogEntry` and made SQL CLI handle catalog entries in yaml file
   - Added unit tests
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - see new unit tests in this PR
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? docs will be added later when the 
feature completes
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12622) The process function compiles error in MyProcessWindowFunction in windows.md

2019-05-24 Thread chaiyongqiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848003#comment-16848003
 ] 

chaiyongqiang commented on FLINK-12622:
---

[https://github.com/apache/flink/pull/8540/commits]

> The process function compiles error in MyProcessWindowFunction in windows.md
> 
>
> Key: FLINK-12622
> URL: https://issues.apache.org/jira/browse/FLINK-12622
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.8.0
>Reporter: chaiyongqiang
>Priority: Minor
>
> The process function is defined as below in windows.md: 
> {quote}class MyProcessWindowFunction extends ProcessWindowFunction[(String, 
> Long), String, String, TimeWindow] {
>   def process(key: String, context: Context, input: Iterable[(String, Long)], 
> out: Collector[String]): () = {
> var count = 0L
> for (in <- input) {
>   count = count + 1
> }
> out.collect(s"Window ${context.window} count: $count")
>   }
> }{quote}
> The process function defined in ProcessWindowFunction has a return type of 
> Unit, but the override in MyProcessWindowFunction declares {{: ()}}, which does 
> not match. Compiling MyProcessWindowFunction then produces an error like the 
> following:
> {quote}Error:(37, 109) '=>' expected but '=' found.
>   def process(key: String, context: Context, input: Iterable[(String, Long)], 
> out: Collector[String]) : ()  = {{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9172) Support catalogs in SQL-Client yaml config file

2019-05-24 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-9172:

Summary: Support catalogs in SQL-Client yaml config file  (was: Support 
external catalogs in SQL-Client)

> Support catalogs in SQL-Client yaml config file
> ---
>
> Key: FLINK-9172
> URL: https://issues.apache.org/jira/browse/FLINK-9172
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Reporter: Rong Rong
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> It doesn't seem that the configuration (YAML) file allows specifications of 
> external catalogs currently. The request here is to add support for external 
> catalog specifications in the YAML file. The user should also be able to specify 
> one catalog as the default.
> It would be great to have SQL-Client support some external catalogs 
> out-of-the-box for SQL users to configure and utilize easily. I am currently 
> thinking of having an external catalog factory that spins up both streaming and 
> batch external catalog table sources and sinks. This could greatly unify and 
> provide easy access for SQL users. 
> The catalog-related configurations then need to be processed and passed to 
> TableEnvironment accordingly by calling relevant APIs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12297) Clean the closure for OutputTags in PatternStream

2019-05-24 Thread aitozi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848002#comment-16848002
 ] 

aitozi commented on FLINK-12297:


Hi [~aljoscha],
Since you have noticed this issue, could you take a look at this PR and give me 
some suggestions, as you are the author of the ClosureCleaner? Thank you.

> Clean the closure for OutputTags in PatternStream
> -
>
> Key: FLINK-12297
> URL: https://issues.apache.org/jira/browse/FLINK-12297
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.8.0
>Reporter: Dawid Wysakowicz
>Assignee: aitozi
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Right now we do not invoke the closure cleaner on output tags. Therefore such 
> code:
> {code}
> @Test
> public void testFlatSelectSerialization() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
>     OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {};
>     CEP.pattern(elements, Pattern.<Integer>begin("A")).flatSelect(
>         outputTag,
>         new PatternFlatTimeoutFunction<Integer, Integer>() {
>             @Override
>             public void timeout(
>                     Map<String, List<Integer>> pattern,
>                     long timeoutTimestamp,
>                     Collector<Integer> out) throws Exception {
>             }
>         },
>         new PatternFlatSelectFunction<Integer, Integer>() {
>             @Override
>             public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception {
>             }
>         }
>     );
>     env.execute();
> }
> {code}
> will fail with a {{The implementation of the PatternFlatSelectAdapter is not 
> serializable.}} exception.
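
Editor's note: the following is a minimal illustration, with made-up class names 
that are not Flink API, of why the anonymous {{OutputTag}} subclass in the quoted 
description drags its enclosing test class into serialization unless a closure 
cleaner nulls out the implicit reference.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// An anonymous class created in a non-static context keeps an implicit reference
// to its enclosing instance, so serializing it also tries to serialize the
// enclosing, non-serializable class. This is the reference the ClosureCleaner
// is meant to clear.
public class EnclosingTest {                              // not Serializable

    final Serializable anonymous = new Serializable() {}; // captures EnclosingTest.this

    public static void main(String[] args) throws Exception {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        // Throws java.io.NotSerializableException: EnclosingTest
        out.writeObject(new EnclosingTest().anonymous);
    }
}
{code}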



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yanghua commented on issue #8538: [FLINK-11283] Accessing the key when processing connected keyed stream

2019-05-24 Thread GitBox
yanghua commented on issue #8538: [FLINK-11283] Accessing the key when 
processing connected keyed stream
URL: https://github.com/apache/flink/pull/8538#issuecomment-495824161
 
 
   There is a failure when downloading kafka_2.11-2.2.0.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8540: Change the return value for process function in MyProcessWindowFunction

2019-05-24 Thread GitBox
flinkbot commented on issue #8540: Change the return value for process function 
in MyProcessWindowFunction
URL: https://github.com/apache/flink/pull/8540#issuecomment-495821941
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] cyq89051127 opened a new pull request #8540: Change the return value for process function in MyProcessWindowFunction

2019-05-24 Thread GitBox
cyq89051127 opened a new pull request #8540: Change the return value for 
process function in MyProcessWindowFunction
URL: https://github.com/apache/flink/pull/8540
 
 
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12622) The process function compiles error in MyProcessWindowFunction in windows.md

2019-05-24 Thread chaiyongqiang (JIRA)
chaiyongqiang created FLINK-12622:
-

 Summary: The process function compiles error in 
MyProcessWindowFunction in windows.md
 Key: FLINK-12622
 URL: https://issues.apache.org/jira/browse/FLINK-12622
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.8.0
Reporter: chaiyongqiang


The process function is defined as below in windows.md: 

{quote}class MyProcessWindowFunction extends ProcessWindowFunction[(String, 
Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], 
out: Collector[String]): () = {
var count = 0L
for (in <- input) {
  count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
  }
}{quote}

The process function defined in ProcessWindowFunction has a return type of 
Unit, but the override in MyProcessWindowFunction declares {{: ()}}, which does not 
match. Compiling MyProcessWindowFunction then produces an error like the following:

{quote}Error:(37, 109) '=>' expected but '=' found.
  def process(key: String, context: Context, input: Iterable[(String, Long)], 
out: Collector[String]) : ()  = {{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12621) Use MiniCluster instead of JobExecutor

2019-05-24 Thread Jeff Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-12621:
---
Description: JobExecutor is specifically used for local mode, I don't think 
we need to introduce new class/interface for local mode, we should use the 
existing MiniCluster.  (was: JobExecutor is specifically used for flink mode, I 
don't think we need to introduce new class/interface for local mode, we should 
use the existing MiniCluster.)

> Use MiniCluster instead of JobExecutor
> --
>
> Key: FLINK-12621
> URL: https://issues.apache.org/jira/browse/FLINK-12621
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.8.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>
> JobExecutor is specifically used for local mode, I don't think we need to 
> introduce new class/interface for local mode, we should use the existing 
> MiniCluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12621) Use MiniCluster instead of JobExecutor

2019-05-24 Thread Jeff Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang reassigned FLINK-12621:
--

Assignee: Jeff Zhang

> Use MiniCluster instead of JobExecutor
> --
>
> Key: FLINK-12621
> URL: https://issues.apache.org/jira/browse/FLINK-12621
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.8.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>
> JobExecutor is specifically used for flink mode, I don't think we need to 
> introduce new class/interface for local mode, we should use the existing 
> MiniCluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12621) Use MiniCluster instead of JobExecutor

2019-05-24 Thread Jeff Zhang (JIRA)
Jeff Zhang created FLINK-12621:
--

 Summary: Use MiniCluster instead of JobExecutor
 Key: FLINK-12621
 URL: https://issues.apache.org/jira/browse/FLINK-12621
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.8.0
Reporter: Jeff Zhang


JobExecutor is specifically used for flink mode, I don't think we need to 
introduce new class/interface for local mode, we should use the existing 
MiniCluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zorro786 commented on a change in pull request #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods

2019-05-24 Thread GitBox
zorro786 commented on a change in pull request #8154: [FLINK-12167] Reset 
context classloader in run and getOptimizedPlan methods
URL: https://github.com/apache/flink/pull/8154#discussion_r287540914
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 ##
 @@ -197,26 +197,33 @@ public LeaderConnectionInfo getClusterConnectionInfo() 
throws LeaderRetrievalExc
// 

 
public static String getOptimizedPlanAsJson(Optimizer compiler, 
PackagedProgram prog, int parallelism)
-   throws CompilerException, ProgramInvocationException {
+   throws CompilerException, ProgramInvocationException {
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) 
getOptimizedPlan(compiler, prog, parallelism));
}
 
public static FlinkPlan getOptimizedPlan(Optimizer compiler, 
PackagedProgram prog, int parallelism)
-   throws CompilerException, ProgramInvocationException {
-   
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-   if (prog.isUsingProgramEntryPoint()) {
-   return getOptimizedPlan(compiler, 
prog.getPlanWithJars(), parallelism);
-   } else if (prog.isUsingInteractiveMode()) {
-   // temporary hack to support the optimizer plan preview
-   OptimizerPlanEnvironment env = new 
OptimizerPlanEnvironment(compiler);
-   if (parallelism > 0) {
-   env.setParallelism(parallelism);
-   }
+   throws CompilerException, ProgramInvocationException {
 
 Review comment:
   Is there a style check plugin for the Flink codebase I can use to avoid this? 
My editor moves `throws` to a new line without the double indentation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zorro786 commented on a change in pull request #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods

2019-05-24 Thread GitBox
zorro786 commented on a change in pull request #8154: [FLINK-12167] Reset 
context classloader in run and getOptimizedPlan methods
URL: https://github.com/apache/flink/pull/8154#discussion_r287540080
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 ##
 @@ -247,44 +252,48 @@ public static OptimizedPlan getOptimizedPlan(Optimizer 
compiler, Plan p, int par
 */
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
throws ProgramInvocationException, 
ProgramMissingJobException {
-   
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-   if (prog.isUsingProgramEntryPoint()) {
-
-   final JobWithJars jobWithJars = prog.getPlanWithJars();
-
-   return run(jobWithJars, parallelism, 
prog.getSavepointSettings());
-   }
-   else if (prog.isUsingInteractiveMode()) {
-   log.info("Starting program in interactive mode 
(detached: {})", isDetached());
-
-   final List libraries = prog.getAllLibraries();
-
-   ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(this, libraries,
-   prog.getClasspaths(), 
prog.getUserCodeClassLoader(), parallelism, isDetached(),
-   prog.getSavepointSettings());
-   ContextEnvironment.setAsContext(factory);
-
-   try {
-   // invoke main method
-   prog.invokeInteractiveModeForExecution();
-   if (lastJobExecutionResult == null && 
factory.getLastEnvCreated() == null) {
-   throw new 
ProgramMissingJobException("The program didn't contain a Flink job.");
-   }
-   if (isDetached()) {
-   // in detached mode, we execute the 
whole user code to extract the Flink job, afterwards we run it here
-   return ((DetachedEnvironment) 
factory.getLastEnvCreated()).finalizeExecute();
+   final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
+   try {
+   
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+   if (prog.isUsingProgramEntryPoint()) {
+   final JobWithJars jobWithJars = 
prog.getPlanWithJars();
+   return run(jobWithJars, parallelism, 
prog.getSavepointSettings());
+   }
+   else if (prog.isUsingInteractiveMode()) {
+   log.info("Starting program in interactive mode 
(detached: {})", isDetached());
+
+   final List libraries = 
prog.getAllLibraries();
+
+   ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(this, libraries,
+   prog.getClasspaths(), 
prog.getUserCodeClassLoader(), parallelism, isDetached(),
+   prog.getSavepointSettings());
+   ContextEnvironment.setAsContext(factory);
+
+   try {
+   // invoke main method
+   
prog.invokeInteractiveModeForExecution();
+   if (lastJobExecutionResult == null && 
factory.getLastEnvCreated() == null) {
+   throw new 
ProgramMissingJobException("The program didn't contain a Flink job.");
+   }
+   if (isDetached()) {
+   // in detached mode, we execute 
the whole user code to extract the Flink job, afterwards we run it here
+   return ((DetachedEnvironment) 
factory.getLastEnvCreated()).finalizeExecute();
+   }
+   else {
+   // in blocking mode, we execute 
all Flink jobs contained in the user code and then return here
+   return 
this.lastJobExecutionResult;
+   }
}
-   else {
-   // in blocking mode, we execute all 
Flink jobs contained in the user code and then return here
-   return this.lastJobExecutionResult;
+   finally {
+   ContextEnvironment.unsetContext();
 
 Re
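
Editor's note: a minimal sketch, assuming only the standard Thread 
context-classloader API, of the save/set/restore pattern the diff above introduces 
in `ClusterClient`. The class and parameter names below are illustrative, not the 
PR's code.

```java
import java.util.concurrent.Callable;

public final class ClassLoaderScope {

    // Runs the given action with the supplied classloader as the thread's context
    // classloader and always restores the previous one in a finally block, so the
    // swap never leaks to the caller (the same idea as the try/finally in the diff).
    public static <T> T runWith(ClassLoader userCodeClassLoader, Callable<T> action) throws Exception {
        ClassLoader previous = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);
            return action.call();
        } finally {
            Thread.currentThread().setContextClassLoader(previous);
        }
    }
}
```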

[GitHub] [flink] zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods

2019-05-24 Thread GitBox
zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in 
run and getOptimizedPlan methods
URL: https://github.com/apache/flink/pull/8154#issuecomment-495814478
 
 
   retest


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods

2019-05-24 Thread GitBox
zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in 
run and getOptimizedPlan methods
URL: https://github.com/apache/flink/pull/8154#issuecomment-495814469
 
 
   Thanks for the review and for fixing the issues @tillrohrmann.
   The tests are failing here:
   
   ```
   
==
   Running 'Kafka 0.10 end-to-end test'
   
==
   TEST_DATA_DIR: 
/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-00251075286
   Flink dist directory: 
/home/travis/build/apache/flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT
   Downloading Kafka from 
https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
 % Total% Received % Xferd  Average Speed   TimeTime Time  
Current
Dload  Upload   Total   SpentLeft  Speed
   
 0 00 00 0  0  0 --:--:-- --:--:-- --:--:-- 0
   100   607  100   6070 0753  0 --:--:-- --:--:-- --:--:--   
754
   
   gzip: stdin: not in gzip format
   tar: Child returned status 1
   tar: Error is not recoverable: exiting now
   [FAIL] Test script contains errors.
   Checking for errors...
   No errors in log files.
   Checking for exceptions...
   No exceptions in log files.
   Checking for non-empty .out files...
   grep: 
/home/travis/build/apache/flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT/log/*.out:
 No such file or directory
   No non-empty .out files.
   
   [FAIL] 'Kafka 0.10 end-to-end test' failed after 0 minutes and 1 seconds! 
Test exited with exit code 1
   
   No taskexecutor daemon to stop on host 
travis-job-4a07ec5f-c3be-4797-999e-9b26b211b8e6.
   No standalonesession daemon to stop on host 
travis-job-4a07ec5f-c3be-4797-999e-9b26b211b8e6.
   ```
   Is there any command to re-trigger the tests? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods

2019-05-24 Thread GitBox
zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in 
run and getOptimizedPlan methods
URL: https://github.com/apache/flink/pull/8154#issuecomment-495814490
 
 
   test


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287533827
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+/**
+ * HiveTableOutputFormat used to write data to hive table, including 
non-partition and partitioned table.
+ */
+public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase implements InitializeOnMaster,
+   FinalizeOnMaster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableOutputFormat.class);
+
+   private static final long serialVersionUID = 1L;
+
+   private transient JobConf jobConf;
+   private transient String dbName;
+   private transient String tableName;
+   private transient List partitionCols;
+   private tr

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8472: [FLINK-12327][python] Adds support to submit Python Table API job in CliFrontend

2019-05-24 Thread GitBox
sunjincheng121 commented on a change in pull request #8472: 
[FLINK-12327][python] Adds support to submit Python Table API job in CliFrontend
URL: https://github.com/apache/flink/pull/8472#discussion_r287532751
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
 ##
 @@ -771,30 +775,50 @@ PackagedProgram buildProgram(ProgramOptions options) 
throws FileNotFoundExceptio
String jarFilePath = options.getJarFilePath();
List classpaths = options.getClasspaths();
 
-   if (jarFilePath == null) {
-   throw new IllegalArgumentException("The program JAR 
file was not specified.");
+   String entryPointClass;
+   File jarFile = null;
+   if (options.isPython()) {
+   // If the job is specified a jar file
+   if (jarFilePath != null) {
+   jarFile = getJarFile(jarFilePath);
+   }
+   // The entry point class of python job is PythonDriver
+   entryPointClass = PythonDriver.class.getCanonicalName();
+   } else {
+   if (jarFilePath == null) {
+   throw new IllegalArgumentException("The program 
JAR file was not specified.");
 
 Review comment:
   A Java program should be specified with a JAR file.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287528948
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -590,6 +578,17 @@ private  static Table instantiateHiveTable(ObjectPath 
tablePath, CatalogBaseTabl
return hiveTable;
}
 
+   private static void setStorageFormat(StorageDescriptor sd, Map properties) {
+   // TODO: simply use text format for now
 
 Review comment:
   TODO is supposed to list what to do in the future. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287527460
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.IOException;
+
+/**
+ * Util class for accessing Hive tables.
+ */
+public class HiveTableUtil {
+
+   private HiveTableUtil() {
+   }
+
+   /**
+* Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
+*/
+   public static ObjectInspector getObjectInspector(TypeInformation 
flinkType) throws IOException {
+   return getObjectInspector(toHiveTypeInfo(flinkType));
+   }
+
+   // TODO: reuse Hive's TypeInfoUtils?
+   private static ObjectInspector getObjectInspector(TypeInfo type) throws 
IOException {
+   switch (type.getCategory()) {
+
+   case PRIMITIVE:
+   PrimitiveTypeInfo primitiveType = 
(PrimitiveTypeInfo) type;
+   return 
PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
+
+   // TODO: support complex types
+   default:
+   throw new IOException("Unsupported Hive type 
category " + type.getCategory());
+   }
+   }
+
+   /**
+* Converts a Flink {@link TypeInformation} to corresponding Hive 
{@link TypeInfo}.
+*/
+   public static TypeInfo toHiveTypeInfo(TypeInformation flinkType) {
+   if (flinkType.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
 
 Review comment:
   switch statement instead?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables

2019-05-24 Thread GitBox
xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement TableSink and OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r287526558
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.TypeConverters;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A table sink to write to Hive tables.
+ */
+public class HiveTableSink implements BatchTableSink {
+
+   private final JobConf jobConf;
+   private final RowTypeInfo rowTypeInfo;
+   private final String dbName;
+   private final String tableName;
+   private final List partitionCols;
+
+   public HiveTableSink(JobConf jobConf, RowTypeInfo rowTypeInfo, String 
dbName,
+   String tableName, List 
partitionCols) {
+   this.jobConf = jobConf;
+   this.rowTypeInfo = rowTypeInfo;
+   this.dbName = dbName;
+   this.tableName = tableName;
+   this.partitionCols = partitionCols;
+   }
+
+   @Override
+   public DataStreamSink emitBoundedStream(DataStream 
boundedStream, TableConfig tableConfig, ExecutionConfig executionConfig) {
+   // TODO: support partitioning
+   final boolean isPartitioned = false;
+   // TODO: support overwrite
+   final boolean overwrite = false;
+   HiveTablePartition hiveTablePartition;
+   HiveTableOutputFormat outputFormat;
+   IMetaStoreClient client = HMSClientFactory.create(new 
HiveConf(jobConf, HiveConf.class));
+   try {
+   Table table = client.getTable(dbName, tableName);
+   StorageDescriptor sd = table.getSd();
+   // here we use the sdLocation to store the output path 
of the job, which is always a staging dir
+   String sdLocation = sd.getLocation();
+   if (isPartitioned) {
+   // TODO: implement this
+   } else {
+   sd.setLocation(toStagingDir(sdLocation, 
jobConf));
+   hiveTablePartition = new HiveTablePartition(sd, 
null);
+   }
+   outputFormat = new HiveTableOutputFormat(jobConf, 
dbName, tableName, partitionCols,
+   rowTypeInfo, hiveTablePartition, 
MetaStoreUtils.getTableMetadata(table), overwrite);
+   } catch (TException e) {
+   throw new CatalogException("Failed to query Hive 
metastore", e);
+   } catch (I

[jira] [Created] (FLINK-12620) Deadlock in task deserialization

2019-05-24 Thread Mike Kaplinskiy (JIRA)
Mike Kaplinskiy created FLINK-12620:
---

 Summary: Deadlock in task deserialization
 Key: FLINK-12620
 URL: https://issues.apache.org/jira/browse/FLINK-12620
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.8.0
Reporter: Mike Kaplinskiy


When running a batch job, I ran into an issue where task deserialization caused 
a deadlock. Specifically, if you have a static initialization dependency graph 
that looks like this (these are all classes):
{code:java}
Task1 depends on A
A depends on B
B depends on C
C depends on B [cycle]
Task2 depends on C{code}
What seems to happen is a deadlock. Specifically, threads are started on the 
task managers that simultaneously call BatchTask.instantiateUserCode on both 
Task1 and Task2. This starts deserializing the classes and initializing them. 
Here's the deadlock scenario, as a stack:
{code:java}
Time ->
T1: [deserialize] -> Task1 -> A -> B -> (wait for C)
T2: [deserialize] -> Task2           -> C -> (wait for B){code}
 

A similar scenario from the web: 
[https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] .

 

For my specific problem, I'm running into this within Clojure - 
{{clojure.lang.RT}} has a dep on {{clojure.lang.Util}}, which has a dep on 
{{clojure.lang.Numbers}}, which depends on {{clojure.lang.RT}} again. 
Deserializing different Clojure functions calls one or the other first, which 
deadlocks the task managers.

 

I built a version of flink-core that had 
{{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} synchronized, 
but I'm not sure that it's the proper fix. I'm happy to submit that as a patch, 
but I'm not familiar enough with the codebase to say that it's the correct 
solution - ideally all Java class loading would be synchronized, but I'm not sure 
how to do that.
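
Editor's note: the cycle described above can be reproduced outside Flink with a 
minimal sketch like the following. The class and thread names are illustrative 
(they stand in for Task1/Task2 deserialization), and whether it hangs depends on 
the thread interleaving, exactly as in the report.

{code:java}
// Cyclic static-initialization deadlock: T1 starts initializing B while T2 starts
// initializing C; each thread holds its own class-initialization lock and blocks
// waiting for the other's.
public class StaticInitDeadlock {

    static class B {
        static { touch(C.class); }   // B's static initializer needs C
    }

    static class C {
        static { touch(B.class); }   // C's static initializer needs B (the cycle)
    }

    static void touch(Class<?> clazz) {
        try {
            // forName with initialize=true forces the static initializer to run
            Class.forName(clazz.getName(), true, clazz.getClassLoader());
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        new Thread(() -> touch(B.class), "T1").start();  // like deserializing Task1 -> A -> B
        new Thread(() -> touch(C.class), "T2").start();  // like deserializing Task2 -> C
    }
}
{code}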



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-12416) Docker build script fails on symlink creation ln -s

2019-05-24 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-12416.
---
   Resolution: Fixed
Fix Version/s: 1.8.1
   1.9.0

Fixed via
1.9.0: f1c3ac47b67a8940fcfdfb96ce3de5a32b34901d
1.8.1: 0094df9dc284d8748c80db7b5c7993f995dc59b0

> Docker build script fails on symlink creation ln -s
> ---
>
> Key: FLINK-12416
> URL: https://issues.apache.org/jira/browse/FLINK-12416
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Docker
>Affects Versions: 1.8.0
>Reporter: Slava D
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When using script 'build.sh' from 'flink-container/docker' it fails on 
> {code:java}
> + ln -s /opt/flink-1.8.0-bin-hadoop28-scala_2.12.tgz /opt/flink
> + ln -s /opt/job.jar /opt/flink/lib
> ln: /opt/flink/lib: Not a directory
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-12375) flink-container job jar does not have read permissions

2019-05-24 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-12375.
---
   Resolution: Fixed
Fix Version/s: 1.8.1
   1.9.0

Fixed via
1.9.0: f1c3ac47b67a8940fcfdfb96ce3de5a32b34901d
1.8.1: 0094df9dc284d8748c80db7b5c7993f995dc59b0

> flink-container job jar does not have read permissions
> --
>
> Key: FLINK-12375
> URL: https://issues.apache.org/jira/browse/FLINK-12375
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Docker
>Reporter: Adam Lamar
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.9.0, 1.8.1
>
>
> When building a custom job container using flink-container, the job can't be 
> launched if the provided job jar does not have world-readable permission.
> This is because the job jar in the container is owned by root:root, but the 
> docker container executes as the flink user.
> In environments with restrictive umasks (e.g. company laptops) that create 
> files without group and other read permissions by default, this causes the 
> instructions to fail.
> To reproduce on master:
> {code:java}
> cd flink-container/docker
> cp ../../flink-examples/flink-examples-streaming/target/WordCount.jar .
> chmod go-r WordCount.jar  # still maintain user read permission
> ./build.sh --job-jar WordCount.jar --from-archive 
> flink-1.8.0-bin-scala_2.11.tgz --image-name flink-job:latest
> FLINK_DOCKER_IMAGE_NAME=flink-job 
> FLINK_JOB=org.apache.flink.streaming.examples.wordcount.WordCount 
> docker-compose up{code}
> which results in the following error:
> {code:java}
> job-cluster_1 | 2019-04-30 18:40:57,787 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not start 
> cluster entrypoint StandaloneJobClusterEntryPoint.
> job-cluster_1 | 
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
> initialize the cluster entrypoint StandaloneJobClusterEntryPoint.
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:535)
> job-cluster_1 | at 
> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:105)
> job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not 
> create the DispatcherResourceManagerComponent.
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:257)
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:224)
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
> job-cluster_1 | at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
> job-cluster_1 | ... 2 more
> job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not 
> load the provided entrypoint class.
> job-cluster_1 | at 
> org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:119)
> job-cluster_1 | at 
> org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.retrieveJobGraph(ClassPathJobGraphRetriever.java:96)
> job-cluster_1 | at 
> org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:62)
> job-cluster_1 | at 
> org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:41)
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:184)
> job-cluster_1 | ... 6 more
> job-cluster_1 | Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.streaming.examples.wordcount.WordCount
> job-cluster_1 | at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> job-cluster_1 | at 
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> job-cluster_1 | at 
> org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:116)
> job-cluster_1 | ... 10 more{code}
> This issue can be fixed by chown'ing the job.jar file to flink:flink in the 
> Dockerfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[GitHub] [flink] tillrohrmann closed pull request #8391: [FLINK-12416][FLINK-12375] Fix docker build scripts on Flink-1.8

2019-05-24 Thread GitBox
tillrohrmann closed pull request #8391: [FLINK-12416][FLINK-12375] Fix docker 
build scripts on Flink-1.8
URL: https://github.com/apache/flink/pull/8391
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] Refactor test to not do assertions in @After methods

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] 
Refactor test to not do assertions in @After methods 
URL: https://github.com/apache/flink/pull/8534#discussion_r287437037
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
 ##
 @@ -212,30 +212,45 @@ public void checkClusterEmpty() {
 * Sleep a bit between the tests (we are re-using the YARN cluster for 
the tests).
 */
@After
-   public void sleep() throws IOException, YarnException {
-   Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10));
+   public void shutdownYarnClient() {
+   yarnClient.stop();
+   }
 
-   boolean isAnyJobRunning = yarnClient.getApplications().stream()
-   .anyMatch(YarnTestBase::isApplicationRunning);
+   protected void runTest(RunnableWithException test) {
+   Throwable testFailure = null;
+   try {
+   test.run();
+   } catch (Throwable t) {
+   testFailure = t;
+   }
 
-   while (deadline.hasTimeLeft() && isAnyJobRunning) {
-   try {
-   Thread.sleep(500);
-   } catch (InterruptedException e) {
-   Assert.fail("Should not happen");
-   }
-   isAnyJobRunning = yarnClient.getApplications().stream()
+   try {
+   Deadline deadline = 
Deadline.now().plus(Duration.ofSeconds(10));
+
+   boolean isAnyJobRunning = 
yarnClient.getApplications().stream()
.anyMatch(YarnTestBase::isApplicationRunning);
-   }
 
-   if (isAnyJobRunning) {
-   final List runningApps = 
yarnClient.getApplications().stream()
-   .filter(YarnTestBase::isApplicationRunning)
-   .map(app -> "App " + app.getApplicationId() + " 
is in state " + app.getYarnApplicationState() + '.')
-   .collect(Collectors.toList());
-   if (!runningApps.isEmpty()) {
-   Assert.fail("There is at least one application 
on the cluster that is not finished." + runningApps);
+   while (deadline.hasTimeLeft() && isAnyJobRunning) {
+   try {
+   Thread.sleep(500);
+   } catch (InterruptedException e) {
+   Assert.fail("Should not happen");
+   }
+   isAnyJobRunning = 
yarnClient.getApplications().stream()
+   
.anyMatch(YarnTestBase::isApplicationRunning);
+   }
+
+   if (isAnyJobRunning) {
+   final List runningApps = 
yarnClient.getApplications().stream()
+   
.filter(YarnTestBase::isApplicationRunning)
+   .map(app -> "App " + 
app.getApplicationId() + " is in state " + app.getYarnApplicationState() + '.')
+   .collect(Collectors.toList());
+   if (!runningApps.isEmpty()) {
+   Assert.fail("There is at least one 
application on the cluster that is not finished." + runningApps);
+   }
}
+   } catch (Throwable t) {
+   throw new 
AssertionError(ExceptionUtils.firstOrSuppressed(t, testFailure));
 
 Review comment:
   I think this won't report the `testFailure` if there is no `t`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] Refactor test to not do assertions in @After methods

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] 
Refactor test to not do assertions in @After methods 
URL: https://github.com/apache/flink/pull/8534#discussion_r287437184
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
 ##
 @@ -212,30 +212,45 @@ public void checkClusterEmpty() {
 * Sleep a bit between the tests (we are re-using the YARN cluster for 
the tests).
 */
@After
-   public void sleep() throws IOException, YarnException {
-   Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10));
+   public void shutdownYarnClient() {
+   yarnClient.stop();
+   }
 
-   boolean isAnyJobRunning = yarnClient.getApplications().stream()
-   .anyMatch(YarnTestBase::isApplicationRunning);
+   protected void runTest(RunnableWithException test) {
+   Throwable testFailure = null;
+   try {
+   test.run();
+   } catch (Throwable t) {
+   testFailure = t;
+   }
 
-   while (deadline.hasTimeLeft() && isAnyJobRunning) {
-   try {
-   Thread.sleep(500);
-   } catch (InterruptedException e) {
-   Assert.fail("Should not happen");
-   }
-   isAnyJobRunning = yarnClient.getApplications().stream()
+   try {
 
 Review comment:
   I think this block should be the `finally` block of the `try { test.run() } 
finally { ... }`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] Refactor test to not do assertions in @After methods

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] 
Refactor test to not do assertions in @After methods 
URL: https://github.com/apache/flink/pull/8534#discussion_r287437317
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
 ##
 @@ -212,30 +212,45 @@ public void checkClusterEmpty() {
 * Sleep a bit between the tests (we are re-using the YARN cluster for 
the tests).
 */
@After
-   public void sleep() throws IOException, YarnException {
-   Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10));
+   public void shutdownYarnClient() {
+   yarnClient.stop();
+   }
 
-   boolean isAnyJobRunning = yarnClient.getApplications().stream()
-   .anyMatch(YarnTestBase::isApplicationRunning);
+   protected void runTest(RunnableWithException test) {
+   Throwable testFailure = null;
+   try {
+   test.run();
+   } catch (Throwable t) {
+   testFailure = t;
 
 Review comment:
   No need to catch `t` if the application shut down is done in a `finally` 
block.
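
Taken together, the three comments point towards a shape roughly like the following (a sketch only, with a hypothetical helper name; not necessarily the change that was eventually merged): the test exception propagates on its own, and the cluster check always runs in a finally block.
{code:java}
protected void runTest(RunnableWithException test) throws Exception {
	try {
		test.run();
	} finally {
		// Formerly the @After assertions: wait until no YARN application is running
		// and fail if something is still active. Runs regardless of the test outcome.
		ensureNoRunningApplications(Deadline.now().plus(Duration.ofSeconds(10)));
	}
}
{code}
Note that with a plain finally block, if both the test and the cluster check fail, only the latter surfaces; keeping the firstOrSuppressed combination would preserve both failures.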


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12546) Base Docker images on `library/flink`

2019-05-24 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847710#comment-16847710
 ] 

Yun Tang edited comment on FLINK-12546 at 5/24/19 4:42 PM:
---

[~knaufk] Thanks for your interpretation, and I would agree with you that the {{from-archive}} option does not seem so useful in this view. The only time I use {{from-archive}} is when I try to fix FLINK-12416. I found that if I always use {{from-release}} to verify the {{Dockerfile}} change, it has to download the Flink archive from the release site every time, which consumes a lot of time for me. For a user who has to verify their job jar many times with the {{from-release}} option, downloading the release archive would also take too much time. I think we should add a cache feature.

However, I just wonder: if we base our image on [https://github.com/docker-flink/docker-flink], how do we handle the Flink images before Flink-1.7? As you can see, only Flink-1.7 and Flink-1.8 images would be built. Moreover, if {{Flink}} relies on {{docker-flink}} to build the docker image while {{docker-flink}} also depends on the {{Flink}} release, do you think this is a bit weird? How about including {{docker-flink}} within {{Flink}}?


was (Author: yunta):
[~knaufk] Thanks for your interpretation, and I would agree with you that 
{{from-archive}} option seems not so useful in this view. The only thing I use 
{{from-archive}} is when I try to fix FLINK-12416. I found if I always use 
{{from-release}} to verify the {{Dockerfile}} change, it has to download the 
archive Flink package from release site every time which really consume much 
time for me. For users, if he has to verify its user jar many times with 
{{from-release}} option, downloading the release archive would take too much 
time. I think we should add a cache future.

However, I just wonder if we base our image on 
[https://github.com/docker-flink/docker-flink], how we handle the Flink images 
before Flink-1.7. As you can see, only Flink-1.7 and Flink-1.8 images would be 
built. Moreover, if {{Flink}} relays on {{docker-flink}} to build docker image 
while {{docker-flink}} would also depend on {{Flink}} release, do you think 
this is a bit weird? How about involve {{docker-flink}} within {{Flink}}?

> Base Docker images on `library/flink`
> -
>
> Key: FLINK-12546
> URL: https://issues.apache.org/jira/browse/FLINK-12546
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Docker
>Affects Versions: 1.8.0
>Reporter: Konstantin Knauf
>Priority: Major
>
> Currently, Flink users turn to two different places when looking for 
> "official" community Docker images. 
> * https://github.com/docker-flink/docker-flink (Flink community maintained, 
> no official Apache releases) or https://hub.docker.com/_/flink/ 
> * The tooling and Dockerfile in the {{flink-container}} component
> While users should turn to the Flink images on Docker Hub in general, the 
> {{flink-container}} component is used by many users, because it contains some 
> tooling to build images for the {{StandaloneJobClusterEntrypoint}}. Overall, 
> this causes confusion for users and the community needs to maintain two 
> Dockerfiles, which build Flink images FROM alpine or debian.
> Therefore, I propose to change the tooling ({{build.sh}}) in 
> {{flink-container}} to have only two options: 
> a) {{from-release}}, which uses `library/flink` as base image and basically 
> only adds the user jar.  (<--- for Flink users)
> b)  {{from-local-dist}}, which also uses `library/flink` as base image but 
> replaces the flink-dist by the local flink-dist ( <--- for Flink developer)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12546) Base Docker images on `library/flink`

2019-05-24 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847710#comment-16847710
 ] 

Yun Tang commented on FLINK-12546:
--

[~knaufk] Thanks for your interpretation, and I would agree with you that the {{from-archive}} option does not seem so useful in this view. The only time I use {{from-archive}} is when I try to fix FLINK-12416. I found that if I always use {{from-release}} to verify the {{Dockerfile}} change, it has to download the Flink archive from the release site every time, which consumes a lot of time for me. For a user who has to verify their job jar many times with the {{from-release}} option, downloading the release archive would also take too much time. I think we should add a cache feature.

However, I just wonder: if we base our image on [https://github.com/docker-flink/docker-flink], how do we handle the Flink images before Flink-1.7? As you can see, only Flink-1.7 and Flink-1.8 images would be built. Moreover, if {{Flink}} relies on {{docker-flink}} to build the docker image while {{docker-flink}} also depends on the {{Flink}} release, do you think this is a bit weird? How about including {{docker-flink}} within {{Flink}}?

> Base Docker images on `library/flink`
> -
>
> Key: FLINK-12546
> URL: https://issues.apache.org/jira/browse/FLINK-12546
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Docker
>Affects Versions: 1.8.0
>Reporter: Konstantin Knauf
>Priority: Major
>
> Currently, Flink users turn to two different places when looking for 
> "official" community Docker images. 
> * https://github.com/docker-flink/docker-flink (Flink community maintained, 
> no official Apache releases) or https://hub.docker.com/_/flink/ 
> * The tooling and Dockerfile in the {{flink-container}} component
> While users should turn to the Flink images on Docker Hub in general, the 
> {{flink-container}} component is used by many users, because it contains some 
> tooling to build images for the {{StandaloneJobClusterEntrypoint}}. Overall, 
> this causes confusion for users and the community needs to maintain two 
> Dockerfiles, which build Flink images FROM alpine or debian.
> Therefore, I propose to change the tooling ({{build.sh}}) in 
> {{flink-container}} to have only two options: 
> a) {{from-release}}, which uses `library/flink` as base image and basically 
> only adds the user jar.  (<--- for Flink users)
> b)  {{from-local-dist}}, which also uses `library/flink` as base image but 
> replaces the flink-dist by the local flink-dist ( <--- for Flink developer)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-24 Thread GitBox
walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-495702974
 
 
   @fhueske @twalthr could you take a look?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints

2019-05-24 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847702#comment-16847702
 ] 

vinoyang commented on FLINK-10855:
--

[~yunta] I think [~till.rohrmann] and I have reached an agreement on how to achieve that (I'm just waiting to see if [~srichter] still has any additions).
I usually assign issues to myself for two reasons: (1) it is also a problem we encounter internally; (2) I have the ability to handle it.
I have a lot of issues and PRs to follow up on. But in the period after the 1.8 release I focused mainly on the checkpointing modules, so while it may look like I just randomly assigned an issue to myself, that is not the case. We also have our own solutions to this problem. I don't think a job with many tasks will have a significant impact on cleanup. Basically, I think a simple strategy is for the cleaner to record the metadata of checkpoints (intervals, timeouts) and work from that, rather than always scanning the file system (see the sketch below).
Thank you for your suggestion. I will try to take it into account when dealing with other checkpointing issues, and I will do my best to balance priorities. Thank you.
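
For illustration only, a very rough sketch of such a metadata-driven cleaner (all names and the data structure are hypothetical; this is not the internal implementation mentioned above): the cleaner remembers each checkpoint directory together with a deadline derived from the checkpoint timeout, and a background thread deletes only expired entries instead of repeatedly listing the file system.
{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a checkpoint-directory cleaner that works from recorded
// metadata (registration time + timeout) instead of scanning the file system.
final class CheckpointDirCleaner {

	private final Map<String, Instant> pendingDirs = new ConcurrentHashMap<>();
	private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

	CheckpointDirCleaner(Duration sweepInterval) {
		executor.scheduleWithFixedDelay(this::sweep, sweepInterval.toMillis(), sweepInterval.toMillis(), TimeUnit.MILLISECONDS);
	}

	// Called when a checkpoint is triggered: remember the directory and when it may be cleaned up.
	void register(String checkpointDir, Duration checkpointTimeout) {
		pendingDirs.put(checkpointDir, Instant.now().plus(checkpointTimeout));
	}

	// Called when a checkpoint completes or is properly disposed: nothing left to clean.
	void unregister(String checkpointDir) {
		pendingDirs.remove(checkpointDir);
	}

	private void sweep() {
		Instant now = Instant.now();
		pendingDirs.forEach((dir, deadline) -> {
			if (now.isAfter(deadline) && pendingDirs.remove(dir, deadline)) {
				deleteQuietly(dir); // asynchronous deletion, off the JM main thread
			}
		});
	}

	private void deleteQuietly(String dir) {
		// Placeholder: delete the directory recursively, logging rather than throwing IO errors.
	}
}
{code}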

> CheckpointCoordinator does not delete checkpoint directory of late/failed 
> checkpoints
> -
>
> Key: FLINK-10855
> URL: https://issues.apache.org/jira/browse/FLINK-10855
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>
> In case that an acknowledge checkpoint message is late or a checkpoint cannot 
> be acknowledged, we discard the subtask state in the 
> {{CheckpointCoordinator}}. What's not happening in this case is that we 
> delete the parent directory of the checkpoint. This only happens when we 
> dispose a {{PendingCheckpoint#dispose}}. 
> Due to this behaviour it can happen that a checkpoint fails (e.g. a task not 
> being ready) and we delete the checkpoint directory. Next another task writes 
> its checkpoint data to the checkpoint directory (thereby creating it again) 
> and sending an acknowledge message back to the {{CheckpointCoordinator}}. The 
> {{CheckpointCoordinator}} will realize that there is no longer a 
> {{PendingCheckpoint}} and will discard the sub task state. This will remove 
> the state files from the checkpoint directory but will leave the checkpoint 
> directory untouched.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r287433479
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
 ##
 @@ -493,6 +497,59 @@ public boolean accept(File dir, String name) {
}
}
 
+   public static boolean verifyTokenKindInContainerCredentials(final 
String[] tokens, final String containerId) {
+   List tokenList = Arrays.asList(tokens);
 
 Review comment:
   if `tokens` is of type `Collection`, then we would not need to 
convert it here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r287432149
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
 ##
 @@ -80,6 +80,9 @@
/** Yarn site xml file name populated in YARN container for secure IT 
run. */
public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
 
+   /** AM RM delegation token name populated in YARN container. */
+   public static final String AM_RM_TOKEN_NAME = "YARN_AM_RM_TOKEN";
 
 Review comment:
   Where is it specified that the AM_RM token is called like this across all 
Yarn versions?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r287433179
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
 ##
 @@ -493,6 +497,59 @@ public boolean accept(File dir, String name) {
}
}
 
+   public static boolean verifyTokenKindInContainerCredentials(final 
String[] tokens, final String containerId) {
+   List tokenList = Arrays.asList(tokens);
+   File cwd = new File("target/" + 
YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
+   if (!cwd.exists() || !cwd.isDirectory()) {
+   return false;
+   }
+
+   File containerTokens = findFile(cwd.getAbsolutePath(), new 
FilenameFilter() {
+   @Override
+   public boolean accept(File dir, String name) {
+   return name.equals(containerId + ".tokens");
+   }
+   });
+
+   if (containerTokens != null) {
+   LOG.info("Verifying tokens in {}", 
containerTokens.getAbsolutePath());
+
+   Credentials tmCredentials = null;
+   try {
+   tmCredentials = 
Credentials.readTokenStorageFile(containerTokens, new Configuration());
+   } catch (IOException e) {
+   LOG.warn("Unable to read credential file: " + 
e.getMessage() + " file: " + containerTokens.getAbsolutePath());
 
 Review comment:
   Shouldn't we simply let the exception bubble up and let the test fail?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r287431982
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
 ##
 @@ -565,7 +568,20 @@ static ContainerLaunchContext createTaskExecutorContext(
new File(fileLocation),

HadoopUtils.getHadoopConfiguration(flinkConfig));
 
-   cred.writeTokenStorageToStream(dob);
+			// Filter out AMRMToken before setting the tokens to the TaskManager container context.
+			Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
+			Credentials taskManagerCred = new Credentials();
+			final Text amRmTokenKind = new Text(AM_RM_TOKEN_NAME);
+			Collection<Token<? extends TokenIdentifier>> userTokens =
+				(Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
 
 Review comment:
   No need for reflection. Simply call `cred.getAllTokens()`.
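
A sketch of what the suggested reflection-free version could look like (assuming Hadoop's Credentials/Token API and AMRMTokenIdentifier.KIND_NAME as the token kind; the class and method names are hypothetical):
{code:java}
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;

final class TaskManagerCredentials {

	// Copies every token except the AM/RM token into a fresh Credentials object,
	// using cred.getAllTokens() directly instead of reflection.
	static Credentials withoutAmRmToken(Credentials cred) {
		final Credentials filtered = new Credentials();
		for (Token<? extends TokenIdentifier> token : cred.getAllTokens()) {
			if (!AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())) {
				filtered.addToken(token.getService(), token);
			}
		}
		return filtered;
	}
}
{code}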


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lamber-ken commented on issue #8265: [FLINK-12302][runtime] Fixed the wrong finalStatus of yarn application when application finished

2019-05-24 Thread GitBox
lamber-ken commented on issue #8265: [FLINK-12302][runtime] Fixed the wrong 
finalStatus of yarn application when application finished
URL: https://github.com/apache/flink/pull/8265#issuecomment-495695056
 
 
   > Thanks, I'll take a look @lamber-ken
   
   You're welcome. If you have any questions, ping me any time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10713) RestartIndividualStrategy does not restore state

2019-05-24 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847683#comment-16847683
 ] 

Yun Tang commented on FLINK-10713:
--

Since the new scheduling strategy work has already started in FLINK-10429, I think this Jira is no longer needed.

> RestartIndividualStrategy does not restore state
> 
>
> Key: FLINK-10713
> URL: https://issues.apache.org/jira/browse/FLINK-10713
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>Reporter: Stefan Richter
>Assignee: Yun Tang
>Priority: Critical
>
> RestartIndividualStrategy does not perform any state restore. This is big 
> problem because all restored regions will be restarted with empty state. We 
> need to take checkpoints into account when restoring.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make 
the vcore that Application Master used configurable for Flink on YARN
URL: https://github.com/apache/flink/pull/8438#discussion_r287425485
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 ##
 @@ -49,6 +49,14 @@
.defaultValue(-1)
.withDescription("The port where the application master 
RPC system is listening.");
 
+   /**
+* The vcores used by YARN application master.
+*/
+   public static final ConfigOption APP_MASTER_VCORES =
+   key("yarn.appmaster.vcores")
+   .defaultValue(1)
+   .withDescription("The number of virtual cores (vcores) used by 
YARN application master.");
 
 Review comment:
   I guess we need to regenerate the configuration documentation because we 
added a new config option: `mvn package -Dgenerate-config-docs -pl flink-docs`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make 
the vcore that Application Master used configurable for Flink on YARN
URL: https://github.com/apache/flink/pull/8438#discussion_r287425131
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ##
 @@ -229,6 +229,13 @@ private void isReadyForDeployment(ClusterSpecification 
clusterSpecification) thr
throw new YarnDeploymentException("Flink configuration 
object has not been set");
}
 
+   int configuredAmVcores = 
flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
+   if (configuredAmVcores < 
YarnConfigOptions.APP_MASTER_VCORES.defaultValue()) {
 
 Review comment:
   I think we should compare `configuredAmVcores` against `numYarnMaxVcores` 
which we calculate a bit further down. I think we don't have to check that we 
request fewer cores than `1` because it won't take any effect.
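
A sketch of the comparison being suggested (assumptions: numYarnMaxVcores is the cluster maximum computed further down in the deployment check, IllegalConfigurationException is the failure type, and the helper name is made up; not necessarily the final code):
{code:java}
// Validates the configured AM vcores against the maximum vcores the YARN cluster offers.
// A value below 1 is simply ignored by YARN, so only the upper bound needs checking.
private static void validateAmVcores(int configuredAmVcores, int numYarnMaxVcores) {
	if (configuredAmVcores > numYarnMaxVcores) {
		throw new IllegalConfigurationException(String.format(
			"The number of requested vcores for the application master %d exceeds the maximum number of vcores %d available in the YARN cluster.",
			configuredAmVcores, numYarnMaxVcores));
	}
}
{code}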


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make 
the vcore that Application Master used configurable for Flink on YARN
URL: https://github.com/apache/flink/pull/8438#discussion_r287425587
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
 ##
 @@ -147,6 +147,41 @@ public void testFailIfTaskSlotsHigherThanMaxVcores() 
throws ClusterDeploymentExc
}
}
 
+   @Test
+   public void testFailIfAppMasterVCoreLessThanDefaultVcores() throws 
ClusterDeploymentException {
+   final Configuration flinkConfiguration = new Configuration();
+   
flinkConfiguration.setInteger(YarnConfigOptions.APP_MASTER_VCORES, 0);
+
+   YarnClusterDescriptor clusterDescriptor = new 
YarnClusterDescriptor(
+   flinkConfiguration,
+   yarnConfiguration,
+   temporaryFolder.getRoot().getAbsolutePath(),
+   yarnClient,
+   true);
+
+   clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
+
+   ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
+   .setMasterMemoryMB(1)
+   .setTaskManagerMemoryMB(1024)
+   .setNumberTaskManagers(1)
+   .setSlotsPerTaskManager(1)
+   .createClusterSpecification();
+
+   try {
+   
clusterDescriptor.deploySessionCluster(clusterSpecification);
+
+   fail("The deploy call should have failed.");
+   } catch (ClusterDeploymentException e) {
+   // we expect the cause to be an 
IllegalConfigurationException
+   if (!(e.getCause() instanceof 
IllegalConfigurationException)) {
+   throw e;
+   }
+   } finally {
+   clusterDescriptor.close();
+   }
+   }
 
 Review comment:
   I think we don't need this test here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints

2019-05-24 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847677#comment-16847677
 ] 

Yun Tang commented on FLINK-10855:
--

Thanks [~yanghua] for picking this Jira up again. I just wonder whether a checkpoint cleaner could really solve FLINK-11662. If we have 1000 tasks and task-1 just declined the checkpoint, the other tasks would keep checkpointing. The timing of clearing the checkpoint directory is a tough problem, since we cannot rule out that other tasks will create that parent checkpoint directory again. Generally, file systems like HDFS create a non-existing parent folder when writing a file into a sub-folder. So at what point should we clean up the parent folder again?

I planned to refactor the checkpoint directory layout in FLINK-10930, but that did not seem to be widely accepted.

 

If we put FLINK-11662 aside to ignore that bug and focus on the checkpoint cleaner logic: we already implemented a version of this cleanup in our internal Flink last year. Based on that experience, there are several points to consider:
 # Reduce the cost of listing files: I already created an issue for this before: FLINK-11868.
 # When to list files: at least on job failover or region failover.
 # When to delete useless files: we found that deletion takes more time than listing, so it should stay in an async thread.
 # Scan files synchronously or asynchronously: an asynchronous scan would not block the JM main thread, but it might not delete files in time.

Since you assigned this issue to yourself quite eagerly but have not made any progress until now, and given the lesson from the other JIRA, I think I could also attach a design doc based on our internal version of the cleaner.

 

 

 

> CheckpointCoordinator does not delete checkpoint directory of late/failed 
> checkpoints
> -
>
> Key: FLINK-10855
> URL: https://issues.apache.org/jira/browse/FLINK-10855
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>
> In case that an acknowledge checkpoint message is late or a checkpoint cannot 
> be acknowledged, we discard the subtask state in the 
> {{CheckpointCoordinator}}. What's not happening in this case is that we 
> delete the parent directory of the checkpoint. This only happens when we 
> dispose a {{PendingCheckpoint#dispose}}. 
> Due to this behaviour it can happen that a checkpoint fails (e.g. a task not 
> being ready) and we delete the checkpoint directory. Next another task writes 
> its checkpoint data to the checkpoint directory (thereby creating it again) 
> and sending an acknowledge message back to the {{CheckpointCoordinator}}. The 
> {{CheckpointCoordinator}} will realize that there is no longer a 
> {{PendingCheckpoint}} and will discard the sub task state. This will remove 
> the state files from the checkpoint directory but will leave the checkpoint 
> directory untouched.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287419849
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
 ##
 @@ -102,4 +102,20 @@
public static final ConfigOption OFFLOAD_MINSIZE = 
key("blob.offload.minsize")
.defaultValue(1_024 * 1_024) // 1MiB by default
.withDescription("The minimum size for messages to be offloaded 
to the BlobServer.");
+
+   /**
+* The socket timeout in milliseconds for the blob client.
+*/
+   public static final ConfigOption SO_TIMEOUT =
+   key("blob.client.socket.timeout")
+   .defaultValue(120_000)
 
 Review comment:
   How did you decide on 2 minutes for the timeouts?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287419923
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
 ##
 @@ -102,4 +102,20 @@
public static final ConfigOption OFFLOAD_MINSIZE = 
key("blob.offload.minsize")
.defaultValue(1_024 * 1_024) // 1MiB by default
.withDescription("The minimum size for messages to be offloaded 
to the BlobServer.");
+
+   /**
+* The socket timeout in milliseconds for the blob client.
+*/
+   public static final ConfigOption SO_TIMEOUT =
+   key("blob.client.socket.timeout")
+   .defaultValue(120_000)
+   .withDescription("The socket timeout in milliseconds 
for the blob client.");
+
+   /**
+* The connection timeout in milliseconds for the blob client.
+*/
+   public static final ConfigOption CONNECT_TIMEOUT =
+   key("blob.client.connect.timeout")
+   .defaultValue(120_000)
 
 Review comment:
   Same here, why 2 minutes?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287419036
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
 ##
 @@ -487,4 +488,58 @@ private static void uploadJarFile(
validateGetAndClose(blobClient.getInternal(jobId, 
blobKeys.get(0)), testFile);
}
}
+
+
+   /**
+* Tests the socket operation timeout.
+*/
+   @Test
+   public void testSocketTimeout() {
+   Configuration clientConfig = getBlobClientConfig();
+   int oldSoTimeout = 
clientConfig.getInteger(BlobServerOptions.SO_TIMEOUT);
+
+   clientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, 50);
+   getBlobServer().setBlockingMillis(10_000);
+
+   try {
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+
+   try (BlobClient client = new BlobClient(serverAddress, 
clientConfig)) {
+   client.getInternal(new JobID(), 
BlobKey.createKey(TRANSIENT_BLOB));
+
+   fail("Should throw a exception.");
+   } catch (Throwable t) {
+   
assertEquals(java.net.SocketTimeoutException.class, 
ExceptionUtils.stripException(t, IOException.class).getClass());
+   }
+   } finally {
+   clientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, 
oldSoTimeout);
 
 Review comment:
   We could create a new test class which contains all blob server tests which 
need to start a new blob server for each test.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287419668
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ##
 @@ -70,7 +70,7 @@ public static void startNonSSLServer() throws IOException {
config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporarySslFolder.newFolder().getAbsolutePath());
config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
 
-   blobNonSslServer = new BlobServer(config, new VoidBlobStore());
+   blobNonSslServer = new TestBlobServer(config, new 
VoidBlobStore());
 
 Review comment:
   Why do we use a `TestBlobServer` here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287419507
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
 ##
 @@ -487,4 +488,58 @@ private static void uploadJarFile(
validateGetAndClose(blobClient.getInternal(jobId, 
blobKeys.get(0)), testFile);
}
}
+
+
+   /**
+* Tests the socket operation timeout.
+*/
+   @Test
+   public void testSocketTimeout() {
+   Configuration clientConfig = getBlobClientConfig();
+   int oldSoTimeout = 
clientConfig.getInteger(BlobServerOptions.SO_TIMEOUT);
+
+   clientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, 50);
+   getBlobServer().setBlockingMillis(10_000);
+
+   try {
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+
+   try (BlobClient client = new BlobClient(serverAddress, 
clientConfig)) {
+   client.getInternal(new JobID(), 
BlobKey.createKey(TRANSIENT_BLOB));
+
+   fail("Should throw an exception.");
+   } catch (Throwable t) {
+   
assertEquals(java.net.SocketTimeoutException.class, 
ExceptionUtils.stripException(t, IOException.class).getClass());
 
 Review comment:
   We could also use `assertThat(ExceptionUtils.findThrowable(t, 
java.net.SocketTimeoutException.class).isDefined(), is(true))`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-24 Thread GitBox
eaglewatcherwb commented on a change in pull request #8446: [FLINK-12414] 
[runtime] Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r287422649
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adapter;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.EMPTY;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.RELEASED;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of {@link SchedulingResultPartition}.
+ */
+public class DefaultResultPartition implements SchedulingResultPartition {
+
+   private final IntermediateResultPartitionID resultPartitionId;
+
+   private final IntermediateDataSetID intermediateDataSetId;
+
+   private final ResultPartitionType partitionType;
+
+   private SchedulingExecutionVertex producer;
+
+   private final List consumers;
+
+   DefaultResultPartition(
+   IntermediateResultPartitionID partitionId,
+   IntermediateDataSetID intermediateDataSetId,
+   ResultPartitionType partitionType) {
+   this.resultPartitionId = checkNotNull(partitionId);
+   this.intermediateDataSetId = 
checkNotNull(intermediateDataSetId);
+   this.partitionType = checkNotNull(partitionType);
+   this.consumers = new ArrayList<>();
+   }
+
+   @Override
+   public IntermediateResultPartitionID getId() {
+   return resultPartitionId;
+   }
+
+   @Override
+   public IntermediateDataSetID getResultId() {
+   return intermediateDataSetId;
+   }
+
+   @Override
+   public ResultPartitionType getPartitionType() {
+   return partitionType;
+   }
+
+   @Override
+   public ResultPartitionState getState() {
+   switch (producer.getState()) {
+   case CREATED:
+   case SCHEDULED:
+   case DEPLOYING:
+   return EMPTY;
+   case RUNNING:
+   return PRODUCING;
+   case FINISHED:
+   return DONE;
+   default:
+   return RELEASED;
 
 Review comment:
   OK, in the latest PR, I return `EMPTY` for the failure state.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287421208
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
 ##
 @@ -102,4 +102,20 @@
public static final ConfigOption OFFLOAD_MINSIZE = 
key("blob.offload.minsize")
.defaultValue(1_024 * 1_024) // 1MiB by default
.withDescription("The minimum size for messages to be offloaded 
to the BlobServer.");
+
+   /**
+* The socket timeout in milliseconds for the blob client.
+*/
+   public static final ConfigOption SO_TIMEOUT =
+   key("blob.client.socket.timeout")
+   .defaultValue(120_000)
 
 Review comment:
   By setting it to 2 minutes, we might risk that we break existing setups if 2 
minutes is too low for them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add 
connection and socket timeouts for the blob client
URL: https://github.com/apache/flink/pull/8484#discussion_r287419709
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ##
 @@ -58,7 +58,7 @@ public static void startSSLServer() throws IOException {
Configuration config = 
SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporarySslFolder.newFolder().getAbsolutePath());
 
-   blobSslServer = new BlobServer(config, new VoidBlobStore());
+   blobSslServer = new TestBlobServer(config, new VoidBlobStore());
 
 Review comment:
   Why `TestBlobServer`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11947) Support MapState key / value schema evolution for RocksDB

2019-05-24 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847669#comment-16847669
 ] 

Cliff Resnick commented on FLINK-11947:
---

Does the de-escalation from Blocker mean that this will likely not be fixed by 1.9?




> Support MapState key / value schema evolution for RocksDB
> -
>
> Key: FLINK-11947
> URL: https://issues.apache.org/jira/browse/FLINK-11947
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System, Runtime / State Backends
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Currently, we do not attempt to perform state schema evolution if the key or 
> value's schema of a user {{MapState}} has changed when using {{RocksDB}}:
> https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542
> This was disallowed in the initial support for state schema evolution because 
> the way we did state evolution in the RocksDB state backend was simply 
> overwriting values.
> For {{MapState}} key evolution, simply overwriting RocksDB values does not 
> work, since RocksDB entries for {{MapState}} use a composite key containing 
> the map state key. This means that when evolving {{MapState}} with an evolved 
> key schema, we will end up with new entries instead of replacing the old ones.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Aitozi commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-05-24 Thread GitBox
Aitozi commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the 
incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#issuecomment-495687292
 
 
   Hi @pnowojski , what is the reason for the second point ? 
   > This is not true for even slight data skew
   
   And +1 for adding the `inFloatingPoolUsage` specialized for detecting 
bottleneck.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12426) TM occasionally hang in deploying state

2019-05-24 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847653#comment-16847653
 ] 

Till Rohrmann commented on FLINK-12426:
---

Could it be that FLINK-12547 tries to solve the same problem [~QiLuo]? If yes, 
then let's close this issue as a duplicate, because the other issue already 
contains a fix.

> TM occasionally hang in deploying state
> ---
>
> Key: FLINK-12426
> URL: https://issues.apache.org/jira/browse/FLINK-12426
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Qi
>Priority: Major
>
> Hi all,
>   
>  We use Flink batch and start thousands of jobs per day. Occasionally we 
> observed stuck jobs, due to some TMs hanging in the “DEPLOYING” state. 
>   
>  It seems that the TM is calling BlobClient to download jars from the 
> JM/BlobServer. Under the hood it’s calling Socket.connect() and then 
> Socket.read() to retrieve the results. 
>   
>  These jobs usually have many TM slots (1~2k). We checked the TM log and 
> dumped the TM thread. It indeed hung on the socket read that downloads a jar 
> from the Blob server. 
>   
>  We're using Flink 1.5 but this may also affect later versions since the 
> related code has not changed much. We've tried adding a socket timeout in 
> BlobClient, but still no luck.
>   
>  
>  TM log
>  
>  ...
>  INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task 
> DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) 
> (184/2000).
> INFO org.apache.flink.runtime.taskmanager.Task - DataSource (at 
> createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000) switched 
> from CREATED to DEPLOYING.
> INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream 
> leak safety net for task DataSource (at 
> createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000) [DEPLOYING]
> INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task 
> DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) 
> (184/2000) [DEPLOYING].
> INFO org.apache.flink.runtime.blob.BlobClient - Downloading 
> 19e65c0caa41f264f9ffe4ca2a48a434/p-3ecd6341bf97d5512b14c93f6c9f51f682b6db26-37d5e69d156ee00a924c1ebff0c0d280
>  from some-host-ip-port
> no more logs...
>   
>  
>  TM thread dump:
>  
>  "DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) 
> (1999/2000)" #72 prio=5 os_prio=0 tid=0x7fb9a1521000 nid=0xa0994 runnable 
> [0x7fb97cfbf000]
>     java.lang.Thread.State: RUNNABLE
>          at java.net.SocketInputStream.socketRead0(Native Method)
>          at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>          at java.net.SocketInputStream.read(SocketInputStream.java:171)
>          at java.net.SocketInputStream.read(SocketInputStream.java:141)
>          at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152)
>          at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140)
>          at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:170)
>          at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>          at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206)
>          at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>          - locked <0x00078ab60ba8> (a java.lang.Object)
>          at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:893)
>          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>          at java.lang.Thread.run(Thread.java:748)
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12547) Deadlock when the task thread downloads jars using BlobClient

2019-05-24 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-12547:
--
Component/s: (was: Runtime / Operators)
 Runtime / Coordination

> Deadlock when the task thread downloads jars using BlobClient
> -
>
> Key: FLINK-12547
> URL: https://issues.apache.org/jira/browse/FLINK-12547
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The jstack is as follows (this jstack is from an old Flink version, but the 
> master branch has the same problem).
> {code:java}
> "Source: Custom Source (76/400)" #68 prio=5 os_prio=0 tid=0x7f8139cd3000 
> nid=0xe2 runnable [0x7f80da5fd000]
> java.lang.Thread.State: RUNNABLE
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> at java.net.SocketInputStream.read(SocketInputStream.java:170)
> at java.net.SocketInputStream.read(SocketInputStream.java:141)
> at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152)
> at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140)
> at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:164)
> at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
> at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206)
> at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
> - locked <0x00062cf2a188> (a java.lang.Object)
> at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:968)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:604)
> at java.lang.Thread.run(Thread.java:834)
> Locked ownable synchronizers:
> - None
> {code}
>  
> The reason is that SO_TIMEOUT is not set on the socket connection of the blob 
> client. When packet loss becomes severe due to high CPU load on the machine, the 
> blob client fails to notice that the server has been disconnected, which results 
> in blocking in the native read method. 
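A minimal sketch of the kind of fix being discussed: setting a connect timeout and SO_TIMEOUT on a plain `java.net.Socket` so that a stalled read eventually fails instead of blocking forever. The host name, port and timeout values below are illustrative assumptions, not the actual BlobClient configuration.

{code:java}
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BlobDownloadSketch {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket()) {
            // Bound the connect time: fail fast if the server is unreachable.
            socket.connect(new InetSocketAddress("blob-server-host", 6124), 10_000);
            // SO_TIMEOUT: a read() that receives no data within 60s throws
            // SocketTimeoutException instead of blocking forever in native code.
            socket.setSoTimeout(60_000);
            InputStream in = socket.getInputStream();
            byte[] buf = new byte[4096];
            while (in.read(buf) != -1) {
                // consume the downloaded blob bytes here
            }
        }
    }
}
{code}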



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset 
context classloader in run and getOptimizedPlan methods
URL: https://github.com/apache/flink/pull/8154#discussion_r287414640
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 ##
 @@ -197,26 +197,33 @@ public LeaderConnectionInfo getClusterConnectionInfo() 
throws LeaderRetrievalExc
// 

 
public static String getOptimizedPlanAsJson(Optimizer compiler, 
PackagedProgram prog, int parallelism)
-   throws CompilerException, ProgramInvocationException {
+   throws CompilerException, ProgramInvocationException {
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) 
getOptimizedPlan(compiler, prog, parallelism));
}
 
public static FlinkPlan getOptimizedPlan(Optimizer compiler, 
PackagedProgram prog, int parallelism)
-   throws CompilerException, ProgramInvocationException {
-   
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-   if (prog.isUsingProgramEntryPoint()) {
-   return getOptimizedPlan(compiler, 
prog.getPlanWithJars(), parallelism);
-   } else if (prog.isUsingInteractiveMode()) {
-   // temporary hack to support the optimizer plan preview
-   OptimizerPlanEnvironment env = new 
OptimizerPlanEnvironment(compiler);
-   if (parallelism > 0) {
-   env.setParallelism(parallelism);
-   }
+   throws CompilerException, ProgramInvocationException {
+   ClassLoader contextCl = 
Thread.currentThread().getContextClassLoader();
+   try {
+   
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+   if (prog.isUsingProgramEntryPoint()) {
+   return getOptimizedPlan(compiler, 
prog.getPlanWithJars(), parallelism);
+   } else if (prog.isUsingInteractiveMode()) {
+   // temporary hack to support the optimizer plan 
preview
+   OptimizerPlanEnvironment env = new 
OptimizerPlanEnvironment(compiler);
+   if (parallelism > 0) {
+   env.setParallelism(parallelism);
+   }
 
-   return env.getOptimizedPlan(prog);
-   } else {
-   throw new RuntimeException("Couldn't determine program 
mode.");
+   return env.getOptimizedPlan(prog);
+   } else {
+   throw new RuntimeException("Couldn't determine 
program mode.");
+   }
+   } finally {
+   if (contextCl != null) {
 
 Review comment:
   I think we don't need the null check here.
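For reference, a minimal, self-contained sketch of the save-and-restore pattern under discussion, without the null check; the names here are illustrative stand-ins, not the actual ClusterClient code. Restoring a null value is legal, since the JDK treats a null context classloader as "use the system classloader" in most lookups.

{code:java}
import java.util.function.Supplier;

public class ContextClassLoaderRestore {

    // Runs the given action with a specific context classloader and always
    // restores the previous one afterwards, even if it was null.
    static <T> T withContextClassLoader(ClassLoader cl, Supplier<T> action) {
        final ClassLoader previous = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(cl);
            return action.get();
        } finally {
            Thread.currentThread().setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader userCodeLoader = ContextClassLoaderRestore.class.getClassLoader();
        System.out.println(withContextClassLoader(userCodeLoader,
            () -> String.valueOf(Thread.currentThread().getContextClassLoader())));
    }
}
{code}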


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset 
context classloader in run and getOptimizedPlan methods
URL: https://github.com/apache/flink/pull/8154#discussion_r287414688
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 ##
 @@ -197,26 +197,33 @@ public LeaderConnectionInfo getClusterConnectionInfo() 
throws LeaderRetrievalExc
// 

 
public static String getOptimizedPlanAsJson(Optimizer compiler, 
PackagedProgram prog, int parallelism)
-   throws CompilerException, ProgramInvocationException {
+   throws CompilerException, ProgramInvocationException {
 
 Review comment:
   Unrelated change. Please revert.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset 
context classloader in run and getOptimizedPlan methods
URL: https://github.com/apache/flink/pull/8154#discussion_r287414860
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 ##
 @@ -246,45 +253,51 @@ public static OptimizedPlan getOptimizedPlan(Optimizer 
compiler, Plan p, int par
 * @throws ProgramInvocationException
 */
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
-   throws ProgramInvocationException, 
ProgramMissingJobException {
-   
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-   if (prog.isUsingProgramEntryPoint()) {
-
-   final JobWithJars jobWithJars = prog.getPlanWithJars();
-
-   return run(jobWithJars, parallelism, 
prog.getSavepointSettings());
-   }
-   else if (prog.isUsingInteractiveMode()) {
-   log.info("Starting program in interactive mode 
(detached: {})", isDetached());
-
-   final List libraries = prog.getAllLibraries();
-
-   ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(this, libraries,
-   prog.getClasspaths(), 
prog.getUserCodeClassLoader(), parallelism, isDetached(),
-   prog.getSavepointSettings());
-   ContextEnvironment.setAsContext(factory);
-
-   try {
-   // invoke main method
-   prog.invokeInteractiveModeForExecution();
-   if (lastJobExecutionResult == null && 
factory.getLastEnvCreated() == null) {
-   throw new 
ProgramMissingJobException("The program didn't contain a Flink job.");
-   }
-   if (isDetached()) {
-   // in detached mode, we execute the 
whole user code to extract the Flink job, afterwards we run it here
-   return ((DetachedEnvironment) 
factory.getLastEnvCreated()).finalizeExecute();
+   throws ProgramInvocationException, ProgramMissingJobException {
+   ClassLoader contextCl = 
Thread.currentThread().getContextClassLoader();
+   try {
+   
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+   if (prog.isUsingProgramEntryPoint()) {
+   final JobWithJars jobWithJars = 
prog.getPlanWithJars();
+   return run(jobWithJars, parallelism, 
prog.getSavepointSettings());
+   }
+   else if (prog.isUsingInteractiveMode()) {
+   log.info("Starting program in interactive mode 
(detached: {})", isDetached());
+
+   final List libraries = 
prog.getAllLibraries();
+
+   ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(this, libraries,
+   prog.getClasspaths(), 
prog.getUserCodeClassLoader(), parallelism, isDetached(),
+   prog.getSavepointSettings());
+   ContextEnvironment.setAsContext(factory);
+
+   try {
+   // invoke main method
+   
prog.invokeInteractiveModeForExecution();
+   if (lastJobExecutionResult == null && 
factory.getLastEnvCreated() == null) {
+   throw new 
ProgramMissingJobException("The program didn't contain a Flink job.");
+   }
+   if (isDetached()) {
+   // in detached mode, we execute 
the whole user code to extract the Flink job, afterwards we run it here
+   return ((DetachedEnvironment) 
factory.getLastEnvCreated()).finalizeExecute();
+   }
+   else {
+   // in blocking mode, we execute 
all Flink jobs contained in the user code and then return here
+   return 
this.lastJobExecutionResult;
+   }
}
-   else {
-   // in blocking mode, we execute all 
Flink jobs contained in the user code and then return here
-   return this.lastJobExecutionResult;

[GitHub] [flink] tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset 
context classloader in run and getOptimizedPlan methods
URL: https://github.com/apache/flink/pull/8154#discussion_r287414740
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 ##
 @@ -197,26 +197,33 @@ public LeaderConnectionInfo getClusterConnectionInfo() 
throws LeaderRetrievalExc
// 

 
public static String getOptimizedPlanAsJson(Optimizer compiler, 
PackagedProgram prog, int parallelism)
-   throws CompilerException, ProgramInvocationException {
+   throws CompilerException, ProgramInvocationException {
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) 
getOptimizedPlan(compiler, prog, parallelism));
}
 
public static FlinkPlan getOptimizedPlan(Optimizer compiler, 
PackagedProgram prog, int parallelism)
-   throws CompilerException, ProgramInvocationException {
-   
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-   if (prog.isUsingProgramEntryPoint()) {
-   return getOptimizedPlan(compiler, 
prog.getPlanWithJars(), parallelism);
-   } else if (prog.isUsingInteractiveMode()) {
-   // temporary hack to support the optimizer plan preview
-   OptimizerPlanEnvironment env = new 
OptimizerPlanEnvironment(compiler);
-   if (parallelism > 0) {
-   env.setParallelism(parallelism);
-   }
+   throws CompilerException, ProgramInvocationException {
 
 Review comment:
   Same here with the formatting.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8446: [FLINK-12414] 
[runtime] Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r287402622
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adapter;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.EMPTY;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.RELEASED;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of {@link SchedulingResultPartition}.
+ */
+public class DefaultResultPartition implements SchedulingResultPartition {
+
+   private final IntermediateResultPartitionID resultPartitionId;
+
+   private final IntermediateDataSetID intermediateDataSetId;
+
+   private final ResultPartitionType partitionType;
+
+   private SchedulingExecutionVertex producer;
+
+   private final List<SchedulingExecutionVertex> consumers;
+
+   DefaultResultPartition(
+   IntermediateResultPartitionID partitionId,
+   IntermediateDataSetID intermediateDataSetId,
+   ResultPartitionType partitionType) {
+   this.resultPartitionId = checkNotNull(partitionId);
+   this.intermediateDataSetId = 
checkNotNull(intermediateDataSetId);
+   this.partitionType = checkNotNull(partitionType);
+   this.consumers = new ArrayList<>();
+   }
+
+   @Override
+   public IntermediateResultPartitionID getId() {
+   return resultPartitionId;
+   }
+
+   @Override
+   public IntermediateDataSetID getResultId() {
+   return intermediateDataSetId;
+   }
+
+   @Override
+   public ResultPartitionType getPartitionType() {
+   return partitionType;
+   }
+
+   @Override
+   public ResultPartitionState getState() {
+   switch (producer.getState()) {
+   case CREATED:
+   case SCHEDULED:
+   case DEPLOYING:
+   return EMPTY;
+   case RUNNING:
+   return PRODUCING;
+   case FINISHED:
+   return DONE;
+   default:
+   return RELEASED;
 
 Review comment:
   I guess that in case of a failure we should return the partition state to 
the initial state (e.g. `EMPTY`).
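A self-contained sketch of what the suggested mapping could look like. The enums below mirror the names used in the diff above but are local stand-ins, and the CANCELED/FAILED vertex states are assumptions about the producer's failure states, not the actual Flink types.

{code:java}
public class PartitionStateMappingSketch {

    enum VertexState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELED, FAILED }

    enum ResultPartitionState { EMPTY, PRODUCING, DONE, RELEASED }

    static ResultPartitionState getState(VertexState producerState) {
        switch (producerState) {
            case CREATED:
            case SCHEDULED:
            case DEPLOYING:
                return ResultPartitionState.EMPTY;
            case RUNNING:
                return ResultPartitionState.PRODUCING;
            case FINISHED:
                return ResultPartitionState.DONE;
            case CANCELED:
            case FAILED:
                // Reviewer's suggestion: after a failure, fall back to the
                // initial state instead of RELEASED.
                return ResultPartitionState.EMPTY;
            default:
                return ResultPartitionState.RELEASED;
        }
    }

    public static void main(String[] args) {
        System.out.println(getState(VertexState.FAILED)); // EMPTY
    }
}
{code}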


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8537: [FLINK-12115][fs] Add support for AzureFS

2019-05-24 Thread GitBox
tillrohrmann commented on a change in pull request #8537: [FLINK-12115][fs] Add 
support for AzureFS
URL: https://github.com/apache/flink/pull/8537#discussion_r287400354
 
 

 ##
 File path: flink-filesystems/flink-azure-fs-hadoop/pom.xml
 ##
 @@ -119,17 +111,69 @@ under the License.



+   


org.apache.hadoop
-   
org.apache.flink.fs.shaded.hadoop.org.apache.hadoop
+   
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop

-   
+
+   
+   
+   
org.apache.commons
+   
org.apache.flink.fs.shaded.hadoop3.org.apache.commons
+   
+
+   

-   
com.microsoft.azure.storage
-   
org.apache.flink.fs.shaded.com.microsoft.azure.storage
+   
com.microsoft.azure
+   
org.apache.flink.fs.azure.shaded.com.microsoft.azure
+   
+
+   
+   
+   
org.apache.httpcomponents
 
 Review comment:
   If the plugins are used, then no. If people drop the jar into `lib`, it would 
still be beneficial. Therefore, I opted for this approach.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8539: [FLINK-12617] [container] StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups

2019-05-24 Thread GitBox
flinkbot commented on issue #8539: [FLINK-12617] [container] 
StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups
URL: https://github.com/apache/flink/pull/8539#issuecomment-495666848
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12617) StandaloneJobClusterEntrypoint should default to random JobID for non-HA setups

2019-05-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12617:
---
Labels: pull-request-available  (was: )

> StandaloneJobClusterEntrypoint should default to random JobID for non-HA 
> setups 
> 
>
> Key: FLINK-12617
> URL: https://issues.apache.org/jira/browse/FLINK-12617
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Affects Versions: 1.8.0
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8538: [FLINK-11283] Accessing the key when processing connected keyed stream

2019-05-24 Thread GitBox
flinkbot commented on issue #8538: [FLINK-11283] Accessing the key when 
processing connected keyed stream
URL: https://github.com/apache/flink/pull/8538#issuecomment-495666295
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] knaufk opened a new pull request #8539: [FLINK-12617] [container] StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups

2019-05-24 Thread GitBox
knaufk opened a new pull request #8539: [FLINK-12617] [container] 
StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups
URL: https://github.com/apache/flink/pull/8539
 
 
   ## What is the purpose of the change
   
   ## Brief change log
   
   * StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups
   * refactor StandaloneJobClusterConfigurationParserFactoryTest to use 
temporary folder for global Flink configuration
   
   ## Verifying this change
 - run unit tests in flink-container to check that the correct defaults are used
 - build Flink locally, build the job-cluster image
 -  run a job without HA and check that the JobID is random
   ```
   ./build.sh --job-jar 
../../flink-examples/flink-examples-streaming/target/flink-examples-streaming_2.11-1.9-SNAPSHOT-TopSpeedWindowing.jar
 --from-local-dist
   FLINK_JOB=org.apache.flink.streaming.examples.windowing.TopSpeedWindowing 
docker-compose up
   ```
- run a job without HA and an explicit JobID and check that the parameter 
takes precedence
   ```
   FLINK_JOB_ARGUMENTS="--job-id " 
FLINK_JOB=org.apache.flink.streaming.examples.windowing.TopSpeedWindowing 
docker-compose up
   ```
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aljoscha opened a new pull request #8538: [FLINK-11283] Accessing the key when processing connected keyed stream

2019-05-24 Thread GitBox
aljoscha opened a new pull request #8538: [FLINK-11283] Accessing the key when 
processing connected keyed stream
URL: https://github.com/apache/flink/pull/8538
 
 
   Cleaned up and rebased version of #7470, for running Travis tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-05-24 Thread GitBox
pnowojski commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the 
incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#issuecomment-495661903
 
 
   > The current inputBufferUsage is only reflecting the usage of floating 
buffers, and it could give an indicator whether the setting of 
floating-buffers-per-gate is enough for performance tuning.
   
   I agree, there is value in the current semantics. 
   
   I would even build upon that. If you want to detect a bottleneck, the current 
semantic of tracking just floating buffers is perfect for that. If the current 
operator (reading from this input) is causing a bottleneck, it may well be that 
only a single input channel is back-pressured. At the moment we could 
detect this situation with "all floating buffers are exhausted/used".
   
   > We could always assume if the exclusive buffers are enough for one 
channel, it will not request floating buffers. In other words, if we see the 
floating buffers are used, that means the exclusive buffers should also be used.
   
   This is not true for even slight data skew.
   
   I think that there is value in tracking floating and exclusive buffers 
separately. 
   
   Maybe we should make `inPoolUsage` track `floating + exclusive` buffers (it 
would make it consistent with the non-credit-based mode), while we could add an 
`inFloatingPoolUsage` metric with the current semantic? What do you think?
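A rough sketch of what the two metrics could compute, assuming the input gate can report used and total counts for both buffer kinds; the interface and method names below are made up for illustration and are not the actual Flink network stack API.

{code:java}
public class InPoolUsageSketch {

    interface BufferPoolStats {
        int usedFloatingBuffers();
        int totalFloatingBuffers();
        int usedExclusiveBuffers();
        int totalExclusiveBuffers();
    }

    // Combined usage over floating + exclusive buffers.
    static double inPoolUsage(BufferPoolStats stats) {
        int used = stats.usedFloatingBuffers() + stats.usedExclusiveBuffers();
        int total = stats.totalFloatingBuffers() + stats.totalExclusiveBuffers();
        return total == 0 ? 0.0 : (double) used / total;
    }

    // Usage of the floating buffers only (the current semantic).
    static double inFloatingPoolUsage(BufferPoolStats stats) {
        int total = stats.totalFloatingBuffers();
        return total == 0 ? 0.0 : (double) stats.usedFloatingBuffers() / total;
    }

    public static void main(String[] args) {
        BufferPoolStats stats = new BufferPoolStats() {
            public int usedFloatingBuffers() { return 3; }
            public int totalFloatingBuffers() { return 8; }
            public int usedExclusiveBuffers() { return 4; }
            public int totalExclusiveBuffers() { return 16; }
        };
        System.out.println(inPoolUsage(stats));         // 7 / 24
        System.out.println(inFloatingPoolUsage(stats)); // 3 / 8
    }
}
{code}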


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on a change in pull request #8537: [FLINK-12115][fs] Add support for AzureFS

2019-05-24 Thread GitBox
zentol commented on a change in pull request #8537: [FLINK-12115][fs] Add 
support for AzureFS
URL: https://github.com/apache/flink/pull/8537#discussion_r287392198
 
 

 ##
 File path: flink-filesystems/flink-azure-fs-hadoop/pom.xml
 ##
 @@ -119,17 +111,69 @@ under the License.



+   


org.apache.hadoop
-   
org.apache.flink.fs.shaded.hadoop.org.apache.hadoop
+   
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop

-   
+
+   
+   
+   
org.apache.commons
+   
org.apache.flink.fs.shaded.hadoop3.org.apache.commons
+   
+
+   

-   
com.microsoft.azure.storage
-   
org.apache.flink.fs.shaded.com.microsoft.azure.storage
+   
com.microsoft.azure
+   
org.apache.flink.fs.azure.shaded.com.microsoft.azure
+   
+
+   
+   
+   
org.apache.httpcomponents
 
 Review comment:
   do we still need all this now that we have plugins?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12222) 'start-scala-shell -a ' doesn't ship jars to yarn container

2019-05-24 Thread Jeff Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847597#comment-16847597
 ] 

Jeff Zhang commented on FLINK-12222:


Sorry [~Zentol], it is due to my environment issue; I will close it. 

> 'start-scala-shell -a ' doesn't ship jars to yarn container
> -
>
> Key: FLINK-12222
> URL: https://issues.apache.org/jira/browse/FLINK-12222
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Scala Shell
>Affects Versions: 1.8.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>
> If a user uses `flink run` to submit Flink jars, they can build an uber jar to 
> include all the dependencies, but in the Flink scala-shell there's no way to 
> ship custom jars.  Currently `start-scala-shell -a ` only puts the jars 
> on the classpath of the client side, but doesn't ship them to the yarn container. This 
> makes it impossible to use these jars in Flink tasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12222) 'start-scala-shell -a ' doesn't ship jars to yarn container

2019-05-24 Thread Jeff Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang closed FLINK-12222.
--
Resolution: Invalid

> 'start-scala-shell -a ' doesn't ship jars to yarn container
> -
>
> Key: FLINK-12222
> URL: https://issues.apache.org/jira/browse/FLINK-12222
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Scala Shell
>Affects Versions: 1.8.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>
> If a user uses `flink run` to submit Flink jars, they can build an uber jar to 
> include all the dependencies, but in the Flink scala-shell there's no way to 
> ship custom jars.  Currently `start-scala-shell -a ` only puts the jars 
> on the classpath of the client side, but doesn't ship them to the yarn container. This 
> makes it impossible to use these jars in Flink tasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12297) Clean the closure for OutputTags in PatternStream

2019-05-24 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847590#comment-16847590
 ] 

Aljoscha Krettek commented on FLINK-12297:
--

{{OutputTag}} needs to be an anonymous inner class (or a concrete subclass, at 
least) so that the Java compiler will put the information about the generic 
parameters in the generated code. That's why you see {{new OutputTag()}}, 
notice the {{{}}}. 
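For illustration, a minimal sketch of the difference the comment describes, using the Flink `OutputTag` API with an assumed `Integer` side-output type:

{code:java}
import org.apache.flink.util.OutputTag;

public class OutputTagSketch {
    public static void main(String[] args) {
        // The trailing {} creates an anonymous subclass, so the generic type
        // argument (Integer) is recorded in the class file and Flink can
        // extract it at runtime.
        OutputTag<Integer> sideOutput = new OutputTag<Integer>("AAA") {};
        System.out.println(sideOutput);

        // Without the {}, the type argument is erased, so Flink cannot
        // reconstruct the TypeInformation for the side output:
        // OutputTag<Integer> erased = new OutputTag<>("AAA");
    }
}
{code}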

> Clean the closure for OutputTags in PatternStream
> -
>
> Key: FLINK-12297
> URL: https://issues.apache.org/jira/browse/FLINK-12297
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.8.0
>Reporter: Dawid Wysakowicz
>Assignee: aitozi
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Right now we do not invoke closure cleaner on output tags. Therefore such 
> code:
> {code}
>   @Test
>   public void testFlatSelectSerialization() throws Exception {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
>   OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {};
>   CEP.pattern(elements, Pattern.begin("A")).flatSelect(
>   outputTag,
>   new PatternFlatTimeoutFunction<Integer, Integer>() {
>   @Override
>   public void timeout(
>   Map<String, List<Integer>> pattern,
>   long timeoutTimestamp,
>   Collector<Integer> out) throws Exception {
>   }
>   },
>   new PatternFlatSelectFunction<Integer, Integer>() {
>   @Override
>   public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception {
>   }
>   }
>   );
>   env.execute();
>   }
> {code}
> will fail with {{The implementation of the PatternFlatSelectAdapter is not 
> serializable. }} exception



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11107) Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured

2019-05-24 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-11107:
-
Summary: Avoid memory stateBackend to create arbitrary folders under HA 
path when no checkpoint path configured  (was: [state] Avoid memory 
stateBackend to create arbitrary folders under HA path when no checkpoint path 
configured)

> Avoid memory stateBackend to create arbitrary folders under HA path when no 
> checkpoint path configured
> --
>
> Key: FLINK-11107
> URL: https://issues.apache.org/jira/browse/FLINK-11107
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Currently, the memory state-backend creates a folder named with a random UUID 
> under the HA directory if no checkpoint path was ever configured (the code logic 
> lives in {{StateBackendLoader#fromApplicationOrConfigOrDefault}}). 
> However, the default memory state-backend is not only created on the JM 
> side, but also on each task manager's side, which means many folders with 
> random UUIDs are created under the HA directory. This results in an exception 
> like:
> {noformat}
> The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 
> items=1048576{noformat}
>  If this happens, no new jobs can be submitted until we clean up those 
> directories manually.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11947) Support MapState key / value schema evolution for RocksDB

2019-05-24 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-11947:
-
Priority: Critical  (was: Blocker)

> Support MapState key / value schema evolution for RocksDB
> -
>
> Key: FLINK-11947
> URL: https://issues.apache.org/jira/browse/FLINK-11947
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System, Runtime / State Backends
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Currently, we do not attempt to perform state schema evolution if the key or 
> value's schema of a user {{MapState}} has changed when using {{RocksDB}}:
> https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542
> This was disallowed in the initial support for state schema evolution because 
> the way we did state evolution in the RocksDB state backend was simply 
> overwriting values.
> For {{MapState}} key evolution, only overwriting RocksDB values does not 
> work, since RocksDB entries for {{MapState}} use a composite key containing 
> the map state key. This means that when evolving {{MapState}} with an evolved 
> key schema, the re-serialized map keys produce new RocksDB entries rather than 
> overwriting the existing ones.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] piyushnarang commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS

2019-05-24 Thread GitBox
piyushnarang commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS
URL: https://github.com/apache/flink/pull/8537#issuecomment-495638250
 
 
   Thanks for spinning this up @tillrohrmann, will review and chime in.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-24 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287368246
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
 ##
 @@ -504,26 +441,11 @@ public void testUpdateTaskInputPartitionsFailure() 
throws Exception {
 */
@Test(timeout = 1L)
public void testLocalPartitionNotFound() throws Exception {
-   final ExecutionAttemptID eid = new ExecutionAttemptID();
-
-   final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
-   final ResultPartitionID partitionId = new ResultPartitionID();
-
-   final ResultPartitionLocation loc = 
ResultPartitionLocation.createLocal();
-
-   final InputChannelDeploymentDescriptor[] 
inputChannelDeploymentDescriptors =
-   new InputChannelDeploymentDescriptor[] {
-   new 
InputChannelDeploymentDescriptor(partitionId, loc)};
-
-   final InputGateDeploymentDescriptor 
inputGateDeploymentDescriptor =
-   new InputGateDeploymentDescriptor(resultId, 
ResultPartitionType.PIPELINED, 0, inputChannelDeploymentDescriptors);
-
-   final TaskDeploymentDescriptor tdd =
-   createTestTaskDeploymentDescriptor("Receiver",
-   eid,
-   Tasks.AgnosticReceiver.class,
-   1, Collections.emptyList(),
-   
Collections.singletonList(inputGateDeploymentDescriptor));
+   ResourceID producerLocation = new ResourceID("local");
+   DefaultShuffleDeploymentDescriptor sdd =
+   createSddWithLocalConnection(new 
IntermediateResultPartitionID(), producerLocation, 1);
+   TaskDeploymentDescriptor tdd = createReceiver(sdd, 
producerLocation);
 
 Review comment:
it should result in the partition being identified as local in 
SingleInputGateFactory on the consumer side, by comparing the producer and 
consumer resource IDs.
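A tiny sketch of the locality check described here; the types and method are illustrative stand-ins, not the actual SingleInputGateFactory code.

{code:java}
import java.util.Objects;

public class PartitionLocalitySketch {
    // The partition is "local" if the producer runs on the same TaskExecutor
    // as the consumer, determined by comparing their resource IDs.
    static boolean isLocal(String producerResourceId, String consumerResourceId) {
        return Objects.equals(producerResourceId, consumerResourceId);
    }

    public static void main(String[] args) {
        System.out.println(isLocal("local", "local"));   // true -> local channel
        System.out.println(isLocal("local", "remote"));  // false -> remote channel
    }
}
{code}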


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS

2019-05-24 Thread GitBox
tillrohrmann commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS
URL: https://github.com/apache/flink/pull/8537#issuecomment-495633301
 
 
   @shuai-xu do you have time to test the Azure FS implementation?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS

2019-05-24 Thread GitBox
flinkbot commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS
URL: https://github.com/apache/flink/pull/8537#issuecomment-495632841
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann opened a new pull request #8537: [FLINK-12115][fs] Add support for AzureFS

2019-05-24 Thread GitBox
tillrohrmann opened a new pull request #8537: [FLINK-12115][fs] Add support for 
AzureFS
URL: https://github.com/apache/flink/pull/8537
 
 
   This PR is based on #8117. It decreases the dependency footprint and adds a 
proper NOTICE file to it.
   
   cc @piyushnarang


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-24 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287362965
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
 ##
 @@ -483,12 +419,13 @@ public void testUpdateTaskInputPartitionsFailure() 
throws Exception {
tmGateway.submitTask(tdd, env.getJobMasterId(), 
timeout).get();
taskRunningFuture.get();
 
+   ResourceID producerLocation = new ResourceID("local");
 
 Review comment:
   I think it would be better to use a random `ResourceID.generate()`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

