JDBC异步lookup join有FLIP或者迭代计划吗?

2021-06-09 Thread Luna Wong
如果没有我用VertX和Druid连接池贡献下代码。这个要在dev邮件列表里讨论是吗


SQL DDL选项的值需要单引号应该如何解决?

2021-05-27 Thread Luna Wong
create table source (
id int
) with (
type='jdbc',
username='us',
password='ab'c'
);

例如上面DDL密码的值需要 ' 单引号。这种应该怎么解决?


Flink Jdbc Connector close方法线程同步

2021-05-21 Thread Luna Wong
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java

为什么上诉代码中的close方法要进行线程同步,这个是什么考虑。


GenericRowData与BinaryRowData的转换

2021-04-08 Thread Luna Wong
我看Kafka Connector源码生成的是GenericRowData,到Jdbc
Sink类型编程BinaryRowData了。Runtime层把GenericRowData转换BinaryRowData的规则是什么?


SinkFunction与OutputFormat选择哪个?

2021-04-08 Thread Luna Wong
自定义Connector时,RichSinkFunction和RichOutputFormat可以选择一个进行实现,那么作为开发者应该如何选择呢?


blink planner里的Scala代码,未来会由Java改写吗?

2021-04-01 Thread Luna Wong
目前blink planner中有大量Scala代码,Scala在这方面写起来确实简单不少。未来不需要用Java重写是吗?


Re: Flink 1.12.2 sql api 使用parquet格式报错

2021-03-31 Thread Luna Wong
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html
Parquet你要下这个Jar包放在你flink/lib目录的。

Luna Wong  于2021年4月1日周四 上午10:45写道:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html
>
> 太平洋 <495635...@qq.com> 于2021年4月1日周四 上午10:26写道:
> >
> > 使用 parquet 还需要手段添加其他相关的依赖吗?
> >
> >
> > 环境和报错信息如下:
> >
> >
> > Flink 版本: 1.12.2
> > 部署方式: standalone kubernetes session
> > 添加的相关依赖
> > > 
> >  > 
> >  >  >  >
> >
> >
> > 错误信息:
> > Caused by: org.apache.flink.table.api.ValidationException: Could not find 
> > any format factory for identifier 'parquet' in the classpath. at 
> > org.apache.flink.table.filesystem.FileSystemTableSink. >  at 
> > org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:85)


Re: Flink 1.12.2 sql api 使用parquet格式报错

2021-03-31 Thread Luna Wong
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html

太平洋 <495635...@qq.com> 于2021年4月1日周四 上午10:26写道:
>
> 使用 parquet 还需要手段添加其他相关的依赖吗?
>
>
> 环境和报错信息如下:
>
>
> Flink 版本: 1.12.2
> 部署方式: standalone kubernetes session
> 添加的相关依赖
> 
>  
>   
>
>
> 错误信息:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
> format factory for identifier 'parquet' in the classpath. at 
> org.apache.flink.table.filesystem.FileSystemTableSink.  at 
> org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:85)


Re: FLIP-146中TableSource并行度设置预计哪个版本做?

2021-03-31 Thread Luna Wong
DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。
问题1:ScanTable 并行度在FLIP-146中有提及, LookupTable的并行度设置有FLIP或者issue吗?
问题2:这两类Table的并行度设置,预计在Flink哪个版本推出。

Luna Wong  于2021年3月31日周三 下午9:46写道:
>
> DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。
>
> Luna Wong  于2021年3月31日周三 下午9:45写道:
> >
> > DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。


Re: FLIP-146中TableSource并行度设置预计哪个版本做?

2021-03-31 Thread Luna Wong
DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。

Luna Wong  于2021年3月31日周三 下午9:45写道:
>
> DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。


FLIP-146中TableSource并行度设置预计哪个版本做?

2021-03-31 Thread Luna Wong
DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。


Re: Flink SQL一个Job使用多个Catalog的例子

2021-01-12 Thread Luna Wong
我已经解决了。USE其他Catalog再建表即可。

