Apache Flink + Java 17 error module

2023-11-13 Thread patricia lee
Hi,


I upgraded the project to Flink 1.18.0 and Java 17. I am also using
flink-connector-kafka 3.0.1-1.18 from the Maven repository.

However, running it shows this error:

Unable to make field private final java.lang.Object[]
java.util.Arrays$ArrayList.a accessible: module java.base does not "opens
java.util" to unnamed module @2d928643.



I already added these VM options in IntelliJ, but the error still persists:

--add-opens java.base/java.util=ALL-UNNAMED --add-opens
java.base/java.lang=ALL-UNNAMED
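
(For completeness: if I read the docs correctly, the same flags can also be
set cluster-wide in flink-conf.yaml, e.g.

env.java.opts.all: --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED

but I have not verified that, and in my case the error already happens when
running from IntelliJ.)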


Re: Handling default fields in Avro messages using Flink SQL

2023-11-13 Thread Hang Ruan
Hi, Dale.

I think there are two choices to try.
1. As suggested in the reply to FLINK-22427 [1], use the SQL function `COALESCE`.
2. Modify the code in Avro format by yourself.

There is some work to do for choice 2. First, you need to pass the default
value through the Schema, which does not carry the default value today. Then
you need to modify the AvroRowDataDeserializationSchema to return the
default value when the field is null.
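
For choice 1, a minimal sketch could look like the following (it assumes the
missing fields come through as NULL; the table and column names are taken
from your mail, the view name is made up):

CREATE VIEW `simplified-with-defaults` AS
SELECT
  COALESCE(`favouritePhrase`, 'Hello World') AS `favouritePhrase`,
  COALESCE(`favouriteNumber`, 42)            AS `favouriteNumber`,
  `isItTrue`
FROM `simplified-recreate`;

Note that the default values are repeated by hand here, so they have to be
kept in sync with the Avro schema.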

Best,
Hang

[1]  https://issues.apache.org/jira/browse/FLINK-22427

Dale Lane wrote on Tue, 14 Nov 2023 at 01:33:

> I have a Kafka topic with events produced using an Avro schema like this:
>
> {
>   "namespace": "demo.avro",
>   "type": "record",
>   "name": "MySimplifiedRecreate",
>   "fields": [
>     { "name": "favouritePhrase", "type": "string", "default": "Hello World" },
>     { "name": "favouriteNumber", "type": "int", "default": 42 },
>     { "name": "isItTrue", "type": "boolean" }
>   ]
> }
>
> I want to use the default values in the same way that I do in other Kafka
> consumers. (Specifically, that when a message on the topic is missing a
> value for one of these properties, the default value is used instead).
>
> e.g.
>
> CREATE TABLE `simplified-recreate`
> (
>   `favouritePhrase`   STRING DEFAULT 'Hello World',
>   `favouriteNumber`   INT DEFAULT 42,
>   `isItTrue`          BOOLEAN NOT NULL
> )
> WITH (
>   'connector' = 'kafka',
>   'format' = 'avro',
>   ...
>
> As far as I can see, DEFAULT isn’t available in Flink SQL. (Although I can
> see it was considered before in a different context -
> https://issues.apache.org/jira/browse/FLINK-22427 )
>
> Is there another way to *process events with missing properties where the
> schema identifies the correct default*?
>
> Kind regards
>
> Dale


dependency error with latest Kafka connector

2023-11-13 Thread guenterh.lists

Hello

I'm getting a dependency error when using the latest Kafka connector in 
a Scala project.


Using the 1.17.1 Kafka connector, compilation is OK.

With

"org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18"

I get
[error] (update) sbt.librarymanagement.ResolveException: Error downloading org.apache.flink:flink-connector-base:
[error]   Not found
[error]   Not found
[error]   not found: /home/swissbib/.ivy2/local/org.apache.flink/flink-connector-base/ivys/ivy.xml
[error]   not found: https://repo1.maven.org/maven2/org/apache/flink/flink-connector-base//flink-connector-base-.pom


It seems the Maven packaging is not correct - the flink-connector-base
dependency is resolved without any version.

My sbt build file:

ThisBuild / scalaVersion := "3.3.0"
val flinkVersion = "1.18.0"
val postgresVersion = "42.2.2"

lazy val root = (project in file(".")).settings(
  name := "flink-scala-proj",
  libraryDependencies ++= Seq(
    "org.flinkextended" %% "flink-scala-api" % "1.17.1_1.1.0",
    "org.apache.flink" % "flink-clients" % flinkVersion % Provided,
    "org.apache.flink" % "flink-connector-files" % flinkVersion % Provided,

  "org.apache.flink" % "flink-connector-kafka" % "1.17.1",
  //"org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18",

  //"org.apache.flink" % "flink-connector-jdbc" % "3.1.1-1.17",
  //"org.postgresql" % "postgresql" % postgresVersion,
  "org.apache.flink" % "flink-connector-files" % flinkVersion % Provided,
  //"org.apache.flink" % "flink-connector-base" % flinkVersion % Provided
  )
)
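
(If it is really just the transitive flink-connector-base reference that
breaks resolution, I could probably work around it by excluding that
dependency from the connector and declaring it explicitly, e.g.

  ("org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18").exclude("org.apache.flink", "flink-connector-base"),
  "org.apache.flink" % "flink-connector-base" % flinkVersion % Provided,

but I have not tested this, and it should not be necessary.)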



Thanks!

--
Günter Hipler
https://openbiblio.social/@vog61
https://twitter.com/vog61



Handling default fields in Avro messages using Flink SQL

2023-11-13 Thread Dale Lane
I have a Kafka topic with events produced using an Avro schema like this:

{
  "namespace": "demo.avro",
  "type": "record",
  "name": "MySimplifiedRecreate",
  "fields": [
    {
      "name": "favouritePhrase",
      "type": "string",
      "default": "Hello World"
    },
    {
      "name": "favouriteNumber",
      "type": "int",
      "default": 42
    },
    {
      "name": "isItTrue",
      "type": "boolean"
    }
  ]
}

I want to use the default values in the same way that I do in other Kafka 
consumers. (Specifically, that when a message on the topic is missing a value 
for one of these properties, the default value is used instead).

e.g.

CREATE TABLE `simplified-recreate`
(
  `favouritePhrase`   STRING DEFAULT 'Hello World',
  `favouriteNumber`   INT DEFAULT 42,
  `isItTrue`          BOOLEAN NOT NULL
)
WITH (
  'connector' = 'kafka',
  'format' = 'avro',
  ...

As far as I can see, DEFAULT isn’t available in Flink SQL. (Although I can see 
it was considered before in a different context - 
https://issues.apache.org/jira/browse/FLINK-22427 )

Is there another way to process events with missing properties where the schema 
identifies the correct default?

Kind regards

Dale




Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU


Re: Offset lost with AT_LEAST_ONCE kafka delivery guarantees

2023-11-13 Thread Gabriele Modena
Hey Alexander,

Thanks for the feedback and apologies for my late reply.

This validates my understanding of AT_LEAST_ONCE wrt the kafka producer.

I tried to reproduce the issue, but came back empty-handed. As you
pointed out, the culprit could be a call to an external,
non-idempotent API.

I'll follow up if we stumble upon this issue again.
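
For reference, the sink setup is nothing exotic. Assuming the standard
KafkaSink, the relevant part of the configuration looks roughly like the
sketch below (a simplified Java-API rendering, not our actual pyflink code;
broker and topic names are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        // at-least-once: a checkpoint only completes once pending records are acknowledged
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();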

On Thu, Oct 26, 2023 at 9:55 PM Alexander Fedulov
 wrote:
>
> * to clarify: by different output I mean that for the same input message the 
> output message could be slightly smaller due to the abovementioned factors 
> and fall into the allowed size range without causing any failures
>
> On Thu, 26 Oct 2023 at 21:52, Alexander Fedulov  
> wrote:
>>
>> Your expectations are correct. In case of AT_LEAST_ONCE  Flink will wait for 
>> all outstanding records in the Kafka buffers to be acknowledged before 
>> marking the checkpoint successful (=also recording the offsets of the 
>> sources). That said, there might be other factors involved that could lead 
>> to a different output even when reading the same data from the sources - 
>> such as using processing time (instead of event time) or doing some 
>> sort of lookup calls to external systems. If you absolutely cannot think of 
>> a scenario where this could be the case for your application, please try to 
>> reproduce the error reliably - this is something that needs to be further 
>> looked into.
>>
>> Best,
>> Alexander Fedulov
>>
>> On Mon, 23 Oct 2023 at 19:11, Gabriele Modena  wrote:
>>>
>>> Hey folks,
>>>
>>> We currently run (py) flink 1.17 on k8s (managed by flink k8s
>>> operator), with HA and checkpointing (fixed retries policy). We
>>> produce into Kafka with AT_LEAST_ONCE delivery guarantee.
>>>
>>> Our application failed when trying to produce a message larger than
>>> Kafka's message.max.bytes. This offset was never
>>> going to be committed, so Flink HA was not able to recover the
>>> application.
>>>
>>> Upon a manual restart, it looks like the offending offset has been
>>> lost: it was not picked after rewinding to the checkpointed offset,
>>> and it was not committed to Kafka. I would have expected this offset
>>> to not have made it past the KafkaProducer commit checkpoint barrier,
>>> and that the app would fail again on it.
>>>
>>> I understand that there are failure scenarios that could end in data
>>> loss when Kafka delivery guarantee is set to EXACTLY_ONCE and kafka
>>> expires an uncommitted transaction.
>>>
>>> However, it's unclear to me if other corner cases would apply to
>>> AT_LEAST_ONCE guarantees. Upon broker failure and app restarts, I
>>> would expect duplicate messages but no data loss. What I can see as a
>>> problem is that this commit was never going to happen.
>>>
>>> Is this expected behaviour? Am I missing something here?
>>>
>>> Cheers,
>>> --
>>> Gabriele Modena (he / him)
>>> Staff Software Engineer
>>> Wikimedia Foundation



-- 
Gabriele Modena (he / him)
Staff Software Engineer
Wikimedia Foundation


Re: Kafka Topic Permissions Failure

2023-11-13 Thread Hang Ruan
Hi, Razin.

It seems like the issue you shared is a different problem from yours; they
have different error messages.
Have you tried to consume this topic with the Kafka Java client [1]
yourself, to make sure you can access the topic normally?
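
A minimal standalone consumer like the sketch below is usually enough for
that check (the broker address, group id and topic name are placeholders, and
the SSL properties have to match the mTLS setup you pass to Flink):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AclCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "acl-check");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // add your ssl.keystore.* / ssl.truststore.* settings for mTLS here

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribing and polling once is enough to trigger the authorization check
            consumer.subscribe(Collections.singletonList("your-topic"));
            System.out.println("records: " + consumer.poll(Duration.ofSeconds(10)).count());
        }
    }
}

If this consumer also throws TopicAuthorizationException, the problem is most
likely on the ACL / principal side rather than in Flink.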

Best,
Hang

[1] https://developer.confluent.io/get-started/java/#build-consumer

Razin Bouzar via user wrote on Sat, 11 Nov 2023 at 04:33:

> Hello,
>
> We seem to be encountering a bug wherein we see the error
> TopicAuthorizationException: Not authorized to access topics: [topic] for
> a topic where we have R/W/D access and are using the correct principal
> (mTLS auth). The only somewhat related Flink bug I found was FLINK-27041.
> Some partitions are empty, but we are running a newer version of Flink (1.16.1).
>
> Are there any suggestions on what else to check?
>
>
> *Full stack trace:*
>
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
> at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ... 1 more
> Caused by: org.apache.kafka.common.errors.TopicAuthorizationException:
>
> --
> RAZIN BOUZAR
> Monitoring Cloud | Salesforce
> Mobile: 317-502-8995
>
> 
>