How are task logs and checkpoints monitored in production for Flink on YARN?

2021-08-30 Thread guanyq
Flink on YARN starts many tasks across the cluster. In production applications, how do you monitor the task logs and the checkpoints?


Any guidance would be appreciated.

Re: Delete Keyed State outside of StateTTL

2021-08-30 Thread narasimha
Thanks, JING.

But I have a question here: what will happen to the state for that key in
that case? Will it be removed automatically, or will it still be present but
empty? In that case, what is the implication for memory occupation?

On Tue, Aug 31, 2021 at 8:14 AM JING ZHANG  wrote:

> Hi,
> All types of state also have a method clear() that clears the state for
> the currently active key, i.e. the key of the input element.
> Could we call the `clear()` method directly to remove the state under the
> specified key?
>
> Best,
> JING ZHANG
>
>
> narasimha  于2021年8月31日周二 上午9:44写道:
>
>> Hi,
>>
>> I have a use case where the keyed state is managed (create, reset) by
>> dynamically changing rules. New action "delete" has to be added.
>> Delete is to completely delete the keyed state, same as how StateTTL does
>> post expiration time.
>>
>> Use StateTTL?
>>
>> Initially used StateTTL, but it ended up in increasing the CPU usage even
>> with low traffic.
>> The state gets updated ~>50 times a second, and read multiple times in a
>> second, so the time against state entry is getting updated often.
>>
>> So I didn't find it to be a viable solution.
>>
>> Can someone please help on how Keyed State can be removed outside of
>> StateTTL.
>>
>> --
>> A.Narasimha Swamy
>>
>

-- 
A.Narasimha Swamy
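
Following the clear() suggestion quoted above, a minimal sketch of clearing the state for the
currently active key when a "delete" rule action arrives. RuleEvent, its getAction() accessor and
the Long counter state are illustrative stand-ins, not types from this thread:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: RuleEvent and the Long counter stand in for the rule-managed state.
public class RuleStateFunction extends KeyedProcessFunction<String, RuleEvent, Long> {

    private transient ValueState<Long> counter;

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", Long.class));
    }

    @Override
    public void processElement(RuleEvent event, Context ctx, Collector<Long> out) throws Exception {
        if ("delete".equals(event.getAction())) {
            // Drops the state entry for the currently active key, much like StateTTL does after expiry.
            counter.clear();
            return;
        }
        Long current = counter.value();
        counter.update(current == null ? 1L : current + 1);
        out.collect(counter.value());
    }
}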


Re: k8S HA mode

2021-08-30 Thread Yang Wang
Could you please share the full JobManager logs?

AFAIK, you attached exceptions are normal logs when the JobManager is
trying to acquire the configmap lock.

Best,
Yang

houssem  于2021年8月31日周二 上午4:36写道:

> Hello, thanks for the response
>
> I am using kubernetes standalone application mode not the native one.
>
> and this error happens randomly at some point while running the job.
>
> Also i am using just one replicas of the jobmanager
>
> here is some other logs::
>
>
> {"@timestamp":"2021-08-30T15:43:44.970+02:00","@version":"1","message":"Exception
> occurred while renewing lock: Unable to update
> ConfigMapLock","logger_name":"io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector",
> "thread_name":"pool-685-thread-1","level":"DEBUG","level_value":1,"stack_trace":"io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException:
> Unable to update ConfigMapLock
>
> io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock.update(ConfigMapLock.java:108)
>
>  
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.tryAcquireOrRenew(LeaderElector.java:156)
>
>  
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.renew(LeaderElector.java:120)
>
>  
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$null$1(LeaderElector.java:104)
>  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
>  
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
>  
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  java.lang.Thread.run(Thread.java:748)
>  Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
> Failure executing: PUT at:
>
> https://172.31.64.1/api/v1/namespaces/flink-pushavoo-flink-rec/configmaps/elifibre--jobmanager-leader
> .
>  Message: Operation cannot be fulfilled on configmaps
> \"elifibre--jobmanager-leader\": the object
> has been modified; please apply your changes to the latest version and try
> again.
>  Received status: Status(apiVersion=v1, code=409,
> details=StatusDetails(causes=[], group=null, kind=configmaps,
> name=elifibre--jobmanager-leader,
> retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status,
> message=Operation cannot be fulfilled on configmaps
>  \"elifibre--jobmanager-leader\": the
> object has been modified;
>  please apply your changes to the latest version and try again,
> metadata=ListMeta(_continue=null, remainingItemCount=null,
> resourceVersion=null, selfLink=null, additionalProperties={}),
> reason=Conflict, status=Failure, additionalProperties={}).
>
>  
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:568)
>
>  
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:507)
>
>  
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:471)
>
>  
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:430)
>
>  
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleReplace(OperationSupport.java:289)
>
>  
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleReplace(OperationSupport.java:269)
>
>  
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleReplace(BaseOperation.java:820)
>
>  
> io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.lambda$replace$1(HasMetadataOperation.java:86)
>
>  
> io.fabric8.kubernetes.api.model.DoneableConfigMap.done(DoneableConfigMap.java:26)
>
>  
> io.fabric8.kubernetes.api.model.DoneableConfigMap.done(DoneableConfigMap.java:5)
>
>  
> io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:92)
>
>  
> io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:36)
>
>  
> io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock.update(ConfigMapLock.java:106)
>  ... 10 common frames omitted\n"}
>
>
> **
>
>
>
>
>
> On 2021/08/30 10:53:10, Roman Khachatryan  wrote:
> > Hello,
> >
> > Do I understand correctly that you are using native Kubernetes
> > deployment in application mode;
> > and the issue *only* happens if you set kubernetes-jobmanager-replicas
> > [1] to a value greater than 1?
> >
> > Does it happen during deployment or at some point while running the job?
> >
> > Could you share Flink and 

Re: flink run -d -m yarn-cluster job submission to the YARN cluster fails

2021-08-30 Thread Yang Wang
export HADOOP_CLASSPATH=`hadoop classpath`

The approach above should be fine. Please confirm that the jar files actually exist under those directories, especially /Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn/

Best,
Yang

龙逸尘  于2021年8月31日周二 上午11:02写道:

> Hi Wayne,
>
> You can try specifying HADOOP_CONF_DIR explicitly:
> export HADOOP_CONF_DIR=/opt/flink/hadoop-conf/
>
> Wayne <1...@163.com> 于2021年8月28日周六 下午8:37写道:
>
> > My submission command:
> >
> >
> > ./bin/flink run -d -m yarn-cluster
> >
> >
> > The error is as follows:
> >  The program finished with the following exception:
> >
> >
> > java.lang.IllegalStateException: No Executor found. Please make sure to
> > export the HADOOP_CLASSPATH environment variable or have hadoop in your
> > classpath. For more information refer to the "Deployment" section of the
> > official Apache Flink documentation.
> > at
> >
> org.apache.flink.yarn.cli.FallbackYarnSessionCli.isActive(FallbackYarnSessionCli.java:41)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.validateAndGetActiveCommandLine(CliFrontend.java:1236)
> > at
> > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:234)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> > at
> >
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> > at
> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> >
> >
> > Running the command hadoop classpath:
> > @192 flink-1.12.2 % hadoop classpath
> >
> >
> /Users//local/hadoop/hadoop-3.2.2/etc/hadoop:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/common/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/common/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/mapreduce/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/mapreduce/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn/*:/Users//local/hadoop/hadoop-3.2.2
> > I have configured HADOOP_CLASSPATH repeatedly but it does not take effect. The official site gives
> > export HADOOP_CLASSPATH=`hadoop classpath`
> > To which directory level exactly should this hadoop classpath be configured?
> >
> >
> >
> >
> >
> >
> >
> >
>


Re: Flink performance with multiple operators reshuffling data

2021-08-30 Thread JING ZHANG
Hi Jason,
A job with multiple data reshuffles can be scalable under normal
circumstances.
But we should carefully avoid data skew, because if the input stream has data
skew, adding more resources would not help.
Besides that, if we can adjust the order of the functions, we should put
the keyed process function with the lowest selectivity first. The
lower the ratio of the output record count to the input record count, the lower
the selectivity.

Best,
JING ZHANG
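
As an illustration of the ordering advice above, a short fragment where the keyed function with the
lowest selectivity (the one that drops the most records) runs first, so the later, heavier shuffle
sees fewer records. Event, Result, DedupFunction and SessionAggregator are placeholder names, not
APIs from this thread:

DataStream<Event> events = env.addSource(source);

// Stage 1: lowest selectivity first, e.g. a keyed deduplication that emits only a small
// fraction of its input.
DataStream<Event> deduped = events
        .keyBy(Event::getUserId)
        .process(new DedupFunction());

// Stage 2: the heavier keyed aggregation now reshuffles and processes far fewer records.
DataStream<Result> results = deduped
        .keyBy(Event::getSessionId)
        .process(new SessionAggregator());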


Jason Liu  于2021年8月31日周二 上午8:12写道:

> Hi there,
>
> We have this use case where we need to have multiple keybys operators
> with its own MapState, all with different keys, in a single Flink app. This
> obviously means we'll be reshuffling our data a lot.
> Our TPS is around 1-2k, with ~2kb per event and we use Kinesis Data
> Analytics as the infrastructure (running roughly on ~128 KPU of hardware).
> I'm currently in the design phase of this system and just wondering if we
> can put the data through 4-5 keyed process functions all with different key
> bys and if it can be scalable with a large enough Flink cluster. I don't
> think we can get around this requirement much (other than replicating
> data). Alternatively, we can just run multiple small Flink clusters, each
> with its own unique keyBys but I'm not sure if or how much that'll help.
>  Thanks for any potential insights!
>
> -Jason
>


Re: Deploying Flink on Kubernetes with fractional CPU and different limits and requests

2021-08-30 Thread Yang Wang
Hi all,

I think it is a good improvement to support different resource requests and
limits. It is very useful
especially for the CPU resource, since CPU usage heavily depends on the upstream
workloads.

Actually, we (Alibaba) have introduced some internal config options to
support this feature. WDYT?

// The prefix of the Kubernetes resource limit factor. It should not be less than 1.
// The resource could be cpu, memory, ephemeral-storage and all other types supported by Kubernetes.
public static final String KUBERNETES_JOBMANAGER_RESOURCE_LIMIT_FACTOR_PREFIX =
        "kubernetes.jobmanager.limit-factor.";
public static final String KUBERNETES_TASKMANAGER_RESOURCE_LIMIT_FACTOR_PREFIX =
        "kubernetes.taskmanager.limit-factor.";


BTW, we already have an old ticket for this feature[1].


[1]. https://issues.apache.org/jira/browse/FLINK-15648

Best,
Yang

Alexis Sarda-Espinosa  于2021年8月26日周四
下午10:04写道:

> I think it would be nice if the task manager pods get their values from
> the configuration file only if the pod templates don’t specify any
> resources. That was the goal of supporting pod templates, right? Allowing
> more custom scenarios without letting the configuration options get bloated.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Denis Cosmin NUTIU 
> *Sent:* Donnerstag, 26. August 2021 15:55
> *To:* matth...@ververica.com
> *Cc:* user@flink.apache.org; danrtsey...@gmail.com
> *Subject:* Re: Deploying Flink on Kubernetes with fractional CPU and
> different limits and requests
>
>
>
> Hi Matthias,
>
>
>
> Thanks for getting back to me and for your time!
>
>
>
> We have some Flink jobs deployed on Kubernetes and running kubectl top pod
> gives the following result:
>
>
>
>
> NAME                 CPU(cores)   MEMORY(bytes)
> aa-78c8cb77d4-zlmpg  8m   1410Mi
> aa-taskmanager-2-2   32m  1066Mi
> bb-5f7b65f95c-jwb7t  7m   1445Mi
> bb-taskmanager-2-2   32m  1016Mi
> cc-54d967b55d-b567x   11m  514Mi
> cc-taskmanager-4-111m  496Mi
> dd-6fbc6b8666-krhlx   10m  535Mi
> dd-taskmanager-2-212m  522Mi
> xx-6845cf7986-p45lq 53m  526Mi
> xx-taskmanager-5-2  11m  507Mi
>
>
>
> During low workloads the jobs consume just about 100m CPU and during high
> workloads the CPU consumption increases to 500m-1000m. Having the ability
> to specify requests and limit separately would give us more deployment
> flexibility.
>
>
>
> Sincerely,
>
> Denis
>
>
>
> On Thu, 2021-08-26 at 14:22 +0200, Matthias Pohl wrote:
>
>
> Hi Denis,
>
> I did a bit of digging: It looks like there is no way to specify them
> independently. You can find documentation about pod templates for
> TaskManager and JobManager [1]. But even there it states that for cpu and
> memory, the resource specs are overwritten by the Flink configuration. The
> code also reveals that limit and requests are set using the same value [2].
>
>
>
> I'm going to pull Yang Wang into this thread. I'm wondering whether there
> is a reason for that or whether it makes sense to create a Jira issue
> introducing more specific configuration parameters for limit and requests.
>
>
>
> Best,
> Matthias
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink
>
> [2]
> https://github.com/apache/flink/blob/f64261c91b195ecdcd99975b51de540db89a3f48/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java#L324-L332
>
>
>
> On Thu, Aug 26, 2021 at 11:17 AM Denis Cosmin NUTIU <
> dnu...@bitdefender.com> wrote:
>
> Hello,
>
> I've developed a Flink job and I'm trying to deploy it on a Kubernetes
> cluster using Flink Native.
>
> Setting kubernetes.taskmanager.cpu=0.5 and
> kubernetes.jobmanager.cpu=0.5 sets the requests and limits to 500m,
> which is correct, but I'd like to set the requests and limits to
> different values, something like:
>
> resources:
>   requests:
> memory: "1048Mi"
> cpu: "100m"
>   limits:
> memory: "2096Mi"
> cpu: "1000m"
>
> I've tried using pod templates from Flink 1.13 and manually patching
> the Kubernetes deployment file, the jobmanager gets spawned with the
> correct resource requests and limits but the taskmanagers get spawned
> with the defaults:
>
> Limits:
>   cpu: 1
>   memory:  1728Mi
> Requests:
>   cpu: 1
>   memory:  1728Mi
>
> Is there any way I could set the requests/limits for the CPU/Memory to
> different values when deploying Flink in Kubernetes? If not, would it
> make sense to request this as a feature?
>
> Thanks in advance!
>
> Denis
>
>


Re: flink run -d -m yarn-cluster job submission to the YARN cluster fails

2021-08-30 Thread 龙逸尘
Hi Wayne,

You can try specifying HADOOP_CONF_DIR explicitly:
export HADOOP_CONF_DIR=/opt/flink/hadoop-conf/

Wayne <1...@163.com> 于2021年8月28日周六 下午8:37写道:

> My submission command:
>
>
> ./bin/flink run -d -m yarn-cluster
>
>
> The error is as follows:
>  The program finished with the following exception:
>
>
> java.lang.IllegalStateException: No Executor found. Please make sure to
> export the HADOOP_CLASSPATH environment variable or have hadoop in your
> classpath. For more information refer to the "Deployment" section of the
> official Apache Flink documentation.
> at
> org.apache.flink.yarn.cli.FallbackYarnSessionCli.isActive(FallbackYarnSessionCli.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.validateAndGetActiveCommandLine(CliFrontend.java:1236)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:234)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>
>
> Running the command hadoop classpath:
> @192 flink-1.12.2 % hadoop classpath
>
> /Users//local/hadoop/hadoop-3.2.2/etc/hadoop:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/common/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/common/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/hdfs/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/mapreduce/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/mapreduce/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn/lib/*:/Users//local/hadoop/hadoop-3.2.2/share/hadoop/yarn/*:/Users//local/hadoop/hadoop-3.2.2
> I have configured HADOOP_CLASSPATH repeatedly but it does not take effect. The official site gives
> export HADOOP_CLASSPATH=`hadoop classpath`
> To which directory level exactly should this hadoop classpath be configured?
>
>
>
>
>
>
>
>


Re: flink oss ha

2021-08-30 Thread Yun Tang
Hi

This looks more like an OSS configuration problem. With the currently configured oss.endpoint, accessKeyId and accessKeySecret,
can you access the corresponding oss://bucket-logcenter/flink-state/flink-session-recovery with a tool such as ossutil?

Best,
Yun Tang

From: dker eandei 
Sent: Monday, August 30, 2021 12:36
To: user-zh@flink.apache.org 
Subject: Re: flink oss ha

Hello:
 The attachment is the error reported when OSS is used for high availability; below is the script used to start Flink:

../bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-session-1 \
-Dkubernetes.container.image=test/flink:1.13.2-scala_2.12-oss \
-Dkubernetes.container.image.pull-policy=Always \
-Dkubernetes.namespace=flink-session \
-Dkubernetes.service-account=flink-session-sa \
-Dkubernetes.rest-service.exposed.type=ClusterIP \
-Dtaskmanager.numberOfTaskSlots=6 \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=2048m \
-Dkubernetes.jobmanager.cpu=1 \
-Dkubernetes.taskmanager.cpu=2 \
-Dfs.oss.endpoint="http://oss-.local; \
-Dfs.oss.accessKeyId="j0BAJ" \
-Dfs.oss.accessKeySecret="7mzTPiC4w" \

-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \

-Dhigh-availability.storageDir=oss://bucket-logcenter/flink-state/flink-session-recovery
 \

-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.2.jar
 \

-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.2.jar

-----Original Message-----
From: Yun Tang 
Sent: 2021-08-30 11:36
To: user-zh@flink.apache.org
Subject: Re: flink oss ha

Hi,
Hello, the image cannot be loaded; please paste the text directly.

Best,
Yun Tang

From: dker eandei 
Sent: Friday, August 27, 2021 14:58
To: user-zh@flink.apache.org 
Subject: flink oss ha


Hello:

According to the documentation, OSS can be used as the FsStateBackend. When running Flink on Kubernetes
with high availability, can high-availability.storageDir be configured as an OSS path? I tried it and got the following error:

[inline screenshot of the error; image not available in the archive]



Sent from Mail for Windows




Re: Flink performance with multiple operators reshuffling data

2021-08-30 Thread Caizhi Weng
Hi!

KeyBy operations can scale with parallelism. Flink will shuffle your
records to different sub-tasks according to the hash value of the key modulo
the parallelism, so the more parallelism you have, the faster Flink
can process data, unless there is data skew.
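
For reference, a small sketch of the underlying computation. Flink actually routes keys through key
groups, and KeyGroupRangeAssignment in flink-runtime exposes the mapping; the key and numbers below
are only illustrative:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyRoutingSketch {
    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default lower bound for max parallelism
        int parallelism = 4;      // number of sub-tasks of the keyed operator

        String key = "user-42";
        // key -> key group: murmur hash of key.hashCode() modulo maxParallelism
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        // key group -> index of the sub-task that will process this key
        int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);

        System.out.println("key group = " + keyGroup + ", sub-task index = " + subtask);
    }
}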

Jason Liu  于2021年8月31日周二 上午8:12写道:

> Hi there,
>
> We have this use case where we need to have multiple keybys operators
> with its own MapState, all with different keys, in a single Flink app. This
> obviously means we'll be reshuffling our data a lot.
> Our TPS is around 1-2k, with ~2kb per event and we use Kinesis Data
> Analytics as the infrastructure (running roughly on ~128 KPU of hardware).
> I'm currently in the design phase of this system and just wondering if we
> can put the data through 4-5 keyed process functions all with different key
> bys and if it can be scalable with a large enough Flink cluster. I don't
> think we can get around this requirement much (other than replicating
> data). Alternatively, we can just run multiple small Flink clusters, each
> with its own unique keyBys but I'm not sure if or how much that'll help.
>  Thanks for any potential insights!
>
> -Jason
>


Re: flink run submission to YARN fails with Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

2021-08-30 Thread Caizhi Weng
Hi!

-C requires that the provided path is accessible on all nodes of the cluster. If you provide a file:// path, the corresponding files must exist at that path on every node. You can put the files on HDFS
and then point -C at an hdfs:// path and try again.

Wayne <1...@163.com> 于2021年8月30日周一 下午3:13写道:

> My submission command is
>  flink run -m yarn-cluster -yd -yjm 1024m -ytm 1024m -ys 1 -ynm xxx -C
> file:///xxx/flink-connector-kafka_2.12-1.12.2.jar -C
> file:///xxx/flink-sql-avro-1.12.2.jar ...
> Submitting it to the production cluster gives
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
> user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:331)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:145)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:517)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> ~[?:1.8.0_292]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> ~[?:1.8.0_292]
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ~[?:1.8.0_292]
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at java.lang.Class.forName0(Native Method) ~[?:1.8.0_292]
> at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_292]
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
> ~[?:1.8.0_292]
> at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
> ~[?:1.8.0_292]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:317)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> ... 6 more
> 2
>
>
> The job can be submitted to YARN, which indicates that -C 

Re: Delete Keyed State outside of StateTTL

2021-08-30 Thread JING ZHANG
Hi,
All types of state also have a method clear() that clears the state for the
currently active key, i.e. the key of the input element.
Could we call the `clear()` method directly to remove the state under the
specified key?

Best,
JING ZHANG


narasimha  于2021年8月31日周二 上午9:44写道:

> Hi,
>
> I have a use case where the keyed state is managed (create, reset) by
> dynamically changing rules. New action "delete" has to be added.
> Delete is to completely delete the keyed state, same as how StateTTL does
> post expiration time.
>
> Use StateTTL?
>
> Initially used StateTTL, but it ended up in increasing the CPU usage even
> with low traffic.
> The state gets updated ~>50 times a second, and read multiple times in a
> second, so the time against state entry is getting updated often.
>
> So I didn't find it to be a viable solution.
>
> Can someone please help on how Keyed State can be removed outside of
> StateTTL.
>
> --
> A.Narasimha Swamy
>


Delete Keyed State outside of StateTTL

2021-08-30 Thread narasimha
Hi,

I have a use case where the keyed state is managed (create, reset) by
dynamically changing rules. New action "delete" has to be added.
Delete is to completely delete the keyed state, the same as what StateTTL
does after the expiration time.

Use StateTTL?

Initially I used StateTTL, but it ended up increasing the CPU usage even
with low traffic.
The state gets updated more than ~50 times a second and read multiple times a
second, so the TTL timestamp on the state entry gets updated often.

So I didn't find it to be a viable solution.

Can someone please advise on how keyed state can be removed outside of
StateTTL?

-- 
A.Narasimha Swamy


Unsubscribe

2021-08-30 Thread xiaobo77
Unsubscribe

Flink performance with multiple operators reshuffling data

2021-08-30 Thread Jason Liu
Hi there,

We have a use case where we need multiple keyBy operators, each
with its own MapState and all with different keys, in a single Flink app. This
obviously means we'll be reshuffling our data a lot.
Our TPS is around 1-2k, with ~2kb per event, and we use Kinesis Data
Analytics as the infrastructure (running roughly on ~128 KPU of hardware).
I'm currently in the design phase of this system and just wondering if we
can put the data through 4-5 keyed process functions, all with different
keyBys, and whether that can scale with a large enough Flink cluster. I don't
think we can get around this requirement much (other than by replicating
data). Alternatively, we could just run multiple small Flink clusters, each
with its own unique keyBy, but I'm not sure if or how much that'll help.
 Thanks for any potential insights!

-Jason


Re: k8S HA mode

2021-08-30 Thread houssem
Hello, thanks for the response

I am using the Kubernetes standalone application mode, not the native one.

This error happens randomly at some point while the job is running.

Also, I am using just one replica of the JobManager.

Here are some other logs:


{"@timestamp":"2021-08-30T15:43:44.970+02:00","@version":"1","message":"Exception
 occurred while renewing lock: Unable to update 
ConfigMapLock","logger_name":"io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector",
"thread_name":"pool-685-thread-1","level":"DEBUG","level_value":1,"stack_trace":"io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException:
 Unable to update ConfigMapLock
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock.update(ConfigMapLock.java:108)
 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.tryAcquireOrRenew(LeaderElector.java:156)
 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.renew(LeaderElector.java:120)
 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$null$1(LeaderElector.java:104)
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 java.lang.Thread.run(Thread.java:748)
 Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure 
executing: PUT at:
 
https://172.31.64.1/api/v1/namespaces/flink-pushavoo-flink-rec/configmaps/elifibre--jobmanager-leader.
 Message: Operation cannot be fulfilled on configmaps 
\"elifibre--jobmanager-leader\": the object has 
been modified; please apply your changes to the latest version and try again.
 Received status: Status(apiVersion=v1, code=409, 
details=StatusDetails(causes=[], group=null, kind=configmaps, 
name=elifibre--jobmanager-leader, 
retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
message=Operation cannot be fulfilled on configmaps
 \"elifibre--jobmanager-leader\": the object 
has been modified;
 please apply your changes to the latest version and try again, 
metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Conflict, 
status=Failure, additionalProperties={}).
 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:568)
 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:507)
 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:471)
 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:430)
 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleReplace(OperationSupport.java:289)
 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleReplace(OperationSupport.java:269)
 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleReplace(BaseOperation.java:820)
 
io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.lambda$replace$1(HasMetadataOperation.java:86)
 
io.fabric8.kubernetes.api.model.DoneableConfigMap.done(DoneableConfigMap.java:26)
 
io.fabric8.kubernetes.api.model.DoneableConfigMap.done(DoneableConfigMap.java:5)
 
io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:92)
 
io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:36)
 
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock.update(ConfigMapLock.java:106)
 ... 10 common frames omitted\n"}

**





On 2021/08/30 10:53:10, Roman Khachatryan  wrote: 
> Hello,
> 
> Do I understand correctly that you are using native Kubernetes
> deployment in application mode;
> and the issue *only* happens if you set kubernetes-jobmanager-replicas
> [1] to a value greater than 1?
> 
> Does it happen during deployment or at some point while running the job?
> 
> Could you share Flink and Kubernetes versions and HA configuration
> [2]? (I'm assuming you're using Kubernetes for HA, not ZK).
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#kubernetes-jobmanager-replicas
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/
> 
> Regards,
> Roman
> 
> On Fri, Aug 27, 2021 at 2:31 PM mejri houssem  
> wrote:
> >
> > 

Re: Flink issues with Avro GenericRecord serialization

2021-08-30 Thread tarun joshi
An update on this: I see that `IndexedRecord` is part of the Avro library.
Please correct me if I am wrong in assuming that "POJOs generated by the
Avro POJO generator must implement the IndexedRecord interface". It seems
either


   -  I should be parsing the stringified JSON from AWS Kinesis directly into
   Avro,
   -  *or* convert those GSON-parsed POJOs into Avro-compatible records at
   stream time.

Please let me know if anyone has a better way to do this.
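
For what it's worth, a minimal sketch of the second option: copying the fields of the GSON-parsed
POJO into the GenericRecord one by one instead of putting the POJO itself into the record, so the
record only contains Avro-compatible values. It assumes the Response POJO from this thread, with
illustrative getUserId()/getUrl() getters and a matching two-field schema:

import com.google.gson.Gson;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ResponseToAvroMapper extends RichMapFunction<String, GenericRecord> {

    // Illustrative schema; the real one would mirror the actual Response fields.
    private static final String SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"Response\",\"fields\":["
                    + "{\"name\":\"userId\",\"type\":\"string\"},"
                    + "{\"name\":\"url\",\"type\":\"string\"}]}";

    private transient Schema schema;
    private transient Gson gson;

    @Override
    public void open(Configuration parameters) {
        schema = new Schema.Parser().parse(SCHEMA_JSON);
        gson = new Gson();
    }

    @Override
    public GenericRecord map(String json) {
        Response response = gson.fromJson(json, Response.class);
        GenericData.Record record = new GenericData.Record(schema);
        // Put plain Avro-compatible values (strings, numbers, nested GenericRecords),
        // not the POJO itself, so the record behaves as an IndexedRecord downstream.
        record.put("userId", response.getUserId());
        record.put("url", response.getUrl());
        return record;
    }
}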


On Mon, 30 Aug 2021 at 10:13, tarun joshi <1985.ta...@gmail.com> wrote:

> Hey all,
> I am trying to write a simple pipeline to read
>
> Read Stringified JSON from Kinesis -> parsed to POJO -> converted to Avro
> -> for the purpose of writing Parquet files to AWS S3.
>
> 1) This is my SimpleMapper
>
> public class SimpleMapper extends RichMapFunction {
> private static final GsonBuilder gsonBuilder =
> new 
> GsonBuilder().excludeFieldsWithoutExposeAnnotation().setPrettyPrinting();
>
> private static final Gson gson = gsonBuilder.create();
> private static final Schema schema = 
> ReflectData.get().getSchema(Response.class);
>
> @Override
> public GenericRecord map(String s) throws Exception {
>
> Response response = gson.fromJson(s, Response.class);
> GenericData.Record record = new GenericData.Record(schema);
> record.put(0, response);
>
> return record;
> }
>
> 2) This is my Job Definition
>
> public class ClickStreamPipeline implements Serializable {
>
> private static Schema schema = 
> ReflectData.get().getSchema(Response.class);
>
> public static void main(String args[]) throws Exception {
> final MultipleParameterTool params = 
> MultipleParameterTool.fromArgs(args);
> StreamExecutionEnvironment env = 
> getStreamExecutionEnvironment(params);
>
>
> FlinkKinesisConsumer kinesisConsumer =
> new FlinkKinesisConsumer<>(
> "web-clickstream", new SimpleStringSchema(), 
> getKafkaConsumerProperties());
>
> final StreamingFileSink streamingFileSink =
> StreamingFileSink.forBulkFormat(
> new 
> Path("s3://data-ingestion-pipeline/flink_pipeline/"),
> ParquetAvroWriters.forGenericRecord(schema))
> .withRollingPolicy(OnCheckpointRollingPolicy.build())
> .build();
>
> env.addSource(kinesisConsumer)
> .map(new SimpleMapper())
> .returns(new GenericRecordAvroTypeInfo(schema))
> .addSink(streamingFileSink);
>
> env.execute("Read files in streaming fashion");
> }
>
> private static StreamExecutionEnvironment getStreamExecutionEnvironment(
> MultipleParameterTool params) throws ClassNotFoundException {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setGlobalJobParameters(params);
>
> Class unmodColl = 
> Class.forName("java.util.Collections$UnmodifiableCollection");
> env.getConfig()
> .addDefaultKryoSerializer(unmodColl, 
> UnmodifiableCollectionsSerializer.class);
> env.enableCheckpointing(60_000L);
>
> return env;
> }
>
>
>
> The issue I am facing is failiing to serialize the Avro GenericRecord
> wrapped message
>
>- When I used a GenericRecordAvroTypeInfo(schema); to force use my
>Avro as preferred Serializer , I am getting the error below
>
>
> *  java.lang.ClassCastException: class <my POJO> cannot be cast to class org.apache.avro.generic.IndexedRecord*
>
>
>
>- If I don't use the GenericRecordAvroTypeInfo and try to register my
>pojo with KryoSerializer , the serialization fails with NPE somewhere in my
>Schema class.Do I need to implement/register a proper Avro serializer with
>flink config?
>
> Thanks for the help!
>


Flink issues with Avro GenericRecord serialization

2021-08-30 Thread tarun joshi
Hey all,
I am trying to write a simple pipeline to read

Read Stringified JSON from Kinesis -> parsed to POJO -> converted to Avro
-> for the purpose of writing Parquet files to AWS S3.

1) This is my SimpleMapper

public class SimpleMapper extends RichMapFunction<String, GenericRecord> {
private static final GsonBuilder gsonBuilder =
new
GsonBuilder().excludeFieldsWithoutExposeAnnotation().setPrettyPrinting();

private static final Gson gson = gsonBuilder.create();
private static final Schema schema =
ReflectData.get().getSchema(Response.class);

@Override
public GenericRecord map(String s) throws Exception {

Response response = gson.fromJson(s, Response.class);
GenericData.Record record = new GenericData.Record(schema);
record.put(0, response);

return record;
}

2) This is my Job Definition

public class ClickStreamPipeline implements Serializable {

private static Schema schema = ReflectData.get().getSchema(Response.class);

public static void main(String args[]) throws Exception {
final MultipleParameterTool params =
MultipleParameterTool.fromArgs(args);
StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);


FlinkKinesisConsumer<String> kinesisConsumer =
new FlinkKinesisConsumer<>(
"web-clickstream", new SimpleStringSchema(),
getKafkaConsumerProperties());

final StreamingFileSink<GenericRecord> streamingFileSink =
StreamingFileSink.forBulkFormat(
new
Path("s3://data-ingestion-pipeline/flink_pipeline/"),
ParquetAvroWriters.forGenericRecord(schema))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();

env.addSource(kinesisConsumer)
.map(new SimpleMapper())
.returns(new GenericRecordAvroTypeInfo(schema))
.addSink(streamingFileSink);

env.execute("Read files in streaming fashion");
}

private static StreamExecutionEnvironment getStreamExecutionEnvironment(
MultipleParameterTool params) throws ClassNotFoundException {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);

Class<?> unmodColl =
Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig()
.addDefaultKryoSerializer(unmodColl,
UnmodifiableCollectionsSerializer.class);
env.enableCheckpointing(60_000L);

return env;
}



The issue I am facing is failing to serialize the Avro GenericRecord
wrapped message

   - When I use GenericRecordAvroTypeInfo(schema) to force my Avro type info
   as the preferred serializer, I am getting the error below


*  java.lang.ClassCastException: class <my POJO> cannot be cast to class org.apache.avro.generic.IndexedRecord*



   - If I don't use the GenericRecordAvroTypeInfo and try to register my
   POJO with KryoSerializer, the serialization fails with an NPE somewhere in my
   Schema class. Do I need to implement/register a proper Avro serializer with
   the Flink config?

Thanks for the help!


Table API demo problem

2021-08-30 Thread Tatla, Manraj
Hello everyone,

I am learning Flink because at work we need stateful real time computations in 
a bot detection system.  This weekend, I have had much difficulty in getting 
the real time reporting API tutorial working.  
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/table_api/
In particular, every time I run docker-compose up -d, it does not work.  
Diagnosing this, I found the jobmanager service failing due to out of resource 
exceptions.  I have inspected my system resources, and found no cpu, memory, or 
disk issues.  I see a ton of error messages saying libjemalloc.so cannot be 
preloaded.  I am running on macOS, and that seems to be a Linux file.

Does anyone know the problem?

I apologize if this is a trivial issue, and for the second email.

-Manraj



Re: flink keyby question

2021-08-30 Thread JasonLee
hi


Take a look at KeyGroupStreamPartitioner#selectChannel.


Best
JasonLee
On 2021-08-30 22:34, cs <58683...@qq.com.INVALID> wrote:
How does keyBy in Flink decide which task a key is assigned to, and how can I get the id of the task that handles a given key?
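
KeyGroupStreamPartitioner#selectChannel picks the channel from the key's key group. As a rough way
to observe the result at runtime, a small sketch that tags each record with the index of the
sub-task that processed it (the sub-task index is what the web UI shows per parallel task); the
class name and usage are illustrative:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch only: run after keyBy to see which sub-task each key is routed to.
public class SubtaskTagger extends RichMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> map(String key) {
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        return Tuple2.of(key, subtaskIndex);
    }
}

// Usage (illustrative): stream.keyBy(s -> s).map(new SubtaskTagger()).print();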

Re: Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

2021-08-30 Thread Samir Vasani
Hi,

Accessing the file is not the problem.
If I put the file in place before starting the job, the job reads it
correctly, but if I add any file at runtime it does not read the
newly added files.
Let me know if you need more information.

Thanks & Regards,
Samir Vasani



On Mon, Aug 30, 2021 at 8:03 PM Roman Khachatryan  wrote:

> Hi,
>
> If I understand correctly, the problem is accessing local files from
> Flink running in docker.
> Have you tried mounting the local directory into the container, for
> example as a bind mount [1]?
>
> [1]
> https://docs.docker.com/storage/bind-mounts/
>
> Regards,
> Roman
>
> On Mon, Aug 30, 2021 at 3:33 PM Samir Vasani 
> wrote:
> >
> > I have a requirement to read a file continously from a specific path.
> >
> > Means flink job should continously poll the specified location and read
> a file will arrive at this location at certains intervals .
> >
> > Example: my location on windows machine is C:/inputfiles get a file
> file_1.txt at 2:00PM, file_2.txt at 2:30PM, file_3.txt at 3:00PM.
> >
> > To experimented this with below code .
> >
> > import org.apache.flink.api.common.functions.FlatMapFunction;
> > import org.apache.flink.api.common.io.FilePathFilter;
> > import org.apache.flink.api.java.io.TextInputFormat;
> > import org.apache.flink.core.fs.FileSystem;
> > import org.apache.flink.streaming.api.datastream.DataStream;
> > import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> > import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import
> org.apache.flink.streaming.api.functions.source.FileProcessingMode;
> > import org.apache.flink.util.Collector;
> >
> > import java.util.Arrays;
> > import java.util.List;
> >
> > public class ContinuousFileProcessingTest {
> >
> > public static void main(String[] args) throws Exception {
> >
> > final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> > env.enableCheckpointing(10);
> > String localFsURI = "D:\\FLink\\2021_01_01\\";
> > TextInputFormat format = new TextInputFormat(new
> org.apache.flink.core.fs.Path(localFsURI));
> > format.setFilesFilter(FilePathFilter.createDefaultFilter());
> > DataStream inputStream =
> > env.readFile(format, localFsURI,
> FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
> > SingleOutputStreamOperator soso =
> inputStream.map(String::toUpperCase);
> > soso.print();
> > soso.writeAsText("D:\\FLink\\completed",
> FileSystem.WriteMode.OVERWRITE);
> > env.execute("read and write");
> > }
> > }
> >
> >
> >
> >
> > I brought up flink cluster using flink's 1.9.2 and i was able to achieve
> my goal of readin file continously at some intervals.
> >
> > Flink's 1.9.2 version can bring up cluster on windows.
> >
> > But now i have to upgrade the flink's version from 1.9.2 to 1.12 .And we
> used docker to bring cluster up on 1.12 (unlike 1.9.2).
> >
> > Unlike windows path i changed the file location as per docker location
> but the same above program in not running there.
> >
> > Need help to find the solution.
> >
> > Thanks in advance.
> >
> >
> > Thanks & Regards,
> > Samir Vasani
> >
>


Re: Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

2021-08-30 Thread Roman Khachatryan
Hi,

If I understand correctly, the problem is accessing local files from
Flink running in docker.
Have you tried mounting the local directory into the container, for
example as a bind mount [1]?

[1]
https://docs.docker.com/storage/bind-mounts/

Regards,
Roman

On Mon, Aug 30, 2021 at 3:33 PM Samir Vasani  wrote:
>
> I have a requirement to read a file continously from a specific path.
>
> Means flink job should continously poll the specified location and read a 
> file will arrive at this location at certains intervals .
>
> Example: my location on windows machine is C:/inputfiles get a file 
> file_1.txt at 2:00PM, file_2.txt at 2:30PM, file_3.txt at 3:00PM.
>
> To experimented this with below code .
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.io.FilePathFilter;
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
> import org.apache.flink.util.Collector;
>
> import java.util.Arrays;
> import java.util.List;
>
> public class ContinuousFileProcessingTest {
>
> public static void main(String[] args) throws Exception {
>
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(10);
> String localFsURI = "D:\\FLink\\2021_01_01\\";
> TextInputFormat format = new TextInputFormat(new 
> org.apache.flink.core.fs.Path(localFsURI));
> format.setFilesFilter(FilePathFilter.createDefaultFilter());
> DataStream inputStream =
> env.readFile(format, localFsURI, 
> FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
> SingleOutputStreamOperator soso = 
> inputStream.map(String::toUpperCase);
> soso.print();
> soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE);
> env.execute("read and write");
> }
> }
>
>
>
>
> I brought up flink cluster using flink's 1.9.2 and i was able to achieve my 
> goal of readin file continously at some intervals.
>
> Flink's 1.9.2 version can bring up cluster on windows.
>
> But now i have to upgrade the flink's version from 1.9.2 to 1.12 .And we used 
> docker to bring cluster up on 1.12 (unlike 1.9.2).
>
> Unlike windows path i changed the file location as per docker location but 
> the same above program in not running there.
>
> Need help to find the solution.
>
> Thanks in advance.
>
>
> Thanks & Regards,
> Samir Vasani
>


flink keyby question

2021-08-30 Thread cs
How does keyBy in Flink decide which task a key is assigned to, and how can I get the id of the task that handles a given key?

Re: checkpoints/.../shared cleanup

2021-08-30 Thread Khachatryan Roman
Hi,

I think the documentation is correct. Once the job is stopped with
savepoint, any of its "regular" checkpoints are discarded, and as a
result any shared state gets unreferenced and is also discarded.
Savepoints currently do not have shared state.

Furthermore, the new job should have a new ID and therefore a new folder.
Are you referring to the old folders?

However, the removal process is asynchronous and the client doesn't
wait for all the artifacts to be removed.
Then the cluster will wait for removal to complete before termination.
Are you running Flink in session mode?

Regards,
Roman

On Fri, Aug 27, 2021 at 8:05 AM Alexey Trenikhun  wrote:
>
> "the shared subfolder still grows" - while upgrading job, we cancel job with 
> savepoint, my expectations that Flink will clean checkpoint  including shared 
> directory, since checkpoints are not reatained, then we start upgraded job 
> from savepoint, however when I look into shared folder I see older files from 
> previous version of job. This upgrade process repeated again, as result the 
> shared subfolder grows and grows
>
> Thanks,
> Alexey
> 
> From: Alexey Trenikhun 
> Sent: Thursday, August 26, 2021 6:37:27 PM
> To: Matthias Pohl 
> Cc: Flink User Mail List ; sjwies...@gmail.com 
> 
> Subject: Re: checkpoints/.../shared cleanup
>
> Hi Matthias,
>
> I don't use externalized checkpoints (from Flink UI Persist Checkpoints 
> Externally: Disabled), why do you think checkpoint(s) should be retained? It 
> kind of contradicts with documentation [1] - Checkpoints are by default not 
> retained and are only used to resume a job from failures.
>
> [1] - 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints
> Checkpoints | Apache Flink
> Checkpoints # Overview # Checkpoints make state in Flink fault tolerant by 
> allowing state and the corresponding stream positions to be recovered, 
> thereby giving the application the same semantics as a failure-free 
> execution. See Checkpointing for how to enable and configure checkpoints for 
> your program. Checkpoint Storage # When checkpointing is enabled, managed 
> state is persisted to ensure ...
> ci.apache.org
>
> Thanks,
> Alexey
> 
> From: Matthias Pohl 
> Sent: Thursday, August 26, 2021 5:42 AM
> To: Alexey Trenikhun 
> Cc: Flink User Mail List ; sjwies...@gmail.com 
> 
> Subject: Re: checkpoints/.../shared cleanup
>
> Hi Alexey,
> thanks for reaching out to the community. I have a question: What do you mean 
> by "the shared subfolder still grows"? As far as I understand, the shared 
> folder contains the state of incremental checkpoints. If you cancel the 
> corresponding job and start a new job from one of the retained incremental 
> checkpoints, it is required for the shared folder of the previous job to be 
> still around since it contains the state. The new job would then create its 
> own shared subfolder. Any new incremental checkpoints will write their state 
> into the new job's shared subfolder while still relying on shared state of 
> the previous job for older data. The RocksDB Backend is in charge of 
> consolidating the incremental state.
>
> Hence, you should be careful with removing the shared folder in case you're 
> planning to restart the job later on.
>
> I'm adding Seth to this thread. He might have more insights and/or correct my 
> limited knowledge of the incremental checkpoint process.
>
> Best,
> Matthias
>
> On Wed, Aug 25, 2021 at 1:39 AM Alexey Trenikhun  wrote:
>
> Hello,
> I use incremental checkpoints, not externalized, should content of 
> checkpoint/.../shared be removed when I cancel job  (or cancel with 
> savepoint). Looks like in our case shared continutes to grow...
>
> Thanks,
> Alexey


Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

2021-08-30 Thread Samir Vasani
I have a requirement to read files continuously from a specific path.

This means the Flink job should continuously poll the specified location and read any
file that arrives at this location at certain intervals.

Example: my location on a Windows machine is C:/inputfiles, which gets a file
file_1.txt at 2:00PM, file_2.txt at 2:30PM, file_3.txt at 3:00PM.

I experimented with this using the code below.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

public class ContinuousFileProcessingTest {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10);
        String localFsURI = "D:\\FLink\\2021_01_01\\";
        TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
        format.setFilesFilter(FilePathFilter.createDefaultFilter());
        DataStream<String> inputStream =
                env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
        SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
        soso.print();
        soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE);
        env.execute("read and write");
    }
}



I brought up a Flink cluster using Flink 1.9.2 and I was able to
achieve my goal of reading files continuously at some intervals.

Flink 1.9.2 can bring up a cluster on Windows.

But now I have to upgrade the Flink version from 1.9.2 to 1.12, and
we used Docker to bring the cluster up on 1.12 (unlike 1.9.2).

Unlike the Windows path, I changed the file location to the corresponding Docker
location, but the same program above is not running there.

I need help finding the solution.

Thanks in advance.


Thanks & Regards,
Samir Vasani


Re: Flink sql CLI parsing avro format data error (failed to parse avro data)

2021-08-30 Thread Timo Walther
Thanks for letting us know. I hope we can improve the Avro support in the 
1.15 release.


Maybe the `"name" : "KafkaAvroMessage", "namespace" : "xxx"` causes the 
exception then? Otherwise the schema looks identical.


Regards,
Timo

On 30.08.21 11:00, Wayne wrote:


I read the data by writing code that loads the schema directly, and 
it can be parsed normally. I debugged the SQL execution process and found that 
the schema generated by the SQL is indeed inconsistent with the original 
Avro schema, so my final judgment is that the SQL was written 
incorrectly and does not piece together the same schema.

The following is the schema generated by this sql:

{"type":"record","name":"record","fields":[{"name":"aaa","type":"string"},{"name":"bbb","type":["null","string"],"default":null},{"name":"ccc","type":["null","string"],"default":null},{"name":"ddd","type":"string"}]}





At 2021-08-30 15:03:49, "Timo Walther"  wrote:

Hi,

could it be that there is some corrupt record in your Kafka topic? Maybe 
you can read from a different offset to verify that. In general, I 
cannot spot an obivious mistake in your schema.


Regards,
Timo


On 28.08.21 14:32, Wayne wrote:


I have the following Apache Avro schema:

|{ "type" : "record", "name" : "KafkaAvroMessage", "namespace" : "xxx", 
"fields" : [ { "name" : "aaa", "type" : "string" }, { "name" : "bbb", 
"type" : [ "null", "string" ], "default" : null },{ "name" : "ccc", 
"type" : [ "null", "string" ] }, { "name" : "ddd", "type" : "string", 
"default" : "" } ] } |


The SQL I wrote is like this:

CREATE TABLE xxx ( `aaa` STRING NOT NULL, `bbb` STRING , `ccc` STRING ,
`ddd` STRING NOT NULL ) WITH( ... 'format' = 'avro' );



The error is as follows:

Exception in thread "main" java.lang.RuntimeException: Failed to fetch 
next result at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) 
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) 
at 
org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117) 
at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350) 
at 
org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149) 
at 
org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154) 
at com.stubhub.wyane.flink.avro.avroTest4.main(avroTest4.java:50) Caused 
by: java.io.IOException: Failed to fetch job execution result at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169) 
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118) 
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) 
... 6 more Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed. at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) 
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167) 
... 8 more Caused by: 
org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed. at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) 
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117) 
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) 
at 
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) 
at 
java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) 
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:114) 
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:166) 
... 8 more Caused by: org.apache.flink.runtime.JobException: Recovery is 
suppressed by NoRestartBackoffTimeStrategy at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) 
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) 
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) 
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) 
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) 
at 

Re: Some state is not restored correctly when Flink recovers from a checkpoint

2021-08-30 Thread Benchao Li
This problem has already been fixed in 1.12, see:
https://issues.apache.org/jira/browse/FLINK-18688



-- 

Best,
Benchao Li


Re: Some state is not restored correctly when Flink recovers from a checkpoint

2021-08-30 Thread Benchao Li
Hi xingxing,

It looks like you may have run into this bug as well.
We hit a bug where, if the GROUP BY contains two or more variable-length fields, the serialized
form of the underlying BinaryRow is not stable, which in turn makes state restoration go wrong.
First, about variable-length fields: these are data types that cannot be stored in 4 bytes,
typically varchar, list, map, row and so on.
Second, about why the result is not stable: the code generation layer has an optimization that
groups fields by type before optimizing, but that grouping uses a HashMap [1], so the field order
is not deterministic; sometimes it is one order and sometimes another. As a result the serialized
BinaryRow is not stable, and the state cannot be restored from the checkpoint.

Typical combinations of several variable-length types are nullable varchar, varchar not null, and
char(n). A query like yours, which uses many constant strings, easily produces the latter two, and
ordinary fields and function calls produce the first one, which is enough to trigger this bug.

[1]
https://github.com/apache/flink/blob/release-1.9/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala#L92
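
For illustration, a minimal query shape matching the description above; the table and column
names are made up, only the mix of group-by key types matters:

  -- `type` is derived from string constants (CHAR(n) / VARCHAR NOT NULL),
  -- `dt` is produced by a function call (nullable VARCHAR), so the GROUP BY
  -- key contains several distinct variable-length types.
  SELECT platform, `type`, dt, COUNT(1) AS pv
  FROM (
    SELECT
      '03' AS platform,
      CASE WHEN event_id = 'show_event' THEN 'show' ELSE 'click' END AS `type`,
      SUBSTRING(act_time, 1, 10) AS dt
    FROM source_table
  ) t
  GROUP BY platform, `type`, dt;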

dixingxing  wrote on Wednesday, August 25, 2021 at 8:57 PM:

> Hi Flink community:
> Our Flink version is 1.9.2, using the blink planner. We hit a problem today and have not been
> able to find the cause. The symptoms are as follows:
>
> When we restart the job manually and restore from a specific checkpoint, there is some
> probability that part of the state is not restored correctly. After the restart the Flink job
> itself runs normally and there is no obvious error in the logs.
> Concretely: the data for type=realshow is not restored from state (it starts accumulating from 0
> again), while the data for type=show and type=click is restored from state normally.
>
>
> The SQL is roughly as follows:
> CREATE VIEW view1 AS
> SELECT event_id, act_time, device_id
> FROM table1
> WHERE `getStringFromJson`(`act_argv`, 'ispin', '') <> '1'
> AND event_id IN
> ('article_newest_list_show', 'article_newest_list_sight_show',
> 'article_list_item_click', 'article_auto_video_play_click');
>
>
> -- daily data
> INSERT INTO table2
> SELECT platform, type, `time`, COUNT(1) AS pv, hll_uv(device_id) AS uv
> FROM
> (SELECT '03' AS platform, TRIM(CASE WHEN event_id
> = 'article_newest_list_show' THEN 'show'
> WHEN event_id = 'article_newest_list_sight_show' THEN 'realshow'
> WHEN event_id = 'article_list_item_click' THEN 'click' ELSE '' END) AS type,
> `date_parse`(`act_time`, '-MM-dd HH:mm:ss', 'MMdd') AS `time`,
> device_id
> FROM view1
> WHERE event_id IN
> ('article_newest_list_show', 'article_newest_list_sight_show',
> 'article_list_item_click')
> UNION ALL
> SELECT '03' AS platform, 'click_total' AS type,
> `date_parse`(`act_time`, '-MM-dd HH:mm:ss', 'MMdd') AS `time`,
> device_id
> FROM view1
> WHERE event_id IN ('article_list_item_click',
> 'article_auto_video_play_click')) a
> GROUP BY platform, type, `time`;
>
>
> Looking forward to your help and replies; any hints on how to troubleshoot this would be much
> appreciated!
>
>
>

-- 

Best,
Benchao Li


Re: KafkaFetcher [] - Committing offsets to Kafka failed.

2021-08-30 Thread Roman Khachatryan
Hi,

I think the preceding message saying that the consumer is not a member of the
group suggests that there is some connectivity issue.
Perhaps heartbeats are timing out, in which case you might want to
increase session.timeout.ms [1] and heartbeat.interval.ms.

[1]
https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms

Regards,
Roman
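
As an illustration of the advice above, a fragment of the consumer properties with larger
timeouts; it is meant to replace the corresponding lines in the original setup, and the values
are only placeholders to tune, not recommendations:

  kafkaProps.setProperty("session.timeout.ms", "60000");     // how long the broker waits for heartbeats before rebalancing
  kafkaProps.setProperty("heartbeat.interval.ms", "20000");  // usually about 1/3 of session.timeout.ms
  kafkaProps.setProperty("max.poll.interval.ms", "900000");  // upper bound on the time between poll() calls
  // then pass kafkaProps to the FlinkKafkaConsumer as before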

On Fri, Aug 27, 2021 at 11:43 AM Qingsheng Ren  wrote:
>
> Hi Hemant,
>
> One possible reason is that another Kafka consumer is using the same consumer 
> group id as the one in FlinkKafkaConsumer. You can try to use another 
> group.id in FlinkKafkaConsumer to validate this.
>
> If it’s not group id’s problem, there are some Kafka consumer metrics [1] 
> that might be helpful for debugging this, such as “time-between-poll-avg”, 
> “heartbeat-rate” and so forth, to check whether it’s poll interval’s problem 
> as suggested by Kafka’s exception. All Kafka consumer metrics are registered 
> under metric group “KafkaConsumer” in Flink’s metric system.
>
> Besides, it might be helpful to set logging level of 
> org.apache.kafka.clients.consumer to DEBUG or TRACE,  which can provide more 
> information about why offset commit is failed.
>
> Hope this can help you~
>
> [1] https://kafka.apache.org/documentation/#consumer_monitoring
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
> On Aug 26, 2021, 10:25 PM +0800, bat man , wrote:
>
> Hi,
>
> I am using flink 12.1 to consume data from kafka in a streaming job. Using 
> the flink-connector-kafka_2.12:1.12.1. Kafka broker version is 2.2.1
>  In logs I see warnings like this -
>
> 2021-08-26 13:36:49,903 WARN 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher [] - 
> Committing offsets to Kafka failed. This does not compromise Flink's 
> checkpoints.
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
> completed since the group has already rebalanced and assigned the partitions 
> to another member.
> This means that the time between subsequent calls to poll() was longer than 
> the configured max.poll.interval.ms, which typically implies that the poll 
> loop is spending too much time message processing.
> You can address this either by increasing max.poll.interval.ms or by reducing 
> the maximum size of batches returned in poll() with max.poll.records.
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:840)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:790)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:910)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:890)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:258)
>
> I understand that this might not cause an issue as checkpointing is not 
> impacted, however metrics monitoring might as I am using burrow to monitor 
> group offsets. I have already tried to change below properties in kafka 
> producer configs -
>
> kafkaProps.setProperty("max.poll.interval.ms","90");
> kafkaProps.setProperty("max.poll.records","200");
> kafkaProps.setProperty("heartbeat.interval.ms","1000");
> kafkaProps.setProperty("request.timeout.ms","4");
> kafkaProps.setProperty("session.timeout.ms","1");
> But the warnings are still present in the logs.
>
> In addition I see this error just before this warn -
> ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - 
> [Consumer 

Re: k8S HA mode

2021-08-30 Thread Roman Khachatryan
Hello,

Do I understand correctly that you are using a native Kubernetes
deployment in application mode, and that the issue *only* happens if you
set kubernetes-jobmanager-replicas [1] to a value greater than 1?

Does it happen during deployment or at some point while running the job?

Could you share Flink and Kubernetes versions and HA configuration
[2]? (I'm assuming you're using Kubernetes for HA, not ZK).

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#kubernetes-jobmanager-replicas
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/

Regards,
Roman
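
For reference, the Kubernetes HA services from [2] are usually enabled with configuration along
these lines; the cluster id and storage path below are placeholders:

  kubernetes.cluster-id: my-cluster-id
  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  high-availability.storageDir: s3://my-bucket/flink/ha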

On Fri, Aug 27, 2021 at 2:31 PM mejri houssem  wrote:
>
> hello i am deploying a flink application cluster with kubernetes HA mode, but 
> i am facing this  recurrent problem and i didn't know how to solve it.
>
> Any help would be appreciated.
>
>
>
> this of the jobManager:
>
> {"@timestamp":"2021-08-27T14:19:42.447+02:00","@version":"1","message":"Exception
>  occurred while renewing lock: Unable to update 
> ConfigMapLock","logger_name":"io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector","thread_name":"pool-4092-thread-1","level":"DEBUG","level_value":1,"stack_trace":"io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException:
>  Unable to update ConfigMapLock\n\tat 
> io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock.update(ConfigMapLock.java:108)\n\tat
>  
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.tryAcquireOrRenew(LeaderElector.java:156)\n\tat
>  
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.renew(LeaderElector.java:120)\n\tat
>  
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$null$1(LeaderElector.java:104)\n\tat
>  
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat 
> java.util.concurrent.FutureTask.run(FutureT
>  ask.java:266)\n\tat 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tat
>  
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tat
>  
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>  java.lang.Thread.run(Thread.java:748)\nCaused by: 
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> PUT at: 
> https://172.31.64.1/api/v1/namespaces/flink-pushavoo-flink-rec/configmaps/elifibre--jobmanager-leader.
>  Message: Operation cannot be fulfilled on configmaps 
> \"elifibre--jobmanager-leader\": the object 
> has been modified; please apply your changes to the latest version and try 
> again. Received status: Status(apiVersion=v1, code=409, 
> details=StatusDetails(causes=[], gro
>  up=null, kind=configmaps, 
> name=elifibre--jobmanager-leader, 
> retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, 
> message=Operation cannot be fulfilled on configmaps 
> \"elifibre--jobmanager-leader\": the object 
> has been modified; please apply your changes to the latest version and try 
> again, metadata=ListMeta(_continue=null, remainingItemCount=null, 
> resourceVersion=null, selfLink=null, additionalProperties={}), 
> reason=Conflict, status=Failure, additionalProperties={}).\n\tat 
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:568)\n\tat
>  
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:507)\n\tat
>  
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:471)\n\tat
>  
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:430)\n\tat
>  io.fabric8.kubernet
>  
> es.client.dsl.base.OperationSupport.handleReplace(OperationSupport.java:289)\n\tat
>  
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleReplace(OperationSupport.java:269)\n\tat
>  
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleReplace(BaseOperation.java:820)\n\tat
>  
> io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.lambda$replace$1(HasMetadataOperation.java:86)\n\tat
>  
> io.fabric8.kubernetes.api.model.DoneableConfigMap.done(DoneableConfigMap.java:26)\n\tat
>  
> io.fabric8.kubernetes.api.model.DoneableConfigMap.done(DoneableConfigMap.java:5)\n\tat
>  
> io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:92)\n\tat
>  
> io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:36)\n\tat
>  
> io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock.update(ConfigMapLock.java:106)\n\t...
>  

Reply: Re: How to write to multiple sinks in the same transaction with Flink SQL on Flink 1.12

2021-08-30 Thread wukon...@foxmail.com
For now I will probably take the approach Shuo Cheng mentioned: first sink to MySQL, then start a
second job that reads the MySQL table via CDC, so that downstream only sees data that was
successfully inserted.

I am on Flink 1.12. With multiple sinks (for example sinking to the db and to kafka at the same
time), when the db sink fails Flink still writes to kafka, but because of the exception the TM
restarts according to the configured restart strategy, until eventually the whole job fails over.
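
For reference, a minimal sketch of what that second (CDC) job could look like in SQL. All table
names, columns and connection options below are placeholders, and the mysql-cdc connector comes
from the flink-cdc-connectors project mentioned later in this thread:

  -- changelog source over the MySQL table that the first job writes to
  CREATE TABLE orders_cdc (
    order_id BIGINT,
    status   STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql-host',
    'port' = '3306',
    'username' = 'flink',
    'password' = '******',
    'database-name' = 'mydb',
    'table-name' = 'orders'
  );

  -- changelog-capable Kafka sink (upsert-kafka ships with Flink 1.12)
  CREATE TABLE orders_kafka (
    order_id BIGINT,
    status   STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',
    'value.format' = 'json'
  );

  -- only rows that actually made it into MySQL ever reach Kafka
  INSERT INTO orders_kafka SELECT order_id, status FROM orders_cdc;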



wukon...@foxmail.com
 
From: 东东
Sent: 2021-08-30 16:50
To: user-zh
Subject: Re: Re: Re: How to write to multiple sinks in the same transaction with Flink SQL on Flink 1.12
 
 
 
For a checkpoint in Exactly-Once mode, if the sink supports two-phase transactions, it should be
possible for the failure of one sink to fail the whole checkpoint.

However, the JDBC sink only supports Exactly-Once since 1.13.0, so I would double-check your
version and configuration.


On 2021-08-30 16:27:24, "wukon...@foxmail.com" wrote:
>Hi:
> I understand that approach. For now I just want to insert the data into MySQL first and then
> notify downstream, which then queries MySQL to do the data ETL. I am not sure how to express
> this logic in SQL.
>
>
>
>wukon...@foxmail.com
> 

Re: Re: Re: How to write to multiple sinks in the same transaction with Flink SQL on Flink 1.12

2021-08-30 Thread 东东



For a checkpoint in Exactly-Once mode, if the sink supports two-phase transactions, it should be
possible for the failure of one sink to fail the whole checkpoint.

However, the JDBC sink only supports Exactly-Once since 1.13.0, so I would double-check your
version and configuration.


On 2021-08-30 16:27:24, "wukon...@foxmail.com" wrote:
>Hi:
> I understand that approach. For now I just want to insert the data into MySQL first and then
> notify downstream, which then queries MySQL to do the data ETL. I am not sure how to express
> this logic in SQL.
>
>
>
>wukon...@foxmail.com
> 


Re: Re: How to write to multiple sinks in the same transaction with Flink SQL on Flink 1.12

2021-08-30 Thread wukon...@foxmail.com
Hi:
 I understand that approach. For now I just want to insert the data into MySQL first and then
notify downstream, which then queries MySQL to do the data ETL. I am not sure how to express this
logic in SQL.



wukon...@foxmail.com
 
From: Shuo Cheng
Sent: 2021-08-30 10:19
To: user-zh
Subject: Re: How to write to multiple sinks in the same transaction with Flink SQL on Flink 1.12
Hi, the kind of write control you describe cannot be achieved within a single Flink SQL job. To
control whether data is written to a sink, consider whether you can logically put a Filter in
front of the sink to filter the data; if the kafka sink and the MySQL table are in a cascade-like
relationship, consider writing to MySQL first and then starting another job that reads the MySQL
changelog via CDC and writes it to the Kafka sink.
 
On 8/26/21, jie han  wrote:
> HI:
> You could try writing into the second kafka using flink cdc.
>


Re: Re: How to write to multiple sinks in the same transaction with Flink SQL on Flink 1.12

2021-08-30 Thread wukon...@foxmail.com
Could you explain concretely how to implement that? What does CDC give me here? I want the two
INSERT statements to be in one transaction: either both succeed or both fail. So far I have not
found anything about this in the Flink documentation.



wukon...@foxmail.com
 
From: jie han
Sent: 2021-08-26 21:36
To: user-zh
Subject: Re: How to write to multiple sinks in the same transaction with Flink SQL on Flink 1.12
HI:
You could try writing into the second kafka using flink cdc.
 
悟空  wrote on Thursday, August 26, 2021 at 1:54 PM:
 
> I am currently using flink-connector-kafka_2.11 and flink-connector-jdbc_2.11.
> For the test, after the job was started I dropped the target table in MySQL (or dropped
> required columns), then sent one kafka record. This raises a java.sql.BatchUpdateException and
> retries 3 times, but the subsequent sink to Kafka still succeeds. On the Kafka side I enabled
> 'sink.semantic' = 'exactly-once', and a downstream consumer reading with --isolation-level
> read_committed can still read the data, which shows that the sink to the db failed while the
> sink to kafka succeeded, and the Flink job itself did not fail.
>
>
> ------ Original message ------
> From: "user-zh" <tsreape...@gmail.com>
> Sent: Thursday, August 26, 2021, 1:25 PM
> To: "user-zh"
> Subject: Re: How to write to multiple sinks in the same transaction with Flink SQL on Flink 1.12
>
>
> Hi!
>
> If the statement set already contains multiple insert statements, then writing to the kafka
> table and writing to the db should happen in the same job. If writing to the db fails, the
> resulting exception should make the job fail. What exactly does "db write fails but kafka is
> still written" look like in your case?
>
> Also, you could consider splitting this into two jobs: the first job writes the data to the db,
> and the second job reads the data from the db and writes it to kafka. For capturing changes in
> the db, have a look at the Flink CDC connector [1]
>
> [1] https://github.com/ververica/flink-cdc-connectors
>
> 悟空 wrote:
>  Could you explain in more detail what statement set [1] means? I add multiple sql statements
>  via StatementSet.addInsertSql and then call the execute() method.
>
>  ------ Original message ------
>  From: "user-zh" <fskm...@gmail.com>
>  Sent: Thursday, August 26, 2021, 12:36 PM
>  To: "user-zh"
>  Subject: Re: How to write to multiple sinks in the same transaction with Flink SQL on Flink 1.12
>
>  Do you mean statement set [1]?
>
>  [1]
>  https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
>
>  悟空 wrote:
>  hi all:
>  I am currently developing with Flink 1.12 SQL and ran into a problem. I want to, within one
>  transaction, first write the data from a kafka source into a mysql table, and only after that
>  write succeeds write it into a kafka table. If the write to the db fails, nothing should be
>  written to kafka.
>  The statements look roughly like this:
>    insert into db_table_sink select * from kafka_source_table;
>    insert into kafka_table_sink select * from kafka_source_table;
>
>  Does Flink SQL have a way to implement this?
>  In my tests, when the db write fails, kafka is still written, and the Flink job does not fail.


Re: Flink sql CLI parsing avro format data error

2021-08-30 Thread Timo Walther

Hi,

could it be that there is some corrupt record in your Kafka topic? Maybe 
you can read from a different offset to verify that. In general, I 
cannot spot an obvious mistake in your schema.


Regards,
Timo
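
One way to try a different offset with the SQL Kafka connector is to adjust the startup options
of the source table; this is only a sketch, and the rest of the WITH clause stays as in the
original table definition:

  CREATE TABLE xxx ( ... ) WITH (
    ...
    'format' = 'avro',
    -- skip everything currently in the topic and only read newly produced records:
    'scan.startup.mode' = 'latest-offset'
    -- or pin exact per-partition offsets:
    -- 'scan.startup.mode' = 'specific-offsets',
    -- 'scan.startup.specific-offsets' = 'partition:0,offset:42'
  );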


On 28.08.21 14:32, Wayne wrote:


I have the following Apache Avro schema:

{ "type" : "record", "name" : "KafkaAvroMessage", "namespace" : "xxx",
  "fields" : [
    { "name" : "aaa", "type" : "string" },
    { "name" : "bbb", "type" : [ "null", "string" ], "default" : null },
    { "name" : "ccc", "type" : [ "null", "string" ] },
    { "name" : "ddd", "type" : "string", "default" : "" }
  ] }

The SQL I wrote is like this:

CREATE TABLE xxx (
  `aaa` STRING NOT NULL,
  `bbb` STRING,
  `ccc` STRING,
  `ddd` STRING NOT NULL
) WITH (
  ...
  'format' = 'avro'
);

The error is as follows:

Exception in thread "main" java.lang.RuntimeException: Failed to fetch 
next result at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) 
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) 
at 
org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117) 
at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350) 
at 
org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149) 
at 
org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154) 
at com.stubhub.wyane.flink.avro.avroTest4.main(avroTest4.java:50) Caused 
by: java.io.IOException: Failed to fetch job execution result at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169) 
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118) 
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) 
... 6 more Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed. at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) 
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167) 
... 8 more Caused by: 
org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed. at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) 
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117) 
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) 
at 
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) 
at 
java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) 
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:114) 
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:166) 
... 8 more Caused by: org.apache.flink.runtime.JobException: Recovery is 
suppressed by NoRestartBackoffTimeStrategy at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) 
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) 
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) 
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) 
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) 
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669) 
at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) 
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447) 
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) 
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) 
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) 
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) 
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 

flink run job submission to yarn fails with: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

2021-08-30 Thread Wayne
My submit command is:
 flink run -m yarn-cluster -yd -yjm 1024m -ytm 1024m -ys 1 -ynm xxx -C 
file:///xxx/flink-connector-kafka_2.12-1.12.2.jar -C 
file:///xxx/flink-sql-avro-1.12.2.jar ...
When I submit it to the production cluster it reports:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:331)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:145)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:517)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) 
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) 
~[?:1.8.0_292]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_292]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_292]
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_292]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_292]
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986) 
~[?:1.8.0_292]
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850) 
~[?:1.8.0_292]
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160) 
~[?:1.8.0_292]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) 
~[?:1.8.0_292]
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) 
~[?:1.8.0_292]
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) 
~[?:1.8.0_292]
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) 
~[?:1.8.0_292]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) 
~[?:1.8.0_292]
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) 
~[?:1.8.0_292]
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) 
~[?:1.8.0_292]
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) 
~[?:1.8.0_292]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) 
~[?:1.8.0_292]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) 
~[?:1.8.0_292]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) 
~[?:1.8.0_292]
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:317)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
... 6 more


The job can be submitted to yarn, which suggests that the -C option took effect, so why does it
fail on yarn? Is there something wrong with my command?
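
One commonly used alternative, sketched under the assumption that the client-side Flink
distribution can be modified: put the connector jars into the lib/ folder of the distribution
used for submission (on YARN the contents of lib/ are shipped to the cluster together with the
application), or bundle them into the job's fat jar, and drop the -C options. The job jar name
below is a placeholder:

  cp /xxx/flink-connector-kafka_2.12-1.12.2.jar /xxx/flink-sql-avro-1.12.2.jar $FLINK_HOME/lib/
  flink run -m yarn-cluster -yd -yjm 1024m -ytm 1024m -ys 1 -ynm xxx your-job.jar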

Unsubscribe (退订)

2021-08-30 Thread 罗海芳




罗海芳
15678617...@163.com
(signature generated by NetEase Mail Master)