Time zone issue with timestamp fields in debezium-json data

2022-11-22 Thread Kyle Zhang
Hi all,
We have a scenario where Oracle data is pulled into Kafka via the debezium-oracle-cdc
connector and then analyzed with Flink SQL. We've hit a time zone problem: a
timestamp-typed column in the database holds '2022-11-17 16:16:44', but Debezium stores
it as an int64 without any time zone information, so it arrives as 1668701804000. After
running it through FROM_UNIXTIME in Flink SQL it becomes '2022-11-18 00:16:44', which is
8 hours off, and we have to manually subtract 8 hours. Is there a unified way to handle
this situation?
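
A hedged sketch of two common workarounds, assuming Flink 1.13+ and that the
epoch-millisecond value is exposed to SQL as a BIGINT column named ts_ms (both the column
and table names are illustrative, not from the original schema):

-- Option 1: treat the long as a UTC-based instant and render it in UTC
SET 'table.local-time-zone' = 'UTC';
SELECT TO_TIMESTAMP_LTZ(ts_ms, 3) AS ts FROM my_source;

-- Option 2: keep the Asia/Shanghai session zone and shift the FROM_UNIXTIME result back
SELECT CONVERT_TZ(FROM_UNIXTIME(ts_ms / 1000), 'Asia/Shanghai', 'UTC') AS ts FROM my_source;

Which option fits depends on whether downstream consumers expect the original wall-clock
string or a proper TIMESTAMP_LTZ value.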

Best


Questions about flink table store

2022-09-07 Thread Kyle Zhang
Hi all,
  From its introduction, table store is also about data lake storage and real-time
streaming reads. How does it differ in positioning from projects such as Iceberg and
Hudi, and why develop yet another project?

Best.


What tools do you use to display real-time reports?

2022-08-16 Thread Kyle Zhang
Hi all,

We want to turn data processed by Flink into real-time reports. What solutions is the
industry using these days? Write from Flink into a DB and refresh periodically with BI
tools such as Yonghong (永洪) or FanRuan (帆软), or write from Flink into Kafka and use a
tool that can read the Kafka source and display it?

Best


Oracle CDC generates a large volume of LogMiner logs

2022-08-09 Thread Kyle Zhang
Hi, Team
  Recently, while ingesting Oracle data via CDC, our DBA reported that a large amount of
LogMiner logs was being generated. Is there a way to adjust the log level, or are there
specific parameters we can add?
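
A hedged sketch of DDL-level tuning, assuming the flink-connector-oracle-cdc connector,
which forwards 'debezium.*' options to the underlying Debezium Oracle connector; the
connection details are placeholders and the LogMiner option should be verified against
your Debezium version:

CREATE TABLE oracle_source (
  ID INT,
  NAME STRING,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = 'xxx',
  'port' = '1521',
  'username' = 'flinkuser',
  'password' = '***',
  'database-name' = 'ORCLCDB',
  'schema-name' = 'FLINKUSER',
  'table-name' = 'MY_TABLE',
  -- Debezium LogMiner option: avoid writing the data dictionary to the redo/archive
  -- logs on every mining cycle, a common cause of excessive log volume
  'debezium.log.mining.strategy' = 'online_catalog'
);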




Best


The role of the webhook in flink-k8s-operator

2022-07-27 Thread Kyle Zhang
Hi, all
I've recently been looking at flink-k8s-operator[1]. The architecture includes a
flink-webhook container. What is this container for, and what impact does setting
webhook.create=false have on the overall functionality?

Best regards

[1]
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/concepts/architecture/


Re: How to assign a different memory configuration to each Flink SQL job

2020-12-13 Thread Kyle Zhang
How about running one SQL job per cluster?

On Mon, Dec 14, 2020 at 8:42 AM yinghua...@163.com 
wrote:

> When submitting a Flink job you can specify the JobManager and TaskManager memory
> configuration via parameters, but when executing SQL, how do you specify the memory
> configuration for each individual job? Do they all read the same settings from
> flink-conf.yaml?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/config.html#memory-configuration
> The memory settings described there are all configured through the flink-conf.yaml
> file, i.e. globally; I couldn't find a way to configure memory independently per SQL
> job.
>
>
>
> yinghua...@163.com
>


Re: Username/password authentication not supported when writing to ES with Flink SQL

2020-11-30 Thread Kyle Zhang
Hi, is this the issue you're referring to?

https://issues.apache.org/jira/browse/FLINK-16788
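
For reference, a hedged sketch of what the DDL looks like once those options are
available (this assumes Flink 1.12+ with the elasticsearch-7 connector; host, index, and
credentials are placeholders):

CREATE TABLE es_sink (
  user_id STRING,
  cnt BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users',
  -- basic-auth options added by FLINK-16788
  'username' = 'elastic',
  'password' = '***'
);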

On Mon, Nov 30, 2020 at 7:23 PM cljb...@163.com  wrote:

> I checked the official docs; username/password authentication is not yet supported
> when writing to ES via SQL. Is there a relatively simple workaround, other than
> using the API?
>
> Thanks!
>
>
>
> cljb...@163.com
>


Re: No data output when connecting to MySQL with Flink SQL

2020-11-25 Thread Kyle Zhang
Correction to the above: the table needs to be converted to a stream before printing.

On Thu, Nov 26, 2020 at 11:46 AM Kyle Zhang  wrote:

> With executeSql, the result should be written to another table, e.g. a print table, in
> order to get printed.
> Or else print it with bsTableEnv.sqlQuery("select * from meson_budget_data").print();
>
> On Thu, Nov 26, 2020 at 9:54 AM Leonard Xu  wrote:
>
>> Hi
>>
>> Once executeSql is called, the job has already been executed; there is no need to
>> call the execute method below. Since you've already configured checkpointing, please
>> double-check that the MySQL parameters are correct.
>>
>> > On 2020-11-25 at 18:42, 冯草纸 wrote:
>> >
>> > env.execute("sql test");
>> > // bsTableEnv.execute("sql test");
>>
>>


Re: No data output when connecting to MySQL with Flink SQL

2020-11-25 Thread Kyle Zhang
With executeSql, the result should be written to another table, e.g. a print table, in
order to get printed.
Or else print it with bsTableEnv.sqlQuery("select * from meson_budget_data").print();
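
A hedged sketch of the "write to a print table" approach mentioned above (sink and
column names are illustrative, not the original schema):

CREATE TABLE print_sink (
  id INT,
  amount DOUBLE
) WITH (
  'connector' = 'print'
);

-- submitted via executeSql; the rows then show up in the TaskManager stdout
INSERT INTO print_sink SELECT id, amount FROM meson_budget_data;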

On Thu, Nov 26, 2020 at 9:54 AM Leonard Xu  wrote:

> Hi
>
> Once executeSql is called, the job has already been executed; there is no need to call
> the execute method below. Since you've already configured checkpointing, please
> double-check that the MySQL parameters are correct.
>
> > On 2020-11-25 at 18:42, 冯草纸 wrote:
> >
> > env.execute("sql test");
> > // bsTableEnv.execute("sql test");
>
>


Watermarks under processing time

2020-11-23 Thread Kyle Zhang
Hi,
With Flink 1.11, declaring a watermark on a processing-time attribute in a SQL DDL fails
with:

SQL validation failed. Watermark can not be defined for a processing time
attribute column.

The documentation also explains watermarks essentially only in the context of event
time [1].
What I want to ask is: does processing-time stream processing simply not need
watermarks, or does Flink handle it internally so that we don't have to care?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html#event-time-and-watermarks
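
For what it's worth, a hedged sketch of the usual pattern: with processing-time
semantics no watermark is declared at all; the attribute is derived with PROCTIME()
instead (the table below is purely illustrative):

CREATE TABLE clicks (
  user_id STRING,
  url STRING,
  -- processing-time attribute: no WATERMARK clause is needed (or allowed) for it
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);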

Best


Re: The difference between window and timeWindow

2020-10-25 Thread Kyle Zhang
The first one is a tumbling window, the second is a sliding window.


https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/operators/windows.html#tumbling-windows


Best Regards


On Sun, Oct 25, 2020 at 9:20 PM Natasha <13631230...@163.com> wrote:

> Hi community,
> I'd like to ask: is there any difference between the code in the second red box and
> the code in the first red box? The two versions produce different results, but I don't
> understand why they differ.
>
>


On memory sizing and estimation

2020-10-16 Thread Kyle Zhang
Hi all,
  Recently I also ran into the fairly common out-of-memory error OutOfMemoryError: Java
heap space with JM: 1g and TM: 2g; crudely bumping them to 2g and 4g lets the job run:
INFO  [] - Loading configuration property: cluster.termination-message-path, /flink/log/termination.log
INFO  [] - Final TaskExecutor Memory configuration:
INFO  [] -   Total Process Memory:          3.906gb (4194304000 bytes)
INFO  [] -     Total Flink Memory:          3.266gb (3506438138 bytes)
INFO  [] -       Total JVM Heap Memory:     1.508gb (1619001315 bytes)
INFO  [] -         Framework:               128.000mb (134217728 bytes)
INFO  [] -         Task:                    1.383gb (1484783587 bytes)
INFO  [] -       Total Off-heap Memory:     1.758gb (1887436823 bytes)
INFO  [] -         Managed:                 1.306gb (1402575276 bytes)
INFO  [] -         Total JVM Direct Memory: 462.400mb (484861547 bytes)
INFO  [] -           Framework:             128.000mb (134217728 bytes)
INFO  [] -           Task:                  0 bytes
INFO  [] -           Network:               334.400mb (350643819 bytes)
INFO  [] -     JVM Metaspace:               256.000mb (268435456 bytes)
INFO  [] -     JVM Overhead:                400.000mb (419430406 bytes)

Is there any metric that would let us estimate in advance how much memory the JM and TM
need?

Best


Re: Aggregation over a Kafka topic with incomplete fields

2020-10-15 Thread Kyle Zhang
Grouping by id should be enough; for the other fields use LAST_VALUE or FIRST_VALUE [1].
Also think about how to handle late-arriving data.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
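
A hedged sketch of that suggestion, assuming the topic is registered as a table named
src with nullable STRING fields (all names are illustrative):

SELECT
  id,
  LAST_VALUE(field2) AS field2,
  LAST_VALUE(field3) AS field3,
  LAST_VALUE(field4) AS field4,
  COUNT(1) AS cnt
FROM src
GROUP BY id;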

On Thu, Oct 15, 2020 at 5:01 PM 史 正超  wrote:

> Hi experts, here is my scenario:
> A Kafka topic has 4 fields: id, field2, field3, field4, where id is the unique
> identifier. The problem is that not every message carries the full set of fields;
> only id is always present. I need to use id, field2, field3, field4 as a combined
> dimension for counting. For example, given the following Kafka messages:
> {"id": 1, "field2":"b"}
> {"id": 1, "field3":"c", "field4":"d"}
> counting by that dimension, count(1) (group by id, field2, field3, field4), I would
> like to get a result like:
> (1, b, c, d) => 1
>
> Is there any approach for this kind of requirement?
>


Re: Timestamp offset issue when writing to MySQL with flink-connector-jdbc

2020-10-13 Thread Kyle Zhang
感觉是时区的问题 ,是使用ddl做的么

Take a look at: show variables like '%time_zone%'
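
A hedged sketch of one common fix on the sink side: pin the session time zone in the
JDBC URL of the sink DDL so the driver does not shift TIMESTAMP values (connection
details and table names are placeholders):

CREATE TABLE mysql_sink (
  id INT,
  ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  -- serverTimezone is a MySQL Connector/J URL parameter; match it to the server's time_zone
  'url' = 'jdbc:mysql://localhost:3306/mydb?serverTimezone=Asia/Shanghai',
  'table-name' = 'my_table',
  'username' = 'root',
  'password' = '***'
);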

On Tue, Oct 13, 2020 at 2:56 PM 姬洪超  wrote:

> After flink-connector-jdbc reads timestamp-typed data from MySQL, the time ends up
> eight hours off once it is sunk back to MySQL. Ex: the value read is
> 2020-05-12T11:53:08, but after writing to MySQL it becomes 2020-05-11 22:53:08


Re: Flink's Table API does not support "."

2020-10-08 Thread Kyle Zhang
Try: select * from OrderA orderA join OrderB orderB on orderA.user=orderB.user

On Sun, Oct 4, 2020 at 5:09 PM 忝忝向仧 <153488...@qq.com> wrote:

> Hi, all:
>
>
> In a Table API SQL query, can't I write "." in a join?
> Written like this, it throws the following error:
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "." at line 1, column 36.
> Was expecting one of:
>  "EXCEPT" ...
>   "FETCH" ...
>   "FROM" ...
>   "INTERSECT" ...
>   "LIMIT" ...
>   "OFFSET" ...
>   "ORDER" ...
>   "MINUS" ...
>   "UNION" ...
>   "," ...
>
>
>
> Table result = tEnv.sqlQuery("select *  from  OrderA join OrderB on
> OrderA.user=OrderB.user");


Re: About flink sql cdc

2020-09-29 Thread Kyle Zhang
show variables like '%binlog_format%' does indeed show ROW
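
A hedged sketch of the checks usually suggested for this error (plain MySQL statements;
changing the global format requires the appropriate privileges, and binlog segments
already written in statement/mixed format can still trigger the error):

-- confirm the current replication format
show variables like '%binlog_format%';
-- if it is not ROW, switch to row-based replication
SET GLOBAL binlog_format = 'ROW';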

On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi, all
>   I ran into a problem with SQL CDC today. Version 1.11.2, running in IDEA. My DDL is:
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> The result is printed directly with a print table.
> After running for a while it fails with:
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> Will SQL CDC also parse my other tables and then fail like this? Has anyone run into a
> similar problem?
>
> Best,
> Kyle Zhang
>


Re: About flink sql cdc

2020-09-29 Thread Kyle Zhang
CC426ED884CB5EEC8E3B29284D6705620D76567D071BAEDEBCC57C223751AA7DF9F8EAF22432BA1A8C511EDFCF4653936D27F9FBF18AEBE58A4BE2E14620526BA55E9E305FCBE4813B9FE57047FB42DAC2F8CA54346CCFF19BA0DAA8078D124FC04A436DD68398FADA2570A567F6F21BE8E94F55305818EDD127D7A798778FCA366A47B94B910EEC72ADFE4297DABFE852FB0F75E1873BD817FFCB19BD72AAB0C52190DF302922C95508D4248CB954C5492532012535F461038AB574AA548511DEB1C0BDA19C112C78AA3CCB9683060BDAE74911944B6D6C36488C49CB7B45A0DB1BF4C65F669DE968140E26FDC7D9683A18F4069F8FAE791BC30DA05411925E70E5004761BC5CEC86BA4BCA7473D70378C6E022B4241322775ADA9CCD8F14D2925F80D595F9F9C02D8FC1675B478B5ED6ECF3CB9291747CAE20CAE775D63B99303AB1E63B089F9D499D955A65CF5C05E862CD6232873D08AF40D4DB80671B30575CACFBD2F9F14E0B762852A064BC238F7B2E4E76EB32272D974256F7264286A7DB556B7EAC37F0632FBF0077C290D8EF05'
WHERE trigger_id=42' for processing, binlog probably contains events
generated with statement or mixed based replication format
at 
io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]


On Tue, Sep 29, 2020 at 7:39 PM Kyle Zhang  wrote:

> Hi, all
>   I ran into a problem with SQL CDC today. Version 1.11.2, running in IDEA. My DDL is:
> CREATE TABLE mysql_binlog (
>  id INT NOT NULL,
>  emp_name STRING,
>  age INT
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'xxx',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'empoylee1'
> );
> The result is printed directly with a print table.
> After running for a while it fails with:
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Error during binlog processing. Last offset stored = null,
> binlog reader near position = binlog.001254/132686776
> 19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
>   [] - Failed due to error: Error processing binlog event
> org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT
> INTO execution_flows (project_id, flow_id, version, status, submit_time,
> submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
> [flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
> 'INSERT INTO execution_flows (project_id, flow_id, version, status,
> submit_time, submit_user, update_time) values
> (47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
> processing, binlog probably contains events generated with statement or
> mixed based replication format
> at
> io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> at
> io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
> ... 5 more
>
> Will SQL CDC also parse my other tables and then fail like this? Has anyone run into a
> similar problem?
>
> Best,
> Kyle Zhang
>


About flink sql cdc

2020-09-29 Thread Kyle Zhang
Hi, all
  I ran into a problem with SQL CDC today. Version 1.11.2, running in IDEA. My DDL is:
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 emp_name STRING,
 age INT
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'xxx',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'empoylee1'
);
The result is printed directly with a print table.
After running for a while it fails with:
19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
[] - Error during binlog processing. Last offset stored = null, binlog
reader near position = binlog.001254/132686776
19:22:52,430 ERROR io.debezium.connector.mysql.BinlogReader
[] - Failed due to error: Error processing binlog event
org.apache.kafka.connect.errors.ConnectException: Received DML 'INSERT INTO
execution_flows (project_id, flow_id, version, status, submit_time,
submit_user, update_time) values
(47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
processing, binlog probably contains events generated with statement or
mixed based replication format
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: org.apache.kafka.connect.errors.ConnectException: Received DML
'INSERT INTO execution_flows (project_id, flow_id, version, status,
submit_time, submit_user, update_time) values
(47,'OfflineComputeTask',1,20,1601378572442,'azkaban',1601378572442)' for
processing, binlog probably contains events generated with statement or
mixed based replication format
at
io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:785)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at
io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:583)
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
... 5 more

Will SQL CDC also parse my other tables and then fail like this? Has anyone run into a
similar problem?

Best,
Kyle Zhang


How to contribute to the community

2020-09-27 Thread Kyle Zhang
Hi,
  I created an issue in JIRA (FLINK-19433). How should I follow up from here? Does a
committer need to assign the task to me?

Best regards


On setting watermarks with null values in SQL

2020-09-25 Thread Kyle Zhang
Hi,
In a Flink 1.11 CREATE DDL I set a watermark with WATERMARK FOR wm AS wm - INTERVAL '5'
SECOND. When dirty data comes in, wm is null and the job fails with "RowTime field
should not be null, please convert it to a non-null long value." Is there a good way to
simply ignore the dirty data?
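
A hedged sketch of one workaround: derive a non-null rowtime in a computed column and
define the watermark on that; column names, the fallback value, and the connector
options are all illustrative:

CREATE TABLE events (
  id STRING,
  wm TIMESTAMP(3),
  -- fall back to a fixed epoch when the raw field is null, so the rowtime itself is never null
  rowtime AS CAST(COALESCE(wm, TIMESTAMP '1970-01-01 00:00:00') AS TIMESTAMP(3)),
  WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'scan.startup.mode' = 'latest-offset'
);

Records that fall back to the epoch value still need to be filtered or otherwise handled
downstream if they should not participate in windows.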

Best


Starting consumption from a specified timestamp with the kafka connector

2020-06-11 Thread Kyle Zhang
Hi,
Can the kafka connector DDL start consuming from a specified timestamp, the way
flinkKafkaConsumer.setStartFromTimestamp(xx) does? I only see earliest-offset,
latest-offset, group-offsets, and specific-offsets mentioned in the documentation:

CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'kafka',

  'connector.version' = '0.11',     -- required: valid connector versions are
                                    -- "0.8", "0.9", "0.10", "0.11", and "universal"

  'connector.topic' = 'topic_name', -- required: topic name from which the table is read

  'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string
  'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string
  'connector.properties.group.id' = 'testGroup', -- optional: required in Kafka consumer, specify consumer group
  'connector.startup-mode' = 'earliest-offset', -- optional: valid modes are "earliest-offset",
                                                -- "latest-offset", "group-offsets",
                                                -- or "specific-offsets"

  -- optional: used in case of startup mode with specific offsets
  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',

  'connector.sink-partitioner' = '...', -- optional: output partitioning from Flink's partitions
                                        -- into Kafka's partitions valid are "fixed"
                                        -- (each Flink partition ends up in at most one Kafka partition),
                                        -- "round-robin" (a Flink partition is distributed to
                                        -- Kafka partitions round-robin)
                                        -- "custom" (use a custom FlinkKafkaPartitioner subclass)
  -- optional: used in case of sink partitioner custom
  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',

  'format.type' = '...', -- required: Kafka connector requires to specify a format,
  ...                    -- the supported formats are 'csv', 'json' and 'avro'.
                         -- Please refer to Table Formats section for more details.
)
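
For reference, a hedged sketch of how this looks with the newer kafka connector options
introduced in later releases (Flink 1.12+), where 'scan.startup.mode' = 'timestamp' plus
'scan.startup.timestamp-millis' covers exactly this case (topic, servers, and schema are
placeholders):

CREATE TABLE MyUserTable (
  user_id STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_name',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- start reading from records whose Kafka timestamp is >= the given epoch millis
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1591776000000'
);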



Error when using CREATE VIEW in SQL

2020-06-01 Thread Kyle Zhang
Hi,
Recently, calling tEnv.sqlUpdate("create view …. fails with "Unsupported query: create
view". I took a quick look: SqlToOperationConverter.convert on master has the check
"validated instanceof SqlCreateView", but the 1.10 branch does not have it yet. This
feature seems fairly common, and the Flink SQL CLI also supports create view. Is there a
particular reason it hasn't been merged yet?
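
For reference, a hedged sketch of the statement once CREATE VIEW support landed (Flink
1.11+, where DDL is submitted via executeSql; the view body is illustrative):

CREATE VIEW order_totals AS
SELECT user_id, SUM(amount) AS total
FROM orders
GROUP BY user_id;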

Writing to Oracle with SQL

2020-05-25 Thread Kyle Zhang
Hi everyone, I've just started using Flink and have run into some issues.
My scenario is reading data from Kafka, doing a group by, and writing the result to
Oracle, but I see that JdbcDialects currently only covers MySQL, Postgres and Derby. How
is everyone writing to Oracle?
For now I convert the table to a stream and then write it to Oracle.
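
A hedged sketch for comparison, assuming a Flink version whose JDBC connector ships an
Oracle dialect (added in later releases); connection details and table names are
placeholders:

CREATE TABLE oracle_sink (
  user_id STRING,
  cnt BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:oracle:thin:@localhost:1521:ORCL',
  'table-name' = 'USER_CNT',
  'username' = 'flink',
  'password' = '***'
);

INSERT INTO oracle_sink
SELECT user_id, COUNT(*) FROM kafka_source GROUP BY user_id;

On older versions without an Oracle dialect, converting the table to a stream and
writing with a custom sink, as described above, is the usual fallback.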