Re: serviceAccount permissions issue for high availability in operator 1.1

2022-09-08 Thread Yang Wang
Since the flink-kubernetes-operator uses the native K8s integration[1] by
default, you need to grant permissions for pods and deployments as well as
ConfigMaps.

You could find more information about the RBAC here[2].

[1].
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/
[2].
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/operations/rbac/

Best,
Yang

Javier Vegas wrote on Wed, Sep 7, 2022 at 04:17:

> I am migrating a HA standalone Kubernetes app to use the Flink operator.
> The HA store is S3 using IRSA so the app needs to run with a serviceAccount
> that is authorized to access S3. In standalone mode HA worked once I gave
> the account permissions to edit configMaps. But when trying the operator
> with the custom serviceAccount, I am getting this error:
>
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing:
> GET at:
> https://172.20.0.1/apis/apps/v1/namespaces/MYNAMESPACE/deployments/MYAPPNAME.
> Message: Forbidden!Configured service account doesn't have access. Service
> account may have been revoked. deployments.apps "MYAPPNAME" is forbidden:
> User "system:serviceaccount:MYNAMESPACE:MYSERVICEACCOUNT" cannot get
> resource "deployments" in API group "apps" in the namespace "MYNAMESPACE".
>
>
> Does the serviceAccount need additional permissions besides configMap edit
> to be able to run HA using the operator?
>
> Thanks,
>
> Javier Vegas
>


Re: [Flink 1.15.1 - Application mode native k8s Exception] - Exception occurred while acquiring lock 'ConfigMapLock

2022-09-08 Thread Yang Wang
You are right. Starting multiple JobManagers could help when the pod is
deleted and there are not enough resources in the cluster to start a new one.
In most cases, though, the JobManager container will be restarted locally without
scheduling a new Kubernetes pod[1].

The "already exists" error comes from the fabric8 Kubernetes client. It is
somewhat reasonable, because a ConfigMap with the same name might already have
been created manually beforehand.
In the Flink use case, we can simply ignore this error.

For the first exception "*Caused by: java.io.FileNotFoundException:
/opt/flink/.kube/config (No such file or directory)*", I think you need to
share the full log file of all the JobManagers.

[1].
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-policy


Best,
Yang


Tamir Sagi wrote on Thu, Sep 8, 2022 at 14:28:

> Hey Yang,
>
> Thank you for fast response.
>
> I get your point but, assuming 3 Job managers are up, in case the leader
> fails, one of the other 2 should become the new leader, no?
>
> If the cluster fails, the new leader should handle that.
>
> Another scenario could be that the Job manager stops (gets killed by k8s
> due to memory or CPU limits, bugs, etc...) while the TMs are still
> operating and the cluster is active. In some cases, due to resource
> limitations, k8s will not be able to get a new instance right away until
> auto-scaling takes place (the pod remains in a pending state). It seems like we
> do achieve resilience by having HA enabled in native k8s mode.
>
> What do you think?
>
> Given that you are running multiple JobManagers, it does not matter for
> the "already exists" exception during leader election.
>
> Should we ignore such an error? If so, it should be a warning then.
>
> What about the 1st error we encountered regarding the kube/config file
> exception?
>
>
> Thank you so much,
> Best,
> Tamir
>
> --
> *From:* Yang Wang 
> *Sent:* Thursday, September 8, 2022 7:08 AM
> *To:* Tamir Sagi 
> *Cc:* user@flink.apache.org ; Lihi Peretz <
> lihi.per...@niceactimize.com>
> *Subject:* Re: [Flink 1.15.1 - Application mode native k8s Exception] -
> Exception occurred while acquiring lock 'ConfigMapLock
>
>
> *EXTERNAL EMAIL*
>
>
> Given that you are running multiple JobManagers, it does not matter for
> the "already exists" exception during leader election.
>
> BTW, I think running multiple JobManagers does not bring much advantage
> when deploying Flink on Kubernetes, because a new JobManager will be
> started immediately once the old one crashes.
> And the Flink JobManager always needs to recover the job from the latest
> checkpoint no matter how many JobManagers are running.
>
> Best,
> Yang
>
Tamir Sagi wrote on Mon, Sep 5, 2022 at 21:48:
>
> Hey Yang,
>
> The flink-conf.yaml submitted to the cluster does not contain 
> "kubernetes.config.file"
> at all.
> In addition, I verified flink config maps under cluster's namespace do not
> contain "kubernetes.config.file".
>
> In addition, we also noticed the following exception (appears to happen
> sporadically)
>
> 2022-09-04T21:06:35,231][Error] {} [i.f.k.c.e.l.LeaderElector]: Exception
> occurred while acquiring lock 'ConfigMapLock: dev-0-flink-jobs -
> data-agg-events-insertion-cluster-config-map
> (fa3dbbc5-1753-46cd-afaf-0baf8ff0947f)'
> io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException:
> Unable to create ConfigMapLock
>
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure
> executing: POST at:
> https://172.20.0.1/api/v1/namespaces/dev-0-flink-jobs/configmaps.
> Message: configmaps "data-agg-events-insertion-cluster-config-map" already
> exists.
>
> Log file is enclosed.
>
> Thanks,
> Tamir.
>
> --
> *From:* Yang Wang 
> *Sent:* Monday, September 5, 2022 3:03 PM
> *To:* Tamir Sagi 
> *Cc:* user@flink.apache.org ; Lihi Peretz <
> lihi.per...@niceactimize.com>
> *Subject:* Re: [Flink 1.15.1 - Application mode native k8s Exception] -
> Exception occurred while acquiring lock 'ConfigMapLock
>
>
> *EXTERNAL EMAIL*
>
>
> Could you please check whether the "kubernetes.config.file" is configured
> to /opt/flink/.kube/config in the Flink configmap?
> It should be removed before creating the Flink configmap.
>
> Best,
> Yang
>
Tamir Sagi wrote on Sun, Sep 4, 2022 at 18:08:
>
> Hey All,
>
> We recently updated to Flink 1.15.1. We deploy a stream cluster in
> Application mode on native K8s (deployed on Amazon EKS). The cluster is
> configured with the Kubernetes HA service, a minimum of 3 JobManager replicas,
> and a pod template which is configured with topologySpreadConstraints to
> enable distribution across different availability zones.
> The HA storage directory is on S3.
>
> The cluster is deployed and running properly; however, after a while we
> noticed the following exception in a JobManager instance (the log file is
> enclosed):
>
> 2022-09-04T02:05:33,097][Error] {} [i.f.k.c.e.l.LeaderElector]: Exception
> occurred while acquiring lock 'ConfigMapLock: dev-0-flink-jobs -

Re: RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Salva Alcántara
Yeah, that would be an option, but it would just be nicer if I could simply
skip events which fail to be serialized without prepending any operator to
the sink, since, conceptually, that is not really part of the pipeline but
more about handling serialization errors.

If I'm not mistaken, what I'm asking is entirely possible when
reading/deserializing messages (without having to append a filter to
discard invalid messages to be sent downstream). E.g., from the generic
DeserializationSchema<T> interface:

```
// To be overridden by the user
T deserialize(byte[] message) throws IOException;

@PublicEvolving
default void deserialize(byte[] message, Collector<T> out) throws IOException {
    T deserialize = deserialize(message);
    if (deserialize != null) {
        out.collect(deserialize);
    }
}
```

The more specialized interface KafkaDeserializationSchema<T> is implemented
identically:

```
// To be overridden by the user
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
        throws Exception {
    T deserialized = deserialize(message);
    if (deserialized != null) {
        out.collect(deserialized);
    }
}
```

So, one can simply return `null` in the overridden `deserialize` method and
those messages will be automatically filtered out.
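
For illustration, here is a minimal sketch of such a schema (the class name and the "starts with '{'" check are hypothetical stand-ins, not something from this thread):

```
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Minimal sketch: a schema that returns null for payloads it considers
// malformed, so the default deserialize(byte[], Collector) drops them.
// The "must start with '{'" check is a placeholder for real validation.
public class SkippingStringSchema implements DeserializationSchema<String> {

    @Override
    public String deserialize(byte[] message) {
        String value = new String(message, StandardCharsets.UTF_8);
        if (!value.startsWith("{")) {
            return null; // filtered out by the default Collector-based deserialize()
        }
        return value;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```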

If instead one uses the more recent KafkaRecordDeserializationSchema<T>
interface, then

```
void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out)
        throws IOException;
```

it's possible to simply not call `out.collect` on those records that should
be skipped.
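
A minimal sketch of that variant (the class name and the empty-payload check are hypothetical; a real implementation would validate and deserialize the actual payload):

```
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Minimal sketch: a record is skipped simply by not emitting it to the collector.
public class SkippingRecordDeserializer implements KafkaRecordDeserializationSchema<String> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) {
        byte[] value = record.value();
        if (value == null || value.length == 0) {
            return; // nothing collected, so the record is effectively discarded
        }
        out.collect(new String(value, StandardCharsets.UTF_8));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```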

In general, this gives a flexibility which is lost when writing/serializing
messages, resulting in a somewhat inconsistent/asymmetric behaviour when
one looks at the KafkaWriter used by the KafkaSink:

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    currentProducer.send(record, deliveryCallback);
    numRecordsSendCounter.inc();
}
```

where it's not possible to skip records if desired. On the other hand it's
not currently possible to pass a custom writer to the KafkaSink with a
different behaviour, e.g.,

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    if (record != null) { // skip null records
        currentProducer.send(record, deliveryCallback);
        numRecordsSendCounter.inc();
}
}
```

Isn't the above implementation more consistent with the deserialization case
(and also more powerful) than the current one?


On Thu, Sep 8, 2022 at 10:56 PM Alexander Fedulov 
wrote:

> Can't you add a flatMap function just before the Sink that does exactly
> this verification and filters out everything that is not supposed to be
> sent downstream?
>
> Best,
> Alexander Fedulov
>
> On Thu, Sep 8, 2022 at 6:15 PM Salva Alcántara 
> wrote:
>
>> Sorry I meant do nothing when the serialize method returns null...
>>
>> On 2022/09/08 15:52:48 Salva Alcántara wrote:
>> > I guess one possibility would be to extend/override the `write` method
>> of
>> > the KafkaWriter:
>> >
>> >
>> https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197
>> >
>> > ```
>> > @Override
>> > public void write(IN element, Context context) throws IOException {
>> > final ProducerRecord record =
>> > recordSerializer.serialize(element, kafkaSinkContext,
>> > context.timestamp());
>> > currentProducer.send(record, deliveryCallback);
>> > numRecordsSendCounter.inc();
>> > }
>> > ```
>> >
>> > so that it does nothing when the IN element is null. Would this be the
>> only
>> > way, really?
>> >
>> > On 2022/09/08 10:48:07 Salva Alcántara wrote:
>> > > Hi! Is there a way to skip/discard messages when using the KafkaSink,
>> so
>> > > that if for some reason messages are malformed they can simply be
>> > > discarded? I tried by returning null in the corresponding KafkaWriter
>> but
>> > > that raises an exception:
>> > >
>> > > ```
>> > > java.lang.NullPointerException
>> > > at
>> > >
>> >
>> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>> > > at
>> > >
>> >
>> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>> > > at
>> > >
>> >
>> org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>> > > at
>> > >
>> >
>> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
>> > > ```
>> > >
>> > > What would be the way to handle this?
>> > >
>> > > On the other 

Re: [Flink Kubernetes Operator] FlinkSessionJob crd spec jarURI

2022-09-08 Thread Yang Wang
Given that the "local://" schema means the jar is available in the
image/container of JobManager, so it could only be supported in the K8s
application mode.

If you configure the jarURI to "file://" schema for session cluster, it
means that this jar file should be available in the
flink-kubernetes-operator container.
You could mount a PV for the flink-kuberentes-operator and then use "*kubectl
cp*" to copy a file into the pod.

Or you could specify a "http://; path for jarURI.

Best,
Yang

Vignesh Kumar Kathiresan via user wrote on Wed, Sep 7, 2022 at 09:31:

> Hi,
>
> I have a session cluster deployed in Kubernetes and am trying to submit a job
> following the example given in the docs.
>
> When I give
> 1) spec.job.jarURI:
> local:///opt/flink/examples/streaming/StateMachineExample.jar
>
> getting
> Error:  org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'local'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded. For a full list of supported file systems,
> please see
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
> 2) when I change the scheme to file
> spec.job.jarURI:
> file:///opt/flink/examples/streaming/StateMachineExample.jar
>
> getting
>  Error:  java.io.FileNotFoundException:
> /opt/flink/examples/streaming/TopSpeedWindowing.jar (No such file or
> directory)
>
> Is there anything that I am missing? From the docs I gather that I do not need
> any extra fs plugin for referencing a local file system jar.
>


What is the cause of this RuntimeException: null error?

2022-09-08 Thread Asahi Lee
2022-09-09 11:36:42,866 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   
[] - Source: HiveSource-ods_jt_hrs.ods_hrmis_HR_EMPL_Education (1/1) 
(2a68412dab3602a1eeda5a750b308e23) switched from RUNNING to FAILED on 
container_1658144991761_106260_01_02 @ hhny-cdh05 (dataPort=45015).
java.lang.RuntimeException: null
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:91)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.lang.NullPointerException
at 
org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:118) 
~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
 ~[flink-table_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:132)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
at 

Re: RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Alexander Fedulov
Can't you add a flatMap function just before the Sink that does exactly
this verification and filters out everything that is not supposed to be
sent downstream?
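
A minimal sketch of this suggestion (the String element type and the blank-value check are hypothetical placeholders for whatever validation the sink actually needs):

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Minimal sketch: a flatMap placed right before the sink that forwards only
// elements that can safely be serialized; everything else is silently dropped.
public class DropUnserializable implements FlatMapFunction<String, String> {

    @Override
    public void flatMap(String value, Collector<String> out) {
        if (value != null && !value.isEmpty()) {
            out.collect(value);
        }
    }
}

// usage (hypothetical stream/sink names):
//   stream.flatMap(new DropUnserializable()).sinkTo(kafkaSink);
```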

Best,
Alexander Fedulov

On Thu, Sep 8, 2022 at 6:15 PM Salva Alcántara 
wrote:

> Sorry I meant do nothing when the serialize method returns null...
>
> On 2022/09/08 15:52:48 Salva Alcántara wrote:
> > I guess one possibility would be to extend/override the `write` method of
> > the KafkaWriter:
> >
> >
> https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197
> >
> > ```
> > @Override
> > public void write(IN element, Context context) throws IOException {
> > final ProducerRecord record =
> > recordSerializer.serialize(element, kafkaSinkContext,
> > context.timestamp());
> > currentProducer.send(record, deliveryCallback);
> > numRecordsSendCounter.inc();
> > }
> > ```
> >
> > so that it does nothing when the IN element is null. Would this be the
> only
> > way, really?
> >
> > On 2022/09/08 10:48:07 Salva Alcántara wrote:
> > > Hi! Is there a way to skip/discard messages when using the KafkaSink,
> so
> > > that if for some reason messages are malformed they can simply be
> > > discarded? I tried by returning null in the corresponding KafkaWriter
> but
> > > that raises an exception:
> > >
> > > ```
> > > java.lang.NullPointerException
> > > at
> > >
> >
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
> > > at
> > >
> >
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> > > at
> > >
> >
> org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
> > > at
> > >
> >
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> > > ```
> > >
> > > What would be the way to handle this?
> > >
> > > On the other hand, that seems a bit asymmetric in the sense that when
> > > reading messages, if the deserializer returns null, then that message
> is
> > > simply ignored, see, e.g., from
> > >
> >
> https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html
> > > :
> > >
> > > ```
> > > T deserialize(String topic,
> > >   byte[] data)
> > > Deserialize a record value from a byte array into a value or object.
> > > Parameters:
> > > topic - topic associated with the data
> > > data - serialized bytes; may be null; implementations are recommended
> to
> > > handle null by returning a value or null rather than throwing an
> > exception.
> > > Returns:
> > > deserialized typed data; may be null
> > > ```
> > >
> >
>


Re: Cassandra sink with Flink 1.15

2022-09-08 Thread Lars Skjærven
Thanks!

For reference, solved with mapping to Flink tuples.
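
For illustration, a minimal Java sketch of such a mapper (MyEvent and its fields are hypothetical stand-ins for the original case class):

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical stand-in for the original case class.
class MyEvent {
    public String key;
    public long someLongValue;
}

// Minimal sketch: an explicit MapFunction that emits Flink tuples, so the
// sink's type can be extracted without Scala type information.
public class ToTupleMapper implements MapFunction<MyEvent, Tuple2<String, Long>> {

    @Override
    public Tuple2<String, Long> map(MyEvent value) {
        return Tuple2.of(value.key, value.someLongValue);
    }
}

// usage (hypothetical): CassandraSink.addSink(stream.map(new ToTupleMapper())) ...
```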



On Wed, Sep 7, 2022 at 2:27 PM Chesnay Schepler  wrote:

> Are you running into this in the IDE, or when submitting the job to a
> Flink cluster?
>
> If it is the first, then you're probably affected by the Scala-free Flink
> efforts. Either add an explicit dependency on flink-streaming-scala or
> migrate to Flink tuples.
>
> On 07/09/2022 14:17, Lars Skjærven wrote:
>
> Hello,
>
> When upgrading from 1.14 to 1.15 we bumped into a type issue when
> attempting to sink to Cassandra (scala 2.12.13). This was working nicely in
> 1.14. Any tip is highly appreciated.
>
> Using a MapFunction() to generate the stream of tuples:
>
> CassandraSink
>  .addSink(
> mystream.map(new ToTupleMapper)
>   )...
>
> Exception: No support for the type of the given DataStream:
> GenericType
>
> Or with a lambda function:
>
> CassandraSink
>  .addSink(
> mystream.map((v: MyCaseClass) => (v.key, v.someLongValue))
>   )...
>
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
> The generic type parameters of 'Tuple2' are missing. In many cases lambda
> methods don't provide enough information for automatic type extraction when
> Java generics are involved. An easy workaround is to use an (anonymous)
> class instead that implements the
> 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise
> the type has to be specified explicitly using type information.
>
>
>
>
>
>


[NOTICE] Blog post regarding Akka's licensing change

2022-09-08 Thread Chesnay Schepler

Hello,

You may have heard about a recent change to the licensing of Akka.
We just published a blog post regarding this change and what it means
for Flink.


https://flink.apache.org/news/2022/09/08/akka-license-change.html

TL;DR:

Flink is not in any immediate danger and we will ensure that users are 
not affected by this change.
The licensing of Flink will not change; it will stay Apache-licensed and 
will only contain dependencies that are compatible with it.

We will not use Akka versions with the new license.

Regards,
Chesnay


RE: RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Salva Alcántara
Sorry I meant do nothing when the serialize method returns null...

On 2022/09/08 15:52:48 Salva Alcántara wrote:
> I guess one possibility would be to extend/override the `write` method of
> the KafkaWriter:
>
>
https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197
>
> ```
> @Override
> public void write(IN element, Context context) throws IOException {
> final ProducerRecord record =
> recordSerializer.serialize(element, kafkaSinkContext,
> context.timestamp());
> currentProducer.send(record, deliveryCallback);
> numRecordsSendCounter.inc();
> }
> ```
>
> so that it does nothing when the IN element is null. Would this be the
only
> way, really?
>
> On 2022/09/08 10:48:07 Salva Alcántara wrote:
> > Hi! Is there a way to skip/discard messages when using the KafkaSink, so
> > that if for some reason messages are malformed they can simply be
> > discarded? I tried by returning null in the corresponding KafkaWriter
but
> > that raises an exception:
> >
> > ```
> > java.lang.NullPointerException
> > at
> >
>
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
> > at
> >
>
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> > at
> >
>
org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
> > at
> >
>
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> > ```
> >
> > What would be the way to handle this?
> >
> > On the other hand, that seems a bit asymmetric in the sense that when
> > reading messages, if the deserializer returns null, then that message is
> > simply ignored, see, e.g., from
> >
>
https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html
> > :
> >
> > ```
> > T deserialize(String topic,
> >   byte[] data)
> > Deserialize a record value from a byte array into a value or object.
> > Parameters:
> > topic - topic associated with the data
> > data - serialized bytes; may be null; implementations are recommended to
> > handle null by returning a value or null rather than throwing an
> exception.
> > Returns:
> > deserialized typed data; may be null
> > ```
> >
>


RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Salva Alcántara
I guess one possibility would be to extend/override the `write` method of
the KafkaWriter:

https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    currentProducer.send(record, deliveryCallback);
    numRecordsSendCounter.inc();
}
```

so that it does nothing when the IN element is null. Would this be the only
way, really?

On 2022/09/08 10:48:07 Salva Alcántara wrote:
> Hi! Is there a way to skip/discard messages when using the KafkaSink, so
> that if for some reason messages are malformed they can simply be
> discarded? I tried by returning null in the corresponding KafkaWriter but
> that raises an exception:
>
> ```
> java.lang.NullPointerException
> at
>
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
> at
>
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> at
>
org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
> at
>
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> ```
>
> What would be the way to handle this?
>
> On the other hand, that seems a bit asymmetric in the sense that when
> reading messages, if the deserializer returns null, then that message is
> simply ignored, see, e.g., from
>
https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html
> :
>
> ```
> T deserialize(String topic,
>   byte[] data)
> Deserialize a record value from a byte array into a value or object.
> Parameters:
> topic - topic associated with the data
> data - serialized bytes; may be null; implementations are recommended to
> handle null by returning a value or null rather than throwing an
exception.
> Returns:
> deserialized typed data; may be null
> ```
>


KafkaSource watermarkLag metrics per topic per partition

2022-09-08 Thread Alexey Trenikhun
Hello,
Is there a way to configure Flink to expose the watermarkLag metric per topic per
partition? I think it could be useful to detect data skew between partitions.

Thanks,
Alexey


Re: After a savepoint is triggered, does the source operator stop consuming at the corresponding offset?

2022-09-08 Thread yh z
hi, in my understanding a savepoint works similarly to a checkpoint; the difference is
that before Flink 1.16 only full savepoints were supported, and both are implemented
on top of the barrier mechanism. According to the 1.16 planning document (
https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints),
savepoints will also support an incremental mode.
When a savepoint is triggered, the source saves its state, so it does stop consuming.

郑 致远 wrote on Thu, Sep 8, 2022 at 19:39:

> Hi all,
> A question:
> Is savepoint also implemented with the barrier mechanism?
> When a savepoint is triggered, does the source operator stop consuming from Kafka?
>


After a savepoint is triggered, does the source operator stop consuming at the corresponding offset?

2022-09-08 Thread 郑 致远
Hi all,
A question:
Is savepoint also implemented with the barrier mechanism?
When a savepoint is triggered, does the source operator stop consuming from Kafka?


Re: [NOTICE] Switch docker image base to Eclipse Temurin

2022-09-08 Thread Chesnay Schepler

At first glance this might happen when an older docker version is used:

https://github.com/adoptium/temurin-build/issues/2974

You may need to upgrade to Docker 20.10.5+.

On 08/09/2022 12:33, Sigalit Eliazov wrote:

Hi all,

We pulled the new image and we are facing an issue starting the job
manager pod.
we are using version 1.14.5-java11 and the cluster is started using 
flink operator


the error is

[ERROR] Could not get JVM parameters and dynamic configurations properly.

[ERROR] Raw output from BashJavaUtils:

[0.011s][warning][os,thread] Failed to start thread "VM Thread" - 
pthread_create failed (EPERM) for attributes: stacksize: 1024k, 
guardsize: 4k, detached.


Error occurred during initialization of VM.


we have tried to change the JVM args by setting -Xms256m -Xmx1g

but it did not help


any guidance will be appreciated

Thanks

Sigalit


On Mon, Sep 5, 2022 at 1:21 PM Chesnay Schepler  
wrote:


* September 7th

On 05/09/2022 11:27, Chesnay Schepler wrote:
> On Wednesday, September 9th, the Flink 1.14.5/1.15.2 Docker images
> will switch bases
>
> FROM openjdk:8/11-jar (Debian-based)
> TO eclipse-temurin:8/11-jre-jammy (Ubuntu-based)
>
> due to the deprecation of the OpenJDK images.
>
> Users that customized the images are advised to check for breaking
> changes.
>
> The source Dockerfile for the new images is available at
>

https://github.com/apache/flink-docker/tree/4794f9425513fb4c0b55ec1efd629e8eb7e5d8c5.

>




Skip malformed messages with the KafkaSink

2022-09-08 Thread Salva Alcántara
Hi! Is there a way to skip/discard messages when using the KafkaSink, so
that if for some reason messages are malformed they can simply be
discarded? I tried by returning null in the corresponding KafkaWriter but
that raises an exception:

```
java.lang.NullPointerException
at
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
at
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
at
org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
```

What would be the way to handle this?

On the other hand, that seems a bit asymmetric in the sense that when
reading messages, if the deserializer returns null, then that message is
simply ignored, see, e.g., from
https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html
:

```
T deserialize(String topic,
  byte[] data)
Deserialize a record value from a byte array into a value or object.
Parameters:
topic - topic associated with the data
data - serialized bytes; may be null; implementations are recommended to
handle null by returning a value or null rather than throwing an exception.
Returns:
deserialized typed data; may be null
```


Re: [NOTICE] Switch docker image base to Eclipse Temurin

2022-09-08 Thread Sigalit Eliazov
Hi all,

We pulled the new image and we are facing an issue starting the job manager
pod.
we are using version 1.14.5-java11 and the cluster is started using flink
operator

the error is

[ERROR] Could not get JVM parameters and dynamic configurations properly.

[ERROR] Raw output from BashJavaUtils:

[0.011s][warning][os,thread] Failed to start thread "VM Thread" -
pthread_create failed (EPERM) for attributes: stacksize: 1024k, guardsize:
4k, detached.

Error occurred during initialization of VM.


we have tried to change the JVM args by setting -Xms256m -Xmx1g

but it did not help


any guidance will be appreciated

Thanks

Sigalit


On Mon, Sep 5, 2022 at 1:21 PM Chesnay Schepler  wrote:

> * September 7th
>
> On 05/09/2022 11:27, Chesnay Schepler wrote:
> > On Wednesday, September 9th, the Flink 1.14.5/1.15.2 Docker images
> > will switch bases
> >
> > FROM openjdk:8/11-jre (Debian-based)
> > TO eclipse-temurin:8/11-jre-jammy (Ubuntu-based)
> >
> > due to the deprecation of the OpenJDK images.
> >
> > Users that customized the images are advised to check for breaking
> > changes.
> >
> > The source Dockerfile for the new images is available at
> >
> https://github.com/apache/flink-docker/tree/4794f9425513fb4c0b55ec1efd629e8eb7e5d8c5.
>
> >
>
>
>


Re: Re: Re: How to preserve the original parallelism with keyBy()

2022-09-08 Thread haishui
Timers need a keyed state backend, so timers can only be used on a KeyedStream.
If you want the upstream data to stay on the same subTask, you can consider the DataStreamUtils#reinterpretAsKeyedStream method; it is an experimental feature, see [1]. You need to make sure the original DataStream is already partitioned by key.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/experimental/
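
A minimal Java sketch of that approach (the example stream and the identity key selector are hypothetical; the caller must guarantee the input is already partitioned by exactly this key):

```
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Minimal sketch: reinterpret an already key-partitioned DataStream as a
// KeyedStream without a new shuffle, so timers become usable while records
// stay on their current subtask. fromElements() here only demonstrates the
// API; a real job must feed in a stream that is truly partitioned by this key.
public class ReinterpretExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> alreadyPartitioned = env.fromElements("a", "b", "c");

        KeyedStream<String, String> keyed =
                DataStreamUtils.reinterpretAsKeyedStream(
                        alreadyPartitioned,
                        new KeySelector<String, String>() {
                            @Override
                            public String getKey(String value) {
                                return value;
                            }
                        });

        // keyed can now use KeyedProcessFunction / timers like any KeyedStream
        keyed.print();
        env.execute("reinterpretAsKeyedStream sketch");
    }
}
```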














On 2022-09-08 16:31:36, "junjie.m...@goupwith.com" wrote:
>
>ProcessFunction has an onTimer method, but when I use it, it says timers can only be used on a KeyedStream. Is there a Function class that can use timers without converting to a KeyedStream?
>
>
>From: r pp
>Sent: 2022-09-08 16:14
>To: user-zh
>Subject: Re: How to preserve the original parallelism with keyBy()
>At keyBy() time the grouping has not happened yet; it only tells Flink what to group by, so there is no RuntimeContext either...
>
>junjie.m...@goupwith.com wrote on Thu, Sep 8, 2022 at 14:17:
>
>> hi:
>> When using keyBy() in Flink, is it possible to get the subTask index and group by it, so that the upstream data keeps being processed in the same subTask for subsequent computation?
>>
>> In AbstractRichFunction, this.getRuntimeContext().getIndexOfThisSubtask() can get the index, but the KeySelector class used by keyBy() has no getRuntimeContext() method.
>>
>>
> 
>-- 
>Best,
>  pp


Re: Re: How to preserve the original parallelism with keyBy()

2022-09-08 Thread junjie.m...@goupwith.com

ProcessFunction has an onTimer method, but when I use it, it says timers can only be used on a KeyedStream. Is there a Function class that can use timers without converting to a KeyedStream?

 
From: r pp
Sent: 2022-09-08 16:14
To: user-zh
Subject: Re: How to preserve the original parallelism with keyBy()
At keyBy() time the grouping has not happened yet; it only tells Flink what to group by, so there is no RuntimeContext either...

junjie.m...@goupwith.com wrote on Thu, Sep 8, 2022 at 14:17:

> hi:
> When using keyBy() in Flink, is it possible to get the subTask index and group by it, so that the upstream data keeps being processed in the same subTask for subsequent computation?
>
> In AbstractRichFunction, this.getRuntimeContext().getIndexOfThisSubtask() can get the index, but the KeySelector class used by keyBy() has no getRuntimeContext() method.
>
>
 
-- 
Best,
  pp


Re: How to preserve the original parallelism with keyBy()

2022-09-08 Thread r pp
At keyBy() time the grouping has not happened yet; it only tells Flink what to group by, so there is no RuntimeContext either...

junjie.m...@goupwith.com wrote on Thu, Sep 8, 2022 at 14:17:

> hi:
> When using keyBy() in Flink, is it possible to get the subTask index and group by it, so that the upstream data keeps being processed in the same subTask for subsequent computation?
>
> In AbstractRichFunction, this.getRuntimeContext().getIndexOfThisSubtask() can get the index, but the KeySelector class used by keyBy() has no getRuntimeContext() method.
>
>

-- 
Best,
  pp


Re: Question about the flink table store

2022-09-08 Thread r pp
It should be for unified stream and batch processing, without losing data.

Kyle Zhang wrote on Thu, Sep 8, 2022 at 08:37:

> Hi all,
>   The introduction of the table store also describes it as data lake storage
> used for real-time streaming reads, so how does it differ in positioning from projects like Iceberg and Hudi, and why develop another project?
>
> Best.
>


-- 
Best,
  pp


Re: [Flink 1.15.1 - Application mode native k8s Exception] - Exception occurred while acquiring lock 'ConfigMapLock

2022-09-08 Thread Tamir Sagi
Hey Yang,

Thank you for fast response.

I get your point but, assuming 3 Job managers are up, in case the leader fails, 
one of the other 2 should become the new leader, no?

If the cluster fails, the new leader should handle that.

Another scenario could be that the Job manager stops (gets killed by k8s due to
memory or CPU limits, bugs, etc...) while the TMs are still operating and the
cluster is active. In some cases, due to resource limitations, k8s will not be
able to get a new instance right away until auto-scaling takes place (the pod
remains in a pending state). It seems like we do achieve resilience by having HA
enabled in native k8s mode.

What do you think?

Given that you are running multiple JobManagers, it does not matter for the
"already exists" exception during leader election.
Should we ignore such an error? If so, it should be a warning then.

What about the 1st error we encountered regarding the kube/config file 
exception?


Thank you so much,
Best,
Tamir



From: Yang Wang 
Sent: Thursday, September 8, 2022 7:08 AM
To: Tamir Sagi 
Cc: user@flink.apache.org ; Lihi Peretz 

Subject: Re: [Flink 1.15.1 - Application mode native k8s Exception] - Exception 
occurred while acquiring lock 'ConfigMapLock


EXTERNAL EMAIL


Given that you are running multiple JobManagers, it does not matter for the 
"already exists" exception during leader election.

BTW, I think running multiple JobManagers does not bring much advantage when
deploying Flink on Kubernetes, because a new JobManager will be started
immediately once the old one crashes.
And the Flink JobManager always needs to recover the job from the latest checkpoint
no matter how many JobManagers are running.

Best,
Yang

Tamir Sagi <tamir.s...@niceactimize.com> wrote on Mon, Sep 5, 2022 at 21:48:
Hey Yang,

The flink-conf.yaml submitted to the cluster does not contain 
"kubernetes.config.file" at all.
In addition, I verified flink config maps under cluster's namespace do not 
contain "kubernetes.config.file".

In addition, we also noticed the following exception (appears to happen 
sporadically)

2022-09-04T21:06:35,231][Error] {} [i.f.k.c.e.l.LeaderElector]: Exception 
occurred while acquiring lock 'ConfigMapLock: dev-0-flink-jobs - 
data-agg-events-insertion-cluster-config-map 
(fa3dbbc5-1753-46cd-afaf-0baf8ff0947f)'
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException:
 Unable to create ConfigMapLock

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure 
executing: POST at: 
https://172.20.0.1/api/v1/namespaces/dev-0-flink-jobs/configmaps. Message: 
configmaps "data-agg-events-insertion-cluster-config-map" already exists.

Log file is enclosed.

Thanks,
Tamir.



From: Yang Wang <danrtsey...@gmail.com>
Sent: Monday, September 5, 2022 3:03 PM
To: Tamir Sagi <tamir.s...@niceactimize.com>
Cc: user@flink.apache.org <user@flink.apache.org>; Lihi Peretz
<lihi.per...@niceactimize.com>
Subject: Re: [Flink 1.15.1 - Application mode native k8s Exception] - Exception 
occurred while acquiring lock 'ConfigMapLock


EXTERNAL EMAIL


Could you please check whether the "kubernetes.config.file" is configured to 
/opt/flink/.kube/config in the Flink configmap?
It should be removed before creating the Flink configmap.

Best,
Yang

Tamir Sagi <tamir.s...@niceactimize.com> wrote on Sun, Sep 4, 2022 at 18:08:
Hey All,

We recently updated to Flink 1.15.1. We deploy a stream cluster in Application
mode on native K8s (deployed on Amazon EKS). The cluster is configured with the
Kubernetes HA service, a minimum of 3 JobManager replicas, and a pod template which
is configured with topologySpreadConstraints to enable distribution across
different availability zones.
The HA storage directory is on S3.

The cluster is deployed and running properly; however, after a while we noticed
the following exception in a JobManager instance (the log file is enclosed):

2022-09-04T02:05:33,097][Error] {} [i.f.k.c.e.l.LeaderElector]: Exception 
occurred while acquiring lock 'ConfigMapLock: dev-0-flink-jobs - 
data-agg-events-insertion-cluster-config-map 
(b6da2ae2-ad2b-471c-801e-ea460a348fab)'
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for 
kind: [ConfigMap]  with name: [data-agg-events-insertion-cluster-config-map]  
in namespace: [dev-0-flink-jobs]  failed.
Caused by: java.io.FileNotFoundException: /opt/flink/.kube/config (No such file 
or directory)
  at java.io.FileInputStream.open0(Native Method) ~[?:?]
  at java.io.FileInputStream.open(Unknown Source) ~[?:?]
  at java.io.FileInputStream.<init>(Unknown Source) ~[?:?]
  at 
org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.dataformat.yaml.YAMLFactory.createParser(YAMLFactory.java:354)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 

How to preserve the original parallelism with keyBy()

2022-09-08 Thread junjie.m...@goupwith.com
hi:
When using keyBy() in Flink, is it possible to get the subTask index and group by it, so that the upstream data keeps being processed in the same subTask for subsequent computation?
In AbstractRichFunction, this.getRuntimeContext().getIndexOfThisSubtask() can get the index, but the KeySelector class used by keyBy() has no getRuntimeContext() method.