flink1.11.2检查点失败

2020-12-17 文章 zhy
hi、
我这面在使用flink1.11.2做实时特征的时候,状态大小大约在30g左右的时候任务就不能继续运行了,而查看异常日志发现大量的InterruptedException,请问这种情况是集群的问题还是flink的问题,而另一个3G状态的任务依然正常运行


flink1.9.1 支持一个 source 指定消费多个 topics么?

2020-12-17 文章 bradyMk
Hi,想请教一下大家:

最近通过flink_taskmanager_job_task_operator_KafkaConsumer_records_consumed_rate指标发现,
flink某个任务消费一个topic A 竟然比消费topic A,B,C,D一起的指标要高,
也就是我四个topic每秒消费的数据竟然还没其中一个topic每秒消费的数据高,
所以想请问:flink1.9.1 支持一个 source 指定消费多个 topics么?
我的代码如下:
val A= params.getProperty("kafka.scene.data.topic")
val B= params.getProperty("kafka.scene.log.topic")
val C= params.getProperty("kafka.event.topic")
val D= params.getProperty("kafka.log.topic")
import scala.collection.JavaConverters._
val topics = List[String](sceneDataTopic, sceneLogTopic, eventTopic,
sdkLog).asJava
env .addSource(new FlinkKafkaConsumer011(topics, new
JSONKeyValueDeserializationSchema(false), kafkaPro))




-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Topic not present in metadata after 60000 ms.

2020-12-17 文章 Level1accelerator
版本1.11.2
用这种方式FlinkKafkaProducer(String topicId, SerializationSchema
serializationSchema, Properties producerConfig)生产没问题,数据写入无报错

这种方式
FlinkKafkaProducer(
String defaultTopicId,
KeyedSerializationSchema serializationSchema,
Properties producerConfig,
Optional> customPartitioner) 
就会报上面的错误,不知为何,哪位大佬解答下




--
Sent from: http://apache-flink.147419.n8.nabble.com/


????????flink1.12????????????????state????

2020-12-17 文章 ??????
Hiflink1.12group
 by current_date,userId ??flink 
state?? 1??Stream??TTL 
2??tabEnv.getConfig().setIdleStateRetention(Duration.ofDays(1))

flink 1.11 interval join??????rocksdb????????????

2020-12-17 文章 867127831
Hi,


flink 1.11 on k8sjoin??sql??rocksdbbackend??flink 
managedflink??state.backend.rocksdb.memory.managed=truek8s??pod


flink sql:


insert into console_sink
select t1.*, t2.*
from t1 left join t2
on t1.unique_id = t2.unique_id
and t1.event_time BETWEEN t2.event_time - INTERVAL '1' HOUR AND t2.event_time + 
INTERVAL '1' HOUR



??
state.backend=rocksdb;
state.backend.incremental=false;
state.backend.rocksdb.memory.managed=true
state.idle.retention.mintime='10 min';
state.idle.retention.maxtime='20 min';
checkpoint.time.interval='15 min';
source.idle-timeout='6 ms';

taskmanager.memory.flink.size =55 gb
taskmanager.memory.managed.fraction=0.85






??
1. checkpoint??size??200G??state
2. k8s 
podpodpod??pod
3. promethus??metrics, 
rocksdb_block_cache_usagerocksdb_block_cache_capacityrocksdb_block_cache_usageflink
 managed


flink??rocksdb??rocksdb_block_cache_usage

StreamTableEnvironmentImpl ??????????????

2020-12-17 文章 ????????
??org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl#create
 ??  ??