Luna Wong  于2021年1月12日周二 下午9:41写道:
>
> 大家好。
>   我没有在官网找到个Job使用多个Catalog的例子。
>   我想在一个Job里使用普通的Catalog注册个Kafka Source然后,将数据发送Iceberg Sink表。这个Sink表
> 注册在另一个Iceberg +  Hive 的Catalog 中。
> 注册代码如下。
>  CREATE CATALOG hive_catalog WITH (
> 'type'='iceberg',
> 'catalog-type'='hive',
> 'uri'='thrift://kudu1:9083',
> 'clients'='2',
> 'property-version'='1',
> 'warehouse'='hdfs://ns1//user/hive/warehouse'
> );
> 之所以使用两个Catalog是因为我发现Kafka Source无法注册到这种类型为Iceberg的Hive 
> Catalog中。我必须得换一个Catalog。
> 目前在IDEA下我还没跑起来。
>
> 可爱的木兰。


Flink SQL一个Job使用多个Catalog的例子

2021-01-12 Thread Luna Wong
大家好。
  我没有在官网找到个Job使用多个Catalog的例子。
  我想在一个Job里使用普通的Catalog注册个Kafka Source然后,将数据发送Iceberg Sink表。这个Sink表
注册在另一个Iceberg +  Hive 的Catalog 中。
注册代码如下。
 CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://kudu1:9083',
'clients'='2',
'property-version'='1',
'warehouse'='hdfs://ns1//user/hive/warehouse'
);
之所以使用两个Catalog是因为我发现Kafka Source无法注册到这种类型为Iceberg的Hive Catalog中。我必须得换一个Catalog。
目前在IDEA下我还没跑起来。

可爱的木兰。


Flink IDEA中使用Iceberg

2021-01-11 Thread Luna Wong
 代码如下,Flink metastore报错日志: AlreadyExistsException(message:Database
default already exists)

但是我已经 USE了 luna db啊?很奇怪

  tEnv.executeSql("CREATE CATALOG iceberg_hive WITH (\n" +
"  'type'='iceberg',\n" +
"  'catalog-type'='hive',\n" +
"  'uri'='thrift://kudu1:9083',\n" +
"  'clients'='2',\n" +
"  'property-version'='1',\n" +
"  'warehouse'='hdfs://ns1//user/hive/warehouse'\n" +
")");
tEnv.executeSql("USE CATALOG iceberg_hive");

//tEnv.executeSql("CREATE DATABASE luna");
tEnv.executeSql("USE luna");

tEnv.executeSql("CREATE TABLE iceberg_hive.luna.dwd (\n" +
"id BIGINT COMMENT 'unique id',\n" +
"name STRING\n" +
")");

tEnv.executeSql("CREATE TABLE iceberg_hive.luna.ads (\n" +
"id BIGINT COMMENT 'unique id',\n" +
"name STRING\n" +
")");


Flink Stream SQL 何时支持拉取 Iceberg表

2021-01-10 Thread Luna Wong
我看Iceberg官方文档,Stream目前只支持Iceberg Sink但是没有Source。未来在哪个版本会支持Iceberg
Source Stream。有issue吗?


Flink CVE补丁会打在1.10版本吗

2021-01-06 Thread Luna Wong
Flink 那个两个CVE的Patch 会打在1.10.4这个版本吗。目前修复版本是 1.11.3 1.12.0 ,很多生成还都在1.10 不好直接升级的。


Job结束blob目录及文件没有删除

2020-12-21 Thread Luna Wong
https://issues.apache.org/jira/browse/FLINK-20696

有一定概率发生,提交很多Job,Job结束后,会有个别Blob目录没有清理。我还没Debug出原因。


Re: FlinkSQL导致Prometheus内存暴涨

2020-11-30 Thread Luna Wong
我看了源码了。operator name截断了。但是task name没截断。task name是那些operator name拼起来的
所以特别长。现在我只是魔改源码临时截断了一下,咱还是在issue里讨论吧

