Re: (无主题)

2021-06-21 Thread 刘建刚
嗨,你使用的是session还是perJob的作业?哪个flink版本?有详细的日志吗?一般不退户,可能是master卡在了哪里,比如我们遇到过卡在handler或者异步执行有问题。

田磊  于2021年6月20日周日 上午11:14写道:

>
> 我用flink跑hbase的数据,flink的界面显示任务已经finished,正在running的任务为0。而yarn的界面显示正在running的状态,一直都结束不了,需要手动kill,是什么情况啊。
>
>
> | |
> totorobabyfans
> |
> |
> 邮箱:totorobabyf...@163.com
> |
>
> 签名由 网易邮箱大师 定制


Re:Re: (无主题)

2021-06-21 Thread 田磊
用的是perjob的模式,版本是1.12.1,因为我读的是hbase的数据,用的流的模式,是不是数据读完之后任务不会结束啊。下面是我的提交命令:./flink-1.12.1/bin/flink
 run -t yarn-per-job -m yarn-cluster -ynm historyPhotosConstientHandle -ys 8 
-yjm 4096 -ytm 10240 -c 
com.chinaunicom.audit.photo.history.handle.AuditPhotoHistoryNumberHandleApp 
/mnt/sd02/tglf005/devsoft/data-audit-photo-history-handle-2.1.jar,我发现如果指定了perjob参数的话,那么你在命令中指定的slot数量不起作用,我代码的最大并行度为16,按理来说应该分配2个taskmanager,但是却是分配了16个,每个taskmanager1个slot。如果不指定
 -t yarn-per-job的话,那么提交后就显示的是2个taskmanager,不是很明白,小白一个,望大家指点一二。
在 2021-06-21 18:26:33,"刘建刚"  写道:
>嗨,你使用的是session还是perJob的作业?哪个flink版本?有详细的日志吗?一般不退户,可能是master卡在了哪里,比如我们遇到过卡在handler或者异步执行有问题。
>
>田磊  于2021年6月20日周日 上午11:14写道:
>
>>
>> 我用flink跑hbase的数据,flink的界面显示任务已经finished,正在running的任务为0。而yarn的界面显示正在running的状态,一直都结束不了,需要手动kill,是什么情况啊。
>>
>>
>> | |
>> totorobabyfans
>> |
>> |
>> 邮箱:totorobabyf...@163.com
>> |
>>
>> 签名由 网易邮箱大师 定制


Re:?????? flink sql??????????????????

2021-06-21 Thread Michael Ran
k8s ??
?? 2021-06-16 18:22:29??"??" <809097...@qq.com> ??
>FlinkSql WebIDE?? 
>FlinkSQLSQL??SqlCli??
> https://github.com/DataLinkDC/dlink
>
>
>
>
>--  --
>??: "todd": 2021??6??16??(??) 5:48
>??: "user-zh": Re: flink sql??
>
>
>
>Flink 
>apihttps://github.com/todd5167/flink-spark-submiter??
>
>spi
>
>??
>   - ??Flink 
>client
>??
>   - ??
>   - ??
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


中文教程更新不及时问题

2021-06-21 Thread pang fan
大家好,

我是Flink的初学者,在跟着
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/try-flink/table_api/
官方教程(中文版)学习时发现很多中文教程都没有翻译掉,但是去看PR记录又发现很多都已经提了PR但是一直没有合并到主分支,里面很多PR都是几个月前的提的,后来好久都没有更新。

请问现在还有人在跟这些问题吗?如果有,可以更新下JIRA上的工单状态和代码PR状态,这样有需要我们也能申领工单给社区做一些贡献。


谢谢!
Kevin Fan


Re:KafkaSource检查点的end to end duration较长(1min)原因

2021-06-21 Thread 熊云昆
我觉得问题是不是出在反压上面,你的job是不是有反压?

















