[jira] [Commented] (FLINK-12389) flink codegen set String type for ByteBuffer fields
[ https://issues.apache.org/jira/browse/FLINK-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831429#comment-16831429 ]

Yu Yang commented on FLINK-12389:
---------------------------------

[~fhue...@gmail.com] could you share any pointers on how we should resolve this issue?

> flink codegen set String type for ByteBuffer fields
> ---------------------------------------------------
>
>                 Key: FLINK-12389
>                 URL: https://issues.apache.org/jira/browse/FLINK-12389
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.8.0
>            Reporter: Yu Yang
>            Priority: Major
[jira] [Comment Edited] (FLINK-12389) flink codegen set String type for ByteBuffer fields
[ https://issues.apache.org/jira/browse/FLINK-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831429#comment-16831429 ]

Yu Yang edited comment on FLINK-12389 at 5/2/19 6:30 AM:
---------------------------------------------------------

[~fhue...@gmail.com] not sure whom we should contact on this. Could you share any pointers on how we should resolve this issue?

was (Author: yuyang08):
[~fhue...@gmail.com] could you share any pointers on how we should resolve this issue?

> flink codegen set String type for ByteBuffer fields
> ---------------------------------------------------
>
>                 Key: FLINK-12389
>                 URL: https://issues.apache.org/jira/browse/FLINK-12389
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.8.0
>            Reporter: Yu Yang
>            Priority: Major
[jira] [Created] (FLINK-12389) flink codegen set String type for ByteBuffer fields
Yu Yang created FLINK-12389:
-------------------------------

             Summary: flink codegen set String type for ByteBuffer fields
                 Key: FLINK-12389
                 URL: https://issues.apache.org/jira/browse/FLINK-12389
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.8.0
            Reporter: Yu Yang

We tried to write a simple Flink SQL program using a "select .. from" statement, and encountered a compile exception:

*Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"*

Further debugging shows that the following Flink-generated code snippet causes the problem:

{code}
final java.lang.reflect.Field field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds =
    org.apache.flink.api.java.typeutils.TypeExtractor.getDeclaredField(
        com.pinterest.utzv2.thrift.RealtimeSpendValue.class, "segmentIds");
...
boolean isNull$5 = (java.nio.ByteBuffer)
    field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1) == null;
java.lang.String result$4;
if (isNull$5) {
  result$4 = "";
}
else {
  result$4 = (java.lang.String) (java.nio.ByteBuffer)
      field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1);
}
{code}

The following is the stack trace:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
	... 17 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
	at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
	at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
	at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:425)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"
	at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049)
	at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
	at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
	at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
	at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
	at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3790)
	at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215)
	at org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3752)
	at org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3732)
	at org.codehaus.janino.Java$Assignment.accept(Java.java:4466)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
	at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
	at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
	at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
	at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
	at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
	at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
	at org.codehaus.janino.Java$Block.accept(
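For reference, the cast in the generated snippet can never succeed; the buffer's bytes have to be decoded instead. A minimal illustrative sketch follows (assuming the ByteBuffer holds UTF-8 encoded bytes; this is not the actual Flink fix, which belongs in the code generator's type handling):

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class ByteBufferToString {

	// A cast from ByteBuffer to String can never succeed; the bytes have to
	// be decoded with an explicit charset instead.
	public static String decode(ByteBuffer buffer) {
		if (buffer == null) {
			return "";
		}
		// Read from a duplicate so the caller's buffer position is untouched.
		ByteBuffer copy = buffer.duplicate();
		byte[] bytes = new byte[copy.remaining()];
		copy.get(bytes);
		return new String(bytes, StandardCharsets.UTF_8);
	}
}
{code}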
[jira] [Commented] (FLINK-12342) Yarn Resource Manager Acquires Too Many Containers
[ https://issues.apache.org/jira/browse/FLINK-12342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831427#comment-16831427 ]

Zhenqiu Huang commented on FLINK-12342:
---------------------------------------

After setting the config to 3000 milliseconds, the job with 256 containers can be launched successfully with only 1000+ total requested containers. The number can be reduced further by using a larger value, such as 5000 or even higher. So for small jobs with ~32 containers, users should keep the default value so that requests are sent out as soon as possible. For large jobs, users need to tune the parameter to trade off fast requests against the negative impact of repeatedly asking for more containers.

> Yarn Resource Manager Acquires Too Many Containers
> --------------------------------------------------
>
>                 Key: FLINK-12342
>                 URL: https://issues.apache.org/jira/browse/FLINK-12342
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>         Environment: We run jobs in Flink release 1.6.3.
>            Reporter: Zhenqiu Huang
>            Assignee: Zhenqiu Huang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: Screen Shot 2019-04-29 at 12.06.23 AM.png, container.log, flink-1.4.png, flink-1.6.png
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the current implementation of YarnFlinkResourceManager, it starts to acquire new containers one by one when it gets a request from the SlotManager. The mechanism works when the job is small, say less than 32 containers. If the job has 256 containers, containers can't be allocated immediately, and pending requests in the AMRMClient will not be removed accordingly. We observed that the AMRMClient asks for (current pending requests + 1) containers on each new request from the slot manager. In this way, during the start-up of such a job, it asked for 4000+ containers. If an external dependency issue happens, for example slow HDFS access, the whole job will be blocked without getting enough resources and finally killed with a SlotManager request timeout. Thus, we should use the total number of containers asked for, rather than the pending requests in the AMRMClient, as the threshold to decide whether we need to add one more resource request.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
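The fix proposed in the last sentence, using the total number of containers already requested rather than the AMRMClient's pending requests as the threshold, could look roughly like the following. This is a hypothetical sketch, not the actual YarnFlinkResourceManager code:

{code:java}
// Hypothetical sketch: track the total containers requested so far and only
// issue a new AMRMClient request while that total is below what the job needs.
public class ContainerRequestTracker {

	private int totalRequested;

	// Called for each new slot request from the SlotManager.
	public synchronized boolean shouldRequestNewContainer(int containersNeeded) {
		// The old behavior compared against pending requests only, which
		// re-asked for (pending + 1) containers on every slot request and
		// ballooned to 4000+ requests for a 256-container job.
		return totalRequested < containersNeeded;
	}

	public synchronized void onContainerRequested() {
		totalRequested++;
	}
}
{code}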
[GitHub] [flink] HuangZhenQiu commented on issue #8306: [FLINK-12342] Add fast-heartbeat-delay yarn config for jobs with large number of containers
HuangZhenQiu commented on issue #8306: [FLINK-12342] Add fast-heartbeat-delay yarn config for jobs with large number of containers
URL: https://github.com/apache/flink/pull/8306#issuecomment-488563188

@tillrohrmann @rmetzger After setting the config to 3000 milliseconds, the job with 256 containers can be launched successfully with only 1000+ total requested containers. The number can be reduced further by using a larger value, such as 5000 or even higher. So for small jobs with ~32 containers, users should keep the default value so that requests are sent out as soon as possible. For large jobs, users need to tune the parameter to trade off fast requests against the negative impact of repeatedly asking for more containers.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
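For illustration only, tuning such a setting from code might look like the snippet below. The key name is an assumption inferred from the PR title; the actual key is whatever PR #8306 defines:

```java
import org.apache.flink.configuration.Configuration;

public class YarnHeartbeatTuning {
	public static void main(String[] args) {
		Configuration conf = new Configuration();
		// "yarn.fast-heartbeat-delay" is a hypothetical key name; 3000 ms is
		// the interval reported to work for the 256-container job above.
		conf.setInteger("yarn.fast-heartbeat-delay", 3000);
	}
}
```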
[GitHub] [flink] bowenli86 commented on issue #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
bowenli86 commented on issue #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#issuecomment-488562799 Thanks @xuefuz for reviewing! I've addressed the comments. Would also be great to have @KurtYoung @JingsongLi @zjffdu take a look This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280257423

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
##########

@@ -18,32 +18,179 @@
 package org.apache.flink.table.catalog.hive;

+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;

 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;

+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;

 /**
  * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {

+	// Prefix used to distinguish properties created by Hive and Flink,
+	// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+	private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+	// Flink tables should be stored as 'external' tables in Hive metastore
+	private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{
+		put("EXTERNAL", "TRUE");
+	}};
+
 	private GenericHiveMetastoreCatalogUtil() {
 	}

 	// ------ Utils ------

 	/**
-	 * Creates a Hive database from CatalogDatabase.
+	 * Creates a Hive database from a CatalogDatabase.
+	 *
+	 * @param databaseName name of the database
+	 * @param catalogDatabase the CatalogDatabase instance
+	 * @return a Hive database
 	 */
-	public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
-		Map<String, String> props = db.getProperties();
+	public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
 		return new Database(
-			dbName,
-			db.getDescription().isPresent() ? db.getDescription().get() : null,
+			databaseName,
+			catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null,
 			null,
-			props);
+			catalogDatabase.getProperties());
+	}
+
+	/**
+	 * Creates a Hive table from a CatalogBaseTable.
+	 *
+	 * @param tablePath path of the table
+	 * @param table the CatalogBaseTable instance
+	 * @return a Hive table
+	 */
+	public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+		Map<String, String> properties = new HashMap<>(table.getProperties());
+
+		// Table description
+		if (table.getDescription().isPresent()) {
+			properties.put(HiveTableConfig.TABLE_DESCRITPION, table.getDescription().get());
+		}
+
+		Table hiveTable = new Table();
+		hiveTable.setDbName(tablePath.getDatabaseName());
+		hiveTable.setTableName(tablePath.getObjectName());
+		hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+		// Table properties
+		hiveTable.setParameters(buildFlinkProperties(properties));
+		hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+		// Hive table's StorageDescriptor
+		StorageDescriptor sd = new StorageDescriptor();
+		sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+		List<FieldSchema> allColumns = createHiveColumns(table.getSchema());
+
+		// Table columns and partition keys
+		CatalogTable catalogTable = (CatalogTable) table;
+
+		if (catalogTable.isPartitioned()) {
+			int partitionKeySize = catalogTable.getPartitionKeys().size();
+			List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+			List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+			sd.setCols(regularColumns);
+			hiveTable.setPartitionKeys(part
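The hunk above calls `buildFlinkProperties(properties)` without showing its body. Given the `FLINK_PROPERTY_PREFIX` constant, a plausible sketch of that helper (an assumption, not the code from the PR) is:

```java
// Plausible sketch: namespace every Flink property with the "flink." prefix
// so it cannot collide with properties the Hive metastore adds on its own.
private static Map<String, String> buildFlinkProperties(Map<String, String> properties) {
	return properties.entrySet().stream()
		.filter(e -> e.getKey() != null && e.getValue() != null)
		.collect(Collectors.toMap(
			e -> FLINK_PROPERTY_PREFIX + e.getKey(),
			Map.Entry::getValue));
}
```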
[GitHub] [flink] bowenli86 commented on issue #8325: [hotfix] [table] Refactor GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase
bowenli86 commented on issue #8325: [hotfix] [table] Refactor GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase
URL: https://github.com/apache/flink/pull/8325#issuecomment-488559175

This is the first time I merged a PR, and I'm still trying to figure out the right steps/process with this [Merging PR](https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests) doc as reference. Sorry that the commit message contains the PR title this time. Seems that I should have just deleted it. Will improve next time.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Comment Edited] (FLINK-11909) Provide default failure/timeout/backoff handling strategy for AsyncIO functions
[ https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831421#comment-16831421 ]

Rong Rong edited comment on FLINK-11909 at 5/2/19 5:40 AM:
-----------------------------------------------------------

I took some time to look at the {{AsyncFunction}} API and the {{AsyncWaitOperator}}. My understanding is that when a user implements an AsyncFunction, they have full control over how asyncInvoke is executed (including creation of the ExecutorService used and the number of threads). Thus, the user can also include any of the retry policies if needed (by directly creating a retry mechanism).

The limitations of this are:
1. Users do not have access to the timer service from AsyncFunction - On the contrary, {{AsyncWaitOperator}} has access to register / clean up / invoke the {{AsyncFunction#timeout}} API, which makes it more flexible in terms of creating retry policies.
2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is input-element independent - This does not fit the "rate limiting" requirement from the mailing list: I am assuming this is more like a retry queue for a specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} cannot give access to {{StreamElementQueue}}, which means it cannot alter the ordering of the elements being executed - This doesn't fit the description in Shuyi's comment of a DLQ (where failed elements are put at the back of the queue before they can be retried a 2nd time).

I have a general idea to extend the {{RuntimeContext}} to create a {{RetryContext}} to solve these issues. But I also want to make sure that the above concerns are valid and should be addressed by Flink; otherwise, I think the current AsyncFunction API is good to go for some basic retry policies. Any thoughts? [~till.rohrmann] [~suez1224]

was (Author: walterddr):
I took some time to look at the {{AsyncFunction}} API and the {{AsyncWaitOperator}}. My understanding is that when a user implements an AsyncFunction, they have full control over how asyncInvoke is executed (including creation of the ExecutorService used and the number of threads). Thus, the user can also include any of the retry policies if needed (by directly creating a retry mechanism).

The limitations of this are:
1. Users do not have access to the timer service from AsyncFunction - On the contrary, {{AsyncWaitOperator}} has access to register / clean up / invoke the {{AsyncFunction#timeout}} API, which makes it more flexible in terms of creating retry policies.
2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is input-element independent - this does not fit the "rate limiting" requirement from the mailing list - I am assuming this is more like a retry queue for a specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} cannot give access to {{StreamElementQueue}}, which means it cannot alter the ordering of the elements being executed. This doesn't fit the description in Shuyi's comment of a DLQ (where failed elements are put at the back of the queue before they can be retried a 2nd time).

I have a general idea to extend the {{RuntimeContext}} to create a {{RetryContext}} to solve these issues. But I also want to make sure that the above concerns are valid and should be addressed by Flink; otherwise, I think the current AsyncFunction API is good to go for some basic retry policies. Any thoughts?
[~till.rohrmann] [~suez1224]

> Provide default failure/timeout/backoff handling strategy for AsyncIO functions
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-11909
>                 URL: https://issues.apache.org/jira/browse/FLINK-11909
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>            Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11909) Provide default failure/timeout/backoff handling strategy for AsyncIO functions
[ https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831421#comment-16831421 ]

Rong Rong edited comment on FLINK-11909 at 5/2/19 5:39 AM:
-----------------------------------------------------------

I took some time to look at the {{AsyncFunction}} API and the {{AsyncWaitOperator}}. My understanding is that when a user implements an AsyncFunction, they have full control over how asyncInvoke is executed (including creation of the ExecutorService used and the number of threads). Thus, the user can also include any of the retry policies if needed (by directly creating a retry mechanism).

The limitations of this are:
1. Users do not have access to the timer service from AsyncFunction - On the contrary, {{AsyncWaitOperator}} has access to register / clean up / invoke the {{AsyncFunction#timeout}} API, which makes it more flexible in terms of creating retry policies.
2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is input-element independent - this does not fit the "rate limiting" requirement from the mailing list - I am assuming this is more like a retry queue for a specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} cannot give access to {{StreamElementQueue}}, which means it cannot alter the ordering of the elements being executed. This doesn't fit the description in Shuyi's comment of a DLQ (where failed elements are put at the back of the queue before they can be retried a 2nd time).

I have a general idea to extend the {{RuntimeContext}} to create a {{RetryContext}} to solve these issues. But I also want to make sure that the above concerns are valid and should be addressed by Flink; otherwise, I think the current AsyncFunction API is good to go for some basic retry policies. Any thoughts?
[~till.rohrmann] [~suez1224]

was (Author: walterddr):
I took some time to look at the {{AsyncFunction}} API and the {{AsyncWaitOperator}}. My understanding is that when a user implements an AsyncFunction, they have full control over how asyncInvoke is executed (including creation of the ExecutorService used and the number of threads). Thus, the user can also include any of the retry policies if needed (by directly creating a retry mechanism).

The limitations of this are:
1. Users do not have access to the timer service from AsyncFunction. However, AsyncWaitOperator has access to register / clean up / invoke the {{AsyncFunction#timeout}} API.
2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is input-element independent. This does not fit the "rate limiting" requirement from the mailing list - I am assuming this is more like a retry queue for a specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} cannot give access to {{StreamElementQueue}}, which means it cannot alter the ordering of the elements being executed. This doesn't fit the description in Shuyi's comment of a DLQ (where failed elements are put at the back of the queue before they can be retried a 2nd time).

I have a general idea to extend the {{RuntimeContext}} to create a {{RetryContext}} to solve these issues. But I also want to make sure that the above concerns are valid and should be addressed by Flink; otherwise, I think the current AsyncFunction API is good to go for some basic retry policies. Any thoughts?
[~till.rohrmann] [~suez1224]

> Provide default failure/timeout/backoff handling strategy for AsyncIO functions
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-11909
>                 URL: https://issues.apache.org/jira/browse/FLINK-11909
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>            Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-11909) Provide default failure/timeout/backoff handling strategy for AsyncIO functions
[ https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831421#comment-16831421 ]

Rong Rong commented on FLINK-11909:
-----------------------------------

I took some time to look at the {{AsyncFunction}} API and the {{AsyncWaitOperator}}. My understanding is that when a user implements an AsyncFunction, they have full control over how asyncInvoke is executed (including creation of the ExecutorService used and the number of threads). Thus, the user can also include any of the retry policies if needed (by directly creating a retry mechanism).

The limitations of this are:
1. Users do not have access to the timer service from AsyncFunction. However, AsyncWaitOperator has access to register / clean up / invoke the {{AsyncFunction#timeout}} API.
2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is input-element independent. This does not fit the "rate limiting" requirement from the mailing list - I am assuming this is more like a retry queue for a specific outbound service, not limited to a specific input element.
3. Retry within {{AsyncFunction}} cannot give access to {{StreamElementQueue}}, which means it cannot alter the ordering of the elements being executed. This doesn't fit the description in Shuyi's comment of a DLQ (where failed elements are put at the back of the queue before they can be retried a 2nd time).

I have a general idea to extend the {{RuntimeContext}} to create a {{RetryContext}} to solve these issues. But I also want to make sure that the above concerns are valid and should be addressed by Flink; otherwise, I think the current AsyncFunction API is good to go for some basic retry policies. Any thoughts? [~till.rohrmann] [~suez1224]

> Provide default failure/timeout/backoff handling strategy for AsyncIO functions
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-11909
>                 URL: https://issues.apache.org/jira/browse/FLINK-11909
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>            Priority: Major
>
> Currently Flink AsyncIO by default fails the entire job when an async function invocation fails [1]. It would be nice to have some default Async IO failure/timeout handling strategies, or to open up some APIs for the AsyncFunction timeout method to interact with the AsyncWaitOperator. For example (quoting [~suez1224] in [2]):
> * FAIL_OPERATOR (default & current behavior)
> * FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times)
> * EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
> Discussion also extended to introducing configuration such as:
> * MAX_RETRY_COUNT
> * RETRY_FAILURE_POLICY
> REF:
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
> [2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Backoff-strategies-for-async-IO-functions-tt26580.html

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
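To make the first point concrete, a fixed-interval retry can already be built inside {{asyncInvoke}} today. Below is a minimal sketch against the Flink 1.8 API; the lookup service and retry constants are stand-ins, and this is not a proposed Flink implementation:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RetryingAsyncFunction extends RichAsyncFunction<String, String> {

	private static final int MAX_RETRIES = 3;
	private static final long RETRY_INTERVAL_MS = 100L;

	private transient ScheduledExecutorService scheduler;

	@Override
	public void open(Configuration parameters) {
		scheduler = Executors.newScheduledThreadPool(4);
	}

	@Override
	public void close() {
		scheduler.shutdown();
	}

	@Override
	public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
		attempt(input, resultFuture, 0);
	}

	private void attempt(String input, ResultFuture<String> resultFuture, int tryCount) {
		CompletableFuture
			.supplyAsync(() -> lookup(input), scheduler)
			.whenComplete((result, error) -> {
				if (error == null) {
					resultFuture.complete(Collections.singleton(result));
				} else if (tryCount < MAX_RETRIES) {
					// FIX_INTERVAL_RETRY: re-run the same element later.
					scheduler.schedule(
						() -> attempt(input, resultFuture, tryCount + 1),
						RETRY_INTERVAL_MS, TimeUnit.MILLISECONDS);
				} else {
					resultFuture.completeExceptionally(error);
				}
			});
	}

	// Stand-in for the real call to an external service.
	private String lookup(String key) {
		return key.toUpperCase();
	}
}
{code}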
[GitHub] [flink] bowenli86 merged pull request #8325: [hotfix] [table] Refactor GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase
bowenli86 merged pull request #8325: [hotfix] [table] Refactor GenericInMemoryCatalogTest to prepare for moving common tests to CatalogTestBase URL: https://github.com/apache/flink/pull/8325 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Resolved] (FLINK-10758) Refactor TableEnvironment so that all registration calls delegate to CatalogManager
[ https://issues.apache.org/jira/browse/FLINK-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuefu Zhang resolved FLINK-10758. - Resolution: Done The actual work is captured in FLINK-11476. Thus, I close this in favor of FLINK-11476. > Refactor TableEnvironment so that all registration calls delegate to > CatalogManager > > > Key: FLINK-10758 > URL: https://issues.apache.org/jira/browse/FLINK-10758 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > > There are many different APIs defined in {{TableEnvironment}} class that > register tables/views/functions. Based on the design doc, those calls need to > be delegated to {{CatalogManager}}. However, not all delegations are > straightforward. For example. table registration could mean registering > permanent tables, temp tables, or views. This JIRA takes care of the details. > Please refer to the "TableEnvironment Class" section in the design doc > (attached to the parent task) for more details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10618) Introduce catalog for Flink tables
[ https://issues.apache.org/jira/browse/FLINK-10618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuefu Zhang resolved FLINK-10618.
---------------------------------
    Resolution: Done

This JIRA seems to be a high-level proposal, while most of the actual work is captured in FLINK-11275. Thus, I close this as "done".

> Introduce catalog for Flink tables
> ----------------------------------
>
>                 Key: FLINK-10618
>                 URL: https://issues.apache.org/jira/browse/FLINK-10618
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API, Table SQL / Client
>    Affects Versions: 1.6.1
>            Reporter: Xuefu Zhang
>            Assignee: Xuefu Zhang
>            Priority: Major
>
> This JIRA covers the 2nd aspect of Flink-Hive metastore integration.
> Besides meta objects such as tables that may come from an {{ExternalCatalog}}, Flink also deals with tables/views/functions that are created on the fly (in memory), or specified in a configuration file. Those objects don't belong to any {{ExternalCatalog}}, yet Flink either stores them in memory, which is non-persistent, or recreates them from a file, which is a big pain for the user. Those objects are only known to Flink, but Flink has poor management for them.
> Since they are typical objects in a database catalog, it's natural to have a catalog that manages those objects. The interface will be similar to {{ExternalCatalog}}, which contains meta objects that are not managed by Flink. There are several possible implementations of the Flink internal catalog interface: memory, file, external registry (such as Confluent schema registry or Hive metastore), and relational database, etc.
> The initial functionality as well as the catalog hierarchy could be very simple. The basic functionality of the catalog will be mostly create, alter, and drop tables, views, functions, etc. Obviously, this can evolve over time.
> We plan to provide two implementations: in-memory and in Hive metastore.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] [flink] flinkbot edited a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
flinkbot edited a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-488032392

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❗ 3. Needs [attention] from.
  - Needs attention by @twalthr [PMC]
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands:
The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-488551197 @flinkbot attention @twalthr This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-488551100 @flinkbot attention @twalthr could you kindly take a look when you have time? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] walterddr edited a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
walterddr edited a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-488551100 @flinkbot attention @twalthr This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] walterddr removed a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
walterddr removed a comment on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-488551100 @flinkbot attention @twalthr This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280287299

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
##########

@@ -18,32 +18,179 @@ (quotes the same diff hunk shown in full under discussion_r280257423 above)
[GitHub] [flink] flinkbot edited a comment on issue #8330: [FLINK-12388][docs] Update the production readiness checklist
flinkbot edited a comment on issue #8330: [FLINK-12388][docs] Update the production readiness checklist
URL: https://github.com/apache/flink/pull/8330#issuecomment-488536697

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❗ 3. Needs [attention] from.
  - Needs attention by @fhueske [PMC]
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands:
The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] sjwiesman commented on issue #8330: [FLINK-12388][docs] Update the production readiness checklist
sjwiesman commented on issue #8330: [FLINK-12388][docs] Update the production readiness checklist URL: https://github.com/apache/flink/pull/8330#issuecomment-488536823 @flinkbot attention @fhueske This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8330: [FLINK-12388][docs] Update the production readiness checklist
flinkbot commented on issue #8330: [FLINK-12388][docs] Update the production readiness checklist
URL: https://github.com/apache/flink/pull/8330#issuecomment-488536697

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands:
The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Updated] (FLINK-12388) Update the production readiness checklist
[ https://issues.apache.org/jira/browse/FLINK-12388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12388: --- Labels: pull-request-available (was: ) > Update the production readiness checklist > -- > > Key: FLINK-12388 > URL: https://issues.apache.org/jira/browse/FLINK-12388 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > Labels: pull-request-available > > The production readiness checklist has grown organically over the years, and > while it provides valuable information, the content does not flow cohesively > as it has been worked on by a number of users. > We should improve the overall structure and readability of the checklist and > also update any outdated information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] sjwiesman opened a new pull request #8330: [FLINK-12388][docs] Update the production readiness checklist
sjwiesman opened a new pull request #8330: [FLINK-12388][docs] Update the production readiness checklist
URL: https://github.com/apache/flink/pull/8330

## What is the purpose of the change

The production readiness checklist has grown organically over the years, and while it provides valuable information, the content does not flow cohesively as it has been worked on by a number of users. We should improve the overall structure and readability of the checklist and also update any outdated information.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented)

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Updated] (FLINK-12388) Update the production readiness checklist
[ https://issues.apache.org/jira/browse/FLINK-12388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seth Wiesman updated FLINK-12388: - Summary: Update the production readiness checklist (was: Update the production readyness checklist ) > Update the production readiness checklist > -- > > Key: FLINK-12388 > URL: https://issues.apache.org/jira/browse/FLINK-12388 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > > The production readiness checklist has grown organically over the years, and > while it provides valuable information, the content does not flow cohesively > as it has been worked on by a number of users. > We should improve the overall structure and readability of the checklist and > also update any outdated information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12388) Update the production readyness checklist
Seth Wiesman created FLINK-12388: Summary: Update the production readyness checklist Key: FLINK-12388 URL: https://issues.apache.org/jira/browse/FLINK-12388 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Seth Wiesman Assignee: Seth Wiesman The production readiness checklist has grown organically over the years, and while it provides valuable information, the content does not flow cohesively as it has been worked on by a number of users. We should improve the overall structure and readability of the checklist and also update any outdated information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12387) Salesforce
Mukesh Jha created FLINK-12387: -- Summary: Salesforce Key: FLINK-12387 URL: https://issues.apache.org/jira/browse/FLINK-12387 Project: Flink Issue Type: Test Reporter: Mukesh Jha -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280238806 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -229,31 +231,91 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno @Override public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + try { + client.dropTable(tablePath.getDatabaseName(), tablePath.getObjectName(), true, ignoreIfNotExists); Review comment: sure This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280257423 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java ## @@ -18,32 +18,179 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.GenericCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.plan.stats.TableStats; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; - +import java.util.stream.Collectors; /** * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog. */ public class GenericHiveMetastoreCatalogUtil { + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + + // Flink tables should be stored as 'external' tables in Hive metastore + private static final Map EXTERNAL_TABLE_PROPERTY = new HashMap() {{ + put("EXTERNAL", "TRUE"); + }}; + private GenericHiveMetastoreCatalogUtil() { } // -- Utils -- /** -* Creates a Hive database from CatalogDatabase. +* Creates a Hive database from a CatalogDatabase. +* +* @param databaseName name of the database +* @param catalogDatabase the CatalogDatabase instance +* @return a Hive database */ - public static Database createHiveDatabase(String dbName, CatalogDatabase db) { - Map props = db.getProperties(); + public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) { return new Database( - dbName, - db.getDescription().isPresent() ? db.getDescription().get() : null, + databaseName, + catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null, null, - props); + catalogDatabase.getProperties()); + } + + /** +* Creates a Hive table from a CatalogBaseTable. 
+* +* @param tablePath path of the table +* @param table the CatalogBaseTable instance +* @return a Hive table +*/ + public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { + Map properties = new HashMap<>(table.getProperties()); + + // Table description + if (table.getDescription().isPresent()) { + properties.put(HiveTableConfig.TABLE_DESCRITPION, table.getDescription().get()); + } + + Table hiveTable = new Table(); + hiveTable.setDbName(tablePath.getDatabaseName()); + hiveTable.setTableName(tablePath.getObjectName()); + hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); + + // Table properties + hiveTable.setParameters(buildFlinkProperties(properties)); + hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY); + + // Hive table's StorageDescriptor + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>())); + + List allColumns = createHiveColumns(table.getSchema()); + + // Table columns and partition keys + CatalogTable catalogTable = (CatalogTable) table; + + if (catalogTable.isPartitioned()) { + int partitionKeySize = catalogTable.getPartitionKeys().size(); + List regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize); + List partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size()); + + sd.setCols(regularColumns); + hiveTable.setPartitionKeys(part
[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280257096 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java ## @@ -18,32 +18,179 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.GenericCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.plan.stats.TableStats; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; - +import java.util.stream.Collectors; /** * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog. */ public class GenericHiveMetastoreCatalogUtil { + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + + // Flink tables should be stored as 'external' tables in Hive metastore + private static final Map EXTERNAL_TABLE_PROPERTY = new HashMap() {{ + put("EXTERNAL", "TRUE"); + }}; + private GenericHiveMetastoreCatalogUtil() { } // -- Utils -- /** -* Creates a Hive database from CatalogDatabase. +* Creates a Hive database from a CatalogDatabase. +* +* @param databaseName name of the database +* @param catalogDatabase the CatalogDatabase instance +* @return a Hive database */ - public static Database createHiveDatabase(String dbName, CatalogDatabase db) { - Map props = db.getProperties(); + public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) { return new Database( - dbName, - db.getDescription().isPresent() ? db.getDescription().get() : null, + databaseName, + catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null, null, - props); + catalogDatabase.getProperties()); + } + + /** +* Creates a Hive table from a CatalogBaseTable. 
+* +* @param tablePath path of the table +* @param table the CatalogBaseTable instance +* @return a Hive table +*/ + public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { + Map properties = new HashMap<>(table.getProperties()); + + // Table description + if (table.getDescription().isPresent()) { + properties.put(HiveTableConfig.TABLE_DESCRITPION, table.getDescription().get()); + } + + Table hiveTable = new Table(); + hiveTable.setDbName(tablePath.getDatabaseName()); + hiveTable.setTableName(tablePath.getObjectName()); + hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); + + // Table properties + hiveTable.setParameters(buildFlinkProperties(properties)); + hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY); + + // Hive table's StorageDescriptor + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>())); + + List allColumns = createHiveColumns(table.getSchema()); + + // Table columns and partition keys + CatalogTable catalogTable = (CatalogTable) table; + + if (catalogTable.isPartitioned()) { + int partitionKeySize = catalogTable.getPartitionKeys().size(); + List regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize); + List partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size()); + + sd.setCols(regularColumns); + hiveTable.setPartitionKeys(part
[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280256225 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java ## @@ -41,28 +41,28 @@ private final List partitionKeys; // Properties of the table private final Map properties; - // Comment of the table - private String comment = "This is a generic catalog table."; + // Description of the table + private String description = "This is a generic catalog table."; Review comment: I agree. To add on, I also doubt the necessity of having a predefined description and a predefined detailed description. Nevertheless, I think we'd better leave them to a different PR. I'll revert this part to keep this PR focused on what it intends to accomplish. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11935) Remove DateTimeUtils pull-in and fix datetime casting problem
[ https://issues.apache.org/jira/browse/FLINK-11935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831327#comment-16831327 ] Julian Hyde commented on FLINK-11935: - If the language you are implementing is SQL, then you don't have a choice, and you shouldn't give the user a choice. You need to follow the standard. There are already functions to map SQL timestamps from one timezone to another, so it would be possible to add functions to map SQL timestamps from one calendar to another. > Remove DateTimeUtils pull-in and fix datetime casting problem > - > > Key: FLINK-11935 > URL: https://issues.apache.org/jira/browse/FLINK-11935 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Rong Rong >Assignee: vinoyang >Priority: Major > > This {{DateTimeUtils}} was pulled in in FLINK-7235. > Originally the time operation was not correctly done via the {{ymdToJulian}} > function before the date {{1970-01-01}} thus we need the fix. similar to > addressing this problem: > {code:java} > Optimized :1017-12-05 22:58:58.998 > Expected :1017-11-29 22:58:58.998 > Actual :1017-12-05 22:58:58.998 > {code} > > However, after pulling in avatica 1.13, I found out that the optimized plans > of the time operations are actually correct. it is in fact the casting part > that creates problem: > For example, the following: > *{{(plus(-12000.months, cast('2017-11-29 22:58:58.998', TIMESTAMP))}}* > result in a StringTestExpression of: > *{{CAST(1017-11-29 22:58:58.998):VARCHAR(65536) CHARACTER SET "UTF-16LE" > COLLATE "ISO-8859-1$en_US$primary" NOT NULL}}* > but the testing results are: > {code:java} > Optimized :1017-11-29 22:58:58.998 > Expected :1017-11-29 22:58:58.998 > Actual :1017-11-23 22:58:58.998 > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280248865 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java ## @@ -102,11 +102,10 @@ void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNot * if set to false, throw an exception, * if set to true, do nothing. * @throws TableNotExistException if the table does not exist -* @throws DatabaseNotExistException if the database in tablePath to doesn't exist * @throws CatalogException in case of any runtime exception */ void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException; + throws TableNotExistException, TableAlreadyExistException, CatalogException; Review comment: Good cleanup! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280248483 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java ## @@ -41,28 +41,28 @@ private final List partitionKeys; // Properties of the table private final Map properties; - // Comment of the table - private String comment = "This is a generic catalog table."; + // Description of the table + private String description = "This is a generic catalog table."; Review comment: since you're changing this part, I'm wondering if it makes sense to move the comment to properties, as that's what Hive does. Doing this might simplify things. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
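For illustration, a minimal sketch of what the suggestion above might look like, assuming a hypothetical "comment" property key and simplified types; this is not code from the PR:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: store the table comment as an ordinary property
// (as Hive does) instead of a dedicated field. The "comment" key name
// is an assumption for illustration only.
class CommentAsPropertySketch {
	static Map<String, String> withComment(Map<String, String> tableProperties, Optional<String> description) {
		Map<String, String> properties = new HashMap<>(tableProperties);
		// Folding the comment into properties avoids a separate field and
		// keeps the representation closer to Hive's table parameters.
		description.ifPresent(comment -> properties.put("comment", comment));
		return properties;
	}
}
{code}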
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280245883 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java ## @@ -18,32 +18,179 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.GenericCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.plan.stats.TableStats; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; - +import java.util.stream.Collectors; /** * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog. */ public class GenericHiveMetastoreCatalogUtil { + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + + // Flink tables should be stored as 'external' tables in Hive metastore + private static final Map EXTERNAL_TABLE_PROPERTY = new HashMap() {{ + put("EXTERNAL", "TRUE"); + }}; + private GenericHiveMetastoreCatalogUtil() { } // -- Utils -- /** -* Creates a Hive database from CatalogDatabase. +* Creates a Hive database from a CatalogDatabase. +* +* @param databaseName name of the database +* @param catalogDatabase the CatalogDatabase instance +* @return a Hive database */ - public static Database createHiveDatabase(String dbName, CatalogDatabase db) { - Map props = db.getProperties(); + public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) { return new Database( - dbName, - db.getDescription().isPresent() ? db.getDescription().get() : null, + databaseName, + catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null, null, - props); + catalogDatabase.getProperties()); + } + + /** +* Creates a Hive table from a CatalogBaseTable. 
+* +* @param tablePath path of the table +* @param table the CatalogBaseTable instance +* @return a Hive table +*/ + public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { + Map properties = new HashMap<>(table.getProperties()); + + // Table description + if (table.getDescription().isPresent()) { + properties.put(HiveTableConfig.TABLE_DESCRITPION, table.getDescription().get()); + } + + Table hiveTable = new Table(); + hiveTable.setDbName(tablePath.getDatabaseName()); + hiveTable.setTableName(tablePath.getObjectName()); + hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); + + // Table properties + hiveTable.setParameters(buildFlinkProperties(properties)); + hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY); + + // Hive table's StorageDescriptor + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>())); + + List allColumns = createHiveColumns(table.getSchema()); + + // Table columns and partition keys + CatalogTable catalogTable = (CatalogTable) table; + + if (catalogTable.isPartitioned()) { + int partitionKeySize = catalogTable.getPartitionKeys().size(); + List regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize); + List partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size()); + + sd.setCols(regularColumns); + hiveTable.setPartitionKeys(partiti
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280245087 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java ## @@ -18,32 +18,179 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.GenericCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.plan.stats.TableStats; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; - +import java.util.stream.Collectors; /** * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog. */ public class GenericHiveMetastoreCatalogUtil { + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + + // Flink tables should be stored as 'external' tables in Hive metastore + private static final Map EXTERNAL_TABLE_PROPERTY = new HashMap() {{ + put("EXTERNAL", "TRUE"); + }}; + private GenericHiveMetastoreCatalogUtil() { } // -- Utils -- /** -* Creates a Hive database from CatalogDatabase. +* Creates a Hive database from a CatalogDatabase. +* +* @param databaseName name of the database +* @param catalogDatabase the CatalogDatabase instance +* @return a Hive database */ - public static Database createHiveDatabase(String dbName, CatalogDatabase db) { - Map props = db.getProperties(); + public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) { return new Database( - dbName, - db.getDescription().isPresent() ? db.getDescription().get() : null, + databaseName, + catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null, null, - props); + catalogDatabase.getProperties()); + } + + /** +* Creates a Hive table from a CatalogBaseTable. 
+* +* @param tablePath path of the table +* @param table the CatalogBaseTable instance +* @return a Hive table +*/ + public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { + Map properties = new HashMap<>(table.getProperties()); + + // Table description + if (table.getDescription().isPresent()) { + properties.put(HiveTableConfig.TABLE_DESCRITPION, table.getDescription().get()); + } + + Table hiveTable = new Table(); + hiveTable.setDbName(tablePath.getDatabaseName()); + hiveTable.setTableName(tablePath.getObjectName()); + hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); + + // Table properties + hiveTable.setParameters(buildFlinkProperties(properties)); + hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY); + + // Hive table's StorageDescriptor + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>())); + + List allColumns = createHiveColumns(table.getSchema()); + + // Table columns and partition keys + CatalogTable catalogTable = (CatalogTable) table; + + if (catalogTable.isPartitioned()) { + int partitionKeySize = catalogTable.getPartitionKeys().size(); + List regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize); + List partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size()); + + sd.setCols(regularColumns); + hiveTable.setPartitionKeys(partiti
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280244614 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java ## @@ -18,32 +18,179 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.GenericCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.plan.stats.TableStats; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; - +import java.util.stream.Collectors; /** * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog. */ public class GenericHiveMetastoreCatalogUtil { + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + + // Flink tables should be stored as 'external' tables in Hive metastore + private static final Map EXTERNAL_TABLE_PROPERTY = new HashMap() {{ + put("EXTERNAL", "TRUE"); + }}; + private GenericHiveMetastoreCatalogUtil() { } // -- Utils -- /** -* Creates a Hive database from CatalogDatabase. +* Creates a Hive database from a CatalogDatabase. +* +* @param databaseName name of the database +* @param catalogDatabase the CatalogDatabase instance +* @return a Hive database */ - public static Database createHiveDatabase(String dbName, CatalogDatabase db) { - Map props = db.getProperties(); + public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) { return new Database( - dbName, - db.getDescription().isPresent() ? db.getDescription().get() : null, + databaseName, + catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null, null, - props); + catalogDatabase.getProperties()); + } + + /** +* Creates a Hive table from a CatalogBaseTable. 
+* +* @param tablePath path of the table +* @param table the CatalogBaseTable instance +* @return a Hive table +*/ + public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { + Map properties = new HashMap<>(table.getProperties()); + + // Table description + if (table.getDescription().isPresent()) { + properties.put(HiveTableConfig.TABLE_DESCRITPION, table.getDescription().get()); + } + + Table hiveTable = new Table(); + hiveTable.setDbName(tablePath.getDatabaseName()); + hiveTable.setTableName(tablePath.getObjectName()); + hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); + + // Table properties + hiveTable.setParameters(buildFlinkProperties(properties)); + hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY); + + // Hive table's StorageDescriptor + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>())); + + List allColumns = createHiveColumns(table.getSchema()); + + // Table columns and partition keys + CatalogTable catalogTable = (CatalogTable) table; + + if (catalogTable.isPartitioned()) { + int partitionKeySize = catalogTable.getPartitionKeys().size(); + List regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize); + List partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size()); + + sd.setCols(regularColumns); + hiveTable.setPartitionKeys(partiti
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280242198 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -262,13 +324,33 @@ public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean } @Override - public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { + Table hiveTable = getHiveTable(tablePath); + + return GenericHiveMetastoreCatalogUtil.createCatalogTable(hiveTable); + } + + protected Table getHiveTable(ObjectPath tablePath) throws TableNotExistException { + try { + return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName()); + } catch (NoSuchObjectException e) { + throw new TableNotExistException(catalogName, tablePath); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get Hive table %s", tablePath.getFullName()), e); Review comment: => Failed to get table from Hive? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280239908 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -229,31 +231,91 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno @Override public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + try { + client.dropTable(tablePath.getDatabaseName(), tablePath.getObjectName(), true, ignoreIfNotExists); + } catch (NoSuchObjectException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to drop table %s", tablePath.getFullName()), e); + } } @Override public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to alter table %s", tablePath.getFullName()), e); Review comment: => Failed to rename table? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280236484 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -229,31 +231,91 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno @Override public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + try { + client.dropTable(tablePath.getDatabaseName(), tablePath.getObjectName(), true, ignoreIfNotExists); Review comment: Can we document the meaning of "true" and add a TODO for future work? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
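A minimal sketch of the kind of documentation being asked for, assuming the client is Hive's IMetaStoreClient as the surrounding diff suggests; this is not the actual patch:

{code:java}
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.thrift.TException;

// Hypothetical sketch: name the bare "true" argument so its meaning is
// visible at the call site. "deleteData" matches the Hive client's
// parameter name for this position.
class DropTableSketch {
	static void dropTable(IMetaStoreClient client, String dbName, String tableName, boolean ignoreIfNotExists) throws TException {
		// deleteData = true: drop the table's underlying data as well as its metadata.
		// TODO: revisit whether data deletion is appropriate for 'external' tables.
		boolean deleteData = true;
		client.dropTable(dbName, tableName, deleteData, ignoreIfNotExists);
	}
}
{code}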
[GitHub] [flink] flinkbot commented on issue #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
flinkbot commented on issue #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#issuecomment-488438750 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12239) Support table related operations in GenericHiveMetastoreCatalog
[ https://issues.apache.org/jira/browse/FLINK-12239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12239: --- Labels: pull-request-available (was: ) > Support table related operations in GenericHiveMetastoreCatalog > --- > > Key: FLINK-12239 > URL: https://issues.apache.org/jira/browse/FLINK-12239 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > Support table related operations in GenericHiveMetastoreCatalog, which > implements ReadableWritableCatalog API -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] bowenli86 opened a new pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
bowenli86 opened a new pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329 ## What is the purpose of the change This PR enables GenericHiveMetastoreCatalog to operate on Flink tables, using the Hive metastore as storage. Flink tables will be stored as Hive tables in the metastore, and GenericHiveMetastoreCatalog converts between Flink and Hive tables on read and write. ## Brief change log - implemented Table APIs in GenericHiveMetastoreCatalog - reused unit tests in GenericInMemoryCatalog, and moved them to CatalogTestBase ## Verifying this change This change added tests and can be verified as follows: - reused unit tests in GenericInMemoryCatalog, and moved them to CatalogTestBase ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) This PR depends on https://github.com/apache/flink/pull/8325 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12386) Support complete mapping between Flink and Hive data types
Bowen Li created FLINK-12386: Summary: Support complete mapping between Flink and Hive data types Key: FLINK-12386 URL: https://issues.apache.org/jira/browse/FLINK-12386 Project: Flink Issue Type: Sub-task Components: Connectors / Hive Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Flink currently doesn't have a 1:1 data type mapping for Hive's CHAR, VARCHAR, DECIMAL, MAP, and STRUCT. We need to add the mapping once FLIP-37 [rework Flink type system] is finished. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#issuecomment-488354670 I've spoken with @haf through email and it seems the low acknowledgement rate is related to the checkpoint frequency. The actual throughput of messages is fine. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs
xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs URL: https://github.com/apache/flink/pull/8314#issuecomment-488351982 Thanks for the review, @bowenli86 ! I pushed another commit addressing the above review comments. Plus, I will create a follow-up JIRA to add more test cases. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs
bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs URL: https://github.com/apache/flink/pull/8314#discussion_r280134188 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.stats; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Column statistics a table or partition. Review comment: nit: "Column statistics `of` a table or partition."? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs
bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs URL: https://github.com/apache/flink/pull/8314#discussion_r280134823 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.stats; + +import java.util.HashMap; +import java.util.Map; + +/** + * Statistics for a non-partitioned table or a partition of a partitioned table. + */ +public class CatalogTableStatistics { + public static final CatalogTableStatistics UNKNOWN = new CatalogTableStatistics(0, 0, 0, 0); + + /** +* The number of rows in the table or partition. +*/ + private final long rowCount; + + /** +* The number of files on disk. +*/ + private final int fileCount; + + /** +* The total size in bytes. +*/ + private final long totalSize; + + /** +* The raw data size (size when loaded in memory) in bytes. +*/ + private final long rawDataSize; + + private Map properties; + + public CatalogTableStatistics(long rowCount, int fileCount, long totalSize, long rawDataSize) { + this(rowCount, fileCount, totalSize, rawDataSize, new HashMap<>()); + } + + public CatalogTableStatistics(long rowCount, int fileCount, long totalSize, long rawDataSize, + Map properties) { + this.rowCount = rowCount; + this.fileCount = fileCount; + this.totalSize = totalSize; + this.rawDataSize = rawDataSize; + this.properties = properties; + } + + /** +* The number of rows. +*/ + public long getRowCount() { + return this.rowCount; + } + + public int getFileCount() { + return this.fileCount; + } + + public long getTotalSize() { + return this.totalSize; + } + + public long getRawDataSize() { + return this.rawDataSize; + } + + public Map getProperties() { + return this.properties; + } + + /** +* Create a deep copy of "this" instance. +* @return a deep copy Review comment: nit: please add an empty line between comment and annotation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs
bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs URL: https://github.com/apache/flink/pull/8314#discussion_r280133871 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.stats; + +import java.util.Map; + +/** + * Column statistics value of long type. + */ +public class CatalogColumnStatisticsDataLong extends CatalogColumnStatisticsDataBase { + /** +* mim value. +*/ + private final long min; + + /** +* max value. +*/ + private final long max; + + /** +* number of distinct values. +*/ + private final long ndv; + + public CatalogColumnStatisticsDataLong(long min, long max, long ndv, long nullCount) { + super(nullCount); + this.min = min; + this.max = max; + this.ndv = ndv; + } + + public CatalogColumnStatisticsDataLong(long min, long max, long ndv, long nullCount, Map properties) { + super(nullCount, properties); + this.min = min; + this.max = max; + this.ndv = ndv; + } + + public long getMin() { + return min; + } + + public long getMax() { + return max; + } + + public long getNdv() { + return ndv; + } + + public CatalogColumnStatisticsDataLong copy() { + return new CatalogColumnStatisticsDataLong(min, max, ndv, getNullCount(), getProperties()); Review comment: create a copy of the properties? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs
bowenli86 commented on a change in pull request #8314: [FLINK-12365][table] Add stats related catalog APIs URL: https://github.com/apache/flink/pull/8314#discussion_r280133941 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.stats; + +import java.util.Map; + +/** + * Column statistics value of boolean type. + */ +public class CatalogColumnStatisticsDataBoolean extends CatalogColumnStatisticsDataBase { + /** +* number of "true" values. +*/ + private final long trueCount; + + /** +* number of "false" values. +*/ + private final long falseCount; + + public CatalogColumnStatisticsDataBoolean(long trueCount, long falseCount, long nullCount) { + super(nullCount); + this.trueCount = trueCount; + this.falseCount = falseCount; + } + + public CatalogColumnStatisticsDataBoolean(long trueCount, long falseCount, long nullCount, Map properties) { + super(nullCount, properties); + this.trueCount = trueCount; + this.falseCount = falseCount; + } + + public Long getTrueCount() { + return trueCount; + } + + public Long getFalseCount() { + return falseCount; + } + + public CatalogColumnStatisticsDataBoolean copy() { + return new CatalogColumnStatisticsDataBoolean(trueCount, falseCount, getNullCount(), getProperties()); Review comment: create a copy of the properties? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
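To make the two "copy of the properties" nits above concrete, here is a minimal sketch of the defensive copy on a simplified stats class; the class shape is an assumption for illustration, not the PR's code:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified stats holder illustrating a copy of the
// properties map, so the clone does not share mutable state with the original.
class BooleanStatsSketch {
	private final long trueCount;
	private final long falseCount;
	private final long nullCount;
	private final Map<String, String> properties;

	BooleanStatsSketch(long trueCount, long falseCount, long nullCount, Map<String, String> properties) {
		this.trueCount = trueCount;
		this.falseCount = falseCount;
		this.nullCount = nullCount;
		this.properties = properties;
	}

	BooleanStatsSketch copy() {
		// new HashMap<>(properties) copies the map itself; since both keys and
		// values are immutable Strings, this is effectively a deep copy.
		return new BooleanStatsSketch(trueCount, falseCount, nullCount, new HashMap<>(properties));
	}
}
{code}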
[GitHub] [flink] Myasuka commented on issue #8328: [FLINK-12380] Add thread name in the log4j.properties
Myasuka commented on issue #8328: [FLINK-12380] Add thread name in the log4j.properties URL: https://github.com/apache/flink/pull/8328#issuecomment-488324862 @link3280 This PR is inspired by FLINK-12368, where users want to add sub-task index information in the source code. Actually, we could add the thread name, which already contains the sub-task index information, to the logs to avoid having to change the source code. And `logback` in Flink already configures the thread name in its pattern (you can refer to the [doc of logging](https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html#configuring-logback) to confirm). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
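As an illustration of the change being discussed, a hedged sketch against what the stock Flink log4j.properties pattern looks like; the exact pattern touched by the PR may differ. In log4j's pattern layout, %t prints the thread name:

{code}
# Sketch only: add [%t] (the thread name, which carries the sub-task index,
# e.g. "Map (2/4)") to the stock conversion pattern.
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] %-5p %-60c %x - %m%n
{code}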
[jira] [Created] (FLINK-12385) RestClusterClient can hang indefinitely during job submission
Matt Dailey created FLINK-12385: --- Summary: RestClusterClient can hang indefinitely during job submission Key: FLINK-12385 URL: https://issues.apache.org/jira/browse/FLINK-12385 Project: Flink Issue Type: Bug Components: Runtime / REST Affects Versions: 1.8.0 Reporter: Matt Dailey We have had situations where clients would hang indefinitely during job submission, even when job submission would succeed. We have not yet characterized what happened on the server to cause this, but we thought that the client should have a timeout for these requests. This was observed in Flink 1.5.5, but the code seems to still have this problem in 1.8.0. One option is to include a timeout in calls to {{CompletableFuture.get()}}: * [RestClusterClient in 1.5.5|https://github.com/apache/flink/blob/release-1.5.5/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L246] * [RestClusterClient in 1.8.0|https://github.com/apache/flink/blob/release-1.8.0/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L247] Thread dump from client running Flink 1.5.5, running in Java 8: {noformat} http-nio-0.0.0.0-8443-exec-6" #34 daemon prio=5 os_prio=0 tid=0x55b421fd2000 nid=0x29 waiting on condition [0x7f932e176000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0xb331d7c0> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:246) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464) at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410) {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
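For illustration, a minimal sketch of the timeout option described above, using CompletableFuture's timed get(); the 5-minute value is an arbitrary placeholder, not a proposed default:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: bound the wait on the submission future so a stuck
// server cannot hang the client indefinitely.
class BoundedSubmitSketch {
	static <T> T waitBounded(CompletableFuture<T> submissionFuture) throws Exception {
		try {
			// get(timeout, unit) throws TimeoutException instead of blocking forever.
			return submissionFuture.get(5, TimeUnit.MINUTES);
		} catch (TimeoutException e) {
			submissionFuture.cancel(true);
			throw new RuntimeException("Job submission did not complete within the timeout", e);
		}
	}
}
{code}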
[jira] [Updated] (FLINK-12384) Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable"
[ https://issues.apache.org/jira/browse/FLINK-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henrik updated FLINK-12384: --- Description: {code:java} [tm] 2019-05-01 13:30:53,316 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=analytics-zetcd:2181 sessionTimeout=6 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f [tm] 2019-05-01 13:30:53,384 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-3674237213070587877.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. [tm] 2019-05-01 13:30:53,395 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181 [tm] 2019-05-01 13:30:53,395 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Using configured hostname/address for TaskManager: 10.1.2.173. [tm] 2019-05-01 13:30:53,401 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed [tm] 2019-05-01 13:30:53,418 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system at 10.1.2.173:0 [tm] 2019-05-01 13:30:53,420 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket connection established to analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating session [tm] 2019-05-01 13:30:53,500 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket - Connected to an old server; r-o mode will be unavailable [tm] 2019-05-01 13:30:53,500 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session establishment complete on server analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = 0xbf06a739001d446, negotiated timeout = 6 [tm] 2019-05-01 13:30:53,525 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED{code} Repro: Start an etcd-cluster, with e.g. etcd-operator, with three members. Start zetcd in front. Configure the session cluster to go against zetcd. Ensure the job can start successfully. Now, kill the etcd pods one by one, letting the quorum re-establish in between, so that the cluster is still OK. Now restart the job/tm pods. You'll end up in this no-man's-land. --- Workaround: clean out the etcd cluster and remove all its data; however, this resets all time windows and state, despite having that saved in GCS, so it's a crappy workaround. was: {code:java} [tm] 2019-05-01 13:30:53,316 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=analytics-zetcd:2181 sessionTimeout=6 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f [tm] 2019-05-01 13:30:53,384 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-3674237213070587877.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. 
[tm] 2019-05-01 13:30:53,395 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181 [tm] 2019-05-01 13:30:53,395 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Using configured hostname/address for TaskManager: 10.1.2.173. [tm] 2019-05-01 13:30:53,401 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed [tm] 2019-05-01 13:30:53,418 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system at 10.1.2.173:0 [tm] 2019-05-01 13:30:53,420 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket connection established to analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating session [tm] 2019-05-01 13:30:53,500 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket - Connected to an old server; r-o mode will be unavailable [tm] 2019-05-01 13:30:53,500 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session establishment complete on server analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = 0xbf06a739001d446, negotiated timeout = 6 [tm] 2019-05-01 13:30:53,525 INFO org.apache.flink.shaded.curato
[jira] [Created] (FLINK-12384) Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable"
Henrik created FLINK-12384: -- Summary: Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable" Key: FLINK-12384 URL: https://issues.apache.org/jira/browse/FLINK-12384 Project: Flink Issue Type: Bug Reporter: Henrik {code:java} [tm] 2019-05-01 13:30:53,316 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=analytics-zetcd:2181 sessionTimeout=6 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f [tm] 2019-05-01 13:30:53,384 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-3674237213070587877.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. [tm] 2019-05-01 13:30:53,395 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181 [tm] 2019-05-01 13:30:53,395 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Using configured hostname/address for TaskManager: 10.1.2.173. [tm] 2019-05-01 13:30:53,401 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed [tm] 2019-05-01 13:30:53,418 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system at 10.1.2.173:0 [tm] 2019-05-01 13:30:53,420 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket connection established to analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating session [tm] 2019-05-01 13:30:53,500 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket - Connected to an old server; r-o mode will be unavailable [tm] 2019-05-01 13:30:53,500 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session establishment complete on server analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = 0xbf06a739001d446, negotiated timeout = 6 [tm] 2019-05-01 13:30:53,525 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED{code} Repro: Start an etcd cluster, e.g. with etcd-operator, with three members. Start zetcd in front. Configure the session cluster to point at zetcd. Ensure the job can start successfully. Now, kill the etcd pods one by one, letting the quorum re-establish in between, so that the cluster is still OK. Now restart the job/tm pods. You'll end up in this no-man's-land. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
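To make the repro concrete, the rolling restart of the etcd members could be scripted roughly as below. This is a sketch only; the pod names and labels are hypothetical and depend on how etcd-operator and the Flink session cluster were deployed:
{code}
# Roll the three etcd members one at a time, letting the quorum recover in between.
# Pod names/labels below are hypothetical; adjust to your deployment.
for pod in analytics-etcd-0000 analytics-etcd-0001 analytics-etcd-0002; do
  kubectl delete pod "$pod"
  kubectl wait --for=condition=Ready pod --selector app=etcd --timeout=120s
done
# Restart the Flink job/tm pods and watch for "Connected to an old server" in their logs.
kubectl delete pod --selector app=analytics-job
kubectl delete pod --selector app=analytics-tm
{code}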
[jira] [Created] (FLINK-12383) "Log file environment variable 'log.file' is not set" despite web.log.path being set
Henrik created FLINK-12383: -- Summary: "Log file environment variable 'log.file' is not set" despite web.log.path being set Key: FLINK-12383 URL: https://issues.apache.org/jira/browse/FLINK-12383 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.8.0 Reporter: Henrik You get these warnings when starting a session cluster, despite having configured everything log-related as specified by the configuration reference on the [web site|https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#web-frontend]: {code:java} [job] 2019-05-01 13:25:35,418 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set. [job] 2019-05-01 13:25:35,419 INFO org.apache.flink.runtime.webmonitor.WebMonitorUtils - Determined location of main cluster component log file: /var/lib/log/flink/jobmanager.log [job] 2019-05-01 13:25:35,419 INFO org.apache.flink.runtime.webmonitor.WebMonitorUtils - Determined location of main cluster component stdout file: /var/lib/log/flink/jobmanager.out {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
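The warning suggests the JVM never received a {{log.file}} system property, even though {{web.log.path}} let WebMonitorUtils fall back to a location. A possible workaround sketch, assuming the startup scripts pass {{env.java.opts}} through to the JVM; the paths are illustrative, and whether this silences the warning in a given setup is an assumption:
{code}
# flink-conf.yaml (sketch)
web.log.path: /var/lib/log/flink/jobmanager.log
# Hypothetical workaround: set the 'log.file' system property explicitly
env.java.opts: -Dlog.file=/var/lib/log/flink/jobmanager.log
{code}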
[jira] [Updated] (FLINK-12382) HA + ResourceManager exception: Fencing token not set
[ https://issues.apache.org/jira/browse/FLINK-12382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henrik updated FLINK-12382: --- Description: I'm testing zetcd + session jobs in k8s, checking what happens when I kill both the job-cluster and task-manager at the same time while keeping ZK/zetcd up and running. Then I get this stack trace, which is completely non-actionable for me, and which also resolves itself. I expect a number of retries, and if this exception is part of the protocol's signalling to retry, then it should not be printed as a log entry. This might be related to an older bug: [https://jira.apache.org/jira/browse/FLINK-7734] {code:java} [tm] 2019-05-01 11:32:01,641 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager failed due to an error [tm] java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(b8fb92460bdd5c250de1415e6cf04d4e, RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int, HardwareDescription, Time))) sent to akka.tcp://flink@analytics-job:6123/user/resourcemanager because the fencing token is null. [tm] at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) [tm] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) [tm] at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) [tm] at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) [tm] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) [tm] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) [tm] at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:815) [tm] at akka.dispatch.OnComplete.internal(Future.scala:258) [tm] at akka.dispatch.OnComplete.internal(Future.scala:256) [tm] at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) [tm] at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) [tm] at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) [tm] at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) [tm] at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) [tm] at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) [tm] at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534) [tm] at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:97) [tm] at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:982) [tm] at akka.actor.Actor$class.aroundReceive(Actor.scala:502) [tm] at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446) [tm] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) [tm] at akka.actor.ActorCell.invoke(ActorCell.scala:495) [tm] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) [tm] at akka.dispatch.Mailbox.run(Mailbox.scala:224) [tm] at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [tm] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [tm] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [tm] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [tm] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [tm] Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message 
RemoteFencedMessage(b8fb92460bdd5c250de1415e6cf04d4e, RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int, HardwareDescription, Time))) sent to akka.tcp://flink@analytics-job:6123/user/resourcemanager because the fencing token is null. [tm] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63) [tm] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) [tm] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) [tm] at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) [tm] at akka.actor.Actor$class.aroundReceive(Actor.scala:502) [tm] at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) [tm] ... 9 more [tm] 2019-05-01 11:32:01,650 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Pausing and re-attempting registration in 1 ms [tm] 2019-05-01 11:32:03,070 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - The heartbeat of JobManager with id 3642aa576f132fecd6811ae0d314c2b5 timed out. [tm] 2019-05-01 11:32:03,070 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close Jo
[jira] [Created] (FLINK-12382) HA + ResourceManager exception: Fencing token not set
Henrik created FLINK-12382: -- Summary: HA + ResourceManager exception: Fencing token not set Key: FLINK-12382 URL: https://issues.apache.org/jira/browse/FLINK-12382 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.8.0 Environment: Same as all previous bugs filed by myself today, but this time with HA via zetcd. Reporter: Henrik I'm testing zetcd + session jobs in k8s, checking what happens when I kill both the job-cluster and task-manager at the same time while keeping ZK/zetcd up and running. Then I get this stack trace, which is completely non-actionable for me, and which also resolves itself. I expect a number of retries, and if this exception is part of the protocol's signalling to retry, then it should not be printed as a log entry. This might be related to an older bug: https://jira.apache.org/jira/browse/FLINK-7734 {code:java} [tm] 2019-05-01 11:32:01,641 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager failed due to an error [tm] java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(b8fb92460bdd5c250de1415e6cf04d4e, RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int, HardwareDescription, Time))) sent to akka.tcp://flink@analytics-job:6123/user/resourcemanager because the fencing token is null. [tm] at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) [tm] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) [tm] at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) [tm] at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) [tm] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) [tm] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) [tm] at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:815) [tm] at akka.dispatch.OnComplete.internal(Future.scala:258) [tm] at akka.dispatch.OnComplete.internal(Future.scala:256) [tm] at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) [tm] at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) [tm] at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) [tm] at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) [tm] at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) [tm] at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) [tm] at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534) [tm] at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:97) [tm] at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:982) [tm] at akka.actor.Actor$class.aroundReceive(Actor.scala:502) [tm] at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446) [tm] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) [tm] at akka.actor.ActorCell.invoke(ActorCell.scala:495) [tm] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) [tm] at akka.dispatch.Mailbox.run(Mailbox.scala:224) [tm] at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [tm] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [tm] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [tm] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [tm] at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [tm] Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(b8fb92460bdd5c250de1415e6cf04d4e, RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int, HardwareDescription, Time))) sent to akka.tcp://flink@analytics-job:6123/user/resourcemanager because the fencing token is null. [tm] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63) [tm] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) [tm] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) [tm] at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) [tm] at akka.actor.Actor$class.aroundReceive(Actor.scala:502) [tm] at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) [tm] ... 9 more [tm] 2019-05-01 11:32:01,650 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Pausing and re-attempting registr
[GitHub] [flink] haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#issuecomment-488239680 The checkpoints take e.g. `12528 ms` to write, but they are not large at all: https://user-images.githubusercontent.com/193115/57011163-8c3b3100-6c00-11e9-88de-5e3f59086ab5.png I set the checkpoint interval to: ``` env.enableCheckpointing(9000); // checkpoint every 9 s (argument is in milliseconds) env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); ``` I've also tried 1000 and 1500, with approximately the same results. The creation looks like this: ```java final String subscriptionName = params.get("pubsub-subscription", Env.getOrElse("ANALYTICS_PUBSUB_SUBSCRIPTION", "logs-analytics-dev")); String jsonFilePath = Optional .ofNullable(System.getenv("GOOGLE_APPLICATION_CREDENTIALS")) .orElse("./analytics-dev.json"); if (!new File(jsonFilePath).exists()) { throw new Exception(String.format("Couldn't find path: '%s'", jsonFilePath)); } Credentials credentials = GoogleCredentials.fromStream(new FileInputStream(jsonFilePath)); logger.info("Using Credential JSON: {}, subscription name: {}", jsonFilePath, subscriptionName); PubSubSource source = PubSubSource.newBuilder(new AppEventSerializer(), projectId, subscriptionName).withCredentials(credentials).build(); return env.addSource(source).uid(String.format("pubsub-%s", subscriptionName)); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
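Given checkpoints that take ~12.5 s against a 9 s interval, it may also help to bound checkpoint duration and concurrency explicitly. A minimal sketch, assuming the standard `StreamExecutionEnvironment`/`CheckpointConfig` API; the timeout value is illustrative, not a recommendation:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(9000); // trigger a checkpoint every 9 s (milliseconds)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // breathing room between checkpoints
        env.getCheckpointConfig().setCheckpointTimeout(60000);         // fail checkpoints slower than 60 s
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);      // never overlap slow checkpoints
    }
}
```

Since the connector acknowledges Pub/Sub messages only on successful checkpoints, a checkpoint that outlives the subscription's acknowledgement deadline would plausibly explain the expired messages discussed further down.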
[GitHub] [flink] Leeviiii commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
Leev commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese URL: https://github.com/apache/flink/pull/8300#discussion_r279326911 ## File path: docs/ops/state/savepoints.zh.md ## @@ -25,32 +25,24 @@ under the License. * toc {:toc} -## What is a Savepoint? How is a Savepoint different from a Checkpoint? +## 什么是 Savepoint ? Savepoint 与 Checkpoint 有什么不同? -A Savepoint is a consistent image of the execution state of a streaming job, created via Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html). You can use Savepoints to stop-and-resume, fork, -or update your Flink jobs. Savepoints consist of two parts: a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, ...) and a (relatively small) meta data file. The files on stable storage represent the net data of the job's execution state -image. The meta data file of a Savepoint contains (primarily) pointers to all files on stable storage that are part of the Savepoint, in form of absolute paths. +Savepoint 是依据 Flink [检查点机制]({{ site.baseurl }}/internals/stream_checkpointing.html)所创建的流作业执行状态的一致镜像。 您可以使用 Savepoint 来停止并恢复,fork,或更新您的 Flink 工作。 Savepoint 由两部分组成:具有稳定存储(例如 HDFS,S3,...)上的(通常是大的)二进制文件的目录和(相对较小的)元数据文件。 稳定存储上的文件表示作业执行状态的净数据图片。 Savepoint 的元数据文件以(绝对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。 -Attention: In order to allow upgrades between programs and Flink versions, it is important to check out the following section about assigning IDs to your operators. +注意: 为了允许程序和 Flink 版本之间的升级,请务必查看以下有关分配算子 ID 的部分 。 +从概念上讲,Flink 的 Savepoint 与 Checkpoint 的不同之处在于备份与传统数据库系统中的恢复日志不同。 检查点的主要目的是提供恢复机制,以防万一 Review comment: 1. 从概念上讲,Flink 的 Savepoint 与 Checkpoint 的不同之处在于备份与传统数据库系统中的恢复日志不同 -->从概念上讲, Flink 的 Savepoint 与 Checkpoint 的不同之处类似于传统数据库中的备份与恢复日志之间的差异。 2. 检查点的主要目的是提供恢复机制,以防万一出乎意料的失业 --> Checkpoint的主要目的是为意外失败的作为提供恢复机制。 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#issuecomment-488235510 Hi @haf The connector will acknowledge messages on every successful checkpoint. How often do you checkpoint? What might happen is that, for some reason, messages are not acknowledged fast enough. Could you also show how you create the PubSubSource, i.e. what configuration you've used? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12380) Add thread name in the log4j.properties
[ https://issues.apache.org/jira/browse/FLINK-12380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12380: --- Labels: pull-request-available (was: ) > Add thread name in the log4j.properties > --- > > Key: FLINK-12380 > URL: https://issues.apache.org/jira/browse/FLINK-12380 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > > This is inspired by FLINK-12368, where users wanted to add sub-task index > information in the source code. We could add the thread name, which already > contains the sub-task index information, to the logs, to avoid having to change the > source code. > Moreover, I found that the existing {{logback.xml}} in Flink already contains {{thread > name}} information. We should also add this in {{log4j.properties}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8328: [FLINK-12380] Add thread name in the log4j.properties
flinkbot commented on issue #8328: [FLINK-12380] Add thread name in the log4j.properties URL: https://github.com/apache/flink/pull/8328#issuecomment-488234756 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Myasuka opened a new pull request #8328: [FLINK-12380] Add thread name in the log4j.properties
Myasuka opened a new pull request #8328: [FLINK-12380] Add thread name in the log4j.properties URL: https://github.com/apache/flink/pull/8328 ## What is the purpose of the change This is inspired by FLINK-12368, where users wanted to add sub-task index information in the source code. We could add the thread name, which already contains the sub-task index information, to the logs, to avoid having to change the source code. Moreover, I found that the existing `logback.xml` in Flink already contains thread name information. We should also add this in the `log4j.properties`. ## Brief change log Add thread name in the log4j.properties ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
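For illustration, the change described might look like this in `conf/log4j.properties` (a sketch assuming log4j 1.x `PatternLayout`; the appender name and the rest of the pattern follow Flink's default config, and `%t` prints the thread name):

```
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
# [%t] adds the thread name; for task threads this includes the sub-task index
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%t] %-60c %x - %m%n
```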
[GitHub] [flink] Myasuka commented on issue #8315: [FLINK-12368] add subtask index to FlinkKafkaConsumerBase logging, wh…
Myasuka commented on issue #8315: [FLINK-12368] add subtask index to FlinkKafkaConsumerBase logging, wh… URL: https://github.com/apache/flink/pull/8315#issuecomment-488234179 @stevenzwu I found that the existing `logback.xml` in Flink already contains `thread name` information, and I created a jira [FLINK-12380](https://issues.apache.org/jira/browse/FLINK-12380) to track this. In my opinion, developers will always find it bothersome to have to change the source code whenever they want sub-task index information to help with debugging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8327: [FLINK-12333][docs] Add documentation for all async operations through REST API
flinkbot commented on issue #8327: [FLINK-12333][docs] Add documentation for all async operations through REST API URL: https://github.com/apache/flink/pull/8327#issuecomment-488232864 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12333) Add documentation for all async operations through REST
[ https://issues.apache.org/jira/browse/FLINK-12333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12333: --- Labels: pull-request-available (was: ) > Add documentation for all async operations through REST > > > Key: FLINK-12333 > URL: https://issues.apache.org/jira/browse/FLINK-12333 > Project: Flink > Issue Type: Improvement > Components: Documentation, Runtime / REST >Affects Versions: 1.8.0 >Reporter: Vishal Santoshi >Assignee: Yun Tang >Priority: Minor > Labels: pull-request-available > > Exit code 2 seen (after 5 minutes) when > {code:java} > curl --header "Content-Type: application/json" --request POST --data > '{"target-directory":"***","cancel-job":true}' > https://***/jobs//savepoints{code} > It seems that when a REST call is made: > "Triggered the cancel with savepoint command via the REST call. This > command is an asynchronous operation which produces a result (the savepoint > path). In order to deliver asynchronous results to the caller, Flink waits > before shutting down until they are delivered or until it times out after 5 > minutes." > > That implies that one has to execute > {code:java} > curl --request GET > https://**/jobs//savepoints/[Request_id] > {code} > on the request_id returned by the first call (within 5 minutes), for a > clean exit (code 0). > > Please add this life cycle to the Flink documentation, most probably here > [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-savepoints] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
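To make the life cycle concrete, the interaction might look like this (a sketch; host, job id, and target directory are placeholders, and the response shapes assume Flink's asynchronous-operation REST format):
{code}
# Trigger cancel-with-savepoint; the response carries a request-id
curl --header "Content-Type: application/json" --request POST \
  --data '{"target-directory":"<dir>","cancel-job":true}' \
  https://<host>/jobs/<jobid>/savepoints
# => {"request-id":"<request_id>"}

# Poll the request-id (within the 5-minute window) until the operation completes
curl --request GET https://<host>/jobs/<jobid>/savepoints/<request_id>
# => {"status":{"id":"IN_PROGRESS"}} while running, then
# => {"status":{"id":"COMPLETED"},"operation":{"location":"<dir>/savepoint-..."}}
{code}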
[GitHub] [flink] Myasuka opened a new pull request #8327: [FLINK-12333][docs] Add documentation for all async operations through REST API
Myasuka opened a new pull request #8327: [FLINK-12333][docs] Add documentation for all async operations through REST API URL: https://github.com/apache/flink/pull/8327 ## What is the purpose of the change Improve the description of all async operations exposed through the REST API. `Trigger savepoint` and `rescaling` are async operations which return a request-id that identifies the operation for later status queries. However, the current REST documentation does not describe this clearly, and users might be confused about how to use these REST APIs. ## Brief change log - Add description for all async operation headers. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes
[ https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henrik updated FLINK-12381: --- Description: {code:java} Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 'gs://example_bucket/flink/checkpoints//chk-16/_metadata' already exists at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65) at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) ... 8 more {code} Instead, it should either just overwrite the checkpoint or fail to start the job completely. Partial and undefined failure is not what should happen. Repro: # Set up a single-purpose job cluster (which could use much better docs, btw!) # Let it run with GCS checkpointing for a while with rocksdb/gs://example # Kill it # Start it was: {code:java} Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 'gs://example_bucket/flink/checkpoints//chk-16/_metadata' already exists at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65) at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) ... 8 more {code} Instead, it should either just overwrite the checkpoint or fail to start the job completely. Partial and undefined failure is not what should happen. 
> W/o HA, upon a full restart, checkpointing crashes > -- > > Key: FLINK-12381 > URL: https://issues.apache.org/jira/browse/FLINK-12381 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.8.0 > Environment: Same as FLINK-\{12379, 12377, 12376} >Reporter: Henrik >Priority: Major > > {code:java} > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > 'gs://example_bucket/flink/checkpoints//chk-16/_metadata' > already exists > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCh
[jira] [Updated] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes
[ https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henrik updated FLINK-12381: --- Summary: W/o HA, upon a full restart, checkpointing crashes (was: Without failover (aka "HA") configured, full restarts' checkpointing crashes) > W/o HA, upon a full restart, checkpointing crashes > -- > > Key: FLINK-12381 > URL: https://issues.apache.org/jira/browse/FLINK-12381 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.8.0 > Environment: Same as FLINK-\{12379, 12377, 12376} >Reporter: Henrik >Priority: Major > > {code:java} > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > 'gs://example_bucket/flink/checkpoints//chk-16/_metadata' > already exists > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) > at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) > ... 8 more > {code} > Instead, it should either just overwrite the checkpoint or fail to start the > job completely. Partial and undefined failure is not what should happen. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12381) Without failover (aka "HA") configured, full restarts' checkpointing crashes
Henrik created FLINK-12381: -- Summary: Without failover (aka "HA") configured, full restarts' checkpointing crashes Key: FLINK-12381 URL: https://issues.apache.org/jira/browse/FLINK-12381 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.8.0 Environment: Same as FLINK-\{12379, 12377, 12376} Reporter: Henrik {code:java} Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 'gs://example_bucket/flink/checkpoints//chk-16/_metadata' already exists at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65) at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) ... 8 more {code} Instead, it should either just overwrite the checkpoint or fail to start the job completely. Partial and undefined failure is not what should happen. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
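For context, the GCS + RocksDB checkpoint setup behind this report might look roughly like this in flink-conf.yaml (a sketch; the bucket path is illustrative and it assumes the GCS Hadoop connector is on the classpath):
{code}
state.backend: rocksdb
state.checkpoints.dir: gs://example_bucket/flink/checkpoints
state.backend.incremental: true
{code}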
[jira] [Created] (FLINK-12380) Add thread name in the log4j.properties
Yun Tang created FLINK-12380: Summary: Add thread name in the log4j.properties Key: FLINK-12380 URL: https://issues.apache.org/jira/browse/FLINK-12380 Project: Flink Issue Type: Improvement Components: Build System Reporter: Yun Tang Assignee: Yun Tang This is inspired by FLINK-12368, where users wanted to add sub-task index information in the source code. We could add the thread name, which already contains the sub-task index information, to the logs, to avoid having to change the source code. Moreover, I found that the existing {{logback.xml}} in Flink already contains {{thread name}} information. We should also add this in {{log4j.properties}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
haf commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#issuecomment-488227037 @xeli I'm getting pretty bad throughput from the source; about 20 messages per second with a lot of expired messages (200/s): https://user-images.githubusercontent.com/193115/56998726-b531d680-6bac-11e9-88ea-fd73387e3d95.png Any ideas why? I checked the "backpressure" tab in Flink's UI and can't see anything indicative of a problem there. I'm running outside the cloud on my own laptop with incremental snapshots to Google Cloud Storage. This is what it looks like after running all night: https://user-images.githubusercontent.com/193115/57008932-ceaa4100-6bf3-11e9-879b-9c3303ca61b5.png This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services