if (!settings.isStreamingMode()) {
throw new TableException(
"StreamTableEnvironment can not run in batch 
mode for now, please use TableEnvironment.");
}



 ?? 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl#StreamTableEnvironmentImpl
 


?? ??


    ?? ??
http://apache-flink.147419.n8.nabble.com/Blink-Planner-Remote-Env-td3162.html#a3180

Re: Blink Planner构造Remote Env

2020-12-17 文章 莫失莫忘
我现在也碰到了这个问题,也是删除了方法中检查模式的代码 settings.isStreamingMode()。源码为什么要加这样一个检测呢? 感觉
StreamTableEnvironmentImpl 原来是能跑批的,现在加上这个检测 反而不能了。我看最新版的 1.12 还有这个检测代码呢



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: flink clickhouse connector

2020-12-17 文章 guoliubi...@foxmail.com
有,但是贴做附件时因为超长没法发出去
:
ezmlm-reject: fatal: Sorry, I don't accept messages larger than 100 bytes 
(#5.2.3)

发你私人邮箱是否方便



guoliubi...@foxmail.com
 
From: magichuang
Date: 2020-12-17 20:18
To: user-zh
Subject: Re: Re: flink clickhouse connector
您是用java写的还是pyflink  啊?  我是用pyflink写的程序,所以需要一个jar包,您那里有嘛,我本地是新安装的maven,在打包  
但是一直在下载依赖好多。。
 
 
> -- 原始邮件 --
> 发 件 人:"guoliubi...@foxmail.com" 
> 发送时间:2020-12-17 19:36:55
> 收 件 人:user-zh 
> 抄 送:
> 主 题:Re: flink clickhouse connector
>
> 我这也是往clickhouse写数据,用官方的或是其他第三方的JDBC驱动(我用的https://github.com/blynkkk/clickhouse4j),然后用JdbcSink就能写入了,不需要另外写connector。
>
>
>
> guoliubi...@foxmail.com
>
> From: magichuang
> Date: 2020-12-17 18:41
> To: user-zh
> Subject: flink clickhouse connector
> hi 想问一下有小伙伴使用flink 
> 往clickhouse里面写数据嘛?我是使用的https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
>  这个flink-connector,但是运行报错了:
>
> Caused by: java.io.IOException: unable to establish connection to ClickHouse
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)
>
> at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
>
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>
> at 
> org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
>
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.io.IOException: table `default`.`traffic` is not a 
> Distributed table
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)
>
> ... 12 more
>
>
>
>
> 但 traffic 这个表我在clickhouse里面创建了,flink版本是1.11
>
>
>
>
> 有小伙伴成功对接的嘛,可否分享一下connector呀
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Best,
>
> MagicHuang
>
>
>
>
>
 
 
 
--
 
Best,
 
MagicHuang
 
 


jdbc sink无法插入数据

2020-12-17 文章 guoliubi...@foxmail.com
Hi,

我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
.process(new ProcessFunction() {
@Override
public void processElement(RatioValuevalue, Context ctx, 
Collector out) throws Exception {
out.collect(value);
ctx.output(ratioOutputTag, value);
}
});
sideStream.addSink(new FlinkKafkaProducer<>(
"ratio_value",
new RatioValueSerializationSchema(suffix),
PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), 
tool.get(SCHEMA_REGISTRY_URL)),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
DataStream ratioSideStream = 
sideStream.getSideOutput(ratioOutputTag);
ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
想问下这种情况是否有什么排查手段?


guoliubi...@foxmail.com


Flink 1.11.2 读写Hive以及对hive的版本支持

2020-12-17 文章 Jacob
Dear All,

Flink.11.2操作hive时,对hive的版本支持是怎样的


看官网介绍是支持1.0、1.1、1.2、2.0、2.1、2.2、2.3、3.1
我的执行环境:

*Flink : 1.11.2*
*Haoop : 2.6.0-cdh5.8.3*
*Hive : 1.1.0-cdh5.8.3*
*Job运行方式 : on yarn*

同时对读写hive的demo,我不知道我写的是否正确:

public static void main(String[] args) throws Exception {

EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "myhive";
String defaultDatabase = "datafeed";
String hiveConfDir = "/opt/app/bigdata/hive-1.1.0-cdh5.8.3/conf"; 
// hive-site.xml路径
String version = "1.1.0-cdh5.8.3";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);

tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
String createDbSql = "INSERT INTO TABLE flink2hive_test VALUES
('55', \"333\", \"CHN\")";
tableEnv.sqlUpdate(createDbSql);  
}

这样的job提交到yarn会报错:

Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.mapreduce.TaskAttemptContext

是缺少MapReduce的相关包吗?





-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-17 文章 Storm☀️
state.backend.incremental 出现问题的时候增量模式是开启的吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink eventTIme????

2020-12-17 文章 ?g???U?[????
Hi all  When I use SQL with UDTF, when I call the 
tableEnv.sqlQuery () method, I throw the following error: Rowtime attributes 
must not be in the input rows of a regular join. As a workaround you can cast 
the time attributes of input tables to TIMESTAMP before.I used the 
to_timestamp function in eventTIme and it doesn't work,How to solve the 
problem?


sql: select
 tmp.metric_id as 
metric_id,
 
tmp.metric_config as metric_config,
 startLat,
 destName,
 bizType,
 orderId,
 
completedTime,
 
orderStatus,
 
startHexList,
 cityId,
 type,
 destLat,
 endHexList,
 destLng,
 createTime,
 
passengerId,
 
finishedTime,
 vehicleId,
 startLng,
 startName,
 eventTime
from
 
htw_order_dwd_htw_order_geo_Infos,
 lateral table(
  
metricUdtf('aa')
 ) as 
tmp(metric_id, metric_config)


Thanks
Jiazhi

Re: flink1.12 docker 镜像啥时候有

2020-12-17 文章 Yang Wang
OK,我看目前flink-docker项目里面的docker-entrypoint.sh是正常,有其他问题你再继续反馈


Best,
Yang

superainbower  于2020年12月18日周五 上午8:33写道:

> hi,我重新git下来,build又可以了,可能之前我下的有文件有问题
>
> 在2020年12月17日 14:08,Yang Wang 写道:
> 你直接clone下来,然后cd到1.12目录,这样build出来的镜像也是可以的
>
> 你用build的镜像启动Flink任务是报什么错呢,我这边试了一下是可以正常运行的
>
>   - git clone https://github.com/apache/flink-docker.git
>   - cd scala_2.11-java8-debian
>   - sudo docker build -t flink:1.12.0 .
>   - docker push
>
>
> Best,
> Yang
>
> superainbower  于2020年12月17日周四 上午7:19写道:
>
> > 请教下 git checkout dev-master./add-custom.sh -u
> >
> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
> > -n flink-1.12.0 这是一条指令吗?感觉执行不了
> > 另外直接
> > git clone https://github.com/apache/flink-docker.git
> > 在里面的1.12目录中选择2.11的进去,直接dock build -t flink:1.12.0
> .可以吗,我尝试直接这样构建出来的镜像好像不能跑
> >
> > 在2020年12月16日 10:56,Yang Wang 写道:
> > 目前社区在将镜像推到docker hub的过程中遇到了点问题,正在解决
> > 具体你可以跟进一下这个PR
> https://github.com/docker-library/official-images/pull/9249
> >
> > 当前你也可以自己build一个镜像来使用,方法如下:
> >
> > git clone https://github.com/apache/flink-docker.git
> > git checkout dev-master./add-custom.sh -u
> >
> >
> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
> > -n
> > <
> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz-n
> >
> > flink-1.12.0cd dev/flink-1.12.0-debiandocker build . -t
> > flink:flink-1.12.0docker push flink:flink-1.12.0
> >
> >
> >
> > jiangjiguang719  于2020年12月9日周三 下午5:09写道:
> >
> > > 请问啥时候 在docker hub中可以看到1.12版本的镜像?
> >
>


Re: Re:Re: flink sql作业state size一直增加

2020-12-17 文章 Storm☀️
mini batch默认为false 。题主问题找到了吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


取消订阅中文资讯邮件列表失败

2020-12-17 文章 肖越
由失误操作使用了企业邮箱订阅,目前无法取消订阅,向user-zh-unsubscribe发送多封邮件也无效?请问官方有解决办法么?

Re: flink1.9.1 如何配置RocksDB的block-cache-usage参数

2020-12-17 文章 bradyMk
Hi~谢谢 Yun Tang 大佬的解答~

不过这个指标不能单任务配置么?官网有这么个提示:

"启用本机指标可能会导致性能下降,应谨慎设置"[1]

所以如果全局配置,其他没有用RocksDB的任务也会尝试发送这个指标,那会不会导致其他任务的性能下降?感觉这样不是很科学啊?


[1]https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#rocksdb-native-metrics



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink sql 自定义udaf函数 出现 No match found for function signature count_uadf()

2020-12-17 文章 丁浩浩
问题我自己已经解决。

> 在 2020年12月17日,下午9:00,丁浩浩 <18579099...@163.com> 写道:
> 
> flink版本:1.11.1
> udaf函数代码来自于阿里云官网文档
> 
> 以下是代码
> public class TestSql {
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
>//env.setParallelism(3);
>tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);
> 
>Properties configs = CommonUtils.getConfigs();
>//注册clazz源表
>FlinkUtils.registerMysqlTable2FlinkTable(
>tableEnv,configs.getProperty("url"),
>configs.getProperty("user.name"), 
> configs.getProperty("password"),
>“test", "clazz_lesson");
> 
>Table table = tableEnv.sqlQuery("select 
> count_uadf(clazz_number),clazz_number from clazz_lesson group by 
> clazz_number");
>//Table table = tableEnv.sqlQuery("select 
> number,collect(extension_value) from clazz_extension group by number ");
>tableEnv.toRetractStream(table, Row.class).print();
>env.execute();
> 
> 
>}
> }
> 
> 
> 
> public class CountUdaf extends AggregateFunction {
>//定义存放count UDAF状态的accumulator的数据的结构。
>public static class CountAccum {
>public long total;
>}
> 
>@Override
>//初始化count UDAF的accumulator。
>public CountAccum createAccumulator() {
>CountAccum acc = new CountAccum();
>acc.total = 0;
>return acc;
>}
>@Override
>//getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。
>public Long getValue(CountAccum accumulator) {
>return accumulator.total;
>}
> 
> 
>//accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。
>public void accumulate(CountAccum accumulator, Long iValue) {
>accumulator.total++;
>}
>public void merge(CountAccum accumulator, Iterable its) {
>for (CountAccum other : its) {
>accumulator.total += other.total;
>}
>}
> }
> 
> 以下是堆栈信息
> 
> -
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 1, column 8 to line 1, column 31: No match 
> found for function signature count_uadf()
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
>   at 
> com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 8 to line 1, column 31: No match found for function signature 
> count_uadf()
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
>   at 
> 

Re: flink 1.12 RocksDBStateBackend 报错

2020-12-17 文章 Xintong Song
https://issues.apache.org/jira/browse/FLINK-20646

Thank you~

Xintong Song



On Thu, Dec 17, 2020 at 11:40 PM zhisheng  wrote:

> hi,xintong
>
> 有对应的 Issue ID 吗?
>
> Xintong Song  于2020年12月17日周四 下午4:48写道:
>
> > 确实是 1.12.0 的 bug。
> > 我们在所有用到 state 的地方都应该去声明 ManagedMemoryUseCase.STATE_BACKEND。有一个新添加的
> > ReduceTransformation 没有做这个声明,导致所有涉及到这个算子的作业使用 RocksDB 都会出问题。
> > 我马上建 issue,这个可能要推动社区加急发一个 bugfix 版本了
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Thu, Dec 17, 2020 at 11:05 AM HunterXHunter <1356469...@qq.com>
> wrote:
> >
> > > 1.12设置 env.setStateBackend(new RocksDBStateBackend(checkpointPath,
> > > true))之后会报错
> > > :
> > > Caused by: java.lang.IllegalArgumentException: The fraction of memory
> to
> > > allocate should not be 0. Please make sure that all types of managed
> > memory
> > > consumers contained in the job are configured with a non-negative
> weight
> > > via
> > > `taskmanager.memory.managed.consumer-weights`.
> > >
> > > 但查看源码这个参数是默认值。
> > > 最终找到原因是
> > > Streamconfig下getManagedMemoryFractionOperatorUseCaseOfSlot中
> > > config缺少key : managedMemFraction.STATE_BACKEND
> > > 当设置
> > > config.setDouble("managedMemFraction.STATE_BACKEND", 0.7)
> > > 后,程序正常。
> > > 代码如下
> > > https://paste.ubuntu.com/p/9WrBz3Xrc6/
> > >
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> > >
> >
>


Re: task manager内存使用问题

2020-12-17 文章 Yangze Guo
1. 加jvm参数可以使用env.java.opts.taskmanager配置
2. 目前tm中没有对heap memory进行slot间细粒度管理,session模式下不支持这种功能

Best,
Yangze Guo

On Fri, Dec 18, 2020 at 9:22 AM guoliubi...@foxmail.com
 wrote:
>
> Hi,
> 现在使用的是flink1.12,使用standalone cluster模式运行。
> 在上面运行一个Job内存消耗大,会用满heap然后把整个task manager带崩掉。
> 想问下怎么给task manager的jvm加上heap dump相关参数。
> 还有是否有选项,可以在某个job吃满heap后是kill这个job而不是shutdown整个task manager,因为这个task 
> manager还有其他job在跑,会导致其他job一起fail。
>
>
>
> guoliubi...@foxmail.com


回复: task manager内存使用问题

2020-12-17 文章 yinghua...@163.com
我也是在这个讨论群学到的,你看下能否解答你的问题
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#env-java-opts

这个里面可以配置task manager的虚拟机参数,在虚拟机参数里配置 -XX:OnOutOfMemoryError=kill -9 
%p,这样当OOM时杀掉task manager进程







yinghua...@163.com
 
发件人: guoliubi...@foxmail.com
发送时间: 2020-12-18 09:22
收件人: user-zh
主题: task manager内存使用问题
Hi,
现在使用的是flink1.12,使用standalone cluster模式运行。
在上面运行一个Job内存消耗大,会用满heap然后把整个task manager带崩掉。
想问下怎么给task manager的jvm加上heap dump相关参数。
还有是否有选项,可以在某个job吃满heap后是kill这个job而不是shutdown整个task manager,因为这个task 
manager还有其他job在跑,会导致其他job一起fail。
 
 
 
guoliubi...@foxmail.com


task manager内存使用问题

2020-12-17 文章 guoliubi...@foxmail.com
Hi,
现在使用的是flink1.12,使用standalone cluster模式运行。
在上面运行一个Job内存消耗大,会用满heap然后把整个task manager带崩掉。
想问下怎么给task manager的jvm加上heap dump相关参数。
还有是否有选项,可以在某个job吃满heap后是kill这个job而不是shutdown整个task manager,因为这个task 
manager还有其他job在跑,会导致其他job一起fail。



guoliubi...@foxmail.com


回复:flink1.12 docker 镜像啥时候有

2020-12-17 文章 superainbower
hi,我重新git下来,build又可以了,可能之前我下的有文件有问题

在2020年12月17日 14:08,Yang Wang 写道:
你直接clone下来,然后cd到1.12目录,这样build出来的镜像也是可以的

你用build的镜像启动Flink任务是报什么错呢,我这边试了一下是可以正常运行的

  - git clone https://github.com/apache/flink-docker.git
  - cd scala_2.11-java8-debian
  - sudo docker build -t flink:1.12.0 .
  - docker push


Best,
Yang

superainbower  于2020年12月17日周四 上午7:19写道:

> 请教下 git checkout dev-master./add-custom.sh -u
> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
> -n flink-1.12.0 这是一条指令吗?感觉执行不了
> 另外直接
> git clone https://github.com/apache/flink-docker.git
> 在里面的1.12目录中选择2.11的进去,直接dock build -t flink:1.12.0 .可以吗,我尝试直接这样构建出来的镜像好像不能跑
>
> 在2020年12月16日 10:56,Yang Wang 写道:
> 目前社区在将镜像推到docker hub的过程中遇到了点问题,正在解决
> 具体你可以跟进一下这个PR https://github.com/docker-library/official-images/pull/9249
>
> 当前你也可以自己build一个镜像来使用,方法如下:
>
> git clone https://github.com/apache/flink-docker.git
> git checkout dev-master./add-custom.sh -u
>
> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
> -n
> 
> flink-1.12.0cd dev/flink-1.12.0-debiandocker build . -t
> flink:flink-1.12.0docker push flink:flink-1.12.0
>
>
>
> jiangjiguang719  于2020年12月9日周三 下午5:09写道:
>
> > 请问啥时候 在docker hub中可以看到1.12版本的镜像?
>


Re: flink 1.12 RocksDBStateBackend 报错

2020-12-17 文章 zhisheng
hi,xintong

有对应的 Issue ID 吗?

Xintong Song  于2020年12月17日周四 下午4:48写道:

> 确实是 1.12.0 的 bug。
> 我们在所有用到 state 的地方都应该去声明 ManagedMemoryUseCase.STATE_BACKEND。有一个新添加的
> ReduceTransformation 没有做这个声明,导致所有涉及到这个算子的作业使用 RocksDB 都会出问题。
> 我马上建 issue,这个可能要推动社区加急发一个 bugfix 版本了
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Dec 17, 2020 at 11:05 AM HunterXHunter <1356469...@qq.com> wrote:
>
> > 1.12设置 env.setStateBackend(new RocksDBStateBackend(checkpointPath,
> > true))之后会报错
> > :
> > Caused by: java.lang.IllegalArgumentException: The fraction of memory to
> > allocate should not be 0. Please make sure that all types of managed
> memory
> > consumers contained in the job are configured with a non-negative weight
> > via
> > `taskmanager.memory.managed.consumer-weights`.
> >
> > 但查看源码这个参数是默认值。
> > 最终找到原因是
> > Streamconfig下getManagedMemoryFractionOperatorUseCaseOfSlot中
> > config缺少key : managedMemFraction.STATE_BACKEND
> > 当设置
> > config.setDouble("managedMemFraction.STATE_BACKEND", 0.7)
> > 后,程序正常。
> > 代码如下
> > https://paste.ubuntu.com/p/9WrBz3Xrc6/
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: flink1.9.1 如何配置RocksDB的block-cache-usage参数

2020-12-17 文章 Yun Tang
Hi

这些metrics启用的配置是放到flink conf里面的,不是让你直接在代码里面调用的。

祝好
唐云

From: bradyMk 
Sent: Thursday, December 17, 2020 20:56
To: user-zh@flink.apache.org 
Subject: Re: flink1.9.1 如何配置RocksDB的block-cache-usage参数

谢谢 Yun Tang 大佬的解答~

另外,还想请教一下:我在代码中设置开启了cur-size-all-mem-tables的监控,代码如下:
//设置RocksDB状态后端,且开启增量ck
val backend = new RocksDBStateBackend(path, true)

//监控配置项
val metricOptions = new RocksDBNativeMetricOptions
metricOptions.enableSizeAllMemTables()

//设置预选项
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)

//开启RocksDB
env.setStateBackend(backend.asInstanceOf[StateBackend])

但是发现这个监控指标并没有成功发送,请问是我在代码里开启的方式不对么?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink sql 自定义udaf函数 出现 No match found for function signature count_uadf()

2020-12-17 文章 丁浩浩
flink版本:1.11.1
udaf函数代码来自于阿里云官网文档

以下是代码
public class TestSql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
//env.setParallelism(3);
tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);

