Re: How to define a table from a complex Kafka message body

2021-07-08, by Caizhi Weng
Hi! As in the article JasonLee shared, you can first describe the structure of the Kafka message in the DDL, and then extract APPLY_PERSON_ID and the other fields with a CREATE VIEW in your SQL code; that achieves the effect you need.

Each of your Kafka messages seems to correspond to exactly one row of MyUserTable, so there appears to be no need for a column-to-row transformation.
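
A minimal sketch of what this could look like (assuming Flink 1.13+ with the newer 'connector' = 'kafka' option style; the kafka_source name, the DECIMAL precisions, and the array positions are illustrative). One caveat: rawData mixes JSON numbers (10017) and strings ("11000.00"), so a STRING column may not parse cleanly with the built-in JSON format; if records get dropped, a custom format (see the DeserializationFormatFactory reply below) is the more robust route.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaEnvelopeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Declare only the parts of the envelope we need; undeclared JSON
        // fields are simply ignored by the JSON format.
        tEnv.executeSql(
            "CREATE TABLE kafka_source (" +
            "  `data` ARRAY<ROW<`columnName` STRING, `rawData` STRING>>" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'topic_name'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'properties.group.id' = 'testGroup'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'json'," +
            "  'json.ignore-parse-errors' = 'true'" +
            ")");

        // One Kafka message corresponds to one row, so if the order of the
        // entries in `data` is stable we can pick them out by index
        // (Flink SQL arrays are 1-based) and cast to the target types.
        tEnv.executeSql(
            "CREATE VIEW MyUserTable AS " +
            "SELECT " +
            "  `data`[1].rawData AS APPLY_PERSON_ID, " +
            "  CAST(`data`[2].rawData AS DECIMAL(12, 2)) AS UPDATE_SALARY, " +
            "  CAST(`data`[3].rawData AS DECIMAL(12, 2)) AS UP_AMOUNT, " +
            "  `data`[4].rawData AS CURRENCY, " +
            "  CAST(`data`[5].rawData AS DECIMAL(12, 2)) AS EXCHANGE_RATE " +
            "FROM kafka_source");

        Table result = tEnv.sqlQuery(
            "SELECT APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, CURRENCY, EXCHANGE_RATE "
                + "FROM MyUserTable");
        result.execute().print();
    }
}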

Chenzhiyuan(HR) wrote on Friday, July 9, 2021 at 11:57 AM:

> Can the column-to-row step be written in the JSON DDL, or does it have to be done in Java code after the data is read from Kafka?


Re: How to define a table from a complex Kafka message body

2021-07-08, by 东东
Just implement your own DeserializationFormatFactory.

You can use the built-in CanalJsonFormatFactory or DebeziumJsonFormatFactory as a reference.
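
A skeleton of such a factory against the Flink 1.11+ DynamicTableFactory stack is sketched below. The 'envelope-json' identifier and the EnvelopeDeserializationSchema class are hypothetical placeholders; the real work of parsing the envelope and mapping each data[] entry's columnName/rawData pair onto the produced row type would live in that schema, and the factory also has to be registered in META-INF/services/org.apache.flink.table.factories.Factory so it can be discovered.

import java.util.Collections;
import java.util.Set;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;

public class EnvelopeJsonFormatFactory implements DeserializationFormatFactory {

    // Referenced from the DDL as 'format' = 'envelope-json' (name is illustrative).
    public static final String IDENTIFIER = "envelope-json";

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context runtimeContext, DataType producedDataType) {
                // Hypothetical schema: parses the JSON envelope and writes each
                // rawData value into the field named by its columnName.
                return new EnvelopeDeserializationSchema(producedDataType);
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}

The CanalJsonFormatFactory and DebeziumJsonFormatFactory sources in flink-json show complete versions of this pattern, including the deserializer that maps parsed JSON onto RowData.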


Re: How to define a table from a complex Kafka message body

2021-07-08, by Chenzhiyuan(HR)
Can the column-to-row step be written in the JSON DDL, or does it have to be done in Java code after the data is read from Kafka?



Re: How to define a table from a complex Kafka message body

2021-07-08, by JasonLee
Hi,

In fact, your data field is a JSON array; you just need to do a column-to-row explode and take the columnName field.

Best
JasonLee
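
For reference, the column-to-row explode can be written with CROSS JOIN UNNEST. A sketch, assuming a TableEnvironment tEnv and the kafka_source table from Caizhi Weng's reply earlier in this digest, where data is declared as ARRAY<ROW<columnName STRING, rawData STRING>>:

// Explode the `data` array into one row per entry, exposing
// columnName and rawData as ordinary columns.
Table exploded = tEnv.sqlQuery(
    "SELECT t.columnName, t.rawData "
        + "FROM kafka_source "
        + "CROSS JOIN UNNEST(`data`) AS t (columnName, rawData)");

Pivoting the exploded rows back into one row per message would need a per-message key to group on; since each Kafka message already maps to a single row here, indexing the array directly (as in the reply above) avoids that round trip.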




Does Flink SQL have an equivalent of Hive's DISTRIBUTE BY?

2021-07-08, by woods
Does Flink SQL have a feature like Hive's DISTRIBUTE BY, i.e. hashing rows to different tasks based on some field?


Re: How to define a table from a complex Kafka message body

2021-07-08, by Chenzhiyuan(HR)
In the message body, data holds the actual row data: columnName is the field name and rawData is the value. So I'd like to define the table as follows:

CREATE TABLE MyUserTable(
APPLY_PERSON_ID VARCHAR,
UPDATE_SALARY DECIMAL,
UP_AMOUNT DECIMAL,
CURRENCY VARCHAR,
EXCHANGE_RATE DECIMAL
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'
)

Then I query each field's value directly:
Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, 
CURRENCY, EXCHANGE_RATE from MyUserTable ");

How should I define the DDL for this?



From: 17610775726 [mailto:17610775...@163.com]
Sent: July 9, 2021, 9:26 AM
To: Chenzhiyuan(HR)
Subject: Re: How to define a table from a complex Kafka message body

Hi,

JSON works for this. You can refer to this article: https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA

Best
JasonLee


Unsubscribe

2021-07-08, by Yu Wang
Unsubscribe


How to define a table from a complex Kafka message body

2021-07-08, by Chenzhiyuan(HR)
Hi all,
I have defined a table that reads from Kafka, but I don't know how to parse the messages or which format.type to use.
If json and avro can't handle this, do I need to implement a custom format?
I don't know how to write one; any guidance would be appreciated.

The table is defined as follows:

CREATE TABLE MyUserTable(
  uuid VARCHAR,
  orgId VARCHAR
) with (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'topic_name',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'testGroup',
  'format.type' = '?'
)


The Kafka message body is shown below; it doesn't seem to match a standard format like Avro:

{
  "beforeData": [],
  "byteSize": 272,
  "columnNumber": 32,
  "data": [{
    "byteSize": 8,
    "columnName": "APPLY_PERSON_ID",
    "rawData": 10017,
    "type": "LONG"
  }, {
    "byteSize": 12,
    "columnName": "UPDATE_SALARY",
    "rawData": "11000.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 11,
    "columnName": "UP_AMOUNT",
    "rawData": "1000.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 3,
    "columnName": "CURRENCY",
    "rawData": "CNY",
    "type": "STRING"
  }, {
    "byteSize": 32,
    "columnName": "EXCHANGE_RATE",
    "rawData": "1.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 11,
    "columnName": "DEDUCTED_ACCOUNT",
    "rawData": "1000.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 1,
    "columnName": "ENTER_AT_PROCESS",
    "rawData": "Y",
    "type": "STRING"
  }],
  "dataCount": 0,
  "dataMetaData": {
    "connector": "mysql",
    "pos": 1000368076,
    "row": 0,
    "ts_ms": 1625565737000,
    "snapshot": "false",
    "db": "testdb",
    "table": "flow_person_t"
  },
  "key": "APPLY_PERSON_ID",
  "memorySize": 1120,
  "operation": "insert",
  "rowIndex": -1,
  "timestamp": "1970-01-01 00:00:00"
}



Flink SQL errors when connecting to Kafka

2021-07-08, by yanyunpeng
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V

This happens when querying Kafka with Flink SQL.

Kafka version 2.4; connector flink-sql-connector-kafka_2.11-1.11.2.jar.

What is causing this? Is it a version mismatch in the connector?