Re: Resource balancing issue

2020-09-21 Thread Xintong Song
I very much agree with this requirement. In fact, this is not the first time this need has come up, either on the community mailing list or in our own production practice.


Implementing it is not trivial, though, which is mainly due to Flink's current scheduling model. Flink splits resource scheduling into two layers: the ResourceManager decides how cluster resources are allocated to jobs, and the JobMaster decides how the job's resources are used by tasks. As a result, the RM does not know which operator ends up using a slot it has handed out, and the JM does not know which machine a slot it received comes from.


We have discussed this problem before. For now it looks like Flink's current scheduling model needs to be adjusted first; there are quite a few prerequisite tasks, so it is unlikely to be solved in the short term. Some of those prerequisites are already in progress or finished (e.g.
FLIP-119 / FLIP-138 / FLINK-18689), while others are still in the design and discussion phase.


Thank you~

Xintong Song



On Tue, Sep 22, 2020 at 1:17 PM 赵一旦  wrote:

> My job is fairly complex: CPU idle is normally at 80-90%, and every five minutes, when the windows close, it drops to
> 20-30%. With bad luck, if the job is also unevenly distributed, some machines can get stuck for short periods, and over time this can easily cause the Flink process to fail.
>
> 赵一旦 wrote on Tue, Sep 22, 2020 at 1:09 PM:
>
> > I think this really needs to be solved; it would be good to improve it in a later release.
> >
> >
> > I believe that if a production Flink cluster has 5 machines, then regardless of the slot count, the parallelism of the heavy operators will naturally be set to a multiple of 5; operators whose parallelism is not a multiple of 5 are usually non-parallelizable ones (e.g. configuration streams with very little data). It is quite troublesome when these non-multiple-of-5 operators break the balanced placement of the multiple-of-5 operators.
> > Currently, with 1.10's cluster.evenly-spread-out-slots:
> > true, if every operator's parallelism is a multiple of 5, all operators are spread evenly across the 5 machines. But inevitably some operators have a parallelism that is not a multiple of 5, and they then affect the balance of the heavy operators.
> >
> > Xintong Song wrote on Tue, Sep 22, 2020 at 10:21 AM:
> >
> >> Flink does not currently support this kind of per-operator load balancing. With the default slot sharing strategy, you can adjust the number of TMs and
> >> slots per machine so that the cluster has only 15 slots in total, which guarantees that C is balanced across the 5 machines. But there is
> >> currently no good way to guarantee this for B.
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Mon, Sep 21, 2020 at 5:45 PM 赵一旦  wrote:
> >>
> >> >
> >> >
> >>
> A question about Flink's current resource allocation: my job has 3 operators, A with parallelism 1, B with parallelism 10, and C with parallelism 15, on a cluster of 5 machines. How can I guarantee that B and C are spread perfectly evenly across the 5 machines? I don't care where A runs, but B and C must be balanced evenly across all 5 machines.
> >> >
> >> >
> >> > With more operators it gets even more complicated: operators whose parallelism is not a multiple of 5 cause more and more operators like B and C to end up unbalanced across the 5 machines.
> >> >
> >>
> >
>


Re: Zookeeper connection loss causing checkpoint corruption

2020-09-21 Thread Arpith P
Hi Peter,

I recently had a similar issue where I could not load from the
checkpoints path. I found that whenever a checkpoint is corrupted, the
"_metadata" file is not persisted. I have a program which tracks
checkpoint locations based on this strategy and updates a DB with the
location and a timestamp. To restore the latest checkpoint I query the DB
ordered by the latest timestamp. Let me know if this is helpful; I can share
the code for it if needed.
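
For reference, a minimal sketch of that idea (this is not Arpith's actual program; the checkpoint root path and the "chk-" directory naming are illustrative, and it checks the local filesystem instead of updating a DB):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class LatestCompleteCheckpoint {

    // Returns the newest checkpoint directory that contains a "_metadata" file.
    static Optional<Path> find(Path checkpointRoot) throws IOException {
        try (Stream<Path> dirs = Files.list(checkpointRoot)) {
            return dirs
                    .filter(Files::isDirectory)
                    .filter(dir -> dir.getFileName().toString().startsWith("chk-"))
                    // a checkpoint directory without _metadata was not fully persisted
                    .filter(dir -> Files.exists(dir.resolve("_metadata")))
                    .max(Comparator.comparing(dir -> {
                        try {
                            return Files.getLastModifiedTime(dir);
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    }));
        }
    }

    public static void main(String[] args) throws IOException {
        // illustrative path; in practice this would point at the configured checkpoint dir
        System.out.println(find(Paths.get("/tmp/flink-checkpoints/myJobId")));
    }
}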

Arpith

On Mon, Sep 21, 2020 at 6:37 PM Peter Westermann 
wrote:

> I recently ran into an issue with our Flink cluster: A zookeeper service
> deploy caused a temporary connection loss and triggered a new jobmanager
> leader election. Leadership election was successful and our Flink job
> restarted from the last checkpoint.
>
> This checkpoint appears to have been taken while we lost the connection to
> Zookeeper and ended up in a corrupted state, so the Flink job kept failing.
> Here’s the exception stack trace for that:
>
> 2020-09-18 01:10:57
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>
>  at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>
>  at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de_(27/40) from any of
> the 1 provided restore options.
>
>  at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>
>  ... 9 more
>
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
> unexpected exception.
>
>  at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:335)
>
>  at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>
>  at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
>  at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
>  ... 11 more
>
> Caused by: java.io.IOException: Error while opening RocksDB instance.
>
>  at
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:89)
>
>  at
> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
>
>  at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:220)
>
>  at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
>
>  at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
>
>  at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
>
>  at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:276)
>
>  ... 15 more
>
> Caused by: org.rocksdb.RocksDBException: Sst file size mismatch:
> 

Re: Flink-1.11 Table API & symbol syntax issue

2020-09-21 Thread 赵一旦
First, make sure you are on 1.11, because 1.10 does not have the $ function. Second, make sure you have imported the $ function; it is in the
org.apache.flink.table.api.Expressions class.
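
For reference, a minimal Java sketch of the import and usage (the table and field names are made up, and an existing TableEnvironment `tableEnv` with a registered table "Orders" is assumed):

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;

Table orders = tableEnv.from("Orders");

Table result = orders
        .filter($("status").isEqual("PAID"))     // filter with the $(...) expression DSL
        .select($("orderId"), $("orderAmount")); // select with the $(...) expression DSL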

Leonard Xu wrote on Tue, Sep 22, 2020 at 9:52 AM:

> Hi,
>
> Yes, it works; I just checked. Could you paste the exception and the code that reproduces it?
>
> Best,
> Leonard
>
>
> > On Sep 22, 2020, at 09:44, nashcen <2415370...@qq.com> wrote:
> >
> > The IDE flags a syntax error, so I haven't run it. In your IDEA, does the 1.11 Table API & syntax show up without errors?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re: Resource balancing issue

2020-09-21 Thread 赵一旦
My job is fairly complex: CPU idle is normally at 80-90%, and every five minutes, when the windows close, it drops to
20-30%. With bad luck, if the job is also unevenly distributed, some machines can get stuck for short periods, and over time this can easily cause the Flink process to fail.

赵一旦 wrote on Tue, Sep 22, 2020 at 1:09 PM:

> I think this really needs to be solved; it would be good to improve it in a later release.
>
> I believe that if a production Flink cluster has 5 machines, then regardless of the slot count, the parallelism of the heavy operators will naturally be set to a multiple of 5; operators whose parallelism is not a multiple of 5 are usually non-parallelizable ones (e.g. configuration streams with very little data). It is quite troublesome when these non-multiple-of-5 operators break the balanced placement of the multiple-of-5 operators.
> Currently, with 1.10's cluster.evenly-spread-out-slots:
> true, if every operator's parallelism is a multiple of 5, all operators are spread evenly across the 5 machines. But inevitably some operators have a parallelism that is not a multiple of 5, and they then affect the balance of the heavy operators.
>
> Xintong Song wrote on Tue, Sep 22, 2020 at 10:21 AM:
>
>> Flink does not currently support this kind of per-operator load balancing. With the default slot sharing strategy, you can adjust the number of TMs and
>> slots per machine so that the cluster has only 15 slots in total, which guarantees that C is balanced across the 5 machines. But there is
>> currently no good way to guarantee this for B.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Sep 21, 2020 at 5:45 PM 赵一旦  wrote:
>>
>> >
>> >
>> A question about Flink's current resource allocation: my job has 3 operators, A with parallelism 1, B with parallelism 10, and C with parallelism 15, on a cluster of 5 machines. How can I guarantee that B and C are spread perfectly evenly across the 5 machines? I don't care where A runs, but B and C must be balanced evenly across all 5 machines.
>> >
>> >
>> > With more operators it gets even more complicated: operators whose parallelism is not a multiple of 5 cause more and more operators like B and C to end up unbalanced across the 5 machines.
>> >
>>
>


Re: Resource balancing issue

2020-09-21 Thread 赵一旦
I think this really needs to be solved; it would be good to improve it in a later release.
I believe that if a production Flink cluster has 5 machines, then regardless of the slot count, the parallelism of the heavy operators will naturally be set to a multiple of 5; operators whose parallelism is not a multiple of 5 are usually non-parallelizable ones (e.g. configuration streams with very little data). It is quite troublesome when these non-multiple-of-5 operators break the balanced placement of the multiple-of-5 operators.
Currently, with 1.10's cluster.evenly-spread-out-slots:
true, if every operator's parallelism is a multiple of 5, all operators are spread evenly across the 5 machines. But inevitably some operators have a parallelism that is not a multiple of 5, and they then affect the balance of the heavy operators.

Xintong Song wrote on Tue, Sep 22, 2020 at 10:21 AM:

> Flink does not currently support this kind of per-operator load balancing. With the default slot sharing strategy, you can adjust the number of TMs and
> slots per machine so that the cluster has only 15 slots in total, which guarantees that C is balanced across the 5 machines. But there is
> currently no good way to guarantee this for B.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Sep 21, 2020 at 5:45 PM 赵一旦  wrote:
>
> >
> > A question about Flink's current resource allocation: my job has 3 operators, A with parallelism 1, B with parallelism 10, and C with parallelism 15, on a cluster of 5 machines. How can I guarantee that B and C are spread perfectly evenly across the 5 machines? I don't care where A runs, but B and C must be balanced evenly across all 5 machines.
> >
> >
> > With more operators it gets even more complicated: operators whose parallelism is not a multiple of 5 cause more and more operators like B and C to end up unbalanced across the 5 machines.
> >
>


Re: metaspace out-of-memory & error while retrieving the leader gateway

2020-09-21 Thread Xintong Song
Thanks for the input, Brian.

This looks like what we are looking for. The issue is fixed in 1.10.3,
which is consistent with this problem occurring in 1.10.2.

Maybe Claude can further confirm it.

Thank you~

Xintong Song



On Tue, Sep 22, 2020 at 10:57 AM Zhou, Brian  wrote:

> Hi Xintong and Claude,
>
>
>
> In our internal tests, we also encountered these two issues and spent
> much time debugging them. There are two points I need to confirm to see
> whether we share the same problem.
>
>    1. Your job is using the default restart strategy, which restarts
>    every second.
>    2. The CPU resources on your jobmanager might be small
>
>
>
> Here are some findings I want to share.
>
> ## Metaspace OOM
>
> Due to https://issues.apache.org/jira/browse/FLINK-15467 , when there are
> job restarts, some threads from the sourceFunction keep hanging, so the
> class loader cannot be closed. Each restart then loads new classes, which
> expands the metaspace until the OOM happens.
>
>
>
> ## Leader retrieving
>
> Constant restarts can be heavy for the jobmanager; if the JM's CPU resources are not
> enough, the thread for leader retrieving may get stuck.
>
>
>
> Best Regards,
>
> Brian
>
>
>
> *From:* Xintong Song 
> *Sent:* Tuesday, September 22, 2020 10:16
> *To:* Claude M; user
> *Subject:* Re: metaspace out-of-memory & error while retrieving the
> leader gateway
>
>
>
> ## Metaspace OOM
>
> As the error message already suggested, the metaspace OOM you encountered
> is likely caused by a class loading leak. I think you are on the right
> direction trying to look into the heap dump and find out where the leak
> comes from. IIUC, after removing the ZK folder, you are now able to run
> Flink with the heap dump options.
>
>
>
> The problem does not occur in previous versions because Flink starts to
> set the metaspace limit since the 1.10 release. The class loading leak
> might have already been there, but is never discovered. This could lead to
> unpredictable stability and performance issues. That's why Flink updated
> its memory model and explicitly set the metaspace limit in the 1.10 release.
>
>
>
> ## Leader retrieving
>
> The command looks good to me. If this problem happens only once, it could
> be irrelevant to adding the options. If that does not block you from
> getting the heap dump, we can look into it later.
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Mon, Sep 21, 2020 at 9:37 PM Claude M  wrote:
>
> Hi Xintong,
>
>
>
> Thanks for your reply.  Here is the command output w/ the java.opts:
>
>
>
> /usr/local/openjdk-8/bin/java -Xms768m -Xmx768m -XX:+UseG1GC
> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/flink/log
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> -classpath
> /opt/flink/lib/flink-metrics-datadog-statsd-2.11-0.1.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:/opt/flink/lib/flink-table-blink_2.11-1.10.2.jar:/opt/flink/lib/flink-table_2.11-1.10.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.10.2.jar::/etc/hadoop/conf:
> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
> --configDir /opt/flink/conf --executionMode cluster
>
>
>
> To answer your questions:
>
>- Correct, in order for the pod to start up, I have to remove the
>flink app folder from zookeeper.  I only have to delete once after applying
>the java.opts arguments.  It doesn't make sense though that I should have
>to do this just from adding a parameter.
>- I'm using the standalone deployment.
>- I'm using job cluster mode.
>
> A higher priority issue I'm trying to solve is the metaspace out of
> memory that is occurring in task managers.  This was not happening before I
> upgraded to Flink 1.10.2.  Even after increasing the memory, I'm still
> encountering the problem.  That is why I added the java.opts argument, to
> see if I can get more information about the problem, and that is when I ran
> across the second issue w/ the job manager pod not starting up.
>
>
>
>
>
> Thanks
>
>
>
>
>
> On Sun, Sep 20, 2020 at 10:23 PM Xintong Song 
> wrote:
>
> Hi Claude,
>
>
>
> IIUC, in your case the leader retrieving problem is triggered by adding
> the `java.opts`? Then could you try to find and post the complete command
> for launching the JVM process? You can try log into the pod and execute `ps
> -ef | grep `.
>
>
>
> A few more questions:
>
> - What do you mean by "resolve this"? Does the jobmanager pod get stuck
> there, and recover when you remove the folder from ZK? Do you have to do
> the removal every time you submit to Kubernetes?
>
> The only way I can resolve this is to delete the folder from zookeeper
> which I shouldn't have to do.
>
> - Which Flink's kubernetes deployment are you using? The standalone or
> native Kubernetes?
>
> - Which cluster mode are you using? Job cluster, session cluster, or the
> application mode?
>
>

Re: [SQL] parse table name from sql statement

2020-09-21 Thread silence
I have written something similar that you can use as a reference:

import java.util.ArrayList;
import java.util.List;

import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSelect;

// Recursively collects the table names referenced in the FROM clause of a parsed query.
private static List<String> lookupSelectTable(SqlNode sqlNode) {
    List<String> list = new ArrayList<>();
    if (sqlNode instanceof SqlSelect) {
        SqlNode from = ((SqlSelect) sqlNode).getFrom();
        list.addAll(lookupSelectTable(from));
    } else if (sqlNode instanceof SqlJoin) {
        SqlJoin sqlJoin = (SqlJoin) sqlNode;
        list.addAll(lookupSelectTable(sqlJoin.getLeft()));
        list.addAll(lookupSelectTable(sqlJoin.getRight()));
    } else if (sqlNode instanceof SqlBasicCall) {
        SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
        SqlOperator operator = sqlBasicCall.getOperator();
        if (SqlKind.AS.equals(operator.getKind())) {
            list.addAll(lookupSelectTable(sqlBasicCall.getOperands()[0]));
        } else if (SqlKind.UNION.equals(operator.getKind())) {
            for (SqlNode operandSqlNode : sqlBasicCall.getOperands()) {
                list.addAll(lookupSelectTable(operandSqlNode));
            }
        } else {
            throw new RuntimeException("operator " + operator.getKind() + " not supported");
        }
    } else if (sqlNode instanceof SqlIdentifier) {
        list.add(((SqlIdentifier) sqlNode).getSimple());
    } else {
        throw new RuntimeException("operator " + sqlNode.getClass() + " not supported");
    }
    return list;
}
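
A small usage sketch (assuming Calcite's SqlParser with its default config; the SQL string is made up):

import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public static void main(String[] args) throws SqlParseException {
    String sql = "SELECT o.id FROM orders AS o JOIN users AS u ON o.uid = u.id";
    SqlParser parser = SqlParser.create(sql, SqlParser.configBuilder().build());
    // prints the table names referenced in the FROM clause
    System.out.println(lookupSelectTable(parser.parseQuery()));
}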



--
Sent from: http://apache-flink.147419.n8.nabble.com/




RE: metaspace out-of-memory & error while retrieving the leader gateway

2020-09-21 Thread Zhou, Brian
Hi Xintong and Claude,

In our internal tests, we also encountered these two issues and spent much
time debugging them. There are two points I need to confirm to see whether we
share the same problem.

  1.  Your job is using the default restart strategy, which restarts every second.
  2.  The CPU resources on your jobmanager might be small

Here are some findings I want to share.
## Metaspace OOM
Due to https://issues.apache.org/jira/browse/FLINK-15467 , when there are some
job restarts, some threads from the sourceFunction keep hanging, so the class
loader cannot be closed. Each restart then loads new classes, which expands
the metaspace until the OOM happens.
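
As an illustration of that leak pattern (a hypothetical user function, not Flink code or the actual FLINK-15467 reproducer): a source whose open() starts a worker thread that is never stopped in cancel(), so the thread keeps the user-code class loader reachable after every restart.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class LeakySource extends RichSourceFunction<String> {

    private transient Thread poller;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) {
        // BUG: this worker thread ignores cancellation and is never joined,
        // so it keeps the user-code class loader reachable after a restart.
        poller = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1_000L);
                } catch (InterruptedException ignored) {
                    // swallowing the interrupt keeps the thread alive forever
                }
            }
        }, "leaky-poller");
        poller.start();
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            Thread.sleep(100L);
        }
    }

    @Override
    public void cancel() {
        running = false;
        // missing: poller.interrupt() and poller.join(), which is what would
        // allow the class loader to be unloaded and keep metaspace bounded
    }
}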

## Leader retrieving
Constant restarts can be heavy for the jobmanager; if the JM CPU resources are not
enough, the thread for leader retrieving may get stuck.

Best Regards,
Brian

From: Xintong Song 
Sent: Tuesday, September 22, 2020 10:16
To: Claude M; user
Subject: Re: metaspace out-of-memory & error while retrieving the leader gateway

## Metaspace OOM
As the error message already suggested, the metaspace OOM you encountered is 
likely caused by a class loading leak. I think you are on the right direction 
trying to look into the heap dump and find out where the leak comes from. IIUC, 
after removing the ZK folder, you are now able to run Flink with the heap dump 
options.

The problem does not occur in previous versions because Flink starts to set the 
metaspace limit since the 1.10 release. The class loading leak might have 
already been there, but is never discovered. This could lead to unpredictable 
stability and performance issues. That's why Flink updated its memory model and 
explicitly set the metaspace limit in the 1.10 release.

## Leader retrieving
The command looks good to me. If this problem happens only once, it could be 
irrelevant to adding the options. If that does not block you from getting the 
heap dump, we can look into it later.


Thank you~

Xintong Song


On Mon, Sep 21, 2020 at 9:37 PM Claude M <claudemur...@gmail.com> wrote:
Hi Xintong,

Thanks for your reply.  Here is the command output w/ the java.opts:

/usr/local/openjdk-8/bin/java -Xms768m -Xmx768m -XX:+UseG1GC 
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/flink/log 
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -classpath 
/opt/flink/lib/flink-metrics-datadog-statsd-2.11-0.1.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:/opt/flink/lib/flink-table-blink_2.11-1.10.2.jar:/opt/flink/lib/flink-table_2.11-1.10.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.10.2.jar::/etc/hadoop/conf:
 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint 
--configDir /opt/flink/conf --executionMode cluster

To answer your questions:

  *   Correct, in order for the pod to start up, I have to remove the flink app 
folder from zookeeper.  I only have to delete once after applying the java.opts 
arguments.  It doesn't make sense though that I should have to do this just 
from adding a parameter.
  *   I'm using the standalone deployment.
  *   I'm using job cluster mode.
A higher priority issue I'm trying to solve is the metaspace out of memory
that is occurring in task managers.  This was not happening before I upgraded to
Flink 1.10.2.  Even after increasing the memory, I'm still encountering the
problem.  That is why I added the java.opts argument, to see if I can get more
information about the problem, and that is when I ran across the second issue w/
the job manager pod not starting up.


Thanks


On Sun, Sep 20, 2020 at 10:23 PM Xintong Song <tonysong...@gmail.com> wrote:
Hi Claude,

IIUC, in your case the leader retrieving problem is triggered by adding the 
`java.opts`? Then could you try to find and post the complete command for 
launching the JVM process? You can try log into the pod and execute `ps -ef | 
grep `.

A few more questions:
- What do you mean by "resolve this"? Does the jobmanager pod get stuck there,
and recover when you remove the folder from ZK? Do you have to do the removal
every time you submit to Kubernetes?
The only way I can resolve this is to delete the folder from zookeeper which I 
shouldn't have to do.
- Which Flink's kubernetes deployment are you using? The standalone or native 
Kubernetes?
- Which cluster mode are you using? Job cluster, session cluster, or the 
application mode?


Thank you~

Xintong Song


On Sat, Sep 19, 2020 at 1:22 AM Claude M <claudemur...@gmail.com> wrote:
Hello,

I upgraded from Flink 1.7.2 to 1.10.2.  One of the jobs running on the task 
managers is periodically crashing w/ the following error:

java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has 
occurred. This can mean two things: either the job requires a larger size of 
JVM metaspace to load classes or there is a class loading leak. In 

Re: Support for gRPC in Flink StateFun 2.x

2020-09-21 Thread Dalmo Cirne
Thank you for the quick reply, Igal.

Our use case is the following: A stream of data from Kafka is fed into Flink 
where data transformations take place. After that we send that transformed data 
to an inference engine to score the relevance of each record. (Rough 
simplification.)

Doing that using HTTP endpoints is possible, and it is the solution we have in 
place today, however, for each request to that endpoint, we need to incur the 
cost of establishing the connection, etc., thus increasing the latency of the 
system.

We do process data in batches to mitigate the latency, but it is not the same 
as having a bi-directional stream, as it would be possible using gRPC. 
Furthermore, we already use gRPC in other parts of our system.

We also want to be able to scale those endpoints up and down, as demand for the 
service fluctuates depending on the hour and day. Combining StateFun and 
Kubernetes would allow for that elasticity of the service, while keeping state 
of the execution, since inferences are not always just one endpoint, but a 
collection of them where the output of one becomes the input of the next, 
culminating with the predicted score(s).

We are evaluating StateFun because Flink is already part of the infrastructure. 
With that said, gRPC is also part of our requirements, thus motivation for the 
question.

I’d love to hear more about plans to implement support for gRPC and perhaps 
become an early adopter.

I hope this helps with understanding of the use case. Happy to talk further and 
answer more questions.

Best,

Dalmo



From: Igal Shilman 
Date: Saturday, September 19, 2020 at 01:41
To: Dalmo Cirne 
Cc: "user@flink.apache.org" 
Subject: Re: Support for gRPC in Flink StateFun 2.x

Hi,

Your observation is correct, currently the only way to invoke a remote function 
is trough an HTTP POST request to a service that exposes a StateFun endpoint.

The endpoint must implement the client side of a the “RequestReply” protocol as 
defined by StateFun (basically an invocation contains the state and message, 
and a response contains a description of the side effects).

While gRPC can easily be added as a replacement for the transport layer, the
client side (the remote function) would still have to implement the
RequestReply protocol.

To truly utilize gRPC we would want to introduce a new type of protocol, that 
can exploit the low latency bi-directional streams to and from the function.

While for the latter it is a bit difficult to commit to a specific date, the
former can easily be implemented in the next StateFun release.

Would you be able to share with us a little bit more about your original 
motivation to ask this question :-)
This would help us as we gather more and more use cases.

For example: target language, environment, how gRPC services are being 
discovered.

Thanks,
Igal

On Thursday, September 17, 2020, Dalmo Cirne <dalmo.ci...@workday.com> wrote:
Hi,

In the latest Flink Forward, from April 2020, there were mentions that adding 
support to gRPC, in addition to HTTP, was in the works and would be implemented 
in the future.

Looking into the flink-statefun repository on GitHub, one can see that there is
already some work done with gRPC, but parity with its HTTP counterpart is not there yet.

Is there a roadmap or an estimate of when gRPC will be implemented in StateFun?

Thank you,

Dalmo


Re: Resource balancing issue

2020-09-21 Thread Xintong Song
Flink does not currently support this kind of per-operator load balancing. With the default slot sharing strategy, you can adjust the number of TMs and
slots per machine so that the cluster has only 15 slots in total, which guarantees that C is balanced across the 5 machines. But there is currently no good way to guarantee this for B.

Thank you~

Xintong Song



On Mon, Sep 21, 2020 at 5:45 PM 赵一旦  wrote:

>
> A question about Flink's current resource allocation: my job has 3 operators, A with parallelism 1, B with parallelism 10, and C with parallelism 15, on a cluster of 5 machines. How can I guarantee that B and C are spread perfectly evenly across the 5 machines? I don't care where A runs, but B and C must be balanced evenly across all 5 machines.
>
>
> With more operators it gets even more complicated: operators whose parallelism is not a multiple of 5 cause more and more operators like B and C to end up unbalanced across the 5 machines.
>


Re: metaspace out-of-memory & error while retrieving the leader gateway

2020-09-21 Thread Xintong Song
## Metaspace OOM
As the error message already suggested, the metaspace OOM you encountered
is likely caused by a class loading leak. I think you are on the right
direction trying to look into the heap dump and find out where the leak
comes from. IIUC, after removing the ZK folder, you are now able to run
Flink with the heap dump options.

The problem does not occur in previous versions because Flink starts to set
the metaspace limit since the 1.10 release. The class loading leak might
have already been there, but is never discovered. This could lead to
unpredictable stability and performance issues. That's why Flink updated
its memory model and explicitly set the metaspace limit in the 1.10 release.

## Leader retrieving
The command looks good to me. If this problem happens only once, it could
be irrelevant to adding the options. If that does not block you from
getting the heap dump, we can look into it later.

Thank you~

Xintong Song



On Mon, Sep 21, 2020 at 9:37 PM Claude M  wrote:

> Hi Xintong,
>
> Thanks for your reply.  Here is the command output w/ the java.opts:
>
> /usr/local/openjdk-8/bin/java -Xms768m -Xmx768m -XX:+UseG1GC
> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/flink/log
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> -classpath
> /opt/flink/lib/flink-metrics-datadog-statsd-2.11-0.1.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:/opt/flink/lib/flink-table-blink_2.11-1.10.2.jar:/opt/flink/lib/flink-table_2.11-1.10.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.10.2.jar::/etc/hadoop/conf:
> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
> --configDir /opt/flink/conf --executionMode cluster
>
> To answer your questions:
>
>- Correct, in order for the pod to start up, I have to remove the
>flink app folder from zookeeper.  I only have to delete once after applying
>the java.opts arguments.  It doesn't make sense though that I should have
>to do this just from adding a parameter.
>- I'm using the standalone deployment.
>- I'm using job cluster mode.
>
> A higher priority issue I'm trying to solve is the metaspace out of
> memory that is occurring in task managers.  This was not happening before I
> upgraded to Flink 1.10.2.  Even after increasing the memory, I'm still
> encountering the problem.  That is why I added the java.opts argument, to
> see if I can get more information about the problem, and that is when I ran
> across the second issue w/ the job manager pod not starting up.
>
>
> Thanks
>
>
> On Sun, Sep 20, 2020 at 10:23 PM Xintong Song 
> wrote:
>
>> Hi Claude,
>>
>> IIUC, in your case the leader retrieving problem is triggered by adding
>> the `java.opts`? Then could you try to find and post the complete command
>> for launching the JVM process? You can try log into the pod and execute `ps
>> -ef | grep `.
>>
>> A few more questions:
>> - What do you mean by "resolve this"? Does the jobmanager pod get stuck
>> there, and recover when you remove the folder from ZK? Do you have to do
>> the removal every time you submit to Kubernetes?
>>
>>> The only way I can resolve this is to delete the folder from zookeeper
>>> which I shouldn't have to do.
>>>
>> - Which Flink's kubernetes deployment are you using? The standalone or
>> native Kubernetes?
>> - Which cluster mode are you using? Job cluster, session cluster, or the
>> application mode?
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Sat, Sep 19, 2020 at 1:22 AM Claude M  wrote:
>>
>>> Hello,
>>>
>>> I upgraded from Flink 1.7.2 to 1.10.2.  One of the jobs running on the
>>> task managers is periodically crashing w/ the following error:
>>>
>>> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
>>> has occurred. This can mean two things: either the job requires a larger
>>> size of JVM metaspace to load classes or there is a class loading leak. In
>>> the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
>>> should be increased. If the error persists (usually in cluster after
>>> several job (re-)submissions) then there is probably a class loading leak
>>> which has to be investigated and fixed. The task executor has to be
>>> shutdown.
>>>
>>> I found this issue regarding it:
>>> https://issues.apache.org/jira/browse/FLINK-16406
>>>
>>> I have tried increasing the taskmanager.memory.jvm-metaspace.size to
>>> 256M & 512M and still was having the problem.
>>>
>>> I then added the following to the flink.conf to try to get more
>>> information about the error:
>>> env.java.opts: -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError
>>> -XX:HeapDumpPath=/opt/flink/log
>>>
>>> When I deployed the change which is in a Kubernetes cluster, the
>>> jobmanager pod fails to start up and the following message shows
>>> repeatedly:
>>>
>>> 2020-09-18 17:03:46,255 WARN
>>>  

Re: Flink multiple task managers setup

2020-09-21 Thread Yangze Guo
Hi,

As the error message says, it could not find the flink-dist jar in
"/cygdrive/d/Apacheflink/dist/apache-flink-1.9.3/deps/lib". Where is
your Flink distribution, and did you change its directory structure?

Best,
Yangze Guo

On Mon, Sep 21, 2020 at 5:31 PM saksham sapra  wrote:
>
> Hi,
>
> I installed Cygwin and tried to run start-cluster.sh, with Zookeeper up
> and running and one jobmanager and one taskmanager defined,
> but I am getting this issue.
>
> $ start-cluster.sh start
> Starting HA cluster with 1 masters.
> -zTheFIND: Invalid switch
>  system cannot find the file specified.
> [ERROR] Flink distribution jar not found in 
> /cygdrive/d/Apacheflink/dist/apache-flink-1.9.3/deps/lib.
> File not found - 
> D:\Apacheflink\dist\apache-flink-1.9.3\deps/log/flink--standalo
> nesession-7-PLRENT-5LC73H2*
> Starting standalonesession daemon on host PLRENT-5LC73H2.
> -zThe system cannot FIND: Invalid switch
> find the file specified.
> [ERROR] Flink distribution jar not found in 
> /cygdrive/d/Apacheflink/dist/apache-flink-1.9.3/deps/lib.
> File not found - 
> D:\Apacheflink\dist\apache-flink-1.9.3\deps/log/flink--taskexec
> utor-7-PLRENT-5LC73H2*
> Starting taskexecutor daemon on host PLRENT-5LC73H2.
>>>
>>>


Re: Integrating Flink with Spring

2020-09-21 Thread nashcen
Spring Boot is essentially a Tomcat container, and Tomcat is a single-machine thing; Flink is a distributed computing framework and does not run inside
Tomcat. At first I also tried to integrate Spring Boot and Tomcat, but once you understand what Flink
is, you will not make those attempts anymore. In web development we are used to the idea that everything can be integrated with Spring Boot; in big data development, Flink occupies the same place that Spring Boot does there (I mean in terms of position, not functionality).



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Integrating Flink with Spring

2020-09-21 Thread Husky Zeng
Hi,

I am working on refactoring flink-sql-gateway with the Spring
Cloud framework, in order to offer Flink as a service and achieve multi-tenant resource isolation through surrounding components, but Flink itself has no need at all to use Spring. What is your use case?

Best Regards




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink-1.11 Table API & symbol syntax issue

2020-09-21 Thread Leonard Xu
Hi,

Yes, it works; I just checked. Could you paste the exception and the code that reproduces it?

Best,
Leonard


> On Sep 22, 2020, at 09:44, nashcen <2415370...@qq.com> wrote:
> 
> The IDE flags a syntax error, so I haven't run it. In your IDEA, does the 1.11 Table API & syntax show up without errors?
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: When flink-sql consumes a Flink table backed by Kafka, does each SELECT on the table effectively use a different group id?

2020-09-21 Thread 赵一旦
The issue here is not whether it is the same group id; Kafka consumption has several modes. By default Flink should be using the assign-topic-partition mode, and that mode is not affected by the consumer group.
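
For illustration, a minimal sketch with the plain Kafka client (broker address and topic/partition are just examples) of the difference described above: subscribe() goes through the group coordinator, while assign() pins partitions directly and is not affected by the consumer group.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignVsSubscribe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "flink-sql");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // subscribe(): partitions are assigned by the group coordinator,
        // so two consumers in the same group split the partitions between them.
        KafkaConsumer<String, String> grouped = new KafkaConsumer<>(props);
        grouped.subscribe(Collections.singletonList("ODS_PaymentOrdert"));

        // assign(): partitions are pinned directly and the group coordinator
        // is not involved, so every such consumer reads its assigned partitions
        // regardless of group.id. This is the mode the Flink Kafka consumer uses.
        KafkaConsumer<String, String> assigned = new KafkaConsumer<>(props);
        assigned.assign(Collections.singletonList(new TopicPartition("ODS_PaymentOrdert", 0)));
    }
}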

凌天荣 <466792...@qq.com> wrote on Thu, Sep 10, 2020 at 10:08 AM:

> CREATE TABLE ODS_PaymentOrdert (
> orderId INT,
> memberId INT,
> orderAmount DECIMAL(10, 2),
> paymentStatus SMALLINT,
> orderDate VARCHAR,
> payDate VARCHAR,
> paymentIP VARCHAR,
> orderSrc VARCHAR,
> channelType SMALLINT,
> productId SMALLINT,
> amount SMALLINT,
> unit VARCHAR,
> paymentChannel SMALLINT,
> serviceOrderType SMALLINT,
> refundAmount DECIMAL(10, 2),
> proctime as PROCTIME(),
> primary key(orderId) NOT ENFORCED
> ) WITH (
>   'connector' = 'kafka',
>   'properties.group.id' = 'flink-sql',
>   'properties.bootstrap.servers' = 'xx.xx.xx.xxx:9092',
>   'topic' = 'ODS_PaymentOrdert',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
>
> These are the Kafka table options,
>
>
> and checkpointing is configured.
>
> -- Original message --
> *From:* "user-zh" ;
> *Sent:* Wednesday, September 9, 2020, 9:46 PM
> *To:* "user-zh";
> *Subject:* Re: When flink-sql consumes a Flink table backed by Kafka, does each
> SELECT on the table effectively use a different group id?
>
> Hi
>
> Could you paste your Kafka table options and the job's checkpoint configuration?
> What is certain is that both jobs use the same group id.
> If you have not configured checkpointing, the Flink Kafka consumer's enable.auto.commit defaults to
> false, so it will not commit offsets for that group; in that case your two jobs only use the group id to determine a starting offset, and the data they get is the same.
> You can look at the explanation of this mechanism in [1][2].
>
> Best
> Leonard
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#%E9%85%8D%E7%BD%AE-kafka-consumer-%E5%BC%80%E5%A7%8B%E6%B6%88%E8%B4%B9%E7%9A%84%E4%BD%8D%E7%BD%AE
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#kafka-consumer-%E6%8F%90%E4%BA%A4-offset-%E7%9A%84%E8%A1%8C%E4%B8%BA%E9%85%8D%E7%BD%AE
>
> > On Sep 9, 2020, at 16:24, 凌天荣 <466792...@qq.com> wrote:
> >
> > I have a Flink table backed by Kafka. I start two jobs at the same time that both
> SELECT from this same table, and each job gets the same data. Does that mean each SELECT on this table
> uses a different group id?
>
>
>


Re: Integrating Flink with Spring

2020-09-21 Thread 赵一旦
You need to explain clearly why you need to integrate Spring Boot: what is the concrete scenario and what is the point?
At the moment there does not seem to be any scenario that requires this.

罗显宴 <15927482...@163.com> wrote on Thu, Sep 10, 2020 at 10:00 AM:

> Flink is just a computing framework; just add the jars as dependencies. Of course, the cluster needs to be started manually.
>
>
> 罗显宴
> Email: 15927482...@163.com
>
> (Signature customized with NetEase Mail Master)
>
> On Sep 10, 2020, at 09:09, 1115098...@qq.com wrote:
> Hi everyone, while integrating Spring Boot into Flink I ran into many problems; they do not seem very compatible. The official documentation also has no introduction to integrating Spring
> Boot. Was integration with Spring Boot simply not considered when Flink was designed?


Re: Unsubscribe

2020-09-21 Thread 吴学文
Hi,

To unsubscribe, please send an email to  user-zh-unsubscr...@flink.apache.org 


For details, see [1]

[1] https://flink.apache.org/zh/community.html#section-1 


Best,
SeanWu

> On Sep 22, 2020, at 9:41 AM, 费文杰 <15171440...@163.com> wrote:
> 
> Unsubscribe
> 
> 
> 费文杰
> Email: fwj15272040...@163.com
> 
> (Signature customized with NetEase Mail Master)



Re: Flink-1.11 Table API & symbol syntax issue

2020-09-21 Thread nashcen
The IDE flags a syntax error, so I haven't run it. In your IDEA, does the 1.11 Table API & syntax show up without errors?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Unsubscribe

2020-09-21 Thread 费文杰
Unsubscribe


费文杰
Email: fwj15272040...@163.com

(Signature customized with NetEase Mail Master)

Flink reading from and writing to Kafka

2020-09-21 Thread 宁吉浩
Hi all,
Recently I have been using Flink to write into Kafka and I see a lot of checkpoint failures; roughly 50% of them fail.
The checkpoint interval is between 30 and 60 s; there is no large state, basically just the state that maintains offsets.
I hope someone can help me figure out what is causing this and whether the checkpoint failure rate can be reduced.

The main errors are as follows:
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.


Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of 
transactions failed, logging first encountered failure

The Kafka logs are as follows:
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no 
longer valid. There is probably another producer with a newer epoch. 16744 
(request epoch), 16745 (server epoch)
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no 
longer valid. There is probably another producer with a newer epoch. 16744 
(request epoch), 16745 (server epoch)
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no 
longer valid. There is probably another producer with a newer epoch. 16744 
(request epoch), 16745 (server epoch)
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no 
longer valid. There is probably another producer with a newer epoch. 16744 
(request epoch), 16745 (server epoch)

Nothing abnormal has been found in HDFS so far.

hourly counter

2020-09-21 Thread Lian Jiang
Hi,

I have a window function with a window width of 1 min. I want to have an
hourly counter which is reset every hour so it never overflows. There are
multiple ways but none of them is straightforward:

StatsDClient metricClient = new NonBlockingStatsDClientBuilder().build();

int count = 0;

void incr() {
  metricClient.count("mycounter", 1, "mytag");
  count++;
}

void reset() {
  metricClient.count("mycounter", -count, "mytag");
  count = 0;
}

As you can see, the code needs to maintain a "count" variable to reset
mycounter.
Also, since timers are not available in a window function, extra code is
needed to reset mycounter every hour.
Is there an easier way to implement an hourly counter? Or is counter
overflow not a concern?

Thanks

Lian


Re: Watermark advancement in late side output

2020-09-21 Thread orips
Great, thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-21 Thread Lian Jiang
Thanks guys. Given Flink 1.12 is not ready (e.g. not available in Maven
repo), I need to stick to 1.11.

Dawid,

For the code throwing "java.lang.Long cannot be cast to java.time.Instant",

The avro schema has:
union {null, timestamp_ms } eventTime = null;

The avro pojo does have the logical type conversion:

  private static SpecificData MODEL$ = new SpecificData();
static {
MODEL$.addLogicalTypeConversion(new
org.apache.avro.data.TimeConversions.TimestampMillisConversion());
  }

I don't see the SpecificRecord#getConversions() you mentioned in the Avro repo.

The pojo code throws here:

public void put(int field$, java.lang.Object value$) {
  switch (field$) {
  case 3: eventTime = (java.time.Instant)value$; break; // throws here
  }
}

I will send the full avdl and pojo offline to you for a closer look.


Regards

Lian




On Mon, Sep 21, 2020 at 12:31 AM Aljoscha Krettek 
wrote:

> Hi All,
>
> Avro was finally bumped in
> https://issues.apache.org/jira/browse/FLINK-18192.
>
> The implementers didn't see
> https://issues.apache.org/jira/browse/FLINK-12532, but it is also
> updated now.
>
> Best,
> Aljoscha
>
> On 21.09.20 08:04, Arvid Heise wrote:
> > Hi Lian,
> >
> > we had a similar discussion on [1].
> >
> > TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2]
> until
> > Hive bumps it [3]. In the thread, I gave some options to avoid running
> into
> > the issue.
> > The easiest fix is to use Avro 1.8.2 all the way, but you may run into
> [4]
> > if your logical type is nullable (which is not necessary in most cases).
> >
> > Still, I think it's time for us to revise the decision to wait for Hive
> to
> > bump and rather upgrade independently. Avro was for a long time stuck on
> > 1.8 but the project gained traction again in the past two years. On the
> > other hand, Hive seems to be rather slow to respond to that and we
> > shouldn't have a slow moving component block us to support a fast moving
> > component if it's such apparent that users want it.
> > @Aljoscha Krettek  could you please pick that
> topic up
> > and ping the respective maintainers?
> >
> > [1]
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
> > [2] https://issues.apache.org/jira/browse/FLINK-12532
> > [3] https://issues.apache.org/jira/browse/HIVE-21737
> > [4] https://issues.apache.org/jira/browse/AVRO-1891
> >
> > On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang 
> wrote:
> >
> >> Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am
> >> trying ConfluentRegistryAvroDeserializationSchema (if this is what you
> >> mean) but got "java.lang.Long cannot be cast to java.time.Instant". This
> >> may be caused by https://issues.apache.org/jira/browse/FLINK-11030.
> >>  Is there any
> progress
> >> for this JIRA? Thanks. Regards!
> >>
> >>
> >> Stacktrace:
> >> java.lang.ClassCastException: java.lang.Long cannot be cast to
> >> java.time.Instant
> >> at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
> >> at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
> >> at
> >>
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
> >> at
> >>
> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
> >> at
> >>
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
> >> at
> >>
> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
> >> at
> >>
> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
> >> at
> >>
> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
> >> at
> >>
> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> >> at
> >>
> 

Re: How to disconnect taskmanager via rest api?

2020-09-21 Thread Timo Walther

Hi Luan,

this sounds more like a new feature request to me. Maybe you can already 
open an issue for it.

I will loop in Chesnay in CC to see if there is some possibility to achieve 
this already.


Regards,
Timo

On 21.09.20 06:37, Luan Cooper wrote:

Hi

We're running a Flink standalone cluster on k8s.
When deleting a taskmanager pod manually, the jobmanager *should disconnect 
it immediately*.

However, no such REST API is available right now;
we have to wait `akka.tcp.timeout`, which means 30s later or more.

What if I want to disconnect a TM via a REST API?
Which way would you suggest?

1. Add disconnectTaskManager to 
org.apache.flink.runtime.dispatcher.Dispatcher,

which means a new interface method

CompletableFuture disconnectTaskManager(JobID jobId, ResourceID resourceId);

in org.apache.flink.runtime.webmonitor.RestfulGateway

2. Any other suggestions?

Thanks




Re: Problem with zookeeper and flink config

2020-09-21 Thread Timo Walther

Hi Saksham,

if I understand you correctly, you are running Zookeeper and Flink 
locally on your machine? Are you using Docker or is this a bare metal 
setup? The exception indicates that your paths contain `hdfs:` as URL 
scheme. Are you sure you want to use HDFS? If yes, you need to add an 
HDFS dependency. See:


https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/

Regards,
Timo

On 21.09.20 08:03, saksham sapra wrote:
I am trying to run multiple clusters on a Windows machine using 
Zookeeper, so I used this page to configure Zookeeper and Flink's config.yaml:


https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html

but somehow I am getting these exceptions; also, I have worked with Hadoop earlier.

Thanks & Regards,
Saksham Sapra




Re: Watermark advancement in late side output

2020-09-21 Thread Timo Walther

Hi Ori,

first of all, watermarks are sent to all side outputs (this is tested 
here [1]). Thus, operators in the side-output branch of the pipeline 
work similarly to operators in the main branch.


When calling `assignTimestampsAndWatermarks`, the inserted operator will 
erase incoming watermarks and only emit self-generated ones. The logic 
can be found here [2]. Thus, downstream operators in the side output 
will only consider the newly assigned one (+ the end watermark Long.MAX).


I hope this helps.

Regards,
Timo

[1] 
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
[2] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java#L114


On 21.09.20 12:21, Ori Popowski wrote:
Let's say I have an event-time stream with a window and a side output 
for late data, and in the side output of the late data, I further assign 
timestamps and do windowing - what is the watermark situation here?


The main stream has its own watermark advancement but the side output 
has its own. Do they maintain separate watermarks? Or they intermingle?


Thanks




Re: Flink-1.11 Table API & symbol syntax issue

2020-09-21 Thread Leonard Xu
Hi

> 
> In Flink-1.11, for the Table API select and filter methods, the officially recommended syntax uses the $ symbol to replace the original "
> 
>  
> 
> But when I write it in IDEA following the official API examples, the $ symbol does not work. What is the reason?
> 
>  

It looks like you are using Scala. In Scala the recommended form should be $"a" [1]; the docs use tabs to distinguish the Scala and Java
variants. In both Java and Scala, $() is a method name, and $"a" in Scala works through an implicit conversion for method-style access. Before 1.11 it was a single quote, 'a,
not a double quote, which is also a way to construct an Expression.

Even using $("a") should be fine in Scala. Could you paste the error message?

Best
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html 

 



Re: K8s native deployment failure

2020-09-21 Thread Yang Wang
Could you confirm that the ConfigMaps mounted into the TMs are all the same? From the log you provided, it does not look like you are running with the yaml from the community documentation [1].
Also, if you could share the JobManager and TaskManager logs, it would be much easier to investigate the problem.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html

Best,
Yang

yanzhibo wrote on Fri, Sep 18, 2020 at 6:57 PM:

>
> The TM on the same node as the jobmanager can register with the JM; TMs on other nodes cannot.
>
> The JM is deployed in single-instance (non-HA) mode.
>
> > On Sep 17, 2020, at 3:55 PM, yanzhibo wrote:
> >
> > It is non-HA. None of the TMs can register, but from inside a TM pod the jobmanager can be pinged via the service.
> >
> >
> >> On Sep 17, 2020, at 11:10 AM, Yang Wang wrote:
> >>
> >> This error looks like the TM timed out while registering with the JM. Are you using an HA or non-HA deployment?
> >>
> >> If it is HA, the TM talks directly to the JM's pod IP; in that case you need to log into the pod and check whether the network is reachable.
> >> If it is non-HA, the TM registers with the JM through the service; you need to check whether the K8s kube-proxy is working properly.
> >>
> >> Also, is it that all TMs fail to register, or only some of them? This can also help rule out network problems.
> >>
> >>
> >> Best,
> >> Yang
> >>
> >> yanzhibo wrote on Wed, Sep 16, 2020 at 5:25 PM:
> >>
> >>> After submitting a job, a jobmanager pod fails to acquire taskmanagers.
> >>>
> >>>
> >>> The taskmanager's exception:
> >>>
> >>> Fatal error occurred in TaskExecutor akka.tcp://
> >>> flink@179.10.251.70:6122/user/rpc/taskmanager_0.
> >>>
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
> >>> Could not register at the ResourceManager within the specified maximum
> >>> registration duration 30 ms. This indicates a problem with this
> >>> instance. Terminating now.
> >>>   at
> >>>
> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251)
> >>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> >>>
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237)
> >>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> >>>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> >>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> >>>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> >>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> >>>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> >>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at akka.japi.pf
> .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> >>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> >>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> >>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at akka.actor.Actor.aroundReceive(Actor.scala:517)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> >>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> >>>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> >>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>>   at
> >>>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>> [flink-dist_2.12-1.11.1.jar:1.11.1]
> >>> 2020-09-16 09:14:39,077 ERROR
> >>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal
> >>> error occurred while executing the TaskManager. Shutting it down...
> >>>
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
> >>> Could not register at the ResourceManager within the specified maximum
> >>> registration duration 30 ms. This indicates a problem with this
> >>> instance. Terminating now.
> >>>   at
> >>>
> 

Zookeeper connection loss causing checkpoint corruption

2020-09-21 Thread Peter Westermann
I recently ran into an issue with our Flink cluster: a Zookeeper service deployment 
caused a temporary connection loss and triggered a new jobmanager leader 
election. Leader election was successful and our Flink job restarted from 
the last checkpoint.
This checkpoint appears to have been taken while we lost the connection to Zookeeper 
and ended up in a corrupted state, so the Flink job kept failing. Here's the 
exception stack trace for that:
2020-09-18 01:10:57
java.lang.Exception: Exception while creating StreamOperatorStateContext.
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de_(27/40) 
from any of the 1 provided restore options.
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
 ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
 at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:335)
 at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
 ... 11 more
Caused by: java.io.IOException: Error while opening RocksDB instance.
 at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:89)
 at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
 at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:220)
 at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
 at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
 at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
 at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:276)
 ... 15 more
Caused by: org.rocksdb.RocksDBException: Sst file size mismatch: 
/mnt/data/tmp/flink-io-7de3e9df-f6d5-49f2-a92a-200d3c45d64d/job_2f6bc855799715e7846e2b95a9c01e6a_op_KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de__27_40__uuid_0aaa0092-2b59-48ef-b560-727a9e81a64f/db/012554.sst.
 Size recorded in manifest 5309, actual size 1199
Sst file size mismatch: 
/mnt/data/tmp/flink-io-7de3e9df-f6d5-49f2-a92a-200d3c45d64d/job_2f6bc855799715e7846e2b95a9c01e6a_op_KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de__27_40__uuid_0aaa0092-2b59-48ef-b560-727a9e81a64f/db/012548.sst.
 Size recorded in manifest 654588, actual size 1541818

 at org.rocksdb.RocksDB.open(Native Method)
 at org.rocksdb.RocksDB.open(RocksDB.java:286)
 at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:77)
 ... 21 more

This is for Flink 1.10.2 with Zookeeper for HA, S3 as a state backend and 

Re: [DISCUSS] Drop Scala 2.11

2020-09-21 Thread Theo Diefenthal
We use a Cloudera 6.3 cluster in prod. I'd guess it's still widely used in 
prod, as those Cloudera upgrades for major versions are planned a long time ahead 
and take a significant amount of resources in big data lakes. 

On that 6.3 cluster, if I open spark-shell, I still see Scala 2.11 in use: 
> Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 
> 1.8.0_181) 

Just so we don't mess with any dependencies, and as we have both Flink and Spark 
jobs, we declare all dependencies with scala_2.11. But we never actually tried 
what happens if we changed that. We just seek maximum compatibility while, 
at the same time, maintaining and upgrading Flink ourselves to keep up with the 
fast development pace of the community and to be able to use the latest major 
version quickly. 

So my suggestion: as long as it isn't much pain, I'd like to keep the Scala 
2.11 support for a while (or, even better for me as a Java-but-never-Scala 
developer: drop Scala completely :) ). Speaking for my project: I think our ops 
team plans to upgrade to Cloudera 7 somewhere around Q2 2021, so personally I'm 
fine with dropping it then. 

Best regards 
Theo 




From: "Igal Shilman" 
To: "Seth Wiesman" 
CC: "dev" , "user" 
Sent: Friday, 11 September 2020 13:15:38
Subject: Re: [DISCUSS] Drop Scala 2.11 

@Galen FYI: the upcoming StateFun release would use Scala2.12 

On Thu, Sep 10, 2020 at 5:14 PM Seth Wiesman <sjwies...@gmail.com> wrote: 



@Galen 

Yes, we would absolutely migrate StateFun. StateFun can be compiled with Scala 
2.12 today; I'm not sure why it's not cross-released. 

@aljoscha :) 

@mathieu It's on the roadmap, but it's non-trivial and I'm not aware of anyone 
actively working on it. 

On Thu, Sep 10, 2020 at 10:09 AM Matthieu Bonneviot 
 wrote: 

That makes sense. 
We are using 2.12 for our production 
Also, Flink's Scala 2.12 support is in fact limited to Scala 2.12.7; it is
binary incompatible with later 2.12 releases
(https://issues.apache.org/jira/browse/FLINK-12461).
It would be great to at least move to a more recent 2.12 version, and 
ideally to 2.13. 

Is there any scala support plan available? 

Matthieu 


On Thu, Sep 10, 2020 at 5:00 PM Aljoscha Krettek <aljos...@apache.org> 
wrote: 

> Yes! I would be in favour of this since it's blocking us from upgrading 
> certain dependencies. 
> 
> I would also be in favour of dropping Scala completely but that's a 
> different story. 
> 
> Aljoscha 
> 
> On 10.09.20 16:51, Seth Wiesman wrote: 
> > Hi Everyone, 
> > 
> > Think of this as a pre-flip, but what does everyone think about dropping 
> > Scala 2.11 support from Flink. 
> > 
> > The last patch release was in 2017 and in that time the scala community 
> has 
> > released 2.13 and is working towards a 3.0 release. Apache Kafka and 
> Spark 
> > have both dropped 2.11 support in recent versions. In fact, Flink's 
> > universal Kafka connector is stuck on 2.4 because that is the last 
> version 
> > with scala 2.11 support. 
> > 
> > What are people's thoughts on dropping Scala 2.11? How many are still 
> using 
> > it in production? 
> > 
> > Seth 
> > 
> 
> 

-- 
Matthieu Bonneviot 
Senior RD Engineer, DataDome 
M +33 7 68 29 79 34 
E matthieu.bonnev...@datadome.co 
W www.datadome.co 








Re: 消费kafka source反压

2020-09-21 Thread Benchao Li
Compared with which situation do you mean this performance impact?

smq <374060...@qq.com> 于2020年9月21日周一 下午6:49写道:

> Thanks. One more question: with a parallelism of 1, does a keyBy operator plus keyed state have a big impact on performance?
>
>
>
> --- Original message ---
> From: "Benchao Li" Sent: Monday, September 21, 2020, 4:39 PM
> To: "user-zh" Subject: Re: 消费kafka source反压
>
>
> This kind of backpressure is usually propagated from downstream. Check the last operator that shows backpressure; that one is the bottleneck in processing capacity.
>
> smq <374060...@qq.com 于2020年9月21日周一 下午2:08写道:
>
> 
> 
> Hi all, while testing Flink's consumption rate I found that data processing is rather slow, roughly 1000 records per second per task. Looking at the web UI, the backpressure of the Kafka source reaches 1. Does anyone have experience with this?
>
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li


Re: App gets stuck in Created State

2020-09-21 Thread Zhu Zhu
Hi Arpith,

All tasks being in CREATED state indicates that no task has been scheduled yet. It is
strange that a job gets stuck in this state.
Is it possible for you to share the job manager log so we can check what is
happening there?

Thanks,
Zhu

Arpith P  于2020年9月21日周一 下午3:52写道:

> Hi,
>
> We have Flink 1.8.0 cluster deployed in Hadoop distributed mode, I often
> see even though Hadoop has enough resources Flink sits in Created state.
> We have 4 operators using 15 parallelism, 1 operator using 40 & 2 operators
> using 10. At time of submission I'm passing taskmanager memory as 4Gb and
> job manager memory as 2gb. and 2 slots This request should only take 20
> containers and 40 Vcores. But I see Flink is overallocating resource of 65
> containers and 129 Cores . I've attached snapshots for references.
>
> Right now I'm passing:  -yD yarn.heartbeat.container-request-interval=1000
> -yD taskmanager.network.memory.fraction=0.045 -yD
> taskmanager.memory.preallote=true.
>
> How do I control resource allocation?.
>
>


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 Thread Benchao Li
You can filter with a WHERE condition in the SQL query.

chuyuan  于2020年9月21日周一 下午6:48写道:

> OK, thanks, I'll try this approach. I previously registered it as a table in order to filter the data by conditions. One more question: when using the DDL directly, how do I filter the Kafka data?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


回复:消费kafka source反压

2020-09-21 Thread smq
Thanks. One more question: with a parallelism of 1, does a keyBy operator plus keyed state have a big impact on performance?



--- Original message ---
From: "Benchao Li"

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 Thread chuyuan
OK, thanks, I'll try this approach. I previously registered it as a table in order to filter the data by conditions. One more question: when using the DDL directly, how do I filter the Kafka data?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink-1.11 Table API &符号 语法问题

2020-09-21 Thread nashcen
In Flink 1.11, for the Table API select/filter methods, the officially recommended syntax uses the $ symbol in place of the old "field name" strings.

But when I follow the official API examples in IDEA, the $ symbol does not work. What is the reason?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
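One common cause, assuming the Java Table API, is a missing static import: $ is a static method on org.apache.flink.table.api.Expressions, so IDEA cannot resolve it until it is imported statically. A minimal sketch (the "orders" table and its fields are made up for illustration):

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class DollarSyntaxExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical table registered elsewhere (e.g. via DDL).
        Table orders = tEnv.from("orders");

        // With the static import in place, $("field") compiles in select/filter.
        Table result = orders
                .filter($("amount").isGreater(lit(100)))
                .select($("user"), $("amount"));

        result.execute().print();
    }
}

If you are on the Scala API instead, the $"field" interpolator comes from import org.apache.flink.table.api._.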


Watermark advancement in late side output

2020-09-21 Thread Ori Popowski
Let's say I have an event-time stream with a window and a side output for
late data, and in the side output of the late data, I further assign
timestamps and do windowing - what is the watermark situation here?

The main stream has its own watermark advancement, and so does the side output.
Do they maintain separate watermarks, or do they intermingle?

Thanks
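For reference, a minimal sketch of the setup being described, with made-up element types and window sizes; note that calling assignTimestampsAndWatermarks on the side output starts a new timestamp/watermark assignment for the operators downstream of that point:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateSideOutputSketch {

    public static void run(DataStream<Tuple2<String, Long>> events) {
        OutputTag<Tuple2<String, Long>> lateTag =
                new OutputTag<Tuple2<String, Long>>("late-data") {};

        // Main windowed pipeline; late records are redirected to the side output.
        SingleOutputStreamOperator<Tuple2<String, Long>> mainResult = events
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                                .withTimestampAssigner((e, ts) -> e.f1))
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                .sideOutputLateData(lateTag)
                .reduce((a, b) -> a);

        // The side output is a regular DataStream again; assigning timestamps here
        // starts a fresh watermark chain for the window that follows.
        DataStream<Tuple2<String, Long>> lateResult = mainResult
                .getSideOutput(lateTag)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
                                .withTimestampAssigner((e, ts) -> e.f1))
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                .reduce((a, b) -> a);
    }
}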


Re: 回复: Flink sql 消费kafka的顺序是怎么样的 第二次运行sql的结果和第一次不同

2020-09-21 Thread 赵一旦
Could someone clarify: in the Flink SQL case, does watermark generation have the same take-the-minimum-across-partitions mechanism as in the DataStream API?

With the DataStream API this problem definitely does not exist.
Case 1: with 10 partitions, simply use 10 parallel source subtasks and generate watermarks right after the source; watermarks are merged by taking the minimum.

Case 2: even with 2 parallel subtasks, each consuming 5 partitions, there is no such problem as long as the watermark generation mechanism provided by the Kafka source is used.
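For reference, a minimal sketch of the Kafka-source watermark mechanism mentioned in case 2; the topic and group id follow the earlier example, and the timestamp-extraction helper is a placeholder, not real parsing code:

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerPartitionWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "consumer-rt");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("message-json", new SimpleStringSchema(), props);

        // Watermark strategy attached to the consumer itself: each Kafka partition
        // gets its own generator and the source emits the minimum across partitions,
        // so a fast partition cannot push the watermark past a slow one.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<String>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((record, ts) -> extractEventTime(record)));

        env.addSource(consumer).print();
        env.execute();
    }

    // Hypothetical helper: parse the 10-digit clientTime out of the JSON payload.
    private static long extractEventTime(String json) {
        return System.currentTimeMillis(); // placeholder for real JSON parsing
    }
}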


anonnius  于2020年9月18日周五 下午3:47写道:

> hi: 感觉你的关注和回复
> 1> 下面是我的分析过程
> 1. 第一次是, 先在sql-client.sh 中执行sql
> select
> tumble_start(rowtime, interval '2' MINUTE) as wStart,
> tumble_end(rowtime, interval '2' MINUTE) as wEnd,
> count(1) as pv,
> count(distinct uuid) as uv
> from iservVisit
> group by tumble(rowtime, interval '2' MINUTE)
>
> 此时, 由于数据 是一条一条的通过kafka生产者工具(kafka-console-producer.sh)写入,
> 并且由kafka-connector会不停的消费数据, 获取的数据是和手动写入的数据的顺序是一样的
>
> 2. 第二次是, 退出sql-client.sh后在执行sql
> select
> tumble_start(rowtime, interval '2' MINUTE) as wStart,
> tumble_end(rowtime, interval '2' MINUTE) as wEnd,
> count(1) as pv,
> count(distinct uuid) as uv
> from iservVisit
> group by tumble(rowtime, interval '2' MINUTE)
> 这时由于数据已经写入kafka了, 在由kafka-connector进行消费的时候, 由于topic有3个分区, 消费后获取的消息的顺序和
> 手动通过kafka生产者工具(kafka-console-producer.sh)写入时的顺序
> 不一致了, 这样rowtime时间靠后的数据可能先被消费, 导致产生了比较大的watermark, 导致后续消费的部分消息被忽略了
>
> 3. 通过将建表时 watermark的间隔变大些, 能还原第一次的结果, 这种方式还是考虑中(考虑是否一致有效)
> create table iservVisit (
> type string comment '时间类型',
> uuid string comment '用户uri',
> clientTime string comment '10位时间戳',
> rowtime as
> to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10)
> as bigint))), -- 计算列, 10位时间戳转为timestamp类型
> WATERMARK for rowtime as rowtime - INTERVAL '5' MINUTE -- 计算列,
> 作为watermark, 有1分钟变为5分钟
> ) with (
> 'connector' = 'kafka-0.10',
> 'topic' = 'message-json',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'consumer-rt',
> 'format' = 'json',
> 'json.ignore-parse-errors' = 'true',
> 'scan.startup.mode' = 'earliest-offset'
> )
> 4. 初步结论是: 如何保证/或通过什么办法, 让每个分区的消费数据的速度保持一致
> 5. 附件可以通过sublime sql/hql插件查看, 这样显示会清晰点
>
>
>
>
>
>
>
> 在 2020-09-18 14:42:42,"chengyanan1...@foxmail.com" 
>  写道:
> >先占个楼
> >我按照题主给的文档,一边发送数据,一边执行以下SQL实时查看查询结果
> >select
> >tumble_start(rowtime, interval '2' MINUTE) as wStart,
> >tumble_end(rowtime, interval '2' MINUTE) as wEnd,
> >count(1) as pv,
> >count(distinct uuid) as uv
> >from iservVisit
> >group by tumble(rowtime, interval '2' MINUTE)
> >最后得到的结果是这样的 :(跟题主不一样)
> >
> > wStart  wEndpv  
> >   uv
> >  2020-09-18T09:14  2020-09-18T09:16 
> > 2 2
> >  2020-09-18T09:16  2020-09-18T09:18 
> > 8 3
> >  2020-09-18T09:18  2020-09-18T09:20 
> > 8 3
> >  2020-09-18T09:20  2020-09-18T09:22 
> > 2 2
> >
> >等所有数据都发送完,退出sql-client然后再执行上边的查询语句最后得到的结果:(跟题主是一样的):
> >wStartwEnd   
> >pvuv
> >2020-09-18T09:14  2020-09-18T09:16  2
> > 2
> >2020-09-18T09:16  2020-09-18T09:18  2
> > 2
> >2020-09-18T09:18  2020-09-18T09:20  8
> > 3
> >2020-09-18T09:20  2020-09-18T09:22  2
> > 2
> >
> >
> >
> >
> >发件人: anonnius
> >发送时间: 2020-09-18 11:24
> >收件人: user-zh
> >主题: Flink sql 消费kafka的顺序是怎么样的 第二次运行sql的结果和第一次不同
> >hi: [求助] 我这里用flink-sql消费kafka数据, 通过窗口做pvuv的计算, 第一次和第二次计算的结果不一致, 不太了解为什么
> >0> mac本地环境
> >1> flink 1.11.1
> >2> kafka 0.10.2.2, topic为message-json, 分区为3, 副本为1
> >3> 使用的是sql-client.sh 环境
> >4> 先在sql-cli中创建了iservVisit表
> >create table iservVisit (
> >type string comment '时间类型',
> >uuid string comment '用户uri',
> >clientTime string comment '10位时间戳',
> >rowtime as 
> > to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) 
> > as bigint))), -- 计算列, 10位时间戳转为timestamp类型
> >WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- 计算列, 
> > 作为watermark
> >) with (
> >'connector' = 'kafka-0.10',
> >'topic' = 'message-json',
> >'properties.bootstrap.servers' = 'localhost:9092',
> >'properties.group.id' = 'consumer-rt',
> >'format' = 'json',
> >'json.ignore-parse-errors' = 'true',
> >'scan.startup.mode' = 'earliest-offset'
> >)
> >然后在sql-cli执行sql
> >select
> >tumble_start(rowtime, interval '2' MINUTE) as wStart,
> >tumble_end(rowtime, interval '2' MINUTE) as wEnd,
> >count(1) as pv,
> >count(distinct uuid) as uv
> >from iservVisit
> >group by tumble(rowtime, interval '2' MINUTE)
> >5> 向kafka生产者依次发送下面的json消息
> >{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"}
> >{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"}
> >{"type": "iservVisit", "uuid": "a", "clientTime": 

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-21 Thread 赵一旦
Indeed, the problem wasn't described clearly; it seems Flink would not actually have such an issue.

hao kong  于2020年9月16日周三 下午6:45写道:

> Many thanks. I will try to implement it and see whether I can get it to work by combining a ProcessFunction with the backpressure mechanism.
>
> Congxian Qiu  于2020年9月16日周三 下午1:55写道:
>
> > Hi
> > 没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
> > 如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
> > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink
> > 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
> > 另外你也可以看下这个文档[2] 看是否在你的场景中有帮助
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
> > [2]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
> > Best,
> > Congxian
> >
> >
> > hao kong  于2020年9月16日周三 上午10:24写道:
> >
> > > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
> > > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
> > >
> > >
> >
> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
> > >
> >
>


资源均衡问题

2020-09-21 Thread 赵一旦
The current Flink resource allocation problem: I have one job with 3 operators. Operator A has parallelism 1, operator B parallelism 10 and operator C parallelism 15. The cluster has 5 machines. How can I guarantee that operators B and C are perfectly balanced across the 5 machines? I don't care where A ends up, but B and C must be spread evenly over the 5 machines.


With more operators it gets even more complicated: some operators' parallelism is not a multiple of 5, which leads to more and more operators like B and C becoming unbalanced across the 5 machines.


Re: Flink Table SQL and writing nested Avro files

2020-09-21 Thread Dawid Wysakowicz
Hi Dan,

I think the best what I can suggest is this:

    SELECT
        ROW(left.field0, left.field1, left.field2, ...),
        ROW(right.field0, right.field1, right.field2, ...)
    FROM ...

You will need to list all the fields manually, as SQL does not allow for
asterisks in regular function calls.

If you are willing to give the Table API a try you might workaround some
of the manual work with the Column Function[1]

        Table join = t1.join(t2).where($("id1").isEqual($("id2")));
        join
            .select(
                row(withColumns(range(1, t1.getSchema().getFieldCount()))),
                row(withColumns(range(
                    t1.getSchema().getFieldCount() + 1,
                    t1.getSchema().getFieldCount() + t2.getSchema().getFieldCount())))
            )
            .executeInsert("flat_avro")
            .await();


Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#column-functions

On 18/09/2020 09:47, Dan Hill wrote:
> Hi!
>
> I want to join two tables and write the results to Avro where the left
> and right rows are nested in the avro output.  Is it possible to do
> this with the SQL interface?  
>
> Thanks!
> - Dan
>  CREATE TABLE `flat_avro` (
>`left` ROW,
>`right` ROW
> ) WITH (
>'connector' = 'filesystem',
>'path' = 's3p://blah/blah',
>'format' = 'avro'
> );INSERT INTO `flat_avro`
> SELECT left.*, right.* FROM `left`
> LEFT JOIN `right`
> ON `left`.`id` = `right`.`id` );




Fwd: Flink multiple task managers setup

2020-09-21 Thread saksham sapra
Hi,

I installed Cygwin and tried to run start-cluster.sh with ZooKeeper up and
running and one JobManager and one TaskManager defined,
but I am getting this issue.

$ start-cluster.sh start
Starting HA cluster with 1 masters.
-zTheFIND: Invalid switch
 system cannot find the file specified.
[ERROR] Flink distribution jar not found in
/cygdrive/d/Apacheflink/dist/apache-flink-1.9.3/deps/lib.
File not found -
D:\Apacheflink\dist\apache-flink-1.9.3\deps/log/flink--standalo
nesession-7-PLRENT-5LC73H2*
Starting standalonesession daemon on host PLRENT-5LC73H2.
-zThe system cannot FIND: Invalid switch
find the file specified.
[ERROR] Flink distribution jar not found in
/cygdrive/d/Apacheflink/dist/apache-flink-1.9.3/deps/lib.
File not found -
D:\Apacheflink\dist\apache-flink-1.9.3\deps/log/flink--taskexec
utor-7-PLRENT-5LC73H2*
Starting taskexecutor daemon on host PLRENT-5LC73H2.



Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 Thread Benchao Li
Why parse with a DataStream first and then register it as a table?
You could try declaring the source directly with DDL and letting the built-in json format do the parsing.
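A minimal sketch of that suggestion; the table, topic and sink names are made up, and whether the mixed-type values inside lib/properties coerce cleanly to MAP<STRING, STRING> still needs to be verified against the actual data:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JsonDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The nested 'lib' / 'properties' objects are declared as MAP columns and
        // parsed by the built-in json format; no intermediate DataStream/DTO is needed.
        tEnv.executeSql(
                "CREATE TABLE buried_point (" +
                "  `type`       STRING," +
                "  `lib`        MAP<STRING, STRING>," +
                "  `properties` MAP<STRING, STRING>" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'your_topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'your_group'," +
                "  'format' = 'json'," +
                "  'json.ignore-parse-errors' = 'true'," +
                "  'scan.startup.mode' = 'earliest-offset'" +
                ")");

        // Filtering then becomes ordinary SQL; 'hive_sink' stands for the Hive table
        // registered elsewhere via the catalog.
        tEnv.executeSql(
                "INSERT INTO hive_sink " +
                "SELECT `type`, `lib`, `properties` FROM buried_point " +
                "WHERE `type` = 'track'");
    }
}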

chuyuan  于2020年9月21日周一 下午4:44写道:

> 我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例:
> {
> "properties":{
> "platformType":"APP",
> "$os":"iOS",
> "$screen_width":414,
> "$app_version":"1.0",
> "$is_first_day":false,
> "$model":"x86_64",
> "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
> "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
> "isLogin":false,
> "zrIdfa":"----",
> "$network_type":"WIFI",
> "$wifi":true,
> "$timezone_offset":-480,
> "$resume_from_background":false,
> "tdid":"",
> "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
> "$screen_height":896,
> "$lib_version":"2.0.10",
> "$lib":"iOS",
> "$os_version":"13.4.1",
> "$manufacturer":"Apple",
> "$is_first_time":false,
> "$app_id":"Com.ziroom..ZRSensorsSDK"
> },
> "type":"track",
> "lib":{
> "$lib_version":"2.0.10",
> "$lib":"iOS",
> "$app_version":"1.0",
> "$lib_method":"autoTrack"
> }
> }
> 其中key为lib和properties的value是Json类型,其中字段可动态追加。
>
>
> 第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map类型),
> @Data
> public static class CustomBuriedPointDTO {
> /**
>  * 跟踪ID
>  */
> private Long track_id;
> /**
>  * 事件时间
>  */
> private Long event_time;
>
> /**
>  * 类型
>  */
> private String type;
> /**
>  * 排重后Id
>  */
> private String distinct_id;
> /**
>  * 匿名ID
>  */
> private String anonymous_id;
> /**
>  * 包信息
>  */
> private @DataTypeHint("RAW") Map lib;
> /**
>  * 事件
>  */
> private String event;
> /**
>  * 属性
>  */
> // private Map properties;
> private @DataTypeHint("RAW") Map
> properties;
> /**
>  * 刷新时间
>  */
> private Long flush_time;
> /**
>  * 事件日期
>  */
> private String dt;
>
>
> /**
>  * 封装数据对象中字段信息
>  */
> public void assembly(CustomBuriedPointDO pointDO) {
> // 复制DO属性到DTO
> BeanUtils.copyProperties(pointDO, this);
>
> /*
> 转换特殊字段
>  */
> // 设置分区日期
> Long eventTimeLong = pointDO.getEvent_time();
> if (eventTimeLong == null) {
> eventTimeLong = System.currentTimeMillis();
> }
> Date eventTime = new Date(eventTimeLong);
> DateFormat dateFormatDate = new
> SimpleDateFormat("-MM-dd");
> this.setDt(dateFormatDate.format(eventTime));
>
> // json字段转换为Map类型
> Map propertiesMap = null;
> if
> (StringUtils.isNotBlank(pointDO.getProperties()))
> {
> propertiesMap = (Map)
> JSON.parse(pointDO.getProperties());
> }
> this.setProperties(propertiesMap);
> Map libMap = null;
> if (StringUtils.isNotBlank(pointDO.getLib())) {
> libMap = (Map)
> JSON.parse(pointDO.getLib());
> }
> this.setLib(libMap);
> }
> }
>
> 第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下:
> "CREATE TABLE test.test(" +
> "  type STRING," +
> "  lib MAP,"
> +
> "  properties
> MAP" +
> ") PARTITIONED BY (" +
> "  dt string" +
> " ) stored as orcfile " +
> " TBLPROPERTIES" +
> " (" +
>
> "'partition.time-extractor.kind'='custom'," +
>
> 

Re: Re: [SQL] parse table name from sql statement

2020-09-21 Thread Harold.Miao
Could you give an example, please?

Benchao Li  于2020年9月21日周一 下午4:38写道:

> 我感觉可以先把SQL转成RelNode,然后用Calcite的visitor模式的RelShuttle来获取?
>
> Harold.Miao  于2020年9月21日周一 下午1:58写道:
>
> > 主要是我没有完整的所有单元case, 总是感觉写的不完整。
> >
> > 郭士榕  于2020年9月21日周一 上午11:08写道:
> >
> > >
> > >
> > >
> > > 就是要一个一个判断做解析下推的,比如你举的SqlJoin例子, 然后继续left,right下推。
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2020-09-21 10:50:31,"Harold.Miao"  写道:
> > > >主要是嵌套回溯特别复杂, 例如getFrom之后后面可能又是嵌套一个SqlJoin等等类似情况太多。 还有要做很多的类型转换。
> > > >
> > > >郭士榕  于2020年9月21日周一 上午10:21写道:
> > > >
> > > >> 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> 在 2020-09-21 10:12:13,"Harold.Miao"  写道:
> > > >> >hi all
> > > >> >
> > > >> >请教大家在复杂sql语句中parse所有的table name是怎么实现的。
> > > >> >
> > > >> >谢谢
> > > >> >
> > > >> >--
> > > >> >
> > > >> >Best Regards,
> > > >> >Harold Miao
> > > >>
> > > >
> > > >
> > > >--
> > > >
> > > >Best Regards,
> > > >Harold Miao
> > >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best Regards,
Harold Miao


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 Thread chuyuan
I depend on Flink 1.11.1 locally. Roughly, the job has Flink consuming JSON data from Kafka, for example:
{
"properties":{
"platformType":"APP",
"$os":"iOS",
"$screen_width":414,
"$app_version":"1.0",
"$is_first_day":false,
"$model":"x86_64",
"$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
"imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
"isLogin":false,
"zrIdfa":"----",
"$network_type":"WIFI",
"$wifi":true,
"$timezone_offset":-480,
"$resume_from_background":false,
"tdid":"",
"zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
"$screen_height":896,
"$lib_version":"2.0.10",
"$lib":"iOS",
"$os_version":"13.4.1",
"$manufacturer":"Apple",
"$is_first_time":false,
"$app_id":"Com.ziroom..ZRSensorsSDK"
},
"type":"track",
"lib":{
"$lib_version":"2.0.10",
"$lib":"iOS",
"$app_version":"1.0",
"$lib_method":"autoTrack"
}
}
The values of the keys lib and properties are JSON objects whose fields can be appended dynamically.

Step 1: use a DataStream to receive the JSON from Kafka, implement a MapFunction that wraps the JSON string into a DO, and finally convert it into a DTO (in order to turn the JSON values of lib and properties into Map types):
@Data
public static class CustomBuriedPointDTO {
/**
 * 跟踪ID
 */
private Long track_id;
/**
 * 事件时间
 */
private Long event_time;

/**
 * 类型
 */
private String type;
/**
 * 排重后Id
 */
private String distinct_id;
/**
 * 匿名ID
 */
private String anonymous_id;
/**
 * 包信息
 */
private @DataTypeHint("RAW") Map lib;
/**
 * 事件
 */
private String event;
/**
 * 属性
 */
// private Map properties;
private @DataTypeHint("RAW") Map properties;
/**
 * 刷新时间
 */
private Long flush_time;
/**
 * 事件日期
 */
private String dt;
   

/**
 * 封装数据对象中字段信息
 */
public void assembly(CustomBuriedPointDO pointDO) {
// 复制DO属性到DTO
BeanUtils.copyProperties(pointDO, this);

/*
转换特殊字段
 */
// 设置分区日期
Long eventTimeLong = pointDO.getEvent_time();
if (eventTimeLong == null) {
eventTimeLong = System.currentTimeMillis();
}
Date eventTime = new Date(eventTimeLong);
DateFormat dateFormatDate = new
SimpleDateFormat("-MM-dd");
this.setDt(dateFormatDate.format(eventTime));

// json字段转换为Map类型
Map propertiesMap = null;
if (StringUtils.isNotBlank(pointDO.getProperties()))
{
propertiesMap = (Map)
JSON.parse(pointDO.getProperties());
}
this.setProperties(propertiesMap);
Map libMap = null;
if (StringUtils.isNotBlank(pointDO.getLib())) {
libMap = (Map)
JSON.parse(pointDO.getLib());
}
this.setLib(libMap);
}
}

Step 2: convert the DataStream into a temporary Hive table and finally write it into the Hive target table, which is defined as follows:
"CREATE TABLE test.test(" +
"  type STRING," +
"  lib MAP,"
+
"  properties
MAP" +
") PARTITIONED BY (" +
"  dt string" +
" ) stored as orcfile " +
" TBLPROPERTIES" +
" (" +
   
"'partition.time-extractor.kind'='custom'," +
   
"'partition.time-extractor.timestamp-pattern'='$dt'," +
   
"'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
+
   
"'sink.partition-commit.trigger'='partition-time'," +
 


Re: 消费kafka source反压

2020-09-21 Thread Benchao Li
This kind of backpressure is usually propagated from downstream. Check the last operator that shows backpressure; that one is the bottleneck in processing capacity.

smq <374060...@qq.com> 于2020年9月21日周一 下午2:08写道:

>
> Hi all, while testing Flink's consumption rate I found that data processing is rather slow, roughly 1000 records per second per task. Looking at the web UI, the backpressure of the Kafka source reaches 1. Does anyone have experience with this?



-- 

Best,
Benchao Li


Re: Re: [SQL] parse table name from sql statement

2020-09-21 Thread Benchao Li
I think you could first convert the SQL into a RelNode and then collect the tables with Calcite's visitor-style RelShuttle?
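A minimal sketch of that idea, assuming the RelNode has already been obtained from the planner (names are illustrative):

import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.TableScan;

// Collects every table referenced by a RelNode tree.
public class TableNameCollector extends RelShuttleImpl {

    private final Set<String> tableNames = new LinkedHashSet<>();

    @Override
    public RelNode visit(TableScan scan) {
        // Qualified name is e.g. [catalog, database, table]; join it for readability.
        tableNames.add(String.join(".", scan.getTable().getQualifiedName()));
        return super.visit(scan);
    }

    public Set<String> collect(RelNode root) {
        root.accept(this);
        return tableNames;
    }
}

Queries whose subqueries are still embedded as RexSubQuery expressions may need extra handling, so treat this as a starting point rather than a complete solution.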

Harold.Miao  于2020年9月21日周一 下午1:58写道:

> 主要是我没有完整的所有单元case, 总是感觉写的不完整。
>
> 郭士榕  于2020年9月21日周一 上午11:08写道:
>
> >
> >
> >
> > 就是要一个一个判断做解析下推的,比如你举的SqlJoin例子, 然后继续left,right下推。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-09-21 10:50:31,"Harold.Miao"  写道:
> > >主要是嵌套回溯特别复杂, 例如getFrom之后后面可能又是嵌套一个SqlJoin等等类似情况太多。 还有要做很多的类型转换。
> > >
> > >郭士榕  于2020年9月21日周一 上午10:21写道:
> > >
> > >> 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> 在 2020-09-21 10:12:13,"Harold.Miao"  写道:
> > >> >hi all
> > >> >
> > >> >请教大家在复杂sql语句中parse所有的table name是怎么实现的。
> > >> >
> > >> >谢谢
> > >> >
> > >> >--
> > >> >
> > >> >Best Regards,
> > >> >Harold Miao
> > >>
> > >
> > >
> > >--
> > >
> > >Best Regards,
> > >Harold Miao
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>


-- 

Best,
Benchao Li


Re:Re: 使用flinksql时 jdbc connector参数不起作用

2020-09-21 Thread chenxuying
OK, understood.




在 2020-09-17 20:29:09,"Jark Wu"  写道:
>>  sink.buffer-flush.max-rows = '0' 导致每接收一条数据就插入数据库
>
>这个应该是个 bug,我建了个 issue:https://issues.apache.org/jira/browse/FLINK-19280
>
>Best,
>Jark
>
>On Thu, 17 Sep 2020 at 18:15, chenxuying  wrote:
>
>> 环境是flink1.11.2+idea
>> sql:
>> CREATE TABLE sourceTable (
>> platform STRING
>> ,game_id bigint
>> ) WITH (
>> ...
>> );
>> CREATE TABLE sinktable (
>> platform STRING
>> ,game_id bigint
>> ) WITH (
>> 'connector' = 'jdbc',
>> 'url' = '',
>> 'table-name' = '',
>> 'driver' = 'com.mysql.jdbc.Driver',
>> 'username' = '',
>> 'password' = '',
>> 'sink.buffer-flush.max-rows' = '2',
>> 'sink.buffer-flush.interval' = '30s'
>> );
>> insert into sinktable select platform,game_id from sourceTable;
>>
>>
>> 官方文档[1]中 , 说到 sink.buffer-flush.max-rows和sink.buffer-flush.interval
>> 这两个属性可以设置成 '0' 来禁用他 , 不过我试了下是不行
>> 如果设置如下
>>sink.buffer-flush.max-rows = '0'
>>'sink.buffer-flush.interval' = '60s'
>> 导致每接收一条数据就插入数据库
>> 如果设置如下
>>sink.buffer-flush.max-rows = '10'
>>'sink.buffer-flush.interval' = '0'
>> 导致无法插入数据库
>>
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
>>
>>


App gets stuck in Created State

2020-09-21 Thread Arpith P
Hi,

We have a Flink 1.8.0 cluster deployed on Hadoop in distributed mode. I often
see that, even though Hadoop has enough resources, the Flink job sits in CREATED state.
We have 4 operators with parallelism 15, 1 operator with parallelism 40 and 2 operators
with parallelism 10. At submission time I pass a TaskManager memory of 4 GB, a
JobManager memory of 2 GB and 2 slots per TaskManager. This request should only take 20
containers and 40 vcores, but I see Flink over-allocating 65
containers and 129 cores. I've attached snapshots for reference.

Right now I'm passing:  -yD yarn.heartbeat.container-request-interval=1000
-yD taskmanager.network.memory.fraction=0.045 -yD
taskmanager.memory.preallocate=true.

How do I control resource allocation?
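For what it's worth, with the default slot sharing the number of TaskManager containers is roughly the highest operator parallelism divided by the slots per TaskManager, so a submission along these lines should normally land near 20 containers for the 40-parallelism operator (flag values are illustrative; Flink 1.8 per-job mode on YARN, and the exact count can still be affected by other YARN settings):

flink run -m yarn-cluster \
  -p 40 \
  -ys 2 \
  -ytm 4096 \
  -yjm 2048 \
  your-job.jar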


Re: Disable WAL in RocksDB recovery

2020-09-21 Thread Yu Li
Great, thanks for the follow up.

Best Regards,
Yu


On Mon, 21 Sep 2020 at 15:04, Juha Mynttinen 
wrote:

> Good,
>
> I opened this JIRA for the issue
> https://issues.apache.org/jira/browse/FLINK-19303. The discussion can be
> moved there.
>
> Regards,
> Juha
> --
> *From:* Yu Li 
> *Sent:* Friday, September 18, 2020 3:58 PM
> *To:* Juha Mynttinen 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Disable WAL in RocksDB recovery
>
> Thanks for bringing this up Juha, and good catch.
>
> We actually are disabling WAL for routine writes by default when using
> RocksDB and never encountered segment fault issues. However, from history
> in FLINK-8922, segment fault issue occurs during restore if WAL is
> disabled, so I guess the root cause lies in RocksDB batch write
> (org.rocksdb.WriteBatch). And IMHO this is a RocksDB bug (it should work
> well when WAL is disabled, no matter under single or batch write).
>
> +1 for opening a new JIRA to figure the root cause out, fix it and disable
> WAL during restore by default (maybe checking the fixes around WriteBatch
> in later RocksDB versions could help locate the issue more quickly), and
> thanks for volunteering taking the efforts. I will follow up and help
> review if any findings / PR submission.
>
> Best Regards,
> Yu
>
>
> On Wed, 16 Sep 2020 at 13:58, Juha Mynttinen 
> wrote:
>
> Hello there,
>
> I'd like to bring to discussion a previously discussed topic - disabling
> WAL in RocksDB recovery.
>
> It's clear that WAL is not needed during the process, the reason being
> that the WAL is never read, so there's no need to write it.
>
> AFAIK the last thing that was done with WAL during recovery is an attempt
> to remove it and later reverting that removal
> (https://issues.apache.org/jira/browse/FLINK-8922).
> If I interpret the comments in the ticket correctly, what happened was that
> a) WAL was kept in the recovery, 2) it's unknown why removing WAL causes
> segfault.
>
> What can be seen in the ticket is that having WAL causes a significant
> performance penalty. Thus, getting rid of WAL would be a very nice
> performance improvement. I think it'd be worth to creating a new JIRA
> ticket at least as a reminder that WAL should be removed?
>
> I'm planning adding an experimental flag to remove WAL in the environment
> I'm using Flink and trying it out. If the flag is made configurable, WAL
> can always be re-enabled if removing it causes issues.
>
> Thoughts?
>
> Regards,
> Juha
>
>


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 Thread Benchao Li
Hi chuyuan,

Could you describe the problem you ran into in more detail? For example:
- which Flink version you are using
- the SQL (both the DDL and the query)
- what the data looks like

chuyuan  于2020年9月21日周一 下午2:40写道:

>  LEGACY('RAW',
> 'ANY')对应sql中数据类型改为:MAP,仍然报错,异常:
> org.apache.flink.table.api.TableException: A raw type backed by type
> information has no serializable string representation. It needs to be
> resolved into a proper raw type.
> 方便说下具体实现细节吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-21 Thread Aljoscha Krettek

Hi All,

Avro was finally bumped in 
https://issues.apache.org/jira/browse/FLINK-18192.


The implementers didn't see 
https://issues.apache.org/jira/browse/FLINK-12532, but it is also 
updated now.


Best,
Aljoscha

On 21.09.20 08:04, Arvid Heise wrote:

Hi Lian,

we had a similar discussion on [1].

TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until
Hive bumps it [3]. In the thread, I gave some options to avoid running into
the issue.
The easiest fix is to use Avro 1.8.2 all the way, but you may run into [4]
if your logical type is nullable (which is not necessary in most cases).

Still, I think it's time for us to revise the decision to wait for Hive to
bump and rather upgrade independently. Avro was for a long time stuck on
1.8 but the project gained traction again in the past two years. On the
other hand, Hive seems to be rather slow to respond to that and we
shouldn't have a slow moving component block us to support a fast moving
component if it's such apparent that users want it.
@Aljoscha Krettek  could you please pick that topic up
and ping the respective maintainers?

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
[2] https://issues.apache.org/jira/browse/FLINK-12532
[3] https://issues.apache.org/jira/browse/HIVE-21737
[4] https://issues.apache.org/jira/browse/AVRO-1891

On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang  wrote:


Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am
trying ConfluentRegistryAvroDeserializationSchema (if this is what you
mean) but got "java.lang.Long cannot be cast to java.time.Instant". This
may be caused by https://issues.apache.org/jira/browse/FLINK-11030.
 Is there any progress
for this JIRA? Thanks. Regards!


Stacktrace:
java.lang.ClassCastException: java.lang.Long cannot be cast to
java.time.Instant
at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
at
org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
at
org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
at
org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
at
com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
at
com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
at
org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)



Code:

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import 
org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import 
org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.io.Serializable;

public class SpecificRecordSerDe implements
 KafkaSerializationSchema, KafkaContextAware, 

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-21 Thread Dawid Wysakowicz
Hey Arvid,

Just a quick comment to Arvid's mail for now. It should be safe to
update the Avro version even if we've been declaring dependency on Avro
1.8.2 by default. Moreover up until now we do not bundle any version of
Avro in any of the uber jars we ship. It is true we used Avro version
1.8.2 by default because that's the version that hadoop ships with (the
hadoop distributions really bundle avro dependency as part of their
binaries).

As for the other issue, because Hadoop is no longer the most frequent
environment Flink is run and as you said they are not the fastest with
upgrading dependencies we decided to upgrade the default Avro version
that Flink declares. From Flink 1.12 on we depend on Avro 1.10 by default.
This has already been merged into master [1]. Still, users should be able to
downgrade the Avro version if they need to (e.g. if they have specific records
generated with older versions, or they use Hadoop).

@Lian Will look further into the issue. My suspicion though is there is
a problem with the Conversions your generated class declares. In order
for Flink to handle logical types correctly the generated Avro class
must return valid Conversions via SpecificRecord#getConversions(). Could
you share the avro schema and the generated class? Without the full
picture it will be hard to track down the problem. Again best would be
an example that I could run.

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-18192
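For context, the code path in the stack trace is typically triggered by a timestamp logical type; an illustrative (not the reporter's actual) schema fragment for such a field looks like this:

{
  "type": "record",
  "name": "MetadataRecord",
  "namespace": "com.mycompany.mypayload",
  "fields": [
    {
      "name": "eventTime",
      "type": { "type": "long", "logicalType": "timestamp-millis" }
    }
  ]
}

Whether the generated class maps such a field to java.time.Instant or to a plain long depends on the Avro version and compiler settings used to generate it, which is why the full schema and generated class are needed to track this down.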

On 21/09/2020 08:04, Arvid Heise wrote:
> Hi Lian,
>
> we had a similar discussion on [1].
>
> TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2]
> until Hive bumps it [3]. In the thread, I gave some options to avoid
> running into the issue.
> The easiest fix is to use Avro 1.8.2 all the way, but you may run into
> [4] if your logical type is nullable (which is not necessary in most
> cases).
>
> Still, I think it's time for us to revise the decision to wait for
> Hive to bump and rather upgrade independently. Avro was for a long
> time stuck on 1.8 but the project gained traction again in the past
> two years. On the other hand, Hive seems to be rather slow to respond
> to that and we shouldn't have a slow moving component block us to
> support a fast moving component if it's such apparent that users want it.
> @Aljoscha Krettek  could you please pick
> that topic up and ping the respective maintainers?
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
> [2] https://issues.apache.org/jira/browse/FLINK-12532
> [3] https://issues.apache.org/jira/browse/HIVE-21737
> [4] https://issues.apache.org/jira/browse/AVRO-1891
>
> On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang  > wrote:
>
> Thanks Dawid for proposing ConfluentRegistryDeserializationSchema.
> I am trying ConfluentRegistryAvroDeserializationSchema (if this is
> what you mean)but got "java.lang.Long cannot be cast to
> java.time.Instant". This may be caused by
> https://issues.apache.org/jira/browse/FLINK-11030.
>  Is there any
> progress for this JIRA? Thanks. Regards!
>
>
> Stacktrace:
> java.lang.ClassCastException: java.lang.Long cannot be cast to
> java.time.Instant
> at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
> at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
> at
> 
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
> at
> 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
> at
> 
> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> at
> 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
> at
> 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> at
> 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
> at
> 
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
> at
> 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
> at
> 
> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> at
> 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
> at
> 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> at
> 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
> at
> 
> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
> at
> 
> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
> at
> 
> 

Flink消费kafka中json数据,其中有个value是Json类型,写入Hive表Map结构异常

2020-09-21 Thread chuyuan
hello,大婶们,Flink消费kafka中json数据,示例:
{
"properties":{
"platformType":"APP",
"$os":"iOS",
"$screen_width":414,
"$app_version":"1.0",
"$is_first_day":false,
"$model":"x86_64",
"$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
"imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
"isLogin":false,
"zrIdfa":"----",
"$network_type":"WIFI",
"$wifi":true,
"$timezone_offset":-480,
"$resume_from_background":false,
"tdid":"",
"zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
"$screen_height":896,
"$lib_version":"2.0.10",
"$lib":"iOS",
"$os_version":"13.4.1",
"$manufacturer":"Apple",
"$is_first_time":false,
"$app_id":"Com.ziroom..ZRSensorsSDK"
},
"type":"track",
"lib":{
"$lib_version":"2.0.10",
"$lib":"iOS",
"$app_version":"1.0",
"$lib_method":"autoTrack"
}
}
其中key为lib和properties的value是Json类型,其中字段可动态追加,
然后我把Json传封装为DO,最后为了转换lib和properties的value为Map,转换成了DTO,
@Data
public static class CustomBuriedPointDTO {
/**
 * 跟踪ID
 */
private Long track_id;
/**
 * 事件时间
 */
private Long event_time;

/**
 * 类型
 */
private String type;
/**
 * 排重后Id
 */
private String distinct_id;
/**
 * 匿名ID
 */
private String anonymous_id;
/**
 * 包信息
 */
private @DataTypeHint("RAW") Map lib;
/**
 * 事件
 */
private String event;
/**
 * 属性
 */
//  private Map properties;
private @DataTypeHint("RAW") Map properties;
/**
 * 刷新时间
 */
private Long flush_time;
/**
 * 事件日期
 */
private String dt;


/**
 * 封装数据对象中字段信息
 */
public void assembly(CustomBuriedPointDO pointDO) {
// 复制DO属性到DTO
BeanUtils.copyProperties(pointDO, this);

/*
转换特殊字段
 */
// 设置分区日期
Long eventTimeLong = pointDO.getEvent_time();
if (eventTimeLong == null) {
eventTimeLong = System.currentTimeMillis();
}
Date eventTime = new Date(eventTimeLong);
DateFormat dateFormatDate = new 
SimpleDateFormat("-MM-dd");
this.setDt(dateFormatDate.format(eventTime));

// json字段转换为Map类型
Map propertiesMap = null;
if (StringUtils.isNotBlank(pointDO.getProperties())) {
propertiesMap = (Map)
JSON.parse(pointDO.getProperties());
}
this.setProperties(propertiesMap);
Map libMap = null;
if (StringUtils.isNotBlank(pointDO.getLib())) {
libMap = (Map) 
JSON.parse(pointDO.getLib());
}
this.setLib(libMap);
}
}
,然后把DataStream转成了Hive临时表,最后写入Hive表,hive表定义如下:
"CREATE TABLE test.test(" +
"  type STRING," +
"  lib MAP," +
"  properties 
MAP" +
") PARTITIONED BY (" +
"  dt string" +
" ) stored as orcfile " +
" TBLPROPERTIES" +
" (" +

"'partition.time-extractor.kind'='custom'," +

"'partition.time-extractor.timestamp-pattern'='$dt'," +

"'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
+

"'sink.partition-commit.trigger'='partition-time'," +


Re: Disable WAL in RocksDB recovery

2020-09-21 Thread Juha Mynttinen
Good,

I opened this JIRA for the issue 
https://issues.apache.org/jira/browse/FLINK-19303. The discussion can be moved 
there.

Regards,
Juha

From: Yu Li 
Sent: Friday, September 18, 2020 3:58 PM
To: Juha Mynttinen 
Cc: user@flink.apache.org 
Subject: Re: Disable WAL in RocksDB recovery

Thanks for bringing this up Juha, and good catch.

We actually are disabling WAL for routine writes by default when using RocksDB 
and never encountered segment fault issues. However, from history in 
FLINK-8922, segment fault issue occurs during restore if WAL is disabled, so I 
guess the root cause lies in RocksDB batch write (org.rocksdb.WriteBatch). And 
IMHO this is a RocksDB bug (it should work well when WAL is disabled, no matter 
under single or batch write).

+1 for opening a new JIRA to figure the root cause out, fix it and disable WAL 
during restore by default (maybe checking the fixes around WriteBatch in later 
RocksDB versions could help locate the issue more quickly), and thanks for 
volunteering taking the efforts. I will follow up and help review if any 
findings / PR submission.

Best Regards,
Yu


On Wed, 16 Sep 2020 at 13:58, Juha Mynttinen <juha.myntti...@king.com> wrote:
Hello there,

I'd like to bring to discussion a previously discussed topic - disabling WAL in 
RocksDB recovery.

It's clear that WAL is not needed during the process, the reason being that the 
WAL is never read, so there's no need to write it.

AFAIK the last thing that was done with WAL during recovery is an attempt to 
remove it and later reverting that removal 
(https://issues.apache.org/jira/browse/FLINK-8922).
If I interpret the comments in the ticket correctly, what happened was that a)
WAL was kept in the recovery, and b) it's unknown why removing WAL causes a segfault.

What can be seen in the ticket is that having WAL causes a significant 
performance penalty. Thus, getting rid of WAL would be a very nice performance 
improvement. I think it would be worth creating a new JIRA ticket, at least as a 
reminder that WAL should be removed?

I'm planning adding an experimental flag to remove WAL in the environment I'm 
using Flink and trying it out. If the flag is made configurable, WAL can always 
be re-enabled if removing it causes issues.

Thoughts?

Regards,
Juha



Re: [DKIM Failure] Re: flink-CDC client 一对多问题

2020-09-21 Thread Jark Wu
1. CONCAT is a built-in function and can be used directly.
2. There is no GROUP_CONCAT among the built-in functions, but LISTAGG provides similar functionality.
3. Built-in functions need no additional jars and can be used directly. For the list of built-in functions, see the official docs [1].
4. There is no such thing as a "Flink SQL CDC client"; there is only the Flink SQL client, and CDC is one of the features it supports.
5. It is not clear which SQL operations you mean by "only part of SQL is supported". For the supported SQL operations, please see the official docs [2].

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html
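A hedged sketch of point 2 using the tables from the earlier question (column types are assumed; the CAST may be unnecessary if createTime is already a string):

-- LISTAGG plays the role of GROUP_CONCAT; table and column names follow the
-- earlier example and are illustrative.
SELECT
  a.userId,
  LISTAGG(CONCAT(b.userBankNo, '_', CAST(b.createTime AS STRING))) AS userBankTimes
FROM aa AS a
LEFT JOIN bb AS b ON a.userId = b.userId
GROUP BY a.userId;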

On Mon, 21 Sep 2020 at 14:59, Li,Qian(DXM,PB)  wrote:

> Fink sql cdc client 好多sql函数都不支持呀,比如CONCAT(a,b,c) 或者 GROUP_CONCAT(a,b)
>
> 是我还需要引入什么包么?如果支持的话,上面说的这种格式就可以用GROUP_CONCAT 很好的解决了,
>
> 另外Fink sql cdc client只支持部分sql操作么?具体支持那些函数,有相关的官方文档么?
>
>
> 在 2020/9/21 下午1:42,“Jark Wu” 写入:
>
> 你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下:
>
> select userId, collect(userBankTime)
> from (
>   select  userId, concat(userBankNo, '_', createTime) as userBankTime
>   from aa as a left join bb as b where a.userId=b.userId
> ) group by userId;
>
>
> Best,
> Jark
>
> On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB) 
> wrote:
>
> > 请问:
> >
> > 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系
> > 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
> >   表aa
> >   id, userId
> >   表 bb
> >   userId,userBankNo,createTime
> >
> > select  * from aa as a left join bb as b where a.userId=b.userId
> >
> > 谢谢!
> >
> >
>
>
>


Re: [DKIM Failure] Re: flink-CDC client 一对多问题

2020-09-21 Thread Li,Qian(DXM,PB)
The Flink SQL CDC client seems not to support many SQL functions, e.g. CONCAT(a,b,c) or GROUP_CONCAT(a,b).

Do I need to pull in some extra package? If it were supported, the format described above could be solved nicely with GROUP_CONCAT.

Also, does the Flink SQL CDC client only support part of the SQL operations? Which functions exactly are supported, and is there official documentation for this?


在 2020/9/21 下午1:42,“Jark Wu” 写入:

你可以通过 groupby collect 来将一对多的关系聚合起来,代码类似如下:

select userId, collect(userBankTime)
from (
  select  userId, concat(userBankNo, '_', createTime) as userBankTime
  from aa as a left join bb as b where a.userId=b.userId
) group by userId;


Best,
Jark

On Mon, 21 Sep 2020 at 12:20, Li,Qian(DXM,PB)  wrote:

> 请问:
>
> 我在使用Flink CDC SQL CLI的时候,想将关联的两张表的一对多关系
> 映射成ARRAY[ROW(userBankNo,createTime)]的形式,要怎么映射呢?
>   表aa
>   id, userId
>   表 bb
>   userId,userBankNo,createTime
>
> select  * from aa as a left join bb as b where a.userId=b.userId
>
> 谢谢!
>
>




Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 Thread chuyuan
I changed the SQL data type corresponding to LEGACY('RAW', 'ANY') to MAP, and it still fails with the exception:
org.apache.flink.table.api.TableException: A raw type backed by type
information has no serializable string representation. It needs to be
resolved into a proper raw type.
Could you share the concrete implementation details?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-21 Thread Yu Li
Thanks Zhu Zhu for being our release manager and everyone else who made the
release possible!

Best Regards,
Yu


On Thu, 17 Sep 2020 at 13:29, Zhu Zhu  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2020/09/17/release-1.11.2.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348575
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Thanks,
> Zhu
>




线上环境出现:org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Serializer,本地没有

2020-09-21 Thread Jeff
Processing the same topic works fine when I test it locally in IDEA, but in the production environment this exception appears:
org.apache.kafka.common.serialization.StringSerializer is not an instance of 
org.apache.kafka.common.serialization.Serializer. Replacing StringSerializer with 
ByteArraySerializer produces a similar error, and I don't know how to solve this. Are there any other ideas for fixing it? 
The business logic is very simple: filter data from the source table into the sink table.
Flink version: 1.11.1, Kafka version: 2.1.0


The Kafka configuration in the SQL is as follows:
source:
create table ***
with (
'connector' = 'kafka',
'topic'='**',
'scan.startup.mode'='latest-offset',
'format'='json',
'properties.group.id' = '***',
'properties.bootstrap.servers'='***:9092',
'properties.enable.auto.commit'='true',
'properties.auto.commit.interval.ms'='1000',
'properties.key.deserializer'='org.apache.kafka.common.serialization.StringDeserializer',
'properties.value.deserializer'='org.apache.kafka.common.serialization.StringDeserializer',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule
 required username=\"\" password=\"\";');


sink:
create table ***
with (
'connector' = 'kafka',
'topic'='***',
'scan.startup.mode'='latest-offset',
'format'='json',
'properties.bootstrap.servers'='*:9092',
'properties.max.poll.records'='50',
'properties.enable.auto.commit'='true',
'properties.auto.commit.interval.ms'='1000',
'properties.key.serializer'='org.apache.kafka.common.serialization.StringSerializer',
'properties.value.serializer'='org.apache.kafka.common.serialization.StringSerializer',
'properties.security.protocol'='SASL_PLAINTEXT',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule
 required username=\"*\" password=\"**\";');







Re: flink-CDC client 一对多问题

2020-09-21 Thread silence
You could write a group_array UDAF:
select * from aa as a left join (
  select userId, group_array(row(userBankNo, createTime)) from bb
  group by userId
) as b where a.userId = b.userId
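A minimal sketch of such a UDAF, simplified to string elements (the class name and the choice to collect plain strings instead of ROWs are illustrative; collecting ROWs would additionally require explicit type information):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.functions.AggregateFunction;

// Collects the values of a group into an array, similar to a group_array/collect.
public class GroupArrayFunction extends AggregateFunction<String[], List<String>> {

    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    // Called once per input row of the group.
    public void accumulate(List<String> acc, String value) {
        if (value != null) {
            acc.add(value);
        }
    }

    @Override
    public String[] getValue(List<String> acc) {
        return acc.toArray(new String[0]);
    }
}

It can then be registered, e.g. with tEnv.registerFunction("group_array", new GroupArrayFunction()); depending on the planner you may need to provide explicit result/accumulator type information.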




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 任务提交中使用了hive的udf时的疑问

2020-09-21 Thread Rui Li
Hi,

This mode is not supported yet. There is currently no way to add jars dynamically, so the jars a job depends on have to be prepared in advance.

On Mon, Sep 21, 2020 at 9:47 AM Husky Zeng <568793...@qq.com> wrote:

> Hi all,
>
> When a submitted Flink job uses Hive UDFs, running the job depends on some UDF-related jars, configuration files and other resources.
>
>
> In our production environment these UDF-related jars and configuration files are all managed by the Hive metastore, so when Flink interacts with Hive it can obtain the remote storage paths (HDFS paths) of these files.
>
>
> We would like the Flink job, at submission time, to only submit these file paths obtained from Hive instead of transferring the files themselves (querying Hive and downloading the files outside of Flink adds an extra step in production and unnecessary risk, so we hope Flink can do this automatically at runtime). Under such a scheme the Flink job would download the jars and configuration files from HDFS at runtime based on the paths.
>
> From the code one can see that the FunctionInfo object already contains the resource paths, but they do not appear to be used properly:
>
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java#L80
>
> Is there a way to avoid submitting the UDF-related resource files when submitting a job? Or is there a development plan for such a scheme?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li


消费kafka source反压

2020-09-21 Thread smq
Hi all, while testing Flink's consumption rate I found that data processing is rather slow, roughly 1000 records per second per task. Looking at the web UI, the backpressure of the Kafka source reaches 1. Does anyone have experience with this?

Problem with zookeeper and flink config

2020-09-21 Thread saksham sapra
I am trying to run multiple clusters on a Windows machine using ZooKeeper,
so I used this page to configure ZooKeeper and flink-conf.yaml.

https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html

but I am somehow getting these exceptions. I have worked with Hadoop earlier.

Thanks & Regards,
Saksham Sapra
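For reference, the settings that the linked guide expects in flink-conf.yaml look roughly like this; the quorum address and storage path below are placeholders, not values from this setup:

high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.storageDir: file:///C:/flink/ha/
high-availability.cluster-id: /cluster_one

# conf/masters lists one JobManager per line, e.g.
# localhost:8081
# localhost:8082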




Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-21 Thread Arvid Heise
Hi Lian,

we had a similar discussion on [1].

TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until
Hive bumps it [3]. In the thread, I gave some options to avoid running into
the issue.
The easiest fix is to use Avro 1.8.2 all the way, but you may run into [4]
if your logical type is nullable (which is not necessary in most cases).

Still, I think it's time for us to revise the decision to wait for Hive to
bump and rather upgrade independently. Avro was for a long time stuck on
1.8 but the project gained traction again in the past two years. On the
other hand, Hive seems to be rather slow to respond to that and we
shouldn't let a slow-moving component block us from supporting a fast-moving
component when it is so apparent that users want it.
@Aljoscha Krettek  could you please pick that topic up
and ping the respective maintainers?

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
[2] https://issues.apache.org/jira/browse/FLINK-12532
[3] https://issues.apache.org/jira/browse/HIVE-21737
[4] https://issues.apache.org/jira/browse/AVRO-1891

On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang  wrote:

> Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am
> trying ConfluentRegistryAvroDeserializationSchema (if this is what you
> mean) but got "java.lang.Long cannot be cast to java.time.Instant". This
> may be caused by https://issues.apache.org/jira/browse/FLINK-11030.
>  Is there any progress
> for this JIRA? Thanks. Regards!
>
>
> Stacktrace:
> java.lang.ClassCastException: java.lang.Long cannot be cast to
> java.time.Instant
> at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
> at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
> at
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
> at
> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
> at
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
> at
> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
> at
> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
> at
> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
> at
> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
>
>
> Code:
>
> import org.apache.avro.specific.SpecificRecord;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import 
> org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
> import 
> org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
> import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
> import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import javax.annotation.Nullable;
> import java.io.Serializable;
>
> public class SpecificRecordSerDe implements
> KafkaSerializationSchema, KafkaContextAware, 
> KafkaDeserializationSchema, Serializable {
>
> private final Class tClass;
> private String topic; // for