Properties configs = CommonUtils.getConfigs();
//注册clazz源表
FlinkUtils.registerMysqlTable2FlinkTable(
tableEnv,configs.getProperty("url"),
configs.getProperty("user.name"), 
configs.getProperty("password"),
“test", "clazz_lesson");

Table table = tableEnv.sqlQuery("select 
count_uadf(clazz_number),clazz_number from clazz_lesson group by clazz_number");
//Table table = tableEnv.sqlQuery("select 
number,collect(extension_value) from clazz_extension group by number ");
tableEnv.toRetractStream(table, Row.class).print();
env.execute();


}
}



public class CountUdaf extends AggregateFunction {
//定义存放count UDAF状态的accumulator的数据的结构。
public static class CountAccum {
public long total;
}

@Override
//初始化count UDAF的accumulator。
public CountAccum createAccumulator() {
CountAccum acc = new CountAccum();
acc.total = 0;
return acc;
}
@Override
//getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。
public Long getValue(CountAccum accumulator) {
return accumulator.total;
}


//accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。
public void accumulate(CountAccum accumulator, Long iValue) {
accumulator.total++;
}
public void merge(CountAccum accumulator, Iterable its) {
for (CountAccum other : its) {
accumulator.total += other.total;
}
}
}

以下是堆栈信息

