Hi,
I am trying to write some data to a Kafka topic and I have the following
situation:

monitorStateStream
    .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
    ... // Stream that outputs elements of type IDAP2Alarm
    .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);

private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T>
getFlinkKafkaProducer(String servers, String topic) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", servers);
    return new FlinkKafkaProducer<T>(topic,
            (element, timestamp) -> element.serializeForKafka(),
            properties,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}
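
For context, the lambda passed above is a KafkaSerializationSchema. Written
out explicitly, it is equivalent to the following (just an expanded form of
the same call, not separate code from my project):

// This object is serialized with the job graph and shipped to the
// TaskManagers, where serialize() is invoked once per record.
KafkaSerializationSchema<IDAP2Alarm> schema = new KafkaSerializationSchema<IDAP2Alarm>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(IDAP2Alarm element, Long timestamp) {
        return element.serializeForKafka();
    }
};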

/*
 * This interface indicates that a class may be output to Kafka. Since Kafka
 * treats all data as bytes, implementing classes have to provide an
 * implementation of the serializeForKafka() method.
 */
public interface IDAP2JSONOutput {

    // Implement serialization logic in this method.
    ProducerRecord<byte[], byte[]> serializeForKafka();

}

public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {

    private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);

    @Override
    public ProducerRecord<byte[], byte[]> serializeForKafka() {
        byte[] rawValue;
        byte[] rawKey;
        String k = getMonitorFeatureKey().getMonitorName();
        ...

        rawValue = val.getBytes();

        LOGGER.info("value of alarms topic from idap2 alarm : " + Config.ALARMS_TOPIC);

        return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // this is line 95 in the stack trace below
    }

}


Config.ALARMS_TOPIC is a static string that is read from a properties file.
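For reference, Config reads the value along these lines (a simplified sketch;
the file name and property key below are placeholders, not my real names):

public class Config {

    // Initialized once, when the Config class is loaded by the JVM.
    public static final String ALARMS_TOPIC =
            loadProperties().getProperty("alarms.topic");

    private static Properties loadProperties() {
        Properties p = new Properties();
        // "/config.properties" stands in for the actual properties file.
        try (InputStream in = Config.class.getResourceAsStream("/config.properties")) {
            p.load(in);
        } catch (IOException e) {
            throw new RuntimeException("Could not load configuration", e);
        }
        return p;
    }
}
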
When I run this job on the MiniCluster in my IDE, it works with no problems.
But when I submit it as a jar to the cluster, I get the following error:

Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
    at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]
    at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]
    at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]

Apparently Config.ALARMS_TOPIC is being evaluated as null. Also, the LOGGER
statement in IDAP2Alarm above is never printed when running on the Flink
cluster. To verify that the correct value of Config.ALARMS_TOPIC is read
from the configuration file, I printed it from the Config class - and it
prints correctly. So my questions are:

   - Why does this work on a MiniCluster but not when submitted as a jar
   to a normal cluster? I am using Flink v1.11.0 in both my POM file and
   the cluster runtime.
   - Why does the LOGGER line never get printed, even though execution
   definitely reaches it (as seen from the stack trace)?
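
For completeness, I believe the static field read could be taken out of the
TaskManager code path entirely by capturing the topic name when the job graph
is built. An untested sketch of what I mean (serializeForKafka() would have to
be changed to take the topic as a parameter):

private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T>
getFlinkKafkaProducer(String servers, String topic) {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", servers);
    // The lambda captures the local variable "topic", so its value is fixed
    // on the client when the job is submitted, instead of being read from
    // Config.ALARMS_TOPIC on a TaskManager at serialization time.
    return new FlinkKafkaProducer<T>(topic,
            (element, timestamp) -> element.serializeForKafka(topic),
            properties,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}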

Thank you,
Manas Kale
