Re: Some questions about the CSV format in Flink 1.11

2020-08-07, by Shengkai Fang
hi,
Regarding the first question: the documentation [1] already explains this in some detail; please read the section about partition files carefully.
Regarding the second question: the CSV format indeed does not support that option yet; you could open a JIRA ticket for it as an improvement.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html
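As a side note, a minimal sketch of why problem 1 happens (the path below is a placeholder and tEnv stands for a StreamTableEnvironment; this is my own illustration, not code from the thread): the filesystem sink treats the configured 'path' as a directory and writes part files into it, so pointing it at label_file2.csv still produces a folder of that name.

// Minimal sketch: the sink 'path' names a directory; Flink writes part files
// (part-<uuid>-<n>) inside it rather than a single CSV file, as described in
// the partition files / rolling policy section of [1].
tEnv.executeSql(
    "CREATE TABLE sink_table (face_id STRING, p_id STRING) WITH ("
    + " 'connector' = 'filesystem',"
    + " 'path' = 'file:///tmp/label_file_out',"      // a directory, not a file name
    + " 'format' = 'csv',"
    + " 'csv.field-delimiter' = ';'"
    + ")");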


WeiXubin <18925434...@163.com> wrote on Sat, Aug 8, 2020 at 11:40 AM:

> Hi, I am on Flink 1.11, using the filesystem connector to read a CSV file and write it out to another CSV file, and I ran into the following problems:
> Problem 1: the sink 'path' points at a concrete output file name, but the result is written out as a directory.
> Problem 2: I could not find an ignore-first-line option (to skip the header row) for CSV in the Flink 1.11 documentation.
>
> Test data
> 11101322000220200517145507667060666706;9
> 11101412000220200515163257249700624970;9
> 11101412010220200514163709315410631541;9
> 11101712050220200516173624737150673715;9
> 11101312000220200516184127322880632288;9
>
> CREATE TABLE source_table (
>   face_id STRING,
>   p_id STRING
> ) WITH (
>  'connector' = 'filesystem',
>  'path' = 'E:\label_file.csv',
>  'format' = 'csv',
>  'csv.field-delimiter'=';'
> );
>
> CREATE TABLE sink_table (
>   face_id STRING,
>   p_id STRING
> ) WITH (
>  'connector' = 'filesystem',
>  'path' = 'E:\label_file2.csv',
>   'csv.disable-quote-character' ='true',
>  'format' = 'csv',
>  'csv.field-delimiter'=';'
> );
>
> INSERT INTO sink_table SELECT face_id,p_id FROM source_table;
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink 1.8 + lettuce (Redis client) error

2020-08-07, by wujunxi
Using the lettuce client to access Redis from a Flink async I/O operator fails with the exception below:
Caused by: io.lettuce.core.RedisException: Cannot retrieve initial cluster 
partitions from initial URIs [RedisURI [host='XXX', port=XXX]]
at 
io.lettuce.core.cluster.RedisClusterClient.loadPartitions(RedisClusterClient.java:808)
at 
io.lettuce.core.cluster.RedisClusterClient.initializePartitions(RedisClusterClient.java:761)
at 
io.lettuce.core.cluster.RedisClusterClient.connectClusterImpl(RedisClusterClient.java:500)
at 
io.lettuce.core.cluster.RedisClusterClient.connect(RedisClusterClient.java:339)
at 
io.lettuce.core.cluster.RedisClusterClient.connect(RedisClusterClient.java:316)
at 
com.sangfor.abdi.AsyncRedis$2.open(AsyncRedis.java:67)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:163)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)



[garbled text in the archive; mentions Flink 1.10]


Some questions about the CSV format in Flink 1.11

2020-08-07, by WeiXubin
Hi, I am on Flink 1.11, using the filesystem connector to read a CSV file and write it out to another CSV file, and I ran into the following problems:
Problem 1: the sink 'path' points at a concrete output file name, but the result is written out as a directory.
Problem 2: I could not find an ignore-first-line option (to skip the header row) for CSV in the Flink 1.11 documentation.

Test data
11101322000220200517145507667060666706;9
11101412000220200515163257249700624970;9
11101412010220200514163709315410631541;9
11101712050220200516173624737150673715;9
11101312000220200516184127322880632288;9

CREATE TABLE source_table (
  face_id STRING,
  p_id STRING
) WITH (
 'connector' = 'filesystem',
 'path' = 'E:\label_file.csv',
 'format' = 'csv',
 'csv.field-delimiter'=';'
);

CREATE TABLE sink_table (
  face_id STRING,
  p_id STRING
) WITH (
 'connector' = 'filesystem',
 'path' = 'E:\label_file2.csv',
  'csv.disable-quote-character' ='true',
 'format' = 'csv',
 'csv.field-delimiter'=';'
);

INSERT INTO sink_table SELECT face_id,p_id FROM source_table;




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Job fails when using flink-sql-connector-elasticsearch6_2.11_1.10.0.jar

2020-08-07, by Shengkai Fang
Sorry, the corresponding fix was applied for ES 6 as well.
But it still looks like the same kind of problem.

Shengkai Fang wrote on Fri, Aug 7, 2020 at 7:52 PM:

> Do you mean it works with the 1.10 ES jar but fails with the 1.11 one?
> It looks like a bug; it was fixed for ES 7 in 1.11, but the fix was not applied to ES 6.
> See [1] https://issues.apache.org/jira/browse/FLINK-18006?filter=-2
>
> 费文杰 wrote on Fri, Aug 7, 2020 at 3:56 PM:
>
>>
>> Here is my code:
>> import com.alibaba.fastjson.JSONObject;
>> import lombok.extern.slf4j.Slf4j;
>> import org.apache.flink.api.common.functions.RuntimeContext;
>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
>> import
>> org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
>> import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
>> import org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.action.index.IndexRequest;
>> import org.apache.flink.elasticsearch6.shaded.org
>> .elasticsearch.client.Requests;
>> import org.apache.http.HttpHost;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>> import java.util.*;
>> @Slf4j
>> public class TrackToEsJob {
>> public static void main(String[] args) throws Exception {
>> // Get the execution environment
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> // Set the parallelism, usually kept in line with the number of cores
>> env.setParallelism(1);
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> final Properties kafkaProps = new Properties();
>> kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "
>> 192.168.1.100:9092");
>> kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
>> "track-flink-group");
>> kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
>> "false");
>> FlinkKafkaConsumer flinkKafkaConsumer = new
>> FlinkKafkaConsumer("bi-track-log-client",
>> new SimpleStringSchema(), kafkaProps);
>> // Start consuming from the latest offsets by default
>> flinkKafkaConsumer.setStartFromLatest();
>> // 1. Kafka source stream, using a parallelism equal to the partition count
>> int partitionCount = 1;
>> DataStream sourceStream =
>> env.addSource(flinkKafkaConsumer)
>> .setParallelism(1)
>> .name("source_kafka_trackTopics")
>> .uid("source_kafka_trackTopics");
>> List httpHosts = new ArrayList<>();
>> httpHosts.add(new HttpHost("192.168.1.101", 9200, "http"));
>> ElasticsearchSink.Builder esSinkBuilder = new
>> ElasticsearchSink.Builder(httpHosts, new
>> ElasticsearchSinkFunction() {
>>   public IndexRequest createIndexRequest(String o) {
>> JSONObject jsonObject = JSONObject.parseObject(o);
>> System.out.println("saving data"
>> +jsonObject.toJSONString());
>> Map esData = new HashMap<>();
>> esData.put("appId",jsonObject.getString("appId"));
>> esData.put("indexKey",jsonObject.getString("indexKey"));
>>
>> esData.put("indexValue",jsonObject.getString("indexValue"));
>> return Requests.indexRequest()
>> .index("bi_track_log_es")
>> .type("doc")
>> .source(esData);
>> }
>> @Override
>> public void process(String o, RuntimeContext runtimeContext,
>> RequestIndexer requestIndexer) {
>> requestIndexer.add(createIndexRequest(o));
>> }
>> });
>> esSinkBuilder.setBulkFlushMaxActions(1);
>> sourceStream.addSink(esSinkBuilder.build());
>> env.execute("TrackToEsJob");
>> }
>>
>> I thought flink-sql-connector-elasticsearch6 should already contain flink-connector-elasticsearch6. If I switch the dependency to flink-connector-elasticsearch6_2.11_1.10.0 instead, the job runs normally, so I am a bit confused now and would appreciate some guidance. Thanks!
>>
>> The error message is as follows:
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
>> Cannot instantiate user function.
>> at
>> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: 

Re: Job fails when using flink-sql-connector-elasticsearch6_2.11_1.10.0.jar

2020-08-07, by Shengkai Fang
Do you mean it works with the 1.10 ES jar but fails with the 1.11 one?
It looks like a bug; it was fixed for ES 7 in 1.11, but the fix was not applied to ES 6.
See [1] https://issues.apache.org/jira/browse/FLINK-18006?filter=-2

费文杰 wrote on Fri, Aug 7, 2020 at 3:56 PM:

>
> Here is my code:
> import com.alibaba.fastjson.JSONObject;
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.api.common.functions.RuntimeContext;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
> import
> org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
> import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
> import org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.action.index.IndexRequest;
> import org.apache.flink.elasticsearch6.shaded.org
> .elasticsearch.client.Requests;
> import org.apache.http.HttpHost;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import java.util.*;
> @Slf4j
> public class TrackToEsJob {
> public static void main(String[] args) throws Exception {
> // Get the execution environment
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // Set the parallelism, usually kept in line with the number of cores
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> final Properties kafkaProps = new Properties();
> kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "
> 192.168.1.100:9092");
> kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
> "track-flink-group");
> kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> "false");
> FlinkKafkaConsumer flinkKafkaConsumer = new
> FlinkKafkaConsumer("bi-track-log-client",
> new SimpleStringSchema(), kafkaProps);
> // Start consuming from the latest offsets by default
> flinkKafkaConsumer.setStartFromLatest();
> // 1. Kafka source stream, using a parallelism equal to the partition count
> int partitionCount = 1;
> DataStream sourceStream = env.addSource(flinkKafkaConsumer)
> .setParallelism(1)
> .name("source_kafka_trackTopics")
> .uid("source_kafka_trackTopics");
> List httpHosts = new ArrayList<>();
> httpHosts.add(new HttpHost("192.168.1.101", 9200, "http"));
> ElasticsearchSink.Builder esSinkBuilder = new
> ElasticsearchSink.Builder(httpHosts, new
> ElasticsearchSinkFunction() {
>   public IndexRequest createIndexRequest(String o) {
> JSONObject jsonObject = JSONObject.parseObject(o);
> System.out.println("saving data"
> +jsonObject.toJSONString());
> Map esData = new HashMap<>();
> esData.put("appId",jsonObject.getString("appId"));
> esData.put("indexKey",jsonObject.getString("indexKey"));
>
> esData.put("indexValue",jsonObject.getString("indexValue"));
> return Requests.indexRequest()
> .index("bi_track_log_es")
> .type("doc")
> .source(esData);
> }
> @Override
> public void process(String o, RuntimeContext runtimeContext,
> RequestIndexer requestIndexer) {
> requestIndexer.add(createIndexRequest(o));
> }
> });
> esSinkBuilder.setBulkFlushMaxActions(1);
> sourceStream.addSink(esSinkBuilder.build());
> env.execute("TrackToEsJob");
> }
>
> I thought flink-sql-connector-elasticsearch6 should already contain flink-connector-elasticsearch6. If I switch the dependency to flink-connector-elasticsearch6_2.11_1.10.0 instead, the job runs normally, so I am a bit confused now and would appreciate some guidance. Thanks!
>
> The error message is as follows:
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot instantiate user function.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: unexpected exception type
> at
> java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
> 

Re: Re: Re: How to set the sink parallelism when executing SQL with the Table API

2020-08-07, by wldd
hi, Shengkai Fang, Cayden chen:


Thanks for the answers; that DISCUSS thread should solve my problem.










--

Best,
wldd





On 2020-08-07 16:56:30, "Cayden chen" <1193216...@qq.com> wrote:
>hi:
> You can convert the table right before the sink into a DataStream, change the global parallelism to the sink parallelism you want, and then call dataStream.addSink(sink) (addSink reads the global parallelism and applies it to the operator);
> afterwards change the global parallelism back. In theory this approach lets you set a separate parallelism for each operator.
>
>
>
>
>-- Original message --
>From: "user-zh"
>Sent: Friday, Aug 7, 2020, 4:35 PM
>To: "user-zh"
>Subject: Re: Re: How to set the sink parallelism when executing SQL with the Table API
>
>
>
>hi, only a global setting is supported at the moment; setting the parallelism of an individual sink is not supported yet.
>Per-sink parallelism settings are being discussed in the community; see
>https://www.mail-archive.com/dev@flink.apache.org/msg40251.html
>wldd wrote:
> hi:
> That corresponds to a setting in flink-conf.yaml; it is a global configuration and cannot target the sink's parallelism.
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> Best,
> wldd
>
>
>
>
>
> On 2020-08-07 15:26:34, "Shengkai Fang" wrote:
> hi
> Not sure whether this meets your needs:
> 
> tEnv.getConfig().addConfiguration(
> new Configuration()
> .set(CoreOptions.DEFAULT_PARALLELISM, 128)
> );
> 
> See the docs:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
> 
> 
> wldd wrote:
>  hi all:
>  How do I set the sink parallelism when TableEnvironment executes SQL?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>  --
> 
>  Best,
>  wldd
>


Re: How to configure Hadoop for Flink 1.11.1 on Kubernetes

2020-08-07, by Matt Wang
The official image only contains Flink itself; if you need to access HDFS, you have to put the Hadoop jars and configuration into the image.


--

Best,
Matt Wang


On Aug 7, 2020 at 12:49, caozhen wrote:
While we're at it, here is the Hadoop integration page for Flink 1.11.1:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html

According to the docs, flink-shaded-hadoop-2-uber is no longer provided, and two approaches are suggested instead:

1. Load the Hadoop dependencies via HADOOP_CLASSPATH (recommended).
2. Or put the Hadoop dependencies into the Flink client's lib directory.

*When running Flink 1.11.1 on YARN I went with the second approach: download the Hadoop distribution and copy the commonly used dependencies into the lib directory. (This can cause class conflicts with your main jar and may need debugging.)

I don't think this is a good long-term approach; it only works around the problem. There should still be a flink-shaded-hadoop package; I am trying to build one myself, but some issues are not fully resolved yet.
*



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Field delimiter problem with the Flink SQL CSV format

2020-08-07, by Leonard Xu
Hi

Try this: 'csv.line-delimiter' = U&'\\0009'

Note: do not wrap it in another pair of single quotes.

Best,
Leonard
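To make that concrete, a minimal sketch of applying the SQL Unicode escape to the field delimiter the question asks about (my own adaptation, not code from the thread; tableEnv, table name and schema are placeholders, and note the doubled backslash needed inside a Java string literal so that the SQL parser sees U&'\0009'):

// Minimal sketch: the tab character written as a SQL Unicode string literal.
tableEnv.executeSql(
    "CREATE TABLE tab_separated (a STRING, b STRING) WITH ("
    + " 'connector' = 'filesystem',"
    + " 'path' = 'file:///tmp/tab_separated',"
    + " 'format' = 'csv',"
    + " 'csv.field-delimiter' = U&'\\0009'"   // in the final SQL text: U&'\0009'
    + ")");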

> On Aug 7, 2020 at 16:27, kandy.wang wrote:
> 
> With 'csv.field-delimiter' = '\t' set,
> querying the data fails with: org.apache.flink.table.api.ValidationException: Option
> 'csv.field-delimiter' must be a string with single character, but was: \t
> How should this be handled?



Re: How to deal with backpressure from the Flink Elasticsearch sink

2020-08-07, by Cayden chen
hi, you could apply the rate limiting on the source side.
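Purely as an illustration of that idea (not from the original thread): a throttling map placed between the source and the Elasticsearch sink. The class name and rate are made up, and it assumes a Guava dependency on the classpath.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import com.google.common.util.concurrent.RateLimiter;

// Usage sketch: sourceStream.map(new ThrottlingMap<>(500)).addSink(esSink);
public class ThrottlingMap<T> extends RichMapFunction<T, T> {

    private final double recordsPerSecondPerSubtask;
    private transient RateLimiter rateLimiter; // not serializable, so created in open()

    public ThrottlingMap(double recordsPerSecondPerSubtask) {
        this.recordsPerSecondPerSubtask = recordsPerSecondPerSubtask;
    }

    @Override
    public void open(Configuration parameters) {
        rateLimiter = RateLimiter.create(recordsPerSecondPerSubtask);
    }

    @Override
    public T map(T value) {
        rateLimiter.acquire(); // blocks until a permit is available, capping the throughput
        return value;
    }
}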





Re: Re: How to set the sink parallelism when executing SQL with the Table API

2020-08-07, by Cayden chen
hi:
You can convert the table right before the sink into a DataStream, change the global parallelism to the sink parallelism you want, and then call dataStream.addSink(sink) (addSink reads the global parallelism and applies it to the operator);
afterwards change the global parallelism back. In theory this approach lets you set a separate parallelism for each operator.
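A minimal runnable sketch of that workaround (class name, the example table and the parallelism values are mine, not from the thread; the commented-out line shows a more direct form of the same idea):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SinkParallelismWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // pretend 4 is the job-wide parallelism
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Stand-in for the real query that feeds the sink.
        Table result = tEnv.fromValues("a", "b", "c");

        // Convert the table into a DataStream...
        DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);

        // ...temporarily lower the global default so addSink() picks it up, then restore it.
        int globalParallelism = env.getParallelism();
        env.setParallelism(1);
        stream.addSink(new PrintSinkFunction<>()); // the sink is created with parallelism 1
        env.setParallelism(globalParallelism);

        // More direct alternative:
        // stream.addSink(new PrintSinkFunction<>()).setParallelism(1);

        env.execute("sink-parallelism-workaround");
    }
}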





Re: Re: How to set the sink parallelism when executing SQL with the Table API

2020-08-07, by Shengkai Fang
hi, only a global setting is supported at the moment; setting the parallelism of an individual sink is not supported yet.
Per-sink parallelism settings are being discussed in the community; see
https://www.mail-archive.com/dev@flink.apache.org/msg40251.html
wldd wrote on Fri, Aug 7, 2020 at 3:41 PM:

> hi:
>   That corresponds to a setting in flink-conf.yaml; it is a global configuration and cannot target the sink's parallelism.
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> Best,
> wldd
>
>
>
>
>
> On 2020-08-07 15:26:34, "Shengkai Fang" wrote:
> >hi
> >Not sure whether this meets your needs:
> >
> >tEnv.getConfig().addConfiguration(
> >new Configuration()
> >.set(CoreOptions.DEFAULT_PARALLELISM, 128)
> >);
> >
> >See the docs:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
> >
> >
> >wldd wrote on Fri, Aug 7, 2020 at 3:16 PM:
> >
> >> hi all:
> >>   How do I set the sink parallelism when TableEnvironment executes SQL?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> --
> >>
> >> Best,
> >> wldd
>


Field delimiter problem with the Flink SQL CSV format

2020-08-07, by kandy.wang
With 'csv.field-delimiter' = '\t' set,
querying the data fails with: org.apache.flink.table.api.ValidationException: Option
'csv.field-delimiter' must be a string with single character, but was: \t
How should this be handled?

Job fails when using flink-sql-connector-elasticsearch6_2.11_1.10.0.jar

2020-08-07, by 费文杰

Here is my code:
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.index.IndexRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.Requests;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.*;

@Slf4j
public class TrackToEsJob {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism, usually kept in line with the number of cores
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        final Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.100:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "track-flink-group");
        kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        FlinkKafkaConsumer<String> flinkKafkaConsumer =
                new FlinkKafkaConsumer<>("bi-track-log-client", new SimpleStringSchema(), kafkaProps);
        // Start consuming from the latest offsets by default
        flinkKafkaConsumer.setStartFromLatest();
        // 1. Kafka source stream, using a parallelism equal to the partition count
        int partitionCount = 1;
        DataStream<String> sourceStream = env.addSource(flinkKafkaConsumer)
                .setParallelism(1)
                .name("source_kafka_trackTopics")
                .uid("source_kafka_trackTopics");
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("192.168.1.101", 9200, "http"));
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts, new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String o) {
                JSONObject jsonObject = JSONObject.parseObject(o);
                System.out.println("saving data" + jsonObject.toJSONString());
                Map<String, String> esData = new HashMap<>();
                esData.put("appId", jsonObject.getString("appId"));
                esData.put("indexKey", jsonObject.getString("indexKey"));
                esData.put("indexValue", jsonObject.getString("indexValue"));
                return Requests.indexRequest()
                        .index("bi_track_log_es")
                        .type("doc")
                        .source(esData);
            }

            @Override
            public void process(String o, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                requestIndexer.add(createIndexRequest(o));
            }
        });
        esSinkBuilder.setBulkFlushMaxActions(1);
        sourceStream.addSink(esSinkBuilder.build());
        env.execute("TrackToEsJob");
    }
}

I thought flink-sql-connector-elasticsearch6 should already contain flink-connector-elasticsearch6. If I switch the dependency to flink-connector-elasticsearch6_2.11_1.10.0 instead, the job runs normally, so I am a bit confused now and would appreciate some guidance. Thanks!

The error message is as follows:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unexpected exception type
at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at 

Re: Re: How to set the sink parallelism when executing SQL with the Table API

2020-08-07, by wldd
hi:
  That corresponds to a setting in flink-conf.yaml; it is a global configuration and cannot target the sink's parallelism.













--

Best,
wldd





On 2020-08-07 15:26:34, "Shengkai Fang" wrote:
>hi
>Not sure whether this meets your needs:
>
>tEnv.getConfig().addConfiguration(
>new Configuration()
>.set(CoreOptions.DEFAULT_PARALLELISM, 128)
>);
>
>See the docs: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
>
>wldd wrote on Fri, Aug 7, 2020 at 3:16 PM:
>
>> hi all:
>>   How do I set the sink parallelism when TableEnvironment executes SQL?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>>
>> Best,
>> wldd


Re: Exception when registering a UD(T)F with StreamTableEnvironment.createTemporarySystemFunction

2020-08-07, by Benchao Li
Hi,

The new UDF registration API introduced in 1.11 uses the new UDF type inference mechanism, which is why you see the problem above.
You can follow the new UDF type inference docs [1] and try adding type hints.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#type-inference
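For example, a rough sketch of such a type hint for the UDTF in the quoted code (the ROW field names are my own assumption; the scalar UDF that takes a Row argument would likely need a hint on its parameter as well):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Declares the produced row type explicitly, so the new type inference does not
// have to derive the fields from the raw Row class.
@FunctionHint(output = @DataTypeHint("ROW<s STRING, len INT, s2 STRING>"))
public class UDTF extends TableFunction<Row> {
    public void eval(String input) {
        collect(Row.of(input, input.length(), input + 2));
    }
}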

zz zhang wrote on Fri, Aug 7, 2020 at 11:00 AM:

> The following code throws an exception; switching to the old method StreamTableEnvironment.registerFunction works fine.
> Flink version: 1.11.1
>
> package com.test;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.apache.flink.table.functions.TableFunction;
> import org.apache.flink.types.Row;
>
>
> public class TestUTDFOk {
> public static class UDTF extends TableFunction<Row> {
>
> public void eval(String input) {
> Row row = new Row(3);
> row.setField(0, input);
> row.setField(1, input.length());
> row.setField(2, input +  2);
> collect(row);
> }
> }
>
> public static  class UDF extends ScalarFunction {
> public String eval(Row row, Integer index) {
> try {
> return String.valueOf(row.getField(index));
> } catch (Exception e) {
> throw e;
> }
> }
> }
>
> public static void main(String[] args) throws Exception {
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(
> env,
> EnvironmentSettings.newInstance().useBlinkPlanner().build());
> //tEnv.registerFunction("udtf", new UDTF());
> //tEnv.registerFunction("udf", new UDF());
> tEnv.createTemporarySystemFunction("udtf", new UDTF());
> tEnv.createTemporarySystemFunction("udf", new UDF());
>
> tEnv.createTemporaryView("source", tEnv.fromValues("a", "b",
> "c").as("f0"));
> String sinkDDL = "create table sinkTable ("
> + "f0 String"
> + ", x String"
> + ", y String"
> + ", z String"
> + ") with ("
> + "'connector.type' = 'filesystem',"
> + "'format.type' = 'csv',"
> + "'connector.path' =
> 'F:\\workspace\\douyu-git\\bd-flink\\core\\logs\\a.csv'"
> + ")";
> String udtfCall = "insert into sinkTable SELECT S.f0"
> + ", udf(f1, 0) as x"
> + ", udf(f1, 1) as y"
> + ", udf(f1, 2) as z"
> + " FROM source as S, LATERAL TABLE(udtf(f0)) as T(f1)";
>
> tEnv.executeSql(sinkDDL);
> tEnv.executeSql(udtfCall);
> }
> }
>
> The exception is as follows:
> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> An error occurred in the type inference logic of function 'udf'.
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> at com.test.TestUTDFOk.main(TestUTDFOk.java:64)
> Caused by: org.apache.flink.table.api.ValidationException: An error
> occurred in the type inference logic of function 'udf'.
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:165)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:148)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
> at java.util.Optional.flatMap(Optional.java:241)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:99)
> at
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:73)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1303)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1288)
> at
> 

Re: How to set the sink parallelism when executing SQL with the Table API

2020-08-07, by Shengkai Fang
hi
Not sure whether this meets your needs:

tEnv.getConfig().addConfiguration(
new Configuration()
.set(CoreOptions.DEFAULT_PARALLELISM, 128)
);

See the docs: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
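For completeness, a self-contained version of that snippet (the class name and the value 4 are placeholders; the setting applies to every operator generated from the SQL, including the sink, since Flink 1.11 has no per-sink parallelism option):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DefaultParallelismExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Sets the default parallelism used for all operators generated from SQL.
        tEnv.getConfig().addConfiguration(
                new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 4));
    }
}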


wldd wrote on Fri, Aug 7, 2020 at 3:16 PM:

> hi all:
>   How do I set the sink parallelism when TableEnvironment executes SQL?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> Best,
> wldd


How to deal with backpressure from the Flink Elasticsearch sink

2020-08-07, by 大头糯米丸子
We have already tuned the indexes and the write parameters on the Elasticsearch side. What can be done on the Flink side during peak traffic? Is there any way to rate-limit, apart from the built-in dynamic backpressure?


How to set the sink parallelism when executing SQL with the Table API

2020-08-07, by wldd
hi all:
  How do I set the sink parallelism when TableEnvironment executes SQL?














--

Best,
wldd

Re: Flink 1.10 on Yarn

2020-08-07, by xuhaiLong
Thanks for the reply! My issue was indeed caused by this bug.


On 8/7/2020 13:43, chenkaibit wrote:
hi xuhaiLong, the checkpoint NullPointerException in the log is a known issue; see the two JIRA tickets below.
Which JDK version are you using? So far we have seen jdk8_40/jdk8_60 + flink-1.10 produce the checkpoint
NullPointerException; try upgrading the JDK version.
https://issues.apache.org/jira/browse/FLINK-18196
https://issues.apache.org/jira/browse/FLINK-17479




On 2020-08-07 12:50:23, "xuhaiLong" wrote:

Sorry, I attached the wrong file earlier.


Yes, taskmanager.memory.jvm-metaspace.size is at its default value.
On 8/7/2020 11:43, Yangze Guo wrote:
The log did not come through; is taskmanager.memory.jvm-metaspace.size currently at its default setting?

Best,
Yangze Guo

On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong  wrote:



Hi


Scenario: one TaskManager with three slots, running three jobs.


While the three jobs were running, a NullPointerException occurred during checkpointing, which kept the jobs restarting. Eventually the Metaspace filled up, ending with
`java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has
occurred. This can mean two things: either the job requires a larger size of
JVM metaspace to load classes or there is a class loading leak. In the first
case 'taskmanager.memory.jvm-metaspace.size' configuration option should be
increased. If the error persists (usually in cluster after several job
(re-)submissions) then there is probably a class loading leak which has to be
investigated and fixed. The task executor has to be shutdown...
`


Part of the exception is attached.


Questions:
1. Why does the NullPointerException occur during checkpointing? (The three jobs read the same Kafka topic, and the job runs fine after being restored from a checkpoint, so it should not be a data problem.)
2. The logs show the job does restart automatically, so why does the problem persist after the restart and keep the job restarting?


Thanks!
Cloud attachment sent via NetEase Mail Master:
08-07error.txt (730.4 KB, available until Aug 22, 2020 11:37)


Re: The flink-jdbc_2.11:1.11.1 dependency cannot be found

2020-08-07, by RS
Hi,
Look for something like flink-connector-jdbc_2.12-1.11.1.jar instead.

















On 2020-08-07 14:47:29, "lydata" wrote:
>The flink-jdbc_2.11:1.11.1 dependency cannot be found on https://mvnrepository.com/ ; was it not uploaded?


The flink-jdbc_2.11:1.11.1 dependency cannot be found

2020-08-07, by lydata
The flink-jdbc_2.11:1.11.1 dependency cannot be found on https://mvnrepository.com/ ; was it not uploaded?


Re: Flink SQL reading Kafka data that has a field named time

2020-08-07, by leiyanrui
OK, I will take a look. Thanks!



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL reading Kafka data that has a field named time

2020-08-07, by Benchao Li
1.10 does indeed have such a bug [1]; it was fixed in 1.10.1 and 1.11.0, so you can try 1.10.1 or 1.11.1.

[1] https://issues.apache.org/jira/browse/FLINK-16068

leiyanrui <1150693...@qq.com> wrote on Fri, Aug 7, 2020 at 2:32 PM:

> 1.10
> 
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: Flink SQL reading Kafka data that has a field named time

2020-08-07, by leiyanrui
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL reading Kafka data that has a field named time

2020-08-07, by leiyanrui
1.10
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL reading Kafka data that has a field named time

2020-08-07, by Benchao Li
Which Flink version are you using? Ideally also include the exception message.

leiyanrui <1150693...@qq.com> wrote on Fri, Aug 7, 2020 at 2:18 PM:

> CREATE TABLE table1 (
> bg BIGINT,
> user_source BIGINT,
> bossid   BIGINT,
> geekid BIGINT,
> qq_intent BIGINT,
> phone_intent BIGINT,
> wechat_intent BIGINT,
> `time` BIGINT,
> t as to_timestamp(from_unixtime(__ts,'yyyy-MM-dd HH:mm:ss')),
> watermark for t as t - interval '5' second
> ) WITH (
> ...
> );
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: How can I remove myself from the list? I want to switch to a different email address

2020-08-07, by Benchao Li
Hi,

To unsubscribe from the Chinese user mailing list, send an email to: user-zh-unsubscr...@flink.apache.org
More information about the mailing lists is available at [1].

[1] https://flink.apache.org/community.html#mailing-lists

何宗谨 wrote on Fri, Aug 7, 2020 at 2:14 PM:

>
>
>
>

-- 

Best,
Benchao Li


Re: Flink SQL reading Kafka data that has a field named time

2020-08-07, by leiyanrui
CREATE TABLE table1 (
bg BIGINT,
user_source BIGINT,
bossid   BIGINT,
geekid BIGINT,
qq_intent BIGINT,
phone_intent BIGINT,
wechat_intent BIGINT,
`time` BIGINT,
t as to_timestamp(from_unixtime(__ts,'yyyy-MM-dd HH:mm:ss')),
watermark for t as t - interval '5' second
) WITH (
...
);



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL reading Kafka data that has a field named time

2020-08-07, by Benchao Li
Could you share the Flink version you are using and the DDL?

leiyanrui <1150693...@qq.com> wrote on Fri, Aug 7, 2020 at 11:58 AM:

> I am using Flink SQL to read from Kafka, and the Kafka data contains a field named time. I wrapped the time field
> in backticks in the CREATE TABLE statement, but it still fails with an error. Is there any other way to do this?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


How can I remove myself from the list? I want to switch to a different email address

2020-08-07, by 何宗谨





Re: Re: Error when defining a Kafka source with DDL in Flink 1.11

2020-08-07, by chengyanan1...@foxmail.com
Hello:
You are on Flink 1.11, but your CREATE TABLE statement still uses the old-style options. Try switching to the new-style connector options and run it again.
See:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
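As a rough sketch, the same table written with the new 1.11-style options might look like the following (a drop-in replacement for the create_sql string in the quoted code; topic, group id and broker address are copied from the original post, and the option names follow the 1.11 Kafka connector documentation):

// Hedged sketch only; tableEnv is the StreamTableEnvironment from the quoted code.
String createSql =
    "CREATE TABLE test (\n"
    + "  name STRING,\n"
    + "  city STRING\n"
    + ") WITH (\n"
    + "  'connector' = 'kafka',\n"
    + "  'topic' = 'test',\n"
    + "  'properties.bootstrap.servers' = '127.0.0.1:9092',\n"
    + "  'properties.group.id' = 'test_gd',\n"
    + "  'scan.startup.mode' = 'latest-offset',\n"
    + "  'format' = 'json'\n"
    + ")";
tableEnv.executeSql(createSql);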






chengyanan1...@foxmail.com
 
From: 阿华田
Sent: 2020-08-07 14:03
To: user-zh@flink.apache.org
Subject: Re: Error when defining a Kafka source with DDL in Flink 1.11
Error message:
Exception in thread "main" java.lang.IllegalStateException: No operators 
defined in streaming topology. Cannot generate StreamGraph.
at 
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59)
 
 
 
 
Code:
public class DDLSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
String  create_sql=
"create table test\n" +
"(\n" +
"name varchar,\n" +
"city varchar\n" +
")with (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal',\n" +
"'connector.topic' = 'test',\n" +
"'connector.properties.0.key' = 'group.id',\n" +
"'connector.properties.0.value' = 'test_gd',\n" +
"'connector.properties.1.key' = 'bootstrap.servers',\n" +
"'connector.properties.1.value' = '127.0.0.1:9092',\n" +
"'connector.property-version' = '1',\n" +
"'connector.startup-mode' = 'latest-offset',\n" +
"'format.type' = 'json',\n" +
"'format.property-version' = '1',\n" +
"'format.derive-schema' = 'true',\n" +
"'update-mode' = 'append')";
 
tableEnv.executeSql(create_sql);
Table table = tableEnv.sqlQuery("select name from test ");
TableSchema schema = table.getSchema();
System.out.println(schema);
DataStream<Tuple2<Boolean, Row>> tuple2DataStream =
tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print();
tableEnv.execute("test");
//bsEnv.execute("fff");
}
}
 
 
 
阿华田
a15733178...@163.com
Sent with NetEase Mail Master


On Aug 7, 2020 at 13:49, 阿华田 wrote:
Code below


阿华田
a15733178...@163.com
Sent with NetEase Mail Master
 


Re: Error when defining a Kafka source with DDL in Flink 1.11

2020-08-07, by chengyanan1...@foxmail.com
Hello:
The image did not come through; please paste the text directly and send again.





chengyanan1...@foxmail.com
 
From: 阿华田
Sent: 2020-08-07 13:49
To: user-zh
Subject: Error when defining a Kafka source with DDL in Flink 1.11
Code below

阿华田
a15733178...@163.com
Sent with NetEase Mail Master



Re: Error when defining a Kafka source with DDL in Flink 1.11

2020-08-07, by 阿华田
Error message:
Exception in thread "main" java.lang.IllegalStateException: No operators 
defined in streaming topology. Cannot generate StreamGraph.
at 
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59)




Code:
public class DDLSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
String  create_sql=
"create table test\n" +
"(\n" +
"name varchar,\n" +
"city varchar\n" +
")with (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal',\n" +
"'connector.topic' = 'test',\n" +
"'connector.properties.0.key' = 'group.id',\n" +
"'connector.properties.0.value' = 'test_gd',\n" +
"'connector.properties.1.key' = 'bootstrap.servers',\n" +
"'connector.properties.1.value' = '127.0.0.1:9092',\n" +
"'connector.property-version' = '1',\n" +
"'connector.startup-mode' = 'latest-offset',\n" +
"'format.type' = 'json',\n" +
"'format.property-version' = '1',\n" +
"'format.derive-schema' = 'true',\n" +
"'update-mode' = 'append')";

tableEnv.executeSql(create_sql);
Table table = tableEnv.sqlQuery("select name from test ");
TableSchema schema = table.getSchema();
System.out.println(schema);
DataStream<Tuple2<Boolean, Row>> tuple2DataStream =
tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print();
tableEnv.execute("test");
//bsEnv.execute("fff");
}
}



阿华田
a15733178...@163.com
Sent with NetEase Mail Master


On Aug 7, 2020 at 13:49, 阿华田 wrote:
Code below


阿华田
a15733178...@163.com
Sent with NetEase Mail Master