Hi Manas,

you can use static variables, but you need to make sure that the logic that fills them is accessible and executed in all JVMs.

I assume `pipeline.properties` is in the JAR that you submit to the cluster, right? Then you should be able to access it through a singleton pattern instead of plain static variable access.
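
Something along these lines could work. This is a rough, untested sketch (the field and method names are only for illustration): because the properties are loaded lazily on first access, every JVM that touches the singleton, whether the client or a task manager, fills it for itself.

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public final class Config {

    // one instance per JVM, created on first access
    private static Config instance;

    private final String configTopic;

    private Config() {
        Properties prop = new Properties();
        try (InputStream is = Config.class.getClassLoader()
                .getResourceAsStream("pipeline.properties")) {
            if (is == null) {
                throw new IllegalStateException(
                        "pipeline.properties not found on the classpath");
            }
            prop.load(is);
        } catch (IOException e) {
            throw new IllegalStateException(
                    "Could not read pipeline.properties", e);
        }
        this.configTopic = prop.getProperty("CONFIG_TOPIC");
    }

    public static synchronized Config getInstance() {
        if (instance == null) {
            instance = new Config();
        }
        return instance;
    }

    public String getConfigTopic() {
        return configTopic;
    }
}

Callers would then use Config.getInstance().getConfigTopic() instead of reading a static field that is only ever assigned in main().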

Regards,
Timo


On 22.10.20 14:17, Manas Kale wrote:
Sorry, I messed up the code snippet in the earlier mail. The correct one is:

public static void main(String[] args) throws Exception {
    Properties prop = new Properties();
    InputStream is = Config.class.getClassLoader()
            .getResourceAsStream("pipeline.properties");
    prop.load(is);

    HashMap<String, String> strMap = new HashMap<>();
    strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));

    new Config(strMap);

    ...
}

public class Config {

    public static String CONFIG_TOPIC;

    public Config(HashMap<String, String> s) {
        CONFIG_TOPIC = s.get("CONFIG_TOPIC");
    }

}

The value of CONFIG_TOPIC is loaded properly in a minicluster but is null when run on a cluster.


On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <manaskal...@gmail.com> wrote:

    Hi Timo,
    Thank you for the explanation; I can start to see why I was getting
    an exception.
    Are you saying that I cannot use static variables at all when trying
    to deploy to a cluster? I would like the variables to remain static
    and not be instance-bound as they are accessed from multiple classes.
    Based on my understanding of what you said, I implemented the
    following pattern:

    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        InputStream is = Config.class.getClassLoader()
                .getResourceAsStream("pipeline.properties");
        prop.load(is);

        strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));

        new Config(strMap, longMap);

        ...
    }

    public class Config {

        public static String CONFIG_TOPIC;
        public static String CONFIG_KAFKA;

        public Config(HashMap<String, String> s) {
            CONFIG_TOPIC = s.get("CONFIG_TOPIC");
            CONFIG_KAFKA = s.get("CONFIG_KAFKA");
        }

    }

    This produces the same issue. With the easier solution that you
    listed, are you implying I should use multiple instances or a
    singleton pattern of some sort?

    On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <twal...@apache.org> wrote:

        Hi Manas,

        you need to differentiate between what Flink calls the
        "pre-flight phase" and the "cluster phase".

        The pre-flight phase is where the pipeline is constructed and all
        functions are instantiated. They are later serialized and sent to
        the cluster.

        If you read your properties file in the `main()` method and store
        something in static variables, the content is available locally
        where the pipeline is constructed (e.g. in the client), but not
        when the function instances are sent to the cluster: those static
        variables are fresh (thus empty) in the cluster JVMs. You need to
        either make sure that the properties file is read on each task
        manager again, or, easier: pass the parameters as constructor
        parameters into the instances so that they are shipped together
        with the function itself, as in the sketch below.
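
        For example, instead of reading the topic from a static field
        inside the serialization logic, you could give the serialization
        schema the topic in its constructor. A rough, untested sketch
        (everything except the Flink/Kafka classes is named for
        illustration only):

        import java.nio.charset.StandardCharsets;

        import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
        import org.apache.kafka.clients.producer.ProducerRecord;

        public class IDAP2AlarmSerializationSchema
                implements KafkaSerializationSchema<IDAP2Alarm> {

            // instance field: serialized and shipped to the task managers
            // together with the schema itself
            private final String topic;

            public IDAP2AlarmSerializationSchema(String topic) {
                this.topic = topic;
            }

            @Override
            public ProducerRecord<byte[], byte[]> serialize(
                    IDAP2Alarm element, Long timestamp) {
                // build the key/value bytes as before; only the topic
                // lookup changes (instance field instead of static)
                byte[] rawKey = element.getMonitorFeatureKey().getMonitorName()
                        .getBytes(StandardCharsets.UTF_8);
                byte[] rawValue = element.toString()
                        .getBytes(StandardCharsets.UTF_8);
                return new ProducerRecord<>(topic, rawKey, rawValue);
            }
        }

        In main(), where the properties file is available, you would then
        wire it up like this:

        new FlinkKafkaProducer<>(
                topic,
                new IDAP2AlarmSerializationSchema(topic),
                properties,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);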

        I hope this helps.

        Regards,
        Timo


        On 22.10.20 09:24, Manas Kale wrote:
         > Hi,
         > I am trying to write some data to a Kafka topic and I have the
         > following situation:
         >
         > monitorStateStream
         >     .process(new IDAP2AlarmEmitter())
         >         .name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
         >     ... // Stream that outputs elements of type IDAP2Alarm
         >     .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC))
         >         .name(ALARM_SINK).uid(ALARM_SINK);
         >
         > private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T>
         >         getFlinkKafkaProducer(String servers, String topic) {
         >     Properties properties = new Properties();
         >     properties.setProperty("bootstrap.servers", servers);
         >     return new FlinkKafkaProducer<T>(topic,
         >             (element, timestamp) -> element.serializeForKafka(),
         >             properties,
         >             FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
         > }
         >
         > /*
         > This interface is used to indicate that a class may be output to
         > Kafka. Since Kafka treats all data as bytes, classes that
         > implement this interface have to provide an implementation for
         > the serializeForKafka() method.
         > */
         > public interface IDAP2JSONOutput {
         >
         >     // Implement serialization logic in this method.
         >     ProducerRecord<byte[], byte[]> serializeForKafka();
         >
         > }
         >
         > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
         >
         >     private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
         >
         >     @Override
         >     public ProducerRecord<byte[], byte[]> serializeForKafka() {
         >         byte[] rawValue;
         >         byte[] rawKey;
         >         String k = getMonitorFeatureKey().getMonitorName();
         >         ...
         >
         >         rawValue = val.getBytes();
         >
         >         LOGGER.info("value of alarms topic from idap2 alarm : " +
         >                 Config.ALARMS_TOPIC);
         >
         >         return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey,
         >                 rawValue); // Line 95
         >     }
         >
         > }
         >
         >
         > Config.ALARMS_TOPIC is a static string that is read from a
         > properties file. When I run this code on my IDE minicluster, it
         > runs great with no problems. But when I submit it as a jar to
         > the cluster, I get the following error:
         >
         > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
         >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
         >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
         >     at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]
         >     at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
         >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
         >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
         >     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]
         >     at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]
         >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
         >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
         >
         > Apparently Config.ALARMS_TOPIC is being evaluated as null. Also,
         > the LOGGER statement in IDAP2Alarm above is never printed when
         > running on the Flink cluster. To verify that the correct value
         > of Config.ALARMS_TOPIC is read from the configuration file, I
         > printed it from the Config class, and it prints correctly. So
         > my questions are:
         >
         >   * Why does this work on a minicluster but not when submitted
         >     as a jar to a normal cluster? I am using Flink v1.11.0 in
         >     both my POM file and the cluster runtime.
         >   * Why does the LOGGER line not get printed even though
         >     execution definitely reached it (as seen from the stack
         >     trace)?
         >
         > Thank you,
         > Manas Kale