在 2021-06-21 10:38:36,"yidan zhao"  写道:
>如题,我任务的检查点(对齐检查点)大多数时间成功,偶现失败。目前针对超时类失败做了分析,存在部分特点,希望大佬们分析下原因。
>
>(1)KafkaSouce的e2e时间达到1min+,正常xxx
>ms就结束了。同时对应e2e达到1min+的情况下,sync、async、alignment、start delay等都为0,偶尔几个x
>ms的。   这个不是很明白什么情况会是这样呢?
>
>(2)对于部分task,start delay假设为31s,alignment duration为43s,但是processed
>data才xxxKB(几十到几百KB)。从我任务的正常处理情况对比来说,这个数据量几乎不需要时间就能处理完。
>
>(3)有个window算子(前边是hash进来,keyBy的),检查点时间1m14s。然后看了下subtask的检查点,大多数都是2s内完成,其中1个subtask耗时1m14s。这个subtask对应的start-delay为1m13s。
> 这个就更奇怪了,首先前边是keyBy,所以是hash分区方式进入window算子。那么,对于正常subtask0,其start_delay为1s,那么subtask0收到第一个barrier耗时1s,假设这个barrier来自上游算子的
>0 
>号子任务(preTask0)。那么preTask0既然已经发送了barrier,对于window任务的异常subtask就应该也能很快收到barrier,可是实际却耗时1min14s(start
>delay)。


?????? flink-1.13.1 sql error

2021-06-21 Thread guoyb
??






--  --
??: 
   "user-zh"

http://apache-flink.147419.n8.nabble.com/

flink-elasticsearch6-sink question

2021-06-21 Thread 闫杜峰
i have two questions.

1. when i add elasticserach host and port,i random write host, but not report 
error. for example

List httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http"));

2. when i write conrrect elasticsearch host and port, but no response, also not 
create index in elasticsearch


你好,我在使用flink-elasticsearch6-sink的功能,但是不能执行成功,存在以下两个问题。
1、当我随便写elasticsearch的hostname时,并不会有任何报错信息。例如
List httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http"));
填写的sdfsdfsf是不存在的。


2、当我使用docker启动一个elasticsearch时,程序并不会写入到elasticsearch中 ,但是也不报错。以下是代码附件。

package com.dufeng.test.connectors;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ElasticsearchSinkExample1 {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);

DataStream source =  env.generateSequence(0, 10);

List httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http"));
httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));
httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));

ElasticsearchSink.Builder esSinkBuilder =
new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction() {
public IndexRequest createIndexRequest(String 
element) {
Map json = new HashMap<>();
json.put("data", element);

return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}


@Override
public void process(Long aLong, RuntimeContext 
runtimeContext, RequestIndexer requestIndexer) {

requestIndexer.add(createIndexRequest(String.valueOf(aLong)));
}
}
);

esSinkBuilder.setBulkFlushMaxActions(1);

source.addSink(esSinkBuilder.build());

env.execute("Elasticsearch 6.x end to end sink test example");

}
}


flink-elasticsearch6-sink question

2021-06-21 Thread 闫杜峰
你好,我在使用flink-elasticsearch6-sink的功能,但是不能执行成功,存在以下两个问题。
1、当我随便写elasticsearch的hostname时,并不会有任何报错信息。例如
List httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http"));
填写的sdfsdfsf是不存在的。


2、当我使用docker启动一个elasticsearch时,程序并不会写入到elasticsearch中 ,但是也不报错。以下是代码附件。

package com.dufeng.test.connectors;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ElasticsearchSinkExample1 {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);

DataStream source =  env.generateSequence(0, 10);

List httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("sdfsdfsf", 9200, "http"));
httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));
httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));

ElasticsearchSink.Builder esSinkBuilder =
new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction() {
public IndexRequest createIndexRequest(String 
element) {
Map json = new HashMap<>();
json.put("data", element);

return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}


@Override
public void process(Long aLong, RuntimeContext 
runtimeContext, RequestIndexer requestIndexer) {

requestIndexer.add(createIndexRequest(String.valueOf(aLong)));
}
}
);

esSinkBuilder.setBulkFlushMaxActions(1);

source.addSink(esSinkBuilder.build());

env.execute("Elasticsearch 6.x end to end sink test example");

}
}


Re: KafkaSource检查点的end to end duration较长(1min)原因

2021-06-21 Thread yidan zhao
存在间断性周期性反压。  因为有window算子,时间窗口触发。

