[jira] [Created] (FLINK-29801) OperatorCoordinator needs a way to access the MetricGroup interface
yuemeng created FLINK-29801:
---
Summary: OperatorCoordinator needs a way to access the MetricGroup interface
Key: FLINK-29801
URL: https://issues.apache.org/jira/browse/FLINK-29801
Project: Flink
Issue Type: Improvement
Components: Runtime / Coordination
Reporter: yuemeng

Currently, we have no way to obtain a metric group instance inside an OperatorCoordinator. In some cases we need to report metrics from the OperatorCoordinator. For example, in the Flink/Hudi integration, metadata is sent to the operator coordinator to be committed to HDFS or HMS, and we also need to report metrics from the operator coordinator for monitoring purposes.
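A hypothetical sketch of the accessor this issue asks for. The metricGroup() method on OperatorCoordinator.Context below is the proposed addition, not an existing API at the time of this report, and the counter name is illustrative:
{code}
// Inside an OperatorCoordinator implementation; 'context' is the
// OperatorCoordinator.Context provided by the runtime.
// context.metricGroup() is the accessor this issue proposes (hypothetical).
private transient org.apache.flink.metrics.Counter commitCounter;

@Override
public void start() throws Exception {
    commitCounter = context.metricGroup().counter("numCommits");
}

// Later, when the coordinator commits metadata to HDFS or HMS:
// commitCounter.inc();
{code}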
[jira] [Created] (FLINK-20935) Can't write Flink configuration to tmp file and add it to local resource in YARN session mode
yuemeng created FLINK-20935:
---
Summary: Can't write Flink configuration to tmp file and add it to local resource in YARN session mode
Key: FLINK-20935
URL: https://issues.apache.org/jira/browse/FLINK-20935
Project: Flink
Issue Type: Bug
Components: Deployment / YARN
Affects Versions: 1.12.0, 1.13.0
Reporter: yuemeng

In Flink 1.12.0 or the latest version, when we execute a command such as {{bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st}}, the deployment fails with the following error:
{code}
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
	at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730)
Caused by: java.io.FileNotFoundException: File does not exist: /tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
{code}
When startAppMaster is called in YarnClusterDescriptor, it tries to write the Flink configuration to a tmp file and add it to the local resources. But the following code makes the tmp file's file system be treated as a distributed file system:
{code}
// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = null;
try {
    tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
    BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

    String flinkConfigKey = "flink-conf.yaml";
    fileUploader.registerSingleLocalResource(
            flinkConfigKey,
            new Path(tmpConfigurationFile.getAbsolutePath()),
            "",
            LocalResourceType.FILE,
            true,
            true);
    classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
} finally {
    if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
        LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
    }
}
{code}
{{tmpConfigurationFile.getAbsolutePath()}} returns a path without a file scheme, so the path is resolved against the cluster's default file system and is treated as a distributed file system path.
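A sketch of one possible fix, assuming the uploader resolves scheme-less paths against the cluster's default file system: build the Path from the file's URI so it carries an explicit file:// scheme.
{code}
// Possible fix sketch: Path(URI) keeps the file:// scheme, so the local
// file system is used instead of the default (distributed) one.
fileUploader.registerSingleLocalResource(
        flinkConfigKey,
        new Path(tmpConfigurationFile.toURI()),  // file:///tmp/... instead of /tmp/...
        "",
        LocalResourceType.FILE,
        true,
        true);
{code}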
[jira] [Created] (FLINK-14666) Support multiple catalogs in Flink Table SQL
yuemeng created FLINK-14666:
---
Summary: Support multiple catalogs in Flink Table SQL
Key: FLINK-14666
URL: https://issues.apache.org/jira/browse/FLINK-14666
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0, 1.8.2, 1.8.0
Reporter: yuemeng

Currently, Calcite uses only the current catalog as the schema path to validate a SQL node, which may not be reasonable:
{code}
tableEnvironment.useCatalog("user_catalog");
tableEnvironment.useDatabase("user_db");
Table table = tableEnvironment.sqlQuery(
    "SELECT action, os, count(*) as cnt from music_queue_3 group by action, os, tumble(proctime, INTERVAL '10' SECOND)");
tableEnvironment.registerTable("v1", table);
Table t2 = tableEnvironment.sqlQuery("select action, os, 1 as cnt from v1");
tableEnvironment.registerTable("v2", t2);
tableEnvironment.sqlUpdate(
    "INSERT INTO database2.kafka_table_test1 SELECT action, os, cast(cnt as BIGINT) as cnt from v2");
{code}
Suppose the source table music_queue_3 and the sink table kafka_table_test1 both live in the user_catalog catalog, while temp tables or views such as v1 and v2 are registered in the default catalog. When we select from the temp table v2 and insert into our own catalog table database2.kafka_table_test1, validation of the SQL node always fails: the schema path in the catalog reader contains only the current catalog, not the default catalog, so the temp tables or views can never be resolved.
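A workaround sketch, assuming the default catalog and database keep their standard names so the temporary view stays reachable by its fully qualified path:
{code}
// Workaround sketch (assumption: the defaults are named default_catalog /
// default_database). Fully qualifying the temp view avoids relying on the
// current schema path during validation.
tableEnvironment.sqlUpdate(
    "INSERT INTO database2.kafka_table_test1 "
        + "SELECT action, os, cast(cnt as BIGINT) as cnt "
        + "FROM default_catalog.default_database.v2");
{code}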
[jira] [Created] (FLINK-11057) WHERE ... IN clause produces a stream inner join logical plan
yuemeng created FLINK-11057:
---
Summary: WHERE ... IN clause produces a stream inner join logical plan
Key: FLINK-11057
URL: https://issues.apache.org/jira/browse/FLINK-11057
Project: Flink
Issue Type: Bug
Reporter: yuemeng

The IN predicate in the following streaming query is translated into a stream inner join:
{code}
SELECT action, count(*) AS cnt
FROM user_action
WHERE action IN (
    'view', 'impress', 'sysaction', 'commentimpress', 'play', 'click', 'page',
    'abtestreqsuss', 'bannerimpress', 'abtestserver', 'active', 'search',
    'activeclient', 'like', 'zan', 'adclick', 'login', 'comment',
    'subscribeartist', 'subscribevideo', 'subscribedjradio', 'share',
    'private', 'register', 'downloadall', 'forward', 'newdj',
    'recommendimpress', 'hotkeywordimpress', 'nogetad', 'add', 'subscribe',
    'follow', 'new')
GROUP BY tumble(proctime, INTERVAL '60' SECOND), action
{code}
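This is likely Calcite's IN-list handling: IN lists longer than the planner's in-subquery threshold (inSubQueryThreshold, 20 by default in Calcite) are rewritten as a join against a VALUES relation, which on a stream becomes an inner join. A workaround sketch under that assumption, expressing the predicate as OR disjuncts instead:
{code}
// Workaround sketch (assumption: OR disjuncts avoid Calcite's IN-to-join
// rewrite that kicks in above the in-subquery threshold).
String[] actions = {"view", "impress", "sysaction", /* ... */ "new"};
String orPredicate = java.util.Arrays.stream(actions)
        .map(a -> "action = '" + a + "'")
        .collect(java.util.stream.Collectors.joining(" OR "));
String sql =
    "SELECT action, count(*) AS cnt FROM user_action WHERE " + orPredicate
        + " GROUP BY tumble(proctime, INTERVAL '60' SECOND), action";
{code}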
[jira] [Created] (FLINK-9329) hasRowtimeAttribute will throw NPE if user uses setProctimeAttribute for table source
yuemeng created FLINK-9329:
--
Summary: hasRowtimeAttribute will throw NPE if user uses setProctimeAttribute for table source
Key: FLINK-9329
URL: https://issues.apache.org/jira/browse/FLINK-9329
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: yuemeng
Assignee: yuemeng

{code}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    // ...
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        // field "ptime" is of type SQL_TIMESTAMP
        .field("ptime", Types.SQL_TIMESTAMP()).build())
    // declare "ptime" as processing time attribute
    .withProctimeAttribute("ptime")
    .build();

tableEnv.registerTableSource("flights", kafkaTableSource);
{code}
Registering a table source that declares only a processing time attribute, as above, makes hasRowtimeAttribute throw a NullPointerException.
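A sketch of a defensive fix, assuming the NPE comes from the rowtime descriptor list being null when only a proctime attribute is declared; the standalone helper below is illustrative:
{code}
// Null-guard sketch (assumption: getRowtimeAttributeDescriptors() may return
// null for a proctime-only source, which would explain the NPE).
static boolean hasRowtimeAttribute(DefinedRowtimeAttributes source) {
    java.util.List<RowtimeAttributeDescriptor> descriptors =
            source.getRowtimeAttributeDescriptors();
    return descriptors != null && !descriptors.isEmpty();
}
{code}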
[jira] [Created] (FLINK-9201) same merged window will fire twice if watermark has already passed the window for merging windows
yuemeng created FLINK-9201:
--
Summary: same merged window will fire twice if watermark has already passed the window for merging windows
Key: FLINK-9201
URL: https://issues.apache.org/jira/browse/FLINK-9201
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.3.3
Reporter: yuemeng
Assignee: yuemeng

Consider a sum over a session window, with a session gap of 3 seconds and an allowed lateness of 60 seconds. Window w1, TimeWindow[1,9], holds elements 1, 2, 3, 6 and is fired when the watermark reaches 9. If a late element arrives in w2, TimeWindow[7,10], while the watermark is already at 11, then w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered via triggerContext.onMerge(mergedWindows). w3 is then fired by triggerContext.onElement(element) because the watermark has passed w3, but it fires again when the merged timer turns out to be below the current watermark. That means w3 fires twice because the watermark has already passed the new merged window.
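A minimal reproduction sketch for this scenario, assuming an event-time source that emits the elements and watermarks described above (source creation elided):
{code}
// Reproduction sketch: session gap 3s, allowed lateness 60s, summing field 1.
// Assumes 'input' carries the timestamps/watermarks from the description.
DataStream<Tuple2<String, Long>> input = ...; // source elided
input
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(3)))
    .allowedLateness(Time.seconds(60))
    .sum(1)
    .print(); // with this bug, the merged window w3's result prints twice
{code}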
[jira] [Created] (FLINK-7151) Flink SQL should support CREATE TEMPORARY FUNCTION and TABLE
yuemeng created FLINK-7151:
--
Summary: Flink SQL should support CREATE TEMPORARY FUNCTION and TABLE
Key: FLINK-7151
URL: https://issues.apache.org/jira/browse/FLINK-7151
Project: Flink
Issue Type: New Feature
Reporter: yuemeng

With support for CREATE TEMPORARY FUNCTION and TABLE, we could register a UDF, UDAF, or UDTF using SQL:
{code}
CREATE TEMPORARY function 'TOPK' AS 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP BY id;
{code}
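Until such DDL exists, a UDAF can be registered programmatically; a sketch assuming the Table API of later 1.x releases (method names varied across versions) and the ITopKUDAF class from the example above:
{code}
// Programmatic registration sketch (assumptions: ITopKUDAF extends
// AggregateFunction and is on the classpath; API names follow later
// 1.x Table API releases).
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
tableEnv.registerFunction("TOPK", new ITopKUDAF());
Table result = tableEnv.sqlQuery(
    "SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP BY id");
{code}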
[jira] [Created] (FLINK-7148) Flink SQL API should support DDL
yuemeng created FLINK-7148:
--
Summary: Flink SQL API should support DDL
Key: FLINK-7148
URL: https://issues.apache.org/jira/browse/FLINK-7148
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: yuemeng

For now, Flink SQL can't support DDL operations; a user can only register a table by calling registerTableInternal in TableEnvironment. We should support DDL such as creating a table or creating a function, like:
{code}
CREATE TABLE kafka_source (
  id INT,
  price INT
)
PROPERTIES (
  category = 'source',
  type = 'kafka',
  version = '0.9.0.1',
  separator = ',',
  topic = 'test',
  brokers = ':9092',
  group_id = 'test'
);

CREATE TABLE db_sink (
  id INT,
  price DOUBLE
)
PROPERTIES (
  category = 'sink',
  type = 'mysql',
  table_name = 'udaf_test',
  url = 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
  username = 'ds_dev',
  password = 's]k51_(>R'
);

CREATE TEMPORARY function 'AVGUDAF' AS 'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';

INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id
{code}
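For comparison, a sketch of the programmatic registration that such a CREATE TABLE would replace, assuming the Kafka 0.9 JSON table source of this era (constructor shape as quoted in FLINK-4879 below; field names and types mirror the DDL example):
{code}
// Programmatic equivalent sketch (assumptions noted above).
String[] fieldNames = {"id", "price"};
TypeInformation<?>[] fieldTypes = {Types.INT(), Types.INT()};

Properties props = new Properties();
props.setProperty("bootstrap.servers", ":9092");
props.setProperty("group.id", "test");

tableEnv.registerTableSource(
    "kafka_source",
    new Kafka09JsonTableSource("test", props, fieldNames, fieldTypes));
{code}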
[jira] [Created] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction
yuemeng created FLINK-7145:
--
Summary: Flink SQL API should support multiple parameters for UserDefinedAggFunction
Key: FLINK-7145
URL: https://issues.apache.org/jira/browse/FLINK-7145
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: yuemeng
Priority: Critical

UDAFs such as topK, and other UDAFs built on Bloom filters, need more than one parameter; Flink SQL should support this. Based on Flink SQL supporting DML and multi-parameter UDAFs, we could execute SQL like:
{code}
CREATE TEMPORARY function 'TOPK' AS 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP BY id;
{code}
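For reference, a sketch of how a multi-parameter UDAF maps onto the Table API's AggregateFunction contract; the TopKAccumulator type and its methods are hypothetical:
{code}
// Illustrative multi-parameter UDAF skeleton (assumption: the standard
// AggregateFunction contract; TopKAccumulator is a hypothetical POJO).
public class ITopKUDAF extends AggregateFunction<String, TopKAccumulator> {

    @Override
    public TopKAccumulator createAccumulator() {
        return new TopKAccumulator();
    }

    // Multiple SQL arguments map to multiple accumulate() parameters:
    // TOPK(price, 5, 'DESC') -> accumulate(acc, price, 5, "DESC")
    public void accumulate(TopKAccumulator acc, int price, int k, String order) {
        acc.add(price, k, order);
    }

    @Override
    public String getValue(TopKAccumulator acc) {
        return acc.result();
    }
}
{code}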
[jira] [Created] (FLINK-7146) Flink SQL should support DDL
yuemeng created FLINK-7146:
--
Summary: Flink SQL should support DDL
Key: FLINK-7146
URL: https://issues.apache.org/jira/browse/FLINK-7146
Project: Flink
Issue Type: Sub-task
Components: Table API & SQL
Reporter: yuemeng

For now, Flink SQL can't support DDL; we can only register a table by calling registerTableInternal in TableEnvironment. We should support DDL in SQL, such as creating a table or creating a function, like:
{code}
CREATE TABLE kafka_source (
  id INT,
  price INT
)
PROPERTIES (
  category = 'source',
  type = 'kafka',
  version = '0.9.0.1',
  separator = ',',
  topic = 'test',
  brokers = 'xx:9092',
  group_id = 'test'
);

CREATE TABLE db_sink (
  id INT,
  price DOUBLE
)
PROPERTIES (
  category = 'sink',
  type = 'mysql',
  table_name = 'udaf_test',
  url = 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
  username = 'ds_dev',
  password = 's]k51_(>R'
);

CREATE TEMPORARY function 'AVGUDAF' AS 'com.x.server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';

INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id
{code}
[jira] [Created] (FLINK-5324) JVM options apply to both YarnApplicationMasterRunner and YarnTaskManager in YARN mode
yuemeng created FLINK-5324:
--
Summary: JVM options apply to both YarnApplicationMasterRunner and YarnTaskManager in YARN mode
Key: FLINK-5324
URL: https://issues.apache.org/jira/browse/FLINK-5324
Project: Flink
Issue Type: Bug
Components: YARN
Affects Versions: 1.1.3
Reporter: yuemeng
Priority: Critical

YarnApplicationMasterRunner and YarnTaskManager both use the following code to get their JVM options:
{code}
final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
{code}
So when we add a JVM option intended for one of them, it takes effect for both.
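One possible direction, sketched with role-specific configuration keys that fall back to the shared option. Later Flink versions introduced env.java.opts.jobmanager and env.java.opts.taskmanager for exactly this split; the lookup below is illustrative:
{code}
// Sketch of role-specific JVM options (assumption: separate config keys,
// falling back to the shared FLINK_JVM_OPTIONS when unset).
final String sharedOpts =
    flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
// In YarnApplicationMasterRunner:
final String jmOpts =
    flinkConfiguration.getString("env.java.opts.jobmanager", sharedOpts);
// In YarnTaskManager startup:
final String tmOpts =
    flinkConfiguration.getString("env.java.opts.taskmanager", sharedOpts);
{code}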
[jira] [Created] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink
yuemeng created FLINK-4879:
--
Summary: class KafkaTableSource should be public just like KafkaTableSink
Key: FLINK-4879
URL: https://issues.apache.org/jira/browse/FLINK-4879
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.1.3, 1.1.1
Reporter: yuemeng
Priority: Minor
Fix For: 1.1.4

The class KafkaTableSource should be public just like KafkaTableSink. By default its access modifier is package-private, so we can't access it from outside its package. For example:
{code}
def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {
  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}
{code}
Because KafkaTableSource is package-private, we can't declare this function's result type as KafkaTableSource; we must name a specific subtype. But if some other Kafka source extends KafkaTableSource and we don't know which subclass will be used, how can we specify the type?
[jira] [Created] (FLINK-4827) SQL on streaming example in Scala uses wrong variable name
yuemeng created FLINK-4827:
--
Summary: SQL on streaming example in Scala uses wrong variable name
Key: FLINK-4827
URL: https://issues.apache.org/jira/browse/FLINK-4827
Project: Flink
Issue Type: Bug
Components: Documentation
Affects Versions: 1.1.2, 1.1.0
Reporter: yuemeng
Priority: Minor
Fix For: 1.1.3

{code}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{code}
There is no variable named tableEnv defined here; it is tEnv that is defined.
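A sketch of the corrected lines, changing only the variable name so it matches the declaration (kept in the example's own Scala):
{code}
// corrected: use the declared 'tEnv' instead of the undefined 'tableEnv'
tEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
val result = tEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{code}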