-
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. From line 1, column 8 to line 1, column 31: No match found 
for function signature count_uadf()
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at 
com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 8 to line 1, column 31: No match found for function signature 
count_uadf()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
at 

Re: flink1.9.1 如何配置RocksDB的block-cache-usage参数

2020-12-17 文章 bradyMk
谢谢 Yun Tang 大佬的解答~

另外,还想请教一下:我在代码中设置开启了cur-size-all-mem-tables的监控,代码如下:
//设置RocksDB状态后端,且开启增量ck
val backend = new RocksDBStateBackend(path, true)

//监控配置项
val metricOptions = new RocksDBNativeMetricOptions
metricOptions.enableSizeAllMemTables()

//设置预选项
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)

//开启RocksDB
env.setStateBackend(backend.asInstanceOf[StateBackend])

但是发现这个监控指标并没有成功发送,请问是我在代码里开启的方式不对么?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: flink clickhouse connector

2020-12-17 文章 magichuang
您是用java写的还是pyflink  啊?  我是用pyflink写的程序,所以需要一个jar包,您那里有嘛,我本地是新安装的maven,在打包  
但是一直在下载依赖好多。。


> -- 原始邮件 --
> 发 件 人:"guoliubi...@foxmail.com" 
> 发送时间:2020-12-17 19:36:55
> 收 件 人:user-zh 
> 抄 送:
> 主 题:Re: flink clickhouse connector
>
> 我这也是往clickhouse写数据,用官方的或是其他第三方的JDBC驱动(我用的https://github.com/blynkkk/clickhouse4j),然后用JdbcSink就能写入了,不需要另外写connector。
>
>
>
> guoliubi...@foxmail.com
>
> From: magichuang
> Date: 2020-12-17 18:41
> To: user-zh
> Subject: flink clickhouse connector
> hi 想问一下有小伙伴使用flink 
> 往clickhouse里面写数据嘛?我是使用的https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
>  这个flink-connector,但是运行报错了:
>
> Caused by: java.io.IOException: unable to establish connection to ClickHouse
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)
>
> at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
>
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>
> at 
> org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
>
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.io.IOException: table `default`.`traffic` is not a 
> Distributed table
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)
>
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)
>
> ... 12 more
>
>
>
>
> 但 traffic 这个表我在clickhouse里面创建了,flink版本是1.11
>
>
>
>
> 有小伙伴成功对接的嘛,可否分享一下connector呀
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Best,
>
> MagicHuang
>
>
>
>
>



