Re: Regarding source Parallelism and usage of coflatmap transformation

2016-05-09 Thread Biplob Biswas
Hi,

Regarding 1) Thanks a lot for pointing me to ParallelSourceFunction; I completely
missed that I was using a SourceFunction instead.

Regarding 2) The example works and I can see what is happening there. Now,
when I increase the parallelism, I understand the corresponding change in
how the data is fed back to the iterator.

What I want to ask next: is there a way to send back a group of
data points at once, something like an array of some object? If yes, what
would be the type given to the 'withFeedbackType' parameter?

Again, thank you so much for such a detailed explanation and example.

Regards
Biplob Biswas







--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-source-Parallelism-and-usage-of-broadcast-over-iteration-in-Flink-Streaming-tp6721p6785.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Regarding source Parallelism and usage of coflatmap transformation

2016-05-09 Thread Aljoscha Krettek
Hi,
regarding 1) the source needs to implement the ParallelSourceFunction or
RichParallelSourceFunction interface to allow it to have a higher
parallelism than 1.
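
Once a source runs with a parallelism above 1, every parallel instance executes the same code, so the instances have to divide the file between them or each line is emitted several times. The following is a minimal plain-Java sketch of one way to do that (round-robin by line number); it has no Flink dependency, and the class and method names are hypothetical. Inside a RichParallelSourceFunction, the two integers would presumably come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch (no Flink dependency): sharing one file between parallel source instances. */
public class SubtaskLineSplitter {

    /**
     * Returns the lines that the source instance with the given index would emit.
     * Line i goes to instance (i % parallelism), so every line is emitted exactly once.
     */
    public static List<String> linesForSubtask(List<String> allLines, int subtaskIndex, int parallelism) {
        if (parallelism < 1 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<String> mine = new ArrayList<>();
        // Start at this instance's index and skip ahead by the number of instances.
        for (int i = subtaskIndex; i < allLines.size(); i += parallelism) {
            mine.add(allLines.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        // Stand-in for the lines of the input file.
        List<String> lines = Arrays.asList("p0", "p1", "p2", "p3", "p4");
        int parallelism = 2;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " emits "
                    + linesForSubtask(lines, subtask, parallelism));
        }
    }
}
```

Round-robin is only one possible split; contiguous ranges or one file per instance would serve the same purpose.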

regarding 2) I wrote a small example that showcases how to do it:

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStream<String> mainInput = env.fromElements("Hello", "Zwei", "drei");
DataStream<String> initialIterateInput =
        env.fromElements("bcast 1", "bcast 2", "bcast 3");

// The feedback stream carries Strings, so both inputs of the
// connected iteration are of type String.
IterativeStream.ConnectedIterativeStreams<String, String> iteration =
        mainInput.iterate().withFeedbackType(BasicTypeInfo.STRING_TYPE_INFO);

SingleOutputStreamOperator<String> iterateHead = iteration
        .flatMap(new CoFlatMapFunction<String, String, String>() {
            // Input 1: elements of the main input.
            @Override
            public void flatMap1(String value, Collector<String> out)
                    throws Exception {
                Thread.sleep(1000);
                System.out.println("SEEING FROM INPUT 1: " + value);
                out.collect(value);
            }

            // Input 2: elements arriving over the feedback edge.
            @Override
            public void flatMap2(String value, Collector<String> out)
                    throws Exception {
                Thread.sleep(1000);
                System.out.println("SEEING FROM INPUT 2: " + value);
                out.collect(value);
            }
        });

// Feed back the initial broadcast elements together with the output
// of the iteration head.
iteration.closeWith(initialIterateInput.broadcast().union(iterateHead.broadcast()));

iterateHead.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        System.out.println("SEEING OUTPUT FROM ITERATION: " + value);
        return value;
    }
});

// Not part of the original snippet: nothing runs until the job is executed.
env.execute();

I inserted Thread.sleep(1000) so that you can observe what is happening. If
you remove them it iterates too fast.
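
To make the data flow of the example easier to follow, here is a rough plain-Java model of the loop at parallelism 1. It is a sketch, not Flink: the class name, the queue and the step limit are assumptions made for illustration, and a real job interleaves the two inputs nondeterministically. It shows why the loop never drains on its own: everything emitted by flatMap1 or flatMap2 is fed back through closeWith and arrives at flatMap2 again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Plain-Java model (no Flink dependency) of the feedback loop in the example above. */
public class FeedbackLoopModel {

    /**
     * Simulates the loop and returns what the two flatMap methods would print.
     * Stops after maxFeedbackSteps feedback elements, because the loop itself never ends.
     */
    public static List<String> run(List<String> mainInput, List<String> initialFeedback,
                                   int maxFeedbackSteps) {
        List<String> log = new ArrayList<>();
        // Models the feedback edge; it starts with the initial "broadcast" elements.
        Deque<String> feedback = new ArrayDeque<>(initialFeedback);

        // Input 1: flatMap1 emits each main element, and closeWith feeds it back.
        for (String value : mainInput) {
            log.add("INPUT 1: " + value);
            feedback.addLast(value);
        }
        // Input 2: flatMap2 emits each feedback element, which is fed back again.
        for (int step = 0; step < maxFeedbackSteps && !feedback.isEmpty(); step++) {
            String value = feedback.removeFirst();
            log.add("INPUT 2: " + value);
            feedback.addLast(value);
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> log = run(Arrays.asList("Hello"), Arrays.asList("bcast 1"), 3);
        for (String line : log) {
            System.out.println(line);
        }
    }
}
```

In this model the queue never shrinks, which matches the behaviour described above: without the sleep calls the same elements keep circulating as fast as the loop can run.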

Cheers,
Aljoscha




Regarding source Parallelism and usage of coflatmap transformation

2016-05-05 Thread Biplob Biswas
Hi,

I have 2 different questions, both influencing each other in a way.

*1)* I am getting a stream of tuples from a data generator using the
following statement:
"env.addSource(new DataStreamGenerator(filePath));"

This generator reads a line from the file, splits it into its
attributes, and returns the result as a single object.
My problem is that the parallelism of this data source is 1 by default,
and if I force it to change using setParallelism, I get the error message
"Source: 1 is not a parallel source". When I searched for it, I found this on
the Flink website:
"collection data sources can not be executed in parallel ( parallelism =
1)."

So my question is: can I read my data source (which is currently a file) in
any other way such that the parallelism is not restricted to 1?

*2)* I need to connect 2 data sources over an iteration, for example:
"points.iterate().withFeedbackType(Centroid.class);"
and run a coflatmap transformation. My question is: can I already broadcast
some content of Centroid type before
ptct.closeWith(Centroid.broadcast()) sends the data back to the iterator?

For example, I tried this, but I can't see anything in the map functions:

centroidStream.broadcast();
ConnectedIterativeStreams<Point, Centroid> ptct =
tuples.iterate().withFeedbackType(MicroCluster.class);
DataStream updatedcentroids = ptct.flatMap(new MyCoFlatmap());
inputsAndMicroCluster.closeWith(updatedcentroids.broadcast());

That is, I can't see the centroids already broadcast by
centroidStream.broadcast() in the map functions.

Any kind of help is hugely appreciated.

Thanks and Regards
Biplob Biswas



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-source-Parallelism-and-usage-of-coflatmap-transformation-tp6721.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.