Just a guess, but our logging initialisation probably changes the global
log level (see conf/log4j.properties). DataSet#collect() executes the
program and, if you are testing locally / in an IDE, also creates a local
Flink "cluster" and initialises logging, among other things.
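
To verify that, a quick check (just a sketch, assuming the log4j 1.x
setup that ships with the Flink distribution) would be to print the
effective log4j level before and after the first job execution, e.g.
around updateModel():
==========
import org.apache.log4j.LogManager;

// effective level of the root logger and of your own logger before execution
System.out.println("root level before: "
        + LogManager.getRootLogger().getEffectiveLevel());

learningAlgorithmFlink.updateModel(dataFlink); // triggers collect()/execution

// ... and after execution - if these differ, logging was re-initialised
System.out.println("root level after: "
        + LogManager.getRootLogger().getEffectiveLevel());
System.out.println("user logger level after: "
        + LogManager.getLogger(">>>>> ParallelMLExample").getEffectiveLevel());
==========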

Please comment out the first line and uncomment the following one so
that the file reads like this:
==========
# This affects logging for both user code and Flink
#log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=INFO
==========
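
If you additionally want your own classes to keep logging at INFO, you
can pin the level of your own logger explicitly. Sketch only - the
logger name has to match what you pass to LoggerFactory.getLogger(),
and "com.example.ml" below is just a placeholder for your own package:
==========
# keep user code at INFO, independent of the Flink loggers
log4j.logger.com.example.ml=INFO
==========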


Nico

On 13/01/18 13:52, j...@vooght.de wrote:
> Hello,
> I am learning Flink and using the docker image along with the AMIDST
> library for this.
> Below is a sample task from AMIDST which provides INFO output up until I
> reach updateModel(). I pasted the short method as well and wonder what
> prevents the Logger from producing any output past that point.
> 
>         //Set-up Flink session
>         env = ExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().disableSysoutLogging();
>         Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");
> 
>         //generate a random dataset
>         DataFlink<DataInstance> dataFlink =
>                 new DataSetGenerator().generate(env, 1234, 1000, 5, 0);
> 
>         //Creates a DAG with the NaiveBayes structure for the random dataset
>         DAG dag = DAGGenerator.getNaiveBayesStructure(
>                 dataFlink.getAttributes(), "DiscreteVar4");
>         LOG.info(dag.toString());
> 
>         //Create the Learner object
>         ParameterLearningAlgorithm learningAlgorithmFlink =
>                 new ParallelMaximumLikelihood();
> 
>         //Learning parameters
>         learningAlgorithmFlink.setBatchSize(10);
>         learningAlgorithmFlink.setDAG(dag);
> 
>         //Initialize the learning process
>         learningAlgorithmFlink.initLearning();
> 
>         //Learn from the flink data
>         LOG.info("BEFORE UPDATEMODEL");
>         learningAlgorithmFlink.updateModel(dataFlink);
>         LOG.info("AFTER UPDATEMODEL");
> 
>         //Print the learnt Bayes Net
>         BayesianNetwork bn = learningAlgorithmFlink.getLearntBayesianNetwork();
>         LOG.info(bn.toString());
> 
> 
> Below is the updateModel method.
> 
>     public double updateModel(DataFlink<DataInstance> dataUpdate) {
>         try {
>             Configuration config = new Configuration();
>             config.setString(BN_NAME, this.dag.getName());
>             config.setBytes(EFBN_NAME,
>                     Serialization.serializeObject(efBayesianNetwork));
> 
>             DataSet<DataInstance> dataset = dataUpdate.getDataSet();
>             this.sumSS = dataset.map(new SufficientSatisticsMAP())
>                     .withParameters(config)
>                     .reduce(new SufficientSatisticsReduce())
>                     .collect().get(0);
> 
>             //Add the prior
>             sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
> 
>             JobExecutionResult result =
>                     dataset.getExecutionEnvironment().getLastJobExecutionResult();
> 
>             numInstances = result.getAccumulatorResult(
>                     ParallelMaximumLikelihood.COUNTER_NAME + "_" + this.dag.getName());
> 
>             numInstances++;//Initial counts
> 
>         }catch(Exception ex){
>             throw new UndeclaredThrowableException(ex);
>         }
> 
>         return this.getLogMarginalProbability();
>     }
> 
> 
> Not sure why the LOG.info calls past that method are not output to the console.
> TIA
> JP
