Re: Kubernetes operator listing jobs TimeoutException

2023-06-07 Thread Shammon FY
Hi Evgeniy,

From the following exception message:

at
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
at
org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:469)
at
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:392)
at
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:306)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$null$37(RestClusterClient.java:931)
at
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)

It seems that the client's attempt to submit a job to the Flink cluster through
the REST API failed. It would help to provide more information, such as the
Kubernetes configuration for the job, so the community can better analyze the problem.


Best,
Shammon FY

On Wed, Jun 7, 2023 at 11:35 PM Evgeniy Lyutikov 
wrote:

> Hello.
> We use Kubernetes operator 1.4.0; the operator serves about 50 jobs, but
> sometimes there are errors in the logs that are reflected in the metrics
> (FlinkDeployment.JmDeploymentStatus.READY.Count). What is the reason for
> such errors?
>
>
> 2023-06-07 15:28:27,601 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][job-name/job-name] Starting reconciliation
> 2023-06-07 15:28:27,602 o.a.f.k.o.s.FlinkResourceContextFactory [INFO
> ][job-name/job-name] Getting service for job-name
> 2023-06-07 15:28:27,602 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][job-name/job-name] Observing job status
> 2023-06-07 15:28:39,623 o.a.f.s.n.i.n.c.AbstractChannel [WARN ]
> Force-closing a channel whose registration task was not accepted by an
> event loop: [id: 0xd494f516]
> java.util.concurrent.RejectedExecutionException: event executor terminated
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
> at
> org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:469)
> at
> org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:392)
> at
> org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:306)
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$null$37(RestClusterClient.java:931)
> at
> java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
> at
> java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> 2023-06-07 15:28:39,624 o.a.f.s.n.i.n.u.c.D.rejectedExecution [ERROR]
> Failed to submit a listener notification task. Event loop shut down?
> java.util.concurrent.RejectedExecutionException: event executor terminated
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
> at
> 

Re: pyflink1.17 garbled Chinese output

2023-06-07 Thread Shammon FY
Hi,

How are you running it? Could the encoding of the Chinese file be wrong?

Best,
Shammon FY


On Thu, Jun 8, 2023 at 10:07 AM yidan zhao  wrote:

> Could you describe it in more detail?
>
> 1 wrote on Wed, Jun 7, 2023 at 19:55:
> >
> > Hello teachers, I ran the official pyflink wordcount example. After changing the words to Chinese, the output is garbled.
> >
> >
> >
> >
> >
>
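As Shammon suggests, this is usually an encoding mismatch rather than a PyFlink bug. A minimal self-contained check (plain Python; the words and paths are illustrative) that the file on disk and the read path agree on UTF-8:

```python
import os
import tempfile

# Write Chinese words as UTF-8, then read them back with an explicit
# encoding. If the bytes were written (or are later read) as GBK or the
# platform default instead, the round trip produces mojibake.
words = "你好 世界 你好"
with tempfile.NamedTemporaryFile("w", encoding="utf-8",
                                 suffix=".txt", delete=False) as f:
    f.write(words)
    path = f.name

with open(path, encoding="utf-8") as f:  # always pass encoding explicitly
    text = f.read()
os.unlink(path)

assert text == words  # round trip is lossless when encodings match

# A tiny wordcount over the decoded text, as in the official example:
counts = {}
for w in text.split():
    counts[w] = counts.get(w, 0) + 1
print(counts)  # {'你好': 2, '世界': 1}
```

If this round trip fails on your machine, re-save the input file as UTF-8 (and check that the terminal locale can display Chinese) before looking for a PyFlink problem.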


Re: pyflink1.17 garbled Chinese output

2023-06-07 Thread yidan zhao
Could you describe it in more detail?

1 wrote on Wed, Jun 7, 2023 at 19:55:
>
> Hello teachers, I ran the official pyflink wordcount example. After changing the words to Chinese, the output is garbled.
>
>
>
>
>


Re: [ANNOUNCE] Apache flink-connector-pulsar v3.0.1 released

2023-06-07 Thread Neng Lu
Thank you very much for coordinating this!

I think we also need to release 4.0.1 to fix the pulsar-client OAuth2 issue.

On Wed, Jun 7, 2023 at 1:48 AM Leonard Xu  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> flink-connector-pulsar v3.0.1.
> This release is compatible with Flink 1.16.x series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352640
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Leonard



-- 
Best Regards,
Neng


Kubernetes operator listing jobs TimeoutException

2023-06-07 Thread Evgeniy Lyutikov
Hello.
We use Kubernetes operator 1.4.0; the operator serves about 50 jobs, but sometimes
there are errors in the logs that are reflected in the metrics
(FlinkDeployment.JmDeploymentStatus.READY.Count). What is the reason for such
errors?


2023-06-07 15:28:27,601 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][job-name/job-name] Starting reconciliation
2023-06-07 15:28:27,602 o.a.f.k.o.s.FlinkResourceContextFactory [INFO 
][job-name/job-name] Getting service for job-name
2023-06-07 15:28:27,602 o.a.f.k.o.o.JobStatusObserver  [INFO 
][job-name/job-name] Observing job status
2023-06-07 15:28:39,623 o.a.f.s.n.i.n.c.AbstractChannel [WARN ] Force-closing a 
channel whose registration task was not accepted by an event loop: [id: 
0xd494f516]
java.util.concurrent.RejectedExecutionException: event executor terminated
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81)
at 
org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86)
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323)
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155)
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139)
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
at 
org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:469)
at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:392)
at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:306)
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$null$37(RestClusterClient.java:931)
at 
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2023-06-07 15:28:39,624 o.a.f.s.n.i.n.u.c.D.rejectedExecution [ERROR] Failed to 
submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:499)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
 

PyFlink Error JAR files

2023-06-07 Thread Kadiyala, Ruthvik via user
Hi,

Please find below the code I have been using to consume a Kafka stream hosted
on Confluent. It returns an error regarding the JAR files; please find the
error below the code snippet. Let me know what I am doing wrong. I am running
this on Docker with Flink version 1.17.1.

Code:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
import glob
import os
import sys
import logging

# Set up the execution environment
env = StreamExecutionEnvironment.get_execution_environment()

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

# the sql connector for kafka is used here as it's a fat jar and could avoid
# dependency issues
env.add_jars("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_classpaths("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_jars("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")
env.add_classpaths("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")

# Set up the Confluent Cloud Kafka configuration
kafka_config = {
'bootstrap.servers': 'bootstrap-server',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.jaas.config': 'org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="API_SECRET";'
}

topic = 'TOPIC_NAME'

deserialization_schema = JsonRowDeserializationSchema.Builder() \
.type_info(Types.ROW([Types.INT(), Types.STRING()])) \
.build()

# Set up the Kafka consumer properties
consumer_props = {
'bootstrap.servers': kafka_config['bootstrap.servers'],
'security.protocol': kafka_config['security.protocol'],
'sasl.mechanism': kafka_config['sasl.mechanism'],
'sasl.jaas.config': kafka_config['sasl.jaas.config'],
'group.id': 'python-group-1'
}

# Create a Kafka consumer
kafka_consumer = FlinkKafkaConsumer(
topics = topic,  # Kafka topic
deserialization_schema = deserialization_schema,
properties = consumer_props,  # Consumer properties
)
kafka_consumer.set_start_from_earliest()
# Add the Kafka consumer as a source to the execution environment
stream = env.add_source(kafka_consumer)

# Define your data processing logic here
# For example, you can print the stream to the console
stream.print()

# Execute the job
env.execute()

Error:

Traceback (most recent call last):
  File "/home/pyflink/test.py", line 45, in 
    kafka_consumer = FlinkKafkaConsumer(
  File "/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py", line 203, in __init__
    j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
  File "/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py", line 161, in _get_kafka_consumer
    j_flink_kafka_consumer = j_consumer_clz(topics,
  File "/usr/local/lib/python3.10/dist-packages/pyflink/util/exceptions.py", line 185, in wrapped_call
    raise TypeError(
TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
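For what it's worth, two things stand out in the snippet above, offered as guesses: `add_jars` expects an absolute `file:///...` URL (a `file://relative.jar` form is parsed as a URL host name, so the jar is never loaded), and a `-javadoc` jar contains no classes. A small sketch of building a correct URL (paths are illustrative; this does not start Flink itself):

```python
import os
import pathlib

# Build an absolute file URL for the connector jar. Path.as_uri() yields
# the file:///... form that env.add_jars()/add_classpaths() can load;
# "file://flink-sql-connector-kafka-1.17.1.jar" treats the jar name as a
# URL host, not a relative path.
jar_name = "flink-sql-connector-kafka-1.17.1.jar"
jar_url = pathlib.Path(os.path.abspath(jar_name)).as_uri()
print(jar_url)  # e.g. file:///home/pyflink/flink-sql-connector-kafka-1.17.1.jar

# Then, in the script above (assumption: the same environment object):
# env.add_jars(jar_url)
```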



Cheers & Regards,
Ruthvik Kadiyala





RE: Parquet decoding exception - Flink 1.16.x

2023-06-07 Thread Kamal Mittal via user
Hello,

The metrics link given in the mail below doesn't describe any way to create
metrics for a source function, right?

I am using the Flink API below to read/decode Parquet data. My question is:
where can an error like a "decoding exception" from an internal Parquet API
such as "AvroParquetReader" be caught, so that metrics for corrupt records
can be created?

FileSource.FileSourceBuilder source =
    FileSource.forRecordStreamFormat(streamformat, path); // streamformat is of type AvroParquetRecordFormat

Please suggest.

Rgds,
Kamal

From: Martijn Visser 
Sent: 07 June 2023 03:39 PM
To: Kamal Mittal 
Cc: Kamal Mittal via user 
Subject: Re: Raise alarm for corrupt records

Hi Kamal,

Documentation on the metrics can be found at 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/

Best regards,

Martijn

On Wed, Jun 7, 2023 at 10:13 AM Kamal Mittal via user 
mailto:user@flink.apache.org>> wrote:
Hello,

Thanks for quick reply.

I am using a Parquet encoder/decoder, and if any corrupt record comes in
during decoding, I need to raise an alarm and maintain metrics visible in the
Flink metrics GUI.

So can custom metrics be created in Flink? Please give a reference to any
such documentation.

Rgds,
Kamal

From: Martijn Visser mailto:martijnvis...@apache.org>>
Sent: 07 June 2023 12:31 PM
To: Kamal Mittal mailto:kamal.mit...@ericsson.com>>
Cc: user@flink.apache.org
Subject: Re: Raise alarm for corrupt records

Hi Kamal,

No, but it should be straightforward to create metrics or events for these 
types of situations and integrate them with your own alerting solution.

Best regards,

Martijn

On Wed, Jun 7, 2023 at 8:25 AM Kamal Mittal via user 
mailto:user@flink.apache.org>> wrote:
Hello Community,

Is there any way Flink provides out of the box to raise an alarm for corrupt
records (e.g. due to a decoding failure) in the middle of a running data
pipeline, and to send this alarm outside of the TaskManager process?

Rgds,
Kamal
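One way to read the advice in this thread, sketched without any Flink dependency: wrap the decode call, catch the decode error there, and bump a counter that in a real job you would register on the operator's metric group. All names below are illustrative plain Python stand-ins, not Flink API:

```python
# Wrap the decoder so corrupt records increment a counter instead of
# failing the pipeline. The dict-based counter stands in for a Counter
# registered via getRuntimeContext().getMetricGroup().counter(...).
corrupt_records = {"count": 0}

def decode(raw: bytes) -> str:
    """Toy decoder standing in for AvroParquetReader: rejects non-JSON-ish input."""
    text = raw.decode("utf-8")
    if not text.startswith("{"):
        raise ValueError("corrupt record")
    return text

def safe_decode(raw: bytes):
    try:
        return decode(raw)
    except (ValueError, UnicodeDecodeError):
        corrupt_records["count"] += 1  # the metric an external alert would watch
        return None                    # drop, or route to a side output

records = [b'{"a": 1}', b"garbage", b'{"b": 2}', b"\xff\xfe"]
decoded = [r for r in map(safe_decode, records) if r is not None]
print(len(decoded), corrupt_records["count"])  # 2 2
```

The same catch-and-count shape applies wherever the decoding actually happens; with a file source, that is inside the record format's read path rather than a downstream map function.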


Re: flink on yarn RocksDB memory overuse

2023-06-07 Thread Hangxiang Yu
Hi, at the moment the memory used by RocksDB is not strictly limited; you can refer to this ticket:
https://issues.apache.org/jira/browse/FLINK-15532
To locate the memory usage, you can first look at some coarse-grained metrics:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics
To further pin down the detailed memory usage of RocksDB inside a single instance, you may need a malloc
profiling tool, e.g. jemalloc's jeprof:
https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling

On Wed, Jun 7, 2023 at 4:58 PM crazy <2463829...@qq.com.invalid> wrote:

> Hi, everyone,
>   A question: we have an application running flink 1.13.5 on
> yarn with the rocksdb state backend. After the job runs for a while it exceeds its memory budget; increasing the overhead a bit seems to alleviate it. Is there an issue describing this kind of problem? And how can I locate which part of memory is over the limit? Thanks.
>
>
> crazy
> 2463829...@qq.com
>
>
>
> 



-- 
Best,
Hangxiang.
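As a starting point for the coarse-grained metrics mentioned above, a hedged flink-conf.yaml fragment (key names as documented on the linked config page; verify them against your Flink version, and note that each native metric adds some per-column-family overhead):

```yaml
# Expose a few RocksDB native memory metrics
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
```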


Re: Custom Counter on Flink File Source

2023-06-07 Thread Hang Ruan
Hi, Kirti.

You can find this information in the 1.18 release wiki page [1].

Its timeline is as follows.
Feature Freeze: July 11, 2023, end of business CEST
Release: End of September 2023

Best,
Hang

[1]
https://cwiki.apache.org/confluence/display/FLINK/1.18+Release#id-1.18Release-Summary

Kirti Dhar Upadhyay K wrote on Wed, Jun 7, 2023 at 15:49:

> Thanks Hang.
>
> Any expected date for Flink 1.18.0 release?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>
> From: Hang Ruan 
> Sent: 07 June 2023 07:34
> To: Kirti Dhar Upadhyay K 
> Cc: user@flink.apache.org
> Subject: Re: Custom Counter on Flink File Source
>
>
>
> Hi, Kirti Dhar Upadhyay K.
>
>
>
> I checked FLIP-274 [1]. This feature will be released in 1.18.0; it is
> not contained in any release yet.
>
>
>
> Best,
>
> Hang
>
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator
>
>
>
> Kirti Dhar Upadhyay K wrote on Wed, Jun 7, 2023 at 02:51:
>
> Hi Hang,
>
>
>
> Thanks for reply.
>
> I tried using the SplitEnumeratorContext passed in
> AbstractFileSource#createEnumerator, but it resulted in a NullPointerException.
>
> As SplitEnumeratorContext provides its implementation as
> SourceCoordinatorContext having metricGroup() as below-
>
>
>
>
>
> @Override
> public SplitEnumeratorMetricGroup metricGroup() {
>     return null;
> }
>
>
>
> Am I making a mistake?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>
> From: Hang Ruan 
> Sent: 06 June 2023 08:12
> To: Kirti Dhar Upadhyay K 
> Cc: user@flink.apache.org
> Subject: Re: Custom Counter on Flink File Source
>
>
>
> Hi, Kirti Dhar Upadhyay K.
>
>
>
> We could get the metric group from the context, like `SourceReaderContext`
> and `SplitEnumeratorContext`. These contexts could be found when creating
> readers and enumerators. See `AbstractFileSource#createReader` and
> `AbstractFileSource#createEnumerator`.
>
>
>
> Best,
>
> Hang
>
>
>
> Kirti Dhar Upadhyay K via user wrote on Mon, Jun 5, 2023 at 22:57:
>
> Hi Community,
>
>
>
> I am trying to add a new counter for the number of files collected by the
> Flink File Source.
>
> Referring to the doc
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I
> understand how to add a new counter on any operator.
>
>
>
> this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
>
>
>
> But I am not able to get this RuntimeContext on FileSource.
>
> Can someone give me a clue on this?
>
>
>
> Regards,
>
> Kirti Dhar
>
>


pyflink1.17 garbled Chinese output

2023-06-07 Thread 1
Hello teachers, I ran the official pyflink wordcount example. After changing the words to Chinese, the output is garbled.







Re: Raise alarm for corrupt records

2023-06-07 Thread Martijn Visser
Hi Kamal,

Documentation on the metrics can be found at
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/

Best regards,

Martijn

On Wed, Jun 7, 2023 at 10:13 AM Kamal Mittal via user 
wrote:

> Hello,
>
>
>
> Thanks for quick reply.
>
>
>
> I am using a Parquet encoder/decoder, and if any corrupt record comes in
> during decoding, I need to raise an alarm and maintain metrics visible in
> the Flink metrics GUI.
>
> So can custom metrics be created in Flink? Please give a reference to any
> such documentation.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> From: Martijn Visser 
> Sent: 07 June 2023 12:31 PM
> To: Kamal Mittal 
> Cc: user@flink.apache.org
> Subject: Re: Raise alarm for corrupt records
>
>
>
> Hi Kamal,
>
>
>
> No, but it should be straightforward to create metrics or events for these
> types of situations and integrate them with your own alerting solution.
>
>
>
> Best regards,
>
>
>
> Martijn
>
>
>
> On Wed, Jun 7, 2023 at 8:25 AM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello Community,
>
>
>
> Is there any way Flink provides out of the box to raise an alarm for corrupt
> records (e.g. due to a decoding failure) in the middle of a running data
> pipeline, and to send this alarm outside of the TaskManager process?
>
>
>
> Rgds,
>
> Kamal
>
>


[ANNOUNCE] Apache flink-connector-pulsar v3.0.1 released

2023-06-07 Thread Leonard Xu
The Apache Flink community is very happy to announce the release of Apache 
flink-connector-pulsar v3.0.1. 
This release is compatible with Flink 1.16.x series.

Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352640

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Regards,
Leonard

RE: Raise alarm for corrupt records

2023-06-07 Thread Kamal Mittal via user
Hello,

Thanks for quick reply.

I am using a Parquet encoder/decoder, and if any corrupt record comes in
during decoding, I need to raise an alarm and maintain metrics visible in the
Flink metrics GUI.

So can custom metrics be created in Flink? Please give a reference to any
such documentation.

Rgds,
Kamal

From: Martijn Visser 
Sent: 07 June 2023 12:31 PM
To: Kamal Mittal 
Cc: user@flink.apache.org
Subject: Re: Raise alarm for corrupt records

Hi Kamal,

No, but it should be straightforward to create metrics or events for these 
types of situations and integrate them with your own alerting solution.

Best regards,

Martijn

On Wed, Jun 7, 2023 at 8:25 AM Kamal Mittal via user 
mailto:user@flink.apache.org>> wrote:
Hello Community,

Is there any way Flink provides out of the box to raise an alarm for corrupt
records (e.g. due to a decoding failure) in the middle of a running data
pipeline, and to send this alarm outside of the TaskManager process?

Rgds,
Kamal


RE: Custom Counter on Flink File Source

2023-06-07 Thread Kirti Dhar Upadhyay K via user
Thanks Hang.
Any expected date for Flink 1.18.0 release?

Regards,
Kirti Dhar

From: Hang Ruan 
Sent: 07 June 2023 07:34
To: Kirti Dhar Upadhyay K 
Cc: user@flink.apache.org
Subject: Re: Custom Counter on Flink File Source

Hi, Kirti Dhar Upadhyay K.

I checked FLIP-274 [1]. This feature will be released in 1.18.0; it is not
contained in any release yet.

Best,
Hang

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator

Kirti Dhar Upadhyay K
mailto:kirti.k.dhar.upadh...@ericsson.com>>
wrote on Wed, Jun 7, 2023 at 02:51:
Hi Hang,

Thanks for reply.
I tried using the SplitEnumeratorContext passed in
AbstractFileSource#createEnumerator, but it resulted in a NullPointerException.
SplitEnumeratorContext's implementation, SourceCoordinatorContext, implements
metricGroup() as below:


@Override
public SplitEnumeratorMetricGroup metricGroup() {
return null;
}

Am I making a mistake?

Regards,
Kirti Dhar

From: Hang Ruan mailto:ruanhang1...@gmail.com>>
Sent: 06 June 2023 08:12
To: Kirti Dhar Upadhyay K 
mailto:kirti.k.dhar.upadh...@ericsson.com>>
Cc: user@flink.apache.org
Subject: Re: Custom Counter on Flink File Source

Hi, Kirti Dhar Upadhyay K.

We could get the metric group from the context, like `SourceReaderContext` and 
`SplitEnumeratorContext`. These contexts could be found when creating readers 
and enumerators. See `AbstractFileSource#createReader` and 
`AbstractFileSource#createEnumerator`.

Best,
Hang

Kirti Dhar Upadhyay K via user
mailto:user@flink.apache.org>> wrote on Mon, Jun 5, 2023 at 22:57:
Hi Community,

I am trying to add a new counter for the number of files collected by the Flink
File Source.
Referring to the doc
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I
understand how to add a new counter on any operator.

this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");

But I am not able to get this RuntimeContext on FileSource.
Can someone give me a clue on this?

Regards,
Kirti Dhar


Re: Raise alarm for corrupt records

2023-06-07 Thread Martijn Visser
Hi Kamal,

No, but it should be straightforward to create metrics or events for these
types of situations and integrate them with your own alerting solution.

Best regards,

Martijn

On Wed, Jun 7, 2023 at 8:25 AM Kamal Mittal via user 
wrote:

> Hello Community,
>
>
>
> Is there any way Flink provides out of the box to raise an alarm for corrupt
> records (e.g. due to a decoding failure) in the middle of a running data
> pipeline, and to send this alarm outside of the TaskManager process?
>
>
>
> Rgds,
>
> Kamal
>


Raise alarm for corrupt records

2023-06-07 Thread Kamal Mittal via user
Hello Community,

Is there any way Flink provides out of the box to raise an alarm for corrupt
records (e.g. due to a decoding failure) in the middle of a running data
pipeline, and to send this alarm outside of the TaskManager process?

Rgds,
Kamal