熊云昆  于2021年6月21日周一 下午11:03写道:
>
> 我觉得问题是不是出在反压上面,你的job是不是有反压?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-06-21 10:38:36,"yidan zhao"  写道:
> >如题,我任务的检查点(对齐检查点)大多数时间成功,偶现失败。目前针对超时类失败做了分析,存在部分特点,希望大佬们分析下原因。
> >
> >(1)KafkaSouce的e2e时间达到1min+,正常xxx
> >ms就结束了。同时对应e2e达到1min+的情况下,sync、async、alignment、start delay等都为0,偶尔几个x
> >ms的。   这个不是很明白什么情况会是这样呢?
> >
> >(2)对于部分task,start delay假设为31s,alignment duration为43s,但是processed
> >data才xxxKB(几十到几百KB)。从我任务的正常处理情况对比来说,这个数据量几乎不需要时间就能处理完。
> >
> >(3)有个window算子(前边是hash进来,keyBy的),检查点时间1m14s。然后看了下subtask的检查点,大多数都是2s内完成,其中1个subtask耗时1m14s。这个subtask对应的start-delay为1m13s。
> > 这个就更奇怪了,首先前边是keyBy,所以是hash分区方式进入window算子。那么,对于正常subtask0,其start_delay为1s,那么subtask0收到第一个barrier耗时1s,假设这个barrier来自上游算子的
> >0 
> >号子任务(preTask0)。那么preTask0既然已经发送了barrier,对于window任务的异常subtask就应该也能很快收到barrier,可是实际却耗时1min14s(start
> >delay)。


Re: standalone K8S 如何查看 TaskMananger 的 gc.log ?

2021-06-21 Thread Yang Wang
你可以增加如下参数来打开gc日志,并且写入到/opt/flink/log目录下

env.java.opts: -verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:ParallelGCThreads=4 -Xss512k
env.java.opts.jobmanager: -Xloggc:/opt/flink/log/jobmanager-gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M
env.java.opts.taskmanager: -Xloggc:/opt/flink/log/taskmanager-gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M


Best,
Yang

WeiXubin <18925434...@163.com> 于2021年6月18日周五 下午2:27写道:

> 请问 *standalone K8S* 部署模式为 *Deploy Application Cluster* 在哪获取查看/怎么配置
> TaskMananger 的 *gc.log* 日志?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: question about flink on k8s per-job mode

2021-06-21 Thread Yang Wang
如果不严格区分名字的话,application模式和per-job模式在K8s环境下是没有差异的,都是在JM完成的任务提交
目前的K8s application mode在高可用开启的情况下,也只能支持提交一个任务

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster
[2].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#application-mode

Best,
Yang


Best,
Yang

at003  于2021年6月17日周四 下午4:51写道:

> 哈喽,各位专家/大神:
>
> 为啥flink官方文档说明了flink on k8s 还有 native k8s都不支持 per-job mode 呢,但是搜索可以搜到好多教程。。。
>
> 谢谢
>
>
>
>


rocksdb对比filestatebackend

2021-06-21 Thread yidan zhao
如题,我生产中目前一直都是使用的FileStateBackend,然后使用一个对象存储服务作为后端。

按照我的理解,这种方式下,状态的操作性能很高,都是在内存内部,只有检查点时候才会输出到对象存储中。但是,不支持增量检查点。

RocksDB支持增量检查点,但是缺点是每个状态的操作都是需要序列化/反序列化,至于是文件还是内存操作可能还和rocksdb的块大小,多久刷新等有关。
不过我现在在想,既然我的任务的状态当前使用内存存储,也就是内存存储是能够容纳我的全状态的。
那么是否我从FileStateBackend切换到RocksDB其实性能也不会降低很多呢? 就是牺牲很少性能,换来增量检查点。

此外,还有个点。RocksDB的增量检查点在每次检查点时候,输出到对象存储的部分也是增量?还是全量。
只是rocksdb自身使用增量状态,然后检查点存储的时候是全量到对象存储吗?
还是说,对象存储中存储的ckpt-1,ckpt-2,ckpt-3等也是增量的,即ckpt-3可能依赖ckpt-2等这样。


退订

2021-06-21 Thread William王卫光
退订