Re: I call Pandas UDF N times, do I have to initiate the UDF N times?

2021-05-07 Thread Dian Fu
Hi Yik San,

Is it acceptable to rewrite the UDF a bit to accept multiple parameters and 
then rewrite the program as follows:

```
SELECT
LABEL_ENCODE(a, b, c)
...
```
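
A minimal sketch of what such a multi-parameter UDF could look like (assumptions: load_encoder() and the encoder's transform() are as in the original mail; for simplicity this is written as a general Python UDF rather than a Pandas UDF, and ARRAY<STRING> is only an illustrative result type). The point is that open(), and therefore load_encoder(), runs once for this single UDF instance instead of once per encoded column:

```
import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf, ScalarFunction


class MultiLabelEncode(ScalarFunction):
    def open(self, function_context):
        logging.info("MultiLabelEncode.open")
        self.encoder = load_encoder()  # assumed helper from the original mail

    def eval(self, a, b, c):
        # encode all three columns in one call and return them together
        return [self.encoder.transform(a),
                self.encoder.transform(b),
                self.encoder.transform(c)]


label_encode = udf(MultiLabelEncode(),
                   result_type=DataTypes.ARRAY(DataTypes.STRING()))
```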

Regards,
Dian

> On May 8, 2021, at 11:56 AM, Yik San Chan wrote:
> 
> Hi community,
> 
> I am using PyFlink and Pandas UDF in my job.
> 
> The job executes a SQL like this:
> 
> ```
> SELECT
> LABEL_ENCODE(a),
> LABEL_ENCODE(b),
> LABEL_ENCODE(c)
> ...
> ```
> 
> And my LABEL_ENCODE UDF is defined below:
> 
> ```
> class LabelEncode(ScalarFunction):
>   def open(self, function_context):
>     logging.info("LabelEncode.open")
>     self.encoder = load_encoder()
>   def eval(self, x):
>     ...
> 
> labelEncode = udf(LabelEncode(), ...)
> ```
> 
> When I run the job, according to the taskmanager log, "LabelEncode.open" is 
> printed 3 times, which is exactly the number of times the LABEL_ENCODE UDF is called.
> 
> Since every LabelEncode.open triggers I/O (load_encoder() does), I wonder 
> if I can initiate the UDF only once, and use it 3 times?
> 
> Thank you!
> 
> Best,
> Yik San



Re: How to recover the last count when using a CUMULATE window after restarting a Flink SQL job?

2021-05-07 Thread Kurt Young
Hi, please use user mailing list only to discuss these issues.

Best,
Kurt


On Sat, May 8, 2021 at 1:05 PM 1095193...@qq.com <1095193...@qq.com> wrote:

> Hi
>    I have tried the CUMULATE window function in Flink 1.13 SQL to accumulate
> data from Kafka. When I restart a cumulate window SQL job, the last count
> state is not considered and the count accumulates from 1 again. Are there any
> solutions that help recover the last count state when restarting a Flink SQL
> job?
> Thank you
> --
> 1095193...@qq.com
>


How to recover the last count when using a CUMULATE window after restarting a Flink SQL job?

2021-05-07 Thread 1095193...@qq.com
Hi
   I have tried the CUMULATE window function in Flink 1.13 SQL to accumulate data 
from Kafka. When I restart a cumulate window SQL job, the last count state is not 
considered and the count accumulates from 1 again. Are there any solutions that 
help recover the last count state when restarting a Flink SQL job?
Thank you 


1095193...@qq.com
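
One possible way to carry the count across a restart (a sketch under stated assumptions, not a confirmed fix for this case): stop the job with a savepoint and restore the new submission from it, so the window state, including the accumulated count, is restored. Assuming the Flink 1.13 SQL Client and a hypothetical savepoint path:

```
-- 1. Stop the running job with a savepoint from the command line:
--      ./bin/flink stop <jobId>
-- 2. In the SQL Client, point the next submission at that savepoint
--    (the path below is hypothetical):
SET 'execution.savepoint.path' = 'hdfs:///flink/savepoints/savepoint-xxxx';
-- 3. Re-submit the same CUMULATE window INSERT statement.
```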


I call Pandas UDF N times, do I have to initiate the UDF N times?

2021-05-07 Thread Yik San Chan
Hi community,

I am using PyFlink and Pandas UDF in my job.

The job executes a SQL like this:

```
SELECT
LABEL_ENCODE(a),
LABEL_ENCODE(b),
LABEL_ENCODE(c)
...
```

And my LABEL_ENCODE UDF is defined below:

```
class LabelEncode(ScalarFunction):
  def open(self, function_context):
    logging.info("LabelEncode.open")
    self.encoder = load_encoder()
  def eval(self, x):
    ...

labelEncode = udf(LabelEncode(), ...)
```

When I run the job, according to the taskmanager log, "LabelEncode.open" is
printed 3 times, which is exactly the number of times the LABEL_ENCODE UDF is called.

Since every LabelEncode.open triggers I/O (load_encoder() does), I
wonder if I can initiate the UDF only once, and use it 3 times?

Thank you!

Best,
Yik San
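
A possible complementary workaround (an assumption on my part, not something confirmed in this thread): cache the loaded encoder at module level, so that repeated open() calls within the same Python worker process reuse it instead of repeating the I/O. A minimal sketch:

```
import logging

from pyflink.table.udf import udf, ScalarFunction

_ENCODER_CACHE = None


def get_encoder():
    # load the encoder at most once per Python worker process
    global _ENCODER_CACHE
    if _ENCODER_CACHE is None:
        logging.info("loading encoder once per Python worker process")
        _ENCODER_CACHE = load_encoder()  # assumed helper from the mail above
    return _ENCODER_CACHE


class LabelEncode(ScalarFunction):
    def open(self, function_context):
        # open() may still run once per UDF instance, but the expensive
        # load_encoder() call now happens at most once per process
        self.encoder = get_encoder()

    def eval(self, x):
        ...
```

Whether separate UDF instances share one Python worker process depends on the deployment, so this only helps when they do.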


How to modify the logging configuration for Flink on native Kubernetes?

2021-05-07 Thread casel.chen
I deploy a Flink session cluster with native Kubernetes and want to change the log level for a
specific package. If I directly edit log4j-console.properties in the ConfigMap and redeploy, the change takes effect, but starting the Flink session
cluster through the command line (./bin/kubernetes-session.sh -Dkubernetes.cluster-id=xxx) wipes out the previous
modification. Is there a way to keep that modification? Is there a command-line startup parameter to specify a custom logging configuration? Thanks!
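
One possible approach (an assumption based on the native Kubernetes client shipping its local conf/ directory into the ConfigMap): edit log4j-console.properties under the client-side $FLINK_HOME/conf before running kubernetes-session.sh, so the customized file is what gets shipped. A minimal sketch with a hypothetical package name:

```
# $FLINK_HOME/conf/log4j-console.properties (log4j2 properties syntax)
logger.mypkg.name = com.example.mypackage
logger.mypkg.level = DEBUG
```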

Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-07 Thread Yang Wang
Since your problem is about the flink-native-k8s-operator, let's move the
discussion there.

Best,
Yang

On Sat, May 8, 2021 at 5:41 AM, Fuyao Li wrote:

> Hi Austin, Yang, Matthias,
>
>
>
> I am following up to see if you guys have any idea regarding this problem.
>
>
>
> I also created an issue here to describe the problem. [1]
>
>
>
> After looking into the source code[2], I believe for native k8s, three
> configuration files should be added to the flink-config-
> configmap automatically. However, there is only the flink-conf.yaml in the
> operator-created flink application. And that is also causing the start
> command difference mentioned in the issue.
>
>
>
> Native k8s using Flink CLI: Args:
>   native-k8s
>   $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 
> -Xms1073741824 -XX:MaxMetaspaceSize=268435456 
> -Dlog.file=/opt/flink/log/jobmanager.log 
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml 
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 
> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties 
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint 
> -D jobmanager.memory.off-heap.size=134217728b -D 
> jobmanager.memory.jvm-overhead.min=201326592b -D 
> jobmanager.memory.jvm-metaspace.size=268435456b -D 
> jobmanager.memory.heap.size=1073741824b -D 
> jobmanager.memory.jvm-overhead.max=201326592b
>
>
>
>
>
> Operator flink app:
>
> Args:
>   native-k8s
>   $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx3462817376
> -Xms3462817376 -XX:MaxMetaspaceSize=268435456
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
> -D jobmanager.memory.off-heap.size=134217728b -D
> jobmanager.memory.jvm-overhead.min=429496736b -D
> jobmanager.memory.jvm-metaspace.size=268435456b -D
> jobmanager.memory.heap.size=3462817376b -D
> jobmanager.memory.jvm-overhead.max=429496736b
>
>
>
> Please share your opinion on this. Thanks!
>
>
>
> [1] https://github.com/wangyang0918/flink-native-k8s-operator/issues/4
>
> [2]
> https://github.com/apache/flink/blob/release-1.12/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
>
>
>
> Have a good weekend!
>
> Best,
>
> Fuyao
>
>
>
>
>
> *From: *Fuyao Li 
> *Date: *Tuesday, May 4, 2021 at 19:52
> *To: *Austin Cawley-Edwards ,
> matth...@ververica.com , Yang Wang <
> danrtsey...@gmail.com>
> *Cc: *user 
> *Subject: *Re: [External] : Re: StopWithSavepoint() method doesn't work
> in Java based flink native k8s operator
>
> Hello All,
>
>
>
> I also checked the native-k8s’s automatically generated configmap. It only
> contains the flink-conf.yaml, but no log4j.properties. I am not very
> familiar with the implementation details behind native k8s.
>
>
>
> That should be the root cause, could you check the implementation and help
> me to locate the potential problem.
>
> Yang’s initial code:
> https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java
> 
>
> My modified version:
> https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java
> 
>
>
>
> Thank you so much.
>
>
>
> Best,
>
> Fuyao
>
>
>
> *From: *Fuyao Li 
> *Date: *Tuesday, May 4, 2021 at 19:34
> *To: *Austin Cawley-Edwards ,
> matth...@ververica.com , Yang Wang <
> danrtsey...@gmail.com>
> *Cc: *user 
> *Subject: *Re: [External] : Re: StopWithSavepoint() method doesn't work
> in Java based flink native k8s operator
>
> Hello Austin, Yang,
>
>
>
> For the logging issue, I think I have found something worth noticing.
>
>
>
> They are all based on base image flink:1.12.1-scala_2.11-java11
>
>
>
> Dockerfile: https://pastebin.ubuntu.com/p/JTsHygsTP6/
> 
>
>
>
> In the JM and TM provisioned by the k8s operator, there is only
> flink-conf.yaml in the $FLINK_HOME/conf directory. Even if I try to add
> these configurations to the image in advance, the operator seems to be
> overriding them and removing all other log4j configurations. This is
> causing the logs to not be printed correctly.
>
> 

What exactly does enableObjectReuse do?

2021-05-07 Thread 杨力
I wrote a streaming job in Scala, using only immutable case classes. Is it
safe to enable object reuse? Will it benefit from enabling object
reuse?

I looked through the documentation, but it covers neither streaming cases nor
immutable data structures.


Flink CDC: LocalDateTime conversion error

2021-05-07 Thread ELVIS_SUE
When using Flink CDC I get the error "Unable to convert to LocalDateTime 
from unexpected value '2020-02-25T23:26:14Z' of type 
java.lang.String". The MySQL column type is timestamp. How should this be handled in Flink?


Versions:
flink-1.11.3,
flink-sql-connector-mysql-cdc-1.0.0.jar


The Flink CDC table DDL:
CREATE TABLE byx_user_auth(
id BIGINT,
user_id BIGINT,
role TINYINT,
name STRING,
id_card_type TINYINT,
id_card STRING,
bank_card STRING,
phone STRING,
auth_type TINYINT,
remark STRING,
create_date TIMESTAMP,
create_user STRING,
update_date TIMESTAMP,
update_user STRING,
del_flag TINYINT,
proc_time AS PROCTIME() -- processing-time column
) WITH ('connector' = 'mysql-cdc', -- connector type
'hostname' = 'XXX', -- MySQL host
'port' = '3310', -- MySQL port
'username' = 'XXX', -- MySQL username
'password' = 'XXX', -- MySQL password
'database-name' = 'byx_user', -- database name
'table-name' = 'byx_user_auth'
);



The MySQL table DDL:
CREATE TABLE `byx_user_auth` (
 `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '',
 `user_id` bigint(20) NOT NULL COMMENT '',
 `role` tinyint(4) NOT NULL COMMENT 
'??1-2-3-??4-??5-7-??',
 `name` varchar(64) NOT NULL COMMENT '',
 `id_card_type` tinyint(4) NOT NULL COMMENT '?? 
1-2-??3-??4-??5-',
 `id_card` varchar(32) NOT NULL COMMENT '',
 `bank_card` varchar(64) NOT NULL COMMENT '',
 `phone` varchar(11) DEFAULT NULL COMMENT '',
 `auth_type` tinyint(4) NOT NULL COMMENT '',
 `remark` varchar(255) DEFAULT NULL COMMENT '',
 `create_date` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 
'',
 `create_user` varchar(20) DEFAULT NULL COMMENT '??',
 `update_date` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP COMMENT '',
 `update_user` varchar(20) DEFAULT NULL COMMENT '??',
 `del_flag` tinyint(4) DEFAULT '0' COMMENT '??0-?? 1-',
 PRIMARY KEY (`id`),
 UNIQUE KEY `uk_four_factors` 
(`name`,`id_card_type`,`id_card`,`bank_card`,`phone`),
 KEY `idx_id_role` (`user_id`,`role`)
)

Re: How to increase the number of task managers?

2021-05-07 Thread Yik San Chan
Hi Yangze,

Thanks for the answer! That helps.

Best,
Yik San

On Sat, May 8, 2021 at 10:15 AM Yangze Guo  wrote:

> Hi,
>
> > I wonder if I can tune the number of task managers? Is there a
> corresponding config?
>
> With the K8s/YARN resource providers, the task managers are allocated on
> demand. So, the number of them depends on the max parallelism and
> the slot sharing group topology of your job.
> In standalone mode, you need to configure the "conf/workers" file in your
> flink distribution to decide the number of task managers[3].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster
>
> Best,
> Yangze Guo
>
> Best,
> Yangze Guo
>
>
> On Fri, May 7, 2021 at 7:34 PM Tamir Sagi 
> wrote:
> >
> > Hey
> >
> > num of TMs = parallelism / num of slots
> >
> > parallelism.default is another config you should consider.
> >
> > Read also
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/parallel/
> >
> >
> > 
> > From: Yik San Chan 
> > Sent: Friday, May 7, 2021 1:56 PM
> > To: user 
> > Subject: How to increase the number of task managers?
> >
> >
> > EXTERNAL EMAIL
> >
> >
> >
> > Hi community,
> >
> > According to the [docs](
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
> ):
> >
> > > taskmanager.numberOfTaskSlots: The number of slots that a TaskManager
> offers (default: 1). Each slot can take one task or pipeline. Having
> multiple slots in a TaskManager can help amortize certain constant
> overheads (of the JVM, application libraries, or network connections)
> across parallel tasks or pipelines. See the Task Slots and Resources
> concepts section for details.
> >
> > > Running more smaller TaskManagers with one slot each is a good
> starting point and leads to the best isolation between tasks. Dedicating
> the same resources to fewer larger TaskManagers with more slots can help to
> increase resource utilization, at the cost of weaker isolation between the
> tasks (more tasks share the same JVM).
> >
> > We're able to tune slot count by setting taskmanager.numberOfTaskSlots,
> that may help parallelize my task.
> >
> > I wonder if I can tune the number of task managers? Is there a
> corresponding config?
> >
> > Best,
> > Yik San
> >
> >
> > Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> > Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> > Viruses: Although we have taken steps toward ensuring that this e-mail
> and attachments are free from any virus, we advise that in keeping with
> good computing practice the recipient should ensure they are actually virus
> free.
>


Re: How can Flink achieve exactly-once when writing to ClickHouse?

2021-05-07 Thread 张锴
ClickHouse does not support transactions or idempotent writes, so end-to-end exactly-once cannot be guaranteed.

On Fri, May 7, 2021 at 10:27 PM, 李一飞 wrote:

> How can Flink write to ClickHouse with exactly-once semantics? Are there any good approaches?


Unsubscribe

2021-05-07 Thread 薛旭旭
Unsubscribe

Flink CDC question

2021-05-07 Thread lp
I have recently been studying Flink
connectors (docs: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/) and have also come across the concept of Flink
CDC: https://github.com/ververica/flink-cdc-connectors. Could someone explain what the relationship
between Flink connectors and Flink CDC is?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Table name for table created fromDataStream

2021-05-07 Thread Leonard Xu


> On May 8, 2021, at 08:00, tbud wrote:
> 
> Hi Leonard,
> Yes, that would be one solution. But why is it necessary to create a
> temporary view from an already created table?


The name “Table” is quite misleading here: the Table API object Table actually 
represents a relational query (e.g. Table table = 
myTableObject.join(…).where(…).select(…)), not a named table.
That is why we use createTemporaryView(String path, Table view) rather than 
registerTable(String name, Table table).
For background, registerTable(String name, Table table) is the old interface and 
has been deprecated for the reason explained above. This is part of FLIP-64[1].
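
To make this concrete, a minimal sketch (assumptions: a StreamTableEnvironment named tableEnv and a DataStream named stream already exist; "MyView" is a hypothetical name):

```
// The Table object is just a relational query; createTemporaryView gives it a
// name so it can be referenced from SQL (the FLIP-64 style API).
Table table = tableEnv.fromDataStream(stream);
tableEnv.createTemporaryView("MyView", table);
Table result = tableEnv.sqlQuery("SELECT * FROM MyView");
```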

Hope I make this clear.

Best,
Leonard
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module




Re: How to increase the number of task managers?

2021-05-07 Thread Yangze Guo
Hi,

> I wonder if I can tune the number of task managers? Is there a corresponding 
> config?

With the K8s/YARN resource providers, the task managers are allocated on
demand. So, the number of them depends on the max parallelism and
the slot sharing group topology of your job.
In standalone mode, you need to configure the "conf/workers" file in your
flink distribution to decide the number of task managers[3].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/
[3] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster
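
For the standalone case, a minimal sketch of the relevant files (host names and values below are hypothetical; with 3 workers and 2 slots each the cluster offers 6 slots in total):

```
# conf/workers -- one TaskManager is started per entry
tm-host-1
tm-host-2
tm-host-3

# conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 2
parallelism.default: 6
```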

Best,
Yangze Guo

Best,
Yangze Guo


On Fri, May 7, 2021 at 7:34 PM Tamir Sagi  wrote:
>
> Hey
>
> num of TMs = parallelism / num of slots
>
> parallelism.default is another config you should consider.
>
> Read also
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/parallel/
>
>
> 
> From: Yik San Chan 
> Sent: Friday, May 7, 2021 1:56 PM
> To: user 
> Subject: How to increase the number of task managers?
>
>
> EXTERNAL EMAIL
>
>
>
> Hi community,
>
> According to the 
> [docs](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/):
>
> > taskmanager.numberOfTaskSlots: The number of slots that a TaskManager 
> > offers (default: 1). Each slot can take one task or pipeline. Having 
> > multiple slots in a TaskManager can help amortize certain constant 
> > overheads (of the JVM, application libraries, or network connections) 
> > across parallel tasks or pipelines. See the Task Slots and Resources 
> > concepts section for details.
>
> > Running more smaller TaskManagers with one slot each is a good starting 
> > point and leads to the best isolation between tasks. Dedicating the same 
> > resources to fewer larger TaskManagers with more slots can help to increase 
> > resource utilization, at the cost of weaker isolation between the tasks 
> > (more tasks share the same JVM).
>
> We're able to tune slot count by setting taskmanager.numberOfTaskSlots, that 
> may help parallelize my task.
>
> I wonder if I can tune the number of task managers? Is there a corresponding 
> config?
>
> Best,
> Yik San
>
>
> Confidentiality: This communication and any attachments are intended for the 
> above-named persons only and may be confidential and/or legally privileged. 
> Any opinions expressed in this communication are not necessarily those of 
> NICE Actimize. If this communication has come to you in error you must take 
> no action based on it, nor must you copy or show it to anyone; please 
> delete/destroy and inform the sender by e-mail immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and 
> attachments are free from any virus, we advise that in keeping with good 
> computing practice the recipient should ensure they are actually virus free.


Re: Enabling Checkpointing using FsStateBackend

2021-05-07 Thread Yangze Guo
Hi,

I think the checkpointing is not the root cause of your job failure.
As the log shows, your job failed because of a Kafka authorization
issue: "Caused by:
org.apache.kafka.common.errors.TransactionalIdAuthorizationException:
Transactional Id authorization failed."
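
If it helps, one possible direction on the Kafka side (an assumption; the exact ACL setup depends on how the cluster is secured) is to grant the producing principal access to the transactional ids it uses, for example:

```
# principal, broker address and transactional-id prefix below are hypothetical
bin/kafka-acls.sh --bootstrap-server <broker>:9092 \
  --add --allow-principal User:flink-app \
  --operation Write --operation Describe \
  --transactional-id flink-app-tx --resource-pattern-type prefixed
```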

Best,
Yangze Guo

On Fri, May 7, 2021 at 11:29 PM sudhansu jena
 wrote:
>
> Hi Team,
>
> We have recently enabled checkpointing using FsStateBackend, where we are 
> trying to use an S3 bucket as the persistent storage, but after enabling it we 
> are running into issues while submitting the job to the cluster.
>
> Can you please let us know if we are missing anything ?
>
>
> Below is the code sample  for enabling Checkpointing.
>
> env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/fhirmapper"));
> env.enableCheckpointing(1000);
>
>
>
> Below logs for the issue.
>
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, 
> backoffTimeMS=15000)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
> at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
> at jdk.internal.reflect.GeneratedMethodAccessor366.invoke(Unknown Source)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.kafka.common.errors.TransactionalIdAuthorizationException: 
> Transactional Id authorization failed.
>
>
> Thanks,
> Sudhansu
>
>


Re: Flink job tasks are unevenly distributed across TaskManagers

2021-05-07 Thread wenyuan138
I tested it, and this parameter does indeed work.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink job tasks are unevenly distributed across TaskManagers

2021-05-07 Thread wenyuan138
Many thanks, 黄潇.
The description of this parameter matches exactly what I am observing; I will try changing it today.




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Unsubscribe

2021-05-07 Thread 袁刚
Unsubscribe

Unsubscribe

2021-05-07 Thread 杨军
Unsubscribe

Re: Table name for table created fromDataStream

2021-05-07 Thread tbud
Hi Leonard,
Yes, that would be one solution. But why is it necessary to create a
temporary view from an already created table?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Question regarding cpu limit config in Flink standalone mode

2021-05-07 Thread Fan Xie
Hi Xintong,

Thanks for answering my question! After discussing with my teammates, we decided 
to rely on k8s pods and an external control plane to restrict the CPU usage of a 
job.

Thanks again for your help!

Best,
Fan

From: Xintong Song 
Sent: Thursday, May 6, 2021 8:39 PM
To: user@flink.apache.org 
Subject: Re: Question regarding cpu limit config in Flink standalone mode

Hi Fan,

For a Java application, you cannot specify how many CPUs a process should use. 
The JVM process will always try to use as much CPU time as it needs. The 
limitation can only come from outside: hardware limits, OS scheduling, cgroups, 
etc.

On Kubernetes, it is the pod's resource specifications that decide how many cpu 
resources a Flink JM/TM can use.
- For the standalone kubernetes deployment, you can specify the pods' resources 
in your yaml files.
- For the native kubernetes deployment, TM pods are requested by Flink's 
ResourceManager. Thus, the configuration option `kubernetes.taskmanager.cpu` 
controls the cpu resources of the pods Flink requests from Kubernetes.
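
For the standalone Kubernetes case mentioned above, a minimal sketch of the relevant fragment of a TaskManager deployment spec (values are hypothetical; Kubernetes enforces the limit via cgroups):

```
# fragment of the taskmanager container spec in a standalone Kubernetes deployment
containers:
  - name: taskmanager
    image: flink:1.13
    resources:
      requests:
        cpu: "2"
      limits:
        cpu: "2"
```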


Thank you~

Xintong Song


On Fri, May 7, 2021 at 10:35 AM Fan Xie 
mailto:f...@linkedin.com>> wrote:
Hi Flink Community,

Recently I have been working on an auto-scaling project that needs to dynamically 
adjust the CPU config of Flink standalone jobs. Our jobs will be running in 
standalone mode in a k8s cluster. After going through the configuration doc: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/,
 I can't find a config that directly controls the CPU of a standalone Flink 
job. I can only see kubernetes.taskmanager.cpu, but it looks like this config is 
only useful in native k8s mode. I also noticed another config, 
taskmanager.numberOfTaskSlots, that can control the CPU config in an indirect 
way. Is there any reason why we can't configure the CPU for a standalone job 
directly?

Thanks for answering my question.

Best,
Fan



Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-07 Thread Fuyao Li
Hi Austin, Yang, Matthias,

I am following up to see if you guys have any idea regarding this problem.

I also created an issue here to describe the problem. [1]

After looking into the source code[2], I believe for native k8s, three 
configuration files should be added to the flink-config- configmap 
automatically. However, there is only the flink-conf.yaml in the 
operator-created flink application. And that is also causing the start command 
difference mentioned in the issue.


Native k8s using Flink CLI: Args:
  native-k8s
  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 
-Xms1073741824 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/opt/flink/log/jobmanager.log 
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml 
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint 
-D jobmanager.memory.off-heap.size=134217728b -D 
jobmanager.memory.jvm-overhead.min=201326592b -D 
jobmanager.memory.jvm-metaspace.size=268435456b -D 
jobmanager.memory.heap.size=1073741824b -D 
jobmanager.memory.jvm-overhead.max=201326592b


Operator flink app:
Args:
  native-k8s
  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx3462817376 
-Xms3462817376 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint 
-D jobmanager.memory.off-heap.size=134217728b -D 
jobmanager.memory.jvm-overhead.min=429496736b -D 
jobmanager.memory.jvm-metaspace.size=268435456b -D 
jobmanager.memory.heap.size=3462817376b -D 
jobmanager.memory.jvm-overhead.max=429496736b

Please share your opinion on this. Thanks!

[1] https://github.com/wangyang0918/flink-native-k8s-operator/issues/4
[2] 
https://github.com/apache/flink/blob/release-1.12/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java

Have a good weekend!
Best,
Fuyao


From: Fuyao Li 
Date: Tuesday, May 4, 2021 at 19:52
To: Austin Cawley-Edwards , matth...@ververica.com 
, Yang Wang 
Cc: user 
Subject: Re: [External] : Re: StopWithSavepoint() method doesn't work in Java 
based flink native k8s operator
Hello All,

I also checked the native-k8s’s automatically generated configmap. It only 
contains the flink-conf.yaml, but no log4j.properties. I am not very familiar 
with the implementation details behind native k8s.

That should be the root cause, could you check the implementation and help me 
to locate the potential problem.
Yang’s initial code: 
https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java
My modified version: 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java

Thank you so much.

Best,
Fuyao

From: Fuyao Li 
Date: Tuesday, May 4, 2021 at 19:34
To: Austin Cawley-Edwards , matth...@ververica.com 
, Yang Wang 
Cc: user 
Subject: Re: [External] : Re: StopWithSavepoint() method doesn't work in Java 
based flink native k8s operator
Hello Austin, Yang,

For the logging issue, I think I have found something worth noticing.

They are all based on base image flink:1.12.1-scala_2.11-java11

Dockerfile: 
https://pastebin.ubuntu.com/p/JTsHygsTP6/

In the JM and TM provisioned by the k8s operator, there is only flink-conf.yaml 
in the $FLINK_HOME/conf directory. Even if I try to add these configurations 
to the image in advance, the operator seems to be overriding them and 
removing all other log4j configurations. This is causing the logs to not be 
printed correctly.
root@flink-demo-5fc78c8cf-hgvcj:/opt/flink/conf# ls
flink-conf.yaml


However, for the pods that is provisioned by flink native k8s CLI. There exists 
some log4j related configurations.

root@test-application-cluster-79c7f9dcf7-44bq8:/opt/flink/conf# ls
flink-conf.yaml  log4j-console.properties  logback-console.xml


The native Kubernetes operator pod can print logs correctly because it has the 
log4j.properties file mounted to the opt/flink/conf/ directory. [1]
For the Flink pods, it seems 

Re: Questions about implementing a flink source

2021-05-07 Thread Evan Palmer
Hi Arvid, thank you so much for the detailed reply!

A few replies / questions inline. Somewhat relatedly, I'm also wondering
where this connector should live. I saw that there's already a pubsub
connector in
https://github.com/apache/flink/tree/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub,
so if flink is willing to host it, perhaps it could live near there?
Alternatively, it could live alongside our client library in
https://github.com/googleapis/java-pubsublite.

On Mon, May 3, 2021 at 1:54 PM Arvid Heise  wrote:

> Hi Evan,
>
> 1) You are absolutely correct that we would urge users to add new sources
> as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
> For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource
> [2] as a starting point. Especially basing the reader implementation on
> SingleThreadMultiplexSourceReaderBase will give you some performance boost
> over naive implementations.
> It is probably initially overwhelming but there is lots of thought behind
> the Source interface. We plan on having better documentation and more
> examples in the next months to ease the ramp up but it's also kind of a
> hen-egg problem.
>

Okay, great, the Source interface seems much easier to work with. I haven't
gotten around to thinking about our Sink yet, but I'm sure I'll have some
questions when I do :)

I read through SourceReaderBase and SingleThreadMultiplexSourceReaderBase.
It seems like these base implementations are mostly designed to help in
cases where the client library uses a synchronous pull based approach. Our
client library is async - we use a bidirectional stream to pull
messages from our brokers and we have some flow control settings to limit
the number of bytes and messages outstanding to the client. I'm wondering
if because of this, we should just implement the SourceReader interface. In
particular, we have a per partition subscriber class which buffers messages
up to the flow control limit and exposes an API almost identical to
SourceReader's pollNext and IsAvailable. What do you think?

>
> I can also provide guidance outside of the ML if it's easier.
>
> 2) You are right, the currentParallelism is static in respect to the
> creation of the SourceReaders. Any change to the parallelism would also
> cause a recreation of the readers.
> Splits are usually checkpointed alongside the readers. On recovery, the
> readers are restored with their old splits. Only when splits cannot be
> recovered in the context of a reader (for example downscaling), the splits
> would be re-added to the enumerator.
>

> Rebalancing can happen in SplitEnumerator#addReader or
> #handleSplitRequest. The Kafka and File source use even different
> approaches with eager and lazy initialization respectively. Further, you
> can send arbitrary events between the enumerator and readers to work out
> the rebalancing. In theory, you can also dynamically rebalance splits,
> however, you lose ordering guarantees of the messages at the moment (if you
> have records r1, r2 in this order in split s and you reassign s, then you
> may end up with r2, r1 in the sink).
>

Ah, okay, this helped a lot. I'd missed that rebalancing dynamically would
break ordering guarantees, so when I read through the Kafka source, I was
really confused by the lack of rebalancing.

>
> [1]
> https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75-L75
> [2]
> https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java#L99-L99
>
> On Mon, May 3, 2021 at 1:40 AM Evan Palmer  wrote:
>
>> Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite
>> which is a partition based Pub/Sub product, and I have a few questions.
>>
>> 1.
>>
>> I saw that there are two sets of interfaces used in existing sources: The
>> RichSourceFunction, and the set of interfaces from FLIP-27. It seems like
>> the Source interfaces are preferred for new sources, but I wanted to be
>> sure.
>>
>> 2.
>>
>> I’m having a little bit of trouble working out how when the
>> currentParallelism returned by the SplitEnumeratorContext [1] can change,
>> and how a source should react to that.
>>
>> For context, I’m currently thinking about single partitions as “splits”,
>> so a source would have an approximately constant number of splits which
>> each has an potentially unbounded amount of work (at least in continuous
>> mode). Each split will be assigned to some SourceReader by the split
>> enumerator. If the value of currentParallelism changes, it seems like I’ll
>> need to find a way to redistribute my partitions over SourceReaders, or
>> else I'll end up with an unbalanced distribution of 

Savepoint/checkpoint confusion

2021-05-07 Thread Igor Basov
Hello,
I got confused about usage of savepoints and checkpoints in different
scenarios.
I understand that checkpoints' main purpose is fault tolerance, they are
more lightweight and don't support changing job graph, parallelism or state
backend when restoring from them, as mentioned in the latest 1.13 docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#difference-to-savepoints

At the same time:
1) Reactive scaling mode (in 1.13) uses checkpoints exactly for that -
rescaling.
2) There are use cases like here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/What-happens-when-a-job-is-rescaled-td39462.html
where people seem to be using retained checkpoints instead of savepoints to
do manual job restarts with rescaling.
3) There are claims like here:
https://lists.apache.org/thread.html/4299518f4da2810aa88fe6b21f841880b619f3f8ac264084a318c034%40%3Cuser.flink.apache.org%3E
that in HA setup JobManager is able to restart from a checkpoint even if
operators are added/removed or parallelism is changed (in this case I'm not
sure if the checkpoints used by HA JM in `high-availability.storageDir` is
the same thing as usual checkpoints).

So I guess the questions are:
1) Can retained checkpoints be safely used for manual restarting and
rescaling a job?
2) Are checkpoints made by HA JM structurally different from the usual
ones? Can they be used to restore a job with a changed job graph?

Thank you,
Igor


how to split a column value into multiple rows in flink sql?

2021-05-07 Thread 1095193...@qq.com
Hi
  For example, given a table like this:

   A    B    C
   --------------------
   a1   b1   c1,c2,c3

  how can I split c1,c2,c3 into multiple rows like this with a Flink SQL function?

   A    B    C
   -------------
   a1   b1   c1
   a1   b1   c2
   a1   b1   c3

Thank you
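
One possible approach (a sketch under stated assumptions, not the only way): since Flink SQL has no built-in split-to-rows function, define a user-defined table function and join it with LATERAL TABLE. Assuming PyFlink 1.13, a TableEnvironment named t_env, and a registered table my_table with columns A, B, C:

```
from pyflink.table import DataTypes
from pyflink.table.udf import udtf


@udtf(result_types=DataTypes.STRING())
def split_csv(s: str):
    # emit one row per comma-separated element of the input string
    for part in s.split(','):
        yield part


t_env.create_temporary_function("split_csv", split_csv)
t_env.execute_sql("""
    SELECT A, B, c
    FROM my_table, LATERAL TABLE(split_csv(C)) AS T(c)
""")
```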



1095193...@qq.com


Viewing the offsets stored in a Savepoint

2021-05-07 Thread Zachary Manno
Hello,
Someone else asked this question on Stackoverflow and we would also find it
very helpful:
https://stackoverflow.com/questions/66256168/querying-kafka-offsets-from-flink-savepoint

Is there a way to check the external savepoint data for what Kafka offset
it is going to resume from? We produced Kafka records, took a savepoint,
deployed from that savepoint, and found that some earlier records were
replayed when that was not our intent.

Aside from this issue, just for general monitoring we would really like to
know "savepoint taken at 1:00PM, offsets = x,y,z"

Thanks for any help,
Zach

__



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.





Re: Task Local Recovery with mountable disks in the cloud

2021-05-07 Thread Sonam Mandal
Hi Till,

Thanks for getting back to me. Apologies for my delayed response.

Thanks for confirming that the slot ID (Allocation ID) is indeed necessary 
today for task local recovery to kick in, and thanks for your insights on how 
to make this work.

We are interested in exploring this disaggregation between local state storage 
and slots to allow potential reuse of local state even when TMs go down.

I'm planning to spend some time exploring the Flink code around local recovery 
and state persistence. I'm still new to Flink, so any guidance will be helpful. 
I think both of your ideas on how to make this happen are interesting and worth 
exploring. What's the procedure to collaborate or get guidance on this feature? 
Will a FLIP be required, or will opening a ticket do?

Thanks,
Sonam

From: Till Rohrmann 
Sent: Monday, April 26, 2021 10:24 AM
To: dev 
Cc: user@flink.apache.org ; Sonam Mandal 

Subject: Re: Task Local Recovery with mountable disks in the cloud

Hi Sonam,

sorry for the late reply. We were a bit caught in the midst of the feature 
freeze for the next major Flink release.

In general, I think it is a very good idea to disaggregate the local state 
storage to make it reusable across TaskManager failures. However, it is also 
not trivial to do.

Maybe let me first describe how the current task local recovery works and then 
see how we could improve it:

Flink creates for every slot allocation an AllocationID. The AllocationID 
associates a slot on a TaskExecutor with a job and is also used for scoping the 
lifetime of a slot wrt a job (theoretically, one and the same slot could be 
used to fulfill multiple slot requests of the same job if the slot allocation 
is freed in between). Note that the AllocationID is a random ID and, thus, 
changes whenever the ResourceManager allocates a new slot on a TaskExecutor for 
a job.

Task local recovery is effectively a state cache which is associated with an 
AllocationID. So for every checkpoint and every task, a TaskExecutor copies the 
state data and stores them in the task local recovery cache. The cache is 
maintained as long as the slot allocation is valid (e.g. the slot has not been 
freed by the JobMaster and the slot has not timed out). This makes the 
lifecycle management of the state data quite easy and makes sure that a process 
does not clutter local disks. On the JobMaster side, Flink remembers for every 
Execution, where it is deployed (it remembers the AllocationID). If a failover 
happens, then Flink tries to re-deploy the Executions into the slots they were 
running in before by matching the AllocationIDs.

The reason why we scoped the state cache to an AllocationID was for simplicity 
and because we couldn't guarantee that a failed TaskExecutor X will be 
restarted on the same machine again and thereby having access to the same local 
disk as before. That's also why Flink deletes the cache directory when a slot 
is freed or when the TaskExecutor is shut down gracefully.

With persistent volumes this changes and we can make the TaskExecutors 
"stateful" in the sense that we can reuse an already occupied cache. One rather 
simple idea could be to also persist the slot allocations of a TaskExecutor 
(which slot is allocated and what is its assigned AllocationID). This 
information could be used to re-initialize the TaskExecutor upon restart. That 
way, it does not have to register at the ResourceManager and wait for new slot 
allocations but could directly start offering its slots to the jobs it 
remembered. If the TaskExecutor cannot find the JobMasters for the respective 
jobs, it would then free the slots and clear the cache accordingly.

This could work as long as the ResourceManager does not start new TaskExecutors 
whose slots could be used to recover the job. If this is a problem, then one 
needs to answer the question how long to wait for the old TaskExecutors to come 
back and reusing their local state vs. starting quickly a fresh instance but 
having to restore state remotely.

An alternative solution proposal which is probably more powerful albeit also 
more complex would be to make the cache information explicit when registering 
the TaskExecutor at the ResourceManager and later offering slots to the 
JobMaster. For example, the TaskExecutor could tell the ResourceManager which 
states it has locally cached (it probably needs to contain key group ranges for 
every stored state) and this information could be used to decide from which 
TaskExecutor to allocate slots for a job. Similarly on the JobMaster side we 
could use this information to calculate the best mapping between Executions and 
slots. I think that mechanism could better deal with rescaling events where 
there is no perfect match between Executions and slots because of the changed 
key group ranges.

So to answer your question: There is currently no way to preserve AllocationIDs 
across restarts. However, we could use the persistent 

Enabling Checkpointing using FsStateBackend

2021-05-07 Thread sudhansu jena
Hi Team,

We have recently enabled checkpointing using FsStateBackend, where we
are trying to use an S3 bucket as the persistent storage, but after enabling it
we are running into issues while submitting the job to the cluster.

Can you please let us know if we are missing anything ?


Below is the code sample  for enabling Checkpointing.


env.setStateBackend(new FsStateBackend("s3://flinkcheckpointing/fhirmapper"));
env.enableCheckpointing(1000);



Below logs for the issue.


org.apache.flink.runtime.JobException: Recovery is suppressed by
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10,
backoffTimeMS=15000)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at jdk.internal.reflect.GeneratedMethodAccessor366.invoke(Unknown Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by:
org.apache.kafka.common.errors.TransactionalIdAuthorizationException:
Transactional Id authorization failed.


Thanks,
Sudhansu


Iceberg Upsert: when multiple rows with the same primary key are inserted into Iceberg via Kafka in one batch, the data cannot be queried

2021-05-07 Thread xwmr
Iceberg upsert:

When multiple rows with the same primary key are inserted into Iceberg in the same batch, querying through Flink SQL returns no data and the following error is thrown:

java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2

The error means that the original data has two columns and the table schema also has two columns, but the schema has now become three columns.

Has anyone seen this error or problem before?

However, when the primary keys differ, rows inserted in the same batch can be queried normally, and upsert also works.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Iceberg Upsert: when multiple rows with the same primary key are inserted into Iceberg via Kafka in one batch, the data cannot be queried

2021-05-07 Thread xwmr
Iceberg upsert:

When multiple rows with the same primary key are inserted into Iceberg in the same batch, Flink SQL cannot query them and an error is thrown. The error is:
java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2

The error means that the data I inserted has two columns, but the table schema already has three columns.
Does anyone know what the problem is?

When the primary keys differ, inserting multiple rows in the same batch works fine, and upsert also works.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

How can Flink achieve exactly-once when writing to ClickHouse?

2021-05-07 Thread 李一飞
How can Flink write to ClickHouse with exactly-once semantics? Are there any good approaches?

Re: Flink :Exception in thread "main" java.lang.NoSuchMethodError: scala.Function1.$init$(Lscala/Function1;)V

2021-05-07 Thread Chesnay Schepler

This is where the 2.12 dependency comes from:
[INFO] +- io.confluent:kafka-schema-registry:jar:5.5.1:compile
[INFO] |  +- org.apache.kafka:kafka_2.12:jar:5.5.1-ccs:compile

This is the entry added by your dependency:
[INFO] +- org.apache.kafka:kafka_2.11:jar:0.10.2.1:compile

On 5/7/2021 3:15 PM, Ragini Manjaiah wrote:

Hi,
True, but I am using scala.version 2.11. Wondering where
this 2.12 is added from.


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_${scala.version}</artifactId>
    <version>${kafka.version}</version>
</dependency>

On Fri, May 7, 2021 at 6:24 PM Chesnay Schepler > wrote:


I see several Scala 2.12 dependencies in there.

[INFO] |  +- org.apache.kafka:kafka_2.12:jar:5.5.1-ccs:compile

[INFO] +- io.confluent:kafka-schema-registry:jar:5.5.1:compile
[INFO] |  |  \-
com.kjetland:mbknor-jackson-jsonschema_2.12:jar:1.0.39:compile

On 5/7/2021 2:47 PM, Ragini Manjaiah wrote:

Hi ,
Please find the details


[INFO] X:XXSNAPSHOT
[INFO] +- org.apache.flink:flink-java:jar:1.11.3:compile
[INFO] |  +- org.apache.flink:flink-core:jar:1.11.3:compile
[INFO] |  |  +- org.apache.flink:flink-annotations:jar:1.11.3:compile
[INFO] |  |  +-
org.apache.flink:flink-metrics-core:jar:1.11.3:compile
[INFO] |  |  +-
org.apache.flink:flink-shaded-asm-7:jar:7.1-11.0:compile
[INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
[INFO] |  |     \- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] |  +- org.apache.commons:commons-math3:jar:3.5:compile
[INFO] |  +- org.slf4j:slf4j-api:jar:1.7.15:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  \- org.apache.flink:force-shading:jar:1.11.3:compile
[INFO] +-
org.apache.flink:flink-statebackend-rocksdb_2.11:jar:1.11.3:compile
[INFO] |  \-
com.data-artisans:frocksdbjni:jar:5.17.2-artisans-2.0:compile
[INFO] +- org.apache.flink:flink-clients_2.11:jar:1.11.3:compile
[INFO] |  +- org.apache.flink:flink-runtime_2.11:jar:1.11.3:compile
[INFO] |  +- org.apache.flink:flink-optimizer_2.11:jar:1.11.3:compile
[INFO] |  \- commons-cli:commons-cli:jar:1.3.1:compile
[INFO] +- io.confluent:kafka-avro-serializer:jar:5.5.1:compile
[INFO] |  +- org.apache.avro:avro:jar:1.9.2:compile
[INFO] |  |  \-
com.fasterxml.jackson.core:jackson-core:jar:2.10.2:compile
[INFO] |  +-
io.confluent:kafka-schema-registry-client:jar:5.5.1:compile
[INFO] |  |  +- javax.ws.rs:javax.ws.rs-api:jar:2.1.1:compile
[INFO] |  |  \-
org.glassfish.jersey.core:jersey-common:jar:2.30:compile
[INFO] |  |     +-
jakarta.annotation:jakarta.annotation-api:jar:1.3.5:compile
[INFO] |  |     \-
org.glassfish.hk2:osgi-resource-locator:jar:1.0.3:compile
[INFO] |  +- io.confluent:common-config:jar:5.5.1:compile
[INFO] |  \- io.confluent:common-utils:jar:5.5.1:compile
[INFO] +- io.confluent:kafka-schema-serializer:jar:5.5.1:compile
[INFO] +- io.confluent:kafka-schema-registry:jar:5.5.1:compile
[INFO] |  +-
io.confluent:kafka-json-schema-provider:jar:5.5.1:compile
[INFO] |  |  +-
com.github.everit-org.json-schema:org.everit.json.schema:jar:1.12.1:compile
[INFO] |  |  |  +-
commons-validator:commons-validator:jar:1.6:compile
[INFO] |  |  |  +-
com.damnhandy:handy-uri-templates:jar:2.1.8:compile
[INFO] |  |  |  \- com.google.re2j:re2j:jar:1.3:compile
[INFO] |  |  +-
com.fasterxml.jackson.datatype:jackson-datatype-guava:jar:2.10.2:compile
[INFO] |  |  +-
com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.10.2:compile
[INFO] |  |  +-
com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.10.2:compile
[INFO] |  |  +-
com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.10.2:compile
[INFO] |  |  +-

com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.10.2:compile
[INFO] |  |  \-
com.kjetland:mbknor-jackson-jsonschema_2.12:jar:1.0.39:compile
[INFO] |  |     +-
org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable:jar:1.3.50:compile
[INFO] |  |     |  +-

org.jetbrains.kotlin:kotlin-scripting-compiler-impl-embeddable:jar:1.3.50:runtime
[INFO] |  |     |  |  +-
org.jetbrains.kotlin:kotlin-scripting-common:jar:1.3.50:runtime
[INFO] |  |     |  |  |  \-
org.jetbrains.kotlin:kotlin-reflect:jar:1.3.50:runtime
[INFO] |  |     |  |  +-
org.jetbrains.kotlin:kotlin-scripting-jvm:jar:1.3.50:runtime
[INFO] |  |     |  |  |  \-
org.jetbrains.kotlin:kotlin-script-runtime:jar:1.3.50:runtime
[INFO] |  |     |  |  \-
org.jetbrains.kotlinx:kotlinx-coroutines-core:jar:1.1.1:runtime
[INFO] |  |     |  \-
org.jetbrains.kotlin:kotlin-stdlib:jar:1.3.50:compile
[INFO] |  |     |     \- org.jetbrains:annotations:jar:13.0:compile
[INFO] |  |     \- 

Re: FileSystem Connector's Success File Was Submitted Late

2021-05-07 Thread Leonard Xu
Hi, forideal

I also encountered this problem and opened an issue[1], you can have a look.

Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-22472



> On May 7, 2021, at 20:31, forideal wrote:
> 
> I found the reason:
> 
>Late data processing: The record will be written into its partition when a 
> record is supposed to be written into a partition that has already been 
> committed, and then the committing of this partition will be triggered again.
> That is why the success file is updated later than expected.
> 
> Best,
> Forideal
> 
> At 2021-05-07 19:41:45, "forideal"  wrote:
> 
> Hi My friends:
> I use FlieSystem in Flink SQL, and I found that my success file was 
> submitted late, probably dozens of minutes late.
> Here I provide some information:
> 1.Flink version is 1.11.1.
> 2.Source DDL
>create table test (
>   `timestamp bigint`,
>event_time as _timestamp(timestamp),
>  WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE
>)...
>   3.Sink DDL
>create table sinkTest( 
> xxx
> dtm VARCHAR,
> hh VARCHAR
>   ) PARTITIONED BY (dtm, hh) 
>with(
> 'connector' = 'filesystem',
> 'format' = 'parquet',
> 'parquet.compression' = 'SNAPPY',
> 'sink.rolling-policy.file-size' = '512MB',
> 'sink.rolling-policy.check-interval' = '10 min',
> 'sink.partition-commit.trigger' = 'partition-time',
> 'sink.partition-commit.delay' = '1 h',
> 'sink.partition-commit.policy.kind' = 'success-file',
> 'sink.file-suffix' = '.parquet',
> 'partition.time-extractor.timestamp-pattern' = '$dtm $hh:00:00'
>)
> 
>4.The interval for task submission checkpoint is 5 minutes, and the 
> checkpoints are all successful.
> 
>I think that if my task is not delayed, then our success file will be 
> submitted in about 10 minutes every hour, but the fact is that it is 
> submitted very late.
>    Here is some source code about submitting the success file. When the 
> watermark is greater than the current partition time + the delay time, the 
> success file can be submitted.
> public List<String> committablePartitions(long checkpointId) {
>     if (!watermarks.containsKey(checkpointId)) {
>         throw new IllegalArgumentException(String.format(
>                 "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
>                 checkpointId, watermarks));
>     }
>
>     long watermark = watermarks.get(checkpointId);
>     watermarks.headMap(checkpointId, true).clear();
>
>     List<String> needCommit = new ArrayList<>();
>     Iterator<String> iter = pendingPartitions.iterator();
>     while (iter.hasNext()) {
>         String partition = iter.next();
>         LocalDateTime partTime = extractor.extract(
>                 partitionKeys, extractPartitionValues(new Path(partition)));
>         if (watermark > toMills(partTime) + commitDelay) {
>             needCommit.add(partition);
>             iter.remove();
>         }
>     }
>     return needCommit;
> }
> Best,
> Forideal
> 
> 
>  
> 
> 
>  



Re: Flink :Exception in thread "main" java.lang.NoSuchMethodError: scala.Function1.$init$(Lscala/Function1;)V

2021-05-07 Thread Ragini Manjaiah
Hi,
True, but I am using scala.version 2.11. Wondering where
this 2.12 is added from.


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_${scala.version}</artifactId>
    <version>${kafka.version}</version>
</dependency>



On Fri, May 7, 2021 at 6:24 PM Chesnay Schepler  wrote:

> I see several Scala 2.12 dependencies in there.
>
> [INFO] |  +- org.apache.kafka:kafka_2.12:jar:5.5.1-ccs:compile
>
> [INFO] +- io.confluent:kafka-schema-registry:jar:5.5.1:compile
> [INFO] |  |  \-
> com.kjetland:mbknor-jackson-jsonschema_2.12:jar:1.0.39:compile
>
> On 5/7/2021 2:47 PM, Ragini Manjaiah wrote:
>
> Hi ,
> Please find the details
>
>
> [INFO] X:XXSNAPSHOT
> [INFO] +- org.apache.flink:flink-java:jar:1.11.3:compile
> [INFO] |  +- org.apache.flink:flink-core:jar:1.11.3:compile
> [INFO] |  |  +- org.apache.flink:flink-annotations:jar:1.11.3:compile
> [INFO] |  |  +- org.apache.flink:flink-metrics-core:jar:1.11.3:compile
> [INFO] |  |  +- org.apache.flink:flink-shaded-asm-7:jar:7.1-11.0:compile
> [INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
> [INFO] |  | \- com.esotericsoftware.minlog:minlog:jar:1.2:compile
> [INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
> [INFO] |  +- org.apache.commons:commons-math3:jar:3.5:compile
> [INFO] |  +- org.slf4j:slf4j-api:jar:1.7.15:compile
> [INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [INFO] |  \- org.apache.flink:force-shading:jar:1.11.3:compile
> [INFO] +-
> org.apache.flink:flink-statebackend-rocksdb_2.11:jar:1.11.3:compile
> [INFO] |  \- com.data-artisans:frocksdbjni:jar:5.17.2-artisans-2.0:compile
> [INFO] +- org.apache.flink:flink-clients_2.11:jar:1.11.3:compile
> [INFO] |  +- org.apache.flink:flink-runtime_2.11:jar:1.11.3:compile
> [INFO] |  +- org.apache.flink:flink-optimizer_2.11:jar:1.11.3:compile
> [INFO] |  \- commons-cli:commons-cli:jar:1.3.1:compile
> [INFO] +- io.confluent:kafka-avro-serializer:jar:5.5.1:compile
> [INFO] |  +- org.apache.avro:avro:jar:1.9.2:compile
> [INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.10.2:compile
> [INFO] |  +- io.confluent:kafka-schema-registry-client:jar:5.5.1:compile
> [INFO] |  |  +- javax.ws.rs:javax.ws.rs-api:jar:2.1.1:compile
> [INFO] |  |  \- org.glassfish.jersey.core:jersey-common:jar:2.30:compile
> [INFO] |  | +-
> jakarta.annotation:jakarta.annotation-api:jar:1.3.5:compile
> [INFO] |  | \-
> org.glassfish.hk2:osgi-resource-locator:jar:1.0.3:compile
> [INFO] |  +- io.confluent:common-config:jar:5.5.1:compile
> [INFO] |  \- io.confluent:common-utils:jar:5.5.1:compile
> [INFO] +- io.confluent:kafka-schema-serializer:jar:5.5.1:compile
> [INFO] +- io.confluent:kafka-schema-registry:jar:5.5.1:compile
> [INFO] |  +- io.confluent:kafka-json-schema-provider:jar:5.5.1:compile
> [INFO] |  |  +-
> com.github.everit-org.json-schema:org.everit.json.schema:jar:1.12.1:compile
> [INFO] |  |  |  +- commons-validator:commons-validator:jar:1.6:compile
> [INFO] |  |  |  +- com.damnhandy:handy-uri-templates:jar:2.1.8:compile
> [INFO] |  |  |  \- com.google.re2j:re2j:jar:1.3:compile
> [INFO] |  |  +-
> com.fasterxml.jackson.datatype:jackson-datatype-guava:jar:2.10.2:compile
> [INFO] |  |  +-
> com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.10.2:compile
> [INFO] |  |  +-
> com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.10.2:compile
> [INFO] |  |  +-
> com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.10.2:compile
> [INFO] |  |  +-
> com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.10.2:compile
> [INFO] |  |  \-
> com.kjetland:mbknor-jackson-jsonschema_2.12:jar:1.0.39:compile
> [INFO] |  | +-
> org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable:jar:1.3.50:compile
> [INFO] |  | |  +-
> org.jetbrains.kotlin:kotlin-scripting-compiler-impl-embeddable:jar:1.3.50:runtime
> [INFO] |  | |  |  +-
> org.jetbrains.kotlin:kotlin-scripting-common:jar:1.3.50:runtime
> [INFO] |  | |  |  |  \-
> org.jetbrains.kotlin:kotlin-reflect:jar:1.3.50:runtime
> [INFO] |  | |  |  +-
> org.jetbrains.kotlin:kotlin-scripting-jvm:jar:1.3.50:runtime
> [INFO] |  | |  |  |  \-
> org.jetbrains.kotlin:kotlin-script-runtime:jar:1.3.50:runtime
> [INFO] |  | |  |  \-
> org.jetbrains.kotlinx:kotlinx-coroutines-core:jar:1.1.1:runtime
> [INFO] |  | |  \- org.jetbrains.kotlin:kotlin-stdlib:jar:1.3.50:compile
> [INFO] |  | | \- org.jetbrains:annotations:jar:13.0:compile
> [INFO] |  | \- io.github.classgraph:classgraph:jar:4.8.21:compile
> [INFO] |  +- io.confluent:kafka-protobuf-provider:jar:5.5.1:compile
> [INFO] |  |  +- com.squareup.wire:wire-schema:jar:3.2.2:compile
> [INFO] |  |  |  +-
> org.jetbrains.kotlin:kotlin-stdlib-jdk8:jar:1.3.71:compile
> [INFO] |  |  |  |  \-
> org.jetbrains.kotlin:kotlin-stdlib-jdk7:jar:1.3.71:compile
> [INFO] |  |  |  +-
> org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.3.71:compile
> [INFO] |  |  |  +- com.squareup.wire:wire-runtime:jar:3.2.2:compile
> [INFO] |  |  |  \- com.squareup.okio:okio:jar:2.5.0:compile
> [INFO] |  | 

Re: Flink :Exception in thread "main" java.lang.NoSuchMethodError: scala.Function1.$init$(Lscala/Function1;)V

2021-05-07 Thread Chesnay Schepler

I see a several scala 2.12 dependencies in there.

[INFO] |  +- org.apache.kafka:kafka_2.12:jar:5.5.1-ccs:compile

[INFO] +- io.confluent:kafka-schema-registry:jar:5.5.1:compile
[INFO] |  |  \- 
com.kjetland:mbknor-jackson-jsonschema_2.12:jar:1.0.39:compile


On 5/7/2021 2:47 PM, Ragini Manjaiah wrote:

Hi ,
Please find the details


[INFO] X:XXSNAPSHOT
[INFO] +- org.apache.flink:flink-java:jar:1.11.3:compile
[INFO] |  +- org.apache.flink:flink-core:jar:1.11.3:compile
[INFO] |  |  +- org.apache.flink:flink-annotations:jar:1.11.3:compile
[INFO] |  |  +- org.apache.flink:flink-metrics-core:jar:1.11.3:compile
[INFO] |  |  +- org.apache.flink:flink-shaded-asm-7:jar:7.1-11.0:compile
[INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
[INFO] |  |     \- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] |  +- org.apache.commons:commons-math3:jar:3.5:compile
[INFO] |  +- org.slf4j:slf4j-api:jar:1.7.15:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  \- org.apache.flink:force-shading:jar:1.11.3:compile
[INFO] +- 
org.apache.flink:flink-statebackend-rocksdb_2.11:jar:1.11.3:compile

[INFO] |  \- com.data-artisans:frocksdbjni:jar:5.17.2-artisans-2.0:compile
[INFO] +- org.apache.flink:flink-clients_2.11:jar:1.11.3:compile
[INFO] |  +- org.apache.flink:flink-runtime_2.11:jar:1.11.3:compile
[INFO] |  +- org.apache.flink:flink-optimizer_2.11:jar:1.11.3:compile
[INFO] |  \- commons-cli:commons-cli:jar:1.3.1:compile
[INFO] +- io.confluent:kafka-avro-serializer:jar:5.5.1:compile
[INFO] |  +- org.apache.avro:avro:jar:1.9.2:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.10.2:compile
[INFO] |  +- io.confluent:kafka-schema-registry-client:jar:5.5.1:compile
[INFO] |  |  +- javax.ws.rs:javax.ws.rs-api:jar:2.1.1:compile
[INFO] |  |  \- org.glassfish.jersey.core:jersey-common:jar:2.30:compile
[INFO] |  |     +- 
jakarta.annotation:jakarta.annotation-api:jar:1.3.5:compile
[INFO] |  |     \- 
org.glassfish.hk2:osgi-resource-locator:jar:1.0.3:compile

[INFO] |  +- io.confluent:common-config:jar:5.5.1:compile
[INFO] |  \- io.confluent:common-utils:jar:5.5.1:compile
[INFO] +- io.confluent:kafka-schema-serializer:jar:5.5.1:compile
[INFO] +- io.confluent:kafka-schema-registry:jar:5.5.1:compile
[INFO] |  +- io.confluent:kafka-json-schema-provider:jar:5.5.1:compile
[INFO] |  |  +- 
com.github.everit-org.json-schema:org.everit.json.schema:jar:1.12.1:compile

[INFO] |  |  |  +- commons-validator:commons-validator:jar:1.6:compile
[INFO] |  |  |  +- com.damnhandy:handy-uri-templates:jar:2.1.8:compile
[INFO] |  |  |  \- com.google.re2j:re2j:jar:1.3:compile
[INFO] |  |  +- 
com.fasterxml.jackson.datatype:jackson-datatype-guava:jar:2.10.2:compile
[INFO] |  |  +- 
com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.10.2:compile
[INFO] |  |  +- 
com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.10.2:compile
[INFO] |  |  +- 
com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.10.2:compile
[INFO] |  |  +- 
com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.10.2:compile
[INFO] |  |  \- 
com.kjetland:mbknor-jackson-jsonschema_2.12:jar:1.0.39:compile
[INFO] |  |     +- 
org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable:jar:1.3.50:compile
[INFO] |  |     |  +- 
org.jetbrains.kotlin:kotlin-scripting-compiler-impl-embeddable:jar:1.3.50:runtime
[INFO] |  |     |  |  +- 
org.jetbrains.kotlin:kotlin-scripting-common:jar:1.3.50:runtime
[INFO] |  |     |  |  |  \- 
org.jetbrains.kotlin:kotlin-reflect:jar:1.3.50:runtime
[INFO] |  |     |  |  +- 
org.jetbrains.kotlin:kotlin-scripting-jvm:jar:1.3.50:runtime
[INFO] |  |     |  |  |  \- 
org.jetbrains.kotlin:kotlin-script-runtime:jar:1.3.50:runtime
[INFO] |  |     |  |  \- 
org.jetbrains.kotlinx:kotlinx-coroutines-core:jar:1.1.1:runtime
[INFO] |  |     |  \- 
org.jetbrains.kotlin:kotlin-stdlib:jar:1.3.50:compile

[INFO] |  |     |     \- org.jetbrains:annotations:jar:13.0:compile
[INFO] |  |     \- io.github.classgraph:classgraph:jar:4.8.21:compile
[INFO] |  +- io.confluent:kafka-protobuf-provider:jar:5.5.1:compile
[INFO] |  |  +- com.squareup.wire:wire-schema:jar:3.2.2:compile
[INFO] |  |  |  +- 
org.jetbrains.kotlin:kotlin-stdlib-jdk8:jar:1.3.71:compile
[INFO] |  |  |  |  \- 
org.jetbrains.kotlin:kotlin-stdlib-jdk7:jar:1.3.71:compile
[INFO] |  |  |  +- 
org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.3.71:compile

[INFO] |  |  |  +- com.squareup.wire:wire-runtime:jar:3.2.2:compile
[INFO] |  |  |  \- com.squareup.okio:okio:jar:2.5.0:compile
[INFO] |  |  \- com.google.protobuf:protobuf-java-util:jar:3.11.4:compile
[INFO] |  |     \- 
com.google.errorprone:error_prone_annotations:jar:2.3.4:compile

[INFO] |  +- org.apache.kafka:kafka_2.12:jar:5.5.1-ccs:compile
[INFO] |  |  +- 
com.fasterxml.jackson.module:jackson-module-scala_2.12:jar:2.10.2:compile
[INFO] |  |  |  \- 

Re: Flink :Exception in thread "main" java.lang.NoSuchMethodError: scala.Function1.$init$(Lscala/Function1;)V

2021-05-07 Thread Ragini Manjaiah
Hi ,
Please find the details


[INFO] X:XXSNAPSHOT
[INFO] +- org.apache.flink:flink-java:jar:1.11.3:compile
[INFO] |  +- org.apache.flink:flink-core:jar:1.11.3:compile
[INFO] |  |  +- org.apache.flink:flink-annotations:jar:1.11.3:compile
[INFO] |  |  +- org.apache.flink:flink-metrics-core:jar:1.11.3:compile
[INFO] |  |  +- org.apache.flink:flink-shaded-asm-7:jar:7.1-11.0:compile
[INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
[INFO] |  | \- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] |  +- org.apache.commons:commons-math3:jar:3.5:compile
[INFO] |  +- org.slf4j:slf4j-api:jar:1.7.15:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  \- org.apache.flink:force-shading:jar:1.11.3:compile
[INFO] +-
org.apache.flink:flink-statebackend-rocksdb_2.11:jar:1.11.3:compile
[INFO] |  \- com.data-artisans:frocksdbjni:jar:5.17.2-artisans-2.0:compile
[INFO] +- org.apache.flink:flink-clients_2.11:jar:1.11.3:compile
[INFO] |  +- org.apache.flink:flink-runtime_2.11:jar:1.11.3:compile
[INFO] |  +- org.apache.flink:flink-optimizer_2.11:jar:1.11.3:compile
[INFO] |  \- commons-cli:commons-cli:jar:1.3.1:compile
[INFO] +- io.confluent:kafka-avro-serializer:jar:5.5.1:compile
[INFO] |  +- org.apache.avro:avro:jar:1.9.2:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.10.2:compile
[INFO] |  +- io.confluent:kafka-schema-registry-client:jar:5.5.1:compile
[INFO] |  |  +- javax.ws.rs:javax.ws.rs-api:jar:2.1.1:compile
[INFO] |  |  \- org.glassfish.jersey.core:jersey-common:jar:2.30:compile
[INFO] |  | +-
jakarta.annotation:jakarta.annotation-api:jar:1.3.5:compile
[INFO] |  | \- org.glassfish.hk2:osgi-resource-locator:jar:1.0.3:compile
[INFO] |  +- io.confluent:common-config:jar:5.5.1:compile
[INFO] |  \- io.confluent:common-utils:jar:5.5.1:compile
[INFO] +- io.confluent:kafka-schema-serializer:jar:5.5.1:compile
[INFO] +- io.confluent:kafka-schema-registry:jar:5.5.1:compile
[INFO] |  +- io.confluent:kafka-json-schema-provider:jar:5.5.1:compile
[INFO] |  |  +-
com.github.everit-org.json-schema:org.everit.json.schema:jar:1.12.1:compile
[INFO] |  |  |  +- commons-validator:commons-validator:jar:1.6:compile
[INFO] |  |  |  +- com.damnhandy:handy-uri-templates:jar:2.1.8:compile
[INFO] |  |  |  \- com.google.re2j:re2j:jar:1.3:compile
[INFO] |  |  +-
com.fasterxml.jackson.datatype:jackson-datatype-guava:jar:2.10.2:compile
[INFO] |  |  +-
com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.10.2:compile
[INFO] |  |  +-
com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.10.2:compile
[INFO] |  |  +-
com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.10.2:compile
[INFO] |  |  +-
com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.10.2:compile
[INFO] |  |  \-
com.kjetland:mbknor-jackson-jsonschema_2.12:jar:1.0.39:compile
[INFO] |  | +-
org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable:jar:1.3.50:compile
[INFO] |  | |  +-
org.jetbrains.kotlin:kotlin-scripting-compiler-impl-embeddable:jar:1.3.50:runtime
[INFO] |  | |  |  +-
org.jetbrains.kotlin:kotlin-scripting-common:jar:1.3.50:runtime
[INFO] |  | |  |  |  \-
org.jetbrains.kotlin:kotlin-reflect:jar:1.3.50:runtime
[INFO] |  | |  |  +-
org.jetbrains.kotlin:kotlin-scripting-jvm:jar:1.3.50:runtime
[INFO] |  | |  |  |  \-
org.jetbrains.kotlin:kotlin-script-runtime:jar:1.3.50:runtime
[INFO] |  | |  |  \-
org.jetbrains.kotlinx:kotlinx-coroutines-core:jar:1.1.1:runtime
[INFO] |  | |  \- org.jetbrains.kotlin:kotlin-stdlib:jar:1.3.50:compile
[INFO] |  | | \- org.jetbrains:annotations:jar:13.0:compile
[INFO] |  | \- io.github.classgraph:classgraph:jar:4.8.21:compile
[INFO] |  +- io.confluent:kafka-protobuf-provider:jar:5.5.1:compile
[INFO] |  |  +- com.squareup.wire:wire-schema:jar:3.2.2:compile
[INFO] |  |  |  +-
org.jetbrains.kotlin:kotlin-stdlib-jdk8:jar:1.3.71:compile
[INFO] |  |  |  |  \-
org.jetbrains.kotlin:kotlin-stdlib-jdk7:jar:1.3.71:compile
[INFO] |  |  |  +-
org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.3.71:compile
[INFO] |  |  |  +- com.squareup.wire:wire-runtime:jar:3.2.2:compile
[INFO] |  |  |  \- com.squareup.okio:okio:jar:2.5.0:compile
[INFO] |  |  \- com.google.protobuf:protobuf-java-util:jar:3.11.4:compile
[INFO] |  | \-
com.google.errorprone:error_prone_annotations:jar:2.3.4:compile
[INFO] |  +- org.apache.kafka:kafka_2.12:jar:5.5.1-ccs:compile
[INFO] |  |  +-
com.fasterxml.jackson.module:jackson-module-scala_2.12:jar:2.10.2:compile
[INFO] |  |  |  \-
com.fasterxml.jackson.module:jackson-module-paranamer:jar:2.10.2:compile
[INFO] |  |  | \- com.thoughtworks.paranamer:paranamer:jar:2.8:compile
[INFO] |  |  +-
com.fasterxml.jackson.dataformat:jackson-dataformat-csv:jar:2.10.2:compile
[INFO] |  |  +-
org.scala-lang.modules:scala-collection-compat_2.12:jar:2.1.3:compile
[INFO] |  |  +-

Re: flink job tasks are unevenly distributed across taskmanagers

2021-05-07 Thread Shawn Huang
From your description this should be a Standalone deployment.
The default scheduling is slot-based and tends to allocate slots on the same TaskManager.
To make full use of all slots, one option is to set the total number of slots in the cluster
to the sum of the parallelism of all jobs, or try setting cluster.evenly-spread-out-slots to true.
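For example, the relevant flink-conf.yaml entries for the 4-TM / 10-slot cluster described below might look like this (illustrative only):

```
# spread slot allocation evenly across all registered TaskManagers (Flink 1.10+)
cluster.evenly-spread-out-slots: true
# each TaskManager offers 10 slots, matching the setup in the question
taskmanager.numberOfTaskSlots: 10
```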

Best,
Shawn Huang


张锴 wrote on Fri, May 7, 2021 at 7:50 PM:

> Give the other job a different slot sharing group name; different groups do not share slots, so it will run in other slots, and a slot can run flexibly on different TMs.
>
> allanqinjy wrote on Fri, May 7, 2021 at 7:38 PM:
>
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
> > The Flink configuration has a TaskManager setting for how many slots one TM provides.
> > taskmanager.numberOfTaskSlots defaults to 1. Parallelism maps onto slots; we usually set
> > the slot count equal to the largest parallelism. Take a look at this setting and compare it with the official docs.
> >
> >
> > | |
> > allanqinjy
> > |
> > |
> > allanqi...@163.com
> > |
> > Signature customized by NetEase Mail Master
> >
> >
> > On May 7, 2021, 16:42, wenyuan138 wrote:
> > Flink cluster (flink 1.10.1) with 4 taskmanagers, each with 10 slots. I have 2 jobs,
> > each with parallelism 4. I expected them to be spread across slots on different
> > taskmanagers (on average 2 slots on each of the 4 taskmanagers, which uses CPU better).
> > Instead, all 8 tasks of these 2 jobs were assigned to the same taskmanager. Why?
> > Is there any configuration that can change this behavior?
> > We want the tasks to be spread across different taskmanagers. Thanks!
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Read kafka offsets from checkpoint - state processor

2021-05-07 Thread bat man
Anyone who has tried this or can help on this.

Thanks.

On Thu, May 6, 2021 at 10:34 AM bat man  wrote:

> Hi Users,
>
> Is there a way that Flink 1.9 the checkpointed data can be read using the
> state processor api.
> Docs [1] says - When reading operator state, users specify the operator
> uid, the state name, and the type information.
>
> What is the type for the kafka operator, which needs to be specified while
> reading the state.
>
> [1] -
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
>
> Thanks,
> Hemant
>
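
For what it's worth, here is an untested sketch of how the offsets might be read
with the State Processor API. It assumes the consumer's uid in the job (here
"kafka-source") and relies on FlinkKafkaConsumerBase storing its offsets in
union list state named "topic-partition-offset-states" as
Tuple2<KafkaTopicPartition, Long> in Flink 1.9 — please verify both against your
job and version, and note that reading may require the same serializer setup
(KafkaTopicPartition is Kryo-serialized) the streaming job used:

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class ReadKafkaOffsets {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Path to the checkpoint/savepoint _metadata and the kind of state backend
        // the job used (MemoryStateBackend here is an assumption).
        ExistingSavepoint savepoint =
                Savepoint.load(env, "hdfs:///checkpoints/job/chk-42", new MemoryStateBackend());

        // "kafka-source" is an assumed operator uid; "topic-partition-offset-states"
        // is the union list state name used internally by FlinkKafkaConsumerBase.
        DataSet<Tuple2<KafkaTopicPartition, Long>> offsets =
                savepoint.readUnionState(
                        "kafka-source",
                        "topic-partition-offset-states",
                        TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));

        offsets.print();
    }
}
```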


Re: FileSystem Connector's Success File Was Submitted Late

2021-05-07 Thread forideal
I found the reason:


   Late data processing: The record will be written into its partition when a 
record is supposed to be written into a partition that has already been 
committed, and then the committing of this partition will be triggered again.
That is why the success file appears to be updated late.


Best,
Forideal




At 2021-05-07 19:41:45, "forideal"  wrote:

Hi My friends:
I use the FileSystem connector in Flink SQL, and I found that my success file was 
submitted late, probably dozens of minutes late.
Here I provide some information:
1.Flink version is 1.11.1.
2.Source DDL
   create table test (
  `timestamp bigint`,
 event_time as _timestamp(timestamp),
WATERMARK FOR event_time AS event_time - INTERVAL'10'MINUTE
)...
3.Sink DDL
create table sinkTest(
xxx
dtm VARCHAR,
hh VARCHAR
) PARTITIONED BY (dtm, hh)
with(
'connector' = 'filesystem',
'format' = 'parquet', 'parquet.compression' = 'SNAPPY', 
'sink.rolling-policy.file-size' = '512MB', 'sink.rolling-policy.check-interval' 
= '10 min', 'sink.partition-commit.trigger' = 'partition-time', 
'sink.partition-commit.delay' = '1 h', 'sink.partition-commit.policy.kind' = 
'success-file', 'sink.file-suffix' = '.parquet',
'partition.time-extractor.timestamp-pattern' = '$dtm $hh:00:00'
)


4.The interval for task submission checkpoint is 5 minutes, and the checkpoints 
are all successful.


I think that if my task is not delayed, then our success file will be submitted 
in about 10 minutes every hour, but the fact is that it is submitted very late.
Here are some source codes about submitting success file. When the watermark is 
greater than the current partition time + delay time, I can submit the success 
file.
public List<String> committablePartitions(long checkpointId) {
if (!watermarks.containsKey(checkpointId)) {
throw new IllegalArgumentException(String.format(
"Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
checkpointId, watermarks));
}

long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();

List<String> needCommit = new ArrayList<>();
Iterator<String> iter = pendingPartitions.iterator();
   while (iter.hasNext()) {
  String partition = iter.next();
LocalDateTime partTime = extractor.extract(
partitionKeys, extractPartitionValues(new Path(partition)));
  if (watermark > toMills(partTime) + commitDelay) {
 needCommit.add(partition);
iter.remove();
}
   }
return needCommit;
}
Best,
Forideal




 

Re: Flink :Exception in thread "main" java.lang.NoSuchMethodError: scala.Function1.$init$(Lscala/Function1;)V

2021-05-07 Thread Chesnay Schepler

Can you show us the dependency tree of your project?
(If you are using maven, run "mvn dependency:tree")

On 5/7/2021 2:15 PM, Ragini Manjaiah wrote:

The scala version is same across the pom file . 2.11

On Fri, May 7, 2021 at 5:06 PM Chesnay Schepler > wrote:


It looks like you have different scala versions on the classpath.
Please
check that all your dependencies use the same scala version.

On 5/7/2021 1:25 PM, Ragini Manjaiah wrote:
> Hi ,
> I am surfacing when submitting flink from intellij  IDE . what
> cloud the issues. Do need to change the scala version
>
> flink 1.11.3
> scala 2.11
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.Function1.$init$(Lscala/Function1;)V
> at
>

scala.concurrent.java8.FuturesConvertersImpl$CF.(FutureConvertersImpl.scala:18)
> at
scala.compat.java8.FutureConverters$.toJava(FutureConverters.scala:61)
> at
>

scala.compat.java8.FutureConverters$FutureOps$.toJava$extension(FutureConverters.scala:185)
> at akka.pattern.Patterns$.ask(Patterns.scala:95)
> at akka.pattern.Patterns.ask(Patterns.scala)
> at
>

org.apache.flink.runtime.rpc.akka.SupervisorActor.startAkkaRpcActor(SupervisorActor.java:173)
> at
>

org.apache.flink.runtime.rpc.akka.AkkaRpcService.registerAkkaRpcActor(AkkaRpcService.java:293)
> at
>

org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:221)
> at
org.apache.flink.runtime.rpc.RpcEndpoint.(RpcEndpoint.java:129)
> at
>

org.apache.flink.runtime.metrics.dump.MetricQueryService.(MetricQueryService.java:75)
> at
>

org.apache.flink.runtime.metrics.dump.MetricQueryService.createMetricQueryService(MetricQueryService.java:252)
> at
>

org.apache.flink.runtime.metrics.MetricRegistryImpl.startQueryService(MetricRegistryImpl.java:182)
> at
>
org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:307)
> at
>

org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:87)
> at
>

org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> at
>

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
> at
>

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1714)
> at
>

org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
> at
>

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
> at org.sapphire.fullpayload.Application.main(Application.java:173)






Re: Flink :Exception in thread "main" java.lang.NoSuchMethodError: scala.Function1.$init$(Lscala/Function1;)V

2021-05-07 Thread Ragini Manjaiah
The scala version is same across the pom file . 2.11

On Fri, May 7, 2021 at 5:06 PM Chesnay Schepler  wrote:

> It looks like you have different scala versions on the classpath. Please
> check that all your dependencies use the same scala version.
>
> On 5/7/2021 1:25 PM, Ragini Manjaiah wrote:
> > Hi ,
> > I am facing this when submitting a Flink job from the IntelliJ IDE. What
> > could the issue be? Do I need to change the Scala version?
> >
> > flink 1.11.3
> > scala 2.11
> >
> > Exception in thread "main" java.lang.NoSuchMethodError:
> > scala.Function1.$init$(Lscala/Function1;)V
> > at
> >
> scala.concurrent.java8.FuturesConvertersImpl$CF.(FutureConvertersImpl.scala:18)
> > at scala.compat.java8.FutureConverters$.toJava(FutureConverters.scala:61)
> > at
> >
> scala.compat.java8.FutureConverters$FutureOps$.toJava$extension(FutureConverters.scala:185)
> > at akka.pattern.Patterns$.ask(Patterns.scala:95)
> > at akka.pattern.Patterns.ask(Patterns.scala)
> > at
> >
> org.apache.flink.runtime.rpc.akka.SupervisorActor.startAkkaRpcActor(SupervisorActor.java:173)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcService.registerAkkaRpcActor(AkkaRpcService.java:293)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:221)
> > at org.apache.flink.runtime.rpc.RpcEndpoint.(RpcEndpoint.java:129)
> > at
> >
> org.apache.flink.runtime.metrics.dump.MetricQueryService.(MetricQueryService.java:75)
> > at
> >
> org.apache.flink.runtime.metrics.dump.MetricQueryService.createMetricQueryService(MetricQueryService.java:252)
> > at
> >
> org.apache.flink.runtime.metrics.MetricRegistryImpl.startQueryService(MetricRegistryImpl.java:182)
> > at
> >
> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:307)
> > at
> >
> org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:87)
> > at
> >
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1714)
> > at
> >
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
> > at org.sapphire.fullpayload.Application.main(Application.java:173)
>
>
>


Re: flink job tasks are unevenly distributed across taskmanagers

2021-05-07 Thread 张锴
Give the other job a different slot sharing group name; different groups do not share slots, so it will run in other slots, and a slot can run flexibly on different TMs.

allanqinjy wrote on Fri, May 7, 2021 at 7:38 PM:

>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
> The Flink configuration has a TaskManager setting for how many slots one TM provides.
> taskmanager.numberOfTaskSlots defaults to 1. Parallelism maps onto slots; we usually set the slot count equal to the largest parallelism. Take a look at this setting and compare it with the official docs.
>
>
> | |
> allanqinjy
> |
> |
> allanqi...@163.com
> |
> Signature customized by NetEase Mail Master
>
>
> On May 7, 2021, 16:42, wenyuan138 wrote:
> Flink cluster (flink 1.10.1) with 4 taskmanagers, each with 10 slots. I have 2 jobs,
> each with parallelism 4. I expected them to be spread across slots on different
> taskmanagers (on average 2 slots on each of the 4 taskmanagers, which uses CPU better).
> Instead, all 8 tasks of these 2 jobs were assigned to the same taskmanager. Why?
> Is there any configuration that can change this behavior?
> We want the tasks to be spread across different taskmanagers. Thanks!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


FileSystem Connector's Success File Was Submitted Late

2021-05-07 Thread forideal
Hi My friends:
I use the FileSystem connector in Flink SQL, and I found that my success file was 
submitted late, probably dozens of minutes late.
Here I provide some information:
1.Flink version is 1.11.1.
2.Source DDL
   create table test (
  `timestamp bigint`,
 event_time as _timestamp(timestamp),
WATERMARK FOR event_time AS event_time - INTERVAL'10'MINUTE
)...
3.Sink DDL
create table sinkTest(
xxx
dtm VARCHAR,
hh VARCHAR
) PARTITIONED BY (dtm, hh)
with(
'connector' = 'filesystem',
'format' = 'parquet', 'parquet.compression' = 'SNAPPY', 
'sink.rolling-policy.file-size' = '512MB', 'sink.rolling-policy.check-interval' 
= '10 min', 'sink.partition-commit.trigger' = 'partition-time', 
'sink.partition-commit.delay' = '1 h', 'sink.partition-commit.policy.kind' = 
'success-file', 'sink.file-suffix' = '.parquet',
'partition.time-extractor.timestamp-pattern' = '$dtm $hh:00:00'
)


4.The interval for task submission checkpoint is 5 minutes, and the checkpoints 
are all successful.


I think that if my task is not delayed, then our success file will be submitted 
in about 10 minutes every hour, but the fact is that it is submitted very late.
Here are some source codes about submitting success file. When the watermark is 
greater than the current partition time + delay time, I can submit the success 
file.
public List<String> committablePartitions(long checkpointId) {
if (!watermarks.containsKey(checkpointId)) {
throw new IllegalArgumentException(String.format(
"Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
checkpointId, watermarks));
}

long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();

List<String> needCommit = new ArrayList<>();
Iterator<String> iter = pendingPartitions.iterator();
   while (iter.hasNext()) {
  String partition = iter.next();
LocalDateTime partTime = extractor.extract(
partitionKeys, extractPartitionValues(new Path(partition)));
  if (watermark > toMills(partTime) + commitDelay) {
 needCommit.add(partition);
iter.remove();
}
   }
return needCommit;
}
Best,
Forideal
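
To make that expectation concrete, here is the arithmetic under the configuration above, with an invented example partition:

```
partition dtm=2021-05-07, hh=10    ->  partTime = 2021-05-07 10:00:00
sink.partition-commit.delay = 1 h  ->  commit threshold = 11:00:00
watermark = max event time seen - 10 min   (from the source watermark definition)

The partition becomes committable once watermark > 11:00:00, i.e. once an
event with a timestamp later than 11:10:00 has been read -- roughly 10 minutes
into the next hour, provided the stream is not delayed.
```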

Re: Flink :Exception in thread "main" java.lang.NoSuchMethodError: scala.Function1.$init$(Lscala/Function1;)V

2021-05-07 Thread Chesnay Schepler
It looks like you have different scala versions on the classpath. Please 
check that all your dependencies use the same scala version.


On 5/7/2021 1:25 PM, Ragini Manjaiah wrote:

Hi ,
I am facing this when submitting a Flink job from the IntelliJ IDE. What 
could the issue be? Do I need to change the Scala version?


flink 1.11.3
scala 2.11

Exception in thread "main" java.lang.NoSuchMethodError: 
scala.Function1.$init$(Lscala/Function1;)V
at 
scala.concurrent.java8.FuturesConvertersImpl$CF.(FutureConvertersImpl.scala:18)

at scala.compat.java8.FutureConverters$.toJava(FutureConverters.scala:61)
at 
scala.compat.java8.FutureConverters$FutureOps$.toJava$extension(FutureConverters.scala:185)

at akka.pattern.Patterns$.ask(Patterns.scala:95)
at akka.pattern.Patterns.ask(Patterns.scala)
at 
org.apache.flink.runtime.rpc.akka.SupervisorActor.startAkkaRpcActor(SupervisorActor.java:173)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcService.registerAkkaRpcActor(AkkaRpcService.java:293)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:221)

at org.apache.flink.runtime.rpc.RpcEndpoint.(RpcEndpoint.java:129)
at 
org.apache.flink.runtime.metrics.dump.MetricQueryService.(MetricQueryService.java:75)
at 
org.apache.flink.runtime.metrics.dump.MetricQueryService.createMetricQueryService(MetricQueryService.java:252)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl.startQueryService(MetricRegistryImpl.java:182)
at 
org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:307)
at 
org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:87)
at 
org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1714)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)

at org.sapphire.fullpayload.Application.main(Application.java:173)





Re: How to increase the number of task managers?

2021-05-07 Thread Tamir Sagi
Hey

number of TMs needed = job parallelism / number of slots per TM (taskmanager.numberOfTaskSlots)

parallelism.default is another config you should consider.
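
A worked example of that formula with made-up numbers (not taken from this thread):

```
# flink-conf.yaml (illustrative values only)
taskmanager.numberOfTaskSlots: 2
parallelism.default: 8

# required slots = 8 (the job's maximum parallelism)
# required TMs   = 8 / 2 slots per TM = 4 TaskManagers
# On YARN/Kubernetes these TMs are requested on demand; in a standalone
# cluster you start one TaskManager per entry in conf/workers yourself.
```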

Read also
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/parallel/



From: Yik San Chan 
Sent: Friday, May 7, 2021 1:56 PM
To: user 
Subject: How to increase the number of task managers?




Hi community,

According to the 
[docs](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/):

> taskmanager.numberOfTaskSlots: The number of slots that a TaskManager offers 
> (default: 1). Each slot can take one task or pipeline. Having multiple slots 
> in a TaskManager can help amortize certain constant overheads (of the JVM, 
> application libraries, or network connections) across parallel tasks or 
> pipelines. See the Task Slots and 
> Resources
>  concepts section for details.

> Running more smaller TaskManagers with one slot each is a good starting point 
> and leads to the best isolation between tasks. Dedicating the same resources 
> to fewer larger TaskManagers with more slots can help to increase resource 
> utilization, at the cost of weaker isolation between the tasks (more tasks 
> share the same JVM).

We're able to tune slot count by setting taskmanager.numberOfTaskSlots, that 
may help parallelize my task.

I wonder if I can tune the number of task managers? Is there a corresponding 
config?

Best,
Yik San



Re: flink job tasks are unevenly distributed across taskmanagers

2021-05-07 Thread allanqinjy
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
The Flink configuration has a TaskManager setting for how many slots one TM provides. 
taskmanager.numberOfTaskSlots defaults to 1. Parallelism maps onto slots; we usually set the slot count equal to the largest parallelism. Take a look at this setting and compare it with the official docs.


| |
allanqinjy
|
|
allanqi...@163.com
|
Signature customized by NetEase Mail Master


On May 7, 2021, 16:42, wenyuan138 wrote:
Flink cluster (flink 1.10.1) with 4 taskmanagers, each with 10 slots. I have 2 jobs,
each with parallelism 4. I expected them to be spread across slots on different
taskmanagers (on average 2 slots on each of the 4 taskmanagers, which uses CPU better). Instead, all 8 tasks of these 2 jobs were assigned to the same taskmanager. Why? Is there any configuration that can change this behavior?
We want the tasks to be spread across different taskmanagers. Thanks!



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink :Exception in thread "main" java.lang.NoSuchMethodError: scala.Function1.$init$(Lscala/Function1;)V

2021-05-07 Thread Ragini Manjaiah
Hi ,
I am facing this when submitting a Flink job from the IntelliJ IDE. What could
the issue be? Do I need to change the Scala version?

flink 1.11.3
scala 2.11

Exception in thread "main" java.lang.NoSuchMethodError:
scala.Function1.$init$(Lscala/Function1;)V
at
scala.concurrent.java8.FuturesConvertersImpl$CF.(FutureConvertersImpl.scala:18)
at scala.compat.java8.FutureConverters$.toJava(FutureConverters.scala:61)
at
scala.compat.java8.FutureConverters$FutureOps$.toJava$extension(FutureConverters.scala:185)
at akka.pattern.Patterns$.ask(Patterns.scala:95)
at akka.pattern.Patterns.ask(Patterns.scala)
at
org.apache.flink.runtime.rpc.akka.SupervisorActor.startAkkaRpcActor(SupervisorActor.java:173)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcService.registerAkkaRpcActor(AkkaRpcService.java:293)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:221)
at org.apache.flink.runtime.rpc.RpcEndpoint.(RpcEndpoint.java:129)
at
org.apache.flink.runtime.metrics.dump.MetricQueryService.(MetricQueryService.java:75)
at
org.apache.flink.runtime.metrics.dump.MetricQueryService.createMetricQueryService(MetricQueryService.java:252)
at
org.apache.flink.runtime.metrics.MetricRegistryImpl.startQueryService(MetricRegistryImpl.java:182)
at
org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:307)
at
org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:87)
at
org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1714)
at
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
at org.sapphire.fullpayload.Application.main(Application.java:173)


How to increase the number of task managers?

2021-05-07 Thread Yik San Chan
Hi community,

According to the [docs](
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
):

> taskmanager.numberOfTaskSlots: The number of slots that a TaskManager
offers *(default: 1)*. Each slot can take one task or pipeline. Having
multiple slots in a TaskManager can help amortize certain constant
overheads (of the JVM, application libraries, or network connections)
across parallel tasks or pipelines. See the Task Slots and Resources

 concepts section for details.

> Running more smaller TaskManagers with one slot each is a good starting
point and leads to the best isolation between tasks. Dedicating the same
resources to fewer larger TaskManagers with more slots can help to increase
resource utilization, at the cost of weaker isolation between the tasks
(more tasks share the same JVM).

We're able to tune slot count by setting taskmanager.numberOfTaskSlots,
that may help parallelize my task.

I wonder if I can tune the number of task managers? Is there a
corresponding config?

Best,
Yik San


Re: Watermark time zone issue

2021-05-07 Thread Leonard Xu
Hi, forideal

It's not a time zone issue: the watermark value is a timestamp in UTC milliseconds.
You should convert it to a UTC timestamp and then compare it with your data.
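
A quick illustration with a made-up value (not taken from the original post):

```java
import java.time.Instant;
import java.time.ZoneId;

public class WatermarkZoneDemo {
    public static void main(String[] args) {
        // made-up watermark value, purely for illustration
        Instant watermark = Instant.ofEpochMilli(1_620_381_600_000L);      // 2021-05-07T10:00:00Z
        System.out.println(watermark);                                     // the UTC instant
        System.out.println(watermark.atZone(ZoneId.of("Asia/Shanghai")));  // shown as 18:00, i.e. "+8 hours"
    }
}
```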

Best,
Leonard


> On May 7, 2021, 18:28, forideal wrote:
> 
> Hi My friends:
> My watermark added 8 more hours to the timestamp displayed on the flink 
> web. What is the reason for this? Actually looking at the data, it is 
> correct. I don't know where the problem occurred? Is it because of the time 
> zone?
>Flink 1.11.1
>  
> Best Wishes!!!
> forideal
> 
> 
>  



Watermark time zone issue

2021-05-07 Thread forideal
Hi My friends:
The watermark shown on the Flink web UI is 8 hours ahead of the timestamp in my data. 
What is the reason for this? Looking at the data itself, it is correct, 
so I don't know where the problem is. Is it a time zone issue?
   Flink 1.11.1
 
Best Wishes!!!
forideal

Re: Flink SQL custom UDAF function fails with No match found for function signature count_uadf()

2021-05-07 Thread JackJia
Hello, could you share the approach you used to solve it?


Best regards
On December 18, 2020, 10:38, 丁浩浩 <18579099...@163.com> wrote:
I have already solved the problem myself.

On December 17, 2020, 9:00 PM, 丁浩浩 <18579099...@163.com> wrote:

Flink version: 1.11.1
The UDAF code comes from the Alibaba Cloud documentation

The code is as follows
public class TestSql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = FlinkUtils.getTableEnv(env);
//env.setParallelism(3);
tableEnv.createTemporarySystemFunction("count_uadf", CountUdaf.class);

Properties configs = CommonUtils.getConfigs();
// register the clazz source table
FlinkUtils.registerMysqlTable2FlinkTable(
tableEnv,configs.getProperty("url"),
configs.getProperty("user.name"), configs.getProperty("password"),
"test", "clazz_lesson");

Table table = tableEnv.sqlQuery("select count_uadf(clazz_number),clazz_number 
from clazz_lesson group by clazz_number");
//Table table = tableEnv.sqlQuery("select number,collect(extension_value) from 
clazz_extension group by number ");
tableEnv.toRetractStream(table, Row.class).print();
env.execute();


}
}



public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    // Defines the data structure of the accumulator that holds the count UDAF state.
    public static class CountAccum {
        public long total;
    }

    @Override
    // Initializes the accumulator of the count UDAF.
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    @Override
    // getValue computes the count UDAF result from the accumulator that holds the state.
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    // accumulate updates the accumulator holding the count UDAF state for each input record.
    public void accumulate(CountAccum accumulator, Long iValue) {
        accumulator.total++;
    }

    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
        for (CountAccum other : its) {
            accumulator.total += other.total;
        }
    }
}

The stack trace is as follows

-
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. From line 1, column 8 to line 1, column 31: No match found 
for function signature count_uadf()
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at com.gaotu.data.performance.flink.job.sql.TestSql.main(TestSql.java:34)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 8 to line 1, column 31: No match found for function signature 
count_uadf()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 

How to consume and send data with two different kerberos certified kafka clusters in one flink task?

2021-05-07 Thread 1095193...@qq.com
Hi
   By setting the security.kerberos.* configuration, we can connect to one Kerberos-
certified Kafka cluster in a Flink SQL task. How can we consume and produce with two
different Kerberos-certified Kafka clusters in one Flink SQL task? Kafka
allows multiple SASL-authenticated Java clients in a single JVM process:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-83+-+Allow+multiple+SASL+authenticated+Java+clients+in+a+single+JVM+process.
  How can this be achieved in a Flink SQL task?
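
One direction worth exploring (a sketch only, not a verified setup): the Kafka SQL
connector forwards any 'properties.*' option to the underlying Kafka client, so each
table can in principle carry its own sasl.jaas.config instead of relying on the global
security.kerberos.* settings. The topic, broker, principal and keytab values below
are placeholders:

```sql
-- a source table reading from the first Kerberos-secured cluster
CREATE TABLE source_cluster_a (
  id BIGINT,
  msg STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_a',
  'properties.bootstrap.servers' = 'broker-a:9092',
  'properties.group.id' = 'flink-job',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.sasl.jaas.config' = 'com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/cluster_a.keytab" principal="flink_a@REALM.A";',
  'format' = 'json'
);
-- the sink table for the second cluster would carry its own
-- properties.sasl.jaas.config pointing at that cluster's keytab/principal
```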


1095193...@qq.com


Re: The problem of getting data by Rest API

2021-05-07 Thread Chesnay Schepler
To be more precise, the update of the data is scheduled at most once 
every 10 seconds, but it can of course happen that the result of said 
update arrives in a different interval.


As in, this would be possible:

T00: Issue update 1
T10: Issue update 2
T12: Receive update1
T14: Receive update2
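
For reference, the knob behind that schedule is the metric fetcher interval in
flink-conf.yaml (stating this as background; it is not mentioned above):

```
# how often the REST layer / web UI re-fetches metrics from the TaskManagers
metrics.fetcher.update-interval: 10000   # milliseconds (default)
```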

On 5/7/2021 9:14 AM, Chesnay Schepler wrote:
The WebUI also retrieves all data from the REST API, which should be 
updated with a minimum interval of 10 seconds.


On 5/7/2021 3:57 AM, penguin. wrote:
On the Web UI page, we can see that the relevant data is updated 
every 3S, such as the read-bytes of each operator.
But when I get data through Rest API, the data is updated every 6 
seconds or even more than 10 seconds. Why?

The related data of read bytes obtained through Rest API is as follows:

19:02:04                  0       0              0              0    
          0         0              0
19:02:07                  0       0              0              0    
          0         0              0
19:02:10                  0       0              0              0    
          0         0              0
19:02:13                  0       0              0              0    
          0         0              0


19:02:16             792820  807792         684683         796474    
     813133    680141         689590
19:02:19             792820  807792         684683         796474    
     813133    680141         689590
19:02:22             792820  807792         684683         796474    
     813133    680141         689590
19:02:26             792820  807792         684683         796474    
     813133    680141         689590


19:02:29            1817569 1815899        1560744        1808234    
    1836741   1559341        1568711
19:02:32            1817569 1815899        1560744        1808234    
    1836741   1559341        1568711
19:02:35            1817569 1815899        1560744        1808234    
    1836741   1559341        1568711
19:02:38            1817569 1815899        1560744        1808234    
    1836741   1559341        1568711


19:02:41            2796433 2805973        2401927        2797311    
    2838168   2397542        2417726
19:02:45            2796433 2805973        2401927        2797311    
    2838168   2397542        2417726
19:02:48            2796433 2805973        2401927        2797311    
    2838168   2397542        2417726


19:02:51            3649398 3640050        3130583        3624854    
    3687889   3122333        3144306
19:02:54            3649398 3640050        3130583        3624854    
    3687889   3122333        3144306
19:02:57            3649398 3640050        3130583        3624854    
    3687889   3122333        3144306
19:03:00            3649398 3640050        3130583        3624854    
    3687889   3122333        3144306


19:03:03            4529443 4524218        3881509        4517926    
    4582770   3884389        3907298
19:03:06            4529443 4524218        3881509        4517926    
    4582770   3884389        3907298
19:03:09            4529443 4524218        3881509        4517926    
    4582770   3884389        3907298


19:03:12            5432212 5423566        4659737        5404419    
    5492786   4649469        4685430
19:03:16            5432212 5423566        4659737        5404419    
    5492786   4649469        4685430
19:03:19            5432212 5423566        4659737        5404419    
    5492786   4649469        4685430
19:03:22            5432212 5423566        4659737        5404419    
    5492786   4649469        4685430


19:03:25            6255327 6243823        5367516        6236577    
    6321958   5360496        5396446
19:03:28            6255327 6243823        5367516        6236577    
    6321958   5360496        5396446
19:03:31            6255327 6243823        5367516        6236577    
    6321958   5360496        5396446


19:03:34            7207042 7203725        6184804        7178875    
    7286207   6172393        6212499
19:03:37            7207042 7203725        6184804        7178875    
    7286207   6172393        6212499
19:03:40            7207042 7203725        6184804        7178875    
    7286207   6172393        6212499
19:03:43            7207042 7203725        6184804        7178875    
    7286207   6172393        6212499


19:03:47            8064759 8048237        6918715        8034269    
    8144119   6912701        6957967
19:03:50            8064759 8048237        6918715        8034269    
    8144119   6912701        6957967
19:03:53            8064759 8048237        6918715        8034269    
    8144119   6912701        6957967


19:03:56            8903005 8895364        7636796        8880999    
    9004171   7628515        7684865
19:03:59            8903005 8895364        7636796        8880999    
    9004171   7628515        7684865
19:04:02            8903005 8895364        7636796        8880999    
    9004171   7628515        7684865



Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-07 Thread Yangze Guo
Thanks, Dawid & Guowei for the great work, thanks to everyone involved.

Best,
Yangze Guo

On Thu, May 6, 2021 at 5:51 PM Rui Li  wrote:
>
> Thanks to Dawid and Guowei for the great work!
>
> On Thu, May 6, 2021 at 4:48 PM Zhu Zhu  wrote:
>>
>> Thanks Dawid and Guowei for being the release managers! And thanks everyone 
>> who has made this release possible!
>>
>> Thanks,
>> Zhu
>>
>>> Yun Tang wrote on Thu, May 6, 2021 at 2:30 PM:
>>>
>>> Thanks for Dawid and Guowei's great work, and thanks for everyone involved 
>>> for this release.
>>>
>>> Best
>>> Yun Tang
>>> 
>>> From: Xintong Song 
>>> Sent: Thursday, May 6, 2021 12:08
>>> To: user ; dev 
>>> Subject: Re: [ANNOUNCE] Apache Flink 1.13.0 released
>>>
>>> Thanks Dawid & Guowei as the release managers, and everyone who has
>>> contributed to this release.
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, May 6, 2021 at 9:51 AM Leonard Xu  wrote:
>>>
>>> > Thanks Dawid & Guowei for the great work, thanks everyone involved.
>>> >
>>> > Best,
>>> > Leonard
>>> >
>>> > On May 5, 2021, 17:12, Theo Diefenthal wrote:
>>> >
>>> > Thanks for managing the release. +1. I like the focus on improving
>>> > operations with this version.
>>> >
>>> > --
>>> > *Von: *"Matthias Pohl" 
>>> > *An: *"Etienne Chauchot" 
>>> > *CC: *"dev" , "Dawid Wysakowicz" <
>>> > dwysakow...@apache.org>, "user" ,
>>> > annou...@apache.org
>>> > *Gesendet: *Dienstag, 4. Mai 2021 21:53:31
>>> > *Betreff: *Re: [ANNOUNCE] Apache Flink 1.13.0 released
>>> >
>>> > Yes, thanks for managing the release, Dawid & Guowei! +1
>>> >
>>> > On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot 
>>> > wrote:
>>> >
>>> >> Congrats to everyone involved !
>>> >>
>>> >> Best
>>> >>
>>> >> Etienne
>>> >> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
>>> >>
>>> >> The Apache Flink community is very happy to announce the release of
>>> >> Apache Flink 1.13.0.
>>> >>
>>> >> Apache Flink® is an open-source stream processing framework for
>>> >> distributed, high-performing, always-available, and accurate data 
>>> >> streaming
>>> >> applications.
>>> >>
>>> >> The release is available for download at:
>>> >> https://flink.apache.org/downloads.html
>>> >>
>>> >> Please check out the release blog post for an overview of the
>>> >> improvements for this bugfix release:
>>> >> https://flink.apache.org/news/2021/05/03/release-1.13.0.html
>>> >>
>>> >> The full release notes are available in Jira:
>>> >>
>>> >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349287
>>> >>
>>> >> We would like to thank all contributors of the Apache Flink community who
>>> >> made this release possible!
>>> >>
>>> >> Regards,
>>> >> Guowei & Dawid
>>> >>
>>> >>
>>> >
>>> >
>
>
>
> --
> Best regards!
> Rui Li


Re: The problem of getting data by Rest API

2021-05-07 Thread Chesnay Schepler
The WebUI also retrieves all data from the REST API, which should be 
updated with a minimum interval of 10 seconds.


On 5/7/2021 3:57 AM, penguin. wrote:
On the Web UI page, we can see that the relevant data is updated every 
3S, such as the read-bytes of each operator.
But when I get data through Rest API, the data is updated every 6 
seconds or even more than 10 seconds. Why?

The related data of read bytes obtained through Rest API is as follows:

19:02:04                  0     0              0              0        
      0     0              0
19:02:07                  0     0              0              0        
      0     0              0
19:02:10                  0     0              0              0        
      0     0              0
19:02:13                  0     0              0              0        
      0     0              0


19:02:16             792820  807792         684683         796474      
   813133  680141         689590
19:02:19             792820  807792         684683         796474      
   813133  680141         689590
19:02:22             792820  807792         684683         796474      
   813133  680141         689590
19:02:26             792820  807792         684683         796474      
   813133  680141         689590


19:02:29            1817569 1815899        1560744        1808234      
  1836741 1559341        1568711
19:02:32            1817569 1815899        1560744        1808234      
  1836741 1559341        1568711
19:02:35            1817569 1815899        1560744        1808234      
  1836741 1559341        1568711
19:02:38            1817569 1815899        1560744        1808234      
  1836741 1559341        1568711


19:02:41            2796433 2805973        2401927        2797311      
  2838168 2397542        2417726
19:02:45            2796433 2805973        2401927        2797311      
  2838168 2397542        2417726
19:02:48            2796433 2805973        2401927        2797311      
  2838168 2397542        2417726


19:02:51            3649398 3640050        3130583        3624854      
  3687889 3122333        3144306
19:02:54            3649398 3640050        3130583        3624854      
  3687889 3122333        3144306
19:02:57            3649398 3640050        3130583        3624854      
  3687889 3122333        3144306
19:03:00            3649398 3640050        3130583        3624854      
  3687889 3122333        3144306


19:03:03            4529443 4524218        3881509        4517926      
  4582770 3884389        3907298
19:03:06            4529443 4524218        3881509        4517926      
  4582770 3884389        3907298
19:03:09            4529443 4524218        3881509        4517926      
  4582770 3884389        3907298


19:03:12            5432212 5423566        4659737        5404419      
  5492786 4649469        4685430
19:03:16            5432212 5423566        4659737        5404419      
  5492786 4649469        4685430
19:03:19            5432212 5423566        4659737        5404419      
  5492786 4649469        4685430
19:03:22            5432212 5423566        4659737        5404419      
  5492786 4649469        4685430


19:03:25            6255327 6243823        5367516        6236577      
  6321958 5360496        5396446
19:03:28            6255327 6243823        5367516        6236577      
  6321958 5360496        5396446
19:03:31            6255327 6243823        5367516        6236577      
  6321958 5360496        5396446


19:03:34            7207042 7203725        6184804        7178875      
  7286207 6172393        6212499
19:03:37            7207042 7203725        6184804        7178875      
  7286207 6172393        6212499
19:03:40            7207042 7203725        6184804        7178875      
  7286207 6172393        6212499
19:03:43            7207042 7203725        6184804        7178875      
  7286207 6172393        6212499


19:03:47            8064759 8048237        6918715        8034269      
  8144119 6912701        6957967
19:03:50            8064759 8048237        6918715        8034269      
  8144119 6912701        6957967
19:03:53            8064759 8048237        6918715        8034269      
  8144119 6912701        6957967


19:03:56            8903005 8895364        7636796        8880999      
  9004171 7628515        7684865
19:03:59            8903005 8895364        7636796        8880999      
  9004171 7628515        7684865
19:04:02            8903005 8895364        7636796        8880999      
  9004171 7628515        7684865


19:04:05            9764407 9746490        8375615        9721560      
  9864876 8364266        8420637
19:04:08            9764407 9746490        8375615        9721560      
  9864876 8364266        8420637
19:04:11            9764407 9746490        8375615        9721560      
  9864876 8364266        8420637
19:04:14            9764407 9746490        8375615        9721560      
  9864876 8364266        8420637


19:04:18           

????minicluster????????????????

2021-05-07 Thread ????buaa
miniclusterslotdebugminicluster

Re: Can the History Server view aggregated TaskManager logs?

2021-05-07 Thread Yang Wang
Currently Flink's history server is not integrated with the YARN NM log
aggregation, so after the job finishes you can only view the web UI and exceptions;
the logs themselves cannot be viewed.

Best,
Yang

lhuiseu wrote on Fri, May 7, 2021 at 1:57 PM:

> Hi:
> flink 1.12.0
> on yarn mode
> Finished jobs can be found in the history server, but viewing the TaskManager log through the WebUI returns a 404. Does the Flink
> History Server currently not support viewing aggregated TaskManager logs? Hoping someone familiar with how the history server works can help.
> Many thanks.
>
> 
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: Extended SqlServerDialect reports an error when running Flink on k8s

2021-05-07 Thread 18756225...@163.com
Thank you very much!


 
From: Leonard Xu
Sent: 2021-05-07 14:26
To: user-zh
Subject: Re: Extended SqlServerDialect reports an error when running Flink on k8s
Hi
 
From the log, the corresponding class file cannot be loaded. (1) Compare whether the path inside your jar is correct; (2) check whether an old jar is still on the cluster and was not fully replaced.
 
Best,
Leonard
 
> On May 7, 2021, 13:58, 18756225...@163.com wrote:
> 
> Hi all, I ran into a problem:
> Environment: Flink 1.12.1, k8s cluster in session mode; this cluster could previously write data to mysql normally.
> Following mysqlDialect, I extended a 
> SqlServerDialect and replaced flink-connector-jdbc_2.11-1.12.1.jar under Flink's lib directory. When running on 
> yarn the job works fine, and flink-sql can write data to sqlserver.
> From the same machine environment, submitting to the Flink session-mode cluster set up on k8s reports the following error. Why can the class JdbcBatchingOutputFormat 
> not be loaded? 
> 
> Thanks!
> 
> The full exception is as follows:
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
>at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:498)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
> Cannot load user class: 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
>at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:574)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: 
> 

Re: Error running an extended SqlServerDialect on Flink on K8s

2021-05-07 Thread Leonard Xu
Hi

From the log, the corresponding class file cannot be loaded. (1) Check whether the path inside your jar is correct; (2) check whether an older jar is still on the cluster and was not fully replaced.
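
As a quick way to check both points, you could run a small standalone probe against the jar you built, and ideally also inside the image that the session cluster actually uses. A minimal sketch, with the jar path as a placeholder and the class name copied from the stack trace below:

```
import java.util.jar.JarFile;

public class ConnectorJarCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder path: point this at the jar that was actually copied into Flink's lib/.
        String jarPath = args.length > 0 ? args[0] : "flink-connector-jdbc_2.11-1.12.1.jar";
        String entry = "org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.class";

        // (1) Is the class file at the expected path inside the jar?
        try (JarFile jar = new JarFile(jarPath)) {
            System.out.println(entry + " present in jar: " + (jar.getJarEntry(entry) != null));
        }

        // (2) Can the class be resolved from the current classpath?
        try {
            Class.forName("org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat");
            System.out.println("Class resolvable on this classpath");
        } catch (ClassNotFoundException e) {
            System.out.println("Class NOT resolvable on this classpath");
        }
    }
}
```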

Best,
Leonard

> On May 7, 2021, at 13:58, 18756225...@163.com wrote:
> 
> Hi all, I have run into a problem:
> Environment: Flink 1.12.1, a K8s cluster in session mode; this cluster could previously write data to MySQL without issues.
> Following MySQLDialect, I extended a SqlServerDialect and replaced flink-connector-jdbc_2.11-1.12.1.jar
> under Flink's lib directory. On YARN the job runs fine, and Flink SQL can also write data to SQL Server.
> Submitting from the same machine to the Flink session-mode cluster on K8s, however, produces the error
> below. Why can't the JdbcBatchingOutputFormat class be loaded?
> 
> Thanks!
> 
> The full exception follows:
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
>at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:498)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
> Cannot load user class: 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
>at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:574)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat
>at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>at 

How can the Flink History Server be run on native Kubernetes?

2021-05-07 Thread casel.chen
How can the Flink History Server be run on native Kubernetes? Is there any documentation for this?

Error running an extended SqlServerDialect on Flink on K8s

2021-05-07 Thread 18756225...@163.com
Hi all, I have run into a problem:
Environment: Flink 1.12.1, a K8s cluster in session mode; this cluster could previously write data to MySQL without issues.
Following MySQLDialect, I extended a SqlServerDialect and replaced flink-connector-jdbc_2.11-1.12.1.jar
under Flink's lib directory. On YARN the job runs fine, and Flink SQL can also write data to SQL Server.
Submitting from the same machine to the Flink session-mode cluster on K8s, however, produces the error
below. Why can't the JdbcBatchingOutputFormat class be loaded?
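
For context, the extension is conceptually along the lines of the sketch below. This is a simplified standalone illustration only, not the actual flink-connector-jdbc dialect interface; the real dialect class has to be compiled into the connector jar, which is why the whole jar under lib was replaced:

```
// A simplified, standalone illustration of the dialect idea.
// It does NOT implement the real flink-connector-jdbc interface;
// method names only loosely mirror it.
public class SqlServerDialectSketch {

    // The connector chooses a dialect by matching the JDBC URL.
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:sqlserver:");
    }

    // Driver class the connector loads reflectively at runtime.
    public String defaultDriverName() {
        return "com.microsoft.sqlserver.jdbc.SQLServerDriver";
    }

    // SQL Server quotes identifiers with square brackets.
    public String quoteIdentifier(String identifier) {
        return "[" + identifier + "]";
    }

    // Upsert statement built from table, columns and key columns;
    // SQL Server typically needs a MERGE statement here (omitted in this sketch).
    public String getUpsertStatement(String table, String[] fieldNames, String[] keyFields) {
        throw new UnsupportedOperationException("sketch only");
    }
}
```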

Thanks!

The full exception follows:
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
load user class: 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:574)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
at