[jira] [Created] (FLINK-29801) OperatorCoordinator needs a way to access the MetricGroup interface

2022-10-30 Thread yuemeng (Jira)
yuemeng created FLINK-29801:
---

 Summary: OperatorCoordinator needs a way to access the MetricGroup interface
 Key: FLINK-29801
 URL: https://issues.apache.org/jira/browse/FLINK-29801
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: yuemeng


Currently, we have no way to get a metric group instance in an OperatorCoordinator.

In some cases we need to report metrics from the OperatorCoordinator. In the Flink/Hudi integration, for example, metadata is sent to the operator coordinator, which commits it to HDFS or HMS, and we also need to report metrics from the coordinator for monitoring purposes.
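A hypothetical sketch of what this could look like ({{metricGroup()}} is a proposed addition, not part of the current {{OperatorCoordinator.Context}} API; the usage below is illustrative only):
{code}
// Proposed (sketch): let the coordinator context expose its metric group.
public interface Context {
    // ... existing methods such as getOperatorId() and failJob(Throwable) ...

    // Proposed addition: the metric group scoped to this coordinator.
    MetricGroup metricGroup();
}

// A coordinator could then register metrics in start(), e.g. counting
// commits of Hudi metadata to HDFS/HMS:
//   Counter numCommits = context.metricGroup().counter("numCommits");
//   numCommits.inc();
{code}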





[jira] [Created] (FLINK-20935) can't write Flink configuration to tmp file and add it as local resource in YARN session mode

2021-01-11 Thread yuemeng (Jira)
yuemeng created FLINK-20935:
---

 Summary: can't write Flink configuration to tmp file and add it as local resource in YARN session mode
 Key: FLINK-20935
 URL: https://issues.apache.org/jira/browse/FLINK-20935
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.12.0, 1.13.0
Reporter: yuemeng


In Flink 1.12.0 or the latest version, when we execute a command such as {{bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st}}, the deployment fails with the following errors:
{code}
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730)
Caused by: java.io.FileNotFoundException: File does not exist: /tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
{code}

When the startAppMaster method in YarnClusterDescriptor is called, it tries to write the Flink configuration to a tmp file and register it as a local resource. But the following code makes the tmp file's file system be treated as the distributed file system:
{code}
// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = null;
try {
    tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
    BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

    String flinkConfigKey = "flink-conf.yaml";
    fileUploader.registerSingleLocalResource(
            flinkConfigKey,
            new Path(tmpConfigurationFile.getAbsolutePath()),
            "",
            LocalResourceType.FILE,
            true,
            true);
    classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
} finally {
    if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
        LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
    }
}
{code}

The {{tmpConfigurationFile.getAbsolutePath()}} method returns a path without a file scheme, so the path is resolved against the default (distributed) file system instead of the local one, and the file is not found there.
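A minimal sketch of one possible fix (an assumption, not the committed patch): build the Path from the file's URI so it carries an explicit file: scheme and is resolved against the local file system.
{code}
// file:/tmp/...-flink-conf.yaml...tmp instead of a bare absolute path,
// so the local file system is used when the resource is uploaded.
fileUploader.registerSingleLocalResource(
        flinkConfigKey,
        new Path(tmpConfigurationFile.toURI()),
        "",
        LocalResourceType.FILE,
        true,
        true);
{code}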









[jira] [Created] (FLINK-14666) support multiple catalogs in Flink Table SQL

2019-11-07 Thread yuemeng (Jira)
yuemeng created FLINK-14666:
---

 Summary: support multiple catalogs in Flink Table SQL
 Key: FLINK-14666
 URL: https://issues.apache.org/jira/browse/FLINK-14666
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.9.0, 1.8.2, 1.8.0
Reporter: yuemeng


Currently, Calcite uses only the current catalog as the schema path to validate a SQL node; maybe this is not reasonable.

{code}
tableEnvironment.useCatalog("user_catalog");
tableEnvironment.useDatabase("user_db");

Table table = tableEnvironment.sqlQuery(
    "SELECT action, os, count(*) as cnt FROM music_queue_3 "
        + "GROUP BY action, os, tumble(proctime, INTERVAL '10' SECOND)");
tableEnvironment.registerTable("v1", table);

Table t2 = tableEnvironment.sqlQuery("SELECT action, os, 1 as cnt FROM v1");
tableEnvironment.registerTable("v2", t2);

tableEnvironment.sqlUpdate(
    "INSERT INTO database2.kafka_table_test1 "
        + "SELECT action, os, cast(cnt as BIGINT) as cnt FROM v2");
{code}

Suppose the source table music_queue_3 and the sink table kafka_table_test1 are both in the user_catalog catalog, but temporary tables or views such as v1 and v2 are registered in the default catalog.

When we select from the temporary table v2 and insert into our own catalog's table database2.kafka_table_test1, SQL node validation always fails: the schema path in the catalog reader contains only the current catalog, not the default catalog, so the temporary table or view is never resolved.
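One possible workaround (an assumption on our side, not verified against the planner) is to fully qualify the temporary view with the catalog and database it was actually registered in, so the validator can resolve it without relying on the schema path:
{code}
// Sketch: qualify v2 with the built-in default catalog; the names
// default_catalog/default_database are assumptions.
tableEnvironment.sqlUpdate(
    "INSERT INTO database2.kafka_table_test1 "
        + "SELECT action, os, cast(cnt as BIGINT) as cnt "
        + "FROM default_catalog.default_database.v2");
{code}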


[jira] [Created] (FLINK-11057) WHERE ... IN grammar will cause a stream inner join logical plan

2018-12-03 Thread yuemeng (JIRA)
yuemeng created FLINK-11057:
---

 Summary: WHERE ... IN grammar will cause a stream inner join logical plan
 Key: FLINK-11057
 URL: https://issues.apache.org/jira/browse/FLINK-11057
 Project: Flink
  Issue Type: Bug
Reporter: yuemeng


{code}
SELECT action, count(*) AS cnt
FROM user_action
WHERE action IN (
  'view', 'impress', 'sysaction', 'commentimpress', 'play', 'click', 'page',
  'abtestreqsuss', 'bannerimpress', 'abtestserver', 'active', 'search',
  'activeclient', 'like', 'zan', 'adclick', 'login', 'comment',
  'subscribeartist', 'subscribevideo', 'subscribedjradio', 'share', 'private',
  'register', 'downloadall', 'forward', 'newdj', 'recommendimpress',
  'hotkeywordimpress', 'nogetad', 'add', 'subscribe', 'follow', 'new')
GROUP BY tumble(proctime, INTERVAL '60' SECOND), action
{code}

The IN list above holds more than 30 literals; presumably it exceeds Calcite's default in-subquery threshold, so the predicate is rewritten as an inner join against a VALUES relation rather than kept as a simple filter, which turns the query into a stream join.





[jira] [Created] (FLINK-9329) hasRowtimeAttribute will throw NPE if user uses setProctimeAttribute for table source

2018-05-10 Thread yuemeng (JIRA)
yuemeng created FLINK-9329:
--

 Summary: hasRowtimeAttribute will throw NPE if user uses setProctimeAttribute for table source
 Key: FLINK-9329
 URL: https://issues.apache.org/jira/browse/FLINK-9329
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: yuemeng
Assignee: yuemeng


Registering a table source that declares only a processing-time attribute, as below, fails with a NullPointerException in hasRowtimeAttribute:

{code}
KafkaTableSource source = Kafka010JsonTableSource.builder()
    // ...
    .withSchema(TableSchema.builder()
        .field("sensorId", Types.LONG())
        .field("temp", Types.DOUBLE())
        // field "ptime" is of type SQL_TIMESTAMP
        .field("ptime", Types.SQL_TIMESTAMP())
        .build())
    // declare "ptime" as a processing time attribute
    .withProctimeAttribute("ptime")
    .build();

tableEnv.registerTableSource("flights", source);
{code}





[jira] [Created] (FLINK-9201) same merged window will fire twice if the watermark has already passed the window for merging windows

2018-04-18 Thread yuemeng (JIRA)
yuemeng created FLINK-9201:
--

 Summary: same merged window will fire twice if the watermark has already passed the window for merging windows
 Key: FLINK-9201
 URL: https://issues.apache.org/jira/browse/FLINK-9201
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.3.3
Reporter: yuemeng
Assignee: yuemeng


Consider a sum over session windows. Suppose the session gap is 3 seconds and the allowedLateness is 60 seconds.

w1, TimeWindow[1,9], holds the elements 1, 2, 3, 6 and is fired once the watermark reaches 9.

If a late element arrives as w2, TimeWindow[7,10], while the watermark is already at 11, then w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by calling triggerContext.onMerge(mergedWindows). w3 is then fired by the call to triggerContext.onElement(element), because the watermark has already passed w3.

But w3 is fired again when the merged timer goes off, since that timer is earlier than the current watermark.

That means w3 fires twice because the watermark has already passed the newly merged window w3.




[jira] [Created] (FLINK-7151) Flink SQL should support CREATE TEMPORARY FUNCTION and CREATE TABLE

2017-07-11 Thread yuemeng (JIRA)
yuemeng created FLINK-7151:
--

 Summary: Flink SQL should support CREATE TEMPORARY FUNCTION and CREATE TABLE
 Key: FLINK-7151
 URL: https://issues.apache.org/jira/browse/FLINK-7151
 Project: Flink
  Issue Type: New Feature
Reporter: yuemeng


With support for CREATE TEMPORARY FUNCTION and CREATE TABLE, we can register a UDF, UDAF, or UDTF using SQL:
{code}
CREATE TEMPORARY function 'TOPK' AS 
'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
BY id;
{code}





[jira] [Created] (FLINK-7148) Flink SQL API should support DDL

2017-07-11 Thread yuemeng (JIRA)
yuemeng created FLINK-7148:
--

 Summary: Flink SQL API should support DDL
 Key: FLINK-7148
 URL: https://issues.apache.org/jira/browse/FLINK-7148
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: yuemeng


For now, Flink SQL doesn't support DDL operations; a user can only register a table by calling registerTableInternal in TableEnvironment. We should support DDL such as creating a table or a function, for example:
{code}
CREATE TABLE kafka_source (
  id INT,
  price INT
) PROPERTIES (
  category = 'source',
  type = 'kafka',
  version = '0.9.0.1',
  separator = ',',
  topic = 'test',
  brokers = ':9092',
  group_id = 'test'
);

CREATE TABLE db_sink (
  id INT,
  price DOUBLE
) PROPERTIES (
  category = 'sink',
  type = 'mysql',
  table_name = 'udaf_test',
  url = 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
  username = 'ds_dev',
  password = 's]k51_(>R'
);

CREATE TEMPORARY function 'AVGUDAF' AS 
'com..server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id

{code}





[jira] [Created] (FLINK-7145) Flink SQL API should support multiple parameters for UserDefinedAggFunction

2017-07-11 Thread yuemeng (JIRA)
yuemeng created FLINK-7145:
--

 Summary: Flink SQL API should support multiple parameters for 
UserDefinedAggFunction
 Key: FLINK-7145
 URL: https://issues.apache.org/jira/browse/FLINK-7145
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: yuemeng
Priority: Critical


UDAFs such as topK, and other UDAFs based on bloom filters, need more than one parameter; Flink SQL should support this. Building on Flink SQL supporting DML and multi-parameter UDAFs, we could execute SQL like the following (a sketch of such a UDAF follows the example):
{code}
CREATE TEMPORARY function 'TOPK' AS 
'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF';
INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP 
BY id;
{code}
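For illustration, a minimal sketch of how such a multi-parameter UDAF could look with the Table API's AggregateFunction (class and accumulator names are assumptions, not the actual ITopKUDAF; handling of the ordering argument is omitted):
{code}
import org.apache.flink.table.functions.AggregateFunction;
import java.util.PriorityQueue;

// Assumed accumulator: keeps the k largest prices seen so far.
public class TopKAcc {
    public PriorityQueue<Double> heap = new PriorityQueue<>();
}

public class TopKUDAF extends AggregateFunction<String, TopKAcc> {

    @Override
    public TopKAcc createAccumulator() {
        return new TopKAcc();
    }

    // Multi-parameter accumulate: the value column plus two constant
    // arguments, matching the SQL call TOPK(price, 5, 'DESC').
    public void accumulate(TopKAcc acc, Double price, Integer k, String order) {
        acc.heap.add(price);
        while (acc.heap.size() > k) {
            acc.heap.poll(); // drop the smallest, keeping the top k
        }
    }

    @Override
    public String getValue(TopKAcc acc) {
        return acc.heap.toString();
    }
}
{code}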





[jira] [Created] (FLINK-7146) Flink SQL should support DDL

2017-07-11 Thread yuemeng (JIRA)
yuemeng created FLINK-7146:
--

 Summary: Flink SQL should support DDL
 Key: FLINK-7146
 URL: https://issues.apache.org/jira/browse/FLINK-7146
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: yuemeng


For now, Flink SQL doesn't support DDL; we can only register a table by calling registerTableInternal in TableEnvironment. We should support DDL statements such as creating a table or a function, for example:
{code}

CREATE TABLE kafka_source (
  id INT,
  price INT
) PROPERTIES (
  category = 'source',
  type = 'kafka',
  version = '0.9.0.1',
  separator = ',',
  topic = 'test',
  brokers = 'xx:9092',
  group_id = 'test'
);

CREATE TABLE db_sink (
  id INT,
  price DOUBLE
) PROPERTIES (
  category = 'sink',
  type = 'mysql',
  table_name = 'udaf_test',
  url = 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
  username = 'ds_dev',
  password = 's]k51_(>R'
);

CREATE TEMPORARY function 'AVGUDAF' AS 
'com.x.server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';

INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id


{code}





[jira] [Created] (FLINK-5324) JVM options will apply to both YarnApplicationMasterRunner and YarnTaskManager in YARN mode

2016-12-12 Thread yuemeng (JIRA)
yuemeng created FLINK-5324:
--

 Summary: JVM options will apply to both YarnApplicationMasterRunner and YarnTaskManager in YARN mode
 Key: FLINK-5324
 URL: https://issues.apache.org/jira/browse/FLINK-5324
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.1.3
Reporter: yuemeng
Priority: Critical


YarnApplicationMasterRunner and YarnTaskManager both use the following code to get JVM options:
{code}
final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
{code}
So when we add JVM options intended for only one of them, the options are applied to both.
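A sketch of one possible fix (the role-specific key names are assumptions, not existing configuration options): let each role read its own key first and fall back to the shared one.
{code}
// Shared options stay the default; role-specific keys override them.
final String shared = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
final String jmJavaOpts = flinkConfiguration.getString("env.java.opts.jobmanager", shared);
final String tmJavaOpts = flinkConfiguration.getString("env.java.opts.taskmanager", shared);
{code}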





[jira] [Created] (FLINK-4879) class KafkaTableSource should be public just like KafkaTableSink

2016-10-21 Thread yuemeng (JIRA)
yuemeng created FLINK-4879:
--

 Summary: class KafkaTableSource should be public just like 
KafkaTableSink
 Key: FLINK-4879
 URL: https://issues.apache.org/jira/browse/FLINK-4879
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.1.3, 1.1.1
Reporter: yuemeng
Priority: Minor
 Fix For: 1.1.4


The class KafkaTableSource should be public just like KafkaTableSink. By default its modifier is package-private, so we can't access it from outside its package. For example:
{code}
def createKafkaTableSource(
    topic: String,
    properties: Properties,
    deserializationSchema: DeserializationSchema[Row],
    fieldsNames: Array[String],
    typeInfo: Array[TypeInformation[_]]): KafkaTableSource = {

  if (deserializationSchema != null) {
    new Kafka09TableSource(topic, properties, deserializationSchema, fieldsNames, typeInfo)
  } else {
    new Kafka09JsonTableSource(topic, properties, fieldsNames, typeInfo)
  }
}
{code}


Because KafkaTableSource's modifier is package-private, we can't declare this function's result type as KafkaTableSource; we must name a specific subclass. But if some other Kafka source extends KafkaTableSource and we aren't sure which subclass will be used, how can we declare the type?
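A sketch of the proposed change (the exact declaration is an assumption based on the issue, not copied from the source):
{code}
// Widen the class's default (package-private) visibility so callers outside
// the package can use it as a return or parameter type.
public abstract class KafkaTableSource implements StreamTableSource<Row> {
    // ... body unchanged ...
}
{code}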








[jira] [Created] (FLINK-4827) SQL on streaming example in Scala uses a wrong variable name

2016-10-13 Thread yuemeng (JIRA)
yuemeng created FLINK-4827:
--

 Summary: SQL on streaming example in Scala uses a wrong variable name
 Key: FLINK-4827
 URL: https://issues.apache.org/jira/browse/FLINK-4827
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.1.2, 1.1.0
Reporter: yuemeng
Priority: Minor
 Fix For: 1.1.3


{code}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{code}


There is no variable named tableEnv defined here; the variable defined is tEnv.


