disable console output
Hi,

I'm having problems when working with Flink (local mode) and Travis CI: the console output produces very large log files (>4MB). How can I disable the progress messages shown in the console from my Java code (through the Configuration object)?

Thanks,
Andres
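One generic JVM-level workaround is to run the job while System.out is temporarily redirected to a stream that discards everything. This is not a Flink Configuration option. It assumes the progress messages are written directly to System.out rather than through a logging framework, because a logger configured at startup would keep writing to its own appender. The names QuietConsole and ThrowingTask are hypothetical. Depending on the Flink version, ExecutionConfig (reached through env.getConfig()) may also offer a disableSysoutLogging() switch, so check the API documentation of the version in use.

```java
import java.io.OutputStream;
import java.io.PrintStream;

// Hypothetical helper (not part of Flink): runs a task while System.out is
// temporarily redirected to a stream that discards everything.
// Assumption: the unwanted progress messages are printed to System.out.
final class QuietConsole {
    private QuietConsole() {}

    /** A task that may throw checked exceptions, e.g. a job submission. */
    interface ThrowingTask {
        void run() throws Exception;
    }

    /** An OutputStream that silently drops every byte written to it. */
    private static final OutputStream DISCARD = new OutputStream() {
        @Override
        public void write(int b) {
            // intentionally empty
        }

        @Override
        public void write(byte[] b, int off, int len) {
            // intentionally empty
        }
    };

    static void runQuietly(ThrowingTask task) throws Exception {
        PrintStream original = System.out;
        System.setOut(new PrintStream(DISCARD));
        try {
            task.run();
        } finally {
            // Restore the real stdout even if the task throws.
            System.setOut(original);
        }
    }
}
```

In a job, the call that triggers execution would be wrapped, e.g. `QuietConsole.runQuietly(() -> env.execute());`. Errors still surface as exceptions. Only the printed output is dropped.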
How to create a stream of data batches
Hi,

I'm trying to code some machine learning algorithms on top of Flink, such as variational Bayes learning algorithms. Instead of working at the level of individual data elements (i.e., using map transformations), it would be far more efficient to work at the level of "batches of elements" (i.e., I get a batch of elements and produce some output). I could code that using the "mapPartition" function, but I cannot control the size of the partitions, can I?

Is there any way to transform a stream (or DataSet) of elements into a stream (or DataSet) of equally sized data batches?

Thanks for your support,
Andres
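Within a single partition, the batching can be done by hand. The plain-Java sketch below (hypothetical helper Batcher.forEachBatch) cuts an Iterable into consecutive batches of at most batchSize elements. Inside a mapPartition function, the Iterable would be the partition's input and the Consumer would forward each batch to Flink's Collector. That wiring is an assumption, not shown here. This bounds the batch size but does not make all batches equal: the last batch of each partition may be smaller, and the number of partitions still depends on the parallelism.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: groups the elements of an Iterable into consecutive
// batches of at most batchSize elements and hands each batch to a consumer.
final class Batcher {
    private Batcher() {}

    static <T> void forEachBatch(Iterable<T> elements, int batchSize,
                                 Consumer<List<T>> out) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> batch = new ArrayList<>(batchSize);
        for (T element : elements) {
            batch.add(element);
            if (batch.size() == batchSize) {
                out.accept(batch);
                batch = new ArrayList<>(batchSize); // start a fresh batch
            }
        }
        if (!batch.isEmpty()) {
            out.accept(batch); // last, possibly smaller, batch
        }
    }
}
```

For example, a partition function might call `forEachBatch(values, 100, batch -> out.collect(learnFromBatch(batch)))`, where `learnFromBatch` stands for a hypothetical per-batch update of the model.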
Convergence Criterion in IterativeDataSet
Hi,

I'm trying to implement some machine learning algorithms that iterate until they converge to a fixed point. My idea is to use an IterativeDataSet with an Aggregator that produces the result (i.e., a set of parameters defining the model).

From the interface "ConvergenceCriterion", I understand that the convergence criterion depends only on the result of the aggregator in the current iteration (as happens with the DoubleZeroConvergence class). However, convergence is more usually tested by comparing the aggregator's result in the current iteration with its result in the previous iteration: one usually stops when both results are similar enough, i.e., when the algorithm has converged to a fixed point.

I guess this functionality is not included yet, and that this is why the Flink implementations of K-Means and Linear Regression stop after a fixed number of iterations. Am I wrong?

Regards,
Andres
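A fixed-point test of this kind only needs the previous result alongside the current one. The plain-Java sketch below (hypothetical class FixedPointCriterion, model parameters assumed to be a double[]) reports convergence once the largest absolute change of any parameter between two consecutive iterations falls below a threshold epsilon. Its isConverged(int iteration, ...) method mirrors the shape of Flink's ConvergenceCriterion but is not wired into Flink. Keeping the previous value in a field assumes the same criterion instance is reused across iterations, which would need to be verified. Another route is IterativeDataSet.closeWith(result, terminationCriterion), which ends the iteration when the termination-criterion DataSet is empty.

```java
// Hypothetical sketch: a fixed-point convergence test that compares the
// aggregated parameters of the current iteration with those of the previous one.
final class FixedPointCriterion {
    private final double epsilon;
    private double[] previous; // null until the first iteration has been seen

    FixedPointCriterion(double epsilon) {
        this.epsilon = epsilon;
    }

    /** True once the max absolute change between consecutive iterations is below epsilon. */
    boolean isConverged(int iteration, double[] current) {
        boolean converged = previous != null
                && previous.length == current.length
                && maxAbsDiff(previous, current) < epsilon;
        previous = current.clone(); // remember this iteration's result
        return converged;
    }

    private static double maxAbsDiff(double[] a, double[] b) {
        double max = 0.0;
        for (int i = 0; i < a.length; i++) {
            max = Math.max(max, Math.abs(a[i] - b[i]));
        }
        return max;
    }
}
```

The first call can never report convergence, since there is nothing to compare against yet. The maximum norm is one choice. A relative or Euclidean distance would fit the same structure.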
Re: Bug broadcasting objects (serialization issue)
Hi,

I get a similar new bug when broadcasting a list of integers if the list is made unmodifiable:

    elements = Collections.unmodifiableList(elements);

The following code reproduces it:

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class WordCountExample {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> text = env.fromElements(
                    "Who's there?",
                    "I think I hear them. Stand, ho! Who's there?");

            List<Integer> elements = new ArrayList<>();
            elements.add(0);
            // Wrapping the list as unmodifiable is what triggers the bug.
            elements = Collections.unmodifiableList(elements);

            DataSet<TestClass> set = env.fromElements(new TestClass(elements));

            DataSet<Tuple2<String, Integer>> wordCounts = text
                    .flatMap(new LineSplitter())
                    .withBroadcastSet(set, "set")
                    .groupBy(0)
                    .sum(1);

            wordCounts.print();
        }

        // Splits each line into words and emits (word, 1) pairs.
        public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                for (String word : line.split(" ")) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }

        // Wrapper object that is broadcast to the flatMap operator.
        public static class TestClass implements Serializable {
            private static final long serialVersionUID = -2932037991574118651L;

            List<Integer> integerList;

            public TestClass(List<Integer> integerList) {
                this.integerList = integerList;
            }
        }
    }

Thanks for your support,
Andres

On 2/9/15 11:17, Andres R. Masegosa wrote:
> Hi,
>
> I get a bug when trying to broadcast a list of integers created with the
> primitive "Arrays.asList(...)".
>
> For example, if you try to run this "wordcount" example, you can
> reproduce the bug.
>
> [...]
>
> However, if instead of using the primitive "Arrays.asList(...)", we use
> instead the ArrayList<> constructor, there is any problem
>
>
> Regards,
> Andres
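The original report in this thread says that a list built with the ArrayList constructor broadcasts without problems. One workaround consistent with that observation is to copy the incoming list inside the constructor, so the broadcast object always holds a plain java.util.ArrayList regardless of what the caller passes in. The class name CopyingTestClass is hypothetical. Treat this as an untested sketch rather than a confirmed fix for the Flink behaviour.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical variant of TestClass: stores a defensive copy of the list as a
// plain java.util.ArrayList, so the broadcast object never holds an
// unmodifiable wrapper or an Arrays.asList(...) view.
class CopyingTestClass implements Serializable {
    private static final long serialVersionUID = 1L;

    List<Integer> integerList;

    CopyingTestClass(List<Integer> integerList) {
        this.integerList = new ArrayList<>(integerList); // defensive copy
    }
}
```

If read-only access is wanted, the unmodifiable wrapper can be created in a getter instead of being stored in the serialized field.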
Bug broadcasting objects (serialization issue)
Hi,

I get a bug when trying to broadcast a list of integers created with the method "Arrays.asList(...)". For example, if you run this "wordcount" example, you can reproduce the bug:

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class WordCountExample {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> text = env.fromElements(
                    "Who's there?",
                    "I think I hear them. Stand, ho! Who's there?");

            // A list created with Arrays.asList(...) is what triggers the bug.
            List<Integer> elements = Arrays.asList(0, 0, 0);

            DataSet<TestClass> set = env.fromElements(new TestClass(elements));

            DataSet<Tuple2<String, Integer>> wordCounts = text
                    .flatMap(new LineSplitter())
                    .withBroadcastSet(set, "set")
                    .groupBy(0)
                    .sum(1);

            wordCounts.print();
        }

        // Splits each line into words and emits (word, 1) pairs.
        public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                for (String word : line.split(" ")) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }

        // Wrapper object that is broadcast to the flatMap operator.
        public static class TestClass implements Serializable {
            private static final long serialVersionUID = -2932037991574118651L;

            List<Integer> integerList;

            public TestClass(List<Integer> integerList) {
                this.integerList = integerList;
            }
        }
    }

However, if we use the ArrayList<> constructor instead of "Arrays.asList(...)", there is no problem.

Regards,
Andres
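The list that fails and the one that works are different classes at run time, which is one way to see why a serializer could treat them differently. The small diagnostic below (hypothetical helper ListTypes) reports the concrete class and checks for a plain ArrayList before an object is broadcast. One plausible explanation, assumed here and not confirmed in this thread, is that TestClass is not a POJO for Flink (it has no public no-argument constructor), so it goes to a generic serializer (Kryo) that cannot rebuild these JDK-internal list classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper: inspects the concrete runtime class of a list.
final class ListTypes {
    private ListTypes() {}

    /** Fully qualified name of the list's concrete class. */
    static String runtimeClass(List<?> list) {
        return list.getClass().getName();
    }

    /** True only for an instance of exactly java.util.ArrayList (not a subclass or view). */
    static boolean isPlainArrayList(List<?> list) {
        return list.getClass() == ArrayList.class;
    }
}
```

For example, `runtimeClass(Arrays.asList(0, 0, 0))` returns `"java.util.Arrays$ArrayList"`, `runtimeClass(Collections.unmodifiableList(new ArrayList<Integer>()))` returns `"java.util.Collections$UnmodifiableRandomAccessList"`, and `runtimeClass(new ArrayList<>(Arrays.asList(0, 0, 0)))` returns `"java.util.ArrayList"`.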