Jark Wu  于2020年11月26日周四 下午8:53写道:
>
> IIRC, runtime will truncate the operator name to max 80 characters, see
> `TaskMetricGroup#METRICS_OPERATOR_NAME_MAX_LENGTH`.
> You can search the log if there are "The operator name {} exceeded the {}
> characters length limit and was truncated.".
>
> On Thu, 26 Nov 2020 at 18:18, hailongwang <18868816...@163.com> wrote:
>
> >
> >
> >
> > Hi,
> >  是的,个人觉得可以提供一个配置项来控制 task Name。
> >  完整的 task name 有助于排查问题等,简短的 task name 有助于在生产环境中 metric
> > 的采集,可以极大较少发送的网络开销,存储空间等。
> >  已建立个了 issue :https://issues.apache.org/jira/browse/FLINK-20375
> >
> >
> > Best,
> > Hailong
> >
> > 在 2020-11-24 14:19:40,"Luna Wong"  写道:
> > >FlinkSQL 生成的Metrics数据 task_name名字超长,导致Prometheus查询的时候内存暴涨,生产环境接受不了。
> > >下面只是一个最简单的例子,复杂的SQL生成的task_name长达9000字节。这会导致Prometheus内存暴涨,我该怎么办。
> > >
> >
> > >task_name="Source:_wuren_foo_ods_fooSourceConversion_table__Unregistered_DataStream_1___fields__id__name__SinkConversionToRowSourceConversion_table__default_catalog_default_database_ods_foo___fields__id__name__PROCTIME__Calc_select__id__name__SinkConversionToTuple2Sink:_Unnamed"
> >


FlinkSQL导致Prometheus内存暴涨

2020-11-23 Thread Luna Wong
FlinkSQL 生成的Metrics数据 task_name名字超长,导致Prometheus查询的时候内存暴涨,生产环境接受不了。
下面只是一个最简单的例子,复杂的SQL生成的task_name长达9000字节。这会导致Prometheus内存暴涨,我该怎么办。

task_name="Source:_wuren_foo_ods_fooSourceConversion_table__Unregistered_DataStream_1___fields__id__name__SinkConversionToRowSourceConversion_table__default_catalog_default_database_ods_foo___fields__id__name__PROCTIME__Calc_select__id__name__SinkConversionToTuple2Sink:_Unnamed"


Flink未来会弃用TableSourceFactory吗

2020-11-16 Thread Luna Wong
FLIP-95都实现后有了DynamicTableSourceFactory那么TableSourceFactory会弃用吗?


Flink JSON反序列化DECIMAL精度丢失

2020-11-16 Thread Luna Wong
https://issues.apache.org/jira/browse/FLINK-20170

这是我今天提的issue。 
Jackson这样反序列化会把BigDECIMAL转成Double。超过12位的小数精度丢失。这种情况我得怎么办。只能先当做STRING类型处理或修改下JSON这个包的源码重新变一下。
还有其他的最佳实践吗


Re: Re: Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 Thread Luna Wong
Table API 转 DataStream为啥会出现性能损耗

hailongwang <18868816...@163.com> 于2020年11月11日周三 下午6:28写道:
>
> 我理解是使用 使用 Kafka consumer 时使用 `CanalJsonDeserializationSchema` 序列化类就好了?
> 而不是再实现一个 Connector。
>
>
>
>
> 在 2020-11-11 16:56:58,"LittleFall" <1578166...@qq.com> 写道:
> >明白了,多谢。
> >
> >是 Canal-Json 格式的 Kafka Connector.
> >
> >我们的一个产品 (TiCDC) 已经实现了输出 Canal-Json 格式的 changelog 到 Kafka 中,现在可以方便地使用 table
> >api 对接 flink。
> >
> >现在是因为考虑到 Stream Api 能力比 Table Api 能力要强,所以在评估是否需要再实现一个 Stream Connector.
> >
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/


ElasticsearchApiCallBridge相关类构造函数问题

2020-11-11 Thread Luna Wong
为啥ElasticsearchApiCallBridge接口实现类的构造函数都不是Public。
我还想继承Elasticsearch6ApiCallBridge类。在new
RestHightLevelClient之前添加账号密码认证功能,即实现一个支持账号密码的子类。

不加Public 子类就必须得和父类一个包名了。ElasticsearchApiCallBridge的实现类为什么这么设计呢?