--

Best,

MagicHuang




flink-1.11.1 checkpoint超时,任务会不断重启

2020-12-17 文章 nicygan
dear all:
我有一个flink流式任务,checkpoint周期5分钟,超时时间3分钟。
此任务中调用了第三方接口,正常情况下没问题,正常的checkpoint时长仅80ms。


 但由于第三方接口发生了拥堵,有部分调用会超时(接口调用超时设置了5秒钟),
 然后此算子的checkpoint就会超时,
 checkpoint 3179 of job  expired before completing

 trying to recover from a global failure
 exceeded checkpoint tolerable failure threshold

 然后任务就发生重启,恢复到最后一个正常checkpoint点。
 但到下一个checkpoint周期时,又超时,又发生重启,又恢复到那个正常checkpoint点。
 就如此反复重启,恢复到那个正常checkpoint点,也导致流中的数据无法继续消费。


 checkpoint超时为什么会导致任务重启,可以避免让他重启吗?
 调用第三方接口超时的数据,我可以后面单独处理,但重启却导致了数据无法消费。


thanks
/nicygan


Re: flink clickhouse connector

2020-12-17 文章 liang zhao
我使用的是第三方的驱动,clickhouse-native-jdbc,通过JDBC的方式。
> 2020年12月17日 18:41,magichuang  写道:
> 
> hi想问一下有小伙伴使用flink 
> 往clickhouse里面写数据嘛?我是使用的https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
>   这个flink-connector,但是运行报错了:
> 
> Caused by: java.io.IOException: unable to establish connection to ClickHouse
> 
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)
> 
> at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
> 
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 
> at 
> org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> 
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> 
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> 
> at java.lang.Thread.run(Thread.java:748)
> 
> Caused by: java.io.IOException: table `default`.`traffic` is not a 
> Distributed table
> 
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)
> 
> at 
> com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)
> 
> ... 12 more
> 
> 
> 
> 
> 但  traffic  这个表我在clickhouse里面创建了,flink版本是1.11
> 
> 
> 
> 
> 有小伙伴成功对接的嘛,可否分享一下connector呀
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> Best,
> 
> MagicHuang
> 
> 
> 
> 
> 



