[jira] [Assigned] (FLINK-12623) Flink on yarn encountered AMRMClientImpl does not update AMRM token properly
[ https://issues.apache.org/jira/browse/FLINK-12623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dalongliu reassigned FLINK-12623: - Assignee: dalongliu > Flink on yarn encountered AMRMClientImpl does not update AMRM token properly > > > Key: FLINK-12623 > URL: https://issues.apache.org/jira/browse/FLINK-12623 > Project: Flink > Issue Type: Bug > Components: Connectors / Hadoop Compatibility >Affects Versions: 1.9.0, 2.0.0 >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Labels: patch > Fix For: 1.9.0, 2.0.0 > > > Hi, all! When my task was running on YARN with Flink 1.7.1 as a dependency, I encountered the following problem: > {panel} > [org.apache.hadoop.ipc.Client ] >>> msg=Exception encountered while > connecting to the server : > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > Invalid AMRMToken from appattempt_1547520251214_0582_01sg [2019-05-25 > 03:31:04.163] [WARN] [AMRM Heartbeater thread] [org.apache.hadoop.ipc.Client > ] >>> msg=Exception encountered while connecting to the server : > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > Invalid AMRMToken from appattempt_1547520251214_0582_01sg [2019-05-25 > 03:31:04.168] [INFO] [AMRM Heartbeater thread] > [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> > msg=Failing over to rm1sg [2019-05-25 03:31:04.168] [INFO] [AMRM Heartbeater > thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> > msg=Failing over to rm1sg [2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater > thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> > msg=Failing over to rm2sg [2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater > thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> > msg=Failing over to rm2sg > {panel} > Referring to the YARN issue [AMRMClientImpl does not update AMRM token properly|https://issues.apache.org/jira/browse/YARN-3103], I understand this is a bug in YARN 2.4.1 that was fixed in YARN 2.7.0. However, the Hadoop version shaded by Flink is 2.4.1, so I think Flink should shade Hadoop 2.7.0 or a higher version. Is my idea correct? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12623) Flink on yarn encountered AMRMClientImpl does not update AMRM token properly
[ https://issues.apache.org/jira/browse/FLINK-12623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dalongliu reassigned FLINK-12623: - Assignee: (was: dalongliu) > Flink on yarn encountered AMRMClientImpl does not update AMRM token properly > > > Key: FLINK-12623 > URL: https://issues.apache.org/jira/browse/FLINK-12623 > Project: Flink > Issue Type: Bug > Components: Connectors / Hadoop Compatibility >Affects Versions: 1.9.0, 2.0.0 >Reporter: dalongliu >Priority: Major > Labels: patch > Fix For: 1.9.0, 2.0.0 > > > Hi, all! When my task was running on YARN with Flink 1.7.1 as a dependency, I encountered the following problem: > {panel} > [org.apache.hadoop.ipc.Client ] >>> msg=Exception encountered while > connecting to the server : > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > Invalid AMRMToken from appattempt_1547520251214_0582_01sg [2019-05-25 > 03:31:04.163] [WARN] [AMRM Heartbeater thread] [org.apache.hadoop.ipc.Client > ] >>> msg=Exception encountered while connecting to the server : > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > Invalid AMRMToken from appattempt_1547520251214_0582_01sg [2019-05-25 > 03:31:04.168] [INFO] [AMRM Heartbeater thread] > [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> > msg=Failing over to rm1sg [2019-05-25 03:31:04.168] [INFO] [AMRM Heartbeater > thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> > msg=Failing over to rm1sg [2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater > thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> > msg=Failing over to rm2sg [2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater > thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> > msg=Failing over to rm2sg > {panel} > Referring to the YARN issue [AMRMClientImpl does not update AMRM token properly|https://issues.apache.org/jira/browse/YARN-3103], I understand this is a bug in YARN 2.4.1 that was fixed in YARN 2.7.0. However, the Hadoop version shaded by Flink is 2.4.1, so I think Flink should shade Hadoop 2.7.0 or a higher version. Is my idea correct? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12623) Flink on yarn encountered AMRMClientImpl does not update AMRM token properly
dalongliu created FLINK-12623: - Summary: Flink on yarn encountered AMRMClientImpl does not update AMRM token properly Key: FLINK-12623 URL: https://issues.apache.org/jira/browse/FLINK-12623 Project: Flink Issue Type: Bug Components: Connectors / Hadoop Compatibility Affects Versions: 1.9.0, 2.0.0 Reporter: dalongliu Fix For: 1.9.0, 2.0.0 Hi, all! When my task was running on YARN with Flink 1.7.1 as a dependency, I encountered the following problem: {panel} [org.apache.hadoop.ipc.Client ] >>> msg=Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1547520251214_0582_01sg [2019-05-25 03:31:04.163] [WARN] [AMRM Heartbeater thread] [org.apache.hadoop.ipc.Client ] >>> msg=Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1547520251214_0582_01sg [2019-05-25 03:31:04.168] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm1sg [2019-05-25 03:31:04.168] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm1sg [2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm2sg [2019-05-25 03:31:30.143] [INFO] [AMRM Heartbeater thread] [org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider] >>> msg=Failing over to rm2sg {panel} Referring to the YARN issue [AMRMClientImpl does not update AMRM token properly|https://issues.apache.org/jira/browse/YARN-3103], I understand this is a bug in YARN 2.4.1 that was fixed in YARN 2.7.0. However, the Hadoop version shaded by Flink is 2.4.1, so I think Flink should shade Hadoop 2.7.0 or a higher version. Is my idea correct? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] danny0405 commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
danny0405 commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-495854327 CALCITE-3055 is a critical regression in the VolcanoPlanner; it will be fixed in 1.20.0. I think we should skip Calcite 1.19.0 and use 1.20.0 directly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548639 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java ## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.InitializeOnMaster; +import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TaskAttemptContext; +import 
org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI; +import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR; + +/** + * HiveTableOutputFormat used to write data to hive table, including non-partition and partitioned table. + */ +public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase implements InitializeOnMaster, + FinalizeOnMaster { + + private static final Logger LOG = LoggerFactory.getLogger(HiveTableOutputFormat.class); + + private static final long serialVersionUID = 1L; + + private transient JobConf jobConf; + private transient String dbName; + private transient String tableName; + private transient List partitionCols; + private
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548207 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java ## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.InitializeOnMaster; +import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; Review comment: try avoid use guava. Flink has its own `Preconditions` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
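The review comment above asks for Flink's own `Preconditions` instead of the shaded Guava class. A minimal sketch of that substitution follows; the class and field names are illustrative and not taken from the PR, but `org.apache.flink.util.Preconditions` mirrors the Guava API closely enough that most call sites carry over unchanged.

```java
import org.apache.flink.util.Preconditions;

import java.util.List;

// Illustrative class, not part of the PR: Flink's Preconditions replacing
// org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.
public class PartitionedWriterConfig {

	private final List<String> partitionColumns;

	public PartitionedWriterConfig(List<String> partitionColumns) {
		// Same method names and semantics as the Guava version.
		this.partitionColumns = Preconditions.checkNotNull(partitionColumns, "partitionColumns must not be null");
		Preconditions.checkArgument(!partitionColumns.isEmpty(), "partitionColumns must not be empty");
	}

	public List<String> getPartitionColumns() {
		return partitionColumns;
	}
}
```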
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548439 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java ## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.InitializeOnMaster; +import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TaskAttemptContext; +import 
org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI; +import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR; + +/** + * HiveTableOutputFormat used to write data to hive table, including non-partition and partitioned table. + */ +public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase implements InitializeOnMaster, + FinalizeOnMaster { + + private static final Logger LOG = LoggerFactory.getLogger(HiveTableOutputFormat.class); + + private static final long serialVersionUID = 1L; + + private transient JobConf jobConf; + private transient String dbName; + private transient String tableName; + private transient List partitionCols; + private
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548474 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java ## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.InitializeOnMaster; +import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TaskAttemptContext; +import 
org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI; +import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR; + +/** + * HiveTableOutputFormat used to write data to hive table, including non-partition and partitioned table. + */ +public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase implements InitializeOnMaster, + FinalizeOnMaster { + + private static final Logger LOG = LoggerFactory.getLogger(HiveTableOutputFormat.class); + + private static final long serialVersionUID = 1L; + + private transient JobConf jobConf; + private transient String dbName; + private transient String tableName; + private transient List partitionCols; + private
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287526502 ## File path: flink-connectors/flink-connector-hive/pom.xml ## @@ -239,13 +253,10 @@ under the License. - - org.apache.hive hive-exec ${hive.version} - test Review comment: can you open an immediate followup PR and add new libraries packaged in flink-connector-hive to license notice file? https://github.com/apache/flink/pull/8205 is a good example This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548246 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java ## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.InitializeOnMaster; +import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TaskAttemptContext; +import 
org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI; +import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR; + +/** + * HiveTableOutputFormat used to write data to hive table, including non-partition and partitioned table. + */ +public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase implements InitializeOnMaster, + FinalizeOnMaster { + + private static final Logger LOG = LoggerFactory.getLogger(HiveTableOutputFormat.class); + + private static final long serialVersionUID = 1L; Review comment: use a more meaningful serial uuid it needs to be serde? This is an automated message from the Apache Gi
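The comment above questions the placeholder `serialVersionUID = 1L` and whether the output format really needs to be serializable (Flink does ship output formats to the workers in serialized form). Below is a small sketch of an explicitly chosen serial version UID; the class name and the value are hypothetical.

```java
import java.io.Serializable;

// Hypothetical class, not from the PR: an explicit, deliberately generated
// serialVersionUID instead of the placeholder 1L.
public class VersionedHiveOutputFormatStub implements Serializable {

	// Generated once (for example with the JDK's serialver tool) and changed
	// only when the serialized form changes incompatibly.
	private static final long serialVersionUID = 7356504574907937992L;

	private String databaseName;
	private String tableName;
}
```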
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548780 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -590,6 +578,17 @@ private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTabl return hiveTable; } + private static void setStorageFormat(StorageDescriptor sd, Map properties) { + // TODO: simply use text format for now + String storageFormatName = DEFAULT_HIVE_TABLE_STORAGE_FORMAT; + StorageFormatDescriptor sfDescriptor = storageFormatFactory.get(storageFormatName); + checkArgument(sfDescriptor != null, "Unknown storage format " + storageFormatName); Review comment: move input checks to the top? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
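The "move input checks to the top?" suggestion amounts to validating arguments before doing any other work in `setStorageFormat`. The sketch below shows that fail-fast shape with simplified, placeholder types; the real method resolves a Hive `StorageFormatDescriptor`, which is omitted here.

```java
import org.apache.flink.util.Preconditions;

import java.util.Map;

// Simplified illustration of the "validate inputs first" shape the reviewer asks for.
// Types and names are placeholders, not the PR's actual signatures.
public final class StorageFormatSketch {

	static void setStorageFormat(Map<String, String> properties, String storageFormatName) {
		// Input checks first, so the method fails fast with a clear message.
		Preconditions.checkNotNull(properties, "properties must not be null");
		Preconditions.checkArgument(
			storageFormatName != null && !storageFormatName.isEmpty(),
			"Storage format name must not be empty");

		// ... look up the format descriptor and fill in input/output format and serde here.
	}

	private StorageFormatSketch() {
	}
}
```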
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548230 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java ## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.InitializeOnMaster; +import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TaskAttemptContext; +import 
org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI; Review comment: let's not static import a method This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
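On "let's not static import a method": a tiny before/after sketch of the style point, using a JDK method since the full signature of the PR's statically imported helper (`getCredentialsFromUGI`) is not shown here.

```java
import java.util.Objects;

// Illustration of the style point only, not the PR's code.
public class QualifiedCallSketch {

	// Preferred: qualified call, so the origin of the method is visible at the call site.
	static <T> T requireState(T value) {
		return Objects.requireNonNull(value, "value must not be null");
	}

	// Discouraged form (kept as a comment so this file compiles as shown):
	//   import static java.util.Objects.requireNonNull;
	//   ...
	//   return requireNonNull(value, "value must not be null");
}
```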
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548916 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java ## @@ -50,6 +54,7 @@ private static HiveConf getHiveConf() throws IOException { HiveConf hiveConf = new HiveConf(); hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath()); hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, ""); Review comment: why set it to empty string? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
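On "why set it to empty string?": to my understanding of Hive's configuration, an empty `hive.metastore.uris` means no remote Thrift metastore is contacted and Hive falls back to an embedded, in-process metastore, which keeps the test self-contained. That reading is an assumption, not an answer given in the review thread; a sketch of the test-side configuration with that rationale spelled out in comments follows.

```java
import org.apache.hadoop.hive.conf.HiveConf;

// Sketch of the test-side HiveConf setup under discussion. The comments reflect my
// understanding of Hive's behavior, not something stated in the PR.
public final class EmbeddedMetastoreConfSketch {

	static HiveConf embeddedMetastoreConf(String warehouseDir, String connectionUrl) {
		HiveConf hiveConf = new HiveConf();
		hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, warehouseDir);
		hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, connectionUrl);
		// An empty hive.metastore.uris avoids connecting to a remote metastore and also
		// overrides any URI picked up from a hive-site.xml on the test classpath.
		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "");
		return hiveConf;
	}

	private EmbeddedMetastoreConfSketch() {
	}
}
```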
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548573 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java ## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.InitializeOnMaster; +import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TaskAttemptContext; +import 
org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI; +import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR; + +/** + * HiveTableOutputFormat used to write data to hive table, including non-partition and partitioned table. + */ +public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase implements InitializeOnMaster, + FinalizeOnMaster { + + private static final Logger LOG = LoggerFactory.getLogger(HiveTableOutputFormat.class); + + private static final long serialVersionUID = 1L; + + private transient JobConf jobConf; + private transient String dbName; + private transient String tableName; + private transient List partitionCols; + private
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548805 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -590,6 +578,17 @@ private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTabl return hiveTable; } + private static void setStorageFormat(StorageDescriptor sd, Map properties) { + // TODO: simply use text format for now + String storageFormatName = DEFAULT_HIVE_TABLE_STORAGE_FORMAT; + StorageFormatDescriptor sfDescriptor = storageFormatFactory.get(storageFormatName); Review comment: nit: can we use full name for variables and not abbreviation? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548672 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java ## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.InitializeOnMaster; +import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TaskAttemptContext; +import 
org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI; +import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR; + +/** + * HiveTableOutputFormat used to write data to hive table, including non-partition and partitioned table. + */ +public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase implements InitializeOnMaster, + FinalizeOnMaster { + + private static final Logger LOG = LoggerFactory.getLogger(HiveTableOutputFormat.class); + + private static final long serialVersionUID = 1L; + + private transient JobConf jobConf; + private transient String dbName; + private transient String tableName; + private transient List partitionCols; + private
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548343 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java ## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.InitializeOnMaster; +import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TaskAttemptContext; +import 
org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI; +import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR; + +/** + * HiveTableOutputFormat used to write data to hive table, including non-partition and partitioned table. + */ +public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase implements InitializeOnMaster, + FinalizeOnMaster { + + private static final Logger LOG = LoggerFactory.getLogger(HiveTableOutputFormat.class); + + private static final long serialVersionUID = 1L; + + private transient JobConf jobConf; + private transient String dbName; + private transient String tableName; + private transient List partitionCols; + private
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548859 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +import java.io.IOException; + +/** + * Util class for accessing Hive tables. + */ +public class HiveTableUtil { + + private HiveTableUtil() { + } + + /** +* Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}. +*/ + public static ObjectInspector getObjectInspector(TypeInformation flinkType) throws IOException { + return getObjectInspector(toHiveTypeInfo(flinkType)); + } + + // TODO: reuse Hive's TypeInfoUtils? + private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException { + switch (type.getCategory()) { + + case PRIMITIVE: + PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type; + return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType); + + // TODO: support complex types + default: + throw new IOException("Unsupported Hive type category " + type.getCategory()); + } + } + + /** +* Converts a Flink {@link TypeInformation} to corresponding Hive {@link TypeInfo}. +*/ + public static TypeInfo toHiveTypeInfo(TypeInformation flinkType) { Review comment: isn't this dup of `HiveTypeUtil.toHiveType()`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287548881 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfigOptions; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.catalog.hive.HiveCatalogTable; +import org.apache.flink.table.catalog.hive.HiveTestUtils; +import org.apache.flink.table.runtime.utils.BatchTableEnvUtil; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.mapred.JobConf; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import scala.collection.JavaConversions; + +/** + * Tests {@link HiveTableSink}. 
+ */ +public class HiveTableSinkTest { + + private static HiveCatalog hiveCatalog; + private static HiveConf hiveConf; + + @BeforeClass + public static void createCatalog() throws IOException { + hiveConf = HiveTestUtils.getHiveConf(); + hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf); + hiveCatalog.open(); + } + + @AfterClass + public static void closeCatalog() { + if (hiveCatalog != null) { + hiveCatalog.close(); + } + } + + @Test + public void testInsertIntoNonPartitionTable() throws Exception { + final String dbName = "default"; + final String tblName = "dest"; + ObjectPath tablePath = new ObjectPath(dbName, tblName); + TableSchema tableSchema = new TableSchema( + new String[]{"a", "b", "c", "d", "e"}, + new TypeInformation[]{ + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO} + ); + HiveCatalogTable catalogTable = new HiveCatalogTable(tableSchema, new HashMap<>(), ""); + hiveCatalog.createTable(tablePath, catalogTable, false); + BatchTableEnvironment tableEnv = createTableEnv(1); + Table table = getSmall5TupleDataSet(tableEnv); + HiveTableSink hiveTableSink = new HiveTableSink(new JobConf(hiveConf), + new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()), dbName, tblName, Collections.emptyList()); + tableEnv.writeToSink(table, hiveTableSink, null); + tableEnv.execute(); + // TODO: verify data is written properly + } + + private BatchTableEnvironment createTableEnv(int parallelism) { + Configuration config = new Configuration(); + config.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1); + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, config); + return BatchTableEnviro
[GitHub] [flink] yanghua commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN
yanghua commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN URL: https://github.com/apache/flink/pull/8438#discussion_r287548190 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java ## @@ -49,6 +49,14 @@ .defaultValue(-1) .withDescription("The port where the application master RPC system is listening."); + /** +* The vcores used by YARN application master. +*/ + public static final ConfigOption<Integer> APP_MASTER_VCORES = + key("yarn.appmaster.vcores") + .defaultValue(1) + .withDescription("The number of virtual cores (vcores) used by YARN application master."); Review comment: I have tried, but it does not take effect. In addition, it seems the generated HTML documentation is not tracked by git. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
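For context, the sketch below shows one way the proposed option could be consumed when building the application master's container request. It assumes only the `APP_MASTER_VCORES` option from the diff above; the class name, the memory value, and the resource-request wiring are illustrative and are not taken from this PR.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import org.apache.hadoop.yarn.api.records.Resource;

public class AppMasterResourceSketch {
    public static void main(String[] args) {
        Configuration flinkConfig = new Configuration();
        // corresponds to "yarn.appmaster.vcores: 2" in flink-conf.yaml; the default stays 1
        flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_VCORES, 2);

        int jobManagerMemoryMb = 1024; // illustrative value
        int vcores = flinkConfig.getInteger(YarnConfigOptions.APP_MASTER_VCORES);

        // The YARN cluster descriptor would fold both values into the AM container request.
        Resource amResource = Resource.newInstance(jobManagerMemoryMb, vcores);
        System.out.println("AM container request: " + amResource);
    }
}
```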
[GitHub] [flink] flinkbot commented on issue #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file
flinkbot commented on issue #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file URL: https://github.com/apache/flink/pull/8541#issuecomment-495825993 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 opened a new pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file
bowenli86 opened a new pull request #8541: [FLINK-9172][sql-client] Support catalogs in SQL-Client yaml config file URL: https://github.com/apache/flink/pull/8541 ## What is the purpose of the change This PR adds support for **basic** catalog entries in SQL-Client yaml config file, adds `CatalogFactory` and `CatalogDescriptor`, and hooks them up with SQL Client thru table factory discovery service. Please refer to the design in [doc](https://docs.google.com/document/d/1ALxfiGZBaZ8KUNJtoT443hReoPoJEdt9Db2wiJ2LuWA/edit). Note that more fields of catalog entries and factories for existing catalogs will be added in the next few PRs. ## Brief change log - Created `CatalogFactory`, `CatalogDescriptor`, `CatalogDescriptorValidator` in flink table modules - Created `CatalogEntry` and made SQL CLI handle catalog entries in yaml file - Added unit tests ## Verifying this change This change added tests and can be verified as follows: - see new unit tests in this PR ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? docs will be added later when the feature completes This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12622) The process function compiles error in MyProcessWindowFunction in windows.md
[ https://issues.apache.org/jira/browse/FLINK-12622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848003#comment-16848003 ] chaiyongqiang commented on FLINK-12622: --- [https://github.com/apache/flink/pull/8540/commits] > The process function compiles error in MyProcessWindowFunction in windows.md > > > Key: FLINK-12622 > URL: https://issues.apache.org/jira/browse/FLINK-12622 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.8.0 >Reporter: chaiyongqiang >Priority: Minor > > The process function is defined as below in windows.md: > {quote}class MyProcessWindowFunction extends ProcessWindowFunction[(String, > Long), String, String, TimeWindow] { > def process(key: String, context: Context, input: Iterable[(String, Long)], > out: Collector[String]): () = { > var count = 0L > for (in <- input) { > count = count + 1 > } > out.collect(s"Window ${context.window} count: $count") > } > }{quote} > The process function defined in ProcessWindowFunction has a return value of > Unit, but the override in MyProcessWindowFunction doesn't match it. > When compiling MyProcessWindowFunction, an error like the following > occurs: > {quote}Error:(37, 109) '=>' expected but '=' found. > def process(key: String, context: Context, input: Iterable[(String, Long)], > out: Collector[String]) : () = {{quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9172) Support catalogs in SQL-Client yaml config file
[ https://issues.apache.org/jira/browse/FLINK-9172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-9172: Summary: Support catalogs in SQL-Client yaml config file (was: Support external catalogs in SQL-Client) > Support catalogs in SQL-Client yaml config file > --- > > Key: FLINK-9172 > URL: https://issues.apache.org/jira/browse/FLINK-9172 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Client >Reporter: Rong Rong >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > It doesn't seem that the configuration (YAML) file allows specifications of > external catalogs currently. The request here is to add support for external > catalog specifications in YAML file. User should also be able to specify one > catalog is the default. > It will be great to have SQL-Client to support some external catalogs > out-of-the-box for SQL users to configure and utilize easily. I am currently > think of having an external catalog factory that spins up both streaming and > batch external catalog table sources and sinks. This could greatly unify and > provide easy access for SQL users. > The catalog-related configurations then need to be processed and passed to > TableEnvironment accordingly by calling relevant APIs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12297) Clean the closure for OutputTags in PatternStream
[ https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848002#comment-16848002 ] aitozi commented on FLINK-12297: Hi [~aljoscha], since you have noticed this issue, could you take a look at this PR and give me some suggestions, since you are the author of the ClosureCleaner? Thanks. > Clean the closure for OutputTags in PatternStream > - > > Key: FLINK-12297 > URL: https://issues.apache.org/jira/browse/FLINK-12297 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.8.0 >Reporter: Dawid Wysakowicz >Assignee: aitozi >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0, 1.8.1 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Right now we do not invoke closure cleaner on output tags. Therefore such > code: > {code} > @Test > public void testFlatSelectSerialization() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStreamSource<Integer> elements = env.fromElements(1, 2, 3); > OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {}; > CEP.pattern(elements, Pattern.begin("A")).flatSelect( > outputTag, > new PatternFlatTimeoutFunction<Integer, Integer>() { > @Override > public void timeout( > Map<String, List<Integer>> pattern, > long timeoutTimestamp, > Collector<Integer> out) throws > Exception { > } > }, > new PatternFlatSelectFunction<Integer, Integer>() { > @Override > public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception { > } > } > ); > env.execute(); > } > {code} > will fail with {{The implementation of the PatternFlatSelectAdapter is not > serializable. }} exception -- This message was sent by Atlassian JIRA (v7.6.3#76005)
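A minimal, self-contained illustration of the mechanism behind the discussed fix is sketched below. It makes no claim about how PatternStream wires this up internally; it only shows that an anonymous `OutputTag` subclass can, like a user function, be passed through the closure cleaner (here via `StreamExecutionEnvironment#clean`) so that captured enclosing references are nulled out before serialization. The class name and job are illustrative.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;

public class OutputTagCleaningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> elements = env.fromElements(1, 2, 3);

        // An anonymous subclass is required to preserve the element type; in non-static code
        // it also captures the enclosing instance, which is what breaks serialization.
        OutputTag<Integer> lateTag = new OutputTag<Integer>("AAA") {};

        // Running the tag through the closure cleaner, just like user functions, nulls out
        // unneeded references to enclosing objects so the tag becomes serializable.
        OutputTag<Integer> cleanedTag = env.clean(lateTag);
        System.out.println("cleaned tag: " + cleanedTag.getId());

        elements.print();
        env.execute("output-tag-cleaning-sketch");
    }
}
{code}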
[GitHub] [flink] yanghua commented on issue #8538: [FLINK-11283] Accessing the key when processing connected keyed stream
yanghua commented on issue #8538: [FLINK-11283] Accessing the key when processing connected keyed stream URL: https://github.com/apache/flink/pull/8538#issuecomment-495824161 There is a failure when downloading kafka_2.11-2.2.0. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8540: Change the return value for process function in MyProcessWindowFunction
flinkbot commented on issue #8540: Change the return value for process function in MyProcessWindowFunction URL: https://github.com/apache/flink/pull/8540#issuecomment-495821941 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] cyq89051127 opened a new pull request #8540: Change the return value for process function in MyProcessWindowFunction
cyq89051127 opened a new pull request #8540: Change the return value for process function in MyProcessWindowFunction URL: https://github.com/apache/flink/pull/8540 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12622) The process function compiles error in MyProcessWindowFunction in windows.md
chaiyongqiang created FLINK-12622: - Summary: The process function compiles error in MyProcessWindowFunction in windows.md Key: FLINK-12622 URL: https://issues.apache.org/jira/browse/FLINK-12622 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.8.0 Reporter: chaiyongqiang The process function is defined as below in windows.md: {quote}class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] { def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = { var count = 0L for (in <- input) { count = count + 1 } out.collect(s"Window ${context.window} count: $count") } }{quote} The process function defined in ProcessWindowFunction has a return value of Unit, but the override in MyProcessWindowFunction doesn't match it. When compiling MyProcessWindowFunction, an error like the following occurs: {quote}Error:(37, 109) '=>' expected but '=' found. def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) : () = {{quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12621) Use MiniCluster instead of JobExecutor
[ https://issues.apache.org/jira/browse/FLINK-12621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang updated FLINK-12621: --- Description: JobExecutor is specifically used for local mode. I don't think we need to introduce a new class/interface for local mode; we should use the existing MiniCluster. (was: JobExecutor is specifically used for flink mode. I don't think we need to introduce a new class/interface for local mode; we should use the existing MiniCluster.) > Use MiniCluster instead of JobExecutor > -- > > Key: FLINK-12621 > URL: https://issues.apache.org/jira/browse/FLINK-12621 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.8.0 >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > > JobExecutor is specifically used for local mode. I don't think we need to > introduce a new class/interface for local mode; we should use the existing > MiniCluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12621) Use MiniCluster instead of JobExecutor
[ https://issues.apache.org/jira/browse/FLINK-12621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang reassigned FLINK-12621: -- Assignee: Jeff Zhang > Use MiniCluster instead of JobExecutor > -- > > Key: FLINK-12621 > URL: https://issues.apache.org/jira/browse/FLINK-12621 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.8.0 >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > > JobExecutor is specifically used for flink mode. I don't think we need to > introduce a new class/interface for local mode; we should use the existing > MiniCluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12621) Use MiniCluster instead of JobExecutor
Jeff Zhang created FLINK-12621: -- Summary: Use MiniCluster instead of JobExecutor Key: FLINK-12621 URL: https://issues.apache.org/jira/browse/FLINK-12621 Project: Flink Issue Type: Improvement Affects Versions: 1.8.0 Reporter: Jeff Zhang JobExecutor is specifically used for flink mode. I don't think we need to introduce a new class/interface for local mode; we should use the existing MiniCluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
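A minimal sketch of the direction proposed here, assuming the MiniCluster API as of Flink 1.8 (`MiniClusterConfiguration.Builder`, `MiniCluster#start`); the class name and parallelism values are illustrative only.

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class MiniClusterSketch {
    public static void main(String[] args) throws Exception {
        MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
            .setConfiguration(new Configuration())
            .setNumTaskManagers(1)
            .setNumSlotsPerTaskManager(2)
            .build();

        try (MiniCluster miniCluster = new MiniCluster(config)) {
            miniCluster.start();
            // A JobGraph built from the user program could now be submitted directly,
            // e.g. via miniCluster.executeJobBlocking(jobGraph), without a JobExecutor wrapper.
        }
    }
}
{code}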
[GitHub] [flink] zorro786 commented on a change in pull request #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods
zorro786 commented on a change in pull request #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods URL: https://github.com/apache/flink/pull/8154#discussion_r287540914 ## File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ## @@ -197,26 +197,33 @@ public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalExc // public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) - throws CompilerException, ProgramInvocationException { + throws CompilerException, ProgramInvocationException { PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism)); } public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) - throws CompilerException, ProgramInvocationException { - Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); - if (prog.isUsingProgramEntryPoint()) { - return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism); - } else if (prog.isUsingInteractiveMode()) { - // temporary hack to support the optimizer plan preview - OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler); - if (parallelism > 0) { - env.setParallelism(parallelism); - } + throws CompilerException, ProgramInvocationException { Review comment: Is there any style check plugin per Flink codebase I can use to avoid this? My editor moves throws to new line without double indentation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zorro786 commented on a change in pull request #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods
zorro786 commented on a change in pull request #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods URL: https://github.com/apache/flink/pull/8154#discussion_r287540080 ## File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ## @@ -247,44 +252,48 @@ public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int par */ public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException { - Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); - if (prog.isUsingProgramEntryPoint()) { - - final JobWithJars jobWithJars = prog.getPlanWithJars(); - - return run(jobWithJars, parallelism, prog.getSavepointSettings()); - } - else if (prog.isUsingInteractiveMode()) { - log.info("Starting program in interactive mode (detached: {})", isDetached()); - - final List libraries = prog.getAllLibraries(); - - ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries, - prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(), - prog.getSavepointSettings()); - ContextEnvironment.setAsContext(factory); - - try { - // invoke main method - prog.invokeInteractiveModeForExecution(); - if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) { - throw new ProgramMissingJobException("The program didn't contain a Flink job."); - } - if (isDetached()) { - // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here - return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute(); + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); + if (prog.isUsingProgramEntryPoint()) { + final JobWithJars jobWithJars = prog.getPlanWithJars(); + return run(jobWithJars, parallelism, prog.getSavepointSettings()); + } + else if (prog.isUsingInteractiveMode()) { + log.info("Starting program in interactive mode (detached: {})", isDetached()); + + final List libraries = prog.getAllLibraries(); + + ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries, + prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(), + prog.getSavepointSettings()); + ContextEnvironment.setAsContext(factory); + + try { + // invoke main method + prog.invokeInteractiveModeForExecution(); + if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) { + throw new ProgramMissingJobException("The program didn't contain a Flink job."); + } + if (isDetached()) { + // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here + return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute(); + } + else { + // in blocking mode, we execute all Flink jobs contained in the user code and then return here + return this.lastJobExecutionResult; + } } - else { - // in blocking mode, we execute all Flink jobs contained in the user code and then return here - return this.lastJobExecutionResult; + finally { + ContextEnvironment.unsetContext(); Re
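For readers skimming the diff, the change boils down to the usual save/switch/restore pattern for the context class loader. The helper below is not part of Flink; it is just a self-contained restatement of that pattern under illustrative names.

```java
public final class ContextClassLoaderUtil {

    private ContextClassLoaderUtil() {}

    /**
     * Runs {@code action} with {@code userCodeClassLoader} as the thread's context class loader
     * and always restores the previous loader afterwards, mirroring the try/finally added above.
     */
    public static void runWithContextClassLoader(ClassLoader userCodeClassLoader, Runnable action) {
        final ClassLoader previous = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);
            action.run(); // e.g. compile the plan or invoke the packaged program
        } finally {
            Thread.currentThread().setContextClassLoader(previous);
        }
    }
}
```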
[GitHub] [flink] zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods
zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods URL: https://github.com/apache/flink/pull/8154#issuecomment-495814478 retest This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods
zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods URL: https://github.com/apache/flink/pull/8154#issuecomment-495814469 Thanks for review and fixing the issues @tillrohrmann Tests failing here: ``` == Running 'Kafka 0.10 end-to-end test' == TEST_DATA_DIR: /home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-00251075286 Flink dist directory: /home/travis/build/apache/flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT Downloading Kafka from https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz % Total% Received % Xferd Average Speed TimeTime Time Current Dload Upload Total SpentLeft Speed 0 00 00 0 0 0 --:--:-- --:--:-- --:--:-- 0 100 607 100 6070 0753 0 --:--:-- --:--:-- --:--:-- 754 gzip: stdin: not in gzip format tar: Child returned status 1 tar: Error is not recoverable: exiting now [FAIL] Test script contains errors. Checking for errors... No errors in log files. Checking for exceptions... No exceptions in log files. Checking for non-empty .out files... grep: /home/travis/build/apache/flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT/log/*.out: No such file or directory No non-empty .out files. [FAIL] 'Kafka 0.10 end-to-end test' failed after 0 minutes and 1 seconds! Test exited with exit code 1 No taskexecutor daemon to stop on host travis-job-4a07ec5f-c3be-4797-999e-9b26b211b8e6. No standalonesession daemon to stop on host travis-job-4a07ec5f-c3be-4797-999e-9b26b211b8e6. ``` Is there any command to re trigger tests? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods
zorro786 commented on issue #8154: [FLINK-12167] Reset context classloader in run and getOptimizedPlan methods URL: https://github.com/apache/flink/pull/8154#issuecomment-495814490 test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287533827 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java ## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.InitializeOnMaster; +import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TaskAttemptContext; +import 
org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI; +import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR; + +/** + * HiveTableOutputFormat used to write data to hive table, including non-partition and partitioned table. + */ +public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase implements InitializeOnMaster, + FinalizeOnMaster { + + private static final Logger LOG = LoggerFactory.getLogger(HiveTableOutputFormat.class); + + private static final long serialVersionUID = 1L; + + private transient JobConf jobConf; + private transient String dbName; + private transient String tableName; + private transient List partitionCols; + private tr
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8472: [FLINK-12327][python] Adds support to submit Python Table API job in CliFrontend
sunjincheng121 commented on a change in pull request #8472: [FLINK-12327][python] Adds support to submit Python Table API job in CliFrontend URL: https://github.com/apache/flink/pull/8472#discussion_r287532751 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ## @@ -771,30 +775,50 @@ PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundExceptio String jarFilePath = options.getJarFilePath(); List classpaths = options.getClasspaths(); - if (jarFilePath == null) { - throw new IllegalArgumentException("The program JAR file was not specified."); + String entryPointClass; + File jarFile = null; + if (options.isPython()) { + // If the job is specified a jar file + if (jarFilePath != null) { + jarFile = getJarFile(jarFilePath); + } + // The entry point class of python job is PythonDriver + entryPointClass = PythonDriver.class.getCanonicalName(); + } else { + if (jarFilePath == null) { + throw new IllegalArgumentException("The program JAR file was not specified."); Review comment: Java program should be specified a JAR file. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287528948 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -590,6 +578,17 @@ private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTabl return hiveTable; } + private static void setStorageFormat(StorageDescriptor sd, Map properties) { + // TODO: simply use text format for now Review comment: TODO is supposed to list what to do in the future. :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287527460 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +import java.io.IOException; + +/** + * Util class for accessing Hive tables. + */ +public class HiveTableUtil { + + private HiveTableUtil() { + } + + /** +* Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}. +*/ + public static ObjectInspector getObjectInspector(TypeInformation flinkType) throws IOException { + return getObjectInspector(toHiveTypeInfo(flinkType)); + } + + // TODO: reuse Hive's TypeInfoUtils? + private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException { + switch (type.getCategory()) { + + case PRIMITIVE: + PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type; + return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType); + + // TODO: support complex types + default: + throw new IOException("Unsupported Hive type category " + type.getCategory()); + } + } + + /** +* Converts a Flink {@link TypeInformation} to corresponding Hive {@link TypeInfo}. +*/ + public static TypeInfo toHiveTypeInfo(TypeInformation flinkType) { + if (flinkType.equals(BasicTypeInfo.STRING_TYPE_INFO)) { Review comment: switch statement instead? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
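Since `TypeInformation` values are singleton objects rather than enum constants or strings, a literal `switch` will not compile on them; a lookup map is one common way to get the flat structure the reviewer is after. The sketch below is illustrative only, covers just a handful of types, and is not the PR's `HiveTableUtil` code.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public final class FlinkToHiveTypeMapping {

    private static final Map<TypeInformation<?>, TypeInfo> TYPE_MAPPING = new HashMap<>();

    static {
        TYPE_MAPPING.put(BasicTypeInfo.STRING_TYPE_INFO, TypeInfoFactory.stringTypeInfo);
        TYPE_MAPPING.put(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoFactory.booleanTypeInfo);
        TYPE_MAPPING.put(BasicTypeInfo.INT_TYPE_INFO, TypeInfoFactory.intTypeInfo);
        TYPE_MAPPING.put(BasicTypeInfo.LONG_TYPE_INFO, TypeInfoFactory.longTypeInfo);
        TYPE_MAPPING.put(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoFactory.doubleTypeInfo);
    }

    private FlinkToHiveTypeMapping() {}

    /** Flat lookup replacing a long if/else chain of equals() checks. */
    public static TypeInfo toHiveTypeInfo(TypeInformation<?> flinkType) {
        TypeInfo hiveType = TYPE_MAPPING.get(flinkType);
        if (hiveType == null) {
            throw new IllegalArgumentException("Unsupported Flink type " + flinkType);
        }
        return hiveType;
    }
}
```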
[GitHub] [flink] xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables
xuefuz commented on a change in pull request #8536: [FLINK-12568][hive] Implement TableSink and OutputFormat to write Hive tables URL: https://github.com/apache/flink/pull/8536#discussion_r287526558 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.HMSClientFactory; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.sinks.BatchTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.type.TypeConverters; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; + +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.mapred.JobConf; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.List; + +/** + * A table sink to write to Hive tables. 
+ */ +public class HiveTableSink implements BatchTableSink { + + private final JobConf jobConf; + private final RowTypeInfo rowTypeInfo; + private final String dbName; + private final String tableName; + private final List partitionCols; + + public HiveTableSink(JobConf jobConf, RowTypeInfo rowTypeInfo, String dbName, + String tableName, List partitionCols) { + this.jobConf = jobConf; + this.rowTypeInfo = rowTypeInfo; + this.dbName = dbName; + this.tableName = tableName; + this.partitionCols = partitionCols; + } + + @Override + public DataStreamSink emitBoundedStream(DataStream boundedStream, TableConfig tableConfig, ExecutionConfig executionConfig) { + // TODO: support partitioning + final boolean isPartitioned = false; + // TODO: support overwrite + final boolean overwrite = false; + HiveTablePartition hiveTablePartition; + HiveTableOutputFormat outputFormat; + IMetaStoreClient client = HMSClientFactory.create(new HiveConf(jobConf, HiveConf.class)); + try { + Table table = client.getTable(dbName, tableName); + StorageDescriptor sd = table.getSd(); + // here we use the sdLocation to store the output path of the job, which is always a staging dir + String sdLocation = sd.getLocation(); + if (isPartitioned) { + // TODO: implement this + } else { + sd.setLocation(toStagingDir(sdLocation, jobConf)); + hiveTablePartition = new HiveTablePartition(sd, null); + } + outputFormat = new HiveTableOutputFormat(jobConf, dbName, tableName, partitionCols, + rowTypeInfo, hiveTablePartition, MetaStoreUtils.getTableMetadata(table), overwrite); + } catch (TException e) { + throw new CatalogException("Failed to query Hive metastore", e); + } catch (I
[jira] [Created] (FLINK-12620) Deadlock in task deserialization
Mike Kaplinskiy created FLINK-12620: --- Summary: Deadlock in task deserialization Key: FLINK-12620 URL: https://issues.apache.org/jira/browse/FLINK-12620 Project: Flink Issue Type: Bug Affects Versions: 1.8.0 Reporter: Mike Kaplinskiy When running a batch job, I ran into an issue where task deserialization caused a deadlock. Specifically, if you have a static initialization dependency graph that looks like this (these are all classes): {code:java} Task1 depends on A A depends on B B depends on C C depends on B [cycle] Task2 depends on C{code} What seems to happen is a deadlock. Specifically, threads are started on the task managers that simultaneously call BatchTask.instantiateUserCode on both Task1 and Task2. This starts deserializing the classes and initializing them. Here's the deadlock scenario, as a stack: {code:java} Time> T1: [deserialize] -> Task1 -> A -> B -> (wait for C) T2: [deserialize] -> Task2 -> C -> (wait for B){code} A similar scenario from the web: [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] . For my specific problem, I'm running into this within Clojure - {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. Deserializing different clojure functions calls one or the other first which deadlocks task managers. I built a version of flink-core that had {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} synchronized, but I'm not sure that it's the proper fix. I'm happy to submit that as a patch, but I'm not familiar enough with the codebase to say that it's the correct solution - ideally all Java class loading is synchronized, but I'm not sure how to do that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
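The cycle described above can be reproduced outside Flink with a few lines of plain Java. The sketch below is only an illustration of the JVM class-initialization deadlock, not Flink code; the sleeps exist solely to make the race reliable.

{code:java}
public class ClassInitDeadlockDemo {

    static class B {
        static {
            sleepQuietly(200); // widen the race window so both init locks get taken
            C.touch();         // B's <clinit> now needs C's initialization lock
        }
        static void touch() {}
    }

    static class C {
        static {
            sleepQuietly(200);
            B.touch();         // C's <clinit> now needs B's initialization lock
        }
        static void touch() {}
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Analogous to Task1 and Task2 being deserialized concurrently on one task manager:
        new Thread(() -> B.touch(), "init-B-first").start();
        new Thread(() -> C.touch(), "init-C-first").start();
        // Each thread then blocks forever inside the JVM's per-class initialization lock
        // of the class the other thread is initializing.
    }
}
{code}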
[jira] [Resolved] (FLINK-12416) Docker build script fails on symlink creation ln -s
[ https://issues.apache.org/jira/browse/FLINK-12416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-12416. --- Resolution: Fixed Fix Version/s: 1.8.1 1.9.0 Fixed via 1.9.0: f1c3ac47b67a8940fcfdfb96ce3de5a32b34901d 1.8.1: 0094df9dc284d8748c80db7b5c7993f995dc59b0 > Docker build script fails on symlink creation ln -s > --- > > Key: FLINK-12416 > URL: https://issues.apache.org/jira/browse/FLINK-12416 > Project: Flink > Issue Type: Bug > Components: Deployment / Docker >Affects Versions: 1.8.0 >Reporter: Slava D >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0, 1.8.1 > > Time Spent: 10m > Remaining Estimate: 0h > > When using script 'build.sh' from 'flink-container/docker' it fails on > {code:java} > + ln -s /opt/flink-1.8.0-bin-hadoop28-scala_2.12.tgz /opt/flink > + ln -s /opt/job.jar /opt/flink/lib > ln: /opt/flink/lib: Not a directory > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-12375) flink-container job jar does not have read permissions
[ https://issues.apache.org/jira/browse/FLINK-12375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-12375. --- Resolution: Fixed Fix Version/s: 1.8.1 1.9.0 Fixed via 1.9.0: f1c3ac47b67a8940fcfdfb96ce3de5a32b34901d 1.8.1: 0094df9dc284d8748c80db7b5c7993f995dc59b0 > flink-container job jar does not have read permissions > -- > > Key: FLINK-12375 > URL: https://issues.apache.org/jira/browse/FLINK-12375 > Project: Flink > Issue Type: Bug > Components: Deployment / Docker >Reporter: Adam Lamar >Assignee: Yun Tang >Priority: Major > Fix For: 1.9.0, 1.8.1 > > > When building a custom job container using flink-container, the job can't be > launched if the provided job jar does not have world-readable permission. > This is because the job jar in the container is owned by root:root, but the > docker container executes as the flink user. > In environments with restrictive umasks (e.g. company laptops) that create > files without group and other read permissions by default, this causes the > instructions to fail. > To reproduce on master: > {code:java} > cd flink-container/docker > cp ../../flink-examples/flink-examples-streaming/target/WordCount.jar . > chmod go-r WordCount.jar # still maintain user read permission > ./build.sh --job-jar WordCount.jar --from-archive > flink-1.8.0-bin-scala_2.11.tgz --image-name flink-job:latest > FLINK_DOCKER_IMAGE_NAME=flink-job > FLINK_JOB=org.apache.flink.streaming.examples.wordcount.WordCount > docker-compose up{code} > which results in the following error: > {code:java} > job-cluster_1 | 2019-04-30 18:40:57,787 ERROR > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not start > cluster entrypoint StandaloneJobClusterEntryPoint. > job-cluster_1 | > org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to > initialize the cluster entrypoint StandaloneJobClusterEntryPoint. > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:535) > job-cluster_1 | at > org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:105) > job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not > create the DispatcherResourceManagerComponent. > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:257) > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:224) > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172) > job-cluster_1 | at > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171) > job-cluster_1 | ... 2 more > job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not > load the provided entrypoint class. 
> job-cluster_1 | at > org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:119) > job-cluster_1 | at > org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.retrieveJobGraph(ClassPathJobGraphRetriever.java:96) > job-cluster_1 | at > org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:62) > job-cluster_1 | at > org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:41) > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:184) > job-cluster_1 | ... 6 more > job-cluster_1 | Caused by: java.lang.ClassNotFoundException: > org.apache.flink.streaming.examples.wordcount.WordCount > job-cluster_1 | at java.net.URLClassLoader.findClass(URLClassLoader.java:382) > job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > job-cluster_1 | at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) > job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > job-cluster_1 | at > org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:116) > job-cluster_1 | ... 10 more{code} > This issue can be fixed by chown'ing the job.jar file to flink:flink in the > Dockerfile. -- This message was sent by Atlassian
[GitHub] [flink] tillrohrmann closed pull request #8391: [FLINK-12416][FLINK-12375] Fix docker build scripts on Flink-1.8
tillrohrmann closed pull request #8391: [FLINK-12416][FLINK-12375] Fix docker build scripts on Flink-1.8 URL: https://github.com/apache/flink/pull/8391 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] Refactor test to not do assertions in @After methods
tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] Refactor test to not do assertions in @After methods URL: https://github.com/apache/flink/pull/8534#discussion_r287437037 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ## @@ -212,30 +212,45 @@ public void checkClusterEmpty() { * Sleep a bit between the tests (we are re-using the YARN cluster for the tests). */ @After - public void sleep() throws IOException, YarnException { - Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10)); + public void shutdownYarnClient() { + yarnClient.stop(); + } - boolean isAnyJobRunning = yarnClient.getApplications().stream() - .anyMatch(YarnTestBase::isApplicationRunning); + protected void runTest(RunnableWithException test) { + Throwable testFailure = null; + try { + test.run(); + } catch (Throwable t) { + testFailure = t; + } - while (deadline.hasTimeLeft() && isAnyJobRunning) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Assert.fail("Should not happen"); - } - isAnyJobRunning = yarnClient.getApplications().stream() + try { + Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10)); + + boolean isAnyJobRunning = yarnClient.getApplications().stream() .anyMatch(YarnTestBase::isApplicationRunning); - } - if (isAnyJobRunning) { - final List runningApps = yarnClient.getApplications().stream() - .filter(YarnTestBase::isApplicationRunning) - .map(app -> "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState() + '.') - .collect(Collectors.toList()); - if (!runningApps.isEmpty()) { - Assert.fail("There is at least one application on the cluster that is not finished." + runningApps); + while (deadline.hasTimeLeft() && isAnyJobRunning) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Assert.fail("Should not happen"); + } + isAnyJobRunning = yarnClient.getApplications().stream() + .anyMatch(YarnTestBase::isApplicationRunning); + } + + if (isAnyJobRunning) { + final List runningApps = yarnClient.getApplications().stream() + .filter(YarnTestBase::isApplicationRunning) + .map(app -> "App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState() + '.') + .collect(Collectors.toList()); + if (!runningApps.isEmpty()) { + Assert.fail("There is at least one application on the cluster that is not finished." + runningApps); + } } + } catch (Throwable t) { + throw new AssertionError(ExceptionUtils.firstOrSuppressed(t, testFailure)); Review comment: I think this won't report the `testFailure` if there is no `t`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] Refactor test to not do assertions in @After methods
tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] Refactor test to not do assertions in @After methods URL: https://github.com/apache/flink/pull/8534#discussion_r287437184 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ## @@ -212,30 +212,45 @@ public void checkClusterEmpty() { * Sleep a bit between the tests (we are re-using the YARN cluster for the tests). */ @After - public void sleep() throws IOException, YarnException { - Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10)); + public void shutdownYarnClient() { + yarnClient.stop(); + } - boolean isAnyJobRunning = yarnClient.getApplications().stream() - .anyMatch(YarnTestBase::isApplicationRunning); + protected void runTest(RunnableWithException test) { + Throwable testFailure = null; + try { + test.run(); + } catch (Throwable t) { + testFailure = t; + } - while (deadline.hasTimeLeft() && isAnyJobRunning) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Assert.fail("Should not happen"); - } - isAnyJobRunning = yarnClient.getApplications().stream() + try { Review comment: I think this block should be the `finally` block of the `try { test.run() } finally { ... }` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] Refactor test to not do assertions in @After methods
tillrohrmann commented on a change in pull request #8534: [FLINK-12614][yarn] Refactor test to not do assertions in @After methods URL: https://github.com/apache/flink/pull/8534#discussion_r287437317 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ## @@ -212,30 +212,45 @@ public void checkClusterEmpty() { * Sleep a bit between the tests (we are re-using the YARN cluster for the tests). */ @After - public void sleep() throws IOException, YarnException { - Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10)); + public void shutdownYarnClient() { + yarnClient.stop(); + } - boolean isAnyJobRunning = yarnClient.getApplications().stream() - .anyMatch(YarnTestBase::isApplicationRunning); + protected void runTest(RunnableWithException test) { + Throwable testFailure = null; + try { + test.run(); + } catch (Throwable t) { + testFailure = t; Review comment: No need to catch `t` if the application shut down is done in a `finally` block. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
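Taken together, the three review comments above suggest restructuring runTest so that the test body runs first and the cluster check always runs afterwards in a finally block, with ExceptionUtils.firstOrSuppressed combining the two possible failures. The following is only a minimal sketch of that shape, assuming the surrounding YarnTestBase class and a checkClusterEmpty() helper containing the polling logic shown in the diff; it is not necessarily the code that was eventually merged.
{code:java}
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.RunnableWithException;

// Inside YarnTestBase: run the test body, then always perform the cluster
// check; neither failure should hide the other.
protected void runTest(RunnableWithException test) {
	Throwable testFailure = null;
	try {
		test.run();
	} catch (Throwable t) {
		// remember the test failure, but still run the cluster check below
		testFailure = t;
	} finally {
		try {
			checkClusterEmpty(); // polls the YARN client until no application is running
		} catch (Throwable t) {
			// report the cluster-check failure without losing the original test failure
			throw new AssertionError(ExceptionUtils.firstOrSuppressed(t, testFailure));
		}
		if (testFailure != null) {
			// the cluster check passed, so re-throw the original test failure
			ExceptionUtils.rethrow(testFailure);
		}
	}
}
{code}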
[jira] [Comment Edited] (FLINK-12546) Base Docker images on `library/flink`
[ https://issues.apache.org/jira/browse/FLINK-12546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847710#comment-16847710 ] Yun Tang edited comment on FLINK-12546 at 5/24/19 4:42 PM: --- [~knaufk] Thanks for your interpretation, and I would agree with you that {{from-archive}} option seems not so useful in this view. The only thing I use {{from-archive}} is when I try to fix FLINK-12416. I found if I always use {{from-release}} to verify the {{Dockerfile}} change, it has to download the archive Flink package from release site every time which really consume much time for me. For users, if he has to verify its user jar many times with {{from-release}} option, downloading the release archive would take too much time. I think we should add a cache feature. However, I just wonder if we base our image on [https://github.com/docker-flink/docker-flink], how we handle the Flink images before Flink-1.7. As you can see, only Flink-1.7 and Flink-1.8 images would be built. Moreover, if {{Flink}} relays on {{docker-flink}} to build docker image while {{docker-flink}} would also depend on {{Flink}} release, do you think this is a bit weird? How about involve {{docker-flink}} within {{Flink}}? was (Author: yunta): [~knaufk] Thanks for your interpretation, and I would agree with you that {{from-archive}} option seems not so useful in this view. The only thing I use {{from-archive}} is when I try to fix FLINK-12416. I found if I always use {{from-release}} to verify the {{Dockerfile}} change, it has to download the archive Flink package from release site every time which really consume much time for me. For users, if he has to verify its user jar many times with {{from-release}} option, downloading the release archive would take too much time. I think we should add a cache future. However, I just wonder if we base our image on [https://github.com/docker-flink/docker-flink], how we handle the Flink images before Flink-1.7. As you can see, only Flink-1.7 and Flink-1.8 images would be built. Moreover, if {{Flink}} relays on {{docker-flink}} to build docker image while {{docker-flink}} would also depend on {{Flink}} release, do you think this is a bit weird? How about involve {{docker-flink}} within {{Flink}}? > Base Docker images on `library/flink` > - > > Key: FLINK-12546 > URL: https://issues.apache.org/jira/browse/FLINK-12546 > Project: Flink > Issue Type: Improvement > Components: Deployment / Docker >Affects Versions: 1.8.0 >Reporter: Konstantin Knauf >Priority: Major > > Currently, Flink users turn to two different places when looking for > "official" community Docker images. > * https://github.com/docker-flink/docker-flink (Flink community maintained, > no official Apache releases) or https://hub.docker.com/_/flink/ > * The tooling and Dockerfile in the {{flink-container}} component > While users should turn to the Flink images on Docker Hub in general, the > {{flink-container}} component is used by many users, because it contains some > tooling to build images for the {{StandaloneJobClusterEntrypoint}}. Overall, > this causes confusion for users and the community needs to maintain two > Dockerfiles, which build Flink images FROM alpine or debian. > Therefore, I propose to change the tooling ({{build.sh}}) in > {{flink-container}} to have only two options: > a) {{from-release}}, which uses `library/flink` as base image and basically > only adds the user jar. 
(<--- for Flink users) > b) {{from-local-dist}}, which also uses `library/flink` as base image but > replaces the flink-dist by the local flink-dist ( <--- for Flink developer) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12546) Base Docker images on `library/flink`
[ https://issues.apache.org/jira/browse/FLINK-12546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847710#comment-16847710 ] Yun Tang commented on FLINK-12546: -- [~knaufk] Thanks for your interpretation, and I would agree with you that {{from-archive}} option seems not so useful in this view. The only thing I use {{from-archive}} is when I try to fix FLINK-12416. I found if I always use {{from-release}} to verify the {{Dockerfile}} change, it has to download the archive Flink package from release site every time which really consume much time for me. For users, if he has to verify its user jar many times with {{from-release}} option, downloading the release archive would take too much time. I think we should add a cache future. However, I just wonder if we base our image on [https://github.com/docker-flink/docker-flink], how we handle the Flink images before Flink-1.7. As you can see, only Flink-1.7 and Flink-1.8 images would be built. Moreover, if {{Flink}} relays on {{docker-flink}} to build docker image while {{docker-flink}} would also depend on {{Flink}} release, do you think this is a bit weird? How about involve {{docker-flink}} within {{Flink}}? > Base Docker images on `library/flink` > - > > Key: FLINK-12546 > URL: https://issues.apache.org/jira/browse/FLINK-12546 > Project: Flink > Issue Type: Improvement > Components: Deployment / Docker >Affects Versions: 1.8.0 >Reporter: Konstantin Knauf >Priority: Major > > Currently, Flink users turn to two different places when looking for > "official" community Docker images. > * https://github.com/docker-flink/docker-flink (Flink community maintained, > no official Apache releases) or https://hub.docker.com/_/flink/ > * The tooling and Dockerfile in the {{flink-container}} component > While users should turn to the Flink images on Docker Hub in general, the > {{flink-container}} component is used by many users, because it contains some > tooling to build images for the {{StandaloneJobClusterEntrypoint}}. Overall, > this causes confusion for users and the community needs to maintain two > Dockerfiles, which build Flink images FROM alpine or debian. > Therefore, I propose to change the tooling ({{build.sh}}) in > {{flink-container}} to have only two options: > a) {{from-release}}, which uses `library/flink` as base image and basically > only adds the user jar. (<--- for Flink users) > b) {{from-local-dist}}, which also uses `library/flink` as base image but > replaces the flink-dist by the local flink-dist ( <--- for Flink developer) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-495702974 @fhueske @twalthr could you take a look? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints
[ https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847702#comment-16847702 ] vinoyang commented on FLINK-10855: -- [~yunta] I think [~till.rohrmann] and I have reached an agreement on how to achieve that (I'm just looking to see if [~srichter] still has any additions). I usually assign issues to me because of two considerations: (1) this is also a problem we encounter internally; (2) I have the ability to handle it; I have a lot of issues and PR to follow up. But in the period after the 1.8 release, I focused mainly on checkpoint modules, so it seems that I just randomly assigned an issue to myself, which is not the case. We also have our own solutions to this problem. Thank you for your suggestion. I don't think a job with too many tasks will have a significant impact on cleanup. Basically, I think a simple strategy is that the cleaner records the metadata information (intervals, timeouts) of checkpoints, which it uses, and it doesn't always scan the file system. Thank you for your suggestion. I will try to take it into account when dealing with other checkpoints issue. I will try my best to balance priorities. Thank you. > CheckpointCoordinator does not delete checkpoint directory of late/failed > checkpoints > - > > Key: FLINK-10855 > URL: https://issues.apache.org/jira/browse/FLINK-10855 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > > In case that an acknowledge checkpoint message is late or a checkpoint cannot > be acknowledged, we discard the subtask state in the > {{CheckpointCoordinator}}. What's not happening in this case is that we > delete the parent directory of the checkpoint. This only happens when we > dispose a {{PendingCheckpoint#dispose}}. > Due to this behaviour it can happen that a checkpoint fails (e.g. a task not > being ready) and we delete the checkpoint directory. Next another task writes > its checkpoint data to the checkpoint directory (thereby creating it again) > and sending an acknowledge message back to the {{CheckpointCoordinator}}. The > {{CheckpointCoordinator}} will realize that there is no longer a > {{PendingCheckpoint}} and will discard the sub task state. This will remove > the state files from the checkpoint directory but will leave the checkpoint > directory untouched. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r287433479 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ## @@ -493,6 +497,59 @@ public boolean accept(File dir, String name) { } } + public static boolean verifyTokenKindInContainerCredentials(final String[] tokens, final String containerId) { + List tokenList = Arrays.asList(tokens); Review comment: if `tokens` is of type `Collection`, then we would not need to convert it here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r287432149 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ## @@ -80,6 +80,9 @@ /** Yarn site xml file name populated in YARN container for secure IT run. */ public static final String YARN_SITE_FILE_NAME = "yarn-site.xml"; + /** AM RM delegation token name populated in YARN container. */ + public static final String AM_RM_TOKEN_NAME = "YARN_AM_RM_TOKEN"; Review comment: Where is it specified that the AM_RM token is called like this across all Yarn versions? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r287433179 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ## @@ -493,6 +497,59 @@ public boolean accept(File dir, String name) { } } + public static boolean verifyTokenKindInContainerCredentials(final String[] tokens, final String containerId) { + List tokenList = Arrays.asList(tokens); + File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY)); + if (!cwd.exists() || !cwd.isDirectory()) { + return false; + } + + File containerTokens = findFile(cwd.getAbsolutePath(), new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.equals(containerId + ".tokens"); + } + }); + + if (containerTokens != null) { + LOG.info("Verifying tokens in {}", containerTokens.getAbsolutePath()); + + Credentials tmCredentials = null; + try { + tmCredentials = Credentials.readTokenStorageFile(containerTokens, new Configuration()); + } catch (IOException e) { + LOG.warn("Unable to read credential file: " + e.getMessage() + " file: " + containerTokens.getAbsolutePath()); Review comment: SHouldn't we simply let the exception bubble up and let the test fail? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
tillrohrmann commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r287431982 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ## @@ -565,7 +568,20 @@ static ContainerLaunchContext createTaskExecutorContext( new File(fileLocation), HadoopUtils.getHadoopConfiguration(flinkConfig)); - cred.writeTokenStorageToStream(dob); + // Filter out AMRMToken before setting the tokens to the TaskManager container context. + Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens"); + Credentials taskManagerCred = new Credentials(); + final Text amRmTokenKind = new Text(AM_RM_TOKEN_NAME); + Collection> userTokens = + (Collection>) getAllTokensMethod.invoke(cred); Review comment: No need for reflection. Simply call `cred.getAllTokens()`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
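The reflection comment directly above and the earlier question about the token name can be addressed together: in the Hadoop 2.x lines, the AM/RM token kind is exposed as the constant AMRMTokenIdentifier.KIND_NAME, and Credentials#getAllTokens() can be called directly. Below is a rough sketch of the filtering step; filterOutAmRmToken is a hypothetical helper name, not the method actually used in the PR.
{code:java}
import java.util.Collection;

import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;

// Copy every token except the AM/RM token into the credentials that are
// shipped to the TaskManager containers.
static Credentials filterOutAmRmToken(Credentials cred) {
	final Credentials taskManagerCred = new Credentials();
	final Collection<Token<? extends TokenIdentifier>> userTokens = cred.getAllTokens();
	for (Token<? extends TokenIdentifier> token : userTokens) {
		if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
			taskManagerCred.addToken(token.getService(), token);
		}
	}
	return taskManagerCred;
}
{code}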
[GitHub] [flink] lamber-ken commented on issue #8265: [FLINK-12302][runtime] Fixed the wrong finalStatus of yarn application when application finished
lamber-ken commented on issue #8265: [FLINK-12302][runtime] Fixed the wrong finalStatus of yarn application when application finished URL: https://github.com/apache/flink/pull/8265#issuecomment-495695056 > Thanks, I'll take a look @lamber-ken you're welcome, if you have any questions, ping me any time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10713) RestartIndividualStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847683#comment-16847683 ] Yun Tang commented on FLINK-10713: -- Since work on the new scheduling strategy has already started in FLINK-10429, I think this Jira is no longer needed. > RestartIndividualStrategy does not restore state > > > Key: FLINK-10713 > URL: https://issues.apache.org/jira/browse/FLINK-10713 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0 >Reporter: Stefan Richter >Assignee: Yun Tang >Priority: Critical > > RestartIndividualStrategy does not perform any state restore. This is a big > problem because all restored regions will be restarted with empty state. We > need to take checkpoints into account when restoring. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN
tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN URL: https://github.com/apache/flink/pull/8438#discussion_r287425485 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java ## @@ -49,6 +49,14 @@ .defaultValue(-1) .withDescription("The port where the application master RPC system is listening."); + /** +* The vcores used by YARN application master. +*/ + public static final ConfigOption APP_MASTER_VCORES = + key("yarn.appmaster.vcores") + .defaultValue(1) + .withDescription("The number of virtual cores (vcores) used by YARN application master."); Review comment: I guess we need to regenerate the configuration documentation because we added a new config option: `mvn package -Dgenerate-config-docs -pl flink-docs`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN
tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN URL: https://github.com/apache/flink/pull/8438#discussion_r287425131 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ## @@ -229,6 +229,13 @@ private void isReadyForDeployment(ClusterSpecification clusterSpecification) thr throw new YarnDeploymentException("Flink configuration object has not been set"); } + int configuredAmVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES); + if (configuredAmVcores < YarnConfigOptions.APP_MASTER_VCORES.defaultValue()) { Review comment: I think we should compare `configuredAmVcores` against `numYarnMaxVcores` which we calculate a bit further down. I think we don't have to check that we request fewer cores than `1` because it won't take any effect. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN
tillrohrmann commented on a change in pull request #8438: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN URL: https://github.com/apache/flink/pull/8438#discussion_r287425587 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java ## @@ -147,6 +147,41 @@ public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentExc } } + @Test + public void testFailIfAppMasterVCoreLessThanDefaultVcores() throws ClusterDeploymentException { + final Configuration flinkConfiguration = new Configuration(); + flinkConfiguration.setInteger(YarnConfigOptions.APP_MASTER_VCORES, 0); + + YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( + flinkConfiguration, + yarnConfiguration, + temporaryFolder.getRoot().getAbsolutePath(), + yarnClient, + true); + + clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath())); + + ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder() + .setMasterMemoryMB(1) + .setTaskManagerMemoryMB(1024) + .setNumberTaskManagers(1) + .setSlotsPerTaskManager(1) + .createClusterSpecification(); + + try { + clusterDescriptor.deploySessionCluster(clusterSpecification); + + fail("The deploy call should have failed."); + } catch (ClusterDeploymentException e) { + // we expect the cause to be an IllegalConfigurationException + if (!(e.getCause() instanceof IllegalConfigurationException)) { + throw e; + } + } finally { + clusterDescriptor.close(); + } + } Review comment: I think we don't need this test here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
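A sketch of the validation the review points towards: compare the configured ApplicationMaster vcores against the maximum vcores the YARN cluster reports (numYarnMaxVcores in the cluster descriptor), not against the option's default value. The method name and exact message wording below are illustrative only.
{code:java}
import org.apache.flink.configuration.IllegalConfigurationException;

// Fail early if the requested AM vcores exceed what the YARN cluster can offer.
static void validateAmVcores(int configuredAmVcores, int numYarnMaxVcores) {
	if (configuredAmVcores > numYarnMaxVcores) {
		throw new IllegalConfigurationException(String.format(
			"The number of requested ApplicationMaster vcores (%d) exceeds the maximum "
				+ "number of vcores (%d) available in the YARN cluster.",
			configuredAmVcores, numYarnMaxVcores));
	}
}
{code}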
[jira] [Commented] (FLINK-10855) CheckpointCoordinator does not delete checkpoint directory of late/failed checkpoints
[ https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847677#comment-16847677 ] Yun Tang commented on FLINK-10855: -- Thanks [~yanghua] for starting this Jira again. I just wonder whether a checkpoint cleaner could really solve FLINK-11662. If we have 1000 tasks and task-1 just declined the checkpoint, the other tasks would keep checkpointing. The timing of clearing the checkpoint directory is a tough problem, since we cannot guarantee whether other tasks will create that parent checkpoint directory again. Generally, a filesystem like HDFS creates the non-existing parent folder when writing to a file in a sub folder. At what time should we then clean up the parent folder again? I planned to refactor the checkpoint directory layout in FLINK-10930, but that seems not so widely accepted. If we put FLINK-11662 aside to ignore that bug and focus on the checkpoint cleaner logic: we already implemented a version of cleanup in our internal Flink last year. There are several points to consider when implementing this, based on our experience: # Reduce the cost of listing files: I already created an issue for this before: FLINK-11868. # When to list files: at least on job failover or region failover. # When to delete useless files: we found that deleting takes more time than listing, so it should stay in an async thread. # Scan files synchronously or asynchronously: an asynchronous scan would not block the JM main thread, but could not delete files in time. Since you assigned this issue to yourself too eagerly but have not made any progress until now, and given the lesson from the other JIRA, I think I could also attach a design doc based on our internal version of the cleaner. > CheckpointCoordinator does not delete checkpoint directory of late/failed > checkpoints > - > > Key: FLINK-10855 > URL: https://issues.apache.org/jira/browse/FLINK-10855 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > > In case that an acknowledge checkpoint message is late or a checkpoint cannot > be acknowledged, we discard the subtask state in the > {{CheckpointCoordinator}}. What's not happening in this case is that we > delete the parent directory of the checkpoint. This only happens when we > dispose a {{PendingCheckpoint#dispose}}. > Due to this behaviour it can happen that a checkpoint fails (e.g. a task not > being ready) and we delete the checkpoint directory. Next another task writes > its checkpoint data to the checkpoint directory (thereby creating it again) > and sending an acknowledge message back to the {{CheckpointCoordinator}}. The > {{CheckpointCoordinator}} will realize that there is no longer a > {{PendingCheckpoint}} and will discard the sub task state. This will remove > the state files from the checkpoint directory but will leave the checkpoint > directory untouched. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
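As a small illustration of points 3 and 4 in the comment above (keep the slow deletes off the JobManager main thread), a cleanup step could hand the directory removal to an I/O executor. This is only a sketch under assumed names; it is not taken from the internal implementation mentioned in the comment.
{code:java}
import java.util.concurrent.Executor;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Delete a stale checkpoint directory without blocking the main thread.
static void deleteCheckpointDirectoryAsync(Path checkpointDir, Executor ioExecutor) {
	ioExecutor.execute(() -> {
		try {
			final FileSystem fs = checkpointDir.getFileSystem();
			fs.delete(checkpointDir, true); // recursive delete
		} catch (Exception e) {
			// best effort: log and retry on the next cleanup round
		}
	});
}
{code}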
[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client URL: https://github.com/apache/flink/pull/8484#discussion_r287419849 ## File path: flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java ## @@ -102,4 +102,20 @@ public static final ConfigOption OFFLOAD_MINSIZE = key("blob.offload.minsize") .defaultValue(1_024 * 1_024) // 1MiB by default .withDescription("The minimum size for messages to be offloaded to the BlobServer."); + + /** +* The socket timeout in milliseconds for the blob client. +*/ + public static final ConfigOption SO_TIMEOUT = + key("blob.client.socket.timeout") + .defaultValue(120_000) Review comment: How did you decide on 2 minutes for the timeouts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client URL: https://github.com/apache/flink/pull/8484#discussion_r287419923 ## File path: flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java ## @@ -102,4 +102,20 @@ public static final ConfigOption OFFLOAD_MINSIZE = key("blob.offload.minsize") .defaultValue(1_024 * 1_024) // 1MiB by default .withDescription("The minimum size for messages to be offloaded to the BlobServer."); + + /** +* The socket timeout in milliseconds for the blob client. +*/ + public static final ConfigOption SO_TIMEOUT = + key("blob.client.socket.timeout") + .defaultValue(120_000) + .withDescription("The socket timeout in milliseconds for the blob client."); + + /** +* The connection timeout in milliseconds for the blob client. +*/ + public static final ConfigOption CONNECT_TIMEOUT = + key("blob.client.connect.timeout") + .defaultValue(120_000) Review comment: Same here, why 2 minutes? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client URL: https://github.com/apache/flink/pull/8484#discussion_r287419036 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ## @@ -487,4 +488,58 @@ private static void uploadJarFile( validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0)), testFile); } } + + + /** +* Tests the socket operation timeout. +*/ + @Test + public void testSocketTimeout() { + Configuration clientConfig = getBlobClientConfig(); + int oldSoTimeout = clientConfig.getInteger(BlobServerOptions.SO_TIMEOUT); + + clientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, 50); + getBlobServer().setBlockingMillis(10_000); + + try { + InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); + + try (BlobClient client = new BlobClient(serverAddress, clientConfig)) { + client.getInternal(new JobID(), BlobKey.createKey(TRANSIENT_BLOB)); + + fail("Should throw a exception."); + } catch (Throwable t) { + assertEquals(java.net.SocketTimeoutException.class, ExceptionUtils.stripException(t, IOException.class).getClass()); + } + } finally { + clientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, oldSoTimeout); Review comment: We could create a new test class which contains all blob server tests which need to start a new blob server for each test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client URL: https://github.com/apache/flink/pull/8484#discussion_r287419668 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java ## @@ -70,7 +70,7 @@ public static void startNonSSLServer() throws IOException { config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath()); config.setBoolean(BlobServerOptions.SSL_ENABLED, false); - blobNonSslServer = new BlobServer(config, new VoidBlobStore()); + blobNonSslServer = new TestBlobServer(config, new VoidBlobStore()); Review comment: Why do we use a `TestBlobServer` here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client URL: https://github.com/apache/flink/pull/8484#discussion_r287419507 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ## @@ -487,4 +488,58 @@ private static void uploadJarFile( validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0)), testFile); } } + + + /** +* Tests the socket operation timeout. +*/ + @Test + public void testSocketTimeout() { + Configuration clientConfig = getBlobClientConfig(); + int oldSoTimeout = clientConfig.getInteger(BlobServerOptions.SO_TIMEOUT); + + clientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, 50); + getBlobServer().setBlockingMillis(10_000); + + try { + InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); + + try (BlobClient client = new BlobClient(serverAddress, clientConfig)) { + client.getInternal(new JobID(), BlobKey.createKey(TRANSIENT_BLOB)); + + fail("Should throw an exception."); + } catch (Throwable t) { + assertEquals(java.net.SocketTimeoutException.class, ExceptionUtils.stripException(t, IOException.class).getClass()); Review comment: We could also use `assertThat(ExceptionUtils.findThrowable(t, java.net.SocketTimeoutException.class).isDefined(), is(true))` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
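Spelled out, the alternative assertion suggested above would make the test's catch block look roughly like the fragment below. It relies on the test's existing imports plus JUnit's assertThat/fail and Hamcrest's is; note that Flink's ExceptionUtils.findThrowable returns an Optional, so the check is isPresent(). A sketch only, not the merged test code.
{code:java}
try (BlobClient client = new BlobClient(serverAddress, clientConfig)) {
	client.getInternal(new JobID(), BlobKey.createKey(TRANSIENT_BLOB));
	fail("Should throw an exception.");
} catch (Throwable t) {
	// search the whole cause chain instead of stripping wrappers and comparing classes
	assertThat(ExceptionUtils.findThrowable(t, java.net.SocketTimeoutException.class).isPresent(), is(true));
}
{code}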
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
eaglewatcherwb commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r287422649 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adapter; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.EMPTY; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.RELEASED; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Default implementation of {@link SchedulingResultPartition}. 
+ */ +public class DefaultResultPartition implements SchedulingResultPartition { + + private final IntermediateResultPartitionID resultPartitionId; + + private final IntermediateDataSetID intermediateDataSetId; + + private final ResultPartitionType partitionType; + + private SchedulingExecutionVertex producer; + + private final List consumers; + + DefaultResultPartition( + IntermediateResultPartitionID partitionId, + IntermediateDataSetID intermediateDataSetId, + ResultPartitionType partitionType) { + this.resultPartitionId = checkNotNull(partitionId); + this.intermediateDataSetId = checkNotNull(intermediateDataSetId); + this.partitionType = checkNotNull(partitionType); + this.consumers = new ArrayList<>(); + } + + @Override + public IntermediateResultPartitionID getId() { + return resultPartitionId; + } + + @Override + public IntermediateDataSetID getResultId() { + return intermediateDataSetId; + } + + @Override + public ResultPartitionType getPartitionType() { + return partitionType; + } + + @Override + public ResultPartitionState getState() { + switch (producer.getState()) { + case CREATED: + case SCHEDULED: + case DEPLOYING: + return EMPTY; + case RUNNING: + return PRODUCING; + case FINISHED: + return DONE; + default: + return RELEASED; Review comment: OK, in the latest PR, I return `EMPTY` for the failure state. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
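For reference, with the change the author describes in the comment above, the getState() method from the diff would map the failure states to EMPTY as well. The fragment below belongs to the DefaultResultPartition class quoted above and is a sketch, not necessarily the exact merged code.
{code:java}
@Override
public ResultPartitionState getState() {
	switch (producer.getState()) {
		case CREATED:
		case SCHEDULED:
		case DEPLOYING:
			return EMPTY;
		case RUNNING:
			return PRODUCING;
		case FINISHED:
			return DONE;
		default:
			// failure states: the partition data is gone or was never produced,
			// so treat it as EMPTY rather than RELEASED
			return EMPTY;
	}
}
{code}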
[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client URL: https://github.com/apache/flink/pull/8484#discussion_r287421208 ## File path: flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java ## @@ -102,4 +102,20 @@ public static final ConfigOption OFFLOAD_MINSIZE = key("blob.offload.minsize") .defaultValue(1_024 * 1_024) // 1MiB by default .withDescription("The minimum size for messages to be offloaded to the BlobServer."); + + /** +* The socket timeout in milliseconds for the blob client. +*/ + public static final ConfigOption SO_TIMEOUT = + key("blob.client.socket.timeout") + .defaultValue(120_000) Review comment: By setting it to 2 minutes, we might risk that we break existing setups if 2 minutes is too low for them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client
tillrohrmann commented on a change in pull request #8484: [FLINK-12547] Add connection and socket timeouts for the blob client URL: https://github.com/apache/flink/pull/8484#discussion_r287419709 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java ## @@ -58,7 +58,7 @@ public static void startSSLServer() throws IOException { Configuration config = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(); config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporarySslFolder.newFolder().getAbsolutePath()); - blobSslServer = new BlobServer(config, new VoidBlobStore()); + blobSslServer = new TestBlobServer(config, new VoidBlobStore()); Review comment: Why `TestBlobServer`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
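For context, the two blob client options discussed in this review (FLINK-12547) are meant to bound the client's socket operations. A minimal sketch of how they could be applied when the client opens its connection, assuming the BlobServerOptions.CONNECT_TIMEOUT and SO_TIMEOUT options introduced in the PR; openBlobSocket is an illustrative helper name, not necessarily the code that was merged.
{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;

// Open a socket to the BlobServer with bounded connect and read times, so a
// stuck server can no longer hang the calling task thread indefinitely.
static Socket openBlobSocket(InetSocketAddress serverAddress, Configuration clientConfig) throws IOException {
	final Socket socket = new Socket();
	// bound the time we wait for the TCP connection to be established
	socket.connect(serverAddress, clientConfig.getInteger(BlobServerOptions.CONNECT_TIMEOUT));
	// bound the time each read() may block while downloading a blob
	socket.setSoTimeout(clientConfig.getInteger(BlobServerOptions.SO_TIMEOUT));
	return socket;
}
{code}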
[jira] [Commented] (FLINK-11947) Support MapState key / value schema evolution for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847669#comment-16847669 ] Cliff Resnick commented on FLINK-11947: --- Does the deescalation from Blocker mean that this will not likely be fixed by 1.9? On Fri, May 24, 2019, 10:15 AM Aljoscha Krettek (JIRA) > Support MapState key / value schema evolution for RocksDB > - > > Key: FLINK-11947 > URL: https://issues.apache.org/jira/browse/FLINK-11947 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System, Runtime / State Backends >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > > Currently, we do not attempt to perform state schema evolution if the key or > value's schema of a user {{MapState}} has changed when using {{RocksDB}}: > https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542 > This was disallowed in the initial support for state schema evolution because > the way we did state evolution in the RocksDB state backend was simply > overwriting values. > For {{MapState}} key evolution, only overwriting RocksDB values does not > work, since RocksDB entries for {{MapState}} uses a composite key containing > the map state key. This means that when evolving {{MapState}} in this case > with an evolved key schema, we will have new entries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] Aitozi commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
Aitozi commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#issuecomment-495687292 Hi @pnowojski, what is the reason for the second point? > This is not true for even slight data skew And +1 for adding the `inFloatingPoolUsage` metric, specialized for detecting bottlenecks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12426) TM occasionally hang in deploying state
[ https://issues.apache.org/jira/browse/FLINK-12426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847653#comment-16847653 ] Till Rohrmann commented on FLINK-12426: --- Could it be that FLINK-12547 tries to solve the same problem [~QiLuo]? If yes, then let's close this issue as a duplicate, because the other issue already contains a fix. > TM occasionally hang in deploying state > --- > > Key: FLINK-12426 > URL: https://issues.apache.org/jira/browse/FLINK-12426 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Qi >Priority: Major > > Hi all, > > We use Flink batch and start thousands of jobs per day. Occasionally we > observed some stuck jobs, due to some TM hang in “DEPLOYING” state. > > It seems that the TM is calling BlobClient to download jars from > JM/BlobServer. Under hood it’s calling Socket.connect() and then > Socket.read() to retrieve results. > > These jobs usually have many TM slots (1~2k). We checked the TM log and > dumped the TM thread. It indeed hung on socket read to download jar from Blob > server. > > We're using Flink 1.5 but this may also affect later versions since related > code are not changed much. We've tried to add socket timeout in BlobClient, > but still no luck. > > > TM log > > ... > INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task > DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) > (184/2000). > INFO org.apache.flink.runtime.taskmanager.Task - DataSource (at > createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000) switched > from CREATED to DEPLOYING. > INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream > leak safety net for task DataSource (at > createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000) [DEPLOYING] > INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task > DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) > (184/2000) [DEPLOYING]. 
> INFO org.apache.flink.runtime.blob.BlobClient - Downloading > 19e65c0caa41f264f9ffe4ca2a48a434/p-3ecd6341bf97d5512b14c93f6c9f51f682b6db26-37d5e69d156ee00a924c1ebff0c0d280 > from some-host-ip-port > {color:#22}no more logs...{color} > > > TM thread dump: > > _"DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) > (1999/2000)" #72 prio=5 os_prio=0 tid=0x7fb9a1521000 nid=0xa0994 runnable > [0x7fb97cfbf000]_ > _java.lang.Thread.State: RUNNABLE_ > _at java.net.SocketInputStream.socketRead0(Native Method)_ > _at > java.net.SocketInputStream.socketRead(SocketInputStream.java:116)_ > _at java.net.SocketInputStream.read(SocketInputStream.java:171)_ > _at java.net.SocketInputStream.read(SocketInputStream.java:141)_ > _at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152)_ > _at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140)_ > _at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:170)_ > _at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)_ > _at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206)_ > _at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)_ > _- locked <0x00078ab60ba8> (a java.lang.Object)_ > _at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:893)_ > _at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)_ > _at java.lang.Thread.run(Thread.java:748)_ > __ > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12547) Deadlock when the task thread downloads jars using BlobClient
[ https://issues.apache.org/jira/browse/FLINK-12547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-12547: -- Component/s: (was: Runtime / Operators) Runtime / Coordination > Deadlock when the task thread downloads jars using BlobClient > - > > Key: FLINK-12547 > URL: https://issues.apache.org/jira/browse/FLINK-12547 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.0 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The jstack is as follows (this jstack is from an old Flink version, but the > master branch has the same problem). > {code:java} > "Source: Custom Source (76/400)" #68 prio=5 os_prio=0 tid=0x7f8139cd3000 > nid=0xe2 runnable [0x7f80da5fd000] > java.lang.Thread.State: RUNNABLE > at java.net.SocketInputStream.socketRead0(Native Method) > at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) > at java.net.SocketInputStream.read(SocketInputStream.java:170) > at java.net.SocketInputStream.read(SocketInputStream.java:141) > at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152) > at > org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140) > at > org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:164) > at > org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) > at > org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) > - locked <0x00062cf2a188> (a java.lang.Object) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:968) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:604) > at java.lang.Thread.run(Thread.java:834) > Locked ownable synchronizers: > - None > {code} > > The reason is that SO_TIMEOUT is not set in the socket connection of the blob > client. When the network packet loss seriously due to the high CPU load of > the machine, the blob client connection fails to perceive that the server has > been disconnected, which results in blocking in the native method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods
tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods URL: https://github.com/apache/flink/pull/8154#discussion_r287414640 ## File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ## @@ -197,26 +197,33 @@ public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalExc // public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) - throws CompilerException, ProgramInvocationException { + throws CompilerException, ProgramInvocationException { PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism)); } public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) - throws CompilerException, ProgramInvocationException { - Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); - if (prog.isUsingProgramEntryPoint()) { - return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism); - } else if (prog.isUsingInteractiveMode()) { - // temporary hack to support the optimizer plan preview - OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler); - if (parallelism > 0) { - env.setParallelism(parallelism); - } + throws CompilerException, ProgramInvocationException { + ClassLoader contextCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); + if (prog.isUsingProgramEntryPoint()) { + return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism); + } else if (prog.isUsingInteractiveMode()) { + // temporary hack to support the optimizer plan preview + OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler); + if (parallelism > 0) { + env.setParallelism(parallelism); + } - return env.getOptimizedPlan(prog); - } else { - throw new RuntimeException("Couldn't determine program mode."); + return env.getOptimizedPlan(prog); + } else { + throw new RuntimeException("Couldn't determine program mode."); + } + } finally { + if (contextCl != null) { Review comment: I think we don't need the null check here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods
tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods URL: https://github.com/apache/flink/pull/8154#discussion_r287414688 ## File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ## @@ -197,26 +197,33 @@ public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalExc // public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) - throws CompilerException, ProgramInvocationException { + throws CompilerException, ProgramInvocationException { Review comment: Unrelated change. Please revert. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods
tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods URL: https://github.com/apache/flink/pull/8154#discussion_r287414860 ## File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ## @@ -246,45 +253,51 @@ public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int par * @throws ProgramInvocationException */ public JobSubmissionResult run(PackagedProgram prog, int parallelism) - throws ProgramInvocationException, ProgramMissingJobException { - Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); - if (prog.isUsingProgramEntryPoint()) { - - final JobWithJars jobWithJars = prog.getPlanWithJars(); - - return run(jobWithJars, parallelism, prog.getSavepointSettings()); - } - else if (prog.isUsingInteractiveMode()) { - log.info("Starting program in interactive mode (detached: {})", isDetached()); - - final List libraries = prog.getAllLibraries(); - - ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries, - prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(), - prog.getSavepointSettings()); - ContextEnvironment.setAsContext(factory); - - try { - // invoke main method - prog.invokeInteractiveModeForExecution(); - if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) { - throw new ProgramMissingJobException("The program didn't contain a Flink job."); - } - if (isDetached()) { - // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here - return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute(); + throws ProgramInvocationException, ProgramMissingJobException { + ClassLoader contextCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); + if (prog.isUsingProgramEntryPoint()) { + final JobWithJars jobWithJars = prog.getPlanWithJars(); + return run(jobWithJars, parallelism, prog.getSavepointSettings()); + } + else if (prog.isUsingInteractiveMode()) { + log.info("Starting program in interactive mode (detached: {})", isDetached()); + + final List libraries = prog.getAllLibraries(); + + ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries, + prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(), + prog.getSavepointSettings()); + ContextEnvironment.setAsContext(factory); + + try { + // invoke main method + prog.invokeInteractiveModeForExecution(); + if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) { + throw new ProgramMissingJobException("The program didn't contain a Flink job."); + } + if (isDetached()) { + // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here + return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute(); + } + else { + // in blocking mode, we execute all Flink jobs contained in the user code and then return here + return this.lastJobExecutionResult; + } } - else { - // in blocking mode, we execute all Flink jobs contained in the user code and then return here - return this.lastJobExecutionResult;
[GitHub] [flink] tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods
tillrohrmann commented on a change in pull request #8154: [Flink-12167] Reset context classloader in run and getOptimizedPlan methods URL: https://github.com/apache/flink/pull/8154#discussion_r287414740 ## File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ## @@ -197,26 +197,33 @@ public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalExc // public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) - throws CompilerException, ProgramInvocationException { + throws CompilerException, ProgramInvocationException { PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism)); } public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) - throws CompilerException, ProgramInvocationException { - Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); - if (prog.isUsingProgramEntryPoint()) { - return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism); - } else if (prog.isUsingInteractiveMode()) { - // temporary hack to support the optimizer plan preview - OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler); - if (parallelism > 0) { - env.setParallelism(parallelism); - } + throws CompilerException, ProgramInvocationException { Review comment: Same here with the formatting. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
tillrohrmann commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology URL: https://github.com/apache/flink/pull/8446#discussion_r287402622 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adapter; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.EMPTY; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.RELEASED; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Default implementation of {@link SchedulingResultPartition}. 
+ */ +public class DefaultResultPartition implements SchedulingResultPartition { + + private final IntermediateResultPartitionID resultPartitionId; + + private final IntermediateDataSetID intermediateDataSetId; + + private final ResultPartitionType partitionType; + + private SchedulingExecutionVertex producer; + + private final List consumers; + + DefaultResultPartition( + IntermediateResultPartitionID partitionId, + IntermediateDataSetID intermediateDataSetId, + ResultPartitionType partitionType) { + this.resultPartitionId = checkNotNull(partitionId); + this.intermediateDataSetId = checkNotNull(intermediateDataSetId); + this.partitionType = checkNotNull(partitionType); + this.consumers = new ArrayList<>(); + } + + @Override + public IntermediateResultPartitionID getId() { + return resultPartitionId; + } + + @Override + public IntermediateDataSetID getResultId() { + return intermediateDataSetId; + } + + @Override + public ResultPartitionType getPartitionType() { + return partitionType; + } + + @Override + public ResultPartitionState getState() { + switch (producer.getState()) { + case CREATED: + case SCHEDULED: + case DEPLOYING: + return EMPTY; + case RUNNING: + return PRODUCING; + case FINISHED: + return DONE; + default: + return RELEASED; Review comment: I guess that in case of a failure we should return the partition state to the initial state (e.g. `EMPTY`). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
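A hedged sketch of the state mapping discussed in this review, with the reviewer's suggestion applied: a failed or canceled producer maps back to the initial state instead of falling through to RELEASED. The enums are simplified stand-ins, not the real Flink types, and the failure handling reflects the proposal above rather than merged code.

```java
// Illustrative sketch only: simplified stand-in enums, not the actual Flink types.
public class PartitionStateMapping {

    enum ProducerState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    enum PartitionState { EMPTY, PRODUCING, DONE, RELEASED }

    static PartitionState partitionStateOf(ProducerState producer) {
        switch (producer) {
            case CREATED:
            case SCHEDULED:
            case DEPLOYING:
                return PartitionState.EMPTY;
            case RUNNING:
                return PartitionState.PRODUCING;
            case FINISHED:
                return PartitionState.DONE;
            case CANCELING:
            case CANCELED:
            case FAILED:
                // the producer will be restarted, so treat the partition as empty again
                return PartitionState.EMPTY;
            default:
                return PartitionState.RELEASED;
        }
    }
}
```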
[GitHub] [flink] tillrohrmann commented on a change in pull request #8537: [FLINK-12115][fs] Add support for AzureFS
tillrohrmann commented on a change in pull request #8537: [FLINK-12115][fs] Add support for AzureFS URL: https://github.com/apache/flink/pull/8537#discussion_r287400354 ## File path: flink-filesystems/flink-azure-fs-hadoop/pom.xml ## @@ -119,17 +111,69 @@ under the License. + org.apache.hadoop - org.apache.flink.fs.shaded.hadoop.org.apache.hadoop + org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop - + + + + org.apache.commons + org.apache.flink.fs.shaded.hadoop3.org.apache.commons + + + - com.microsoft.azure.storage - org.apache.flink.fs.shaded.com.microsoft.azure.storage + com.microsoft.azure + org.apache.flink.fs.azure.shaded.com.microsoft.azure + + + + + org.apache.httpcomponents Review comment: If the plugins are used then not. If people drop the jar into `lib` it would still be beneficial. Therefore, I opted for this approach. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8539: [FLINK-12617] [container] StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups
flinkbot commented on issue #8539: [FLINK-12617] [container] StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups URL: https://github.com/apache/flink/pull/8539#issuecomment-495666848 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12617) StandaloneJobClusterEntrypoint should default to random JobID for non-HA setups
[ https://issues.apache.org/jira/browse/FLINK-12617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12617: --- Labels: pull-request-available (was: ) > StandaloneJobClusterEntrypoint should default to random JobID for non-HA > setups > > > Key: FLINK-12617 > URL: https://issues.apache.org/jira/browse/FLINK-12617 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Configuration >Affects Versions: 1.8.0 >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8538: [FLINK-11283] Accessing the key when processing connected keyed stream
flinkbot commented on issue #8538: [FLINK-11283] Accessing the key when processing connected keyed stream URL: https://github.com/apache/flink/pull/8538#issuecomment-495666295 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] knaufk opened a new pull request #8539: [FLINK-12617] [container] StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups
knaufk opened a new pull request #8539: [FLINK-12617] [container] StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups URL: https://github.com/apache/flink/pull/8539 ## What is the purpose of the change ## Brief change log * StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups * refactor StandaloneJobClusterConfigurationParserFactoryTest to use temporary folder for global Flink configuration ## Verifying this change - run unit tests in flink-container to check that the correct defaults are applied - build Flink locally, build job-cluster image - run a job without HA and check that the JobID is random ``` ./build.sh --job-jar ../../flink-examples/flink-examples-streaming/target/flink-examples-streaming_2.11-1.9-SNAPSHOT-TopSpeedWindowing.jar --from-local-dist FLINK_JOB=org.apache.flink.streaming.examples.windowing.TopSpeedWindowing docker-compose up ``` - run a job without HA and an explicit JobID and check that the parameter takes precedence ``` FLINK_JOB_ARGUMENTS="--job-id " FLINK_JOB=org.apache.flink.streaming.examples.windowing.TopSpeedWindowing docker-compose up ``` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
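A rough sketch of the precedence this PR describes: an explicit `--job-id` always wins, otherwise non-HA setups get a random JobID while HA setups keep a fixed default so the id stays stable across failovers. The names and the fixed HA default shown here are assumptions for illustration, not the actual StandaloneJobClusterEntrypoint code.

```java
import org.apache.flink.api.common.JobID;

// Illustrative only; see the PR diff for the real entrypoint logic.
public class JobIdDefaults {

    // jobIdFromArgs may be null if no --job-id was passed on the command line.
    static JobID resolveJobId(JobID jobIdFromArgs, boolean highAvailabilityEnabled) {
        if (jobIdFromArgs != null) {
            return jobIdFromArgs;        // an explicit --job-id always takes precedence
        }
        if (highAvailabilityEnabled) {
            return new JobID(0L, 0L);    // stable default so recovery finds the same job (assumption)
        }
        return JobID.generate();         // non-HA: a random JobID per submission, as proposed here
    }
}
```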
[GitHub] [flink] aljoscha opened a new pull request #8538: [FLINK-11283] Accessing the key when processing connected keyed stream
aljoscha opened a new pull request #8538: [FLINK-11283] Accessing the key when processing connected keyed stream URL: https://github.com/apache/flink/pull/8538 Cleaned up and rebased version of #7470, for running Travis tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] pnowojski commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
pnowojski commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#issuecomment-495661903 > The current inputBufferUsage is only reflecting the usage of floating buffers, and it could give an indicator whether the setting of floating-buffers-per-gate is enough for performance tuning. I agree, there is value in the current semantic. I would even build upon that. If you want to detect a bottleneck, the current semantic of tracking just the floating buffers is perfect for that. If the current operator (reading from this input) is causing a bottleneck, it may well be that only a single input channel is back-pressured. At the moment we can detect this situation via "all floating buffers are exhausted/used". > We could always assume if the exclusive buffers are enough for one channel, it will not request floating buffers. In other words, if we see the floating buffers are used, that means the exclusive buffers should also be used. This is not true even for slight data skew. I think there is value in tracking floating and exclusive buffers separately. Maybe we should make `inPoolUsage` track `floating + exclusive` buffers (it would make it consistent with the non-credit-based mode), while we could add an `inFloatingPoolUsage` metric with the current semantic? What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
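To make the proposal concrete, here is a hedged sketch of what an `inPoolUsage` gauge covering floating plus exclusive buffers could look like. The accessor interface is an assumption made for the example, not the real network stack API; the point is only the proposed split, with a separate `inFloatingPoolUsage` gauge keeping the current floating-only semantic for bottleneck detection.

```java
import org.apache.flink.metrics.Gauge;

// Illustrative only; the BufferUsageSource interface is hypothetical.
public class InputBufferPoolUsageGauge implements Gauge<Float> {

    interface BufferUsageSource {
        int floatingBuffersUsed();
        int floatingBuffersTotal();
        int exclusiveBuffersUsed();
        int exclusiveBuffersTotal();
    }

    private final BufferUsageSource source;

    public InputBufferPoolUsageGauge(BufferUsageSource source) {
        this.source = source;
    }

    @Override
    public Float getValue() {
        // inPoolUsage as proposed: floating + exclusive buffers combined
        int used = source.floatingBuffersUsed() + source.exclusiveBuffersUsed();
        int total = source.floatingBuffersTotal() + source.exclusiveBuffersTotal();
        return total == 0 ? 0.0f : (float) used / total;
    }
}
```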
[GitHub] [flink] zentol commented on a change in pull request #8537: [FLINK-12115][fs] Add support for AzureFS
zentol commented on a change in pull request #8537: [FLINK-12115][fs] Add support for AzureFS URL: https://github.com/apache/flink/pull/8537#discussion_r287392198 ## File path: flink-filesystems/flink-azure-fs-hadoop/pom.xml ## @@ -119,17 +111,69 @@ under the License. + org.apache.hadoop - org.apache.flink.fs.shaded.hadoop.org.apache.hadoop + org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop - + + + + org.apache.commons + org.apache.flink.fs.shaded.hadoop3.org.apache.commons + + + - com.microsoft.azure.storage - org.apache.flink.fs.shaded.com.microsoft.azure.storage + com.microsoft.azure + org.apache.flink.fs.azure.shaded.com.microsoft.azure + + + + + org.apache.httpcomponents Review comment: do we still need all this now that we have plugins? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12222) 'start-scala-shell -a ' doesn't ship jars to yarn container
[ https://issues.apache.org/jira/browse/FLINK-12222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847597#comment-16847597 ] Jeff Zhang commented on FLINK-12222: Sorry [~Zentol], it was due to an environment issue on my side; I will close it. > 'start-scala-shell -a ' doesn't ship jars to yarn container > - > > Key: FLINK-12222 > URL: https://issues.apache.org/jira/browse/FLINK-12222 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN, Scala Shell >Affects Versions: 1.8.0 >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > > If a user uses `flink run` to submit Flink jars, they can build an uber jar to > include all of its dependencies, but in the Flink scala-shell there is no way to > ship custom jars. Currently `start-scala-shell -a ` only puts the jars > on the classpath of the client side, but doesn't ship them to the yarn container. This > makes it impossible to use these jars in Flink tasks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12222) 'start-scala-shell -a ' doesn't ship jars to yarn container
[ https://issues.apache.org/jira/browse/FLINK-12222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Zhang closed FLINK-12222. -- Resolution: Invalid > 'start-scala-shell -a ' doesn't ship jars to yarn container > - > > Key: FLINK-12222 > URL: https://issues.apache.org/jira/browse/FLINK-12222 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN, Scala Shell >Affects Versions: 1.8.0 >Reporter: Jeff Zhang >Assignee: Jeff Zhang >Priority: Major > > If a user uses `flink run` to submit Flink jars, they can build an uber jar to > include all of its dependencies, but in the Flink scala-shell there is no way to > ship custom jars. Currently `start-scala-shell -a ` only puts the jars > on the classpath of the client side, but doesn't ship them to the yarn container. This > makes it impossible to use these jars in Flink tasks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12297) Clean the closure for OutputTags in PatternStream
[ https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847590#comment-16847590 ] Aljoscha Krettek commented on FLINK-12297: -- {{OutputTag}} needs to be an anonymous inner class (or a concrete subclass, at least) so that the Java compiler will put the information about the generic parameters in the generated code. Thats why you see {{new OutputTag()}}, notice the {{{}}}. > Clean the closure for OutputTags in PatternStream > - > > Key: FLINK-12297 > URL: https://issues.apache.org/jira/browse/FLINK-12297 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.8.0 >Reporter: Dawid Wysakowicz >Assignee: aitozi >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0, 1.8.1 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Right now we do not invoke closure cleaner on output tags. Therefore such > code: > {code} > @Test > public void testFlatSelectSerialization() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStreamSource elements = env.fromElements(1, 2, 3); > OutputTag outputTag = new OutputTag("AAA") {}; > CEP.pattern(elements, Pattern.begin("A")).flatSelect( > outputTag, > new PatternFlatTimeoutFunction() { > @Override > public void timeout( > Map> pattern, > long timeoutTimestamp, > Collector out) throws > Exception { > } > }, > new PatternFlatSelectFunction() { > @Override > public void flatSelect(Map List> pattern, Collector out) throws Exception { > } > } > ); > env.execute(); > } > {code} > will fail with {{The implementation of the PatternFlatSelectAdapter is not > serializable. }} exception -- This message was sent by Atlassian JIRA (v7.6.3#76005)
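Since the archive strips the generics from the quoted code, here is a minimal sketch of the point being made above: the tag has to be created as an anonymous subclass (note the trailing braces) so that the generic parameter is recorded in the class file and Flink's type extraction can recover it. The types chosen here are only for illustration.

```java
import org.apache.flink.util.OutputTag;

public class OutputTagExample {

    public static void main(String[] args) {
        // The trailing {} creates an anonymous subclass, so the compiler records the
        // generic parameter (Integer) and Flink's type extraction can read it.
        OutputTag<Integer> timedOut = new OutputTag<Integer>("AAA") {};
        System.out.println(timedOut);

        // Without the braces the type argument is erased, leaving type extraction
        // nothing to work with, so such a tag is rejected when it is constructed:
        // OutputTag<Integer> broken = new OutputTag<>("AAA");
    }
}
```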
[jira] [Updated] (FLINK-11107) Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured
[ https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-11107: - Summary: Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured (was: [state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured) > Avoid memory stateBackend to create arbitrary folders under HA path when no > checkpoint path configured > -- > > Key: FLINK-11107 > URL: https://issues.apache.org/jira/browse/FLINK-11107 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.6.2, 1.7.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > > Currently, memory state-backend would create a folder named with random UUID > under HA directory if no checkpoint path ever configured. (the code logic > locates within {{StateBackendLoader#fromApplicationOrConfigOrDefault}}) > However, the default memory state-backend would not only be created on JM > side, but also on each task manager's side, which means many folders with > random UUID would be created under HA directory. It would result in exception > like: > {noformat} > The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 > items=1048576{noformat} > If this happens, no new jobs could be submitted only if we clean up those > directories manually. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11947) Support MapState key / value schema evolution for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-11947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-11947: - Priority: Critical (was: Blocker) > Support MapState key / value schema evolution for RocksDB > - > > Key: FLINK-11947 > URL: https://issues.apache.org/jira/browse/FLINK-11947 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System, Runtime / State Backends >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > > Currently, we do not attempt to perform state schema evolution if the key or > value's schema of a user {{MapState}} has changed when using {{RocksDB}}: > https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542 > This was disallowed in the initial support for state schema evolution because > the way we did state evolution in the RocksDB state backend was simply > overwriting values. > For {{MapState}} key evolution, only overwriting RocksDB values does not > work, since RocksDB entries for {{MapState}} uses a composite key containing > the map state key. This means that when evolving {{MapState}} in this case > with an evolved key schema, we will have new entries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
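A loose, illustrative sketch of why plain overwriting cannot evolve MapState keys: the user's map key is embedded in the stored key, so writing with an evolved map-key serializer produces new keys instead of overwriting the old entries. This is not the exact key layout the RocksDB backend uses, only the general idea.

```java
// Simplified illustration, not Flink's actual on-disk key format.
public class CompositeKeySketch {

    static byte[] compositeKey(byte[] keyGroupAndKey, byte[] namespace, byte[] userMapKey) {
        byte[] out = new byte[keyGroupAndKey.length + namespace.length + userMapKey.length];
        int pos = 0;
        System.arraycopy(keyGroupAndKey, 0, out, pos, keyGroupAndKey.length);
        pos += keyGroupAndKey.length;
        System.arraycopy(namespace, 0, out, pos, namespace.length);
        pos += namespace.length;
        // The user map key is part of the stored key, so a changed map-key
        // serializer yields different bytes and therefore new entries.
        System.arraycopy(userMapKey, 0, out, pos, userMapKey.length);
        return out;
    }
}
```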
[GitHub] [flink] piyushnarang commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS
piyushnarang commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS URL: https://github.com/apache/flink/pull/8537#issuecomment-495638250 Thanks for spinning this up @tillrohrmann, will review and chime in. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287368246 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java ## @@ -504,26 +441,11 @@ public void testUpdateTaskInputPartitionsFailure() throws Exception { */ @Test(timeout = 1L) public void testLocalPartitionNotFound() throws Exception { - final ExecutionAttemptID eid = new ExecutionAttemptID(); - - final IntermediateDataSetID resultId = new IntermediateDataSetID(); - final ResultPartitionID partitionId = new ResultPartitionID(); - - final ResultPartitionLocation loc = ResultPartitionLocation.createLocal(); - - final InputChannelDeploymentDescriptor[] inputChannelDeploymentDescriptors = - new InputChannelDeploymentDescriptor[] { - new InputChannelDeploymentDescriptor(partitionId, loc)}; - - final InputGateDeploymentDescriptor inputGateDeploymentDescriptor = - new InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, inputChannelDeploymentDescriptors); - - final TaskDeploymentDescriptor tdd = - createTestTaskDeploymentDescriptor("Receiver", - eid, - Tasks.AgnosticReceiver.class, - 1, Collections.emptyList(), - Collections.singletonList(inputGateDeploymentDescriptor)); + ResourceID producerLocation = new ResourceID("local"); + DefaultShuffleDeploymentDescriptor sdd = + createSddWithLocalConnection(new IntermediateResultPartitionID(), producerLocation, 1); + TaskDeploymentDescriptor tdd = createReceiver(sdd, producerLocation); Review comment: it should result in identifying that the partition is located locally in SingleInputGateFactory on the consumer side, by comparing producer and consumer resource ids. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
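A hedged sketch of the locality check described in this comment: the consumer compares the producer's ResourceID with its own and picks a local or remote channel accordingly. The method shape and the enum are illustrative, not the actual SingleInputGateFactory code.

```java
import org.apache.flink.runtime.clusterframework.types.ResourceID;

// Illustrative only.
public class ChannelLocality {

    enum ChannelKind { LOCAL, REMOTE }

    static ChannelKind channelKindFor(ResourceID producerLocation, ResourceID consumerLocation) {
        // Same TaskExecutor -> local channel; the test above provokes the
        // "local partition not found" path by making producer and consumer match.
        return producerLocation.equals(consumerLocation) ? ChannelKind.LOCAL : ChannelKind.REMOTE;
    }
}
```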
[GitHub] [flink] tillrohrmann commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS
tillrohrmann commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS URL: https://github.com/apache/flink/pull/8537#issuecomment-495633301 @shuai-xu do you have time to test the Azure FS implementation? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS
flinkbot commented on issue #8537: [FLINK-12115][fs] Add support for AzureFS URL: https://github.com/apache/flink/pull/8537#issuecomment-495632841 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann opened a new pull request #8537: [FLINK-12115][fs] Add support for AzureFS
tillrohrmann opened a new pull request #8537: [FLINK-12115][fs] Add support for AzureFS URL: https://github.com/apache/flink/pull/8537 This PR is based on #8117. It decreases the dependency footprint and adds a proper NOTICE file to it. cc @piyushnarang This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287362965 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java ## @@ -483,12 +419,13 @@ public void testUpdateTaskInputPartitionsFailure() throws Exception { tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get(); taskRunningFuture.get(); + ResourceID producerLocation = new ResourceID("local"); Review comment: I think it would be better to use random `ResourceID.generate()` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services