Hi Manas,

That is a good point. Feel free to open an issue for this. It is not the first time that your question has appeared on the mailing list.

Regards,
Timo

On 23.10.20 07:22, Manas Kale wrote:
Hi Timo,
I figured it out, thanks a lot for your help.
Are there any articles detailing the pre-flight and cluster phases? I couldn't find anything on ci.apache.org/projects/flink and I think this behaviour should be documented as a warning/note.


On Thu, Oct 22, 2020 at 6:44 PM Timo Walther <twal...@apache.org> wrote:

    Hi Manas,

    you can use static variables, but you need to make sure that the logic
    that fills them is accessible and executed in all JVMs.

    I assume `pipeline.properties` is in the JAR that you submit to the
    cluster, right? Then you should be able to access it through a singleton
    pattern instead of plain static variable access.
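
    For illustration, such a singleton could look roughly like the
    following (a minimal sketch; the class name and the CONFIG_TOPIC
    property are taken from your snippets, everything else is made up):

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public final class Config {

        private static volatile Config instance;

        private final String configTopic;

        private Config() {
            Properties prop = new Properties();
            // Read from the JAR in whichever JVM first touches the
            // singleton, i.e. also on every task manager.
            try (InputStream is = Config.class.getClassLoader()
                    .getResourceAsStream("pipeline.properties")) {
                if (is == null) {
                    throw new IllegalStateException(
                            "pipeline.properties not found on classpath");
                }
                prop.load(is);
            } catch (IOException e) {
                throw new IllegalStateException(
                        "Could not load pipeline.properties", e);
            }
            this.configTopic = prop.getProperty("CONFIG_TOPIC");
        }

        public static Config get() {
            // Double-checked locking; safe because `instance` is volatile.
            if (instance == null) {
                synchronized (Config.class) {
                    if (instance == null) {
                        instance = new Config();
                    }
                }
            }
            return instance;
        }

        public String configTopic() {
            return configTopic;
        }
    }

    Your functions would then call Config.get().configTopic() at runtime
    instead of reading a static field that was only populated in main().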

    Regards,
    Timo


    On 22.10.20 14:17, Manas Kale wrote:
     > Sorry, I messed up the code snippet in the earlier mail. The correct
     > one is:
     >
     > public static void main(String[] args) throws Exception {
     >     Properties prop = new Properties();
     >     InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
     >     prop.load(is);
     >
     >     HashMap<String, String> strMap = new HashMap<>();
     >     strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
     >
     >     new Config(strMap);
     >
     >     ...
     > }
     >
     > public class Config {
     >
     >     public static String CONFIG_TOPIC;
     >
     >     public Config(HashMap<String, String> s) {
     >         CONFIG_TOPIC = s.get("CONFIG_TOPIC");
     >     }
     > }
     >
     > The value of CONFIG_TOPIC is properly loaded in a minicluster but is
     > null when run on a cluster.
     >
     >
     > On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <manaskal...@gmail.com> wrote:
     >
     >     Hi Timo,
     >     Thank you for the explanation, I can start to see why I was
     >     getting an exception.
     >     Are you saying that I cannot use static variables at all when
     >     trying to deploy to a cluster? I would like the variables to
     >     remain static and not be instance-bound, as they are accessed
     >     from multiple classes.
     >     Based on my understanding of what you said, I implemented the
     >     following pattern:
     >
     >     public static void main(String[] args) {
     >         Properties prop = new Properties();
     >         InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
     >         prop.load(is);
     >
     >         strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
     >
     >         new Config(strMap, longMap);
     >
     >         ...
     >     }
     >
     >     public class Config {
     >
     >         public static String CONFIG_TOPIC;
     >         public static String CONFIG_KAFKA;
     >
     >         public Config(HashMap<String, String> s) {
     >             CONFIG_TOPIC = s.get("CONFIG_TOPIC");
     >             CONFIG_KAFKA = s.get("CONFIG_KAFKA");
     >         }
     >     }
     >
     >     This produces the same issue. With the easier solution that you
     >     listed, are you suggesting I use multiple instances or a singleton
     >     pattern of some sort?
     >
     >     On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <twal...@apache.org> wrote:
     >
     >         Hi Manas,
     >
     >         you need to differentiate between what Flink calls the
     >         "pre-flight phase" and the "cluster phase".
     >
     >         The pre-flight phase is where the pipeline is constructed and
     >         all functions are instantiated. The functions are then
     >         serialized and sent to the cluster.
     >
     >         If you read your properties file in the `main()` method and
     >         store something in static variables, the content is available
     >         locally where the pipeline is constructed (e.g. in the client),
     >         but not when the function instances are sent to the cluster:
     >         those static variables are fresh (thus empty) in the cluster
     >         JVMs. You need to either make sure that the properties file is
     >         read again on each task manager, or, easier: pass the
     >         parameters as constructor arguments into the function
     >         instances such that they are shipped together with the
     >         functions themselves.
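     >
     >         For illustration, a rough sketch of this second approach
     >         (I'm assuming the KafkaSerializationSchema interface of the
     >         Flink Kafka connector here; the schema class and the variant
     >         of serializeForKafka() that takes the topic as a parameter
     >         are made up):
     >
     >         import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
     >         import org.apache.kafka.clients.producer.ProducerRecord;
     >
     >         public class AlarmSerializationSchema
     >                 implements KafkaSerializationSchema<IDAP2Alarm> {
     >
     >             // Plain serializable field: it is serialized together
     >             // with this schema instance and shipped to the cluster.
     >             private final String topic;
     >
     >             public AlarmSerializationSchema(String topic) {
     >                 this.topic = topic;
     >             }
     >
     >             @Override
     >             public ProducerRecord<byte[], byte[]> serialize(
     >                     IDAP2Alarm element, Long timestamp) {
     >                 // Use the shipped field instead of a static variable
     >                 // that was only populated in the client's main().
     >                 return element.serializeForKafka(topic);
     >             }
     >         }
     >
     >         The producer would then be built in main() with something like
     >         new FlinkKafkaProducer<>(topic, new AlarmSerializationSchema(topic),
     >         properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE), so that
     >         the topic travels with the serialized schema rather than
     >         through static state.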
     >
     >         I hope this helps.
     >
     >         Regards,
     >         Timo
     >
     >
     >         On 22.10.20 09:24, Manas Kale wrote:
     >          > Hi,
     >          > I am trying to write some data to a Kafka topic and I have
     >          > the following situation:
     >          >
     >          > monitorStateStream
     >          >     .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
     >          >     ... // stream that outputs elements of type IDAP2Alarm
     >          >     .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
     >          >
     >          > private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T> getFlinkKafkaProducer(String servers, String topic) {
     >          >     Properties properties = new Properties();
     >          >     properties.setProperty("bootstrap.servers", servers);
     >          >     return new FlinkKafkaProducer<T>(topic,
     >          >             (element, timestamp) -> element.serializeForKafka(),
     >          >             properties,
     >          >             FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
     >          > }
     >          >
     >          > /*
     >          >  * This interface is used to indicate that a class may be
     >          >  * output to Kafka. Since Kafka treats all data as bytes,
     >          >  * classes that implement this interface have to provide an
     >          >  * implementation for the serializeForKafka() method.
     >          >  */
     >          > public interface IDAP2JSONOutput {
     >          >
     >          >     // Implement serialization logic in this method.
     >          >     ProducerRecord<byte[], byte[]> serializeForKafka();
     >          > }
     >          >
     >          > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
     >          >
     >          >     private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
     >          >
     >          >     @Override
     >          >     public ProducerRecord<byte[], byte[]> serializeForKafka() {
     >          >         byte[] rawValue;
     >          >         byte[] rawKey;
     >          >         String k = getMonitorFeatureKey().getMonitorName();
     >          >         ...
     >          >
     >          >         rawValue = val.getBytes();
     >          >
     >          >         LOGGER.info("value of alarms topic from idap2 alarm : " + Config.ALARMS_TOPIC);
     >          >
     >          >         return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
     >          >     }
     >          > }
     >          >
     >          >
     >          > Config.ALARMS_TOPIC is a static string that is read from a
     >          > properties file. When I run this code on my IDE
     >          > minicluster, it runs great with no problems. But when I
     >          > submit it as a jar to the cluster, I get the following
     >          > error:
     >          >
     >          > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
     >          >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
     >          >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
     >          > *   at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]*
     >          >     at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
     >          >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
     >          >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
     >          >     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          > *   at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]*
     >          > *   at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]*
     >          >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
     >          >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
     >          >
     >          > Apparently Config.ALARMS_TOPIC is being evaluated as null.
     >          > Also, the LOGGER statement in IDAP2Alarm above is never
     >          > printed when running on the Flink cluster. To verify that
     >          > the correct value of Config.ALARMS_TOPIC is read from the
     >          > configuration file, I printed it from the Config class -
     >          > and it prints correctly. So my questions are:
     >          >
     >          >   * Why does this work on a minicluster but not when
     >          >     submitted as a jar to a normal cluster? I am using
     >          >     Flink v1.11.0 in both my POM file and the cluster
     >          >     runtime.
     >          >   * Why does the LOGGER line not get printed even though
     >          >     execution definitely reached it (as seen from the
     >          >     stack trace)?
     >          >
     >          > Thank you,
     >          > Manas Kale
     >