Re: flink clickhouse connector

2020-12-17 文章 guoliubi...@foxmail.com
我这也是往clickhouse写数据,用官方的或是其他第三方的JDBC驱动(我用的https://github.com/blynkkk/clickhouse4j),然后用JdbcSink就能写入了,不需要另外写connector。



guoliubi...@foxmail.com
 
From: magichuang
Date: 2020-12-17 18:41
To: user-zh
Subject: flink clickhouse connector
hi想问一下有小伙伴使用flink 
往clickhouse里面写数据嘛?我是使用的https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
  这个flink-connector,但是运行报错了:
 
Caused by: java.io.IOException: unable to establish connection to ClickHouse
 
at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)
 
at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)
 
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 
at 
org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
 
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 
at java.lang.Thread.run(Thread.java:748)
 
Caused by: java.io.IOException: table `default`.`traffic` is not a Distributed 
table
 
at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)
 
at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)
 
... 12 more
 
 
 
 
但  traffic  这个表我在clickhouse里面创建了,flink版本是1.11
 
 
 
 
有小伙伴成功对接的嘛,可否分享一下connector呀
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
Best,
 
MagicHuang
 
 
 
 
 


Flink1.12.0/flink1.11.0版本出现FLINK-19677的问题

2020-12-17 文章 李延延
你好,我在测试1.12版本时,在虚拟机上部署了一个jobmanager,三个taskmanager;
环境信息:
1.centos7/虚拟机,双网卡(一个.4网段,一个.18网段)
2.jobmanager 1节点;taskmanager3节点,未开启高可用


