sink.rolling-policy.file-size does not take effect

2020-12-03 Thread admin
Hi all,
With the Flink 1.11.1 filesystem
connector I configured sink.rolling-policy.file-size=50MB, but there are still files of 100+ MB.
The DDL is as follows; the checkpoint interval is 1 min:
CREATE TABLE cpc_bd_recall_log_hdfs (
log_timestamp BIGINT,
ip STRING,
`raw` STRING,
`day` STRING, `hour` STRING,`minute` STRING
) PARTITIONED BY (`day` , `hour` ,`minute`) WITH (
'connector'='filesystem',
'path'='hdfs://xxx/test.db/cpc_bd_recall_log_hdfs',
'format'='parquet',
'parquet.compression'='SNAPPY',
'sink.rolling-policy.file-size' = '50MB',
'sink.partition-commit.policy.kind' = 'success-file',
'sink.partition-commit.delay'='60s'
);


The HDFS files look like this:

  0 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/_SUCCESS
-rw-r--r--   3 hadoop hadoop 31.7 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-0-2500
-rw-r--r--   3 hadoop hadoop 121.8 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-0-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-1-2499
-rw-r--r--   3 hadoop hadoop 122.0 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-1-2500
-rw-r--r--   3 hadoop hadoop 31.8 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-10-2501
-rw-r--r--   3 hadoop hadoop 121.8 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-10-2502
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-11-2500
-rw-r--r--   3 hadoop hadoop 122.2 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-11-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-12-2500
-rw-r--r--   3 hadoop hadoop 122.2 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-12-2501
-rw-r--r--   3 hadoop hadoop 31.8 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-13-2499
-rw-r--r--   3 hadoop hadoop 122.0 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-13-2500
-rw-r--r--   3 hadoop hadoop 31.6 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-14-2500
-rw-r--r--   3 hadoop hadoop 122.1 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-14-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-15-2498
-rw-r--r--   3 hadoop hadoop 121.8 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-15-2499
-rw-r--r--   3 hadoop hadoop 31.7 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-16-2501
-rw-r--r--   3 hadoop hadoop 122.0 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-16-2502
-rw-r--r--   3 hadoop hadoop 31.7 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-17-2500
-rw-r--r--   3 hadoop hadoop 122.5 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-17-2501
-rw-r--r--   3 hadoop hadoop 31.8 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-18-2500
-rw-r--r--   3 hadoop hadoop 121.7 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-18-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-19-2501
-rw-r--r--   3 hadoop hadoop 121.7 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-19-2502
-rw-r--r--   3 hadoop hadoop 31.6 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-2-2499
-rw-r--r--   3 hadoop hadoop
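
For reference, a minimal sketch of the same DDL with the other sink.rolling-policy options of the 1.11 filesystem connector spelled out; the rollover-interval and check-interval values below are illustrative assumptions, not settings from the job above, and the option names should be verified against the 1.11 filesystem connector docs.

// Hedged sketch (Java + DDL): the table above, with all sink.rolling-policy.* options explicit.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RollingPolicySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE cpc_bd_recall_log_hdfs (" +
                "  log_timestamp BIGINT, ip STRING, `raw` STRING," +
                "  `day` STRING, `hour` STRING, `minute` STRING" +
                ") PARTITIONED BY (`day`, `hour`, `minute`) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'hdfs://xxx/test.db/cpc_bd_recall_log_hdfs'," +
                "  'format' = 'parquet'," +
                "  'parquet.compression' = 'SNAPPY'," +
                "  'sink.rolling-policy.file-size' = '50MB'," +
                "  'sink.rolling-policy.rollover-interval' = '10min'," +  // max time a part file stays open (illustrative)
                "  'sink.rolling-policy.check-interval' = '1min'," +      // how often the policy is checked (illustrative)
                "  'sink.partition-commit.policy.kind' = 'success-file'," +
                "  'sink.partition-commit.delay' = '60s'" +
                ")");
    }
}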

Re: Why is Calcite's implicit type conversion disabled?

2020-12-03 Thread stgztsw
Since the community intends to be Hive-compatible, I think implicit type conversion and the other Hive
syntax compatibility features are a must. Hive SQL running in real production environments is usually quite
complex, and with Flink's current level of Hive compatibility most Hive SQL simply cannot run successfully.
(Other gaps include no support for bangEqual (!=), create table as, and so on; I won't list them all here.)
I hope the community can make Hive support more complete.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink TableAPI Issue: cannot assign instance of org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.LRUMap to field

2020-12-03 Thread Zed
It runs fine locally, but the problem appears once I upload it to the server. Both local and server are on 1.11.2, and flink-shaded-jackson is 2.9.8-7.0. I'm not sure how to choose the flink-shaded-jackson version that corresponds to a given Flink version.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How to parse list values in csv file

2020-12-03 Thread Yun Gao

Hi,

The CSV format only supports the types listed in [1], and fields must use types from
this list, thus for other types some kind of workaround is needed, like first reading
the field as a string and parsing it again later in the program.

Best,
Yun



[1] 
https://github.com/apache/flink/blob/e10e548feb2bedf54c3863bbd49ed4f9140546cf/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java#L287
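
A rough sketch of the workaround described above: declare the list column as a plain STRING in the CSV schema and split it afterwards in the program. The file path, field layout and ';' separator are assumptions for illustration.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;

public class CsvListFieldWorkaround {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // id BIGINT, tags read as a raw STRING such as "a;b;c" instead of java.util.List
        TypeInformation<?>[] fieldTypes = {Types.LONG, Types.STRING};
        RowCsvInputFormat format =
                new RowCsvInputFormat(new Path("/path/to/input.csv"), fieldTypes);

        env.createInput(format)
                .map((Row row) -> {
                    // split the string column into individual values after reading
                    String[] tags = ((String) row.getField(1)).split(";");
                    return Row.of(row.getField(0), tags);
                })
                .returns(Types.ROW(Types.LONG, Types.OBJECT_ARRAY(Types.STRING)))
                .print();
    }
}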



 --Original Mail --
Sender:narasimha 
Send Date:Fri Dec 4 00:45:53 2020
Recipients:user 
Subject:How to parse list values in csv file

Hi,

Getting the below error when trying to read a csv file; one of the fields is a
list type.

Can someone help with fixing the issue?

jobmanager_1   | Caused by: java.lang.IllegalArgumentException: The type 
'java.util.List' is not supported for the CSV input format.
jobmanager_1   | at 
org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:67)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:83)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:87)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]

-- 
A.Narasimha Swamy


Re: Duplicate operators generated by plan

2020-12-03 Thread Rex Fenley
cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley  wrote:

> Yes, the same exact input operators go into both joins.
>
> The chunk of code for the joins from the specific part of the plan I
> showed is as follows. The orgUsersTable is later filtered into one table
> and aggregated and another table and aggregated. The planner seems to
> duplicate orgUsersTable into 2 operators even though I create only 1 of it.
>
> // in the main function
> val orgUsersTable = splatRoles(
> this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
> OrgUsersRoleSplatPrefix,
> this.tableEnv
> )
>
> // helper function
> def splatRoles(
> table: Table,
> columnPrefix: String,
> tableEnv: TableEnvironment
> ): Table = {
> // Flink does not have a contains function so we have to splat out our
> role array's contents
> // and join it to the originating table.
> val func = new SplatRolesFunc()
> val splatted = table
> .map(func($"roles", $"id"))
> .as(
> "id_splatted",
> s"${columnPrefix}_is_admin",
> s"${columnPrefix}_is_teacher",
> s"${columnPrefix}_is_student",
> s"${columnPrefix}_is_parent"
> )
> // FIRST_VALUE is only available in SQL - so this is SQL.
> // Rationale: We have to group by after a map to preserve the pk
> inference, otherwise flink will
> // toss it out and all future joins will not have a unique key.
> tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
> val grouped = tableEnv.sqlQuery(s"""
> SELECT
> id_splatted,
> FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
> FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
> FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
> FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
> FROM ${columnPrefix}_splatted
> GROUP BY id_splatted
> """)
> return table
> .join(grouped, $"id" === $"id_splatted")
> .dropColumns($"id_splatted")
> .renameColumns($"roles".as(s"${columnPrefix}_roles"))
> }
>
> @FunctionHint(
> output = new DataTypeHint(
> "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student
> BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
> )
> )
> class SplatRolesFunc extends ScalarFunction {
> def eval(roles: Array[String], id: java.lang.Long): Row = {
> val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
> val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
> val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
> val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
> return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
> }
>
> override def getResultType(signature: Array[Class[_]]):
> TypeInformation[_] =
> Types.ROW(
> Types.LONG,
> Types.BOOLEAN,
> Types.BOOLEAN,
> Types.BOOLEAN,
> Types.BOOLEAN
> )
> }
>
>
> On Thu, Dec 3, 2020 at 7:49 PM Yun Gao  wrote:
>
>> Hi Rex,
>>
>> Could  you also attach one example for these sql / table ? And one
>> possible issue to confirm is that does the operators with the same names
>> also have the same inputs ?
>>
>> Best,
>> Yun
>>
>> --Original Mail --
>> *Sender:*Rex Fenley 
>> *Send Date:*Fri Dec 4 02:55:41 2020
>> *Recipients:*user 
>> *Subject:*Duplicate operators generated by plan
>>
>>> Hello,
>>>
>>> I'm running into an issue where my execution plan is creating the same
>>> exact join operator multiple times simply because the subsequent operator
>>> filters on a different boolean value. This is a massive duplication of
>>> storage and work. The filtered operators which follow result in only a
>>> small set of elements filtered out per set too.
>>>
>>> eg. of two separate operators that are equal
>>>
>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent]
>>>
>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>>
>>> Which are entirely the same datasets being processed.
>>>
>>> The first one points to
>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>>> TMP_0.f0 AS admin_organization_ids])
>>>
>>> The second one points to
>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> 

Re: Duplicate operators generated by plan

2020-12-03 Thread Rex Fenley
Yes, the same exact input operators go into both joins.

The chunk of code for the joins from the specific part of the plan I showed
is as follows. The orgUsersTable is later filtered into one table that is
aggregated, and into another table that is aggregated. The planner seems to duplicate
orgUsersTable into 2 operators even though I create only 1 of it.

// in the main function
val orgUsersTable = splatRoles(
  this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
  OrgUsersRoleSplatPrefix,
  this.tableEnv
)

// helper function
def splatRoles(
    table: Table,
    columnPrefix: String,
    tableEnv: TableEnvironment
): Table = {
  // Flink does not have a contains function so we have to splat out our role
  // array's contents and join it to the originating table.
  val func = new SplatRolesFunc()
  val splatted = table
    .map(func($"roles", $"id"))
    .as(
      "id_splatted",
      s"${columnPrefix}_is_admin",
      s"${columnPrefix}_is_teacher",
      s"${columnPrefix}_is_student",
      s"${columnPrefix}_is_parent"
    )
  // FIRST_VALUE is only available in SQL - so this is SQL.
  // Rationale: We have to group by after a map to preserve the pk inference,
  // otherwise flink will toss it out and all future joins will not have a unique key.
  tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
  val grouped = tableEnv.sqlQuery(s"""
    SELECT
      id_splatted,
      FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
      FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
      FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
      FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
    FROM ${columnPrefix}_splatted
    GROUP BY id_splatted
  """)
  return table
    .join(grouped, $"id" === $"id_splatted")
    .dropColumns($"id_splatted")
    .renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
  output = new DataTypeHint(
    "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
  )
)
class SplatRolesFunc extends ScalarFunction {
  def eval(roles: Array[String], id: java.lang.Long): Row = {
    val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
    val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
    val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
    val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
    return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(
      Types.LONG,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN
    )
}


On Thu, Dec 3, 2020 at 7:49 PM Yun Gao  wrote:

> Hi Rex,
>
> Could  you also attach one example for these sql / table ? And one
> possible issue to confirm is that does the operators with the same names
> also have the same inputs ?
>
> Best,
> Yun
>
> --Original Mail --
> *Sender:*Rex Fenley 
> *Send Date:*Fri Dec 4 02:55:41 2020
> *Recipients:*user 
> *Subject:*Duplicate operators generated by plan
>
>> Hello,
>>
>> I'm running into an issue where my execution plan is creating the same
>> exact join operator multiple times simply because the subsequent operator
>> filters on a different boolean value. This is a massive duplication of
>> storage and work. The filtered operators which follow result in only a
>> small set of elements filtered out per set too.
>>
>> eg. of two separate operators that are equal
>>
>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>> leftInputSpec=[JoinKeyContainsUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent]
>>
>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>> leftInputSpec=[JoinKeyContainsUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>
>> Which are entirely the same datasets being processed.
>>
>> The first one points to
>> GroupAggregate(groupBy=[user_id], select=[user_id,
>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>> TMP_0.f0 AS admin_organization_ids])
>>
>> The second one points to
>> GroupAggregate(groupBy=[user_id], select=[user_id,
>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>> TMP_0.f0 AS teacher_organization_ids])
>>
>> And these are both intersecting sets of data though slightly different. I
>> don't see why that would make the 1 join from before split into 2 though.
>> There's even a case where I'm seeing a join tripled.

Re: Duplicate operators generated by plan

2020-12-03 Thread Yun Gao
Hi Rex,

Could you also attach an example of these sql statements / tables? And one possible
issue to confirm: do the operators with the same names also have the
same inputs?

Best,
Yun

 --Original Mail --
Sender:Rex Fenley 
Send Date:Fri Dec 4 02:55:41 2020
Recipients:user 
Subject:Duplicate operators generated by plan

Hello,

I'm running into an issue where my execution plan is creating the same exact 
join operator multiple times simply because the subsequent operator filters on 
a different boolean value. This is a massive duplication of storage and work. 
The filtered operators which follow result in only a small set of elements 
filtered out per set too.

eg. of two separate operators that are equal

 Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, 
organization_id, user_id, roles, id_splatted, org_user_is_admin, 
org_user_is_teacher, org_user_is_student, org_user_is_parent], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, 
user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, 
org_user_is_student, org_user_is_parent]

 Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, 
organization_id, user_id, roles, id_splatted, org_user_is_admin, 
org_user_is_teacher, org_user_is_student, org_user_is_parent], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, 
user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, 
org_user_is_student, org_user_is_parent]) 

Which are entirely the same datasets being processed.

The first one points to 
 GroupAggregate(groupBy=[user_id], select=[user_id, 
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS 
admin_organization_ids]) 

The second one points to
 GroupAggregate(groupBy=[user_id], select=[user_id, 
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS 
teacher_organization_ids]) 

And these are both intersecting sets of data though slightly different. I don't 
see why that would make the 1 join from before split into 2 though. There's 
even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell flink to 
not duplicate operators where it doesn't need to?

Thanks!

-- 

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Problems running Hive SQL on Flink 1.11

2020-12-03 Thread
When running our Hive SQL on Flink 1.11.2 we hit several gaps in Flink 1.11's Hive SQL support, among them:
!= is not supported;
split;
a join hits a Calcite bug
(https://issues.apache.org/jira/browse/CALCITE-2152);
create table table1 as select * from pokes; (CREATE TABLE ... AS) is not supported.

Does Flink 1.11 plan to support Hive SQL more completely, so that existing Hive SQL jobs can run on Flink?

Re: Re: Re: With flinksql 1.11, after a long time with no data written to mysql, java.sql.SQLException: No operations allowed after statement closed is thrown

2020-12-03 Thread yanzi
1. Error log for retry times = 1:
[2020-12-02 22:01:00.800] [ERROR] [jdbc-upsert-output-format-thread-1]
[org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat] >>> JDBC
executeBatch error, retry times = 1
java.sql.SQLException: No operations allowed after statement closed.
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.StatementImpl.checkClosed(StatementImpl.java:426)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at 
com.mysql.jdbc.PreparedStatement.clearBatch(PreparedStatement.java:1051)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at
com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1323)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:954)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:71)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:200)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.attemptFlush(JdbcRowDataOutputFormat.java:153)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:120)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_262]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
[?:1.8.0_262]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_262]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[?:1.8.0_262]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_262]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_262]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
2. Error log for retry times = 2:
[2020-12-02 22:01:01.402] [ERROR] [jdbc-upsert-output-format-thread-1]
[org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat] >>> JDBC
executeBatch error, retry times = 2
java.sql.SQLException: No operations allowed after statement closed.
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.StatementImpl.checkClosed(StatementImpl.java:426)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at 
com.mysql.jdbc.PreparedStatement.setString(PreparedStatement.java:3943)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$createExternalConverter$57fde215$8(AbstractJdbcRowConverter.java:219)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableExternalConverter$2bf50691$1(AbstractJdbcRowConverter.java:193)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toExternal(AbstractJdbcRowConverter.java:86)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$createKeyedRowExecutor$3fd497bb$1(JdbcRowDataOutputFormat.java:164)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.executor.KeyedBatchStatementExecutor.executeBatch(KeyedBatchStatementExecutor.java:71)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.attemptFlush(JdbcRowDataOutputFormat.java:154)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at

Re: Problems writing to hive 2.1.1 orc from flink 1.11.2

2020-12-03 Thread yang xu
Hi 
If ACID is not supported, do the update and delete operations captured from the binlog then need a separate job to handle them? How can true batch/stream unification be achieved?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Configuring taskmanager.out to roll

2020-12-03 Thread zilong xiao
Got it, understood. Thanks for your answer.

Yang Wang wrote on Fri, Dec 4, 2020 at 10:33 AM:

> This is not supported at the moment, because STDOUT/STDERR itself is not written through slf4j.
> To support it, stdout would have to be redirected inside the Flink code and log4j configured afterwards.
>
> Best,
> Yang
>
> zilong xiao wrote on Thu, Dec 3, 2020 at 7:50 PM:
>
> > I'd like to ask the community experts: can the standard-output file taskmanager.out be configured to roll?
> >
>
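
A generic sketch (not an existing Flink feature) of the redirect described above: route System.out lines through slf4j so that the usual log4j rolling configuration can apply to them. Simplified and ASCII-only, for illustration.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.OutputStream;
import java.io.PrintStream;

public final class StdoutToLog {

    private static final Logger LOG = LoggerFactory.getLogger("stdout");

    public static void redirect() {
        System.setOut(new PrintStream(new OutputStream() {
            private final StringBuilder line = new StringBuilder();

            @Override
            public void write(int b) {
                char c = (char) b;              // simplification: assumes single-byte characters
                if (c == '\n') {
                    LOG.info(line.toString());  // each stdout line becomes a log event
                    line.setLength(0);
                } else {
                    line.append(c);
                }
            }
        }, true));
    }
}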


Re: Configuring taskmanager.out to roll

2020-12-03 Thread Yang Wang
This is not supported at the moment, because STDOUT/STDERR itself is not written through slf4j.
To support it, stdout would have to be redirected inside the Flink code and log4j configured afterwards.

Best,
Yang

zilong xiao wrote on Thu, Dec 3, 2020 at 7:50 PM:

> I'd like to ask the community experts: can the standard-output file taskmanager.out be configured to roll?
>


Re: Query regarding HA mode and checkpointing

2020-12-03 Thread Yang Wang
Hi Kaushal,

Only the state handle pointers are stored in the ZooKeeper nodes, since
ZooKeeper is built for small-data (KB-level) storage. The real data is
persisted in "high-availability.storageDir".
Note that it should be a distributed storage (HDFS, S3, etc.).

The ZooKeeper HA service is used for the following (as is the Kubernetes
HA service); you can find more information here [1]:
* Leader election/retrieval
* Running job registry
* Submitted job graph store
* Checkpoint store

[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-HAcomponents


Best,
Yang


Kaushal Raj  于2020年12月3日周四 下午7:13写道:

> Hello,
>
> I am new to flink. I have a few queries regarding HA mode with zookeeper
> and checkpointing. When flink is configured in HA mode with zookeeper,
> where are the job checkpoints stored? Is zookeeper only used for recovering
> the jobmanager, or also for checkpoints? What is the significance of
> "high-availability.storageDir"
> here?
>
> Thanks,
> Kaushal
>
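
A minimal sketch of the HA settings discussed above, expressed via Flink's Configuration API; the same keys normally live in flink-conf.yaml, and the ZooKeeper quorum and HDFS path below are placeholders.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;

public class HaConfigSketch {
    public static Configuration haConfig() {
        Configuration conf = new Configuration();
        conf.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk1:2181,zk2:2181,zk3:2181");
        // Only small state-handle pointers live in ZooKeeper; checkpoint metadata and
        // submitted job graphs are written to this (distributed) storage directory.
        conf.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "hdfs:///flink/ha");
        return conf;
    }
}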


Re: Using multiple keytabs with flink

2020-12-03 Thread amen...@163.com
hi,

You can use UGI authentication to submit the job when calling executeSql() or execute() (for multiple SQL statements), and pass the two parameters you mentioned dynamically on the run command, in the form of -yD.

best,
amenhub



 
From: zhengmao776
Sent: 2020-12-03 17:16
To: user-zh
Subject: Using multiple keytabs with flink
Hello, when I submit jobs with flink run, since the cluster is a kerberized Hadoop cluster,
I want to provide different keytabs for different users for authentication. I saw the
security.kerberos.login.keytab and security.kerberos.login.principal settings in flink-conf.yaml,
but they cannot be configured dynamically; I tried configuring them with
-yD, but it does not take effect. How should I handle this situation? Looking forward to your reply~~~
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-cdc reads no binlog, and the program reports no error

2020-12-03 Thread chenjb
Thanks for following up. I had mapped int to bigint; I assumed bigint's larger range would be compatible and didn't read the docs carefully. MySQL had error logs but the program reported no error, so my attention went to the MySQL side. In the last couple of days I've tried flink-cdc-sql and found that some error scenarios are not reported; for example, when it cannot connect to MySQL (wrong password) the program just exits with exit
0, with no error or hint at all. Is there something else I need to configure? I'm a Java novice and don't understand this part very well.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How to tell what mode a Table operator is in

2020-12-03 Thread Danny Chan
If a stateful operator has also a stateful operator in its input
sub-pipeline, then it may receive retract messages. Operator like group
agg, stream-stream join or rank are stateful.

We can not show if the operator are receiving retract messages in the UI.
But your request is reasonable.

Rex Fenley 于2020年12月4日 周五上午4:48写道:

> Our sinks are uniquely keyed as well. A couple of our joins are not until
> an aggregate is performed however.
>
> On Thu, Dec 3, 2020 at 12:46 PM Rex Fenley  wrote:
>
>> Hi,
>>
>> When I'm looking at the Flink plan in the UI and at an operator, is there
>> a way to tell if an operator is in Retract mode vs Upsert mode? Ideally we
>> want as many of our operators in Upsert mode as possible since our data
>> sources are all uniquely keyed.
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: How to tell what mode a Table operator is in

2020-12-03 Thread Rex Fenley
Our sinks are uniquely keyed as well. A couple of our joins are not until
an aggregate is performed however.

On Thu, Dec 3, 2020 at 12:46 PM Rex Fenley  wrote:

> Hi,
>
> When I'm looking at the Flink plan in the UI and at an operator, is there
> a way to tell if an operator is in Retract mode vs Upsert mode? Ideally we
> want as many of our operators in Upsert mode as possible since our data
> sources are all uniquely keyed.
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



How to tell what mode a Table operator is in

2020-12-03 Thread Rex Fenley
Hi,

When I'm looking at the Flink plan in the UI and at an operator, is there a
way to tell if an operator is in Retract mode vs Upsert mode? Ideally we
want as many of our operators in Upsert mode as possible since our data
sources are all uniquely keyed.

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Duplicate operators generated by plan

2020-12-03 Thread Rex Fenley
Hello,

I'm running into an issue where my execution plan is creating the same
exact join operator multiple times simply because the subsequent operator
filters on a different boolean value. This is a massive duplication of
storage and work. The filtered operators which follow result in only a
small set of elements filtered out per set too.

E.g., two separate operators that are equal:

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
organization_id, user_id, roles, id_splatted, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent]

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
organization_id, user_id, roles, id_splatted, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent])

Which are entirely the same datasets being processed.

The first one points to
GroupAggregate(groupBy=[user_id], select=[user_id,
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
TMP_0.f0 AS admin_organization_ids])

The second one points to
GroupAggregate(groupBy=[user_id], select=[user_id,
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
TMP_0.f0 AS teacher_organization_ids])

And these are both intersecting sets of data though slightly different. I
don't see why that would make the 1 join from before split into 2 though.
There's even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell flink
to not duplicate operators where it doesn't need to?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US
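
A hedged aside on one optimizer knob related to this question: the Blink planner's plan-reuse options shown below already default to true in Flink 1.11, so this sketch only shows how to check or set them explicitly; it is not a confirmed fix for the duplication described above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

public class PlanReuseSketch {
    public static TableEnvironment create() {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Both options default to true, so duplicated joins may instead come from how
        // the query itself is constructed.
        tEnv.getConfig().getConfiguration()
                .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true);
        tEnv.getConfig().getConfiguration()
                .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true);
        return tEnv;
    }
}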



Re: Running Flink job as a rest

2020-12-03 Thread Jaffe, Julian
I can't vouch for it personally, but perhaps the Apache Bahir Netty Source for 
Flink could help you? It sounds like you want to use HTTPS, which this doesn't 
support directly, but the source might be a helpful starting point for adding 
the functionality you need.

On 12/3/20, 1:33 AM, "Chesnay Schepler"  wrote:

What you are asking for is an HTTP(s) source. This currently does not 
exist for Flink, so you would have to implement it yourself.
Additionally you would have to figure out the host on which the source 
runs on yourself.

It may be easier to setup a separate HTTP(s) server that accepts data, 
which you then query from the source.

On 12/2/2020 10:31 PM, dhurandar S wrote:
> Can a Flink job run as a REST server, where the Apache Flink job is
> listening on a port (443)? When a user calls this URL with a payload,
> the data goes directly to the Apache Flink windowing function.
>
> Right now Flink can ingest data from Kafka or Kinesis, but we have a use
> case where we would like to push data to Flink, with Flink listening
> on a port.
>




FlinkKafkaProducer Fails with "Topic not present in metadata"

2020-12-03 Thread Joseph Lorenzini
Hi all,

I have a flink job that uses FlinkKafkaConsumer to consume messages off a kafka 
topic and a FlinkKafkaProducer to produce records on a Kafka topic. The 
consumer works fine. However, the flink job eventually fails with the following 
exception.

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic XXX not 
present in metadata after 6 ms.

I did find this issue, but it didn't have any details, so I am not sure whether it's 
related or not.

https://issues.apache.org/jira/browse/FLINK-18757

Some details that might be important:

- yes, I verified the topic exists
- The kafka cluster that the flink job is integrating with is the Confluent 
cloud platform at version 5.5.0. This means it should be compatible with apache 
kafka 2.5.X.  See here for details 
https://docs.confluent.io/platform/current/installation/versions-interoperability.html
- ACLs and SASL SSL are turned on
- a springboot app that I wrote (which uses spring kafka) is able to write to 
this same topic using the same credentials as what the flink job is using
- I am on flink 1.11.2 and using the flink-connector-kafka_2.11 at version 
1.11.2.
- I turned on trace logs and verified that metadata requests from the flink job 
occur and metadata responses from the kafka broker are returned.
- I've set producer semantics to none and disabled checkpointing





How to parse list values in csv file

2020-12-03 Thread narasimha
Hi,

Getting the below error when trying to read a csv file; one of the fields is a
list type.

Can someone help with fixing the issue?

jobmanager_1   | Caused by: java.lang.IllegalArgumentException: The type
'java.util.List' is not supported for the CSV input format.

jobmanager_1   | at
org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

jobmanager_1   | at
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:67)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

jobmanager_1   | at
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:83)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

jobmanager_1   | at
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:87)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

-- 
A.Narasimha Swamy


Re: Partitioned tables in SQL client configuration.

2020-12-03 Thread Maciek Próchniak

Hi Jark,

thanks for the answer. I'm a bit puzzled, because in my yaml I'm using
"connector: filesystem" (not connector.type). I don't think I end up using

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html#file-system-connector

- this connector, as partitioning and the orc format are handled correctly.



It's also not clear to me what the "non-legacy" connector for reading
files directly from the filesystem (no Hive) is. I don't see any implementation 
of DynamicTableSourceFactory which would do this.


I assumed that the DDL I wrote below also gives me
FileSystemTableFactory; am I wrong?



thanks,

maciek



On 03.12.2020 16:26, Jark Wu wrote:
Only legacy connectors (`connector.type=kafka` instead of 
`connector=kafka`) are supported in the YAML at the moment. You can 
use regular DDL instead. There is a similar discussion in 
https://issues.apache.org/jira/browse/FLINK-20260 
 these days.


Best,
Jark

On Thu, 3 Dec 2020 at 00:52, Till Rohrmann > wrote:


Hi Maciek,

I am pulling in Timo who might help you with this problem.

Cheers,
Till

On Tue, Dec 1, 2020 at 6:51 PM Maciek Próchniak <m...@touk.pl> wrote:

Hello,

I try to configure SQL Client to query partitioned ORC data on
local
filesystem. I have directory structure like that:

/tmp/table1/startdate=2020-11-28

/tmp/table1/startdate=2020-11-27

etc.


If I run SQL Client session and create table by hand:

create table tst (column1 string, startdate string)
partitioned by
(startdate) with ('connector'='filesystem', 'format'='orc',
'path'='/tmp/table1');

everything runs fine:

explain select * from tst where startdate='2020-11-27'

shows that only one partition in 'readPartitions'


However, I struggle to configure table in .yaml config.

I tried like this (after some struggle, as "partition.keys"
setting
doesn't seem to be documented...) :

tables:
  - name: tst2
    type: source-table
    connector: filesystem
    path: "/tmp/table1"
    format: orc
    partition.keys:
      - name: startdate
    schema:
      - name: column1
        data-type: string
      - name: startdate
        data-type: string

and it more or less works - queries are executed properly.
However,
partitions are not pruned:

explain select * from tst2 where startdate='2020-11-27'

show all partitions in 'readPartitions'


Any idea what can be wrong? I'm using Flink 1.11.2


thanks,

maciek




Re: Question: How to avoid local execution being terminated before session window closes

2020-12-03 Thread Klemens Muthmann

Hi,

Thanks for the hint. The infinite loop was the solution and my pipeline 
works now.


Regards

    Klemens

On 24.11.20 at 16:59, Timo Walther wrote:
For debugging you can also implement a simple non-parallel source 
using 
`org.apache.flink.streaming.api.functions.source.SourceFunction`. You 
would need to implement the run() method with an endless loop after 
emitting all your records.


Regards,
Timo

On 24.11.20 16:07, Klemens Muthmann wrote:

Hi,

Thanks for your reply. I am using processing time instead of event 
time, since we do get the events in batches and some might arrive 
days later.


But for my current dev setup I just use a CSV dump of finite size as 
input. I will hand over the pipeline to some other guys, who will 
need to integrate it with an Apache Kafka Service. Output is written 
to a Postgres-Database-System.


I'll have a look at your proposal and let you know if it worked, 
after having finished a few prerequisite parts.


Regards

 Klemens

On 24.11.20 at 12:59, Timo Walther wrote:

Hi Klemens,

what you are observing are reasons why event-time should be 
preferred over processing-time. Event-time uses the timestamp of 
your data while processing-time is too basic for many use cases. Esp. 
when you want to reprocess historic data, you want to do that at 
full speed instead of waiting 1 hour for 1-hour-windows.


If you want to use processing-time nevertheless, you need to use a 
source that produced unbounded streams instead of bounded streams 
such that the pipeline execution theoretically is infinite. Some 
documentation can be found here [1] where you need to use the 
`FileProcessingMode.PROCESS_CONTINUOUSLY`. But what kind of 
connector are you currently using?


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources 



On 24.11.20 09:59, Klemens Muthmann wrote:

Hi,

I have written an Apache Flink Pipeline containing the following 
piece of code (Java):


stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(50))).aggregate(new 
CustomAggregator()).print();


If I run the pipeline using local execution I see the following 
behavior. The "CustomAggregator" calls the `createAccumulator` and 
`add` methods correctly with the correct data. However it never 
calls `getResult` and my pipeline simply finishes.


So I did a little research and found out that it works if I change 
the code to:


stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(1))).aggregate(new 
CustomAggregator()).print();


Notice the reduced gap time for the processing time session window. 
So it seems that execution only continues if the window has been 
closed and if that takes too long, the execution simply aborts. I 
guess another factor playing a part in the problem is, that my 
initial data is read in much faster than 50 seconds. This results 
in the pipeline being in a state where it only waits for the window 
to be closed and having nothing else to do it decides that there is 
no work left and simply shuts down.


My question now is if it is possible to tell the local execution 
environment to wait for that window to be closed, instead of just 
shutting down.


Thanks and Regards

 Klemens Muthmann






--
Best regards
  Dr.-Ing. Klemens Muthmann

---
Cyface GmbH
Hertha-Lindner-Straße 10
01067 Dresden
web: www.cyface.de
email: klemens.muthm...@cyface.de
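
A minimal sketch of the endless-loop debug source suggested in this thread; the record count and contents are made up.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class KeepAliveSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emit the finite test data first
        for (int i = 0; i < 100; i++) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("record-" + i);
            }
        }
        // then keep the source alive so the job does not finish before the
        // processing-time window has a chance to fire
        while (running) {
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}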



Re: Partitioned tables in SQL client configuration.

2020-12-03 Thread Jark Wu
Only legacy connectors (`connector.type=kafka` instead of
`connector=kafka`) are supported in the YAML at the moment. You can use
regular DDL instead. There is a similar discussion in
https://issues.apache.org/jira/browse/FLINK-20260 these days.

Best,
Jark

On Thu, 3 Dec 2020 at 00:52, Till Rohrmann  wrote:

> Hi Maciek,
>
> I am pulling in Timo who might help you with this problem.
>
> Cheers,
> Till
>
> On Tue, Dec 1, 2020 at 6:51 PM Maciek Próchniak  wrote:
>
>> Hello,
>>
>> I try to configure SQL Client to query partitioned ORC data on local
>> filesystem. I have directory structure like that:
>>
>> /tmp/table1/startdate=2020-11-28
>>
>> /tmp/table1/startdate=2020-11-27
>>
>> etc.
>>
>>
>> If I run SQL Client session and create table by hand:
>>
>> create table tst (column1 string, startdate string) partitioned by
>> (startdate) with ('connector'='filesystem', 'format'='orc',
>> 'path'='/tmp/table1');
>>
>> everything runs fine:
>>
>> explain select * from tst where startdate='2020-11-27'
>>
>> shows that only one partition in 'readPartitions'
>>
>>
>> However, I struggle to configure table in .yaml config.
>>
>> I tried like this (after some struggle, as "partition.keys" setting
>> doesn't seem to be documented...) :
>>
>> tables:
>>- name: tst2
>>  type: source-table
>>  connector: filesystem
>>  path: "/tmp/table1"
>>  format: orc
>>  partition.keys:
>>- name: startdate
>>  schema:
>>- name: column1
>>  data-type: string
>>- name: startdate
>>  data-type: string
>>
>> and it more or less works - queries are executed properly. However,
>> partitions are not pruned:
>>
>> explain select * from tst2 where startdate='2020-11-27'
>>
>> show all partitions in 'readPartitions'
>>
>>
>> Any idea what can be wrong? I'm using Flink 1.11.2
>>
>>
>> thanks,
>>
>> maciek
>>
>>
>>


Re: Please recommend a visual client for flink

2020-12-03 Thread yinghua...@163.com
OK, thanks for the recommendation!

> On Dec 3, 2020, at 21:57, Jark Wu wrote:
> 
> You can try Zeppelin; its integration with flink sql is quite good.
> 
> Best,
> Jark
> 
>> On Thu, 3 Dec 2020 at 21:55, yinghua...@163.com  wrote:
>> 
>> I didn't make this clear: I mean a flink sql client. We want to give a client to other departments in the company; colleagues there only know some SQL and have little coding experience.
>> 
On Dec 3, 2020, at 21:52, Shawn Huang wrote:
>>> 
>>> What do you mean by client? By default Flink provides a Web UI on port 8081, where you can submit and cancel jobs and view logs and some basic metrics.
>>> 
>>> Best,
>>> Shawn Huang
>>> 
>>> 
>>> yinghua...@163.com wrote on Thu, Dec 3, 2020 at 8:46 PM:
>>> 
 
>> 


Re: How to compute percentiles in real time with flink sql

2020-12-03 Thread Jark Wu
You can take a look at the UDAF documentation:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregate-functions




On Thu, 3 Dec 2020 at 12:06, 爱成绕指柔 <1194803...@qq.com> wrote:

> Hello:
>   Is there currently no percentile function in flink sql real-time computation? If not, how can this functionality be implemented?
>   Looking forward to your reply, thanks!
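
A rough sketch of the UDAF route pointed to above: a user-defined aggregate that buffers values and computes a percentile (the median here, via nearest rank) in getValue(). Buffering every value in the accumulator is only for illustration, not a production-grade streaming percentile; registration and SQL usage then follow the UDAF documentation linked above.

import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MedianFunction extends AggregateFunction<Double, MedianFunction.Acc> {

    public static class Acc {
        public List<Double> values = new ArrayList<>();
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // called by the framework for every input row
    public void accumulate(Acc acc, Double value) {
        if (value != null) {
            acc.values.add(value);
        }
    }

    @Override
    public Double getValue(Acc acc) {
        if (acc.values.isEmpty()) {
            return null;
        }
        List<Double> sorted = new ArrayList<>(acc.values);
        Collections.sort(sorted);
        int idx = (int) Math.ceil(0.5 * sorted.size()) - 1;  // nearest-rank index for p50
        return sorted.get(Math.max(idx, 0));
    }
}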


Re: Please recommend a visual client for flink

2020-12-03 Thread Jark Wu
You can try Zeppelin; its integration with flink sql is quite good.

Best,
Jark

On Thu, 3 Dec 2020 at 21:55, yinghua...@163.com  wrote:

> I didn't make this clear: I mean a flink sql client. We want to give a client to other departments in the company; colleagues there only know some SQL and have little coding experience.
>
> > On Dec 3, 2020, at 21:52, Shawn Huang wrote:
> >
> > What do you mean by client? By default Flink provides a Web UI on port 8081, where you can submit and cancel jobs and view logs and some basic metrics.
> >
> > Best,
> > Shawn Huang
> >
> >
> > yinghua...@163.com wrote on Thu, Dec 3, 2020 at 8:46 PM:
> >
> >>
>


Re: Please recommend a visual client for flink

2020-12-03 Thread yinghua...@163.com
I didn't make this clear: I mean a flink sql client. We want to give a client to other departments in the company; colleagues there only know some SQL and have little coding experience.

> On Dec 3, 2020, at 21:52, Shawn Huang wrote:
> 
> What do you mean by client? By default Flink provides a Web UI on port 8081, where you can submit and cancel jobs and view logs and some basic metrics.
> 
> Best,
> Shawn Huang
> 
> 
> yinghua...@163.com wrote on Thu, Dec 3, 2020 at 8:46 PM:
> 
>> 


Re: Please recommend a visual client for flink

2020-12-03 Thread Shawn Huang
What do you mean by client? By default Flink provides a Web UI on port 8081, where you can submit and cancel jobs and view logs and some basic metrics.

Best,
Shawn Huang


yinghua...@163.com wrote on Thu, Dec 3, 2020 at 8:46 PM:

>


Re: flink-cdc reads no binlog, and the program reports no error

2020-12-03 Thread Jark Wu
Could it be unsigned int causing the trouble...

On Thu, 3 Dec 2020 at 15:15, chenjb  wrote:

> Case solved: the field types were not mapped as the official docs require; after fixing the mapping it works fine.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL shared source question

2020-12-03 Thread Jark Wu
1. To check whether the source is shared, just look at the topology graph in the web UI.
2. When catching up on data, or when downstream consumers run at different speeds, uneven consumption across partitions is quite normal.
3. You can increase the sink parallelism and the buffer size to mitigate this.

Best,
Jark

On Wed, 2 Dec 2020 at 19:22, zz  wrote:

> Hi everyone:
> I currently have a job whose source table is generated by reading one topic, but there are 6 sinks, using multiple insert
> statements that write into the same mysql table. As I understand it, these insert statements
> should all share this source table, so kafka only needs to be read once. But while the job runs I see that some partitions of the kafka
> topic are consumed very quickly and some very slowly. What could be the reason?
> The topic has 18 partitions in total, and the job parallelism is 18.

Re: I defined a Kafka dynamic table in SQL-Client, but the kafka topic had some elements in the wrong format, so an exception was thrown in SQL-Client. Can we define the Kafka dynamic table with some

2020-12-03 Thread Jark Wu
I think this should be a bug; I've created an issue: https://issues.apache.org/jira/browse/FLINK-20470

On Wed, 2 Dec 2020 at 18:02, mr.meng...@ouglook.com 
wrote:

> <http://apache-flink.147419.n8.nabble.com/file/t1146/QQ%E6%88%AA%E5%9B%BE111.jpg>
>
> <http://apache-flink.147419.n8.nabble.com/file/t1146/%E6%97%A0%E6%A0%87%E9%A2%981211.png>
> Caused by: java.io.IOException: Failed to deserialize JSON ''.
> at
>
> org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126)
> ~[flink-json-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76)
> ~[flink-json-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> Caused by: java.lang.ClassCastException:
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode
> cannot be cast to
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
>
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Problems writing to hive 2.1.1 orc from flink 1.11.2

2020-12-03 Thread Rui Li
Hi,

I'll find a hive
2.1.1 environment and try to reproduce this problem. But first, note that Flink currently does not support Hive ACID tables; even if the data in your example were written successfully, it would not satisfy ACID semantics, and Hive might not be able to read it.

On Thu, Dec 3, 2020 at 5:23 PM yang xu <316481...@qq.com> wrote:

> Hi Rui Li
> The jars under lib are as follows:
>  flink-csv-1.11.2.jar
>  flink-dist_2.11-1.11.2.jar
>  flink-json-1.11.2.jar
>  flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar
>  flink-shaded-zookeeper-3.4.14.jar
>  flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar
>  flink-table_2.11-1.11.2.jar
>  flink-table-api-java-bridge_2.11-1.11.2.jar
>  flink-table-blink_2.11-1.11.2.jar
>  flink-table-planner-blink_2.11-1.11.2.jar
>  log4j-1.2-api-2.12.1.jar
>  log4j-api-2.12.1.jar
>  log4j-core-2.12.1.jar
>  log4j-slf4j-impl-2.12.1.jar
>
> The statement writing to hive is just a simple insert:
> insert into hive_t1  SELECT  address  FROM  users
>
> And the table creation statement is as follows:
> create table hive_t1(address string)
> clustered by (address) into 8 buckets
> stored as orc TBLPROPERTIES ('transactional'='true','orc.compress' =
> 'SNAPPY');
>
> Thanks a lot for your help!
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Re: Why is Calcite's implicit type conversion disabled?

2020-12-03 Thread Jark Wu
Implicit type conversion is a very important public API and needs careful discussion in the community, for example which types may be converted between.
The community has not planned this feature yet; if you need it, you can open an issue in the community.

Best,
Jark

On Wed, 2 Dec 2020 at 18:33, stgztsw  wrote:

> Currently neither flink sql nor flink hive
> sql supports implicit type conversion. While debugging we found that calcite itself actually supports it, but flink forcibly disables it. Hive itself supports implicit conversion, and this prevents our hive jobs from being migrated to flink. What is the reason for disabling it? Would enabling it on our side cause any problems?


Re: flink sql 1.11.1 seems to have a bug

2020-12-03 Thread Jark Wu
It looks like the job submission timed out and failed. Please confirm that:
1. The flink cluster is up.
2. The sql client's environment can reach the flink cluster's environment.
3. The correct gateway-address is configured in sql-client-defaults.yaml (not needed for a local cluster).

Best,
Jark

On Wed, 2 Dec 2020 at 14:12, zzy  wrote:

> The problem is as follows: flink version 1.11.1, using flink sql in the sql client.
>
>
> The sql statements are as follows:
> CREATE TABLE sls_log_sz_itsp (
>   request STRING,
>   http_bundleId STRING,
>   upstream_addr STRING,
>   http_appid STRING,
>   bodyUserId STRING,
>   http_sequence STRING,
>   http_version STRING,
>   response_body STRING,
>   uri STRING,
>   bytes_sent STRING,
>   http_userId STRING,
>   http_cityId STRING,
>   http_user_agent STRING,
>   http_deviceType STRING,
>   record_time STRING,
>   rt AS TO_TIMESTAMP(DATE_FORMAT(record_time,'-MM-dd HH:mm:ss')),
>   WATERMARK FOR rt AS rt - INTERVAL '5' SECOND,
>   request_time STRING,
>   request_body STRING,
>   request_length STRING,
>   nginx_id STRING,
>   proxy_add_x_forwarded_for STRING,
>   http_deviceId STRING,
>   host STRING,
>   upstream_response_time STRING,
>   status STRING
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = '0.11',
>  'connector.topic' = 'sls',
>  'connector.properties.zookeeper.connect' =
> 'hadoop85:2181,hadoop86:2181,hadoop87:2181',
>  'connector.properties.bootstrap.servers' =
> 'hadoop85:9092,hadoop86:9092,hadoop87:9092',
>  'connector.properties.group.id' = 'log-sz-itsp',
>  'connector.startup-mode' = 'latest-offset',
>  'format.type' = 'json'
> );
>
>
>
>  CREATE TABLE sz_itsp_test(
> request STRING,
> request_count BIGINT NOT NULL,
> window_end TIMESTAMP(3)
> ) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' =
> 'jdbc:mysql://hadoop85:3306/test?useSSL=false=true',
> 'connector.table' = 'sz_itsp_test',
> 'connector.driver' = 'com.mysql.jdbc.Driver',
> 'connector.username' = 'root',
> 'connector.password' = '00',
> 'connector.write.flush.max-rows' = '1',
> 'connector.write.flush.interval' = '2s',
> 'connector.write.max-retries' = '3'
> );
>
>
> INSERT INTO sz_itsp_test
> SELECT
>request,
>count(request) request_count,
>TUMBLE_END(rt, INTERVAL '5' MINUTE) AS window_end
>  FROM sls_log_sz_itsp
>  WHERE nginx_id = 'sz-itsp' AND nginx_id IS NOT NULL
>  GROUP BY TUMBLE(rt, INTERVAL '5' MINUTE), request
>  ;
>
>
> The sql client reports the following error:
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> Caused by: java.lang.RuntimeException: Error running SQL job.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:608)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:529)
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:537)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299)
> at java.util.Optional.ifPresent(Optional.java:159)
> at
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
> at
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
> at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:605)
> ... 8 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> 

Re: Routing events to different kafka topics dynamically

2020-12-03 Thread Till Rohrmann
Great to hear :-)

Cheers,
Till

On Thu, Dec 3, 2020 at 10:15 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Thanks Till,
>
> Able to deduce topics by extending the KafkaSerializarion Schema class.
>
> Prasanna.
>
> On Wed, Dec 2, 2020 at 11:18 PM Till Rohrmann 
> wrote:
>
>> Hi Prasanna,
>>
>> I believe that what Aljoscha suggestd in the linked discussion is still
>> the best way to go forward. Given your description of the problem this
>> should actually be pretty straightforward as you can deduce the topic from
>> the message. Hence, you just need to create the ProducerRecord with the
>> right target topic you extracted from the record/message.
>>
>> Cheers,
>> Till
>>
>> On Wed, Dec 2, 2020 at 5:28 PM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Events need to be routed to different kafka topics dynamically based
>>> upon some info in the message.
>>>
>>> We have implemented using KeyedSerializationSchema similar to
>>> https://stackoverflow.com/questions/49508508/apache-flink-how-to-sink-events-to-different-kafka-topics-depending-on-the-even.
>>> But its deprecated and we cannot use it for production.
>>>
>>> I looked at the alternative KafkaSerializationSchema but there i do not
>>> see an option there.
>>>
>>> Then i stumbled upon this
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Usage-of-KafkaDeserializationSchema-and-KafkaSerializationSchema-td32347.html.
>>> which asks us to use KafkaContextAware.
>>>
>>> Is there a more intuitive/easier way to do the same ?
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>>
>>>
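A minimal sketch of the approach that worked here: a KafkaSerializationSchema that builds the ProducerRecord with a topic taken from the message itself. The Tuple2<topic, payload> element type below is only for illustration; adapt it to the real event type.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

// Routes every element to the topic carried inside the element itself.
public class DynamicTopicSerializationSchema
        implements KafkaSerializationSchema<Tuple2<String, String>> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> element, Long timestamp) {
        String topic = element.f0;                                   // topic deduced from the message
        byte[] value = element.f1.getBytes(StandardCharsets.UTF_8);  // payload
        return new ProducerRecord<>(topic, value);
    }
}

The schema is then passed to the FlinkKafkaProducer constructor that takes a default topic, the serialization schema and the producer properties.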


Re: Flink TableAPI Issue: cannot assign instance of org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.LRUMap to field

2020-12-03 Thread Jark Wu
Check whether the Flink version used to submit the job matches the Flink version deployed on the YARN cluster.
Alternatively, there may be two different versions of the flink-shaded-jackson jar in your cluster.

On Wed, 2 Dec 2020 at 11:55, Zed  wrote:

> When I submitted a flink-table-sql job to yarn, the following exception
> came
> out. Wondering how to solve it. Anyone can help me with that? Appreciate
> it
>
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at
>
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.LRUMap
> to field
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers
> of type java.util.concurrent.ConcurrentHashMap in instance of
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache
> at
>
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
> at
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> at
>
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> at
>
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
> 

Flink 1.9: configured TTL does not take effect

2020-12-03 Thread Yang Peng
Hi, a question: our production environment runs flink 1.9 with RocksDB as the state backend. The code is configured as follows:

private static final String EV_STATE_FLAG = "EV_EID_FLAG";

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(60))
.updateTtlOnCreateAndWrite()
.neverReturnExpired()
.cleanupInRocksdbCompactFilter(1000)
.build();
MapStateDescriptor<String, Integer> eidMapStateDesc = new
MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO);
eidMapStateDesc.enableTimeToLive(ttlConfig);
eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);

The TTL expiration time is set to 60 minutes.
However, the job has been running for a day now, and the RocksDB metrics show that the SST files for the state named EV_STATE_FLAG keep growing with no sign of shrinking. In the TaskManager logs we found the following:
WARN org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
compaction filter for state < EV_EID_FLAG >: feature is disabled for the
state backend.
After adding the parameter state.backend.rocksdb.ttl.compaction.filter.enabled:
true and restarting the job, the warning disappears, but after running for a while
checkpoints start to fail. A jstack shows the failing checkpoint is stuck at the code that reads state. If we remove the parameter, the job recovers, but the warning
about the TTL configuration not taking effect comes back. Has anyone run into this issue?
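For reference, on Flink 1.9 the RocksDB TTL compaction filter has to be switched on explicitly; a minimal flink-conf.yaml sketch, assuming the state backend is also configured there (in Flink 1.10+ the filter is enabled by default and this option was removed):

# Flink 1.9 only: the RocksDB TTL compaction filter is off unless enabled explicitly
state.backend: rocksdb
state.backend.rocksdb.ttl.compaction.filter.enabled: true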


Re: Question about converting a flink cdc SQL table into a DataStream

2020-12-03 Thread Jark Wu
The data in the row is exactly the fields, in the order defined in your schema; you can access the values by index.

On Tue, 1 Dec 2020 at 13:59, jsqf  wrote:

> You can use this approach:
> DataStream<RowData> dstream = tableEnv.toAppendStream(sourceTable,
> RowData.class);
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
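A small sketch of that conversion plus by-index field access, as described above (the wrapper class and the output format are illustrative only):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class CdcTableToStream {

    // tEnv and sourceTable are assumed to exist already (e.g. a table backed by a CDC source).
    public static DataStream<String> toStream(StreamTableEnvironment tEnv, Table sourceTable) {
        // CDC results are updating, so the retract stream conversion is the safe one:
        // f0 = true for insert/update-after, false for retraction; f1 = the row itself.
        DataStream<Tuple2<Boolean, Row>> retractStream =
                tEnv.toRetractStream(sourceTable, Row.class);

        return retractStream.map(new MapFunction<Tuple2<Boolean, Row>, String>() {
            @Override
            public String map(Tuple2<Boolean, Row> change) {
                Row row = change.f1;
                // Fields arrive in the order declared in the schema; access them by index.
                Object firstField = row.getField(0);
                return (change.f0 ? "+ " : "- ") + firstField;
            }
        });
    }
}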


Re: flink sql client reports java.net.NoRouteToHostException: No route to host

2020-12-03 Thread Jark Wu
Try pinging localhost on your machine and see whether it responds.
Also check whether a network proxy is enabled locally; if so, turn it off and try again.

Best,
Jark

On Tue, 1 Dec 2020 at 09:38, 奚焘 <759928...@qq.com> wrote:

> I'm just starting to learn flink. I downloaded and unpacked flink, started ./sql-client.sh embedded, typed SELECT 'Hello
> World'; and got an error
> Flink SQL> SELECT 'Hello World';
> [ERROR] Could not execute SQL statement. Reason:
> java.net.NoRouteToHostException: No route to host
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Could anyone recommend a GUI client to use with flink?

2020-12-03 Thread yinghua...@163.com


Re: Collecting video streams from cameras

2020-12-03 Thread Jark Wu
Yes, it's a good fit.

On Tue, 1 Dec 2020 at 09:37, Xia(Nate) Qu  wrote:

> A question for everyone:
>
>
> We want to build a platform that collects the video streams of multiple surveillance cameras, roughly 1000-5000 of them. The cameras send their stream data directly to the platform, which can then write the data to Hadoop or serve it for machine-learning consumption. Is flink a good fit for this kind of scenario? Thanks
>
>
> 屈夏
>


Re: How does flink cdc guarantee the correctness of group agg results

2020-12-03 Thread Jark Wu
Could it be that your source does not contain the full historical data, which is why the results do not match?

The usual recommendation is to sync the full snapshot plus the incremental changes into kafka and have flink consume that topic from the beginning.
Alternatively, the mysql-cdc connector [1] also provides snapshot-plus-incremental reading.

Best,
Jark

[1]:
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector


On Mon, 30 Nov 2020 at 22:54, kandy.wang  wrote:

> insert into kudu.default_database.index_agg
> SELECT v_spu_id as spu_id,sum(leaving_num*vipshop_price) as
> leaving_price,DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss')
> FROM  XX.XX.XX
> group by v_spu_id;
>
>
> XX.XX.XX consumes our company's CDC feed through a custom cdc
> format. The CDC data lives in kafka and is retained for only 7 days, and we only consume incremental changes, so how can the results be guaranteed to be correct?
> How should the initialization be done? Does initialization mean bootstrapping the data into state? Right now, reconciliation shows the numbers do not match.
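For reference, a sketch of a table backed by the mysql-cdc connector from [1], which reads a full snapshot first and then the binlog, so the aggregation starts from complete data (hostname, credentials and schema are placeholders):

-- snapshot + incremental binlog read via the mysql-cdc connector
CREATE TABLE products_cdc (
  v_spu_id BIGINT,
  leaving_num INT,
  vipshop_price DECIMAL(10, 2)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'secret',
  'database-name' = 'mydb',
  'table-name' = 'products'
);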


Re: Running flink sql from code fails with Multiple factories for identifier 'jdbc' that implement

2020-12-03 Thread Wei Zhong
The poor error reporting shown here has already been fixed for later versions; once the new release is out, upgrading will let you see directly from the error message which TableFactory implementations are in conflict:

https://issues.apache.org/jira/browse/FLINK-20186 




> On 3 Dec 2020, at 20:08, Wei Zhong wrote:
> 
> Hi,
> 
> The current TableFactory lookup code seems to have a problem in its error reporting and does not show the real class names. You can manually run the following code first to see which classes are being treated as the JDBC DynamicTableSinkFactory:
> 
> List<Factory> result = new LinkedList<>();
> ServiceLoader
>.load(Factory.class, Thread.currentThread().getContextClassLoader())
>.iterator()
>.forEachRemaining(result::add);
> List<Factory> jdbcResult = result.stream().filter(f ->
>DynamicTableSinkFactory.class.isAssignableFrom(f.getClass())).filter(
>f -> f.factoryIdentifier().equals("jdbc")).collect(Collectors.toList());
> System.out.println(jdbcResult);
> 
> 
>> On 3 Dec 2020, at 19:50, hailongwang <18868816...@163.com> wrote:
>> 
>> Hi,
>> Which version are you using? Do you have a connector of your own that extends DynamicTableSinkFactory and whose factoryIdentifier method returns `JDBC`?
>> 
>> 
>> Best,
>> Hailong
>> On 2020-12-03 14:44:18, "xuzh" <huazhe...@qq.com> wrote:
>>> Error:
>>> 
>>> 
>>> Caused by: org.apache.flink.table.api.ValidationException: Multiple 
>>> factories for identifier 'jdbc' that implement 
>>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>>> classpath
>>> 
>>> 
>>> It looks as if two identical classes were found: DynamicTableSinkFactory
>>> 
>>> 
>>> The code is as follows:
>>> package org.apache.flink.examples;
>>> 
>>> 
>>> import 
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>>> 
>>> 
>>> public class CDC2ss2 {
>>>   public static void main(String[] args) throws Exception {
>>> 
>>> 
>>> // set up execution environment
>>> StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> StreamTableEnvironment tEnv;
>>> 
>>> 
>>> EnvironmentSettings settings = 
>>> EnvironmentSettings.newInstance()
>>> .useBlinkPlanner()
>>> .inStreamingMode()
>>> .build();
>>> tEnv = StreamTableEnvironment.create(env, 
>>> settings);
>>> String src_sql = "CREATE TABLE userss (\n" +
>>> "  
>>> user_id INT,\n" +
>>> "  
>>> user_nm STRING\n" +
>>> ") WITH (\n" +
>>> "  
>>>  'connector' = 'mysql-cdc',\n" +
>>> "  
>>>  'hostname' = '10.12.5.37',\n" +
>>> "  
>>>  'port' = '3306',\n" +
>>> "  
>>>  'username' = 'dps',\n" +
>>> "  
>>>  'password' = 'dps1234',\n" +
>>> "  
>>>  'database-name' = 'rpt',\n" +
>>> "  
>>>  'table-name' = 'users'\n" +
>>> "  
>>>  )";
>>> 
>>> 
>>> tEnv.executeSql(src_sql); // create the table
>>> 
>>> 
>>> String sink="CREATE TABLE sink (\n" +
>>> "  
>>> user_id INT,\n" +
>>> "  
>>> user_nm STRING,\n" +
>>> "  
>>> primary key(user_id) NOT ENFORCED \n" +
>>> ") WITH (\n" +
>>> "  
>>>  'connector' = 'jdbc',\n" +
>>> "  
>>>  'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n 
>>> " +
>>> "  
>>>  'username' = 'dps',\n" +
>>> "  
>>>  'password' = 'dps1234',\n" +
>>> "  
>>>  'table-name' = 'sink'\n" +
>>> "  
>>>  )";
>>> String to_print_sql="insert into sink select 
>>> user_id ,user_nm from userss";
>>> tEnv.executeSql(sink);
>>> tEnv.executeSql(to_print_sql);
>>> env.execute();
>>>   }
>>> 
>>> 
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Detailed error:
>>> 
>>> 
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>> Unable to create a sink for writing table 
>>> 'default_catalog.default_database.sink'.
>>> 
>>> 
>>> Table options are:
>>> 
>>> 
>>> 'connector'='jdbc'
>>> 'password'='dps1234'
>>> 'table-name'='sink'
>>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false' 
>>> 
>>> 'username'='dps'
>>> at 
>>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> at 

Re:Re: flinksql 1.11: after a long time with no data written to mysql, it reports java.sql.SQLException: No operations allowed after statement closed.

2020-12-03 Thread hailongwang
This should just be an error-level log entry. Could you also share the logs for retry times = 1 and retry times = 2?





On 2020-12-03 16:17:27, "yanzi" wrote:
>hi Leonard:
>
>The error message is as follows:
>[2020-12-02 22:01:03.403] [ERROR] [jdbc-upsert-output-format-thread-1]
>[org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat] >>> JDBC
>executeBatch error, retry times = 3
>java.sql.SQLException: No operations allowed after statement closed.
>   at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at com.mysql.jdbc.StatementImpl.checkClosed(StatementImpl.java:426)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at 
> com.mysql.jdbc.PreparedStatement.setString(PreparedStatement.java:3943)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at
>org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$createExternalConverter$57fde215$8(AbstractJdbcRowConverter.java:219)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableExternalConverter$2bf50691$1(AbstractJdbcRowConverter.java:193)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toExternal(AbstractJdbcRowConverter.java:86)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$createKeyedRowExecutor$3fd497bb$1(JdbcRowDataOutputFormat.java:164)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.internal.executor.KeyedBatchStatementExecutor.executeBatch(KeyedBatchStatementExecutor.java:71)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.attemptFlush(JdbcRowDataOutputFormat.java:154)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:120)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>[?:1.8.0_262]
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>[?:1.8.0_262]
>   at
>java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>[?:1.8.0_262]
>   at
>java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>[?:1.8.0_262]
>   at
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>[?:1.8.0_262]
>   at
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>[?:1.8.0_262]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Dynamically loading patterns in Flink CEP

2020-12-03 Thread huang botao
OK, thanks for your answer. I'm now reading these two posts by Alex (
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html ;
https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html
), and they are giving me some ideas.

On Wed, Dec 2, 2020 at 8:36 PM Wei Zhong  wrote:

> Hi,
>
> Flink CEP does not support dynamically loading rules yet. There is a JIRA tracking this requirement:
>
> https://issues.apache.org/jira/browse/FLINK-7129 <
> https://issues.apache.org/jira/browse/FLINK-7129>
>
> You can follow that JIRA for the latest progress.
>
> > On 2 Dec 2020, at 17:48, huang botao wrote:
> >
> > Hi, in our project the rules change quite often. How do people usually load such rules dynamically? Does Flink CEP have a native API for dynamically loading rules?
>
>
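For completeness: until FLINK-7129 lands, the fraud-detection posts linked above distribute rule updates through broadcast state rather than through CEP itself. A minimal sketch of that pattern (the String-encoded rules and the contains-based matching are placeholders):

import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicRulesSketch {

    // Rules keyed by rule id, broadcast to every parallel instance of the evaluator.
    static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    public static DataStream<String> apply(DataStream<String> events, DataStream<String> ruleUpdates) {
        BroadcastStream<String> broadcastRules = ruleUpdates.broadcast(RULES);

        return events
                .connect(broadcastRules)
                .process(new BroadcastProcessFunction<String, String, String>() {

                    @Override
                    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Evaluate the event against the rules currently held in broadcast state.
                        for (Map.Entry<String, String> rule : ctx.getBroadcastState(RULES).immutableEntries()) {
                            if (event.contains(rule.getValue())) {
                                out.collect("rule " + rule.getKey() + " matched: " + event);
                            }
                        }
                    }

                    @Override
                    public void processBroadcastElement(String ruleUpdate, Context ctx, Collector<String> out) throws Exception {
                        // A rule update arrives on the control stream; an "id=pattern" encoding is assumed here.
                        String[] parts = ruleUpdate.split("=", 2);
                        ctx.getBroadcastState(RULES).put(parts[0], parts[1]);
                    }
                });
    }
}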


Re: Running flink sql from code fails with Multiple factories for identifier 'jdbc' that implement

2020-12-03 Thread Wei Zhong
Hi,

The current TableFactory lookup code seems to have a problem in its error reporting and does not show the real class names. You can manually run the following code first to see which classes are being treated as the JDBC DynamicTableSinkFactory:

List<Factory> result = new LinkedList<>();
ServiceLoader
   .load(Factory.class, Thread.currentThread().getContextClassLoader())
   .iterator()
   .forEachRemaining(result::add);
List<Factory> jdbcResult = result.stream().filter(f ->
   DynamicTableSinkFactory.class.isAssignableFrom(f.getClass())).filter(
   f -> f.factoryIdentifier().equals("jdbc")).collect(Collectors.toList());
System.out.println(jdbcResult);


> On 3 Dec 2020, at 19:50, hailongwang <18868816...@163.com> wrote:
> 
> Hi,
> Which version are you using? Do you have a connector of your own that extends DynamicTableSinkFactory and whose factoryIdentifier method returns `JDBC`?
> 
> 
> Best,
> Hailong
> On 2020-12-03 14:44:18, "xuzh" wrote:
>> Error:
>> 
>> 
>> Caused by: org.apache.flink.table.api.ValidationException: Multiple 
>> factories for identifier 'jdbc' that implement 
>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>> classpath
>> 
>> 
>> It looks as if two identical classes were found: DynamicTableSinkFactory
>> 
>> 
>> The code is as follows:
>> package org.apache.flink.examples;
>> 
>> 
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>> 
>> 
>> public class CDC2ss2 {
>>   public static void main(String[] args) throws Exception {
>> 
>> 
>> // set up execution environment
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tEnv;
>> 
>> 
>> EnvironmentSettings settings = 
>> EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> .build();
>> tEnv = StreamTableEnvironment.create(env, 
>> settings);
>> String src_sql = "CREATE TABLE userss (\n" +
>> "  
>> user_id INT,\n" +
>> "  
>> user_nm STRING\n" +
>> ") WITH (\n" +
>> "  
>>  'connector' = 'mysql-cdc',\n" +
>> "  
>>  'hostname' = '10.12.5.37',\n" +
>> "  
>>  'port' = '3306',\n" +
>> "  
>>  'username' = 'dps',\n" +
>> "  
>>  'password' = 'dps1234',\n" +
>> "  
>>  'database-name' = 'rpt',\n" +
>> "  
>>  'table-name' = 'users'\n" +
>> "  
>>  )";
>> 
>> 
>> tEnv.executeSql(src_sql); // create the table
>> 
>> 
>> String sink="CREATE TABLE sink (\n" +
>> "  
>> user_id INT,\n" +
>> "  
>> user_nm STRING,\n" +
>> "  
>> primary key(user_id) NOT ENFORCED \n" +
>> ") WITH (\n" +
>> "  
>>  'connector' = 'jdbc',\n" +
>> "  
>>  'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>> "  
>>  'username' = 'dps',\n" +
>> "  
>>  'password' = 'dps1234',\n" +
>> "  
>>  'table-name' = 'sink'\n" +
>> "  
>>  )";
>> String to_print_sql="insert into sink select 
>> user_id ,user_nm from userss";
>> tEnv.executeSql(sink);
>> tEnv.executeSql(to_print_sql);
>> env.execute();
>>   }
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> Detailed error:
>> 
>> 
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> Unable to create a sink for writing table 
>> 'default_catalog.default_database.sink'.
>> 
>> 
>> Table options are:
>> 
>> 
>> 'connector'='jdbc'
>> 'password'='dps1234'
>> 'table-name'='sink'
>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>> 'username'='dps'
>>  at 
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>>  at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>>  at 
>> 

Re: Application Mode support on VVP v2.3

2020-12-03 Thread Fabian Paul
Hi Narasimha,

Nothing comes to my mind immediately why it should not work. We are using the 
StandaloneApplicationClusterEntryPoint to start the cluster. Can you provide 
some more information about which Flink image on vvp you are trying to use and 
maybe show the error message?

Best,
Fabian

Re: Running flink sql from code fails with Multiple factories for identifier 'jdbc' that implement

2020-12-03 Thread hailongwang
Hi,
 Which version are you using? Do you have a connector of your own that extends DynamicTableSinkFactory and whose factoryIdentifier method returns `JDBC` as
Connector identifier?


Best,
Hailong
On 2020-12-03 14:44:18, "xuzh" wrote:
>Error:
>
>
>Caused by: org.apache.flink.table.api.ValidationException: Multiple factories 
>for identifier 'jdbc' that implement 
>'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>classpath
>
>
>It looks as if two identical classes were found: DynamicTableSinkFactory
>
>
>The code is as follows:
>package org.apache.flink.examples;
>
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.factories.DynamicTableSinkFactory;
>
>
>public class CDC2ss2 {
>  public static void main(String[] args) throws Exception {
>
>
>// set up execution environment
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tEnv;
>
>
>EnvironmentSettings settings = 
>EnvironmentSettings.newInstance()
>.useBlinkPlanner()
>.inStreamingMode()
>.build();
>tEnv = StreamTableEnvironment.create(env, 
>settings);
>String src_sql = "CREATE TABLE userss (\n" +
>"  
>user_id INT,\n" +
>"  
>user_nm STRING\n" +
>") WITH (\n" +
>"   
>'connector' = 'mysql-cdc',\n" +
>"   
>'hostname' = '10.12.5.37',\n" +
>"   
>'port' = '3306',\n" +
>"   
>'username' = 'dps',\n" +
>"   
>'password' = 'dps1234',\n" +
>"   
>'database-name' = 'rpt',\n" +
>"   
>'table-name' = 'users'\n" +
>"   
>)";
>
>
>tEnv.executeSql(src_sql); // create the table
>
>
>String sink="CREATE TABLE sink (\n" +
>"  
>user_id INT,\n" +
>"  
>user_nm STRING,\n" +
>"  
>primary key(user_id) NOT ENFORCED \n" +
>") WITH (\n" +
>"   
>'connector' = 'jdbc',\n" +
>"   
>'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>"   
>'username' = 'dps',\n" +
>"   
>'password' = 'dps1234',\n" +
>"   
>'table-name' = 'sink'\n" +
>"   
>)";
>String to_print_sql="insert into sink select 
>user_id ,user_nm from userss";
>tEnv.executeSql(sink);
>tEnv.executeSql(to_print_sql);
>env.execute();
>  }
>
>
>}
>
>
>
>
>
>Detailed error:
>
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Unable to create a sink for writing table 
>'default_catalog.default_database.sink'.
>
>
>Table options are:
>
>
>'connector'='jdbc'
>'password'='dps1234'
>'table-name'='sink'
>'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>'username'='dps'
>   at 
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>   at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
>Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a 
>connector using option ''connector'='jdbc''.
>   at 
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>   at 
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>   ... 18 more
>Caused by: org.apache.flink.table.api.ValidationException: Multiple factories 
>for identifier 'jdbc' that implement 

Configuring rolling for taskmanager.out

2020-12-03 Thread zilong xiao
A question for the community experts: can the stdout file taskmanager.out be configured to roll over?


Query regarding HA mode and checkpointing

2020-12-03 Thread Kaushal Raj
Hello,

I am new to flink. I have a few queries regarding HA mode with zookeeper
and checkpointing. When flink is configured in HA mode with zookeeper,
where are the job checkpoints stored? Is zookeeper only used for recovering
the jobmanager, or also for checkpoints? What is the significance of
"high-availability.storageDir"
here?

Thanks,
Kaushal
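For what it's worth, a minimal sketch of the flink-conf.yaml entries involved (hosts and paths are placeholders). ZooKeeper only keeps small pointers used for leader election and recovery; the HA metadata itself (e.g. submitted job graphs) is written to high-availability.storageDir, and checkpoint data goes to the checkpoint directory of the state backend:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
# HA metadata is persisted here; ZooKeeper stores only pointers to it
high-availability.storageDir: hdfs:///flink/ha
# completed checkpoints live here, not in ZooKeeper
state.checkpoints.dir: hdfs:///flink/checkpoints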


Re: Running Flink job as a rest

2020-12-03 Thread Chesnay Schepler
What you are asking for is an HTTP(s) source. This currently does not 
exist for Flink, so you would have to implement it yourself.
Additionally you would have to figure out the host on which the source 
runs yourself.


It may be easier to set up a separate HTTP(s) server that accepts data, 
which you then query from the source.


On 12/2/2020 10:31 PM, dhurandar S wrote:

Can Flink job be running as Rest Server, Where Apache Flink job is
listening on a port (443). When a user calls this URL with payload,
data directly goes to the Apache Flink windowing function.

Right now Flink can ingest data from Kafka or Kinesis, but we have a use
case where we would like to push data to Flink, where Flink is listening on
a port
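A rough sketch of what such a self-implemented source could look like, using the JDK's built-in com.sun.net.httpserver.HttpServer inside a SourceFunction. The port, the /ingest path and the plain-text payload handling are assumptions, and figuring out which host the source ends up on is still left to you, as noted above:

import com.sun.net.httpserver.HttpServer;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

// Each parallel instance opens an HTTP endpoint and emits every POSTed payload as a record.
public class HttpPushSource implements SourceFunction<String> {

    private final int port;
    private volatile boolean running = true;
    private transient HttpServer server;

    public HttpPushSource(int port) {
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/ingest", exchange -> {
            try (InputStream in = exchange.getRequestBody()) {
                Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name()).useDelimiter("\\A");
                String payload = scanner.hasNext() ? scanner.next() : "";
                synchronized (ctx.getCheckpointLock()) {   // emit under the checkpoint lock
                    ctx.collect(payload);
                }
                exchange.sendResponseHeaders(200, -1);     // -1 = no response body
            } finally {
                exchange.close();
            }
        });
        server.start();
        while (running) {
            Thread.sleep(1000);                            // keep the source alive until cancelled
        }
        server.stop(0);
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Backpressure, exactly-once semantics and TLS termination would still need to be addressed before using something like this in production.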





Re: Impact of a zookeeper leader change on flink

2020-12-03 Thread Yang Wang
I checked; the community already has a related ticket and PR that you can follow:

https://issues.apache.org/jira/browse/FLINK-10052
https://github.com/apache/flink/pull/11338

Best,
Yang

On Thu, Dec 3, 2020 at 4:52 PM, Yang Wang wrote:

> My understanding is that Curator keeps long-lived connections to every ZooKeeper node (leader and followers alike).
> If you restart one of the ZK nodes, the Curator clients connected to that node should all go into Suspended,
> which makes the corresponding JobManager lose its leadership, so the current jobs are cancelled and rerun.
>
> You could verify whether restarting one ZK node only fails over the specific Flink jobs connected to that node, rather than all of them.
>
> Finally, there is currently no way to solve this directly through Flink configuration, unless Curator#LeaderLatch's handling of the Suspended state is improved,
> and Flink's LeaderRetrieval also needs to improve its handling of the Suspended state instead of directly notifying an empty leader.
>
>
> Best,
> Yang
>
> On Thu, Dec 3, 2020 at 10:28 AM, 赵一旦 wrote:
>
>>
>> Then why does Curator's state become suspended or lost? I usually restart zk one node at a time, and I just tried it again recently: I restarted a follower
>> zk node first, and the instant it was killed all the flink jobs ran into problems.
>>
>> On Tue, Dec 1, 2020 at 8:18 PM, Yang Wang wrote:
>>
>> > Flink uses the Curator Framework for Leader Election and Retrieval. Whenever Curator's State
>> > becomes Suspended or Lost, the leadership is revoked, which means the previous jobs have to be cancelled
>> > and rescheduled once a new leader appears.
>> >
>> > You could share the JobManager log, or check yourself whether the JobManager log shows a Curator Connection
>> State change
>> > that then led to the failover.
>> >
>> >
>> > Best,
>> > Yang
>> >
>> > On Tue, Dec 1, 2020 at 7:13 PM, 赵一旦 wrote:
>> >
>> > > No reply again; can someone who understands this explain?
>> > >
>> > > On Tue, Nov 17, 2020 at 9:35 AM, RS wrote:
>> > >
>> > > > Ha, same here. If flink loses its connection to ZK, all jobs restart. I have tested all kinds of scenarios here, e.g. deploying an HA setup
>> > > > and multiple jobmanagers, and the jobs always restart; I don't know how to solve it either.
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On 2020-11-16 18:39:29, "赵一旦" wrote:
>> > > >
>> > > >
>> > >
>> >
>> >In my experience at work, there have been a few occasions when the zk cluster had to be restarted, and I restarted the zk nodes one by one. The result was that all jobs in the flink cluster restarted automatically (from the latest ckpt). That still has some impact on the jobs, because ckpt is taken every 10 minutes, so there is a sudden spike in load.
>> > > > >
>> > > > >Is this expected, or is there something wrong with my configuration or my procedure?
>> > > >
>> > >
>> >
>>
>


Re: Problem writing ORC to hive 2.1.1 with flink 1.11.2

2020-12-03 Thread yang xu
Hi Rui Li,
The jars under lib are as follows:
 flink-csv-1.11.2.jar
 flink-dist_2.11-1.11.2.jar
 flink-json-1.11.2.jar
 flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar
 flink-shaded-zookeeper-3.4.14.jar
 flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar
 flink-table_2.11-1.11.2.jar
 flink-table-api-java-bridge_2.11-1.11.2.jar
 flink-table-blink_2.11-1.11.2.jar
 flink-table-planner-blink_2.11-1.11.2.jar
 log4j-1.2-api-2.12.1.jar
 log4j-api-2.12.1.jar
 log4j-core-2.12.1.jar
 log4j-slf4j-impl-2.12.1.jar

The statement writing to hive is just a simple insert:
insert into hive_t1  SELECT  address  FROM  users

And the table creation statement is as follows:
create table hive_t1(address string) 
clustered by (address) into 8 buckets 
stored as orc TBLPROPERTIES ('transactional'='true','orc.compress' =
'SNAPPY');

Thanks a lot for your help!




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flinksql 1.11: after a long time with no data written to mysql, it reports java.sql.SQLException: No operations allowed after statement closed.

2020-12-03 Thread yanzi
hi Leonard:

The error message is as follows:
[2020-12-02 22:01:03.403] [ERROR] [jdbc-upsert-output-format-thread-1]
[org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat] >>> JDBC
executeBatch error, retry times = 3
java.sql.SQLException: No operations allowed after statement closed.
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.StatementImpl.checkClosed(StatementImpl.java:426)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at 
com.mysql.jdbc.PreparedStatement.setString(PreparedStatement.java:3943)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$createExternalConverter$57fde215$8(AbstractJdbcRowConverter.java:219)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableExternalConverter$2bf50691$1(AbstractJdbcRowConverter.java:193)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toExternal(AbstractJdbcRowConverter.java:86)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$createKeyedRowExecutor$3fd497bb$1(JdbcRowDataOutputFormat.java:164)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.executor.KeyedBatchStatementExecutor.executeBatch(KeyedBatchStatementExecutor.java:71)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.attemptFlush(JdbcRowDataOutputFormat.java:154)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:120)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_262]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
[?:1.8.0_262]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_262]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[?:1.8.0_262]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_262]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_262]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Using multiple keytabs with flink

2020-12-03 Thread zhengmao776
Hello, when I submit jobs with flink
run, since the cluster is a kerberized Hadoop cluster, I would like to use different keytabs to authenticate different users. I see the security.kerberos.login.keytab and security.kerberos.login.principal settings in flink-conf.yaml, but they cannot be configured dynamically; I tried setting them with
-yD, but it has no effect. How should I handle a situation like this? Looking forward to your reply~~~



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Routing events to different kafka topics dynamically

2020-12-03 Thread Prasanna kumar
Thanks Till,

Able to deduce topics by extending the KafkaSerializationSchema class.

Prasanna.

On Wed, Dec 2, 2020 at 11:18 PM Till Rohrmann  wrote:

> Hi Prasanna,
>
> I believe that what Aljoscha suggested in the linked discussion is still
> the best way to go forward. Given your description of the problem this
> should actually be pretty straightforward as you can deduce the topic from
> the message. Hence, you just need to create the ProducerRecord with the
> right target topic you extracted from the record/message.
>
> Cheers,
> Till
>
> On Wed, Dec 2, 2020 at 5:28 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi,
>>
>> Events need to be routed to different kafka topics dynamically based upon
>> some info in the message.
>>
>> We have implemented using KeyedSerializationSchema similar to
>> https://stackoverflow.com/questions/49508508/apache-flink-how-to-sink-events-to-different-kafka-topics-depending-on-the-even.
>> But its deprecated and we cannot use it for production.
>>
>> I looked at the alternative KafkaSerializationSchema but there I do not
>> see an option there.
>>
>> Then i stumbled upon this
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Usage-of-KafkaDeserializationSchema-and-KafkaSerializationSchema-td32347.html.
>> which asks us to use KafkaContextAware.
>>
>> Is there a more intuitive/easier way to do the same ?
>>
>> Thanks,
>> Prasanna.
>>
>>
>>


Re: Why is calcite's implicit type conversion disabled

2020-12-03 Thread Rui Li
Hi,

My understanding is that calcite's implicit type conversion is still fairly new, so it has not been enabled
yet. Even if it were enabled, the conversion logic would not necessarily match hive's exactly; for example, some conversions that hive allows might not be allowed by calcite. The community is also working on hive syntax compatibility, and once that is available, migrating hive jobs will be easier.

On Wed, Dec 2, 2020 at 6:43 PM tangshiwei 
wrote:

> Currently neither flink sql nor flink hive
> sql supports implicit conversion. While debugging we found that calcite itself actually supports it, but flink forcibly disables it. Hive itself supports implicit conversion, and this prevents our hive jobs from being migrated to flink. What is the reason for disabling it? Would enabling it on our side cause any problems?



-- 
Best regards!
Rui Li


Re: Problem writing ORC to hive 2.1.1 with flink 1.11.2

2020-12-03 Thread Rui Li
Hi,

Which dependencies have you added under your flink lib, and what does the SQL that causes the problem look like?

On Thu, Dec 3, 2020 at 4:15 PM yang xu <316481...@qq.com> wrote:

> flink version 1.11.2
> hive version 2.1.1, based on cdh 6.2.1
> Writing to regular tables or parquet works fine; writing orc fails with the following error:
> <
> http://apache-flink.147419.n8.nabble.com/file/t1150/flink_hive%E5%8C%85%E5%86%B2%E7%AA%81.png>
>
>
> I also saw other mailing-list threads saying to change
> flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar
> OrcFile:
> WriterVersion CURRENT_WRITER = WriterVersion.HIVE_13083
> and recompile, but after trying that I still get the same error. Does Hive have to be upgraded to 3.x?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Re: Impact of a zookeeper leader change on flink

2020-12-03 Thread Yang Wang
My understanding is that Curator keeps long-lived connections to every ZooKeeper node (leader and followers alike).
If you restart one of the ZK nodes, the Curator clients connected to that node should all go into Suspended,
which makes the corresponding JobManager lose its leadership, so the current jobs are cancelled and rerun.

You could verify whether restarting one ZK node only fails over the specific Flink jobs connected to that node, rather than all of them.

Finally, there is currently no way to solve this directly through Flink configuration, unless Curator#LeaderLatch's handling of the Suspended state is improved,
and Flink's LeaderRetrieval also needs to improve its handling of the Suspended state instead of directly notifying an empty leader.


Best,
Yang

On Thu, Dec 3, 2020 at 10:28 AM, 赵一旦 wrote:

>
> Then why does Curator's state become suspended or lost? I usually restart zk one node at a time, and I just tried it again recently: I restarted a follower
> zk node first, and the instant it was killed all the flink jobs ran into problems.
>
> On Tue, Dec 1, 2020 at 8:18 PM, Yang Wang wrote:
>
> > Flink uses the Curator Framework for Leader Election and Retrieval. Whenever Curator's State
> > becomes Suspended or Lost, the leadership is revoked, which means the previous jobs have to be cancelled
> > and rescheduled once a new leader appears.
> >
> > You could share the JobManager log, or check yourself whether the JobManager log shows a Curator Connection
> State change
> > that then led to the failover.
> >
> >
> > Best,
> > Yang
> >
> > On Tue, Dec 1, 2020 at 7:13 PM, 赵一旦 wrote:
> >
> > > No reply again; can someone who understands this explain?
> > >
> > > On Tue, Nov 17, 2020 at 9:35 AM, RS wrote:
> > >
> > > > Ha, same here. If flink loses its connection to ZK, all jobs restart. I have tested all kinds of scenarios here, e.g. deploying an HA setup
> > > > and multiple jobmanagers, and the jobs always restart; I don't know how to solve it either.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On 2020-11-16 18:39:29, "赵一旦" wrote:
> > > >
> > > >
> > >
> >
> >In my experience at work, there have been a few occasions when the zk cluster had to be restarted, and I restarted the zk nodes one by one. The result was that all jobs in the flink cluster restarted automatically (from the latest ckpt). That still has some impact on the jobs, because ckpt is taken every 10 minutes, so there is a sudden spike in load.
> > > > >
> > > > >Is this expected, or is there something wrong with my configuration or my procedure?
> > > >
> > >
> >
>


Re: Flink SQL Tumble window function throws NoSuchMethodError functions/AggregateFunction

2020-12-03 Thread JasonLee
hi

From the error message this looks like a jar conflict. Could you paste the relevant dependencies?



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-03 Thread Pierre Oberholzer
Hi Xingbo, Wei, Dian,

Many thanks for this plus for the high quality and prompt support overall.
Let's close this thread here. Looking forward to trying your approach.
Community, feel free to reach out with additional remarks and experiences
on structured streaming on complex/sparse objects.

Best regards,

On Thu, Dec 3, 2020 at 08:47, Xingbo Huang wrote:

> Hi Pierre,
>
> The serialization/deserialization of sparse Row in flink is specially
> optimized. The principle is that each Row will have a leading mask when
> serializing to identify whether the field at the specified position is
> NULL, and one field corresponds to one bit. For example, if you have 10k
> fields, then there will be a mask of 10k bit / 8 = 1250 byte. In this way,
> the serialization/deserialization overhead can be omitted for those field
> values that are NULL.
>
> For specific code optimization logic, you can refer to java logic[1], or
> python logic[2] and cython logic[3].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java#L185
> [2]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py#L100
> [3]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/coder_impl_fast.pyx#L697
>
> Best,
> Xingbo
>
> On Thu, Dec 3, 2020 at 3:08 PM, Pierre Oberholzer wrote:
>
>> Hi Xingbo, Community,
>>
>> Thanks a lot for your support.
>> May I finally ask to conclude this thread, including wider audience:
>> - Are serious performance issues to be expected with 100k fields per ROW
>> (i.e. due solely to metadata overhead and independently of queries logic) ?
>> - In sparse population (say 99% sparsity) already optimized in the ROW
>> object or are sparse types on your roadmap ?
>> Any experience with sparse Table from other users (including benchmarks
>> vs. other frameworks) are also highly welcome.
>>
>> Thanks !
>>
>> Best
>>
>>
>> On Thu, Dec 3, 2020 at 02:53, Xingbo Huang wrote:
>>
>>> Hi Pierre,
>>>
>>> This example is written based on the syntax of release-1.12 that is
>>> about to be released, and the test passed. In release-1.12, input_type can
>>> be omitted and expression can be used directly. If you are using
>>> release-1.11, you only need to modify the grammar of udf used slightly
>>> according to the udf documentation[1].
>>>
>>> The flink table connector supports avro format, please refer to the
>>> document[2].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format
>>>
>>> Best,
>>> Xingbo
>>>
>>> On Thu, Dec 3, 2020 at 2:57 AM, Pierre Oberholzer wrote:
>>>
 Hi Xingbo,

 Nice ! This looks a bit hacky, but shows that it can be done ;)

 I just got an exception preventing me running your code, apparently
 from udf.py:

 TypeError: Invalid input_type: input_type should be DataType but
 contains None

 Can you pls check again ?
 If the schema is defined is a .avsc file, do we have to parse it and
 rebuild those syntax (ddl and udf) and or is there an existing component
 that could be used ?

 Thanks a lot !

 Best,


 On Wed, Dec 2, 2020 at 04:50, Xingbo Huang wrote:

> Hi Pierre,
>
> I wrote a PyFlink implementation, you can see if it meets your needs:
>
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
> DataTypes
> from pyflink.table.udf import udf
>
>
> def test():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env,
>
> environment_settings=EnvironmentSettings.new_instance()
>
> .in_streaming_mode().use_blink_planner().build())
>
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>   '80m')
>
> # 10k nested columns
> num_field = 10_000
> fields = ['f%s INT' % i for i in range(num_field)]
> field_str = ','.join(fields)
> t_env.execute_sql(f"""
> CREATE TABLE source_table (
> f0 BIGINT,
> f1 DECIMAL(32,2),
> f2 ROW<${field_str}>,
> f3 TIMESTAMP(3)
> ) WITH (
>   'connector' = 'datagen',
>   'number-of-rows' = '2'
> )
> """)
>
> t_env.execute_sql(f"""
> CREATE TABLE print_table (
>  f0 BIGINT,
>  f1 DECIMAL(32,2),
>  f2 ROW<${field_str}>,
>  f3 TIMESTAMP(3)
> ) WITH (

Problem writing ORC to hive 2.1.1 with flink 1.11.2

2020-12-03 Thread yang xu
flink version 1.11.2
hive version 2.1.1, based on cdh 6.2.1
Writing to regular tables or parquet works fine; writing orc fails with the following error:

[screenshot of the flink/hive jar-conflict error omitted]

I also saw other mailing-list threads saying to change
flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar 
OrcFile:
WriterVersion CURRENT_WRITER = WriterVersion.HIVE_13083
and recompile, but after trying that I still get the same error. Does Hive have to be upgraded to 3.x?



--
Sent from: http://apache-flink.147419.n8.nabble.com/