[jira] [Commented] (FLINK-12389) flink codegen set String type for ByteBuffer fields

2019-05-01 Thread Yu Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831429#comment-16831429
 ] 

Yu Yang commented on FLINK-12389:
-

[~fhue...@gmail.com]  could you share any pointers on how we should resolve 
this issue? 
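For reference, a minimal sketch of the kind of conversion the generated code would presumably need instead of the plain cast, assuming the segmentIds field really holds UTF-8 encoded bytes (the class and method below are illustrative only, not Flink's actual codegen output):

{code}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferToString {

    // Decodes a (possibly null) ByteBuffer into a String instead of casting it.
    // Reads via duplicate() so the original buffer's position is left untouched.
    public static String toString(ByteBuffer buffer) {
        if (buffer == null) {
            return "";
        }
        ByteBuffer copy = buffer.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("1,2,3".getBytes(StandardCharsets.UTF_8));
        System.out.println(toString(buf)); // prints 1,2,3
    }
}
{code}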

> flink codegen set String type for ByteBuffer fields
> ---
>
> Key: FLINK-12389
> URL: https://issues.apache.org/jira/browse/FLINK-12389
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.8.0
>Reporter: Yu Yang
>Priority: Major
>
> We tried to write a simple Flink SQL program using a "select .. from" 
> statement, and encountered a compile exception. 
> *Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 
> 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"*
> Further debugging shows that the following Flink-generated code snippet 
> caused the problem: 
> {code}
>   final java.lang.reflect.Field 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds =
>   org.apache.flink.api.java.typeutils.TypeExtractor.getDeclaredField(
> com.pinterest.utzv2.thrift.RealtimeSpendValue.class, "segmentIds");
> ...
> boolean isNull$5 = (java.nio.ByteBuffer) 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1) == 
> null;
> java.lang.String result$4;
> if (isNull$5) {
>   result$4 = "";
> }
> else {
>   result$4 = (java.lang.String) (java.nio.ByteBuffer) 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1);
> }
>
> {code}
> The following is the stack trace:
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 17 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:425)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 
> 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>   at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049)
>   at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>   at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>   at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>   at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>   at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>   at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3790)
>   at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215)
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3752)
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3732)
>   at org.codehaus.janino.Java$Assignment.accept(Java.java:4466)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>   at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>   at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>   at 
> or

[jira] [Comment Edited] (FLINK-12389) flink codegen set String type for ByteBuffer fields

2019-05-01 Thread Yu Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831429#comment-16831429
 ] 

Yu Yang edited comment on FLINK-12389 at 5/2/19 6:30 AM:
-

[~fhue...@gmail.com] not sure whom we should contact on this. Could you share 
any pointers on how we should resolve this issue? 


was (Author: yuyang08):
[~fhue...@gmail.com]  could you share any pointers on how we should resolve 
this issue? 


[jira] [Created] (FLINK-12389) flink codegen set String type for ByteBuffer fields

2019-05-01 Thread Yu Yang (JIRA)
Yu Yang created FLINK-12389:
---

 Summary: flink codegen set String type for ByteBuffer fields
 Key: FLINK-12389
 URL: https://issues.apache.org/jira/browse/FLINK-12389
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.8.0
Reporter: Yu Yang


We tried to write a simple Flink SQL program using a "select .. from" statement, 
and encountered a compile exception. 

*Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 
38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"*

Further debugging shows that the following Flink-generated code snippet caused 
the problem: 

{code}
  final java.lang.reflect.Field 
field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds =
  org.apache.flink.api.java.typeutils.TypeExtractor.getDeclaredField(
com.pinterest.utzv2.thrift.RealtimeSpendValue.class, "segmentIds");
...

boolean isNull$5 = (java.nio.ByteBuffer) 
field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1) == null;
java.lang.String result$4;
if (isNull$5) {
  result$4 = "";
}
else {
  result$4 = (java.lang.String) (java.nio.ByteBuffer) 
field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1);
}
   
{code}

The following is the stack trace:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 17 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at 
org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
at 
org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:425)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 38: 
Cannot cast "java.nio.ByteBuffer" to "java.lang.String"
at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049)
at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3790)
at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215)
at 
org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3752)
at 
org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3732)
at org.codehaus.janino.Java$Assignment.accept(Java.java:4466)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
at 
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
at 
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at 
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
at org.codehaus.janino.Java$Block.accept(

[jira] [Commented] (FLINK-12342) Yarn Resource Manager Acquires Too Many Containers

2019-05-01 Thread Zhenqiu Huang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831427#comment-16831427
 ] 

Zhenqiu Huang commented on FLINK-12342:
---

With the config set to 3000 milliseconds, the job with 256 containers can be 
launched successfully with only 1000+ total requested containers. The number can 
be reduced further by using a larger value, such as 5000 or even higher. So, for 
small jobs with around 32 containers, users should just keep the default value 
so that requests are sent out as soon as possible. For large jobs, users need to 
tune the parameter to trade off fast requests against the negative impact of 
repeatedly asking for more containers.

> Yarn Resource Manager Acquires Too Many Containers
> --
>
> Key: FLINK-12342
> URL: https://issues.apache.org/jira/browse/FLINK-12342
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Environment: We run jobs in Flink release 1.6.3. 
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
> Attachments: Screen Shot 2019-04-29 at 12.06.23 AM.png, 
> container.log, flink-1.4.png, flink-1.6.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the current implementation of YarnFlinkResourceManager, containers are 
> acquired one by one when requests come from the SlotManager. The mechanism 
> works when the job is small, say fewer than 32 containers. If the job has 256 
> containers, containers can't be allocated immediately and the pending requests 
> in AMRMClient are not removed accordingly. We observe that AMRMClient asks for 
> the current pending requests + 1 (the new request from the SlotManager) 
> containers. In this way, during the startup of such a job, it asked for 4000+ 
> containers. If an external dependency issue happens, for example slow HDFS 
> access, the whole job is blocked without getting enough resources and is 
> finally killed with a SlotManager request timeout.
> Thus, we should use the total number of containers asked for, rather than the 
> pending requests in AMRMClient, as the threshold for deciding whether we need 
> to add one more resource request.
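
A rough sketch of the decision the issue proposes, under the assumption that the resource manager tracks both allocated and pending container counts (class and method names below are illustrative, not the actual YarnFlinkResourceManager code):

{code}
// Illustrative only: decide whether another container request is needed based on
// the total number of containers already asked for (allocated + pending), rather
// than on the pending requests currently tracked by AMRMClient.
public class ContainerRequestDecision {

    private int numAllocatedContainers;
    private int numPendingContainerRequests;

    /** Returns true if we still need to ask YARN for one more container. */
    public boolean shouldRequestNewContainer(int numRequiredContainers) {
        int totalAskedFor = numAllocatedContainers + numPendingContainerRequests;
        return totalAskedFor < numRequiredContainers;
    }

    public void onContainerRequested() {
        numPendingContainerRequests++;
    }

    public void onContainerAllocated() {
        numAllocatedContainers++;
        numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    }
}
{code}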



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] HuangZhenQiu commented on issue #8306: [FLINK-12342] Add fast-heartbeat-delay yarn config for jobs with large number of containers

2019-05-01 Thread GitBox
HuangZhenQiu commented on issue #8306: [FLINK-12342] Add fast-heartbeat-delay 
yarn config for  jobs with large number of containers
URL: https://github.com/apache/flink/pull/8306#issuecomment-488563188
 
 
   @tillrohrmann @rmetzger 
   With the config set to 3000 milliseconds, the job with 256 containers can be 
launched successfully with only 1000+ total requested containers. The number can 
be reduced further by using a larger value, such as 5000 or even higher. So, for 
small jobs with around 32 containers, users should just keep the default value 
so that requests are sent out as soon as possible. For large jobs, users need to 
tune the parameter to trade off fast requests against the negative impact of 
repeatedly asking for more containers. 
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on issue #8329: [FLINK-12239][hive] Support table related 
operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#issuecomment-488562799
 
 
   Thanks @xuefuz for reviewing! I've addressed the comments.
   
   Would also be great to have @KurtYoung @JingsongLi @zjffdu take a look


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280257423
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for 
GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+   // Flink tables should be stored as 'external' tables in Hive metastore
+   private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{
+   put("EXTERNAL", "TRUE");
+   }};
+
private GenericHiveMetastoreCatalogUtil() {
}
 
// -- Utils --
 
/**
-* Creates a Hive database from CatalogDatabase.
+* Creates a Hive database from a CatalogDatabase.
+*
+* @param databaseName name of the database
+* @param catalogDatabase the CatalogDatabase instance
+* @return a Hive database
 */
-   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
-   Map<String, String> props = db.getProperties();
+   public static Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
return new Database(
-   dbName,
-   db.getDescription().isPresent() ? 
db.getDescription().get() : null,
+   databaseName,
+   catalogDatabase.getDescription().isPresent() ? 
catalogDatabase.getDescription().get() : null,
null,
-   props);
+   catalogDatabase.getProperties());
+   }
+
+   /**
+* Creates a Hive table from a CatalogBaseTable.
+*
+* @param tablePath path of the table
+* @param table the CatalogBaseTable instance
+* @return a Hive table
+*/
+   public static Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
+   Map<String, String> properties = new HashMap<>(table.getProperties());
+
+   // Table description
+   if (table.getDescription().isPresent()) {
+   properties.put(HiveTableConfig.TABLE_DESCRITPION, 
table.getDescription().get());
+   }
+
+   Table hiveTable = new Table();
+   hiveTable.setDbName(tablePath.getDatabaseName());
+   hiveTable.setTableName(tablePath.getObjectName());
+   hiveTable.setCreateTime((int) (System.currentTimeMillis() / 
1000));
+
+   // Table properties
+   hiveTable.setParameters(buildFlinkProperties(properties));
+   hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+   // Hive table's StorageDescriptor
+   StorageDescriptor sd = new StorageDescriptor();
+   sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+   List<FieldSchema> allColumns = createHiveColumns(table.getSchema());
+
+   // Table columns and partition keys
+   CatalogTable catalogTable = (CatalogTable) table;
+
+   if (catalogTable.isPartitioned()) {
+   int partitionKeySize = 
catalogTable.getPartitionKeys().size();
+   List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+   List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+   sd.setCols(regularColumns);
+   hiveTable.setPartitionKeys(part

[GitHub] [flink] bowenli86 commented on issue #8325: [hotfix] [table] Refactor GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase

2019-05-01 Thread GitBox
bowenli86 commented on issue #8325: [hotfix] [table] Refactor 
GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase
URL: https://github.com/apache/flink/pull/8325#issuecomment-488559175
 
 
   This is the first time I merged a PR and I'm still trying to figure out the 
right steps/process with this [Merging 
PR](https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests) 
doc as reference.
   
   Sorry that the commit message contains the PR title this time. Seems that I 
should have just deleted it. Will improve next time.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-11909) Provide default failure/timeout/backoff handling strategy for AsyncIO functions

2019-05-01 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831421#comment-16831421
 ] 

Rong Rong edited comment on FLINK-11909 at 5/2/19 5:40 AM:
---

I took some time to look at the {{AsyncFunction}} API and the 
{{AsyncWaitOperator}}. My understanding is that:
when a user implements an AsyncFunction, they have full control over how 
asyncInvoke is executed (including the creation of the ExecutorService used and 
the number of threads). Thus the user can also include any retry policy if 
needed, by directly creating a retry mechanism (see the sketch below).

The limitations of this are: 
1. Users do not have access to the timer service from AsyncFunction 
  - In contrast, {{AsyncWaitOperator}} can register / clean up / invoke the 
{{AsyncFunction#timeout}} API, which makes it more flexible for creating retry 
policies.

2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is 
independent of the input element 
  - This does not fit the "rate limiting" requirement from the mailing list: I 
am assuming that is more like a retry queue for a specific outbound service, 
not limited to a specific input element.

3. Retry within {{AsyncFunction}} cannot allow access to 
{{StreamElementQueue}}, which means it cannot alter the ordering of the 
elements being executed 
  - This doesn't fit the description in Shuyi's comment of a DLQ (where failed 
elements are put at the back of the queue before they are retried a second 
time).

I have a general idea to extend the {{RuntimeContext}} to create a 
{{RetryContext}} to solve these issues. But I also want to make sure that the 
above concerns are valid and should be addressed by Flink; otherwise, I think 
the current AsyncFunction API is good enough for some basic retry policies.

Any thoughts? [~till.rohrmann] [~suez1224] 
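
To illustrate the "retry directly inside asyncInvoke" option mentioned above, here is a minimal standalone sketch using a ScheduledExecutorService and CompletableFuture. It is not Flink API code; all names and the fake external call are illustrative:

{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Standalone sketch of a fixed-interval retry around an async call, the kind of
// logic a user could embed in their own asyncInvoke today.
public class RetrySketch {

    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor();

    /** Calls the (fake) external service, retrying up to maxRetries times. */
    static void callWithRetry(String input, int attempt, int maxRetries,
                              long retryDelayMs, CompletableFuture<String> result) {
        callExternalService(input).whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attempt < maxRetries) {
                // schedule the next attempt after a fixed delay
                SCHEDULER.schedule(
                        () -> callWithRetry(input, attempt + 1, maxRetries, retryDelayMs, result),
                        retryDelayMs, TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(error);
            }
        });
    }

    // Stand-in for a real asynchronous client call that fails transiently.
    static CompletableFuture<String> callExternalService(String input) {
        return CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("transient failure");
            }
            return "response for " + input;
        });
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> result = new CompletableFuture<>();
        callWithRetry("key-1", 0, 3, 100, result);
        System.out.println(result.get(5, TimeUnit.SECONDS));
        SCHEDULER.shutdown();
    }
}
{code}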


was (Author: walterddr):
I took some time to look at the {{AsyncFunction}} API and the 
{{AsyncWaitOperator}}. My understanding is that:
when user implements an AsyncFunction, it has full control on how the 
asyncInvoke being executed (including creation of the ExecutorService used, # 
of threads). Thus user can also include any of the retry policies if needed (by 
directly create a retry mechanism)

The limitation of this is: 
1. Users does not have access to timer service from AsyncFunction - On the 
contrary, {{AsyncWaitOperator}} have access to register / cleanup / invoke 
{{AsyncFunction#timeout}} API which makes it more flexible in terms of creating 
retry policies.
2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is 
input-element independent - this does not fit the "rate limiting" requirement 
from the mailing list - I am assuming this is more like a retry queue for a 
specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} cannot allow access to 
{{StreamElementQueue}}, which means it cannot alter the ordering of the 
elements being executed. This doesn't fit into the description in Shuyi's 
comment of a DLQ (where failed component are put in the back of the queue 
before it can be retried the 2nd time)

I have a general idea to extend the {{RuntimeContext}} to create a 
{{RetryContext}} to solve these issues. But I want to also make sure that the 
above concerns I mentioned are valid and should be addressed by Flink. 
otherwise, I think the current AsyncFunction API is good to go for some basic 
retry policies.

Any thoughts? [~till.rohrmann] [~suez1224] 

> Provide default failure/timeout/backoff handling strategy for AsyncIO 
> functions
> ---
>
> Key: FLINK-11909
> URL: https://issues.apache.org/jira/browse/FLINK-11909
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently Flink AsyncIO by default fails the entire job when async function 
> invoke fails [1]. It would be nice to have some default Async IO 
> failure/timeout handling strategy, or opens up some APIs for AsyncFunction 
> timeout method to interact with the AsyncWaitOperator. For example (quote 
> [~suez1224] in [2]):
> * FAIL_OPERATOR (default & current behavior)
> * FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times)
> * EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
> Discussion also extended to introduce configuration such as: 
> * MAX_RETRY_COUNT
> * RETRY_FAILURE_POLICY
> REF:
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
> [2] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Backoff-strategies-for-async-IO-functions-tt26580.html
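
To make the proposed strategies concrete, here is a minimal, hedged sketch of the delay schedules behind FIX_INTERVAL_RETRY and EXP_BACKOFF_RETRY (the constants and class name are made-up examples, not proposed API):

{code}
// Illustrative sketch of the delay schedules behind the proposed strategies.
public class BackoffSchedules {

    /** FIX_INTERVAL_RETRY: the same delay before every attempt. */
    static long fixedIntervalDelayMs(long intervalMs) {
        return intervalMs;
    }

    /** EXP_BACKOFF_RETRY: delay doubles with every attempt, capped at maxDelayMs. */
    static long exponentialBackoffDelayMs(int attempt, long baseDelayMs, long maxDelayMs) {
        long delay = baseDelayMs * (1L << Math.min(attempt, 30));
        return Math.min(delay, maxDelayMs);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.printf("attempt %d: fixed=%d ms, exp=%d ms%n",
                    attempt,
                    fixedIntervalDelayMs(500),
                    exponentialBackoffDelayMs(attempt, 100, 10_000));
        }
    }
}
{code}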



--
This message was sent by Atlass

[jira] [Comment Edited] (FLINK-11909) Provide default failure/timeout/backoff handling strategy for AsyncIO functions

2019-05-01 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831421#comment-16831421
 ] 

Rong Rong edited comment on FLINK-11909 at 5/2/19 5:39 AM:
---

I took some time to look at the {{AsyncFunction}} API and the 
{{AsyncWaitOperator}}. My understanding is that:
when user implements an AsyncFunction, it has full control on how the 
asyncInvoke being executed (including creation of the ExecutorService used, # 
of threads). Thus user can also include any of the retry policies if needed (by 
directly create a retry mechanism)

The limitation of this is: 
1. Users does not have access to timer service from AsyncFunction - On the 
contrary, {{AsyncWaitOperator}} have access to register / cleanup / invoke 
{{AsyncFunction#timeout}} API which makes it more flexible in terms of creating 
retry policies.
2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is 
input-element independent - this does not fit the "rate limiting" requirement 
from the mailing list - I am assuming this is more like a retry queue for a 
specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} cannot allow access to 
{{StreamElementQueue}}, which means it cannot alter the ordering of the 
elements being executed. This doesn't fit into the description in Shuyi's 
comment of a DLQ (where failed component are put in the back of the queue 
before it can be retried the 2nd time)

I have a general idea to extend the {{RuntimeContext}} to create a 
{{RetryContext}} to solve these issues. But I want to also make sure that the 
above concerns I mentioned are valid and should be addressed by Flink. 
otherwise, I think the current AsyncFunction API is good to go for some basic 
retry policies.

Any thoughts? [~till.rohrmann] [~suez1224] 


was (Author: walterddr):
I took some time to look at the {{AsyncFunction}} API and the 
{{AsyncWaitOperator}}. My understanding is that:
when user implements an AsyncFunction, it has full control on how the 
asyncInvoke being executed (including creation of the ExecutorService used, # 
of threads). Thus user can also include any of the retry policies if needed (by 
directly create a retry mechanism)

The limitation of this is: 
1. Users does not have access to timer service from AsyncFunction. However, 
AsyncWaitOperator have access to register / cleanup / invoke 
{{AsyncFunction#timeout}} API 
2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is 
input-element independent. this does not fit the "rate limiting" requirement 
from the mailing list - I am assuming this is more like a retry queue for a 
specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} cannot allow access to 
{{StreamElementQueue}}, which means it cannot alter the ordering of the 
elements being executed. This doesn't fit into the description in Shuyi's 
comment of a DLQ (where failed component are put in the back of the queue 
before it can be retried the 2nd time)

I have a general idea to extend the {{RuntimeContext}} to create a 
{{RetryContext}} to solve these issues. But I want to also make sure that the 
above concerns I mentioned are valid and should be addressed by Flink. 
otherwise, I think the current AsyncFunction API is good to go for some basic 
retry policies.

Any thoughts? [~till.rohrmann] [~suez1224] 

> Provide default failure/timeout/backoff handling strategy for AsyncIO 
> functions
> ---
>
> Key: FLINK-11909
> URL: https://issues.apache.org/jira/browse/FLINK-11909
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently Flink AsyncIO by default fails the entire job when async function 
> invoke fails [1]. It would be nice to have some default Async IO 
> failure/timeout handling strategy, or opens up some APIs for AsyncFunction 
> timeout method to interact with the AsyncWaitOperator. For example (quote 
> [~suez1224] in [2]):
> * FAIL_OPERATOR (default & current behavior)
> * FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times)
> * EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
> Discussion also extended to introduce configuration such as: 
> * MAX_RETRY_COUNT
> * RETRY_FAILURE_POLICY
> REF:
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
> [2] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Backoff-strategies-for-async-IO-functions-tt26580.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11909) Provide default failure/timeout/backoff handling strategy for AsyncIO functions

2019-05-01 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831421#comment-16831421
 ] 

Rong Rong commented on FLINK-11909:
---

I took some time to look at the {{AsyncFunction}} API and the 
{{AsyncWaitOperator}}. My understanding is that:
when a user implements an AsyncFunction, they have full control over how 
asyncInvoke is executed (including the creation of the ExecutorService used and 
the number of threads). Thus the user can also include any retry policy if 
needed, by directly creating a retry mechanism.

The limitations of this are: 
1. Users do not have access to the timer service from AsyncFunction. However, 
AsyncWaitOperator can register / clean up / invoke the {{AsyncFunction#timeout}} 
API. 
2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is 
independent of the input element. This does not fit the "rate limiting" 
requirement from the mailing list; I am assuming that is more like a retry queue 
for a specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} cannot allow access to 
{{StreamElementQueue}}, which means it cannot alter the ordering of the 
elements being executed. This doesn't fit the description in Shuyi's comment of 
a DLQ (where failed elements are put at the back of the queue before they are 
retried a second time).

I have a general idea to extend the {{RuntimeContext}} to create a 
{{RetryContext}} to solve these issues. But I also want to make sure that the 
above concerns are valid and should be addressed by Flink; otherwise, I think 
the current AsyncFunction API is good enough for some basic retry policies.

Any thoughts? [~till.rohrmann] [~suez1224] 

> Provide default failure/timeout/backoff handling strategy for AsyncIO 
> functions
> ---
>
> Key: FLINK-11909
> URL: https://issues.apache.org/jira/browse/FLINK-11909
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently Flink AsyncIO by default fails the entire job when async function 
> invoke fails [1]. It would be nice to have some default Async IO 
> failure/timeout handling strategy, or opens up some APIs for AsyncFunction 
> timeout method to interact with the AsyncWaitOperator. For example (quote 
> [~suez1224] in [2]):
> * FAIL_OPERATOR (default & current behavior)
> * FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times)
> * EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
> Discussion also extended to introduce configuration such as: 
> * MAX_RETRY_COUNT
> * RETRY_FAILURE_POLICY
> REF:
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
> [2] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Backoff-strategies-for-async-IO-functions-tt26580.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 merged pull request #8325: [hotfix] [table] Refactor GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase

2019-05-01 Thread GitBox
bowenli86 merged pull request #8325: [hotfix] [table] Refactor 
GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase
URL: https://github.com/apache/flink/pull/8325
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-10758) Refactor TableEnvironment so that all registration calls delegate to CatalogManager

2019-05-01 Thread Xuefu Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang resolved FLINK-10758.
-
Resolution: Done

The actual work is captured in FLINK-11476. Thus, I close this in favor of 
FLINK-11476.

> Refactor TableEnvironment so that all registration calls delegate to 
> CatalogManager 
> 
>
> Key: FLINK-10758
> URL: https://issues.apache.org/jira/browse/FLINK-10758
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>
> There are many different APIs defined in {{TableEnvironment}} class that 
> register tables/views/functions. Based on the design doc, those calls need to 
> be delegated to {{CatalogManager}}. However, not all delegations are 
> straightforward. For example, table registration could mean registering 
> permanent tables, temp tables, or views. This JIRA takes care of the details. 
> Please refer to the "TableEnvironment Class" section in the design doc 
> (attached to the parent task) for more details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10618) Introduce catalog for Flink tables

2019-05-01 Thread Xuefu Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang resolved FLINK-10618.
-
Resolution: Done

This JIRA seems to be a high-level proposal, while most of the actual work is 
captured in FLINK-11275. Thus, I close this as "done". 

> Introduce catalog for Flink tables
> --
>
> Key: FLINK-10618
> URL: https://issues.apache.org/jira/browse/FLINK-10618
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Client
>Affects Versions: 1.6.1
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>
> This JIRA covers the 2nd aspect of Flink-Hive metastore integration.
> Besides meta objects such as tables that may come from an 
> {{ExternalCatalog}}, Flink also deals with tables/views/functions that are 
> created on the fly (in memory), or specified in a configuration file. Those 
> objects don't belong to any {{ExternalCatalog}}, yet Flink either stores them 
> in memory, which is non-persistent, or recreates them from a file, which is 
> a big pain for the user. Those objects are only known to Flink, but Flink 
> manages them poorly.
> Since they are typical objects in a database catalog, it's natural to have a 
> catalog that manages those objects. The interface will be similar to 
> {{ExternalCatalog}}, which contains meta objects that are not managed by 
> Flink. There are several possible implementations of the Flink internal 
> catalog interface: memory, file, external registry (such as confluent schema 
> registry or Hive metastore), and relational database, etc. 
> The initial functionality as well as the catalog hierarchy could be very 
> simple. The basic functionality of the catalog will be mostly create, alter, 
> and drop tables, views, functions, etc. Obviously, this can evolve over the 
> time.
> We plan to provide implementations: in-memory and in Hive metastore.
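
For illustration, a minimal sketch of the kind of create/alter/drop surface such a catalog could expose. This is not Flink's actual catalog interface; all names below are illustrative:

{code}
import java.util.List;

// Illustrative sketch only -- shows the create/alter/drop surface the proposal describes.
public interface TableCatalog {

    void createTable(String databaseName, String tableName, TableDefinition table);

    void alterTable(String databaseName, String tableName, TableDefinition newTable);

    void dropTable(String databaseName, String tableName);

    List<String> listTables(String databaseName);

    // Minimal stand-in for a table's metadata.
    class TableDefinition {
        public final List<String> columnNames;
        public final List<String> columnTypes;

        public TableDefinition(List<String> columnNames, List<String> columnTypes) {
            this.columnNames = columnNames;
            this.columnTypes = columnTypes;
        }
    }
}
{code}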



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot edited a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-01 Thread GitBox
flinkbot edited a comment on issue #8324: [FLINK-11921][table] Upgrade to 
calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-488032392
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @twalthr [PMC]
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-01 Thread GitBox
walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-488551197
 
 
   @flinkbot attention @twalthr 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-01 Thread GitBox
walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-488551100
 
 
   @flinkbot attention @twalthr could you kindly take a look when you have time?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr edited a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-01 Thread GitBox
walterddr edited a comment on issue #8324: [FLINK-11921][table] Upgrade to 
calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-488551100
 
 
   @flinkbot attention @twalthr 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr removed a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-01 Thread GitBox
walterddr removed a comment on issue #8324: [FLINK-11921][table] Upgrade to 
calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-488551100
 
 
   @flinkbot attention @twalthr 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280287299
 
 


[GitHub] [flink] flinkbot edited a comment on issue #8330: [FLINK-12388][docs] Update the production readiness checklist

2019-05-01 Thread GitBox
flinkbot edited a comment on issue #8330: [FLINK-12388][docs] Update the 
production readiness checklist
URL: https://github.com/apache/flink/pull/8330#issuecomment-488536697
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @fhueske [PMC]
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman commented on issue #8330: [FLINK-12388][docs] Update the production readiness checklist

2019-05-01 Thread GitBox
sjwiesman commented on issue #8330: [FLINK-12388][docs] Update the production 
readiness checklist
URL: https://github.com/apache/flink/pull/8330#issuecomment-488536823
 
 
   @flinkbot attention @fhueske 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8330: [FLINK-12388][docs] Update the production readiness checklist

2019-05-01 Thread GitBox
flinkbot commented on issue #8330: [FLINK-12388][docs] Update the production 
readiness checklist
URL: https://github.com/apache/flink/pull/8330#issuecomment-488536697
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12388) Update the production readiness checklist

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12388:
---
Labels: pull-request-available  (was: )

> Update the production readiness checklist 
> --
>
> Key: FLINK-12388
> URL: https://issues.apache.org/jira/browse/FLINK-12388
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> The production readiness checklist has grown organically over the years, and 
> while it provides valuable information, the content does not flow cohesively 
> as it has been worked on by a number of users. 
> We should improve the overall structure and readability of the checklist and 
> also update any outdated information. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sjwiesman opened a new pull request #8330: [FLINK-12388][docs] Update the production readiness checklist

2019-05-01 Thread GitBox
sjwiesman opened a new pull request #8330: [FLINK-12388][docs] Update the 
production readiness checklist
URL: https://github.com/apache/flink/pull/8330
 
 
   ## What is the purpose of the change
   
   
   
   The production readiness checklist has grown organically over the years, and 
while it provides valuable information, the content does not flow cohesively as 
it has been worked on by a number of users.
   
   We should improve the overall structure and readability of the checklist and 
also update any outdated information.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12388) Update the production readiness checklist

2019-05-01 Thread Seth Wiesman (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seth Wiesman updated FLINK-12388:
-
Summary: Update the production readiness checklist   (was: Update the 
production readyness checklist )

> Update the production readiness checklist 
> --
>
> Key: FLINK-12388
> URL: https://issues.apache.org/jira/browse/FLINK-12388
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>
> The production readiness checklist has grown organically over the years, and 
> while it provides valuable information, the content does not flow cohesively 
> as it has been worked on by a number of users. 
> We should improve the overall structure and readability of the checklist and 
> also update any outdated information. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12388) Update the production readyness checklist

2019-05-01 Thread Seth Wiesman (JIRA)
Seth Wiesman created FLINK-12388:


 Summary: Update the production readyness checklist 
 Key: FLINK-12388
 URL: https://issues.apache.org/jira/browse/FLINK-12388
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Seth Wiesman
Assignee: Seth Wiesman


The production readiness checklist has grown organically over the years, and 
while it provides valuable information, the content does not flow cohesively as 
it has been worked on by a number of users. 

We should improve the overall structure and readability of the checklist and 
also update any outdated information. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12387) Salesforce

2019-05-01 Thread Mukesh Jha (JIRA)
Mukesh Jha created FLINK-12387:
--

 Summary: Salesforce
 Key: FLINK-12387
 URL: https://issues.apache.org/jira/browse/FLINK-12387
 Project: Flink
  Issue Type: Test
Reporter: Mukesh Jha






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280257423
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for 
GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+   // Flink tables should be stored as 'external' tables in Hive metastore
+   private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new 
HashMap<String, String>() {{
+   put("EXTERNAL", "TRUE");
+   }};
+
private GenericHiveMetastoreCatalogUtil() {
}
 
// -- Utils --
 
/**
-* Creates a Hive database from CatalogDatabase.
+* Creates a Hive database from a CatalogDatabase.
+*
+* @param databaseName name of the database
+* @param catalogDatabase the CatalogDatabase instance
+* @return a Hive database
 */
-   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
-   Map<String, String> props = db.getProperties();
+   public static Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
return new Database(
-   dbName,
-   db.getDescription().isPresent() ? 
db.getDescription().get() : null,
+   databaseName,
+   catalogDatabase.getDescription().isPresent() ? 
catalogDatabase.getDescription().get() : null,
null,
-   props);
+   catalogDatabase.getProperties());
+   }
+
+   /**
+* Creates a Hive table from a CatalogBaseTable.
+*
+* @param tablePath path of the table
+* @param table the CatalogBaseTable instance
+* @return a Hive table
+*/
+   public static Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
+   Map<String, String> properties = new 
HashMap<>(table.getProperties());
+
+   // Table description
+   if (table.getDescription().isPresent()) {
+   properties.put(HiveTableConfig.TABLE_DESCRITPION, 
table.getDescription().get());
+   }
+
+   Table hiveTable = new Table();
+   hiveTable.setDbName(tablePath.getDatabaseName());
+   hiveTable.setTableName(tablePath.getObjectName());
+   hiveTable.setCreateTime((int) (System.currentTimeMillis() / 
1000));
+
+   // Table properties
+   hiveTable.setParameters(buildFlinkProperties(properties));
+   hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+   // Hive table's StorageDescriptor
+   StorageDescriptor sd = new StorageDescriptor();
+   sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+   List<FieldSchema> allColumns = 
createHiveColumns(table.getSchema());
+
+   // Table columns and partition keys
+   CatalogTable catalogTable = (CatalogTable) table;
+
+   if (catalogTable.isPartitioned()) {
+   int partitionKeySize = 
catalogTable.getPartitionKeys().size();
+   List<FieldSchema> regularColumns = 
allColumns.subList(0, allColumns.size() - partitionKeySize);
+   List<FieldSchema> partitionColumns = 
allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+   sd.setCols(regularColumns);
+   hiveTable.setPartitionKeys(part

[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280257096
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for 
GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+   // Flink tables should be stored as 'external' tables in Hive metastore
+   private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new 
HashMap<String, String>() {{
+   put("EXTERNAL", "TRUE");
+   }};
+
private GenericHiveMetastoreCatalogUtil() {
}
 
// -- Utils --
 
/**
-* Creates a Hive database from CatalogDatabase.
+* Creates a Hive database from a CatalogDatabase.
+*
+* @param databaseName name of the database
+* @param catalogDatabase the CatalogDatabase instance
+* @return a Hive database
 */
-   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
-   Map<String, String> props = db.getProperties();
+   public static Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
return new Database(
-   dbName,
-   db.getDescription().isPresent() ? 
db.getDescription().get() : null,
+   databaseName,
+   catalogDatabase.getDescription().isPresent() ? 
catalogDatabase.getDescription().get() : null,
null,
-   props);
+   catalogDatabase.getProperties());
+   }
+
+   /**
+* Creates a Hive table from a CatalogBaseTable.
+*
+* @param tablePath path of the table
+* @param table the CatalogBaseTable instance
+* @return a Hive table
+*/
+   public static Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
+   Map<String, String> properties = new 
HashMap<>(table.getProperties());
+
+   // Table description
+   if (table.getDescription().isPresent()) {
+   properties.put(HiveTableConfig.TABLE_DESCRITPION, 
table.getDescription().get());
+   }
+
+   Table hiveTable = new Table();
+   hiveTable.setDbName(tablePath.getDatabaseName());
+   hiveTable.setTableName(tablePath.getObjectName());
+   hiveTable.setCreateTime((int) (System.currentTimeMillis() / 
1000));
+
+   // Table properties
+   hiveTable.setParameters(buildFlinkProperties(properties));
+   hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+   // Hive table's StorageDescriptor
+   StorageDescriptor sd = new StorageDescriptor();
+   sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+   List<FieldSchema> allColumns = 
createHiveColumns(table.getSchema());
+
+   // Table columns and partition keys
+   CatalogTable catalogTable = (CatalogTable) table;
+
+   if (catalogTable.isPartitioned()) {
+   int partitionKeySize = 
catalogTable.getPartitionKeys().size();
+   List<FieldSchema> regularColumns = 
allColumns.subList(0, allColumns.size() - partitionKeySize);
+   List<FieldSchema> partitionColumns = 
allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+   sd.setCols(regularColumns);
+   hiveTable.setPartitionKeys(part

[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280256225
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -41,28 +41,28 @@
private final List<String> partitionKeys;
// Properties of the table
private final Map<String, String> properties;
-   // Comment of the table
-   private String comment = "This is a generic catalog table.";
+   // Description of the table
+   private String description = "This is a generic catalog table.";
 
 Review comment:
   I agree. To add on, I also doubt the necessity to have a predefined 
description and a predefined detailed description.
   
   Nevertheless, I think we'd better leave them to a different PR. I'll revert 
this part to make this PR focus on what it intends to accomplish


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11935) Remove DateTimeUtils pull-in and fix datetime casting problem

2019-05-01 Thread Julian Hyde (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831327#comment-16831327
 ] 

Julian Hyde commented on FLINK-11935:
-

If the language you are implementing is SQL, then you don't have a choice, and 
you shouldn't give the user a choice. You need to follow the standard.

There are already functions to map SQL timestamps from one timezone to another, 
so it would be possible to add functions to map SQL timestamps from one 
calendar to another.
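As a rough, self-contained illustration of the time-zone half of that remark (plain java.time here, not the SQL built-ins referred to above; the calendar case would be analogous in that the underlying instant does not change):

{code:java}
import java.time.Instant;
import java.time.ZoneId;

public class ZoneMappingExample {
    public static void main(String[] args) {
        // One instant rendered in two zones; only the presentation changes,
        // the underlying point in time stays the same.
        Instant instant = Instant.parse("2019-05-01T12:00:00Z");
        System.out.println(instant.atZone(ZoneId.of("UTC")));
        System.out.println(instant.atZone(ZoneId.of("America/Los_Angeles")));
    }
}
{code}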

> Remove DateTimeUtils pull-in and fix datetime casting problem
> -
>
> Key: FLINK-11935
> URL: https://issues.apache.org/jira/browse/FLINK-11935
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Rong Rong
>Assignee: vinoyang
>Priority: Major
>
> This {{DateTimeUtils}} was pulled in in FLINK-7235.
> Originally the time operation was not correctly done via the {{ymdToJulian}} 
> function before the date {{1970-01-01}} thus we need the fix. similar to 
> addressing this problem:
> {code:java}
>  Optimized :1017-12-05 22:58:58.998 
>  Expected :1017-11-29 22:58:58.998
>  Actual :1017-12-05 22:58:58.998
> {code}
>  
> However, after pulling in avatica 1.13, I found out that the optimized plans 
> of the time operations are actually correct. it is in fact the casting part 
> that creates problem:
> For example, the following:
> *{{(plus(-12000.months, cast('2017-11-29 22:58:58.998', TIMESTAMP))}}*
> result in a StringTestExpression of:
> *{{CAST(1017-11-29 22:58:58.998):VARCHAR(65536) CHARACTER SET "UTF-16LE" 
> COLLATE "ISO-8859-1$en_US$primary" NOT NULL}}*
> but the testing results are:
> {code:java}
>  Optimized :1017-11-29 22:58:58.998
>  Expected :1017-11-29 22:58:58.998
>  Actual :1017-11-23 22:58:58.998
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280248865
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
 ##
 @@ -102,11 +102,10 @@ void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean ignoreIfNot
 *  if set to false, throw an exception,
 *  if set to true, do nothing.
 * @throws TableNotExistException if the table does not exist
-* @throws DatabaseNotExistException if the database in tablePath to 
doesn't exist
 * @throws CatalogException in case of any runtime exception
 */
void renameTable(ObjectPath tablePath, String newTableName, boolean 
ignoreIfNotExists)
-   throws TableNotExistException, TableAlreadyExistException, 
DatabaseNotExistException, CatalogException;
+   throws TableNotExistException, TableAlreadyExistException, 
CatalogException;
 
 Review comment:
   Good cleanup!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280248483
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -41,28 +41,28 @@
private final List<String> partitionKeys;
// Properties of the table
private final Map<String, String> properties;
-   // Comment of the table
-   private String comment = "This is a generic catalog table.";
+   // Description of the table
+   private String description = "This is a generic catalog table.";
 
 Review comment:
   since you're changing this part, I'm wondering if it makes sense to move the 
comment to properties, as that's what Hive does. Doing this might simplify things.
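   For what it's worth, a tiny standalone sketch of that idea (hypothetical key name, not code from this PR): the comment would simply live in the same properties map that already travels with the table.

```java
import java.util.HashMap;
import java.util.Map;

public class CommentAsPropertyExample {
    // Hypothetical property key, used only for illustration.
    private static final String COMMENT_KEY = "comment";

    public static void main(String[] args) {
        // Instead of a dedicated comment/description field, keep the comment
        // in the table's properties map, similar to how Hive stores it.
        Map<String, String> properties = new HashMap<>();
        properties.put(COMMENT_KEY, "This is a generic catalog table.");
        System.out.println(properties.getOrDefault(COMMENT_KEY, ""));
    }
}
```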


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280245883
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for 
GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+   // Flink tables should be stored as 'external' tables in Hive metastore
+   private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new 
HashMap<String, String>() {{
+   put("EXTERNAL", "TRUE");
+   }};
+
private GenericHiveMetastoreCatalogUtil() {
}
 
// -- Utils --
 
/**
-* Creates a Hive database from CatalogDatabase.
+* Creates a Hive database from a CatalogDatabase.
+*
+* @param databaseName name of the database
+* @param catalogDatabase the CatalogDatabase instance
+* @return a Hive database
 */
-   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
-   Map<String, String> props = db.getProperties();
+   public static Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
return new Database(
-   dbName,
-   db.getDescription().isPresent() ? 
db.getDescription().get() : null,
+   databaseName,
+   catalogDatabase.getDescription().isPresent() ? 
catalogDatabase.getDescription().get() : null,
null,
-   props);
+   catalogDatabase.getProperties());
+   }
+
+   /**
+* Creates a Hive table from a CatalogBaseTable.
+*
+* @param tablePath path of the table
+* @param table the CatalogBaseTable instance
+* @return a Hive table
+*/
+   public static Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
+   Map<String, String> properties = new 
HashMap<>(table.getProperties());
+
+   // Table description
+   if (table.getDescription().isPresent()) {
+   properties.put(HiveTableConfig.TABLE_DESCRITPION, 
table.getDescription().get());
+   }
+
+   Table hiveTable = new Table();
+   hiveTable.setDbName(tablePath.getDatabaseName());
+   hiveTable.setTableName(tablePath.getObjectName());
+   hiveTable.setCreateTime((int) (System.currentTimeMillis() / 
1000));
+
+   // Table properties
+   hiveTable.setParameters(buildFlinkProperties(properties));
+   hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+   // Hive table's StorageDescriptor
+   StorageDescriptor sd = new StorageDescriptor();
+   sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+   List<FieldSchema> allColumns = 
createHiveColumns(table.getSchema());
+
+   // Table columns and partition keys
+   CatalogTable catalogTable = (CatalogTable) table;
+
+   if (catalogTable.isPartitioned()) {
+   int partitionKeySize = 
catalogTable.getPartitionKeys().size();
+   List<FieldSchema> regularColumns = 
allColumns.subList(0, allColumns.size() - partitionKeySize);
+   List<FieldSchema> partitionColumns = 
allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+   sd.setCols(regularColumns);
+   hiveTable.setPartitionKeys(partiti

[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280245087
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for 
GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+   // Flink tables should be stored as 'external' tables in Hive metastore
+   private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new 
HashMap<String, String>() {{
+   put("EXTERNAL", "TRUE");
+   }};
+
private GenericHiveMetastoreCatalogUtil() {
}
 
// -- Utils --
 
/**
-* Creates a Hive database from CatalogDatabase.
+* Creates a Hive database from a CatalogDatabase.
+*
+* @param databaseName name of the database
+* @param catalogDatabase the CatalogDatabase instance
+* @return a Hive database
 */
-   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
-   Map<String, String> props = db.getProperties();
+   public static Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
return new Database(
-   dbName,
-   db.getDescription().isPresent() ? 
db.getDescription().get() : null,
+   databaseName,
+   catalogDatabase.getDescription().isPresent() ? 
catalogDatabase.getDescription().get() : null,
null,
-   props);
+   catalogDatabase.getProperties());
+   }
+
+   /**
+* Creates a Hive table from a CatalogBaseTable.
+*
+* @param tablePath path of the table
+* @param table the CatalogBaseTable instance
+* @return a Hive table
+*/
+   public static Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
+   Map<String, String> properties = new 
HashMap<>(table.getProperties());
+
+   // Table description
+   if (table.getDescription().isPresent()) {
+   properties.put(HiveTableConfig.TABLE_DESCRITPION, 
table.getDescription().get());
+   }
+
+   Table hiveTable = new Table();
+   hiveTable.setDbName(tablePath.getDatabaseName());
+   hiveTable.setTableName(tablePath.getObjectName());
+   hiveTable.setCreateTime((int) (System.currentTimeMillis() / 
1000));
+
+   // Table properties
+   hiveTable.setParameters(buildFlinkProperties(properties));
+   hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+   // Hive table's StorageDescriptor
+   StorageDescriptor sd = new StorageDescriptor();
+   sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+   List<FieldSchema> allColumns = 
createHiveColumns(table.getSchema());
+
+   // Table columns and partition keys
+   CatalogTable catalogTable = (CatalogTable) table;
+
+   if (catalogTable.isPartitioned()) {
+   int partitionKeySize = 
catalogTable.getPartitionKeys().size();
+   List<FieldSchema> regularColumns = 
allColumns.subList(0, allColumns.size() - partitionKeySize);
+   List<FieldSchema> partitionColumns = 
allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+   sd.setCols(regularColumns);
+   hiveTable.setPartitionKeys(partiti

[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280244614
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for 
GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+   // Flink tables should be stored as 'external' tables in Hive metastore
+   private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new 
HashMap<String, String>() {{
+   put("EXTERNAL", "TRUE");
+   }};
+
private GenericHiveMetastoreCatalogUtil() {
}
 
// -- Utils --
 
/**
-* Creates a Hive database from CatalogDatabase.
+* Creates a Hive database from a CatalogDatabase.
+*
+* @param databaseName name of the database
+* @param catalogDatabase the CatalogDatabase instance
+* @return a Hive database
 */
-   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
-   Map<String, String> props = db.getProperties();
+   public static Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
return new Database(
-   dbName,
-   db.getDescription().isPresent() ? 
db.getDescription().get() : null,
+   databaseName,
+   catalogDatabase.getDescription().isPresent() ? 
catalogDatabase.getDescription().get() : null,
null,
-   props);
+   catalogDatabase.getProperties());
+   }
+
+   /**
+* Creates a Hive table from a CatalogBaseTable.
+*
+* @param tablePath path of the table
+* @param table the CatalogBaseTable instance
+* @return a Hive table
+*/
+   public static Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
+   Map<String, String> properties = new 
HashMap<>(table.getProperties());
+
+   // Table description
+   if (table.getDescription().isPresent()) {
+   properties.put(HiveTableConfig.TABLE_DESCRITPION, 
table.getDescription().get());
+   }
+
+   Table hiveTable = new Table();
+   hiveTable.setDbName(tablePath.getDatabaseName());
+   hiveTable.setTableName(tablePath.getObjectName());
+   hiveTable.setCreateTime((int) (System.currentTimeMillis() / 
1000));
+
+   // Table properties
+   hiveTable.setParameters(buildFlinkProperties(properties));
+   hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+   // Hive table's StorageDescriptor
+   StorageDescriptor sd = new StorageDescriptor();
+   sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+   List<FieldSchema> allColumns = 
createHiveColumns(table.getSchema());
+
+   // Table columns and partition keys
+   CatalogTable catalogTable = (CatalogTable) table;
+
+   if (catalogTable.isPartitioned()) {
+   int partitionKeySize = 
catalogTable.getPartitionKeys().size();
+   List<FieldSchema> regularColumns = 
allColumns.subList(0, allColumns.size() - partitionKeySize);
+   List<FieldSchema> partitionColumns = 
allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+   sd.setCols(regularColumns);
+   hiveTable.setPartitionKeys(partiti

[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280242198
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -262,13 +324,33 @@ public void alterTable(ObjectPath tableName, 
CatalogBaseTable newTable, boolean
}
 
@Override
-   public CatalogBaseTable getTable(ObjectPath objectPath) throws 
TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   public CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
+   Table hiveTable = getHiveTable(tablePath);
+
+   return 
GenericHiveMetastoreCatalogUtil.createCatalogTable(hiveTable);
+   }
+
+   protected Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
+   try {
+   return client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+   } catch (NoSuchObjectException e) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get Hive table %s", 
tablePath.getFullName()), e);
 
 Review comment:
   => Failed to get table from Hive?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280239908
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -229,31 +231,91 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   try {
+   client.dropTable(tablePath.getDatabaseName(), 
tablePath.getObjectName(), true, ignoreIfNotExists);
+   } catch (NoSuchObjectException e) {
+   if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to drop table %s", 
tablePath.getFullName()), e);
+   }
}
 
@Override
public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist. Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists. Thus, check the table existence 
explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to alter table %s", 
tablePath.getFullName()), e);
 
 Review comment:
   => Failed to rename table?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280238806
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -229,31 +231,91 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   try {
+   client.dropTable(tablePath.getDatabaseName(), 
tablePath.getObjectName(), true, ignoreIfNotExists);
 
 Review comment:
   sure


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280236484
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -229,31 +231,91 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   try {
+   client.dropTable(tablePath.getDatabaseName(), 
tablePath.getObjectName(), true, ignoreIfNotExists);
 
 Review comment:
   Can we document the meaning of "true" and add a TODO for future work?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
flinkbot commented on issue #8329: [FLINK-12239][hive] Support table related 
operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#issuecomment-488438750
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12239) Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12239:
---
Labels: pull-request-available  (was: )

> Support table related operations in GenericHiveMetastoreCatalog
> ---
>
> Key: FLINK-12239
> URL: https://issues.apache.org/jira/browse/FLINK-12239
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Support table related operations in GenericHiveMetastoreCatalog, which 
> implements ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 opened a new pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-01 Thread GitBox
bowenli86 opened a new pull request #8329: [FLINK-12239][hive] Support table 
related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329
 
 
   ## What is the purpose of the change
   
   This PR enables GenericHiveMetastoreCatalog to operate on Flink tables by using 
the Hive metastore as storage. Flink tables will be stored as Hive tables in the 
metastore, and GenericHiveMetastoreCatalog can convert between Flink and Hive 
tables upon read and write.
   
   ## Brief change log
   
 - implemented Table APIs in GenericHiveMetastoreCatalog
 - reused unit tests in GenericInMemoryCatalog, and moved them to 
CatalogTestBase
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - reused unit tests in GenericInMemoryCatalog, and moved them to 
CatalogTestBase
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   
   This PR depends on https://github.com/apache/flink/pull/8325


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12386) Support complete mapping between Flink and Hive data types

2019-05-01 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12386:


 Summary: Support complete mapping between Flink and Hive data types
 Key: FLINK-12386
 URL: https://issues.apache.org/jira/browse/FLINK-12386
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0


Flink right now doesn't have a 1:1 data type mapping for Hive's CHAR, VARCHAR, 
DECIMAL, MAP, and STRUCT types. The mapping needs to be added once FLIP-37 [rework 
Flink type system] is finished.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-05-01 Thread GitBox
Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source 
connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-488354670
 
 
   I've spoken with @haf through email and it seems the low acknowledge rate is 
related to the checkpoint frequency. The actual throughput of messages is fine.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs

2019-05-01 Thread GitBox
xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog 
APIs
URL: https://github.com/apache/flink/pull/8314#issuecomment-488351982
 
 
   Thanks for the review, @bowenli86! I pushed another commit addressing the above 
review comments. Plus, I will create a follow-up JIRA to add more test cases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add 
stats related catalog APIs
URL: https://github.com/apache/flink/pull/8314#discussion_r280134188
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java
 ##
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Column statistics a table or partition.
 
 Review comment:
   nit: "Column statistics `of` a table or partition."?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add 
stats related catalog APIs
URL: https://github.com/apache/flink/pull/8314#discussion_r280134823
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
 ##
 @@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Statistics for a non-partitioned table or a partition of a partitioned 
table.
+ */
+public class CatalogTableStatistics {
+   public static final CatalogTableStatistics UNKNOWN = new 
CatalogTableStatistics(0, 0, 0, 0);
+
+   /**
+* The number of rows in the table or partition.
+*/
+   private final long rowCount;
+
+   /**
+* The number of files on disk.
+*/
+   private final int fileCount;
+
+   /**
+* The total size in bytes.
+*/
+   private final long totalSize;
+
+   /**
+* The raw data size (size when loaded in memory) in bytes.
+*/
+   private final long rawDataSize;
+
+   private Map<String, String> properties;
+
+   public CatalogTableStatistics(long rowCount, int fileCount, long 
totalSize, long rawDataSize) {
+   this(rowCount, fileCount, totalSize, rawDataSize, new 
HashMap<>());
+   }
+
+   public CatalogTableStatistics(long rowCount, int fileCount, long 
totalSize, long rawDataSize,
+   Map<String, String> properties) {
+   this.rowCount = rowCount;
+   this.fileCount = fileCount;
+   this.totalSize = totalSize;
+   this.rawDataSize = rawDataSize;
+   this.properties = properties;
+   }
+
+   /**
+* The number of rows.
+*/
+   public long getRowCount() {
+   return this.rowCount;
+   }
+
+   public int getFileCount() {
+   return this.fileCount;
+   }
+
+   public long getTotalSize() {
+   return this.totalSize;
+   }
+
+   public long getRawDataSize() {
+   return this.rawDataSize;
+   }
+
+   public Map<String, String> getProperties() {
+   return this.properties;
+   }
+
+   /**
+* Create a deep copy of "this" instance.
+* @return a deep copy
 
 Review comment:
   nit: please add an empty line between comment and annotation


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add 
stats related catalog APIs
URL: https://github.com/apache/flink/pull/8314#discussion_r280133871
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.Map;
+
+/**
+ * Column statistics value of long type.
+ */
+public class CatalogColumnStatisticsDataLong extends 
CatalogColumnStatisticsDataBase {
+   /**
+* mim value.
+*/
+   private final long min;
+
+   /**
+* max value.
+*/
+   private final long max;
+
+   /**
+* number of distinct values.
+*/
+   private final long ndv;
+
+   public CatalogColumnStatisticsDataLong(long min, long max, long ndv, 
long nullCount) {
+   super(nullCount);
+   this.min = min;
+   this.max = max;
+   this.ndv = ndv;
+   }
+
+   public CatalogColumnStatisticsDataLong(long min, long max, long ndv, 
long nullCount, Map<String, String> properties) {
+   super(nullCount, properties);
+   this.min = min;
+   this.max = max;
+   this.ndv = ndv;
+   }
+
+   public long getMin() {
+   return min;
+   }
+
+   public long getMax() {
+   return max;
+   }
+
+   public long getNdv() {
+   return ndv;
+   }
+
+   public CatalogColumnStatisticsDataLong copy() {
+   return new CatalogColumnStatisticsDataLong(min, max, ndv, 
getNullCount(), getProperties());
 
 Review comment:
   create a copy of the properties?
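   In other words (a minimal standalone sketch, not the Flink class itself), the HashMap copy constructor is enough here, since the values are plain Strings:

```java
import java.util.HashMap;
import java.util.Map;

public class DefensiveCopyExample {
    public static void main(String[] args) {
        Map<String, String> original = new HashMap<>();
        original.put("ndv", "42");

        // The copy gets its own map, so later mutations of the original
        // statistics object no longer leak into the copied one.
        Map<String, String> copied = new HashMap<>(original);
        original.put("ndv", "43");

        System.out.println(copied.get("ndv")); // still 42
    }
}
```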


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs

2019-05-01 Thread GitBox
bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add 
stats related catalog APIs
URL: https://github.com/apache/flink/pull/8314#discussion_r280133941
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+import java.util.Map;
+
+/**
+ * Column statistics value of boolean type.
+ */
+public class CatalogColumnStatisticsDataBoolean extends 
CatalogColumnStatisticsDataBase {
+   /**
+* number of "true" values.
+*/
+   private final long trueCount;
+
+   /**
+* number of "false" values.
+*/
+   private final long falseCount;
+
+   public CatalogColumnStatisticsDataBoolean(long trueCount, long 
falseCount, long nullCount) {
+   super(nullCount);
+   this.trueCount = trueCount;
+   this.falseCount = falseCount;
+   }
+
+   public CatalogColumnStatisticsDataBoolean(long trueCount, long 
falseCount, long nullCount, Map<String, String> properties) {
+   super(nullCount, properties);
+   this.trueCount = trueCount;
+   this.falseCount = falseCount;
+   }
+
+   public Long getTrueCount() {
+   return trueCount;
+   }
+
+   public Long getFalseCount() {
+   return falseCount;
+   }
+
+   public CatalogColumnStatisticsDataBoolean copy() {
+   return new CatalogColumnStatisticsDataBoolean(trueCount, 
falseCount, getNullCount(), getProperties());
 
 Review comment:
   create a copy of the properties?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on issue #8328: [FLINK-12380] Add thread name in the log4j.properties

2019-05-01 Thread GitBox
Myasuka commented on issue #8328: [FLINK-12380] Add thread name in the 
log4j.properties
URL: https://github.com/apache/flink/pull/8328#issuecomment-488324862
 
 
   @link3280 This PR is inspired by FLINK-12368, where users want to add sub-task 
index information in the source code. Instead, we could add the thread name, which 
already contains the sub-task index, to the logs to avoid having to 
change the source code. `logback` in Flink is already configured with the `thread name` 
in its pattern (you can refer to the [doc of 
logging](https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html#configuring-logback)
 to confirm).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12385) RestClusterClient can hang indefinitely during job submission

2019-05-01 Thread Matt Dailey (JIRA)
Matt Dailey created FLINK-12385:
---

 Summary: RestClusterClient can hang indefinitely during job 
submission
 Key: FLINK-12385
 URL: https://issues.apache.org/jira/browse/FLINK-12385
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.8.0
Reporter: Matt Dailey


We have had situations where clients would hang indefinitely during job 
submission, even when job submission would succeed. We have not yet 
characterized what happened on the server to cause this, but we thought that 
the client should have a timeout for these requests.

This was observed in Flink 1.5.5, but the code seems to still have this problem 
in 1.8.0. One option is to include a timeout in calls to 
{{CompletableFuture.get()}}:
 * [RestClusterClient in 
1.5.5|https://github.com/apache/flink/blob/release-1.5.5/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L246]
 * [RestClusterClient in 
1.8.0|https://github.com/apache/flink/blob/release-1.8.0/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L247]
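
A minimal standalone sketch of the idea, bounding the wait with a timeout instead of blocking indefinitely (the future, the 60-second duration, and the error handling are illustrative; this is not the actual RestClusterClient code):
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetExample {
    public static void main(String[] args) throws Exception {
        // Stand-in for the job submission future; in the real client this
        // would come from the REST request machinery.
        CompletableFuture<String> submissionFuture =
                CompletableFuture.supplyAsync(() -> "JobSubmissionResult");

        try {
            // Bound the wait so a stuck server cannot hang the client forever.
            String result = submissionFuture.get(60, TimeUnit.SECONDS);
            System.out.println("Submission completed: " + result);
        } catch (TimeoutException e) {
            throw new RuntimeException("Job submission timed out after 60 seconds", e);
        }
    }
}
{code}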

Thread dump from client running Flink 1.5.5, running in Java 8:
{noformat}
"http-nio-0.0.0.0-8443-exec-6" #34 daemon prio=5 os_prio=0 
tid=0x55b421fd2000 nid=0x29 waiting on condition [0x7f932e176000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xb331d7c0> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:246)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at 
org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410)
{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12384) Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable"

2019-05-01 Thread Henrik (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henrik updated FLINK-12384:
---
Description: 
{code:java}
[tm] 2019-05-01 13:30:53,316 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating 
client connection, connectString=analytics-zetcd:2181 sessionTimeout=6 
watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f
[tm] 2019-05-01 13:30:53,384 WARN  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
configuration failed: javax.security.auth.login.LoginException: No JAAS 
configuration section named 'Client' was found in specified JAAS configuration 
file: '/tmp/jaas-3674237213070587877.conf'. Will continue connection to 
Zookeeper server without SASL authentication, if Zookeeper server allows it.
[tm] 2019-05-01 13:30:53,395 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
socket connection to server 
analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181
[tm] 2019-05-01 13:30:53,395 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Using 
configured hostname/address for TaskManager: 10.1.2.173.
[tm] 2019-05-01 13:30:53,401 ERROR 
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
Authentication failed
[tm] 2019-05-01 13:30:53,418 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start 
actor system at 10.1.2.173:0
[tm] 2019-05-01 13:30:53,420 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket 
connection established to 
analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating session
[tm] 2019-05-01 13:30:53,500 WARN  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket  - 
Connected to an old server; r-o mode will be unavailable
[tm] 2019-05-01 13:30:53,500 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
establishment complete on server 
analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = 
0xbf06a739001d446, negotiated timeout = 6
[tm] 2019-05-01 13:30:53,525 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
  - State change: CONNECTED{code}
Repro:

Start an etcd-cluster, with e.g. etcd-operator, with three members. Start zetcd 
in front. Configure the session cluster to go against zetcd.

Ensure the job can start successfully.

Now, kill the etcd pods one by one, letting the quorum re-establish in between, 
so that the cluster is still OK.

Now restart the job/tm pods. You'll end up in this no-man's-land.

 

---

Workaround: clean out the etcd cluster and remove all its data. However, this 
resets all time windows and state, despite having that saved in GCS, so it's a 
crappy workaround.

  was:
{code:java}
[tm] 2019-05-01 13:30:53,316 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating 
client connection, connectString=analytics-zetcd:2181 sessionTimeout=6 
watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f
[tm] 2019-05-01 13:30:53,384 WARN  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
configuration failed: javax.security.auth.login.LoginException: No JAAS 
configuration section named 'Client' was found in specified JAAS configuration 
file: '/tmp/jaas-3674237213070587877.conf'. Will continue connection to 
Zookeeper server without SASL authentication, if Zookeeper server allows it.
[tm] 2019-05-01 13:30:53,395 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
socket connection to server 
analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181
[tm] 2019-05-01 13:30:53,395 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Using 
configured hostname/address for TaskManager: 10.1.2.173.
[tm] 2019-05-01 13:30:53,401 ERROR 
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
Authentication failed
[tm] 2019-05-01 13:30:53,418 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start 
actor system at 10.1.2.173:0
[tm] 2019-05-01 13:30:53,420 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket 
connection established to 
analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating session
[tm] 2019-05-01 13:30:53,500 WARN  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket  - 
Connected to an old server; r-o mode will be unavailable
[tm] 2019-05-01 13:30:53,500 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
establishment complete on server 
analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = 
0xbf06a739001d446, negotiated timeout = 6
[tm] 2019-05-01 13:30:53,525 INFO  
org.apache.flink.shaded.curato

[jira] [Created] (FLINK-12384) Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable"

2019-05-01 Thread Henrik (JIRA)
Henrik created FLINK-12384:
--

 Summary: Rolling the etcd servers causes "Connected to an old 
server; r-o mode will be unavailable"
 Key: FLINK-12384
 URL: https://issues.apache.org/jira/browse/FLINK-12384
 Project: Flink
  Issue Type: Bug
Reporter: Henrik


{code:java}
[tm] 2019-05-01 13:30:53,316 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating 
client connection, connectString=analytics-zetcd:2181 sessionTimeout=6 
watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f
[tm] 2019-05-01 13:30:53,384 WARN  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
configuration failed: javax.security.auth.login.LoginException: No JAAS 
configuration section named 'Client' was found in specified JAAS configuration 
file: '/tmp/jaas-3674237213070587877.conf'. Will continue connection to 
Zookeeper server without SASL authentication, if Zookeeper server allows it.
[tm] 2019-05-01 13:30:53,395 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
socket connection to server 
analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181
[tm] 2019-05-01 13:30:53,395 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Using 
configured hostname/address for TaskManager: 10.1.2.173.
[tm] 2019-05-01 13:30:53,401 ERROR 
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
Authentication failed
[tm] 2019-05-01 13:30:53,418 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start 
actor system at 10.1.2.173:0
[tm] 2019-05-01 13:30:53,420 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket 
connection established to 
analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating session
[tm] 2019-05-01 13:30:53,500 WARN  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket  - 
Connected to an old server; r-o mode will be unavailable
[tm] 2019-05-01 13:30:53,500 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
establishment complete on server 
analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = 
0xbf06a739001d446, negotiated timeout = 6
[tm] 2019-05-01 13:30:53,525 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
  - State change: CONNECTED{code}
Repro:

Start an etcd-cluster, with e.g. etcd-operator, with three members. Start zetcd 
in front. Configure the session cluster to go against zetcd.

Ensure the job can start successfully.

Now, kill the etcd pods one by one, letting the quorum re-establish in between, 
so that the cluster is still OK.

Now restart the job/tm pods. You'll end up in this no-man's-land.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12383) "Log file environment variable 'log.file' is not set" despite web.log.path being set

2019-05-01 Thread Henrik (JIRA)
Henrik created FLINK-12383:
--

 Summary: "Log file environment variable 'log.file' is not set" 
despite web.log.path being set
 Key: FLINK-12383
 URL: https://issues.apache.org/jira/browse/FLINK-12383
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.8.0
Reporter: Henrik


You get these warnings when starting a session cluster, despite having 
configured everything log-related as specified by the configuration reference on the 
[web 
site|https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#web-frontend]:
{code:java}
[job] 2019-05-01 13:25:35,418 WARN  
org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Log file 
environment variable 'log.file' is not set.
[job] 2019-05-01 13:25:35,419 INFO  
org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
location of main cluster component log file: /var/lib/log/flink/jobmanager.log
[job] 2019-05-01 13:25:35,419 INFO  
org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
location of main cluster component stdout file: 
/var/lib/log/flink/jobmanager.out
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12382) HA + ResourceManager exception: Fencing token not set

2019-05-01 Thread Henrik (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henrik updated FLINK-12382:
---
Description: 
I'm testing zetcd + session jobs in k8s, and testing what happens when I kill 
both the job-cluster and task-manager at the same time, but maintain ZK/zetcd 
up and running.

Then I get this stacktrace, which is completely non-actionable for me and also 
resolves itself. I expect a number of retries, and if this exception is part of 
the protocol signalling to retry, then it should not be printed as a log entry.

This might be related to an older bug: 
[https://jira.apache.org/jira/browse/FLINK-7734]
{code:java}
[tm] 2019-05-01 11:32:01,641 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor    - Registration at 
ResourceManager failed due to an error
[tm] java.util.concurrent.CompletionException: 
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message RemoteFencedMessage(b8fb92460bdd5c250de1415e6cf04d4e, 
RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int, 
HardwareDescription, Time))) sent to 
akka.tcp://flink@analytics-job:6123/user/resourcemanager because the fencing 
token is null.
[tm]     at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
[tm]     at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
[tm]     at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
[tm]     at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
[tm]     at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
[tm]     at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
[tm]     at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:815)
[tm]     at akka.dispatch.OnComplete.internal(Future.scala:258)
[tm]     at akka.dispatch.OnComplete.internal(Future.scala:256)
[tm]     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
[tm]     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
[tm]     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
[tm]     at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
[tm]     at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
[tm]     at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
[tm]     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
[tm]     at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:97)
[tm]     at 
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:982)
[tm]     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
[tm]     at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
[tm]     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[tm]     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[tm]     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[tm]     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[tm]     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[tm]     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[tm]     at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[tm]     at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[tm]     at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[tm] Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
Fencing token not set: Ignoring message 
RemoteFencedMessage(b8fb92460bdd5c250de1415e6cf04d4e, 
RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int, 
HardwareDescription, Time))) sent to 
akka.tcp://flink@analytics-job:6123/user/resourcemanager because the fencing 
token is null.
[tm]     at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
[tm]     at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
[tm]     at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
[tm]     at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
[tm]     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
[tm]     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
[tm]     ... 9 more
[tm] 2019-05-01 11:32:01,650 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor    - Pausing and 
re-attempting registration in 1 ms
[tm] 2019-05-01 11:32:03,070 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor    - The heartbeat 
of JobManager with id 3642aa576f132fecd6811ae0d314c2b5 timed out.
[tm] 2019-05-01 11:32:03,070 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor    - Close 
Jo

[jira] [Updated] (FLINK-12382) HA + ResourceManager exception: Fencing token not set

2019-05-01 Thread Henrik (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henrik updated FLINK-12382:
---
Description: 
I'm testing zetcd + session jobs in k8s, and testing what happens when I kill 
both the job-cluster and task-manager at the same time, but maintain ZK/zetcd 
up and running.

Then I get this stacktrace, which is completely non-actionable for me and also 
resolves itself. I expect a number of retries, and if this exception is part of 
the protocol signalling to retry, then it should not be printed as a log entry.

This might be related to an older bug: 
[https://jira.apache.org/jira/browse/FLINK-7734]
{code:java}
[tm] 2019-05-01 11:32:01,641 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor    - Registration at 
ResourceManager failed due to an error
[tm] java.util.concurrent.CompletionException: 
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message RemoteFencedMessage(b8fb92460bdd5c250de1415e6cf04d4e, 
RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int, 
HardwareDescription, Time))) sent to 
akka.tcp://flink@analytics-job:6123/user/resourcemanager because the fencing 
token is null.
[tm]     at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
[tm]     at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
[tm]     at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
[tm]     at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
[tm]     at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
[tm]     at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
[tm]     at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:815)
[tm]     at akka.dispatch.OnComplete.internal(Future.scala:258)
[tm]     at akka.dispatch.OnComplete.internal(Future.scala:256)
[tm]     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
[tm]     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
[tm]     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
[tm]     at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
[tm]     at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
[tm]     at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
[tm]     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
[tm]     at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:97)
[tm]     at 
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:982)
[tm]     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
[tm]     at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
[tm]     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[tm]     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[tm]     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[tm]     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[tm]     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[tm]     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[tm]     at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[tm]     at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[tm]     at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[tm] Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
Fencing token not set: Ignoring message 
RemoteFencedMessage(b8fb92460bdd5c250de1415e6cf04d4e, 
RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int, 
HardwareDescription, Time))) sent to 
akka.tcp://flink@analytics-job:6123/user/resourcemanager because the fencing 
token is null.
[tm]     at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
[tm]     at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
[tm]     at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
[tm]     at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
[tm]     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
[tm]     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
[tm]     ... 9 more
[tm] 2019-05-01 11:32:01,650 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor    - Pausing and 
re-attempting registration in 1 ms
[tm] 2019-05-01 11:32:03,070 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor    - The heartbeat 
of JobManager with id 3642aa576f132fecd6811ae0d314c2b5 timed out.
[tm] 2019-05-01 11:32:03,070 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor    - Close 
Jo

[jira] [Created] (FLINK-12382) HA + ResourceManager exception: Fencing token not set

2019-05-01 Thread Henrik (JIRA)
Henrik created FLINK-12382:
--

 Summary: HA + ResourceManager exception: Fencing token not set
 Key: FLINK-12382
 URL: https://issues.apache.org/jira/browse/FLINK-12382
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.8.0
 Environment: Same as all previous bugs filed by myself today, but 
this time with HA with zetcd.
Reporter: Henrik


I'm testing zetcd + session jobs in k8s, and testing what happens when I kill 
both the job-cluster and task-manager at the same time, but maintain ZK/zetcd 
up and running.

Then I get this stacktrace, which is completely non-actionable for me and also 
resolves itself. I expect a number of retries, and if this exception is part of 
the protocol signalling to retry, then it should not be printed as a log entry.

This might be related to an older bug: 
https://jira.apache.org/jira/browse/FLINK-7734

 

 
{code:java}
[tm] 2019-05-01 11:32:01,641 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor    - Registration at 
ResourceManager failed due to an error
[tm] java.util.concurrent.CompletionException: 
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message RemoteFencedMessage(b8fb92460bdd5c250de1415e6cf04d4e, 
RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int, 
HardwareDescription, Time))) sent to 
akka.tcp://flink@analytics-job:6123/user/resourcemanager because the fencing 
token is null.
[tm]     at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
[tm]     at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
[tm]     at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
[tm]     at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
[tm]     at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
[tm]     at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
[tm]     at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:815)
[tm]     at akka.dispatch.OnComplete.internal(Future.scala:258)
[tm]     at akka.dispatch.OnComplete.internal(Future.scala:256)
[tm]     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
[tm]     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
[tm]     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
[tm]     at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
[tm]     at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
[tm]     at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
[tm]     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
[tm]     at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:97)
[tm]     at 
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:982)
[tm]     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
[tm]     at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
[tm]     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[tm]     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[tm]     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[tm]     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[tm]     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[tm]     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[tm]     at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[tm]     at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[tm]     at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[tm] Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
Fencing token not set: Ignoring message 
RemoteFencedMessage(b8fb92460bdd5c250de1415e6cf04d4e, 
RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int, 
HardwareDescription, Time))) sent to 
akka.tcp://flink@analytics-job:6123/user/resourcemanager because the fencing 
token is null.
[tm]     at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
[tm]     at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
[tm]     at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
[tm]     at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
[tm]     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
[tm]     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
[tm]     ... 9 more
[tm] 2019-05-01 11:32:01,650 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor    - Pausing and 
re-attempting registr

[GitHub] [flink] haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-05-01 Thread GitBox
haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source 
connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-488239680
 
 
   The checkpoints take e.g. `12528 ms` to write, but they are not large at all:
   
   https://user-images.githubusercontent.com/193115/57011163-8c3b3100-6c00-11e9-88de-5e3f59086ab5.png
   
   I put the checkpoint period to:
   
   ```
   env.enableCheckpointing(9000); // checkpoint every N seconds
   env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
   ```
   
   But I've also tried 1000 and 1500, respectively, with approximately the 
same results.
   
   The creation looks like this:
   
   ```java
   final String subscriptionName = params.get("pubsub-subscription", 
Env.getOrElse("ANALYTICS_PUBSUB_SUBSCRIPTION", "logs-analytics-dev"));
   
   String jsonFilePath =
   Optional
   .ofNullable(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"))
   .orElse("./analytics-dev.json");
   
   if (! new File(jsonFilePath).exists()) {
   throw new Exception(String.format("Couldn't find path: '%s'", 
jsonFilePath));
   }
   Credentials credentials = GoogleCredentials.fromStream(new 
FileInputStream(jsonFilePath));
   logger.info("Using Credential JSON: {}, subscription name: {}", 
jsonFilePath, subscriptionName);
   
   PubSubSource source = PubSubSource.newBuilder(new 
AppEventSerializer(), projectId, 
subscriptionName).withCredentials(credentials).build();
   return env.addSource(source).uid(String.format("pubsub-%s", 
subscriptionName));
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Leeviiii commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-05-01 Thread GitBox
Leev commented on a change in pull request #8300: [FLINK-11638][docs-zh] 
Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r279326911
 
 

 ##
 File path: docs/ops/state/savepoints.zh.md
 ##
 @@ -25,32 +25,24 @@ under the License.
 * toc
 {:toc}
 
-## What is a Savepoint? How is a Savepoint different from a Checkpoint?
+## 什么是 Savepoint ? Savepoint 与 Checkpoint 有什么不同?
 
-A Savepoint is a consistent image of the execution state of a streaming job, 
created via Flink's [checkpointing mechanism]({{ site.baseurl 
}}/internals/stream_checkpointing.html). You can use Savepoints to 
stop-and-resume, fork,
-or update your Flink jobs. Savepoints consist of two parts: a directory with 
(typically large) binary files on stable storage (e.g. HDFS, S3, ...) and a 
(relatively small) meta data file. The files on stable storage represent the 
net data of the job's execution state
-image. The meta data file of a Savepoint contains (primarily) pointers to all 
files on stable storage that are part of the Savepoint, in form of absolute 
paths.
+Savepoint 是依据 Flink [检查点机制]({{ site.baseurl 
}}/internals/stream_checkpointing.html)所创建的流作业执行状态的一致镜像。 您可以使用 Savepoint 
来停止并恢复,fork,或更新您的 Flink 工作。 Savepoint 由两部分组成:具有稳定存储(例如 
HDFS,S3,...)上的(通常是大的)二进制文件的目录和(相对较小的)元数据文件。 稳定存储上的文件表示作业执行状态的净数据图片。 Savepoint 
的元数据文件以(绝对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。
 
 
-Attention: In order to allow upgrades between programs and 
Flink versions, it is important to check out the following section about assigning IDs to your operators.
+注意: 为了允许程序和 Flink 版本之间的升级,请务必查看以下有关分配算子 ID 的部分 。
 
+从概念上讲,Flink 的 Savepoint 与 Checkpoint 的不同之处在于备份与传统数据库系统中的恢复日志不同。 
检查点的主要目的是提供恢复机制,以防万一
 
 Review comment:
   1. 从概念上讲,Flink 的 Savepoint 与 Checkpoint 的不同之处在于备份与传统数据库系统中的恢复日志不同 -->从概念上讲, 
Flink 的 Savepoint 与 Checkpoint 的不同之处类似于传统数据库中的备份与恢复日志之间的差异。
   2. 检查点的主要目的是提供恢复机制,以防万一出乎意料的失业 --> Checkpoint的主要目的是为意外失败的作为提供恢复机制。


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-05-01 Thread GitBox
Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source 
connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-488235510
 
 
   Hi @haf 
   
   The connector will acknowledge messages on every successful checkpoint. How 
often do you checkpoint? What might happen is that for some reason messages are 
not acknowledged fast enough.
   
   Could you also show how you create the PubSubSource, i.e. what configuration 
you've used?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12380) Add thread name in the log4j.properties

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12380:
---
Labels: pull-request-available  (was: )

> Add thread name in the log4j.properties
> ---
>
> Key: FLINK-12380
> URL: https://issues.apache.org/jira/browse/FLINK-12380
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
>
> This is inspired by FLINK-12368, where users want to add sub-task index 
> information in the source code. We could add the thread name, which already 
> contains the sub-task index, to the logs to avoid having to change the 
> source code.
> Moreover, the existing {{logback.xml}} in Flink already contains {{thread 
> name}} information. We should also add this in the {{log4j.properties}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8328: [FLINK-12380] Add thread name in the log4j.properties

2019-05-01 Thread GitBox
flinkbot commented on issue #8328: [FLINK-12380] Add thread name in the 
log4j.properties
URL: https://github.com/apache/flink/pull/8328#issuecomment-488234756
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka opened a new pull request #8328: [FLINK-12380] Add thread name in the log4j.properties

2019-05-01 Thread GitBox
Myasuka opened a new pull request #8328: [FLINK-12380] Add thread name in the 
log4j.properties
URL: https://github.com/apache/flink/pull/8328
 
 
   ## What is the purpose of the change
   
   This is inspired by FLINK-12368, where users want to add sub-task index 
information in the source code. We could add the thread name, which already 
contains the sub-task index, to the logs to avoid having to change the 
source code.
   
   Moreover, the existing `logback.xml` in Flink already contains thread 
name information. We should also add this in the `log4j.properties`.
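   
   For illustration, the change might look roughly like the pattern below (a sketch assuming the standard log4j 1.x `PatternLayout`; the appender name and the rest of the pattern are placeholders, not the exact lines shipped in Flink's `log4j.properties`):
   
   ```properties
   # Hypothetical appender configuration: %t adds the thread name to each log line.
   log4j.appender.file.layout=org.apache.log4j.PatternLayout
   log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%t] %-60c %x - %m%n
   ```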
   
   
   ## Brief change log
   
   Add thread name in the log4j.properties
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on issue #8315: [FLINK-12368] add subtask index to FlinkKafkaConsumerBase logging, wh…

2019-05-01 Thread GitBox
Myasuka commented on issue #8315: [FLINK-12368] add subtask index to 
FlinkKafkaConsumerBase logging, wh…
URL: https://github.com/apache/flink/pull/8315#issuecomment-488234179
 
 
   @stevenzwu I found that the existing `logback.xml` in Flink already contains `thread 
name` information, and I created a JIRA, 
[FLINK-12380](https://issues.apache.org/jira/browse/FLINK-12380), to track this.
   
   In my opinion, developers would always feel bothered if they want sub-task 
index information to help debug but have to change the source code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8327: [FLINK-12333][docs] Add documentation for all async operations through REST API

2019-05-01 Thread GitBox
flinkbot commented on issue #8327: [FLINK-12333][docs] Add documentation for 
all async operations through REST API
URL: https://github.com/apache/flink/pull/8327#issuecomment-488232864
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12333) Add documentation for all async operations through REST

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12333:
---
Labels: pull-request-available  (was: )

> Add documentation for all async operations through REST 
> 
>
> Key: FLINK-12333
> URL: https://issues.apache.org/jira/browse/FLINK-12333
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / REST
>Affects Versions: 1.8.0
>Reporter: Vishal Santoshi
>Assignee: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
>
> Exit code 2 seen ( after 5 minutes ) when 
> {code:java}
> curl  --header "Content-Type: application/json" --request POST --data 
> '{"target-directory":"***","cancel-job":true}'    
> https://***/jobs//savepoints{code}
> It seems that when a REST call is made:
> "Triggered the cancel with savepoint command via the REST call. This 
> command is an asynchronous operation which produces a result (the savepoint 
> path). In order to deliver asynchronous results to the caller, Flink waits 
> before shutting down until they are delivered or until it times out after 5 
> minutes."
>  
> That implies that one has to execute 
> {code:java}
> curl  --request GET   
> https://**/jobs//savepoints/[Request_id]
> {code}
> on the request_id returned by the first call ( within 5 minutes ) , for a 
> clean exit ( code 0 ) 
>  
> Please add this life cycle in flink documentation , in all probability here 
> [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-savepoints]
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Myasuka opened a new pull request #8327: [FLINK-12333][docs] Add documentation for all async operations through REST API

2019-05-01 Thread GitBox
Myasuka opened a new pull request #8327: [FLINK-12333][docs] Add documentation 
for all async operations through REST API
URL: https://github.com/apache/flink/pull/8327
 
 
   
   
   ## What is the purpose of the change
   
   Improve the description of all async operations in the REST API. `Trigger 
savepoint` and `rescaling` operations are async operations that return a 
request-id to be used as an identifier for follow-up status queries. However, 
the current REST documentation does not describe this clearly, and users might 
be confused when using these REST APIs.
   
   
   ## Brief change log
   
 - Add description for all async operation headers.
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes

2019-05-01 Thread Henrik (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henrik updated FLINK-12381:
---
Description: 
{code:java}
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
'gs://example_bucket/flink/checkpoints//chk-16/_metadata'
 already exists
    at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
    at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74)
    at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
    at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
    at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
    at 
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65)
    at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
    at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
    ... 8 more
{code}
Instead, it should either just overwrite the checkpoint or fail to start the 
job completely. Partial and undefined failure is not what should happen.

 

Repro:
 # Set up a single purpose job cluster (which could use much better docs btw!)
 # Let it run with GCS checkpointing for a while with rocksdb/gs://example
 # Kill it
 # Start it

  was:
{code:java}
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
'gs://example_bucket/flink/checkpoints//chk-16/_metadata'
 already exists
    at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
    at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74)
    at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
    at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
    at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
    at 
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65)
    at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
    at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
    ... 8 more
{code}
Instead, it should either just overwrite the checkpoint or fail to start the 
job completely. Partial and undefined failure is not what should happen.


> W/o HA, upon a full restart, checkpointing crashes
> --
>
> Key: FLINK-12381
> URL: https://issues.apache.org/jira/browse/FLINK-12381
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0
> Environment: Same as FLINK-\{12379, 12377, 12376}
>Reporter: Henrik
>Priority: Major
>
> {code:java}
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> 'gs://example_bucket/flink/checkpoints//chk-16/_metadata'
>  already exists
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCh

[jira] [Updated] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes

2019-05-01 Thread Henrik (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henrik updated FLINK-12381:
---
Summary: W/o HA, upon a full restart, checkpointing crashes  (was: Without 
failover (aka "HA") configured, full restarts' checkpointing crashes)

> W/o HA, upon a full restart, checkpointing crashes
> --
>
> Key: FLINK-12381
> URL: https://issues.apache.org/jira/browse/FLINK-12381
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.8.0
> Environment: Same as FLINK-\{12379, 12377, 12376}
>Reporter: Henrik
>Priority: Major
>
> {code:java}
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> 'gs://example_bucket/flink/checkpoints//chk-16/_metadata'
>  already exists
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> {code}
> Instead, it should either just overwrite the checkpoint or fail to start the 
> job completely. Partial and undefined failure is not what should happen.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12381) Without failover (aka "HA") configured, full restarts' checkpointing crashes

2019-05-01 Thread Henrik (JIRA)
Henrik created FLINK-12381:
--

 Summary: Without failover (aka "HA") configured, full restarts' 
checkpointing crashes
 Key: FLINK-12381
 URL: https://issues.apache.org/jira/browse/FLINK-12381
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.8.0
 Environment: Same as FLINK-\{12379, 12377, 12376}
Reporter: Henrik


{code:java}
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
'gs://example_bucket/flink/checkpoints//chk-16/_metadata'
 already exists
    at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
    at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74)
    at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
    at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
    at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
    at 
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65)
    at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
    at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
    ... 8 more
{code}
Instead, it should either just overwrite the checkpoint or fail to start the 
job completely. Partial and undefined failure is not what should happen.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12380) Add thread name in the log4j.properties

2019-05-01 Thread Yun Tang (JIRA)
Yun Tang created FLINK-12380:


 Summary: Add thread name in the log4j.properties
 Key: FLINK-12380
 URL: https://issues.apache.org/jira/browse/FLINK-12380
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Yun Tang
Assignee: Yun Tang


This is inspired by FLINK-12368, where users want to add sub-task index 
information in the source code. We could add the thread name, which already 
contains the sub-task index, to the logs to avoid having to change the 
source code.

Moreover, the existing {{logback.xml}} in Flink already contains {{thread 
name}} information. We should also add this in the {{log4j.properties}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-05-01 Thread GitBox
haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source 
connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-488227037
 
 
   @xeli I'm getting pretty bad throughput from the source; about 20 messages 
per second with a lot of expired messages (200/s):
   
   https://user-images.githubusercontent.com/193115/56998726-b531d680-6bac-11e9-88ea-fd73387e3d95.png
   
   Any ideas why? I checked the "backpressure" tab in Flink's UI and can't see 
anything indicative of a problem there. I'm running outside the cloud on my own 
laptop with incremental snapshots to Google Cloud Storage.
   
   This is what it looks like after running all night:
   
   https://user-images.githubusercontent.com/193115/57008932-ceaa4100-6bf3-11e9-879b-9c3303ca61b5.png
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services