配置中发现hostname 
对应的是其中的一个网卡的(18网段)ip;而flink的masters/workers文件全部配置的是.4网段的ip,最重要的是jobmanager.rpc.address也配置的是.4网段,启动集群之后提交WordCount示例,提交不成功,报错与FLINK-19677一样
在节点上重新部署flink1.10.0版本,使用相同的ip配置,可以正常提交任务,并运行成功


对于flink1.12.0版本报错最后的解决办法是,换成把ip换成18网段,或者改成hostname问题都能解决,所以这个问题应该是flink1.11.0开始引入的问题





flink clickhouse connector

2020-12-17 文章 magichuang
hi想问一下有小伙伴使用flink 
往clickhouse里面写数据嘛?我是使用的https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
  这个flink-connector,但是运行报错了:

Caused by: java.io.IOException: unable to establish connection to ClickHouse

at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:79)

at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65)

at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at 
org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:73)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.io.IOException: table `default`.`traffic` is not a Distributed 
table

at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.establishShardConnections(ClickHouseShardOutputFormat.java:96)

at 
com.aliyun.flink.connector.clickhouse.table.internal.ClickHouseShardOutputFormat.open(ClickHouseShardOutputFormat.java:76)

... 12 more




但  traffic  这个表我在clickhouse里面创建了,flink版本是1.11




有小伙伴成功对接的嘛,可否分享一下connector呀



















Best,

MagicHuang







回复: flink1.12 docker 镜像啥时候有

