Re: Can Flink SQL use the async I/O feature?

2021-01-12 Thread zelin jin
Hello kenyore,
I think I see what you mean: you can implement asynchronous database I/O by extending AsyncTableFunction.

public abstract class AsyncTableFunction<T> extends UserDefinedFunction


The behavior of an AsyncTableFunction is defined by implementing custom
evaluation methods. An evaluation method must be declared publicly, not
statically, and named "eval". It can also be overloaded by implementing
multiple methods named "eval".

Each "eval" call may trigger an asynchronous I/O operation; once it completes,
the result is collected by calling CompletableFuture.complete(T). For each
asynchronous operation, its context is stored in the operator right after
"eval" is invoked, so the input stream is not blocked as long as the internal
buffer is not full.

Code example (the generic type parameters below were stripped by the mail archive and are restored; the inner callback parameter is also renamed so it no longer shadows the outer future):

   public void eval(CompletableFuture<Collection<String>> result, String rowkey) {
       Get get = new Get(Bytes.toBytes(rowkey));
       ListenableFuture<Result> future = hbase.asyncGet(get);
       Futures.addCallback(future, new FutureCallback<Result>() {
           public void onSuccess(Result hbaseResult) {  // renamed from "result" to avoid shadowing
               List<String> ret = process(hbaseResult);
               result.complete(ret);                    // complete the Flink-side future
           }

           public void onFailure(Throwable thrown) {
               result.completeExceptionally(thrown);
           }
       });
   }

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html
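
Since the lookup here returns multiple rows that should end up as one array column, a minimal sketch of that idea; the client, its asyncLookup method, and the string element type are hypothetical stand-ins:

   // Hypothetical sketch: complete the future with ONE row whose field is an
   // array of all lookup results, instead of one row per result.
   public void eval(CompletableFuture<Collection<Row>> result, String key) {
       ListenableFuture<List<String>> future = client.asyncLookup(key); // assumed async client
       Futures.addCallback(future, new FutureCallback<List<String>>() {
           public void onSuccess(List<String> rows) {
               // pack the multi-row lookup result into a single ARRAY field
               result.complete(
                   Collections.singleton(Row.of((Object) rows.toArray(new String[0]))));
           }

           public void onFailure(Throwable t) {
               result.completeExceptionally(t);
           }
       });
   }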
kenyore wrote on Tue, Jan 12, 2021 at 3:29 PM:

> Thanks for such a detailed reply!
> But my scenario doesn't seem to fit a plain dimension-table (lookup) join,
> because I need to pack the lookup result (multiple rows) into one array inside the data row.
>
>
>


Re: Flink 1.12.0 sql-client error connecting to Hive

2021-01-12 Thread yujianbo
1. The SQL CLI can now submit to the YARN session, but it fails right away with:
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.mapred.JobConf
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

The Hadoop dependency cannot be found, even though on all three test machines I
have already put export HADOOP_CLASSPATH=`hadoop classpath` into /etc/profile.

2. My per-job tasks, and jobs submitted to this session with flink run -yid, all work fine.

3. A friend pointed me to the shaded jar flink-shaded-hadoop-2-uber-2.7.5-8.0.jar; dropping it into the lib directory makes it work.

4. The odd part: without that shaded jar, why can't the session start when the
global variable export HADOOP_CLASSPATH=`hadoop classpath` is already set?





Re: Flink build error

2021-01-12 Thread Yun Tang
Hi,

The network in China makes this flaky, but the real problem is a broken node.js installation. Consider installing node.js and npm separately. If that still fails and you don't need the web UI, add the profile "-Pskip-webui-build" at build time to skip that part of the build.

Best,
Yun Tang

From: Ruguo Yu 
Sent: Tuesday, January 12, 2021 14:00
To: user-zh@flink.apache.org 
Subject: Re: Flink build error

Try this command:
mvn clean install -DskipTests -Dfast -Drat.skip=true -Dhadoop.version=2.7.6
-Dinclude-hadoop -Dscala-2.11 -T2C
where -Dhadoop.version is your Hadoop version.





Re: Flink keeps throwing java.lang.NullPointerException even though the variable is clearly initialized; I can't find the cause, advice appreciated, thanks

2021-01-12 Thread Yun Tang
Hi,

This error is actually thrown from inside Kryo. Your custom classes (SockRowV2, WashDetectionSockValue, etc.) do not satisfy Flink's POJO rules, so serialization/deserialization falls back to Kryo. I suggest registering these classes with Kryo [1]. In particular, Thrift or Protobuf classes must be registered separately [2]. Better still, change your custom classes so they qualify as Flink POJOs [3].


[1] 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#kryo
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#rules-for-pojo-types
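
For illustration, two alternative fragments sketching both options, reusing the class names from this thread (the fields shown are hypothetical):

    // Option 1: keep the Kryo fallback but register the classes up front [1].
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().registerKryoType(SockRowV2.class);
    env.getConfig().registerKryoType(WashDetectionSockValue.class);

    // Option 2 (preferred): reshape the class into a Flink POJO [3]: public
    // class, public no-argument constructor, public (or getter/setter) fields.
    public class SockRowV2 {
        public String mac;        // hypothetical fields
        public long eventTime;
        public SockRowV2() {}
    }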

Best,
Yun Tang

From: JackJia 
Sent: Tuesday, January 12, 2021 14:16
To: user-zh@flink.apache.org 
Subject: Flink keeps throwing java.lang.NullPointerException even though the
variable is clearly initialized; I can't find the cause, advice appreciated, thanks

A question: my code keeps throwing java.lang.NullPointerException even though the variable is clearly initialized, and I cannot find the cause. Any guidance is appreciated, thanks.
The error:
2021-01-12 04:36:09,950 INFO  org.apache.flink.runtime.taskmanager.Task 
- Window(TumblingEventTimeWindows(6), EventTimeTrigger, 
WashDataDetectionFunction) -> Map -> Map -> Sink: Unnamed (1/1) 
(b015c7cebf71e744f6b50136cdc32e20) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output 
watermark:
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1238)
at java.util.ArrayList$SubList.size(ArrayList.java:1048)
at java.util.AbstractList.add(AbstractList.java:108)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at 
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
at 
com.airen.lasttime.keyedfunction.WashDataDetectionFunction.temperatureFall(WashDataDetectionFunction.java:264)
at 
com.airen.lasttime.keyedfunction.WashDataDetectionFunction.washDetection(WashDataDetectionFunction.java:206)
at 
com.airen.lasttime.keyedfunction.WashDataDetectionFunction.process(WashDataDetectionFunction.java:94)
at 
com.airen.lasttime.keyedfunction.WashDataDetectionFunction.process(WashDataDetectionFunction.java:33)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
... 10 more


The code involved:
private SingleOutputStreamOperator dataClean(DataStream source, ParameterTool pt) {
    WindowedStream windowStream = source
        .keyBy(x -> x.mac)
        .window(TumblingEventTimeWindows.of(Time.seconds(60)));

    SingleOutputStreamOperator afterWashDetection =
        windowStream.process(new WashDataDetectionFunction());

    SingleOutputStreamOperator afterIdleAndShortLiveClean = afterWashDetection
        .keyBy(x -> x.mac)
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .process(new IdleAndShortLiveDataCleanFunction());


DataStre

Re: With rocksdb as the state backend, why can't the job recover after a TM node goes down?

2021-01-12 Thread Yun Tang
Hi

Flink's fault tolerance does ensure the job is restarted when a TM is lost. To answer "why can't the job recover" we need the complete exception stack; a brief description alone isn't enough to troubleshoot.

Best,
Yun Tang

From: Carmen Free 
Sent: Tuesday, January 12, 2021 15:52
To: user-zh@flink.apache.org 
Subject: With rocksdb as the state backend, why can't the job recover after a TM node goes down?

hi,

With rocksdb as the state backend, why can't the job recover after a TM node goes down?

1. Environment

Flink version: 1.10.2
OS: CentOS 7

2. Cluster (2 nodes simulated)

            | Node A  | Node B
role        | JM, TM  | TM
task slots  | 4       | 4

3. State backend configuration

# rocksdb as the state backend
state.backend: rocksdb


# directory for snapshots (currently a local directory)
state.checkpoints.dir: file:///data/flink/checkpoints

4. After the job starts it is automatically placed on node A's TM; it runs for a while and checkpoint snapshots complete normally. Then I stop only node A's TM (the JM keeps running). The job is automatically rescheduled onto node B's TM, but it then keeps restarting and never recovers. Why is that?

5. If I bring node A back up, the job still cannot recover (it is still running on node B); only once I also stop node B's TM, so the job is scheduled back onto node A, does it recover normally. So I'm puzzled: why can't the job recover in scenario 4, and why only on node A? At first I suspected a path-access problem, but on reflection the checkpoint bookkeeping is done by the JM, so as long as the JM survives the job should be recoverable. Is my understanding off?

flink watermark question

2021-01-12 Thread 张锴
Hi, I'm on Flink 1.10. What is the default watermark interval, and where is it in the source code? I couldn't find it.


FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 Thread jy l
Hi:
Filtering data in Flink SQL throws an exception.
code:
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val source = env.fromElements(
    (1.0f, 11.0f, 12.0f),
    (2.0f, 21.0f, 22.0f),
    (3.0f, 31.0f, 32.0f),
    (4.0f, 41.0f, 42.0f),
    (5.0f, 51.0f, 52.0f)
  )
  val settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .useBlinkPlanner()
    .build()
  val tEnv = StreamTableEnvironment.create(env, settings)
  tEnv.createTemporaryView("myTable", source, $("id"), $("f1"), $("f2"))
  val query =
    """
      |select * from myTable where id in (1.0, 2.0, 3.0)
      |""".stripMargin
  tEnv.executeSql(query).print()
}

exception:
Exception in thread "main" java.lang.UnsupportedOperationException: class
org.apache.calcite.sql.type.SqlTypeName: FLOAT
at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:703)
at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
at org.apache.calcite.rex.RexLiteral.<init>(RexLiteral.java:223)
at org.apache.calcite.rex.RexLiteral.toLiteral(RexLiteral.java:737)
at org.apache.calcite.rex.RexLiteral.lambda$printSarg$4(RexLiteral.java:710)
at org.apache.calcite.util.RangeSets$Printer.singleton(RangeSets.java:397)
at org.apache.calcite.util.RangeSets.forEach(RangeSets.java:237)
at org.apache.calcite.util.Sarg.lambda$printTo$0(Sarg.java:110)
at org.apache.calcite.linq4j.Ord.forEach(Ord.java:157)
at org.apache.calcite.util.Sarg.printTo(Sarg.java:106)
at org.apache.calcite.rex.RexLiteral.printSarg(RexLiteral.java:709)
at
org.apache.calcite.rex.RexLiteral.lambda$appendAsJava$1(RexLiteral.java:652)
at org.apache.calcite.util.Util.asStringBuilder(Util.java:2502)

Why is that? How do I solve it?

thanks.


Re: With rocksdb as the state backend, why can't the job recover after a TM node goes down?

2021-01-12 Thread Carmen Free
Hi Mr. Tang, thanks for the answer.

Sorry, here is the error information I forgot to include.

The main error, after re-running the scenario:
2021-01-12 18:09:34,236 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
Custom Source -> Flat Map -> Timestamps/Watermarks (2/2)
(3b50a7ce56b408c2978260846b76a28a) switched from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@4c107a3a.

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(2/2) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
... 9 more

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more

Caused by: java.io.FileNotFoundException:
/data/flink/checkpoints/b261de0447d59bb2f1db9c084f5b1a0b/chk-5/4a160901-8ddd-468e-a82d-6efcb8a9dff9
(No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at
org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
at
org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
at
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at
org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:73)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
... 15 more

This directory does exist on node A (the JM). Could it be a permissions issue? Can node B not reach node A? Strange, since passwordless SSH is set up and /data/flink/checkpoints has permissions 777.

Yun Tang wrote on Tue, Jan 12, 2021 at 5:46 PM:

> Hi
>
> Flink's fault tolerance does ensure the job is restarted when a TM is lost.
> To answer "why can't the job recover" we need the complete exception stack; a
> brief description alone isn't enough to troubleshoot.
>
> [...]


Re: Question about upgrading Flink versions

2021-01-12 Thread xuhaiLong
My earlier description wasn't quite right; see this for the details:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html
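
A minimal sketch of reading keyed state from a savepoint with that API; the path, operator uid, state type, and reader function are hypothetical:

    // Flink 1.10 state processor API: inspect or rewrite state outside a running job.
    ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
    ExistingSavepoint savepoint = Savepoint.load(
        bEnv, "hdfs://namenode/path/to/savepoint", new MemoryStateBackend());
    DataSet<MyState> keyedState =
        savepoint.readKeyedState("my-operator-uid", new MyKeyedStateReaderFunction());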
On Jan 11, 2021 at 20:12, xuhaiLong wrote:
I've tried upgrading Flink 1.7 to 1.10. If the Table API is used with GROUP BY,
savepoint restore has problems; other than that I haven't hit issues. Alternatively, you can use the process API to write out a copy of the data and then start the job.


On Jan 7, 2021 at 09:50, zhang hao wrote:
Current situation: all of our company's Flink jobs run on Flink 1.7, submitted through an in-house stream-computing platform to a Flink-on-YARN cluster deployed in
session mode. There are close to 500 streaming jobs, many of them stateful. If we want to upgrade the cluster to 1.11 or 1.12, how do we upgrade smoothly without affecting the existing jobs?


Re: Flink build error

2021-01-12 Thread 赵一旦
Setting up a Maven mirror also works.

Yun Tang wrote on Tue, Jan 12, 2021 at 5:37 PM:

> Hi,
>
> The network in China makes this flaky, but the real problem is a broken
> node.js installation. Consider installing node.js and npm separately. If that
> still fails and you don't need the web UI, add the profile
> "-Pskip-webui-build" at build time to skip that part of the build.
>
> Best,
> Yun Tang
> 
> From: Ruguo Yu 
> Sent: Tuesday, January 12, 2021 14:00
> To: user-zh@flink.apache.org 
> Subject: Re: Flink build error
>
> Try this command:
> mvn clean install -DskipTests -Dfast -Drat.skip=true -Dhadoop.version=2.7.6
> -Dinclude-hadoop -Dscala-2.11 -T2C
> where -Dhadoop.version is your Hadoop version.


Re: With rocksdb as the state backend, why can't the job recover after a TM node goes down?

2021-01-12 Thread Yun Tang
Hi,

From the exception log, the problem should be that your state.checkpoints.dir, that is, the state backend's checkpoint directory, is configured as a local directory, so checkpoints are stored on the local machine. On failover restore, a task can then only run if it is deployed back onto its original machine. Just change the state backend's checkpoint directory to a DFS directory.
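
Equivalently in code, a minimal sketch with a hypothetical HDFS address (pointing state.checkpoints.dir at DFS in flink-conf.yaml has the same effect):

    // Flink 1.10: RocksDB state backend writing checkpoints to DFS, not local disk.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(
        new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints", true)); // throws IOException
    env.enableCheckpointing(60_000);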


Best,
Yun Tang

From: Carmen Free 
Sent: Tuesday, January 12, 2021 18:14
To: user-zh@flink.apache.org 
Subject: Re: With rocksdb as the state backend, why can't the job recover after a TM node goes down?

Hi Mr. Tang, thanks for the answer.

Sorry, here is the error information I forgot to include. (Full stack trace
quoted above; it bottoms out in:)

Caused by: java.io.FileNotFoundException:
/data/flink/checkpoints/b261de0447d59bb2f1db9c084f5b1a0b/chk-5/4a160901-8ddd-468e-a82d-6efcb8a9dff9
(No such file or directory)
[...]

Yun Tang wrote on Tue, Jan 12, 2021 at 5:46 PM:

> Hi
>
> Flink's fault tolerance does ensure the job is restarted when a TM is lost.
> [...]

Example of one Flink SQL job using multiple catalogs

2021-01-12 Thread Luna Wong
Hi all,
  I couldn't find an example on the website of one job using multiple catalogs.
  I want to register a Kafka source in an ordinary catalog and send its data to an Iceberg sink table that is registered in a separate Iceberg + Hive catalog.
The catalog DDL:
 CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://kudu1:9083',
'clients'='2',
'property-version'='1',
'warehouse'='hdfs://ns1//user/hive/warehouse'
);
I use two catalogs because I found that a Kafka source cannot be registered in this Iceberg-typed Hive catalog, so I have to switch catalogs.
I haven't gotten it running in IDEA yet.

Mulan


Building Flink

2021-01-12 Thread penguin.
Hi,


请问有人知道怎么单独编译flink-runtime模块吗?
然后这样是否能把更改的部分直接在flink-dist包中的org.apache.flink.runtime目录下进行替换?
整体编译一次实在太慢了。
谢谢!


penguin

Re: Building Flink

2021-01-12 Thread tison
Try: mvn clean install -DskipTests -pl flink-runtime,flink-dist

Best,
tison.


penguin. wrote on Tue, Jan 12, 2021 at 9:44 PM:

> Hi,
>
>
> Does anyone know how to build just the flink-runtime module?
> And can the changed classes then directly replace the org.apache.flink.runtime directory inside the flink-dist jar?
> A full build simply takes too long.
> Thanks!
>
>
> penguin


Re: Building Flink

2021-01-12 Thread hdxg1101300...@163.com
Hello:
In IDEA you can mvn package or mvn install just the relevant module.



hdxg1101300...@163.com
 
From: penguin.
Sent: 2021-01-12 21:44
To: user-zh@flink.apache.org
Subject: Building Flink
Hi,


Does anyone know how to build just the flink-runtime module?
And can the changed classes then directly replace the org.apache.flink.runtime directory inside the flink-dist jar?
A full build simply takes too long.
Thanks!


penguin


Re: FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 Thread Evan
Hi. In databases, FLOAT stores an approximate value, so equality-style comparisons like = or != are unreliable on it, and the IN operation is likewise unsupported here.
Hope this helps.
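
If you just need the query to run, one untested workaround sketch: cast the FLOAT column before the IN-list so the planner no longer has to build FLOAT literals. Whether this dodges the Calcite issue in flink-1.12.0 is an assumption:

    // Untested sketch: compare as DOUBLE (or DECIMAL) instead of FLOAT.
    tEnv.executeSql(
        "SELECT * FROM myTable WHERE CAST(id AS DOUBLE) IN (1.0, 2.0, 3.0)").print();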



 
From: jy l
Date: 2021-01-12 18:04
To: user-zh
Subject: FlinkSQL Filter Error With Float Column on flink-1.12.0
Hi:
Filtering data in Flink SQL throws an exception.
[...]

Why is that? How do I solve it?

thanks.


Re: flink watermark question

2021-01-12 Thread anonnius
Take a look at the ExecutionConfig class.
On 2021-01-12 17:55:47, "张锴" wrote:
>Hi, I'm on Flink 1.10. What is the default watermark interval, and where is it in the source code? I couldn't find it.


Why does it print the previous watermark but seem to use the current one? Why isn't the current one printed?

2021-01-12 Thread 何宗谨
[inline screenshot; the image did not survive the archive]

Re: flink watermark question

2021-01-12 Thread Ball's Holy
hi 张锴,
If I remember correctly the default interval is 200 ms; for the details see
flink.api.common.ExecutionConfig and its autoWatermarkInterval field.


-- Original message --
From: "anonnius"

Re: Example of one Flink SQL job using multiple catalogs

2021-01-12 Thread Luna Wong
I've solved it: just USE the other catalog and then create the table there.
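
For the record, a sketch of that flow; topic, broker, and table names are hypothetical, and the catalog DDL is the one from the message below:

    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());

    // Iceberg + Hive catalog (same DDL as quoted below).
    tEnv.executeSql(
        "CREATE CATALOG hive_catalog WITH ("
        + " 'type'='iceberg', 'catalog-type'='hive',"
        + " 'uri'='thrift://kudu1:9083', 'warehouse'='hdfs://ns1//user/hive/warehouse')");

    // The Kafka source stays in the default catalog.
    tEnv.executeSql(
        "CREATE TABLE default_catalog.default_database.kafka_src ("
        + " id BIGINT, msg STRING"
        + ") WITH ("
        + " 'connector'='kafka', 'topic'='my_topic',"
        + " 'properties.bootstrap.servers'='broker:9092',"
        + " 'format'='json', 'scan.startup.mode'='latest-offset')");

    // Switch catalogs, create the sink there, then write across catalogs by
    // fully qualifying the source name.
    tEnv.executeSql("USE CATALOG hive_catalog");
    tEnv.executeSql("CREATE TABLE iceberg_sink (id BIGINT, msg STRING)");
    tEnv.executeSql(
        "INSERT INTO iceberg_sink SELECT * FROM default_catalog.default_database.kafka_src");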

Luna Wong wrote on Tue, Jan 12, 2021 at 9:41 PM:
>
> Hi all,
>   I couldn't find an example on the website of one job using multiple catalogs.
> [...]


Flink 1.11.1: how to make multiple log4j configuration files take effect

2021-01-12 Thread nicygan
dear all:
 My Flink job is submitted to YARN.
 By default, the logging configuration that takes effect is log4j.properties from flink/conf.
 But my application jar also contains a log4j2.xml, which configures a KafkaAppender to ship logs to Kafka.
 How should I set things up so that both configuration files take effect?
 Anyone with experience configuring this?



thanks
by nicygan


Re: flink watermark question

2021-01-12 Thread 张锴
Thank you.

anonnius wrote on Wed, Jan 13, 2021 at 9:19 AM:

> Take a look at the ExecutionConfig class.
> On 2021-01-12 17:55:47, "张锴" wrote:
> >Hi, I'm on Flink 1.10. What is the default watermark interval, and where is it in the source code? I couldn't find it.
>


Re: flink watermark question

2021-01-12 Thread 张锴
OK, thank you.

Ball's Holy <873925...@qq.com> wrote on Wed, Jan 13, 2021 at 9:42 AM:

> hi 张锴,
> If I remember correctly the default interval is 200 ms; for the details see
> flink.api.common.ExecutionConfig and its autoWatermarkInterval field.
>
>
>
>
> -- Original message --
> From: "anonnius" Sent: Wednesday, Jan 13, 2021, 9:19 AM
> To: "user-zh" Subject: Re: flink watermark question
>
>
>
> Take a look at the ExecutionConfig class.
> On 2021-01-12 17:55:47, "张锴" wrote: >Hi, I'm on Flink 1.10. What is the default watermark interval, and where is it in the source code? I couldn't find it.


Re: Why does it print the previous watermark but seem to use the current one? Why isn't the current one printed?

2021-01-12 Thread 赵一旦
The image didn't come through.

何宗谨 wrote on Wed, Jan 13, 2021 at 9:20 AM:

>
>
>
>
>


Re: flink watermark question

2021-01-12 Thread 张锴
I found it in ExecutionConfig: private long autoWatermarkInterval =
0, not 200 ms. Does that mean each timestamp yields its own watermark?

Ball's Holy <873925...@qq.com> wrote on Wed, Jan 13, 2021 at 9:42 AM:

> hi 张锴,
> If I remember correctly the default interval is 200 ms; for the details see
> flink.api.common.ExecutionConfig and its autoWatermarkInterval field.
>
>
>
>
> -- Original message --
> From: "anonnius" Sent: Wednesday, Jan 13, 2021, 9:19 AM
> To: "user-zh" Subject: Re: flink watermark question
>
>
>
> Take a look at the ExecutionConfig class.
> On 2021-01-12 17:55:47, "张锴" wrote: >Hi, I'm on Flink 1.10. What is the default watermark interval, and where is it in the source code? I couldn't find it.


Re: Re: flink watermark question

2021-01-12 Thread anonnius
In StreamExecutionEnvironment, see this method:

    @PublicEvolving
    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
        this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
        if (characteristic == TimeCharacteristic.ProcessingTime) {
            getConfig().setAutoWatermarkInterval(0);
        } else {
            getConfig().setAutoWatermarkInterval(200);
        }
    }
在 2021-01-13 10:09:54,"张锴"  写道:
>我从ExecutionConfig找到了,private long autoWatermarkInterval =
>0,并不是200毫秒,这个代表一个时间戳就代表一个watermark是吗
>
> [...]


Re: flink watermark question

2021-01-12 Thread Ball's Holy
I was looking at the master branch, where it is 200; the 1.10 watermark default may differ.


Re: Why does it print the previous watermark but seem to use the current one? Why isn't the current one printed?

2021-01-12 Thread 何宗谨
The allowed time interval is 3 seconds; each time, the printed watermark is the one for the previous timestamp, but the one actually used seems to be the current one.


-- Original message --
From: "user-zh"

Re: Re: flink watermark question

2021-01-12 Thread 张锴
OK, got it.

anonnius wrote on Wed, Jan 13, 2021 at 10:20 AM:

> In StreamExecutionEnvironment, setStreamTimeCharacteristic(...) sets the
> auto-watermark interval to 0 for processing time and to 200 otherwise.
> [...]


Re: Re: flink watermark question

2021-01-12 Thread Px New
private long autoWatermarkInterval = 200;

/**
 * Interval in milliseconds for sending latency tracking marks from
 * the sources to the sinks.
 */

(Note: this javadoc belongs to the field that follows autoWatermarkInterval in ExecutionConfig, not to autoWatermarkInterval itself.)


张锴 wrote on Wed, Jan 13, 2021 at 10:26 AM:

> OK, got it.
>
> [...]


Reading Kafka metadata in Flink SQL

2021-01-12 Thread 酷酷的浑蛋
WITH("`event_time` TIMESTAMP(3) METADATA FROM 'timestamp'," +
"`partition` BIGINT METADATA VIRTUAL," +
"`offset` BIGINT METADATA VIRTUAL," +
"`headers` MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL," +

When reading Kafka metadata like this, the documentation doesn't say how to get the Kafka message key, and headers comes back empty. How do I get the Kafka message key in Flink SQL?
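
For what it's worth, in Flink 1.12 the Kafka SQL connector exposes the record key through the 'key.format' / 'key.fields' options rather than as METADATA; a sketch in the same string-building style, with a hypothetical topic and broker:

    tEnv.executeSql(
        "CREATE TABLE kafka_table (" +
        "  `user_key` STRING," +              // populated from the Kafka message key
        "  `payload` STRING," +
        "  `offset` BIGINT METADATA VIRTUAL" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'my_topic'," +           // hypothetical
        "  'properties.bootstrap.servers' = 'broker:9092'," +
        "  'format' = 'json'," +
        "  'key.format' = 'raw'," +
        "  'key.fields' = 'user_key'," +
        "  'value.fields-include' = 'EXCEPT_KEY'" +
        ")");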





Re: With rocksdb as the state backend, why can't the job recover after a TM node goes down?

2021-01-12 Thread Carmen Free
Hi Mr. Tang,

I tried another new scenario.

After bringing the cluster up, I made node A's TM fail (leaving only node A's JM and node B's TM) and then submitted a new job through the Flink web
UI. The job was scheduled onto node B's TM and keeps running normally, but triggering checkpoint snapshots never succeeds. The error closely resembles the restore error described earlier, except that during restore the chk-xx file could not be found, whereas here the chk-xx file cannot be created.

The error:
2021-01-13 10:33:10,042 WARN
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received
late message for now expired checkpoint attempt 901 from task
709c3296b490f2e9cb3179ac4d9bbadb of job 14cdc07090d9f22445ba61144f6edf4b at
a0c28dc25b207ca789592018235fda56 @ node204 (dataPort=34389).
2021-01-13 10:33:12,836 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 902 @ 1610505192836 for job 14cdc07090d9f22445ba61144f6edf4b.
2021-01-13 10:33:12,844 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline
checkpoint 902 by task a7f9e9a63f1857b02a9e52c9d6bfefc7 of job
14cdc07090d9f22445ba61144f6edf4b at a0c28dc25b207ca789592018235fda56 @
node204 (dataPort=34389).
2021-01-13 10:33:12,844 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
checkpoint 902 of job 14cdc07090d9f22445ba61144f6edf4b.

java.lang.Exception: Could not materialize checkpoint 902 for operator
Source: Custom Source -> Flat Map -> Timestamps/Watermarks (1/2).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1221)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1163)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
Could not flush and close the file system output stream to null in order to
obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1126)
... 3 more

Caused by: java.io.IOException: Could not flush and close the file system
output stream to null in order to obtain the stream state handle
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:334)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
... 5 more

Caused by: java.io.IOException: Could not open output stream for state
backend
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309)
... 10 more

Caused by: java.io.IOException: Mkdirs failed to create
file:/data/flink/checkpoints/14cdc07090d9f22445ba61144f6edf4b/chk-902
at
org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:269)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
at
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
... 12 more

So in other words: when the JM node does not also act as a TM, checkpoints cannot be triggered at all, let alone failover restore. Can we then say that Flink only properly supports state recovery when the state backend's checkpoint directory is on a DFS? Otherwise the JM node must double as a TM, every job must be scheduled onto that node for checkpoints to trigger, and failover restore can only happen there. If so, a local directory really isn't suitable as state.checkpoints.dir.

Also, you mentioned that with a local directory as state.checkpoints.dir,
"a task can only run if it is deployed back onto its original machine", which in practice means the job has to be scheduled onto the JM's node. Can task placement be pinned manually in Flink? From earlier discussions it seems scheduling is essentially automatic and hard to influence by hand, right?

Best



Yun Tang wrote on Tue, Jan 12, 2021 at 7:11 PM:

> Hi,
>
> From the exception log, the problem should be that your state.checkpoints.dir,
> that is, the state backend's checkpoint directory, is configured as a local
> directory, so checkpoints are stored on the local machine. On failover
> restore, a task can then only run if it is deployed back onto its original
> machine. Just change the state backend's checkpoint directory to a DFS
> directory.
>
>
> Best,
> Yun Tang
> 
> From: Carmen Free 
> Sent: Tuesday,

Importing the Flink source into IDEA: cannot download plugins and dependencies

2021-01-12 Thread penguin.
As in the screenshot: I've already configured several Maven mirrors found online. Please help!



Problem importing the Flink source into IDEA

2021-01-12 Thread penguin.
I've already configured several mirrors in Maven's settings file, but it's still the same; see the image below.





How to change the conversion class for the DATE type in Flink SQL?

2021-01-12 Thread automths
Hi:

In Flink SQL every data type has a default conversion class, and some types support several. DATE, for example, supports java.sql.Date, java.time.LocalDate, and Integer, and its default is java.time.LocalDate. I want to change it to java.sql.Date; what setting can I use to do that?




Best!
automths
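
As far as I know there is no single global switch; a per-column sketch using the Table API's type bridging (whether it fits your pipeline is an assumption):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    // Declare the bridging conversion class explicitly on the DataType:
    DataType date = DataTypes.DATE().bridgedTo(java.sql.Date.class);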





Re: Problem importing the Flink source into IDEA

2021-01-12 Thread Carmen Free
hi,

The image isn't visible.

penguin. wrote on Wed, Jan 13, 2021 at 1:19 PM:

> I've already configured several mirrors in Maven's settings file, but it's still the same; see the image below.
>
>
>
>
>
>


Re: Re: Problem importing the Flink source into IDEA

2021-01-12 Thread penguin.
Hi,
The image does seem to have a problem; re-uploading it.
在 2021-01-13 13:42:27,"Carmen Free"  写道:
>hi,
>
>图看不见。
>
>penguin.  于2021年1月13日周三 下午1:19写道:
>
>> 已经在maven的setting文件中配置了好几个镜像了,还是这样。如下图
>>
>>
>>
>>
>>
>>


Re: Re: Problem importing the Flink source into IDEA

2021-01-12 Thread penguin.
Putting the image inline seems to be the problem; I've attached it instead.

在 2021-01-13 13:42:27,"Carmen Free"  写道:
>hi,
>
>图看不见。
>
>penguin.  于2021年1月13日周三 下午1:19写道:
>
>> 已经在maven的setting文件中配置了好几个镜像了,还是这样。如下图
>>
>>
>>
>>
>>
>>


Re: Re: Problem importing the Flink source into IDEA (can't see the images, pasting as text)

2021-01-12 Thread penguin.
Can't post the image, so here it is as text:


▼ Sync: at 2021/1/13 12:05 with 18 errors

   ▼ Resolve dependencies — 4 errors

     Cannot resolve net.minidev:json-smart:2.3

     Cannot resolve io.confluent:kafka-schema-registry-client:4.1.0

     Cannot resolve com.nimbusds:nimbus-jose-jwt:9.4.1
     Cannot resolve com.nimbusds:lang-tag:1.5
   ▼ Resolve plugins — 14 errors
     Cannot resolve plugin org.codehaus.mojo:build-helper-maven-plugin:

















在 2021-01-13 13:42:27,"Carmen Free"  写道:
>hi,
>
>图看不见。
>
>penguin.  于2021年1月13日周三 下午1:19写道:
>
>> 已经在maven的setting文件中配置了好几个镜像了,还是这样。如下图
>>
>>
>>
>>
>>
>>


Re: Why does it print the previous watermark but seem to use the current one? Why isn't the current one printed?

2021-01-12 Thread 赵一旦
That description is still unclear.
Watermarks are generated periodically; at the moment you read one, it may not have been updated yet.

何宗谨 wrote on Wed, Jan 13, 2021 at 10:20 AM:

>
> The allowed time interval is 3 seconds; each time, the printed watermark is the one for the previous timestamp, but the one actually used seems to be the current one.
>
> -- Original message --
> *From:* "user-zh" ;
> *Sent:* Wednesday, Jan 13, 2021, 10:02 AM
> *To:* "user-zh";
> *Subject:* Re: Why does it print the previous watermark but seem to use the current one? Why isn't the current one printed?
>
> The image didn't come through.
>
> 何宗谨 wrote on Wed, Jan 13, 2021 at 9:20 AM:
>
> >
> >
> >
> >
> >
>


Flink SQL CLI: querying a Hive table partitioned by two fields throws an exception when the WHERE clause does not constrain both partition fields

2021-01-12 Thread yujianbo
The production Hive table is partitioned by the two fields datekey and event. The queries:
(1) First comparison
SELECT vid From table_A WHERE datekey = '20210112' AND event = 'XXX' AND vid
= 'aa';    (*OK*)
SELECT vid From table_A WHERE datekey = '20210112' AND vid = 'aa';    (*fails*)
SELECT vid From table_A WHERE datekey = '20210112';    (*OK*)

(2) Second comparison
SELECT vid From table_B WHERE datekey = '20210112' AND event = 'YYY' AND vid
= 'bb';    (*OK*)
SELECT vid From table_B WHERE datekey = '20210112' AND vid = 'bb';    (*fails*)
SELECT vid From table_B WHERE datekey = '20210112';    (*OK*)

The exception:
java.lang.RuntimeException: One or more fetchers have encountered exception
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:273)
at
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: SplitFetcher thread 19 received
unexpected exception while polling the records
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: java.lang.IllegalArgumentException
at java.nio.Buffer.position(Buffer.java:244)
at
org.apache.flink.hive.shaded.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:424)
at
org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:79)
at
org.apache.flink.hive.shaded.formats.parquet.vector.reader.BytesColumnReader.readBatchFromDictionaryIds(BytesColumnReader.java:33)
at
org.apache.flink.hive.shaded.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:199)
at
org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:359)
at
org.apache.flink.hive.shaded.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:328)
at
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
... 6 more






Re: With rocksdb as the state backend, why can't the job recover after a TM node goes down?

2021-01-12 Thread Yun Tang
Hi

The heart of the matter is simple: a distributed compute engine needs a distributed file system to guarantee that distributed checkpoints can be created and restored [1]. There is no point in trying to write checkpoints to local disk; once a failover happens they cannot be restored.


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#prerequisites

Best,
Yun Tang

From: Carmen Free 
Sent: Wednesday, January 13, 2021 11:28
To: user-zh@flink.apache.org 
Subject: Re: With rocksdb as the state backend, why can't the job recover after a TM node goes down?

Hi Mr. Tang,

I tried another new scenario. (Full message with stack trace quoted above; the
root cause:)

Caused by: java.io.IOException: Mkdirs failed to create
file:/data/flink/checkpoints/14cdc07090d9f22445ba61144f6edf4b/chk-902
[...]

Flink 1.12 Too old resource version

2021-01-12 Thread 吴松
Flink version: 1.12.0

K8s version: 1.17

Docker image: apache/flink:1.12.0-scala_2.11


With this image, session mode on Kubernetes still hits the "Too old resource version" exception; it is not handled gracefully in the Kubernetes watch.


There is a matching issue in Jira; how can this be fixed for version 1.12.0?







Re: With rocksdb as the state backend, why can't the job recover after a TM node goes down?

2021-01-12 Thread Carmen Free
Hi Mr. Tang,

Thanks for clearing that up; understood.

Best


Yun Tang wrote on Wed, Jan 13, 2021 at 2:59 PM:

> Hi
>
> The heart of the matter is simple: a distributed compute engine needs a
> distributed file system to guarantee that distributed checkpoints can be
> created and restored [1]. There is no point in trying to write checkpoints to
> local disk; once a failover happens they cannot be restored.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#prerequisites
>
> [...]

Re: Re: Problem importing the Flink source into IDEA (can't see the images, pasting as text)

2021-01-12 Thread Carmen Free
I suggest you also paste your Maven settings.xml and the project's pom.xml; that will make it much easier for everyone to pinpoint the problem.

penguin. wrote on Wed, Jan 13, 2021 at 2:25 PM:

> Can't post the image, so here it is as text:
>
> ▼ Sync: at 2021/1/13 12:05 with 18 errors
> [...]


Re: Reading Kafka metadata in Flink SQL

2021-01-12 Thread JasonLee
hi

Did you set the headers when producing the data? If not, of course they come back empty.



-
Best Wishes
JasonLee


Negative Flink watermark anomaly

2021-01-12 Thread 张锴
Hi, looking at the running job in the web UI, the watermark shows -2 and never changes. What could cause this? I'm completely lost. A screenshot is attached; please help me analyze it.


Re: flink-kafka-sink

2021-01-12 Thread r pp
hi, what exactly does "no effect" mean here?
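
For reference, a minimal sketch of what end-to-end exactly-once into Kafka usually requires: checkpointing plus FlinkKafkaProducer in EXACTLY_ONCE mode, with downstream consumers reading isolation.level=read_committed. The broker address and topic below are hypothetical:

    // Transactions commit when a checkpoint completes; records written before a
    // failure (e.g. 4 of 10 splits) stay invisible to read_committed consumers.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000);

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    props.setProperty("transaction.timeout.ms", "900000"); // must fit the broker's cap

    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",
        new SimpleStringSchema(),
        props,
        java.util.Optional.empty(),                        // default partitioner
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);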

cxx <1156531...@qq.com> wrote on Thu, Jan 7, 2021 at 9:53 AM:

>  I consume a record from Kafka, split the message, and send the pieces on to
> a downstream Kafka topic, but they are not guaranteed to be in one transaction.
> For example, I split one record into 10 and an exception is thrown at the
> fifth, but the first four have already been sent to the downstream Kafka.
> I set a transactional id, the isolation level, client
> id, enable.idempotence, max.in.flight.requests.per.connection, and retries,
> but with no effect.



-- 
Best,
  pp