[jira] [Created] (FLINK-35720) flink-connector-kafka fails to restore from checkpoint when using an EXACTLY_ONCE sink

2024-06-28 Thread Nicolas Ferrario (Jira)
Nicolas Ferrario created FLINK-35720:


 Summary: flink-connector-kafka fails to restore from checkpoint 
when using an EXACTLY_ONCE sink
 Key: FLINK-35720
 URL: https://issues.apache.org/jira/browse/FLINK-35720
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
 Environment: Flink 1.17, 1.18, 1.19

Kubernetes

flink-connector-kafka 3.1, 3.2
Reporter: Nicolas Ferrario


Hi team, I noticed that the Kafka client was upgraded from version 3.2.4 to 
3.4.0 in Flink Connector Kafka 3.1.

This introduced a bug that prevents Flink from restoring from a checkpoint 
when a producer with exactly-once semantics is used: the connector relies on 
reflection to read the internal {{txnPartitionMap}} field, and that lookup now 
fails with a {{NoSuchFieldException}}.

Another side effect is that downgrading from 3.1 to 3.0.2 (the last working 
version) is not possible, because the Source state serializer version was 
increased (from 1 to 2), effectively forcing a full bootstrap and losing all state.

*All flink-connector-kafka versions starting from 3.1 are affected.*

How to reproduce (a minimal job sketch follows the steps):
 # Start a pipeline with an EXACTLY_ONCE Kafka sink
 # Wait for a checkpoint to complete
 # Stop the job and restore from the checkpoint
 # The sink fails to restore its state, causing the exception below
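
For reference, here is a minimal sketch of such a pipeline (broker address, topic, and transactional-id prefix are placeholders, not taken from the original report):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A checkpoint must complete before stopping, so there is a transaction to resume.
        env.enableCheckpointing(10_000L);

        KafkaSink<String> sink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("kafka:9092") // placeholder
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("repro-topic") // placeholder
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        // EXACTLY_ONCE makes the sink use transactional producers,
                        // which is what triggers resumeTransaction() on restore.
                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                        .setTransactionalIdPrefix("repro") // required for EXACTLY_ONCE
                        .build();

        env.fromSequence(0, Long.MAX_VALUE).map(String::valueOf).sinkTo(sink);
        env.execute("exactly-once-repro");
    }
}
{code}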

The line of code causing the exception: 
[https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L310]
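
For context, that line sits on top of a reflective field lookup that, simplified, looks like the following (a paraphrase reconstructed from the stack trace below, not a verbatim copy of the connector source; the real method also walks the class hierarchy):

{code:java}
import java.lang.reflect.Field;

final class ReflectionSketch {
    // Simplified paraphrase of FlinkKafkaInternalProducer#getField.
    static Object getField(Object object, String fieldName) {
        try {
            Field field = object.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(object);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            // If the kafka-clients version on the classpath has no field with the
            // expected name, getDeclaredField() throws NoSuchFieldException, which
            // surfaces as the "Incompatible KafkaProducer version" error below.
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }
}
{code}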

POM 3.0: https://github.com/apache/flink-connector-kafka/blob/v3.0/pom.xml#L53

POM 3.1: https://github.com/apache/flink-connector-kafka/blob/v3.1/pom.xml#L54

Exception:
{quote}2024-06-18 19:48:17
java.lang.IllegalStateException: Failed to commit KafkaCommittable {producerId=67946, epoch=0, transactionalId=smsc-toolbox-enrichment-toolbox-sms-0-438}
    at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
    at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
    at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:126)
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:285)
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:272)
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:310)
    at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:143)
    at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:72)
    at java.base/java.util.Optional.orElseGet(Unknown Source)
    at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:72)
    ... 16 more
Caused by: java.lang.NoSuchFieldException: txnPartitionMap
    at java.base/java.lang.Class.getDeclaredField(Unknown Source)
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:281)
    ... 22 more
{quote}





[jira] [Created] (FLINK-28057) LD_PRELOAD is hardcoded to x64 on flink-docker

2022-06-14 Thread Nicolas Ferrario (Jira)
Nicolas Ferrario created FLINK-28057:


 Summary: LD_PRELOAD is hardcoded to x64 on flink-docker
 Key: FLINK-28057
 URL: https://issues.apache.org/jira/browse/FLINK-28057
 Project: Flink
  Issue Type: Bug
  Components: flink-docker
Affects Versions: 1.15.0
Reporter: Nicolas Ferrario


Right now {{docker-entrypoint.sh}} uses this:

{code:sh}
maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        export LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
    fi
}
{code}

I propose we use this instead:
{code:sh}
maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        # Maybe use: export LD_PRELOAD=$LD_PRELOAD:/usr/lib/$(uname -i)-linux-gnu/libjemalloc.so
        if [[ `uname -i` == 'aarch64' ]]; then
            export LD_PRELOAD=$LD_PRELOAD:/usr/lib/aarch64-linux-gnu/libjemalloc.so
        else
            export LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
        fi
    fi
}
{code}

https://github.com/apache/flink-docker/pull/117





[jira] [Created] (FLINK-21825) AvroInputFormat doesn't honor -1 length on FileInputSplits

2021-03-16 Thread Nicolas Ferrario (Jira)
Nicolas Ferrario created FLINK-21825:


 Summary: AvroInputFormat doesn't honor -1 length on FileInputSplits
 Key: FLINK-21825
 URL: https://issues.apache.org/jira/browse/FLINK-21825
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.12.2
Reporter: Nicolas Ferrario


The FileInputSplit documentation says a length of *-1* means that the whole 
file should be read; however, AvroInputFormat expects the exact size.

[https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java#L50]
{code:java}
/**
 * Constructs a split with host information.
 *
 * @param num the number of this input split
 * @param file the file name
 * @param start the position of the first byte in the file to process
 * @param length the number of bytes in the file to process (-1 is flag for "read whole file")
 * @param hosts the list of hosts containing the block, possibly null
 */
public FileInputSplit(int num, Path file, long start, long length, String[] hosts)
{code}
[https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java#L141]

 
{code:java}
private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
    DatumReader<E> datumReader;

    if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
        datumReader = new GenericDatumReader<>();
    } else {
        datumReader =
                org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
                        ? new SpecificDatumReader<>(avroValueType)
                        : new ReflectDatumReader<>(avroValueType);
    }
    if (LOG.isInfoEnabled()) {
        LOG.info("Opening split {}", split);
    }

    SeekableInput in =
            new FSDataInputStreamWrapper(
                    stream,
                    split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
    DataFileReader<E> dataFileReader =
            (DataFileReader<E>) DataFileReader.openReader(in, datumReader);

    if (LOG.isDebugEnabled()) {
        LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
    }

    end = split.getStart() + split.getLength(); // <--- THIS LINE
    recordsReadSinceLastSync = 0;
    return dataFileReader;
}
{code}
This could be fixed either by updating the documentation or by honoring -1 in 
AvroInputFormat.
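
For illustration, honoring -1 inside {{initReader}} could look like this (a sketch against the snippet above, not a committed patch):

{code:java}
// Sketch only: treat length == -1 as "read the whole file", as documented in
// FileInputSplit, instead of assuming the split carries the exact byte count.
long splitLength = split.getLength();
end =
        splitLength == -1
                ? split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen()
                : split.getStart() + splitLength;
{code}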