2020-12-17 文章 superainbower
您好,镜像打完之后,向K8S提交jobmanager-job.yaml的时候,jobmanager起不来,看日志,日志里报镜像时的docker-entrypoint.sh脚本第102行缺少
 ),我比对了下1.12 和1.11镜像里的 docker-entrypoint.sh,1.12里102行( 
_args=("${_args[@]:1}")对应的是 一个新加的 方法

disable_jemalloc_env() {

  # use nameref '_args' to update the passed 'args' within function

   local -n _args=$1

   if [ "${_args[0]}" = ${COMMAND_DISABLE_JEMALLOC} ]; then

  echo "Disable Jemalloc as the memory allocator"

  _args=("${_args[@]:1}")

   else

   echo "Enable Jemalloc as the memory allocator via appending env variable 
LD_PRELOAD with /usr/lib/x86_64-linux-gnu/libjemalloc.so"

   export LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so

   fi

}



另外我看了1.12的文档,yaml文件和1.11应该没有区别,原先的yaml文件在1.11.1的版本是可以正常执行的
| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制


在2020年12月17日 14:08,Yang Wang 写道:
你直接clone下来,然后cd到1.12目录,这样build出来的镜像也是可以的

你用build的镜像启动Flink任务是报什么错呢,我这边试了一下是可以正常运行的

- git clone https://github.com/apache/flink-docker.git
- cd scala_2.11-java8-debian
- sudo docker build -t flink:1.12.0 .
- docker push


Best,
Yang

superainbower  于2020年12月17日周四 上午7:19写道:

请教下 git checkout dev-master./add-custom.sh -u
https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
-n flink-1.12.0 这是一条指令吗?感觉执行不了
另外直接
git clone https://github.com/apache/flink-docker.git
在里面的1.12目录中选择2.11的进去,直接dock build -t flink:1.12.0 .可以吗,我尝试直接这样构建出来的镜像好像不能跑

在2020年12月16日 10:56,Yang Wang 写道:
目前社区在将镜像推到docker hub的过程中遇到了点问题,正在解决
具体你可以跟进一下这个PR https://github.com/docker-library/official-images/pull/9249

当前你也可以自己build一个镜像来使用,方法如下:

git clone https://github.com/apache/flink-docker.git
git checkout dev-master./add-custom.sh -u

https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
-n

flink-1.12.0cd dev/flink-1.12.0-debiandocker build . -t
flink:flink-1.12.0docker push flink:flink-1.12.0



jiangjiguang719  于2020年12月9日周三 下午5:09写道:

请问啥时候 在docker hub中可以看到1.12版本的镜像?



flink-sql????-??????????state

2020-12-17 文章 ??????
?? flink 
sql??cdccdc??state??
 state??
val config: TableConfig = tabEnv.getConfig
config.setIdleStateRetention(Duration.ofHours(1))

pyflink 有没有方便的print方式?例如java api中的 .print() ?

2020-12-17 文章 huang huang
print(page_turn.to_pandas())

> 可以collect到client端[1],或者可以看看另外几种方式[2]: [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/python/pyflink.table.html#pyflink.table.TableResult.collect
>   [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/intro_to_table_api.html#emit-results
>在 2020年12月17日,下午2:47,肖越  写道:   最近在尝试 pyflink 
> 功能,只查到了定义connector 的输出方法,例如:  sink_ddl = '''  CREATE TABLE print_sink 
> (  ID DOUBLE,  NAME STRING  ) WITH (  'connector' = 'print' 
>  )  '''  每次都要事先定义好要输出的表格格式,是否有更加方便的输出方法?


曹 三启 added you to the flink group

2020-12-17 文章 曹 三启
[Microsoft] Groups
[cid:TransparentConsumerWelcomeMailNonOutlookTopBanner]
曹 三启 added
you to flink!
Use the group to share messages and files, and to coordinate group events.
3 members
Have group discussions
With only one email address to remember, connecting with everyone is easy.
Email the group
[ConversationIcon]
[FilesIcon]
Share your files
Send documents, photos, and links. Everyone wants to see those selfies from 
your trip!
Share 
files
Plan group events
Inviting the group is a snap with one email address. You’ll never accidentally 
leave someone off an invitation again.
Create an 
event
[CalendarIcon]
Start exploring group features today
Learn more
[cid:TransparentConsumerWelcomeMailNonOutlookFooter]
You're receiving this message because you're a member of the flink group from 
Microsoft 365.
Report 
abuse
  |  Leave 
group
  |  Privacy Statement  |  
Learn more
© Microsoft Corporation, One Microsoft Way, Redmond, WA 98052 USA
[Microsoft]


Re: flink 1.12 RocksDBStateBackend 报错

2020-12-17 文章 Xintong Song
确实是 1.12.0 的 bug。
我们在所有用到 state 的地方都应该去声明 ManagedMemoryUseCase.STATE_BACKEND。有一个新添加的
ReduceTransformation 没有做这个声明,导致所有涉及到这个算子的作业使用 RocksDB 都会出问题。
我马上建 issue,这个可能要推动社区加急发一个 bugfix 版本了

Thank you~

Xintong Song



On Thu, Dec 17, 2020 at 11:05 AM HunterXHunter <1356469...@qq.com> wrote:

> 1.12设置 env.setStateBackend(new RocksDBStateBackend(checkpointPath,
> true))之后会报错
> :
> Caused by: java.lang.IllegalArgumentException: The fraction of memory to
> allocate should not be 0. Please make sure that all types of managed memory
> consumers contained in the job are configured with a non-negative weight
> via
> `taskmanager.memory.managed.consumer-weights`.
>
> 但查看源码这个参数是默认值。
> 最终找到原因是
> Streamconfig下getManagedMemoryFractionOperatorUseCaseOfSlot中
> config缺少key : managedMemFraction.STATE_BACKEND
> 当设置
> config.setDouble("managedMemFraction.STATE_BACKEND", 0.7)
> 后,程序正常。
> 代码如下
> https://paste.ubuntu.com/p/9WrBz3Xrc6/
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 使用sql时候,设置了idle过期时间,但是状态还是一直变大

2020-12-17 文章 Storm☀️
flink 1.10.1 同样遇到这个问题 设置了ttl但是没有生效,请问题主解决该问题了吗?
*sql*:
select
* 
from 
xx
group by 
TUMBLE(monitor_processtime, INTERVAL '60' SECOND),topic_identity

*60s的窗口,设置的过期时间是2分钟,但是checkpoint中状态还是在变大*

*tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(2),
Time.minutes(5)); *
   



--
Sent from: http://apache-flink.147419.n8.nabble.com/