[External] naming table stages

2021-07-27 Thread Clemens Valiente
Is it possible to rename execution stages from the Table API? Right now the
entire select transformation appears in plaintext in the task name so the
log entries from ExecutionGraph are over 10,000 characters long and the log
files are incredibly difficult to read.
For example, a simple selected field shows up as
Calc(select=[(((_UTF-16LE'code = ' POSITION (FLAG(BOTH) TRIM _UTF-16LE' '
TRIM FLAG(BOTH) TRIM _UTF-16LE' ' TRIM extraInfo.loginRequestID) =
_UTF-16LE'') OR ((FLAG(BOTH) TRIM _UTF-16LE' ' TRIM
extraInfo.loginRequestID) = _UTF-16LE'None')) CASE null:VARCHAR(2147483647)
CHARACTER SET "UTF-16LE" CASE extraInfo.loginRequestID))) = 1) CASE
null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CASE FLAG(BOTH) TRIM
_UTF-16LE' ' TRIM extraInfo.loginRequestID) = _UTF-16LE'') OR ((FLAG(BOTH)
TRIM _UTF-16LE' ' TRIM extraInfo.loginRequestID) = _UTF-16LE'None')) CASE
null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CASE
extraInfo.loginRequestID)) AS loginRequestId
We have about a dozen of those, and they are all printed out for every
single log line.
Is there any way of shortening this without having to suppress these log
lines completely?

Best Regards
Clemens


Re: [External] NullPointerException on accumulator after Checkpointing

2021-07-27 Thread Clemens Valiente
Hi Timo,
thanks for the help here, wrapping the MapView in a case class indeed
solved the problem.
It was not immediately apparent from the documentation that using a MapView
as a top-level accumulator would cause an issue; it seemed a straightforward,
intuitive way to use it :)

Cheers
Clemens

On Wed, Jul 14, 2021 at 10:19 PM Timo Walther  wrote:

> Hi Clemens,
>
> First of all, can you try to use the MapView within an accumulator POJO
> class? This might solve your exception. I'm not sure whether we support the
> views as top-level accumulators.
>
> In any case this seems to be a bug. I will open an issue once I get your
> feedback. We might simply throw an exception for top-level usage then.
>
> Regards,
> Timo
>
>
>
> On 14.07.21 06:33, Clemens Valiente wrote:
> > Hi,
> >
> > we created a new AggregateFunction with a MapView accumulator as follows
> >
> > class CountDistinctAggFunction[T] extends
> > AggregateFunction[lang.Integer, MapView[T, lang.Integer]] {
> >
> >override def createAccumulator(): MapView[T, lang.Integer] = {
> >  new MapView[T, lang.Integer]()
> >}
> > ...
> >
> > We had NullPointerExceptions happening on
> >
> > getValue(accumulator: MapView[T, lang.Integer]): lang.Integer
> >
> > and
> >
> > def accumulate(accumulator: MapView[T, lang.Integer], key: T): Unit = {
> >
> > so I added null checks there.
> >
> > Unfortunately the NPEs are still happening, right after triggering
> > checkpointing
> >
> > 2021-07-14 04:01:22,340 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
> > Triggering checkpoint 1 (type=CHECKPOINT) @ 1626235282013 for job
> > 0cbe21cce72742ec8e5e6786aa6b44ca.
> > 2021-07-14 04:02:52,249 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> > OverAggregate(partitionBy=[entityID], orderBy=[eventTime ASC],
> > window=[ RANG BETWEEN 360 PRECEDING AND CURRENT ROW],
> > select=[] app$functions$table$CountDistinctAggFunction$47dfbce463746500de0b303cff5c947b AS w0$o0)
> > (5/8) (39c23e4703862c39e513dcb5fd629fb4) switched from RUNNING to FAILED
> > on 10.195.174.180:6122-ba5b74 (dataPort=45665).
> > java.lang.NullPointerException: null
> >     at org.apache.flink.table.data.conversion.RawObjectConverter.toExternal(RawObjectConverter.java:49) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at BoundedOverAggregateHelper$946.setAccumulators(Unknown Source) ~[?:?]
> >     at org.apache.flink.table.runtime.operators.over.RowTimeRangeBoundedPrecedingFunction.onTimer(RowTimeRangeBoundedPrecedingFunction.java:224) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:197) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
> >

Re: Need help when using kubernetes.jobmanager.annotations

2021-07-27 Thread Yang Wang
Could you please ensure that you are using the native Kubernetes mode[1]?

For standalone on K8s[2], you need to manually set the annotation in the
jobmanager yaml file.
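A minimal sketch of what the standalone route looks like, i.e. setting the annotation by hand in the Deployment manifest (the annotation key and value below are placeholders, not Flink-defined names):

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  annotations:
    example.com/owner: "team-streaming"      # placeholder Deployment-level annotation
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
      annotations:
        example.com/owner: "team-streaming"  # placeholder pod-level annotation
    spec:
      containers:
        - name: jobmanager
          image: flink:1.13.1
          args: ["jobmanager"]
```

Note the two locations: annotations under `spec.template.metadata` land on the pods, while those under the top-level `metadata` land on the Deployment object itself.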

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/
[2].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/

Best,
Yang

Manong Karl  wrote on Tue, Jul 27, 2021 at 3:47 PM:

> Hi Team, I have set "kubernetes.jobmanager.annotations", but I can't
> find the annotations on the k8s Deployment, even though they do show up
> on the JobManager pod.
> Is this by design, or was it simply missed?
>


Re: Flink fails to trigger a savepoint

2021-07-27 Thread 龙逸尘
Hi,
We have seen this "jobid is 0" error before. In our case, the job had
ZooKeeper-based HA enabled, but the savepoint command was run with a Flink
client that had no HA configuration. You might check whether it is the same issue.


Michael Ran  wrote on Fri, Jul 23, 2021 at 10:43 AM:

> Could it be a file problem, for example missing write permissions?
> On 2021-07-13 17:31:19, "仙剑……情动人间" <1510603...@qq.com.INVALID> wrote:
> >Hi All,
> >
> >
> >  Triggering a Flink savepoint always fails for me with the error below. It
> >always reports a timeout, but there is no further detail to inspect. I read that
> >the checkpoint timeout can be configured and set it to 2 min, but the savepoint
> >trigger fails before 2 min have passed. Also, my state is not large.
> > 
> >
> >
> >
> >The program finished with the following exception:
> >
> >
> >org.apache.flink.util.FlinkException: Triggering a savepoint for the job
>  failed.
> >   at
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:777)
> >   at
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:754)
> >   at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
> >   at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:751)
> >   at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1072)
> >   at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> >   at java.security.AccessController.doPrivileged(Native Method)
> >   at javax.security.auth.Subject.doAs(Subject.java:422)
> >   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> >   at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >   at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> >Caused by: java.util.concurrent.TimeoutException
> >   at
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
> >   at
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
> >   at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
> >   at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >   at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >   at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >   at java.lang.Thread.run(Thread.java:748)
>


Re: How to monitor Kafka lag

2021-07-27 Thread 龙逸尘
Hi comsir,
The lag obtained by subtracting the group's current offset from the offsets
in the Kafka cluster metadata is fairly accurate. The group id is something
you need to maintain yourself.
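The subtraction described here can be sketched as follows (plain dicts stand in for what a Kafka admin or consumer client would return):

```python
def consumer_lag(end_offsets, committed_offsets):
    """Per-partition lag: log-end offset minus the group's committed offset."""
    lag = {}
    for partition, end_offset in end_offsets.items():
        # A group with no commit yet is behind by the whole partition.
        committed = committed_offsets.get(partition, 0)
        lag[partition] = max(end_offset - committed, 0)
    return lag

# Partition 0 is 250 records behind; partition 1 is fully caught up.
print(consumer_lag({0: 1000, 1: 500}, {0: 750, 1: 500}))  # {0: 250, 1: 0}
```

Against a real cluster, the end offsets would come from something like kafka-python's `KafkaConsumer.end_offsets()` and the committed offsets from `committed()` per partition (client names mentioned as an assumption, not part of the thread).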

comsir <609326...@qq.com.invalid> wrote on Tue, Jul 20, 2021 at 12:41 PM:

> hi all
> For Flink jobs that use Kafka as a source, how do you all monitor Kafka lag?
> The purpose of monitoring this lag: 1. dashboards, 2. alerting when lag grows.
> A few questions:
> 1. Flink has many native metrics related to this; after digging in, none seemed very accurate. Which metric do you use?
> 2. How do you obtain the groupId? If multiple groups consume the topic, how do you tell them apart?
> 3. Can the lag be computed from the cluster-side Kafka metadata minus the current offset?
> 4. Is there a reasonably elegant way to implement this?
> Many thanks; looking forward to answers.


Unsubscribe

2021-07-27 Thread 赵珠峰
Unsubscribe





Unsubscribe

2021-07-27 Thread morris996
Unsubscribe



Re: Flink 1.10 memory issue

2021-07-27 Thread Ada Luna
In the end I found the root cause: the dual-stream JOIN had no TTL configured. The JOIN task's output buffers fill up, and then Flink sits in a zombie state and no longer consumes any data.

Ada Luna  wrote on Mon, Jul 19, 2021 at 7:06 PM:
>
> Could the async I/O ordered queue filling up be what deadlocks the operator?
>
> Ada Luna  wrote on Mon, Jul 19, 2021 at 2:02 PM:
> >
> > From the backpressure metrics I observed that everything upstream of this
> > async wait operator is under severe backpressure. Very likely this operator is deadlocked, stuck in an infinite loop, or something similar, but I don't know yet how to investigate further.
> >
> > "async wait operator -> (where: (=(CASE(>(ABS(Z), WaterRoseMax_5), 1,
> > 0), 0)), select: (ID, STID, _UTF-16LE'' AS STCD, ITEM, VAL, UNIT,
> > DATATIME AS DT, _UTF-16LE'2.6' AS DATAINDEX, currtime2(DATATIME,
> > _UTF-16LE'-MM-dd HH:mm:ss') AS CREATETIME, 0 AS ISFILL, 1 AS
> > STATE, 0E0 AS P5) -> to: Tuple2, where: (=(CASE(>(ABS(Z),
> > WaterRoseMax_5), 1, 0), 1)), select: (STID, DATATIME, _UTF-16LE'2' AS
> > DATATYPE, currtime2(DATATIME, _UTF-16LE'-MM-dd HH:mm:ss') AS
> > VALTIME, _UTF-16LE'' AS CORTIME, VAL AS CORBERVAL, _UTF-16LE'' AS
> > CORAFTVAL, _UTF-16LE'' AS CORNAME, _UTF-16LE'' AS CORPHONE, 0 AS
> > STATE, 0 AS ISFILL, 1 AS TYPE) -> to: Tuple2, where: (=(CASE(>(ABS(Z),
> > WaterRoseMax_5), 1, 0), 0)), select: (ID, STID, _UTF-16LE'' AS STCD,
> > ITEM, VAL, UNIT, DATATIME AS DT, _UTF-16LE'2.6' AS DATAINDEX,
> > currtime2(DATATIME, _UTF-16LE'-MM-dd HH:mm:ss') AS CREATETIME, 0
> > AS ISFILL, 1 AS STATE, 0E0 AS P5) -> to: Tuple2) (2/2)" #82 prio=5
> > os_prio=0 tid=0x7fd4c4ac5000 nid=0x21c3 in Object.wait()
> > [0x7fd4d5416000]
> > java.lang.Thread.State: WAITING (on object monitor)
> > at java.lang.Object.wait(Native Method)
> > at java.lang.Object.wait(Object.java:502)
> > at 
> > org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addAsyncBufferEntry(AsyncWaitOperator.java:403)
> > at 
> > org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:224)
> > at 
> > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> > - locked <0x00074cb5b3a0> (a java.lang.Object)
> > at 
> > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > at java.lang.Thread.run(Thread.java:748)
> >
> > "async wait operator -> (where: (=(CASE(>(ABS(Z), WaterRoseMax_5), 1,
> > 0), 0)), select: (ID, STID, _UTF-16LE'' AS STCD, ITEM, VAL, UNIT,
> > DATATIME AS DT, _UTF-16LE'2.6' AS DATAINDEX, currtime2(DATATIME,
> > _UTF-16LE'-MM-dd HH:mm:ss') AS CREATETIME, 0 AS ISFILL, 1 AS
> > STATE, 0E0 AS P5) -> to: Tuple2, where: (=(CASE(>(ABS(Z),
> > WaterRoseMax_5), 1, 0), 1)), select: (STID, DATATIME, _UTF-16LE'2' AS
> > DATATYPE, currtime2(DATATIME, _UTF-16LE'-MM-dd HH:mm:ss') AS
> > VALTIME, _UTF-16LE'' AS CORTIME, VAL AS CORBERVAL, _UTF-16LE'' AS
> > CORAFTVAL, _UTF-16LE'' AS CORNAME, _UTF-16LE'' AS CORPHONE, 0 AS
> > STATE, 0 AS ISFILL, 1 AS TYPE) -> to: Tuple2, where: (=(CASE(>(ABS(Z),
> > WaterRoseMax_5), 1, 0), 0)), select: (ID, STID, _UTF-16LE'' AS STCD,
> > ITEM, VAL, UNIT, DATATIME AS DT, _UTF-16LE'2.6' AS DATAINDEX,
> > currtime2(DATATIME, _UTF-16LE'-MM-dd HH:mm:ss') AS CREATETIME, 0
> > AS ISFILL, 1 AS STATE, 0E0 AS P5) -> to: Tuple2) (1/2)" #81 prio=5
> > os_prio=0 tid=0x7fd4c4ac3000 nid=0x21c2 in Object.wait()
> > [0x7fd4d5517000]
> > java.lang.Thread.State: WAITING (on object monitor)
> > at java.lang.Object.wait(Native Method)
> > at java.lang.Object.wait(Object.java:502)
> > at 
> > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:539)
> > - locked <0x00074cb5d560> (a java.util.ArrayDeque)
> > at 
> > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
> > at 
> > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:165)
> > at 
> > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > at 
> > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > at java.lang.Thread.run(Thread.java:748)
> >
> > Yun Tang  wrote on Tue, Jul 6, 2021 at 4:01 PM:
> > >
> > > Hi,
> > >
> > > That is possible. If the downstream is deadlocked and cannot consume any data, the whole job appears dead. To locate the root cause, you still need to check what exactly is going on with the downstream task.
> > >
> > > Best,
> > > Yun Tang
> > > 
> > > From: Ada Luna 
> > > Sent: Tuesday, July 6, 2021 12:04
> > > To: user-zh@flink.apache.org 
> > > Subject: Re: Flink 1.10 memory issue
> > >
> > > Can backpressure make the whole Flink job appear dead? Not a single Kafka record is consumed; it lasts for days and does not recover without a restart.
> > >
> > > Yun Tang  wrote on Tue, Jul 6, 2021 at 11:12 AM:
> > > >
> > > > Hi,
> > > >
> > > > LocalBufferPool.requestMemorySegment is not actually allocating memory here; the job is backpressured. The downstream has not consumed in time, so the buffers are occupied and the upstream blocks in requestMemorySegment.
> > > >
> > > > To fix it, you still need to find out why the downstream is backpressured.
> > > >
> > > >
> > > > Best,
> > > > Yun Tang
> > > > 

Re: ImportError: No module named pyflink

2021-07-27 Thread Dian Fu
Hi,

You need to make sure that PyFlink is available on the cluster nodes. There are 
a few ways to achieve this, e.g.
- Install PyFlink on all the cluster nodes
- Install PyFlink in a virtual environment and specify it via python archive [1]

Regards,
Dian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-interpreter

> On Jul 27, 2021 at 3:40 PM, Curt Buechter  wrote:
> 
> This feels like the simplest error, but I'm struggling to get past it. I can 
> run pyflink jobs locally just fine by submitting them either via `python 
> sample.py` or `flink run --target local -py sample.py`. But, when I try to 
> execute on a remote worker node, it always fails with this error:
> 
> table_environment.execute_sql(query).print()
>   File 
> "/opt/flinks/1.13.1/flink-1.13.1/opt/python/pyflink.zip/pyflink/table/table_result.py",
>  line 219, in print
>   File 
> "/opt/flinks/1.13.1/flink-1.13.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
>   File 
> "/opt/flinks/1.13.1/flink-1.13.1/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>  line 146, in deco
>   File 
> "/opt/flinks/1.13.1/flink-1.13.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o108.print.
> : java.lang.RuntimeException: Failed to fetch next result
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> at 
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> at 
> org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
> at 
> org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Failed to fetch job execution result
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> ... 15 more
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
> ... 17 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at 
> 

Re: Flink on K8s deployment using S3 for HA

2021-07-27 Thread johnjlong
Caused by: java.lang.ClassNotFoundException: 
com.amazonaws.services.s3.model.AmazonS3Exception
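The ClassNotFoundException quoted here is for a class bundled with the AWS SDK, which suggests the code path handling the HA storage directory cannot see the S3 client classes. One hedged thing to try (assuming the docker entrypoint accepts a semicolon-separated plugin list, and that the HA path needs the Hadoop-based S3 filesystem) is loading the Hadoop S3 plugin alongside the Presto one:

```yaml
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-s3-fs-hadoop-1.12.4.jar;flink-s3-fs-presto-1.12.4.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: flink-s3-fs-hadoop-1.12.4.jar;flink-s3-fs-presto-1.12.4.jar
```

This is only a sketch of the plugin mechanism; whether it resolves the HA error would need to be verified against the actual deployment.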


On Jul 27, 2021 at 15:18, maker_d...@foxmail.com wrote:
Hi developers,

I am deploying with Flink native Kubernetes mode, using MinIO as the file system, with the following configuration:
state.backend: filesystem
fs.allowed-fallback-filesystems: s3
s3.endpoint: http://172.16.14.40:9000
s3.path-style: true
s3.access-key: admin
s3.secret-key: admin123
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: flink-s3-fs-presto-1.12.4.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: flink-s3-fs-presto-1.12.4.jar
MinIO itself works fine.

I then configured HA following the official documentation:
kubernetes.cluster-id: flink-sessoion
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3:///flink/recovery

The Flink session deploys normally, but submitting a job fails with the following error:
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:93)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 8 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
... 16 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload 
job files.
at 
org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
at 

Re: as-variable configuration for state ac

2021-07-27 Thread Mason Chen
+ user mailing list

I don't have permission to assign to you, but here is the JIRA ticket:
https://issues.apache.org/jira/browse/FLINK-23519

Thanks!

On Tue, Jul 27, 2021 at 4:40 AM Yun Tang  wrote:

> Hi Mason,
>
> I think this request is reasonable and you could create a JIRA ticket so
> that we could resolve it later.
>
>
> Best,
> Yun Tang
> --
> *From:* Mason Chen 
> *Sent:* Tuesday, July 27, 2021 15:15
> *To:* Yun Tang 
> *Cc:* Mason Chen ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: as-variable configuration for state ac
>
> Yup, your understanding is correct—that was the analogy I was trying to make!
>
> On Jul 26, 2021, at 7:57 PM, Yun Tang  wrote:
>
> Hi Mason,
>
> In RocksDB, one state corresponds to a column family, and we can
> aggregate all RocksDB native metrics per column family. If my understanding
> is right, are you hoping that all state latency metrics for a particular
> state could be aggregated at the state level?
>
>
> Best
> Yun Tang
> --
> *From:* Mason Chen 
> *Sent:* Tuesday, July 27, 2021 4:24
> *To:* user@flink.apache.org 
> *Subject:* as-variable configuration for state ac
>
> We have been using the state backend latency tracking metrics from Flink
> 1.13. To make metrics aggregation easier, could there be a config analogous
> to the `state.backend.rocksdb.metrics.column-family-as-variable` option that
> the RocksDB integration provides for aggregating across column families?
>
> In this case, it would be the various components of state.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable
>
>
>
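The kind of aggregation being asked for in this thread, one latency series per state name exposed via a tag rather than one metric object per state, can be sketched as follows (the metric tags and sample values are invented for illustration):

```python
from collections import defaultdict

def aggregate_by_tag(samples, tag="state_name"):
    """Average metric samples grouped by one tag, mimicking
    'column-family-as-variable'-style aggregation."""
    grouped = defaultdict(list)
    for tags, value in samples:
        grouped[tags.get(tag, "unknown")].append(value)
    return {key: sum(values) / len(values) for key, values in grouped.items()}

samples = [
    ({"state_name": "mapViewState", "op": "get"}, 120.0),
    ({"state_name": "mapViewState", "op": "put"}, 80.0),
    ({"state_name": "valueState", "op": "get"}, 10.0),
]
print(aggregate_by_tag(samples))  # {'mapViewState': 100.0, 'valueState': 10.0}
```

With the tag exposed as a metric variable, a metrics backend can do this grouping itself; without it, each state's latency arrives as a separately named metric and has to be matched by name.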


Re: Unable to read state written by Beam application with Flink runner using Flink's State Processor API

2021-07-27 Thread David Morávek
This thread is duplicated on the dev mailing list [1].

[1]
https://lists.apache.org/x/thread.html/r87fa8153137a4968f6a4f6b47c97c4d892664d864c51a79574821165@%3Cdev.flink.apache.org%3E

Best,
D.

On Tue, Jul 27, 2021 at 5:38 PM Kathula, Sandeep 
wrote:

> Hi,
>
>  We have a simple Beam application, like a word count, running with
> the Flink runner *(Beam 2.26 and Flink 1.9)*. We are using Beam’s value
> state. I am trying to read the state from a savepoint using Flink's State
> Processor API but am getting a NullPointerException. I converted the whole
> code into a pure Flink application, created a savepoint, and tried to read
> the state; there we are able to read the state successfully.
>
>
>
> Exception Stack trace:
>
>
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
>
> at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
>
> at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
>
> at
> org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>
> at
> com.intuit.spp.example.StateReader.main(StateReader.java:34)
>
> Caused by: java.io.IOException: Failed to restore state backend
>
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
>
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
>
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>
> at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
> at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.Exception: Exception while creating
> StreamOperatorStateContext.
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
>
> ... 6 more
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> f25cb861abbd020d3595d47c5d53d3fd_f25cb861abbd020d3595d47c5d53d3fd_(1/1)
> from any of the 1 provided restore options.
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>
> ... 7 more
>
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
> when trying to restore heap backend
>
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>
> at
> org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
> ... 9 more
>
> Caused by: java.lang.NullPointerException
>
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
>
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
>
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
>
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>
> ... 13 more
>

Unable to read state written by a Beam application with the Flink runner using Flink's State Processor API

2021-07-27 Thread Kathula, Sandeep
Hi,
 We have a simple Beam application, like a word count, running with the Flink 
runner (Beam 2.26 and Flink 1.9). We are using Beam's value state. I am trying 
to read the state from a savepoint using Flink's State Processor API but am 
getting a NullPointerException. When I converted the whole application into a 
pure Flink application, created a savepoint, and tried to read the state, I was 
able to read it successfully.

Exception Stack trace:

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
at 
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
at 
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
at com.intuit.spp.example.StateReader.main(StateReader.java:34)
Caused by: java.io.IOException: Failed to restore state backend
at 
org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
at 
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
at 
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Exception while creating 
StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for 
f25cb861abbd020d3595d47c5d53d3fd_f25cb861abbd020d3595d47c5d53d3fd_(1/1) from 
any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 7 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 9 more
Caused by: java.lang.NullPointerException
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 13 more

When I debugged, it shows that the NullPointerException is thrown at 
https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L280
because metaInfoSnapshot is null. I then checked which kvStateId values we are 
getting at 
https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277.

I also did stateMetaInfoSnapshot.getName() 
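The null metaInfoSnapshot at that line indicates a by-id metadata lookup miss: the savepoint references a kvStateId for which the reader registered no metadata snapshot. A minimal stdlib-only sketch of that failure mode (the map, names, and ids are illustrative, not Flink's actual types):

```java
import java.util.HashMap;
import java.util.Map;

public class MetaInfoLookupSketch {
    public static void main(String[] args) {
        // The heap restore loop resolves each kvStateId read from the
        // savepoint against the metadata snapshots it has registered.
        Map<Integer, String> metaInfoById = new HashMap<>();
        metaInfoById.put(0, "value-state-registered-by-flink");

        // Hypothetical id from a Beam-written savepoint that the reader
        // never registered: the lookup yields null, and dereferencing the
        // missing snapshot is what surfaces as the NullPointerException.
        int kvStateIdFromSavepoint = 1;
        String metaInfoSnapshot = metaInfoById.get(kvStateIdFromSavepoint);
        if (metaInfoSnapshot == null) {
            System.out.println("metaInfoSnapshot is null for kvStateId "
                    + kvStateIdFromSavepoint);
        }
    }
}
```

This is consistent with Beam registering its state under names of its own, which the pure-Flink State Processor API reader does not know about.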

Re: recover from savepoint

2021-07-27 Thread Schwalbe Matthias
Hi Till,

Having been unaware of this mail thread, I've created a Jira bug, 
https://issues.apache.org/jira/browse/FLINK-23509, which also proposes a simple 
solution.

Regards

Matthias


This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.


flink 1.13.1 Hive SQL execution error

2021-07-27 Thread Asahi Lee
Hi,
I am running the following Hive SQL with Flink 1.13.1:
CREATE CATALOG `tempo_df_hive_default_catalog` WITH(
  'type' = 'hive',
  'default-database' = 'default'
);
USE CATALOG tempo_df_hive_default_catalog;
CREATE TABLE IF NOT EXISTS `default`.`tempo_blackhole_table` (
 f0 INT
);
use cosldatacenter;
INSERT INTO `dw_riginfoparam` ( `large_equip_id`, `equip_code`, `equip_name`, 
`enqueue_date`, `shi_total_len`, `shi_type_width`, `shi_type_depth`, `moonpool` 
) SELECT
mle.`large_equip_id` ,
mle.`equip_code` ,
mle.`equip_name` ,
mle.`enqueue_date` ,
mle.`shi_total_len` ,
mle.`shi_type_width` ,
mle.`shi_type_depth`,
CASE
WHEN mipd.`param_cn` = '' THEN
mipv.`param_value`
END AS `Moonpool`
from `ods_emp_maindata_iadc_paramvalue` mipv
INNER JOIN `ods_emp_maindata_iadc_paramdef` mipd ON mipv.`param_id` = 
mipd.`param_id`
 inner JOIN `ods_emp_md_large_equip` mle ON mipv.`SUBJECT_ID`= 
mle.`LARGE_EQUIP_ID`;
INSERT INTO `default`.`tempo_blackhole_table` SELECT 1 ;



The reported error:
Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 2:195 
Invalid table alias or column reference 'u': (possible column names are: 
mipv.paramvalue_id, mipv.platform_id, mipv.equipment_id, mipv.param_id, 
mipv.param_value, mipv.remark, mipv.create_time, mipv.creator, 
mipv.update_time, mipv.update_person, mipv.record_flag, mipv.subject_id, 
mipv.output_unit, mipv.show_seq, mipd.param_id, mipd.iadc_id, mipd.param_code, 
mipd.param_en, mipd.param_cn, mipd.output_standard, mipd.output_unit, 
mipd.param_type, mipd.param_value, mipd.remark, mipd.create_time, mipd.creator, 
mipd.update_time, mipd.update_person, mipd.record_flag, mle.large_equip_id, 
mle.equip_name, mle.equip_type, mle.equip_function, mle.equip_board, 
mle.ship_yard, mle.manufacturer_date, mle.enqueue_date, mle.dockrepair_date, 
mle.scrap_date, mle.enqueue_mode, mle.work_for_org, mle.work_in_org, 
mle.old_age, mle.create_time, mle.creator, mle.update_time, mle.update_person, 
mle.record_flag, mle.data_timestamp, mle.work_unit_id, mle.work_status, 
mle.work_location, mle.work_area, mle.equip_code, mle.shi_main_power, 
mle.shi_total_len, mle.shi_type_width, mle.shi_type_depth, 
mle.shi_design_draft, mle.shi_total_tonnage, mle.shi_load_tonnage, mle.remark, 
mle.unit_classification1, mle.unit_classification2)
at 
org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genAllExprNodeDesc(HiveParserSemanticAnalyzer.java:2467)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genExprNodeDesc(HiveParserSemanticAnalyzer.java:2421)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genSelectLogicalPlan(HiveParserCalcitePlanner.java:2314)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:2772)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.logicalPlan(HiveParserCalcitePlanner.java:285)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:273)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.analyzeSql(HiveParser.java:326)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:274)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:217)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:51)
 ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]

Re: as-variable configuration for state ac

2021-07-27 Thread Yun Tang
Hi Mason,

I think this request is reasonable and you could create a JIRA ticket so that 
we could resolve it later.


Best,
Yun Tang

From: Mason Chen 
Sent: Tuesday, July 27, 2021 15:15
To: Yun Tang 
Cc: Mason Chen ; user@flink.apache.org 

Subject: Re: as-variable configuration for state ac

Yup, your understanding is correct; that was the analogy I was trying to make!

On Jul 26, 2021, at 7:57 PM, Yun Tang 
mailto:myas...@live.com>> wrote:

Hi Mason,

In RocksDB, one state corresponds to a column family, and we can aggregate 
all RocksDB native metrics per column family. If my understanding is right, 
are you hoping that all state latency metrics for a particular state could be 
aggregated at the state level?


Best
Yun Tang

From: Mason Chen mailto:mas.chen6...@gmail.com>>
Sent: Tuesday, July 27, 2021 4:24
To: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: as-variable configuration for state ac

We have been using the state backend latency tracking metrics from Flink 1.13. 
To make metrics aggregation easier, could there be a config to expose something 
like the `state.backend.rocksdb.metrics.column-family-as-variable` option that 
RocksDB provides for aggregation across column families?

In this case, it would be the various components of state.

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable
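For reference, the existing RocksDB analogue being proposed as a model is switched on in flink-conf.yaml like this (setting it to true is an assumption about the desired setup):

```yaml
# Report RocksDB native metrics with the column family (i.e. the state name)
# as a metric variable rather than baking it into the metric name, which
# makes aggregation across states straightforward in the metrics backend.
state.backend.rocksdb.metrics.column-family-as-variable: true
```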



Unsubscribe

2021-07-27 Thread hihl
Unsubscribe

Re: foreach exec sql

2021-07-27 Thread Caizhi Weng
Hi!

Try this:
sql.zipWithIndex.foreach { case (sql, idx) =>
  val result = tableEnv.executeSql(sql)
  if (idx == 7) {
    result.print()
  }
}

igyu  于2021年7月27日周二 下午4:38写道:

>  tableEnv.executeSql(sql(0))
>  tableEnv.executeSql(sql(1))
>  tableEnv.executeSql(sql(2))
>  tableEnv.executeSql(sql(3))
>  tableEnv.executeSql(sql(4))
>  tableEnv.executeSql(sql(5))
>  tableEnv.executeSql(sql(6))
>  tableEnv.executeSql(sql(7)).print()
>
> that is OK
>
> but I hope
>
>   sql.foreach(s=>{
> tableEnv.executeSql(s)
>   })
>
> --
> igyu
>


foreach exec sql

2021-07-27 Thread igyu
 tableEnv.executeSql(sql(0))
 tableEnv.executeSql(sql(1))
 tableEnv.executeSql(sql(2))
 tableEnv.executeSql(sql(3))
 tableEnv.executeSql(sql(4))
 tableEnv.executeSql(sql(5))
 tableEnv.executeSql(sql(6))
 tableEnv.executeSql(sql(7)).print()

That works, but I would rather write:

  sql.foreach(s => {
    tableEnv.executeSql(s)
  })



igyu


Need help when using kubernetes.jobmanager.annotations

2021-07-27 Thread Manong Karl
Hi Team, I have set "kubernetes.jobmanager.annotations", but I can't find
these annotations on the k8s Deployment, even though they do appear on the
JobManager pod. Is this by design, or was it missed?


ImportError: No module named pyflink

2021-07-27 Thread Curt Buechter
This feels like the simplest error, but I'm struggling to get past it. I
can run pyflink jobs locally just fine by submitting them either via
`python sample.py` or `flink run --target local -py sample.py`. But, when I
try to execute on a remote worker node, it always fails with this error:

table_environment.execute_sql(query).print()
  File
"/opt/flinks/1.13.1/flink-1.13.1/opt/python/pyflink.zip/pyflink/table/table_result.py",
line 219, in print
  File
"/opt/flinks/1.13.1/flink-1.13.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File
"/opt/flinks/1.13.1/flink-1.13.1/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 146, in deco
  File
"/opt/flinks/1.13.1/flink-1.13.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o108.print.
: java.lang.RuntimeException: Failed to fetch next result
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
at
org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
at
org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to fetch job execution result
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
... 15 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
... 17 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at

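One common cause of "No module named pyflink" on remote workers is that the cluster-side Python interpreter does not have the PyFlink package installed, even though the client's does. A hedged sketch of the relevant flink-conf.yaml knobs (the interpreter paths are assumptions for your hosts):

```yaml
# Interpreter used on the client side when compiling the job.
python.client.executable: /usr/bin/python3
# Interpreter used by the TaskManagers to run Python workers; it must have
# apache-flink (matching the cluster version, here 1.13.1) installed.
python.executable: /usr/bin/python3
```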
Problem using S3 for HA in a Flink Kubernetes deployment

2021-07-27 Thread maker_d...@foxmail.com
Hi developers,

I am deploying Flink in native Kubernetes mode, using MinIO as the file system, with the following configuration:
state.backend: filesystem
fs.allowed-fallback-filesystems: s3
s3.endpoint: http://172.16.14.40:9000
s3.path-style: true
s3.access-key: admin
s3.secret-key: admin123
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: 
flink-s3-fs-presto-1.12.4.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: 
flink-s3-fs-presto-1.12.4.jar
MinIO itself works fine.

I then set up HA following the official documentation:
kubernetes.cluster-id: flink-sessoion
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3:///flink/recovery

The Flink session deploys fine, but submitting a job fails with:
org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:93)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 8 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
... 16 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:364)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Could not upload 
job files.
at 
org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:201)
at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1119)
at 
java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1084)
at 

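One thing worth double-checking in the configuration above: the configured storageDir, `s3:///flink/recovery`, has an empty authority component, i.e. no bucket name. A hedged sketch of the expected shape (the bucket name is a placeholder, not a recommendation):

```yaml
# An S3 URI normally carries the bucket in the authority component.
high-availability.storageDir: s3://<your-bucket>/flink/recovery
```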
Re: as-variable configuration for state ac

2021-07-27 Thread Mason Chen
Yup, your understanding is correct; that was the analogy I was trying to make!

> On Jul 26, 2021, at 7:57 PM, Yun Tang  wrote:
> 
> Hi Mason,
> 
> In RocksDB, one state corresponds to a column family, and we can aggregate 
> all RocksDB native metrics per column family. If my understanding is right, 
> are you hoping that all state latency metrics for a particular state could 
> be aggregated at the state level? 
> 
> 
> Best
> Yun Tang
> From: Mason Chen 
> Sent: Tuesday, July 27, 2021 4:24
> To: user@flink.apache.org 
> Subject: as-variable configuration for state ac
>  
> We have been using the state backend latency tracking metrics from Flink 
> 1.13. To make metrics aggregation easier, could there be a config to expose 
> something like the `state.backend.rocksdb.metrics.column-family-as-variable` 
> option that RocksDB provides for aggregation across column families?
> 
> In this case, it would be the various components of state.
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable
>  
>