Hi Timo,
I figured it out, thanks a lot for your help.
Are there any articles detailing the pre-flight and cluster phases? I
couldn't find anything on ci.apache.org/projects/flink, and I think this
behaviour should be documented as a warning/note.


On Thu, Oct 22, 2020 at 6:44 PM Timo Walther <twal...@apache.org> wrote:

> Hi Manas,
>
> you can use static variables, but you need to make sure that the logic
> to fill the static variable is accessible and executed in all JVMs.
>
> I assume `pipeline.properties` is in the JAR that you submit to the
> cluster, right? Then you should be able to access it through a singleton
> pattern instead of static variable access.
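>
> For illustration, here is a minimal sketch of such a singleton (the
> class layout and method names are only an assumption, not taken from
> your code). It loads the file lazily, so every JVM that touches it
> reads `pipeline.properties` from its own classpath instead of relying
> on state that was set up in `main()`:
>
> import java.io.IOException;
> import java.io.InputStream;
> import java.util.Properties;
>
> public final class Config {
>
>     private static Config instance;
>
>     private final Properties props = new Properties();
>
>     private Config() {
>         // Runs in whichever JVM first calls get(), client or task manager.
>         try (InputStream is = Config.class.getClassLoader()
>                 .getResourceAsStream("pipeline.properties")) {
>             if (is == null) {
>                 throw new IllegalStateException("pipeline.properties not on classpath");
>             }
>             props.load(is);
>         } catch (IOException e) {
>             throw new IllegalStateException("Could not load pipeline.properties", e);
>         }
>     }
>
>     public static synchronized Config get() {
>         if (instance == null) {
>             instance = new Config();
>         }
>         return instance;
>     }
>
>     public String getConfigTopic() {
>         return props.getProperty("CONFIG_TOPIC");
>     }
> }
>
> Config.get().getConfigTopic() then returns the same value on the client
> and on the task managers, as long as the JAR is on the classpath.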
>
> Regards,
> Timo
>
>
> On 22.10.20 14:17, Manas Kale wrote:
> > Sorry, I messed up the code snippet in the earlier mail. The correct one
> > is:
> >
> > public static void main(String[] args) throws IOException {
> >     Properties prop = new Properties();
> >
> >     InputStream is = Config.class.getClassLoader()
> >             .getResourceAsStream("pipeline.properties");
> >     prop.load(is);
> >
> >     HashMap<String, String> strMap = new HashMap<>();
> >     strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >
> >     new Config(strMap);
> >
> >     ...
> > }
> >
> > public class Config {
> >
> >     public static String CONFIG_TOPIC;
> >
> >     public Config(HashMap<String, String> s) {
> >         CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >     }
> > }
> >
> > The value of CONFIG_TOPIC is loaded properly in a minicluster but is
> > null when run on a cluster.
> >
> >
> > On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <manaskal...@gmail.com
> > <mailto:manaskal...@gmail.com>> wrote:
> >
> >     Hi Timo,
> >     Thank you for the explanation; I can start to see why I was getting
> >     an exception.
> >     Are you saying that I cannot use static variables at all when
> >     deploying to a cluster? I would like the variables to remain static
> >     and not instance-bound, as they are accessed from multiple classes.
> >     Based on my understanding of what you said, I implemented the
> >     following pattern:
> >
> >     public static void main(String[] args) {
> >         Properties prop = new Properties();
> >
> >         InputStream is = Config.class.getClassLoader()
> >                 .getResourceAsStream("pipeline.properties");
> >         prop.load(is);
> >
> >         strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >
> >         new Config(strMap, longMap);
> >
> >         ...
> >     }
> >
> >     public class Config {
> >
> >         public static String CONFIG_TOPIC;
> >         public static String CONFIG_KAFKA;
> >
> >         public Config(HashMap<String, String> s) {
> >             CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >             CONFIG_KAFKA = s.get("CONFIG_KAFKA");
> >         }
> >     }
> >
> >     This produces the same issue. With the easier solution that you
> >     listed, are you implying I should use multiple instances or a
> >     singleton pattern of some sort?
> >
> >     On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <twal...@apache.org
> >     <mailto:twal...@apache.org>> wrote:
> >
> >         Hi Manas,
> >
> >         you need to differentiate between what Flink calls the
> >         "pre-flight phase" and the "cluster phase".
> >
> >         The pre-flight phase is where the pipeline is constructed and
> >         all functions are instantiated. The functions are later
> >         serialized and sent to the cluster.
> >
> >         If you read your properties file in the `main()` method and
> >         store something in static variables, the content is available
> >         locally where the pipeline is constructed (e.g. in the client),
> >         but the static variables are fresh (thus empty) in the cluster
> >         JVMs once the function instances have been sent there. You
> >         either need to make sure that the properties file is read again
> >         on each task manager, or, easier: pass the parameters as
> >         constructor parameters into the instances such that they are
> >         shipped together with the function itself, as in the sketch
> >         below.
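> >
> >         As a minimal sketch of the constructor parameter approach (the
> >         schema class name and the serialization logic are made up for
> >         illustration; the FlinkKafkaProducer constructor is the 1.11
> >         one you already use):
> >
> >         import java.nio.charset.StandardCharsets;
> >         import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
> >         import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
> >         import org.apache.kafka.clients.producer.ProducerRecord;
> >
> >         public class IDAP2AlarmSchema implements KafkaSerializationSchema<IDAP2Alarm> {
> >
> >             // Plain instance field: serialized with the schema object
> >             // and therefore available in the cluster JVMs.
> >             private final String topic;
> >
> >             public IDAP2AlarmSchema(String topic) {
> >                 this.topic = topic;
> >             }
> >
> >             @Override
> >             public ProducerRecord<byte[], byte[]> serialize(IDAP2Alarm element, Long timestamp) {
> >                 // No static access here; 'topic' travels with the function.
> >                 return new ProducerRecord<>(topic,
> >                         element.toString().getBytes(StandardCharsets.UTF_8));
> >             }
> >         }
> >
> >         In the pre-flight phase (in main()) you read the property once
> >         and hand it over explicitly:
> >
> >         String topic = prop.getProperty("ALARMS_TOPIC");
> >         stream.addSink(new FlinkKafkaProducer<>(topic,
> >                 new IDAP2AlarmSchema(topic), properties,
> >                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));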
> >
> >         I hope this helps.
> >
> >         Regards,
> >         Timo
> >
> >
> >         On 22.10.20 09:24, Manas Kale wrote:
> >          > Hi,
> >          > I am trying to write some data to a Kafka topic and I have
> >          > the following situation:
> >          >
> >          > monitorStateStream
> >          >     .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
> >          >     ... // Stream that outputs elements of type IDAP2Alarm
> >          >     .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
> >          >
> >          > private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T>
> >          > getFlinkKafkaProducer(String servers, String topic) {
> >          >     Properties properties = new Properties();
> >          >     properties.setProperty("bootstrap.servers", servers);
> >          >     return new FlinkKafkaProducer<T>(topic,
> >          >             (element, timestamp) -> element.serializeForKafka(),
> >          >             properties,
> >          >             FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> >          > }
> >          >
> >          > /*
> >          >  This interface is used to indicate that a class may be output
> >          >  to Kafka. Since Kafka treats all data as bytes, classes that
> >          >  implement this interface have to provide an implementation
> >          >  for the serializeForKafka() method.
> >          > */
> >          > public interface IDAP2JSONOutput {
> >          >
> >          >     // Implement serialization logic in this method.
> >          >     ProducerRecord<byte[], byte[]> serializeForKafka();
> >          >
> >          > }
> >          >
> >          > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
> >          >
> >          >     private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
> >          >
> >          >     @Override
> >          >     public ProducerRecord<byte[], byte[]> serializeForKafka() {
> >          >         byte[] rawValue;
> >          >         byte[] rawKey;
> >          >         String k = getMonitorFeatureKey().getMonitorName();
> >          >         ...
> >          >
> >          >         rawValue = val.getBytes();
> >          >
> >          >         LOGGER.info("value of alarms topic from idap2 alarm : " +
> >          >                 Config.ALARMS_TOPIC);
> >          >
> >          >         return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
> >          >     }
> >          > }
> >          >
> >          >
> >          > Config.ALARMS_TOPIC is a static string that is read from a
> >          > properties file. When I run this code on my IDE minicluster,
> >          > it runs great with no problems. But when I submit it as a jar
> >          > to the cluster, I get the following error:
> >          >
> >          > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
> >          >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
> >          >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
> >          >     at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]
> >          >     at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
> >          >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
> >          >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
> >          >     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]
> >          >     at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]
> >          >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> >          >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
> >          >
> >          > Apparently Config.ALARMS_TOPIC is being evaluated as null.
> >          > Also, the LOGGER statement in IDAP2Alarm above is never
> >          > printed when running on the Flink cluster. In order to verify
> >          > that the correct value of Config.ALARMS_TOPIC is read from the
> >          > configuration file, I printed it from the Config class - and
> >          > it prints correctly. So my questions are:
> >          >
> >          >   * Why does this work on a minicluster but not when submitted
> >          >     as a jar to a normal cluster? I am using Flink v1.11.0 in
> >          >     both my POM file and the cluster runtime.
> >          >   * Why does the LOGGER line not get printed even though
> >          >     execution definitely reached it (as seen from the stack
> >          >     trace)?
> >          >
> >          > Thank you,
> >          > Manas Kale
> >
>
>
