can flink do streaming from data sources other than Kafka?

2017-09-06 Thread kant kodali
Hi All,

I am wondering if Flink can do streaming from data sources other than
Kafka. For example, can Flink stream from a database like Cassandra,
HBase, or MongoDB to sinks such as Elasticsearch or Kafka?

Also, for out-of-core stateful streaming, is RocksDB the only option? Can I
use some other key-value store that has a SQL interface (since RocksDB
doesn't)?

Thanks,
kant


Additional data read inside dataset transformations

2017-09-06 Thread eSKa

Hello,
I will briefly describe my use case in steps for easier understanding:
1) currently my job is loading data from parquet files using
HadoopInputFormat along with AvroParquetInputFormat, with the current approach:

AvroParquetInputFormat<GenericRecord> inputFormat = new AvroParquetInputFormat<>();
AvroParquetInputFormat.setAvroReadSchema(job, schema);
AvroParquetInputFormat.setUnboundRecordFilter(job, recordFilterClass);
HadoopInputFormat<Void, GenericRecord> hadoopInputFormat =
    HadoopInputs.createHadoopInput(inputFormat, Void.class, GenericRecord.class, job);
return environment.createInput(hadoopInputFormat);
2) data is loaded into a DataSource and, after various transformations, is
grouped by my "user_id" key,
3) in a GroupReduceFunction I am dealing with the values for a given user,
4) for each group in the reduce function I am extracting the key (which was
used for the earlier grouping) and would like to read additional data (parquet
files from HDFS for the specific key extracted before), which is required for
further processing of the grouped data,
5) after processing inside the reduce function, I would like to store the
results in parquet files using the AvroParquetWriter class.


My question is: how can this additional data loading inside the reduce function
(or any other transformation) be achieved in step 4)?
In my perfect scenario I would like to use HadoopInputFormat (just like for
loading the initial data in the first step), however I am missing the
environment context here (probably?). Is there any way to achieve this, or is
this scenario completely wrong and therefore badly designed?
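
For reference, a minimal sketch of the pipeline described in steps 2)-5),
assuming a DataSet<GenericRecord> input with a "user_id" field (the class and
method names are only illustrative). Step 4) is left as a placeholder comment:
because no ExecutionEnvironment is available inside a transformation, the extra
Parquet files would have to be opened with a plain reader (e.g. parquet-avro's
AvroParquetReader) rather than with HadoopInputFormat:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;

public class GroupedEnrichmentSketch {

    public static DataSet<String> process(DataSet<GenericRecord> records) {
        // step 5): the returned DataSet would then be written with an
        // AvroParquetWriter-based output format.
        return records
            // step 2): group by the user id extracted from each record
            .groupBy(new KeySelector<GenericRecord, String>() {
                @Override
                public String getKey(GenericRecord record) {
                    return record.get("user_id").toString();
                }
            })
            // steps 3) and 4): handle all values of one user in a rich function
            .reduceGroup(new RichGroupReduceFunction<GenericRecord, String>() {
                @Override
                public void reduce(Iterable<GenericRecord> values, Collector<String> out) {
                    String userId = null;
                    for (GenericRecord record : values) {
                        if (userId == null) {
                            // the key that was used for the earlier grouping
                            userId = record.get("user_id").toString();
                        }
                        // ... transform the grouped records ...
                    }
                    // step 4): read the additional, key-specific Parquet files from HDFS
                    // here, e.g. with a plain AvroParquetReader, since HadoopInputFormat /
                    // environment.createInput() cannot be used inside this function.
                    out.collect(userId);
                }
            });
    }
}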



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: dynamically partitioned stream

2017-09-06 Thread Martin Eden
Hi Tony,

Yes, exactly. I am assuming the lambda emits a value only after it has been
published to the control topic (t1) and at least 1 value arrives in the
data topic for each of its arguments. This will happen at a time t2 > t1.
So yes, there is uncertainty with regards to when t2 will happen. Ideally
t2 - t1 ~ 0, but for our use case that is fine. Is this the correctness that
you are talking about? Do I have the right picture of what happens?

Thanks
M

On Thu, Sep 7, 2017 at 3:11 AM, Tony Wei  wrote:

> Hi Martin,
>
> The performance is an issue, but in your case, yes, it might not be a
> problem if X << N.
>
> However, the other problem is where data should go in the beginning if
> no lambda has been received yet. This problem is not about performance,
> but about correctness. If you want to keep the value state for the
> incoming lambda you would have to broadcast the data to all nodes, because
> you would never know where the next lambda that requires this data would be
> routed to. Of course, you can send this data to a pre-defined node and
> route the lambda to this node, but this leads to all data ending up on the
> same node so that all lambdas can get all the data they require. It is not
> a good solution because it does not scale.
>
> My original thought was based on only storing the state of data after you
> receive at least one lambda that requires it, so that the data has a
> destination node to go to. Is this assumption acceptable in your case?
> What do you think?
>
> Best,
> Tony Wei
>
> 2017-09-06 22:41 GMT+08:00 Martin Eden :
>
>> Hi Aljoscha, Tony,
>>
>> We actually do not need all the keys to be on all nodes where lambdas
>> are. We just need the keys that represent the data for the lambda arguments
>> to be routed to the same node as the lambda, whichever one it might be.
>>
>> Essentially in the solution we emit the data multiple times and by doing
>> that we roughly multiply the input rate by the average number of lambdas a
>> key is a part of (X). In terms of memory this is O(X * N) where N is the
>> number of keys in the data. N is the large bit. If X ~ N then we have O
>> (N^2) complexity for the Flink state. And in that case yes I see your point
>> about performance Aljoscha. But if X << N, as is our case, then we have
>> O(N) which should be manageable by Flink's distributed state mechanism
>> right? Do you see any gotchas in this new light? Are my assumptions correct?
>>
>> Thanks,
>> M
>>
>>
>>
>>
>>
>> On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei  wrote:
>>
>>> Hi Martin, Aljoscha
>>>
>>> I think Aljoscha is right. My original thought was to keep the state only
>>> after a lambda function arrives.
>>>
>>> Using Aljoscha's scenario as an example: initially, all data will be
>>> discarded because there are no lambdas yet. When lambdas f1 [D, E] and f2
>>> [A, C] come, A, C begin to be routed to machine "0" and D, E begin to be
>>> routed to machine "1". Then, when we get a new lambda f3 [C, D], we can
>>> duplicate C, D and route these copies to machine "2".
>>>
>>> However, after reading your example again, I found that what you want is a
>>> whole picture of all variables' state in a global view, so that no matter
>>> when a new lambda comes it can always get its variables' state
>>> immediately. In that case, I have the same opinion as Aljoscha.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek :
>>>
 Hi Martin,

 I think with those requirements this is very hard (or maybe impossible)
 to do efficiently in a distributed setting. It might be that I'm
 misunderstanding things but let's look at an example. Assume that
 initially, we don't have any lambdas, so data can be sent to any machine
 because it doesn't matter where they go. Now, we get a new lambda f2 [A,
 C]. Say this gets routed to machine "0", now this means that messages with
 key A and C also need to be routed to machine "0". Now, we get a new lambda
 f1 [D, E], say this gets routed to machine "2", meaning that messages with
 key D and E are also routed to machine "2".

 Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
 lambdas and inputs to different machines? They all have to go to the same
 machine, but which one? I'm currently thinking that there would need to be
 some component that does the routing, but this has to be global, so it's
 hard to do in a distributed setting.

 What do you think?

 Best,
 Aljoscha

 On 1. Sep 2017, at 07:17, Martin Eden  wrote:

 This might be a way forward but since side inputs are not there I will
 try and key the control stream by the keys in the first co flat map.

 I'll see how it goes.

 Thanks guys,
 M

 On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei 
 wrote:

> Hi Martin,
>
> Yes, that is exactly what I thought.
> But the first step also needs to be fulfilled  by SideInput. I'm not
> 

Exception when using keyby operator

2017-09-06 Thread Sridhar Chellappa
I am trying to use the KeyBy operator as follows :


Pattern myEventsCEPPattern =
        Pattern.begin("FirstEvent")
                .subtype(MyEvent.class)
                .next("SecondEvent")
                .subtype(MyEvent.class)
                .within(Time.hours(1));

PatternStream myEventsPatternStream =
        CEP.pattern(
                meEvents.keyBy("field1", "field6"),
                myEventsCEPPattern
        );



When I run the program, I get the following exception:

The program finished with the following exception:

This type (GenericType) cannot be used as key.

org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)

org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:294)


MyEvent is a POJO. What is it that I am doing wrong?


Here is the relevant code :

public abstract class AbstractEvent {
    private String field1;
    private String field2;
    private String field3;
    private String field4;
    private Timestamp eventTimestmp;

    public AbstractEvent(String field1, String field2, String field3,
                         String field4, Timestamp eventTimestmp) {
        this.field1 = field1;
        this.field2 = field2;
        this.field3 = field3;
        this.field4 = field4;
        this.eventTimestmp = eventTimestmp;
    }

    public AbstractEvent() {
    }

    public String getField1() {
        return field1;
    }

    public AbstractEvent setField1(String field1) {
        this.field1 = field1;
        return this;
    }

    public String getField2() {
        return field2;
    }

    public AbstractEvent setField2(String field2) {
        this.field2 = field2;
        return this;
    }

    public String getField3() {
        return field3;
    }

    public AbstractEvent setField3(String field3) {
        this.field3 = field3;
        return this;
    }

    public String getField4() {
        return field4;
    }

    public AbstractEvent setField4(String field4) {
        this.field4 = field4;
        return this;
    }

    public Timestamp getEventTimestmp() {
        return eventTimestmp;
    }

    public AbstractEvent setEventTimestmp(Timestamp eventTimestmp) {
        this.eventTimestmp = eventTimestmp;
        return this;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AbstractEvent)) {
            return false;
        }

        AbstractEvent that = (AbstractEvent) o;

        if (!getField1().equals(that.getField1())) {
            return false;
        }
        if (!getField2().equals(that.getField2())) {
            return false;
        }
        if (!getField3().equals(that.getField3())) {
            return false;
        }
        if (!getField4().equals(that.getField4())) {
            return false;
        }
        return getEventTimestmp().equals(that.getEventTimestmp());
    }

    @SuppressWarnings({"MagicNumber"})
    @Override
    public int hashCode() {
        int result = getField1().hashCode();
        result = 31 * result + getField2().hashCode();
        result = 31 * result + getField3().hashCode();
        result = 31 * result + getField4().hashCode();
        result = 31 * result + getEventTimestmp().hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "AbstractEvent{"
                + "field1='" + field1 + '\''
                + ", field2='" + field2 + '\''
                + ", field3='" + field3 + '\''
                + ", field4='" + field4 + '\''
                + ", eventTimestmp=" + eventTimestmp
                + '}';
    }
}


public class MyEvent extends AbstractEvent {

    private Timestamp responseTime;
    private String field5;
    private String field6;
    private int field7;
    private String field8;
    private String field9;

    public MyEvent(String field1, String field2, String field3, String field4,
                   Timestamp eventTimestmp, Timestamp responseTime, String field5,
                   String field6, int field7, String field8, String field9) {
        super(field1, field2, field3, field4, eventTimestmp);
        this.responseTime = responseTime;
        this.field5 = field5;
        this.field6 = field6;
        this.field7 = field7;
        this.field8 = field8;
        this.field9 = field9;
    }

    public MyEvent() {
        super();
    }

    public int getField7() {
        return field7;
    }

    public String getField8() {
        return field8;
    }

    public String getField9() {
        return field9;
    }

    public String getField5() {
        return field5;
    }

    public String getField6() {
        return field6;
    }

    public Timestamp getResponseTime() {
        return responseTime;
    }

    public MyEvent setResponseTime(Timestamp respons

Re: dynamically partitioned stream

2017-09-06 Thread Tony Wei
Hi Martin,

The performance is an issue, but in your case, yes, it might not be a
problem if X << N.

However, the other problem is where data should go in the beginning if
no lambda has been received yet. This problem is not about performance,
but about correctness. If you want to keep the value state for the
incoming lambda you would have to broadcast the data to all nodes, because
you would never know where the next lambda that requires this data would be
routed to. Of course, you can send this data to a pre-defined node and
route the lambda to this node, but this leads to all data ending up on the
same node so that all lambdas can get all the data they require. It is not
a good solution because it does not scale.

My original thought was based on only storing the state of data after you
receive at least one lambda that requires it, so that the data has a
destination node to go to. Is this assumption acceptable in your case?
What do you think?

Best,
Tony Wei

2017-09-06 22:41 GMT+08:00 Martin Eden :

> Hi Aljoscha, Tony,
>
> We actually do not need all the keys to be on all nodes where lambdas are.
> We just need the keys that represent the data for the lambda arguments to
> be routed to the same node as the lambda, whichever one it might be.
>
> Essentially in the solution we emit the data multiple times and by doing
> that we roughly multiply the input rate by the average number of lambdas a
> key is a part of (X). In terms of memory this is O(X * N) where N is the
> number of keys in the data. N is the large bit. If X ~ N then we have O
> (N^2) complexity for the Flink state. And in that case yes I see your point
> about performance Aljoscha. But if X << N, as is our case, then we have
> O(N) which should be manageable by Flink's distributed state mechanism
> right? Do you see any gotchas in this new light? Are my assumptions correct?
>
> Thanks,
> M
>
>
>
>
>
> On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei  wrote:
>
>> Hi Martin, Aljoscha
>>
>> I think Aljoscha is right. My original thought was to keep the state only
>> after a lambda function arrives.
>>
>> Using Aljoscha's scenario as an example: initially, all data will be
>> discarded because there are no lambdas yet. When lambdas f1 [D, E] and f2
>> [A, C] come, A, C begin to be routed to machine "0" and D, E begin to be
>> routed to machine "1". Then, when we get a new lambda f3 [C, D], we can
>> duplicate C, D and route these copies to machine "2".
>>
>> However, after reading your example again, I found that what you want is a
>> whole picture of all variables' state in a global view, so that no matter
>> when a new lambda comes it can always get its variables' state
>> immediately. In that case, I have the same opinion as Aljoscha.
>>
>> Best,
>> Tony Wei
>>
>> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek :
>>
>>> Hi Martin,
>>>
>>> I think with those requirements this is very hard (or maybe impossible)
>>> to do efficiently in a distributed setting. It might be that I'm
>>> misunderstanding things but let's look at an example. Assume that
>>> initially, we don't have any lambdas, so data can be sent to any machine
>>> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
>>> C]. Say this gets routed to machine "0", now this means that messages with
>>> key A and C also need to be routed to machine "0". Now, we get a new lambda
>>> f1 [D, E], say this gets routed to machine "2", meaning that messages with
>>> key D and E are also routed to machine "2".
>>>
>>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
>>> lambdas and inputs to different machines? They all have to go to the same
>>> machine, but which one? I'm currently thinking that there would need to be
>>> some component that does the routing, but this has to be global, so it's
>>> hard to do in a distributed setting.
>>>
>>> What do you think?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 1. Sep 2017, at 07:17, Martin Eden  wrote:
>>>
>>> This might be a way forward but since side inputs are not there I will
>>> try and key the control stream by the keys in the first co flat map.
>>>
>>> I'll see how it goes.
>>>
>>> Thanks guys,
>>> M
>>>
>>> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei 
>>> wrote:
>>>
 Hi Martin,

 Yes, that is exactly what I thought.
 But the first step also needs to be fulfilled  by SideInput. I'm not
 sure how to achieve this in the current release.

 Best,
 Tony Wei

 Martin Eden wrote on Thu, Aug 31, 2017 at 11:32 PM:

> Hi Aljoscha, Tony,
>
> Aljoscha:
> Yes it's the first option you mentioned.
> Yes, the stream has multiple values in flight for A, B, C. f1 needs to
> be applied each time a new value for either A, B or C comes in. So we need
> to use state to cache the latest values. So using the example data stream
> in my first msg the emitted stream should be:
>
> 1. Data Stream:
> KEY VALUE TIME
> .
> .
> .
> C  V66
> B  V6

Re: FLINK-6117 issue work around

2017-09-06 Thread sunny yun
Nico, thank you for your reply. 

I looked at the commit you cherry-picked and nothing in there explains the 
error you got.
==> 
The commit I cherry-picked makes the 'zookeeper.sasl.disable' setting work
correctly.
I changed flink-dist_2.11-1.2.0.jar accordingly.
So now the zookeeper.sasl problem is gone.
Yes, the error log I posted in the original message is a completely different
one.


Can you verify that nothing of your flink 1.3 tests remains
==> 
Below is what I just reproduced. I have a 4-node cluster, non-secured.
After running yarn-session.sh, the JM process is created on the flink-03 node
but the TM process is not.
Standalone works well.
Any clue would be really appreciated. Thanks.


[bistel@flink-01 ~]$ jps 
1888 ResourceManager 
2000 NodeManager 
2433 NameNode 
2546 DataNode 
2754 SecondaryNameNode 
2891 Jps 
1724 QuorumPeerMain 

[bistel@flink-02 ~]$ jps 
2018 Jps 
1721 NodeManager 
1881 DataNode 
1515 QuorumPeerMain 

[bistel@flink-03 ~]$ jps 
1521 QuorumPeerMain 
1975 Jps 
1724 NodeManager 
1885 DataNode 

[bistel@flink-04 ~]$ jps 
2090 Jps 
1515 QuorumPeerMain 
1789 NodeManager 
1950 DataNode 

[bistel@flink-01 ~]$ /usr/local/flink-1.2.0/bin/yarn-session.sh -n 4 
2017-09-07 09:49:35,467 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, flink-01 
2017-09-07 09:49:35,468 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123 
2017-09-07 09:49:35,468 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.mb, 4096 
2017-09-07 09:49:35,468 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.mb, 8192 
2017-09-07 09:49:35,468 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 4 
2017-09-07 09:49:35,469 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.memory.preallocate, false 
2017-09-07 09:49:35,469 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 4 
2017-09-07 09:49:35,469 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.web.port, 8081 
2017-09-07 09:49:35,469 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: fs.hdfs.hadoopconf, /usr/local/hadoop/etc/hadoop/ 
2017-09-07 09:49:35,470 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability, zookeeper 
2017-09-07 09:49:35,470 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability.zookeeper.quorum,
flink-01:2181,flink-02:2181,flink-03:2181,flink-04:2181 
2017-09-07 09:49:35,470 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability.zookeeper.path.root, /flink 
2017-09-07 09:49:35,470 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability.zookeeper.path.namespace,
/cluster_one 
2017-09-07 09:49:35,470 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: high-availability.zookeeper.storageDir,
hdfs:///flink/recovery 
2017-09-07 09:49:35,470 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: yarn.application-attempts, 10 
2017-09-07 09:49:35,470 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: yarn.containers.vcores, 20 
2017-09-07 09:49:35,471 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: yarn.application-master.env.LD_LIBRARY_PATH,
/opt/tibco/TIBRV/8.0/lib 
2017-09-07 09:49:35,471 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: yarn.taskmanager.env.LD_LIBRARY_PATH,
/opt/tibco/TIBRV/8.0/lib 
2017-09-07 09:49:35,471 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: zookeeper.sasl.disable, true 
2017-09-07 09:49:35,662 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, flink-01 
2017-09-07 09:49:35,662 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123 
2017-09-07 09:49:35,662 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.mb, 4096 
2017-09-07 09:49:35,663 INFO 
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.mb, 8


MapState Default Value

2017-09-06 Thread Navneeth Krishnan
Hi,

Is there a reason behind removing the default value option in
MapStateDescriptor? I was using it in the earlier version to initialize a
Guava cache with a loader etc., and in the new version an empty map is
returned by default.

Thanks


Question about Flink internals

2017-09-06 Thread Junguk Cho
Hi, All.

I am new to Flink.
I just installed Flink on a cluster and started reading the documentation to
understand Flink internals.
After reading some documents, I have some questions.
I have prior experience with Storm and Heron, so I am relating their
mechanisms to my questions to better understand Flink.

1. Can I specify worker parallelism explicitly, like in Storm?

2. Record in Flink
Can I think of a "record" in Flink as almost the same as a Tuple in Storm?
A Tuple in Storm is used for carrying "real data" + metadata (e.g., stream
type, source id and so on).

3. How does partitioning (e.g., shuffling, map) work internally?
Storm keeps (worker id) : (TCP info of next workers) tables, so after
executing the partition function a Tuple is forwarded to the next hops based
on these tables.
Is it the same in Flink?

4. How does Flink detect faults in case of worker death or machine failure?
Based on the documents, the JobManager checks the liveness of TaskManagers
with heartbeat messages.
In Storm, the supervisor (I think it is similar to the TaskManager) first
detects a dead worker based on heartbeats and locally re-runs it again. For
machine failures, Nimbus (I think it is similar to the JobManager) detects the
machine failure based on the supervisor's heartbeat and re-schedules all
assigned workers to other machines.
How does Flink work?

5. For exactly-once delivery, Flink uses checkpointing and a record-replay
mechanism.
It needs message queues (e.g., Kafka) for record replay.
Kafka uses TCP to send and receive data. So I wonder: if the data source does
not use TCP (e.g., IoT sensors), what is the general solution for record
replay?
For example, source workers could be directly connected to several inputs
(e.g., IoT sensors), although I think that is not a typical deployment.

6. Flink supports cycles.
However, based on the documents, cycled tasks act as a regular dataflow source
and sink respectively, yet they are collocated in the same physical instance
to share an in-memory buffer and thus implement the loopback stream
transparently.
So, what if the number of workers that form cycles is high? It would be hard
to put them all on the same physical machine.

Thanks,
Junguk


Re: Process Function

2017-09-06 Thread Johannes Schulte
Thanks, that helped to see how we could implement this!

On Wed, Sep 6, 2017 at 12:01 PM, Timo Walther  wrote:

> Hi Johannes,
>
> you can find the implementation for the state clean up here:
> https://github.com/apache/flink/blob/master/flink-
> libraries/flink-table/src/main/scala/org/apache/flink/
> table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
>
> and a example usage here:
> https://github.com/apache/flink/blob/master/flink-
> libraries/flink-table/src/main/scala/org/apache/flink/
> table/runtime/aggregate/ProcTimeUnboundedOver.scala
>
> Regards,
> Timo
>
>
> Am 06.09.17 um 10:50 schrieb Aljoscha Krettek:
>
> Hi,
>
> I'm actually not very familiar with the current Table API implementations
> but Fabian or Timo (cc'ed) should know more. I suspect very much that this
> is implemented like this, yes.
>
> Best,
> Aljoscha
>
> On 5. Sep 2017, at 21:14, Johannes Schulte 
> wrote:
>
> Hi,
>
> one short question I had that fits here. When using higher level streaming
> we can set min and max retention time [1] which is probably used to reduce
> the number of timers registered under the hood. How is this implemented, by
> registering a "clamped" timer?
>
> Thanks,
>
> Johannes
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/
> streaming.html#idle-state-retention-time
>
> On Tue, Sep 5, 2017 at 5:17 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> This is mostly correct, but you cannot register a timer in open() because
>> we don't have an active key there. Only in process() and onTimer() can you
>> register a timer.
>>
>> In your case, I would suggest to somehow clamp the timestamp to the
>> nearest 2 minute (or whatever) interval or to keep an extra ValueState that
>> tells you whether you already registered a timer.
>>
>> Best,
>> Aljoscha
>>
>> On 5. Sep 2017, at 16:55, Kien Truong  wrote:
>>
>> Hi,
>>
>> You can register a processing time timer inside the onTimer and the open
>> functions to have a timer that runs periodically.
>>
>> Pseudo-code example:
>>
>> ValueState lastRuntime;
>>
>> void open() {
>>   ctx.timerService().registerProcessingTimeTimer(current.timestamp + 6);
>> }
>>
>> void onTimer() {
>>   // Run the periodic task
>>   if (lastRuntime.get() + 6 == timeStamp) {
>> periodicTask();
>>   }
>>   // Re-register the processing time timer timer
>>   lastRuntime.setValue(timeStamp);  
>> ctx.timerService().registerProcessingTimeTimer(current.timestamp + 6);
>> }
>>
>> void periodicTask()
>>
>>
>> For the second question, timers are already scoped by key, so you can keep
>> a lastModified variable as a ValueState,
>> then compare it to the timestamp provided by the timer to see if the
>> current key should be evicted.
>> Checkout the example on the ProcessFunction page.
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>> dev/stream/process_function.html
>>
>> Best regards,
>> Kien
>>
>> On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
>>
>> Hi All,
>>
>> I have a streaming pipeline which is keyed by userid and then goes to a
>> flatmap function. I need to clear the state after some time and I was
>> looking at the process function for it.
>>
>> Inside the process element function if I register a timer wouldn't it
>> create a timer for each incoming message?
>>
>> // schedule the next timer 60 seconds from the current event time
>> ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
>>
>> How can I get something like a cleanup task that runs every 2 minutes and
>> evicts all stale data? Also, is there a way to get the key inside the onTimer
>> function so that I know which key has to be evicted?
>>
>> Thanks,
>> Navneeth
>>
>>
>>
>
>
>
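
Putting the suggestions in this thread together, here is a rough sketch (not
code from the thread, and untested) of a ProcessFunction on a keyed stream that
keeps at most one cleanup timer per key by clamping the timestamp to a fixed
interval and clears the per-key state when the timer fires; the 2-minute
interval, the state name and the Tuple2<String, Long> element type are
illustrative assumptions:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class StateCleanupFunction
        extends ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long CLEANUP_INTERVAL = 2 * 60 * 1000; // 2 minutes, illustrative

    // timestamp of the currently registered cleanup timer (scoped to the current key)
    private transient ValueState<Long> cleanupTimer;

    @Override
    public void open(Configuration parameters) {
        cleanupTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cleanupTimer", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        if (cleanupTimer.value() == null) {
            // clamp to the next interval boundary so that all elements arriving within
            // one interval share a single timer instead of registering one per element
            long cleanupTime =
                    (ctx.timerService().currentProcessingTime() / CLEANUP_INTERVAL + 1)
                            * CLEANUP_INTERVAL;
            ctx.timerService().registerProcessingTimeTimer(cleanupTime);
            cleanupTimer.update(cleanupTime);
        }
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        // timers and state are scoped to the key the timer was registered for, so only
        // the state of that key is cleared here
        cleanupTimer.clear();
        // ... clear the other per-key state that should be evicted ...
    }
}

The function would be applied after a keyBy(), e.g.
stream.keyBy(0).process(new StateCleanupFunction()).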


Flink 1.2.1 JobManager Election Deadlock

2017-09-06 Thread James Bucher
Hey all,

Just wanted to report this for posterity in case someone else sees something 
similar. We were running Flink 1.2.1 in Kubernetes. We use an HA setup with an 
external Zookeeper and S3 for checkpointing. We recently noticed a job that 
appears to have deadlocked on JobManager Leader election. We think the issue 
happened something like the following:

  1.  Job was up and running normally
  2.  Some cluster event caused the JobManager Pod (process) to get restarted.
  3.  JobManager came up again but got stuck on LeaderElection. At this time 
the JobManager UI sent back a response with "Service temporarily unavailable 
due to an ongoing leader election. Please refresh."
  4.  JobManager never exited this state.
  5.  This state persisted across Pod restarts/deletes.

In order to try to pin down this problem we brought up the Job in another Flink 
Cluster and started debugging this issue. As a first step I upped the logging 
level on the JobManager and applied the change. This resulted in the following 
log (Full logs attached to this email):


2017-09-05 18:50:55,072 TRACE 
org.apache.flink.shaded.org.apache.curator.utils.DefaultTracerDriver  - Trace: 
GetDataBuilderImpl-Background - 4 ms

2017-09-05 18:50:55,075 DEBUG 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Received 
CHILD_ADDED event (path: /4e4cdc8fb8c1437c620cd4063bd265e1)

2017-09-05 18:50:55,088 DEBUG 
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Received 
CHILD_ADDED event notification for job 4e4cdc8fb8c1437c620cd4063bd265e1

2017-09-05 18:50:56,072 DEBUG org.apache.zookeeper.ClientCnxn   
- Reading reply sessionid:0x15e3df8b5a57b4b, packet:: 
clientPath:null serverPath:null finished:false header:: 1,3  replyHeader:: 
1,21476396435,0  request:: '/traveler-profile.us-west-2c.test.expedia.com,F  
response:: 
s{4305881524,4305881524,1497456679255,1497456679255,0,3,0,0,0,3,21475572278}

2017-09-05 18:50:56,078 DEBUG org.apache.zookeeper.ClientCnxn   
- Reading reply sessionid:0x15e3df8b5a57b4b, packet:: 
clientPath:null serverPath:null finished:false header:: 2,3  replyHeader:: 
2,21476396435,0  request:: 
'/traveler-profile.us-west-2c.test.expedia.com/krazyglue,F  response:: 
s{4305881525,4305881525,1497456679260,1497456679260,0,1,0,0,0,1,4305881526}

2017-09-05 18:50:56,079 DEBUG org.apache.zookeeper.ClientCnxn   
- Reading reply sessionid:0x15e3df8b5a57b4b, packet:: 
clientPath:null serverPath:null finished:false header:: 3,3  replyHeader:: 
3,21476396435,0  request:: 
'/traveler-profile.us-west-2c.test.expedia.com/krazyglue/mip,F  response:: 
s{4305881526,4305881526,1497456679263,1497456679263,0,1,0,0,0,1,4305881527}

2017-09-05 18:50:56,080 DEBUG org.apache.zookeeper.ClientCnxn   
- Reading reply sessionid:0x15e3df8b5a57b4b, packet:: 
clientPath:null serverPath:null finished:false header:: 4,3  replyHeader:: 
4,21476396435,0  request:: 
'/traveler-profile.us-west-2c.test.expedia.com/krazyglue/mip/flink,F  
response:: 
s{4305881527,4305881527,1497456679267,1497456679267,0,1,0,0,0,1,4305881528}

2017-09-05 18:50:56,081 DEBUG org.apache.zookeeper.ClientCnxn   
- Reading reply sessionid:0x15e3df8b5a57b4b, packet:: 
clientPath:null serverPath:null finished:false header:: 5,3  replyHeader:: 
5,21476396435,0  request:: 
'/traveler-profile.us-west-2c.test.expedia.com/krazyglue/mip/flink/default,F  
response:: 
s{4305881528,4305881528,1497456679270,1497456679270,0,27,0,0,0,5,21475449005}

2017-09-05 18:50:56,085 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService.

2017-09-05 18:50:56,087 DEBUG org.apache.zookeeper.ClientCnxn   
- Reading reply sessionid:0x15e3df8b5a57b4b, packet:: 
clientPath:null serverPath:null finished:false header:: 6,3  replyHeader:: 
6,21476396435,0  request:: '/traveler-profile.us-west-2c.test.expedia.com,F  
response:: 
s{4305881524,4305881524,1497456679255,1497456679255,0,3,0,0,0,3,21475572278}

2017-09-05 18:50:56,087 DEBUG org.apache.zookeeper.ClientCnxn   
- Reading reply sessionid:0x15e3df8b5a57b4b, packet:: 
clientPath:null serverPath:null finished:false header:: 7,3  replyHeader:: 
7,21476396435,0  request:: 
'/traveler-profile.us-west-2c.test.expedia.com/krazyglue,F  response:: 
s{4305881525,4305881525,1497456679260,1497456679260,0,1,0,0,0,1,4305881526}

2017-09-05 18:50:56,088 DEBUG org.apache.zookeeper.ClientCnxn   
- Reading reply sessionid:0x15e3df8b5a57b4b, packet:: 
clientPath:null serverPath:null finished:false header:: 8,3  replyHeader:: 
8,21476396435,0  request:: 
'/traveler-profile.us-west-2c.test.expedia.com/krazyglue/mip,F  response:: 
s{4305881526,4305881526,1497456679263,1497456679263,0,1,0,0,0,1,4305881527}

2017-09-05 18:50:56,090 DEBUG org.apache.z

Re: FLINK-6117 issue work around

2017-09-06 Thread Nico Kruber
I looked at the commit you cherry-picked and nothing in there explains the 
error you got. This rather sounds like something might be mixed up between 
(remaining artefacts of) flink 1.3 and 1.2.

Can you verify that nothing of your flink 1.3 tests remains, e.g. running 
JobManager or TaskManager instances? Also that you're not accidentally running 
the yarn-session.sh script of 1.3?


Nico

On Wednesday, 6 September 2017 06:36:42 CEST Sunny Yun wrote:
> Hi,
> 
> Using flink 1.2.0, I ran into issue
> https://issues.apache.org/jira/browse/FLINK-6117.
> This issue is fixed in version 1.3.0, but I have some reasons to try to
> find a workaround.
> 
> I did,
> 1. change source according to
> https://github.com/apache/flink/commit/eef85e095a8a0e4c4553631b74ba7b9f173cebf0
> 2. replace $FLINK_HOME/lib/flink-dist_2.11-1.2.0.jar
> 3. set flink-conf.yaml "zookeeper.sasl.disable: true"
> 4. run yarn-session.sh
> 
> 
> The original problem - Authentication failed - seems to have been resolved.
> But I got this error:
> 
> Exception in thread "main" java.lang.RuntimeException: Failed to retrieve JobManager address
> at org.apache.flink.client.program.ClusterClient.getJobManagerAddress(ClusterClient.java:248)
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:627)
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:476)
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:473)
> at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:473)
> Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could
> not retrieve the leader address and leader session ID.
> at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:175)
> at org.apache.flink.client.program.ClusterClient.getJobManagerAddress(ClusterClient.java:242)
> ... 9 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [6 milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:190)
> at scala.concurrent.Await.result(package.scala)
> at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:173)
> ... 10 more
> 
> 
> I believe the related settings (flink, hadoop, zookeeper) are correct, because
> yarn-session works smoothly with flink 1.3.2 in the same environment.
> 
> Does anyone have any inspiration for this error message?
> 
> Thanks.
> 



signature.asc
Description: This is a digitally signed message part.


Re: dynamically partitioned stream

2017-09-06 Thread Martin Eden
Hi Aljoscha, Tony,

We actually do not need all the keys to be on all nodes where lambdas are.
We just need the keys that represent the data for the lambda arguments to
be routed to the same node as the lambda, whichever one it might be.

Essentially in the solution we emit the data multiple times and by doing
that we roughly multiply the input rate by the average number of lambdas a
key is a part of (X). In terms of memory this is O(X * N) where N is the
number of keys in the data. N is the large bit. If X ~ N then we have O
(N^2) complexity for the Flink state. And in that case yes I see your point
about performance Aljoscha. But if X << N, as is our case, then we have
O(N) which should be manageable by Flink's distributed state mechanism
right? Do you see any gotchas in this new light? Are my assumptions correct?

Thanks,
M
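
To make this more concrete, below is a rough sketch of one way to implement the
first co-flat-map described above (my own reading of the thread, untested; the
class name and tuple layouts are illustrative). Both inputs are keyed by the
data key, every incoming value is emitted once per lambda that lists that key
as an argument, and the copies are re-keyed by the lambda id so that a
downstream keyed operator (not shown) can cache the latest value per argument
and apply the lambda:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class FanOutByLambda extends RichCoFlatMapFunction<
        Tuple2<String, Double>,            // data:    (key, value)
        Tuple2<String, String>,            // control: (lambdaId, argumentKey)
        Tuple3<String, String, Double>> {  // output:  (lambdaId, key, value)

    // lambda ids that use the current data key as an argument (keyed state, per data key)
    private transient MapState<String, Boolean> lambdasForKey;

    @Override
    public void open(Configuration parameters) {
        lambdasForKey = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("lambdasForKey", String.class, Boolean.class));
    }

    @Override
    public void flatMap1(Tuple2<String, Double> data,
                         Collector<Tuple3<String, String, Double>> out) throws Exception {
        // emit one copy per registered lambda: X copies if X lambdas use this key
        for (String lambdaId : lambdasForKey.keys()) {
            out.collect(Tuple3.of(lambdaId, data.f0, data.f1));
        }
    }

    @Override
    public void flatMap2(Tuple2<String, String> control,
                         Collector<Tuple3<String, String, Double>> out) throws Exception {
        // the control stream is keyed by the argument key, so this registers the lambda
        // as interested in the current key
        lambdasForKey.put(control.f0, true);
    }
}

// usage:
//   data.keyBy(0)                      // data keyed by its key
//       .connect(control.keyBy(1))     // control keyed by the argument key
//       .flatMap(new FanOutByLambda())
//       .keyBy(0);                     // route each copy to the node that owns its lambda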





On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei  wrote:

> Hi Martin, Aljoscha
>
> I think Aljoscha is right. My original thought was to keep the state only
> after a lambda function arrives.
>
> Using Aljoscha's scenario as an example: initially, all data will be discarded
> because there are no lambdas yet. When lambdas f1 [D, E] and f2 [A, C]
> come, A, C begin to be routed to machine "0" and D, E begin to be routed
> to machine "1". Then, when we get a new lambda f3 [C, D], we can
> duplicate C, D and route these copies to machine "2".
>
> However, after reading your example again, I found that what you want is a
> whole picture of all variables' state in a global view, so that no matter
> when a new lambda comes it can always get its variables' state
> immediately. In that case, I have the same opinion as Aljoscha.
>
> Best,
> Tony Wei
>
> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek :
>
>> Hi Martin,
>>
>> I think with those requirements this is very hard (or maybe impossible)
>> to do efficiently in a distributed setting. It might be that I'm
>> misunderstanding things but let's look at an example. Assume that
>> initially, we don't have any lambdas, so data can be sent to any machine
>> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
>> C]. Say this gets routed to machine "0", now this means that messages with
>> key A and C also need to be routed to machine "0". Now, we get a new lambda
>> f1 [D, E], say this gets routed to machine "2", meaning that messages with
>> key D and E are also routed to machine "2".
>>
>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
>> lambdas and inputs to different machines? They all have to go to the same
>> machine, but which one? I'm currently thinking that there would need to be
>> some component that does the routing, but this has to be global, so it's
>> hard to do in a distributed setting.
>>
>> What do you think?
>>
>> Best,
>> Aljoscha
>>
>> On 1. Sep 2017, at 07:17, Martin Eden  wrote:
>>
>> This might be a way forward but since side inputs are not there I will
>> try and key the control stream by the keys in the first co flat map.
>>
>> I'll see how it goes.
>>
>> Thanks guys,
>> M
>>
>> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei  wrote:
>>
>>> Hi Martin,
>>>
>>> Yes, that is exactly what I thought.
>>> But the first step also needs to be fulfilled  by SideInput. I'm not
>>> sure how to achieve this in the current release.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> Martin Eden wrote on Thu, Aug 31, 2017 at 11:32 PM:
>>>
 Hi Aljoscha, Tony,

 Aljoscha:
 Yes it's the first option you mentioned.
 Yes, the stream has multiple values in flight for A, B, C. f1 needs to
 be applied each time a new value for either A, B or C comes in. So we need
 to use state to cache the latest values. So using the example data stream
 in my first msg the emitted stream should be:

 1. Data Stream:
 KEY VALUE TIME
 .
 .
 .
 C    V6     6
 B    V6     6
 A    V5     5
 A    V4     4
 C    V3     3
 A    V3     3
 B    V3     3
 B    V2     2
 A    V1     1

 2. Control Stream:
 Lambda  ArgumentKeys TIME
 .
 .
 .
 f2    [A, C]       4
 f1    [A, B, C]    1

 3. Expected emitted stream:
 TIME    VALUE
 .
 .
 .
 6  f1(V5, V6, V3)
 f1(V5, V6, V6)
 f2(V5, V6)
 5  f1(V5, V3, V3)
 f2(V5, V3)
 4  f1(V4, V3, V3)
 f2(V4, V3)
 3  f1(V3, V3, V3)
 2  -
 1  -

 So essentially, as soon as the argument list fills up, we apply the
 function/lambda on each newly arriving message in the data stream for any
 argument key.

 Tony:
 Yes we need to group by and pass to the lambda.
 Ok, so what you are proposing might work. So your solution assumes that
 we have to connect with the control stream twice? Once for the tagging and
 another time re-connect-ing the control

Re: Apache Phoenix integration

2017-09-06 Thread Flavio Pompermaier
Maybe this should also be properly documented... is there any dedicated page
for the Flink JDBC connectors?

On Wed, Sep 6, 2017 at 4:12 PM, Fabian Hueske  wrote:

> Great!
>
> If you want to, you can open a PR that adds
>
> if (!conn.getAutoCommit()) {
>   conn.setAutoCommit(true);
> }
>
> to JdbcOutputFormat.open().
>
> Cheers, Fabian
>
>
>
> 2017-09-06 15:55 GMT+02:00 Flavio Pompermaier :
>
>> Hi Fabian,
>> thanks for the detailed answer. Obviously you are right :)
>> As stated by https://phoenix.apache.org/tuning.html auto-commit is
>> disabled by default in Phoenix, but it can be easily enabled just appending
>> AutoCommit=true to the connection URL or, equivalently, setting the proper
>> property in the conf object passed to the Phoenix
>> QueryUtil.getConnectionUrl method that autogenerate the connection URL,
>> i.e.:
>>
>> --
>> Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
>> jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>> PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
>> final Properties phoenixProps = PropertiesUtil.extractProperties(new
>> Properties(), jobConf);
>> String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>> --
>>
>> Now my job works also with the standard Flink JDBCOutputformat.
>> Just to help other people willing to play with Phoenix and HBase I paste
>> below my simple test job:
>>
>> @Test
>>   public void testPhoenixOutputFormat() throws Exception {
>>
>> final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
>> senv.enableCheckpointing(5000);
>> DataStream<String> testStream = senv.fromElements("1,aaa,XXX",
>> "2,bbb,YYY", "3,ccc,ZZZ");
>>
>> // Set the target Phoenix table and the columns
>> DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
>>
>>   private static final long serialVersionUID = 1L;
>>
>>   @Override
>>   public Row map(String str) throws Exception {
>> String[] split = str.split(Pattern.quote(","));
>> Row ret = new Row(3);
>> ret.setField(0, split[0]);
>> ret.setField(1, split[1]);
>> ret.setField(2, split[2]);
>> return ret;
>>   }
>> }).returns(new RowTypeInfo(BasicTypeInfo.STRI
>> NG_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_
>> INFO));
>>
>> Job job = Job.getInstance(HBaseConfiguration.create(),
>> "phoenix-mr-job");
>> PhoenixMapReduceUtil.setOutput(job, "MY_TABLE",
>> "FIELD_1,FIELD2,FIELD_3");
>> final org.apache.hadoop.conf.Configuration jobConf =
>> job.getConfiguration();
>> jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>> PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
>> final String upsertStatement = PhoenixConfigurationUtil.getUp
>> sertStatement(jobConf);
>> final Properties phoenixProps = PropertiesUtil.extractProperties(new
>> Properties(), jobConf);
>> String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>>
>> rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
>> .setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class.
>> getCanonicalName())
>> .setDBUrl(connUrl)
>> .setQuery(upsertStatement)
>> .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR,
>> Types.VARCHAR})
>> .finish());
>>
>> senv.execute();
>>   }
>>
>> Best,
>> Flavio
>>
>> On Wed, Sep 6, 2017 at 3:26 PM, Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> According to the JavaDocs of java.sql.Connection, commit() will throw an
>>> exception if the connection is in auto commit mode which should be the
>>> default.
>>> So adding this change to the JdbcOutputFormat seems a bit risky.
>>>
>>> Maybe the Phoenix JDBC connector does not enable auto commits by default
>>> (or doesn't support it). Can you check that Flavio?
>>> If the Phoenix connector supports but not activates auto commits by
>>> default, we can enable it in JdbcOutputFormat.open().
>>> If auto commits are not supported, we can add a check after execute()
>>> and call commit() only if Connection.getAutoCommit() returns false.
>>>
>>> Best, Fabian
>>>
>>>
>>> 2017-09-06 11:38 GMT+02:00 Flavio Pompermaier :
>>>
 Hi to all,
 I'm writing a job that uses Apache Phoenix.

 At first I used the PhoenixOutputFormat as (hadoop) OutputFormat but
 it's not well suited to work with Table API because it cannot handle
 generic objects like Rows (it needs a DBWritable Object that should be
 already present at compile time). So I've looked into the code of the
 PhoenixOutputFormat and it's nothing else than a JDBCOutputFormat
 (basically).

 However, to make it work I had to slightly modify the Flink
 JDBCOutputformat, adding a dbConn.commit() after the executeBatch() on the
 PreparedStatement. E.g:

 upload.executeBatch();
 dbConn.commit();

 For the moment I've just created a dedicated PhoenixJdbcOutpuFormat
 where I've added these 2 lines o

Re: Apache Phoenix integration

2017-09-06 Thread Fabian Hueske
Great!

If you want to, you can open a PR that adds

if (!conn.getAutoCommit()) {
  conn.setAutoCommit(true);
}

to JdbcOutputFormat.open().

Cheers, Fabian
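
For reference, a small sketch that combines the two variants discussed in this
thread (enable auto-commit in open() when the driver has it switched off, and
otherwise commit explicitly after the batch); the class and method names are
illustrative, this is not the actual JDBCOutputFormat code:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class CommitHandlingSketch {

    // called once in open(): switch auto-commit on if the driver (e.g. Phoenix) left it off
    static void enableAutoCommit(Connection conn) throws SQLException {
        if (!conn.getAutoCommit()) {
            conn.setAutoCommit(true);
        }
    }

    // called after each batch: commit explicitly only when auto-commit is (still) disabled
    static void flushBatch(Connection conn, PreparedStatement upload) throws SQLException {
        upload.executeBatch();
        if (!conn.getAutoCommit()) {
            conn.commit();
        }
    }
}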



2017-09-06 15:55 GMT+02:00 Flavio Pompermaier :

> Hi Fabian,
> thanks for the detailed answer. Obviously you are right :)
> As stated by https://phoenix.apache.org/tuning.html auto-commit is
> disabled by default in Phoenix, but it can be easily enabled just appending
> AutoCommit=true to the connection URL or, equivalently, setting the proper
> property in the conf object passed to the Phoenix
> QueryUtil.getConnectionUrl method that autogenerate the connection URL,
> i.e.:
>
> --
> Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
> jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
> PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
> final Properties phoenixProps = PropertiesUtil.extractProperties(new
> Properties(), jobConf);
> String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
> --
>
> Now my job works also with the standard Flink JDBCOutputformat.
> Just to help other people willing to play with Phoenix and HBase I paste
> below my simple test job:
>
> @Test
>   public void testPhoenixOutputFormat() throws Exception {
>
> final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
> senv.enableCheckpointing(5000);
> DataStream<String> testStream = senv.fromElements("1,aaa,XXX",
> "2,bbb,YYY", "3,ccc,ZZZ");
>
> // Set the target Phoenix table and the columns
> DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
>
>   private static final long serialVersionUID = 1L;
>
>   @Override
>   public Row map(String str) throws Exception {
> String[] split = str.split(Pattern.quote(","));
> Row ret = new Row(3);
> ret.setField(0, split[0]);
> ret.setField(1, split[1]);
> ret.setField(2, split[2]);
> return ret;
>   }
> }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO));
>
> Job job = Job.getInstance(HBaseConfiguration.create(),
> "phoenix-mr-job");
> PhoenixMapReduceUtil.setOutput(job, "MY_TABLE",
> "FIELD_1,FIELD2,FIELD_3");
> final org.apache.hadoop.conf.Configuration jobConf =
> job.getConfiguration();
> jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
> PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
> final String upsertStatement = PhoenixConfigurationUtil.
> getUpsertStatement(jobConf);
> final Properties phoenixProps = PropertiesUtil.extractProperties(new
> Properties(), jobConf);
> String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>
> rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
> .setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.
> class.getCanonicalName())
> .setDBUrl(connUrl)
> .setQuery(upsertStatement)
> .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR,
> Types.VARCHAR})
> .finish());
>
> senv.execute();
>   }
>
> Best,
> Flavio
>
> On Wed, Sep 6, 2017 at 3:26 PM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> According to the JavaDocs of java.sql.Connection, commit() will throw an
>> exception if the connection is in auto commit mode which should be the
>> default.
>> So adding this change to the JdbcOutputFormat seems a bit risky.
>>
>> Maybe the Phoenix JDBC connector does not enable auto commits by default
>> (or doesn't support it). Can you check that Flavio?
>> If the Phoenix connector supports but not activates auto commits by
>> default, we can enable it in JdbcOutputFormat.open().
>> If auto commits are not supported, we can add a check after execute() and
>> call commit() only if Connection.getAutoCommit() returns false.
>>
>> Best, Fabian
>>
>>
>> 2017-09-06 11:38 GMT+02:00 Flavio Pompermaier :
>>
>>> Hi to all,
>>> I'm writing a job that uses Apache Phoenix.
>>>
>>> At first I used the PhoenixOutputFormat as (hadoop) OutputFormat but
>>> it's not well suited to work with Table API because it cannot handle
>>> generic objects like Rows (it needs a DBWritable Object that should be
>>> already present at compile time). So I've looked into the code of the
>>> PhoenixOutputFormat and it's nothing else than a JDBCOutputFormat
>>> (basically).
>>>
>>> However, to make it work I had to slightly modify the Flink
>>> JDBCOutputformat, adding a dbConn.commit() after the executeBatch() on the
>>> PreparedStatement. E.g:
>>>
>>> upload.executeBatch();
>>> dbConn.commit();
>>>
>>> For the moment I've just created a dedicated PhoenixJdbcOutpuFormat
>>> where I've added these 2 lines of code starting from the code of the
>>> JDBCOutputformat (it couldn't be extended in this case because all fields
>>> are private).
>>>
>>> What do you think about this? Should I open a ticket to add a connection
>>> commit after executeBatch (in order to be compatible with Phoenix) or
>>> something 

Re: Fwd: some question about side output

2017-09-06 Thread Biplob Biswas
Change the type of the main stream from DataStream to
SingleOutputStreamOperator.

The getSideOutput() function is not part of the base class DataStream but of
the extended class SingleOutputStreamOperator.
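
A minimal, self-contained sketch of what that looks like (the element values
and the OutputTag name are just examples):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final OutputTag<String> sideTag = new OutputTag<String>("side") {};

        // keep the concrete SingleOutputStreamOperator type instead of widening to
        // DataStream, otherwise getSideOutput() is not visible
        SingleOutputStreamOperator<String> mainStream = env
                .fromElements("a", "side-b", "c")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.startsWith("side-")) {
                            ctx.output(sideTag, value);  // goes to the side output
                        } else {
                            out.collect(value);          // goes to the main output
                        }
                    }
                });

        DataStream<String> sideStream = mainStream.getSideOutput(sideTag);

        mainStream.print();
        sideStream.print();
        env.execute("side output sketch");
    }
}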



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Aljoscha Krettek
After Stefan and I discussed this, we think that this should actually work.

Do you have the log output from restoring the Kafka Consumer? It would be 
interesting to see whether any of those print:
 - https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L611
 - https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L554


> On 6. Sep 2017, at 14:45, Aljoscha Krettek  wrote:
> 
> Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer 
> which also has discovery of new partitions. Starting from 1.4-SNAPSHOT we 
> store state in a union state, i.e. all sources get all partition on restore 
> and if they didn't get any they know that they are new. There is no specific 
> logic for detecting this situation, it's just that the partition discoverer 
> will be seeded with this information and it will know if it discovers a new 
> partition whether it can take ownership of that partition.
> 
> I'm sure Gordon (cc'ed) could explain it better than I did.
> 
>> On 6. Sep 2017, at 14:36, Gyula Fóra wrote:
>> 
>> Wouldn't it be enough if the Kafka sources stored some empty container for their
>> state when it is empty, compared to null when it should be bootstrapped again?
>> 
>> Gyula
>> 
>> Aljoscha Krettek <aljos...@apache.org> wrote (on Wed, 6 Sep 2017, 14:31):
>> The problem here is that context.isRestored() is a global flag and not local 
>> to each operator. It says "yes this job was restored" but the source would 
>> need to know that it is actually brand new and never had any state. This is 
>> quite tricky to do, since there is currently no way (if I'm correct) to 
>> differentiate between "I got empty state but others maybe got state" and 
>> "this source never had state and neither had other parallel instances".
>> 
>> Best,
>> Aljoscha
>> 
>>> On 6. Sep 2017, at 13:56, Stefan Richter wrote:
>>> 
>>> Thanks for the report, I will take a look.
>>> 
On 06.09.2017 at 11:48, Gyula Fóra wrote:
 
 Hi all,
 
 We are running into some problems with the kafka source after changing the 
 uid and restoring from the savepoint.
 What we are expecting is to clear the partition state, and set it up all 
 over again, but what seems to happen is that the consumer thinks that it 
 doesnt have any partitions assigned.
 
 This was supposed to be fixed in 
 https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
  
 
 but appears to be reworked/reverted in the latest release : 
 https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
  
 
 
 What is the expected behaviour here?
 
 Thanks!
 Gyula
>>> 
>> 
> 



Re: Apache Phenix integration

2017-09-06 Thread Flavio Pompermaier
Hi Fabian,
thanks for the detailed answer. Obviously you are right :)
As stated at https://phoenix.apache.org/tuning.html, auto-commit is disabled
by default in Phoenix, but it can easily be enabled by appending
AutoCommit=true to the connection URL or, equivalently, by setting the proper
property in the conf object passed to the Phoenix
QueryUtil.getConnectionUrl method that autogenerates the connection URL,
i.e.:

--
Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
final Properties phoenixProps = PropertiesUtil.extractProperties(new
Properties(), jobConf);
String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
--

Now my job works also with the standard Flink JDBCOutputformat.
Just to help other people willing to play with Phoenix and HBase I paste
below my simple test job:

@Test
  public void testPhoenixOutputFormat() throws Exception {

final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
senv.enableCheckpointing(5000);
DataStream<String> testStream = senv.fromElements("1,aaa,XXX",
"2,bbb,YYY", "3,ccc,ZZZ");

// Set the target Phoenix table and the columns
DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {

  private static final long serialVersionUID = 1L;

  @Override
  public Row map(String str) throws Exception {
String[] split = str.split(Pattern.quote(","));
Row ret = new Row(3);
ret.setField(0, split[0]);
ret.setField(1, split[1]);
ret.setField(2, split[2]);
return ret;
  }
}).returns(new
RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO));

Job job = Job.getInstance(HBaseConfiguration.create(),
"phoenix-mr-job");
PhoenixMapReduceUtil.setOutput(job, "MY_TABLE",
"FIELD_1,FIELD2,FIELD_3");
final org.apache.hadoop.conf.Configuration jobConf =
job.getConfiguration();
jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
final String upsertStatement =
PhoenixConfigurationUtil.getUpsertStatement(jobConf);
final Properties phoenixProps = PropertiesUtil.extractProperties(new
Properties(), jobConf);
String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);

rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()

.setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class.getCanonicalName())
.setDBUrl(connUrl)
.setQuery(upsertStatement)
.setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR, Types.VARCHAR})
.finish());

senv.execute();
  }

Best,
Flavio

On Wed, Sep 6, 2017 at 3:26 PM, Fabian Hueske  wrote:

> Hi,
>
> According to the JavaDocs of java.sql.Connection, commit() will throw an
> exception if the connection is in auto commit mode which should be the
> default.
> So adding this change to the JdbcOutputFormat seems a bit risky.
>
> Maybe the Phoenix JDBC connector does not enable auto commits by default
> (or doesn't support it). Can you check that Flavio?
> If the Phoenix connector supports but not activates auto commits by
> default, we can enable it in JdbcOutputFormat.open().
> If auto commits are not supported, we can add a check after execute() and
> call commit() only if Connection.getAutoCommit() returns false.
>
> Best, Fabian
>
>
> 2017-09-06 11:38 GMT+02:00 Flavio Pompermaier :
>
>> Hi to all,
>> I'm writing a job that uses Apache Phoenix.
>>
>> At first I used the PhoenixOutputFormat as (hadoop) OutputFormat but it's
>> not well suited to work with Table API because it cannot handle generic
>> objects like Rows (it need a DBWritable Object that should be already
>> present at compile time). So I've looked into the code of the
>> PhoenixOutputFormat and it's nothing else than a JDBCOutputFormat
>> (basically).
>>
>> However, to make it work I had to slightly modify the Flink
>> JDBCOutputformat, adding a dbConn.commit() after the executeBatch() on the
>> PreparedStatement. E.g:
>>
>> upload.executeBatch();
>> dbConn.commit();
>>
>> For the moment I've just created a dedicated PhoenixJdbcOutpuFormat where
>> I've added these 2 lines of code starting from the code of the
>> JDBCOutputformat (it couldn't be extended in this case because all fields
>> are private).
>>
>> What do you think about this? Should I open a ticket to add a connection
>> commit after executeBatch (in order to be compatible with Phoenix) or
>> something else (e.g. create a Phoenix connector that basically extend
>> JDBCOutputConnector and ovewrite 2 methods, changing also the visibility of
>> its fields to protected)?
>>
>> Best,
>> Flavio
>>
>>
>


Re: Apache Phenix integration

2017-09-06 Thread Fabian Hueske
Hi,

According to the JavaDocs of java.sql.Connection, commit() will throw an
exception if the connection is in auto commit mode which should be the
default.
So adding this change to the JdbcOutputFormat seems a bit risky.

Maybe the Phoenix JDBC connector does not enable auto commit by default
(or doesn't support it). Can you check that, Flavio?
If the Phoenix connector supports auto commit but does not activate it by
default, we can enable it in JdbcOutputFormat.open().
If auto commit is not supported, we can add a check after execute() and
call commit() only if Connection.getAutoCommit() returns false.
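
That check could look roughly like this (a sketch only; 'upload' and 'dbConn' mirror the assumed fields of the output format, this is not the actual Flink implementation):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class BatchFlushSketch {

    static void flush(PreparedStatement upload, Connection dbConn) throws SQLException {
        upload.executeBatch();
        // Only commit explicitly when the driver does not auto-commit
        // (e.g. Phoenix, which defaults to autoCommit=false).
        if (!dbConn.getAutoCommit()) {
            dbConn.commit();
        }
    }
}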

Best, Fabian


2017-09-06 11:38 GMT+02:00 Flavio Pompermaier :

> Hi to all,
> I'm writing a job that uses Apache Phoenix.
>
> At first I used the PhoenixOutputFormat as (hadoop) OutputFormat but it's
> not well suited to work with Table API because it cannot handle generic
> objects like Rows (it need a DBWritable Object that should be already
> present at compile time). So I've looked into the code of the
> PhoenixOutputFormat and it's nothing else than a JDBCOutputFormat
> (basically).
>
> However, to make it work I had to slightly modify the Flink
> JDBCOutputformat, adding a dbConn.commit() after the executeBatch() on the
> PreparedStatement. E.g:
>
> upload.executeBatch();
> dbConn.commit();
>
> For the moment I've just created a dedicated PhoenixJdbcOutpuFormat where
> I've added these 2 lines of code starting from the code of the
> JDBCOutputformat (it couldn't be extended in this case because all fields
> are private).
>
> What do you think about this? Should I open a ticket to add a connection
> commit after executeBatch (in order to be compatible with Phoenix) or
> something else (e.g. create a Phoenix connector that basically extend
> JDBCOutputConnector and ovewrite 2 methods, changing also the visibility of
> its fields to protected)?
>
> Best,
> Flavio
>
>


Re: DataSet: partitionByHash without materializing/spilling the entire partition?

2017-09-06 Thread Fabian Hueske
Hi Billy,

 a program that is defined as

Dataset -> Map -> Filter -> Map -> Output

should not spill at all.
There is an unnecessary serialization/deserialization step between the last
map and the sink, but there shouldn't be any spilling to disk.

As I said in my response to Urs, spilling should only happen in a few cases:

- full sort with not sufficient memory
- hash-tables that need to spill (only in join operators)
- range partitioning to compute a histogram of the partitioning keys.
- temp nodes to avoid deadlocks. These can occur in plans that branch and
join later like the following:

  /--- Map ---\
Input --<   JOIN --- Output
  \--- Map ---/

The first two should not be surprising, but the last one is usually
unexpected.

Can you share a bit more information about your optimization of rewriting

Dataset -> Map -> [FilterT -> CoGroup > ;FilterF] > Map -> Output

to

Dataset -> Map -> FilterT -> CoGroup -> Map -> Output
Dataset -> Map -> FilterF -> Map -> Output

I did not completely understand the structure of the first job. Is it
branching and merging again?
Maybe you can share the JSON plan (ExecutionEnvironment.getExecutionPlan())?

Thanks, Fabian

2017-09-06 14:41 GMT+02:00 Fabian Hueske :

> btw. not sure if you know that you can visualize the JSON plan returned by
> ExecutionEnvironment.getExecutionPlan() on the website [1].
>
> Best, Fabian
>
> [1] http://flink.apache.org/visualizer/
>
>
> 2017-09-06 14:39 GMT+02:00 Fabian Hueske :
>
>> Hi Urs,
>>
>> a hash-partition operator should not spill. In general, DataSet plans aim
>> to be as much pipelined as possible.
>> There are a few cases when spilling happens:
>>
>> - full sort with not sufficient memory
>> - hash-tables that need to spill (only in join operators)
>> - range partitioning to compute a histogram of the partitioning keys.
>> - temp nodes to avoid deadlocks. These can occur in plans that branch and
>> join later like the following:
>>
>>   /--- Map ---\
>> Input --<   JOIN --- Output
>>   \--- Map ---/
>>
>>
>> A simple plan without branching with as the one you posted
>>readCsvFile -> partitionBy(0) -> groupBy(0) ->  sortGroup(0) ->
>> first(n)
>> has no reason to spill except for the full sort that is required for the
>> final aggregation.
>>
>> Can you share the execution plan that you get of the plan
>> (ExecutionEnvironment.getExecutionPlan())?
>>
>> Btw, the sortGroup(0) call is superfluous because it would sort a group
>> where all 0-fields are the same on the 0-field.
>> I believe Flink's optimizer automatically removes that so it does not
>> impact the performance.
>> Sorting on another field would indeed make sense, because this would
>> determine order within a group and hence the records which are forwarded by
>> First(n).
>>
>> In order to force a combiner on a partitioned data set, you can do the
>> following:
>>
>> 
>>
>> public static void main(String[] args) throws Exception {
>>
>>    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>    DataSet<Tuple2<Integer, Integer>> data = randData(env);
>>
>>    DataSet<Tuple2<Integer, Integer>> result = data.partitionByHash(0)
>>       .groupBy(0).combineGroup(new First3())
>>       .withForwardedFields("f0")
>>       .groupBy(0).reduceGroup(new First3());
>>
>>    result.print();
>> }
>>
>> public static class First3 implements
>>    GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
>>    GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
>>
>>    @Override
>>    public void combine(Iterable<Tuple2<Integer, Integer>> values,
>>          Collector<Tuple2<Integer, Integer>> out) throws Exception {
>>       reduce(values, out);
>>    }
>>
>>    @Override
>>    public void reduce(Iterable<Tuple2<Integer, Integer>> values,
>>          Collector<Tuple2<Integer, Integer>> out) throws Exception {
>>       int i = 0;
>>       for (Tuple2<Integer, Integer> v : values) {
>>          out.collect(v);
>>          i++;
>>          if (i == 3) {
>>             break;
>>          }
>>       }
>>    }
>> }
>>
>> 
>>
>> The generated plan will
>> - hash partition the input data
>> - partially sort the data in memory on the first field (not going to disk)
>> - invoke the combiner for each in-memory sorted group
>> - locally forward the data (because of the forwarded field information
>> [1])
>> - fully sort the data
>> - invoke group reducer for each group
>>
>> In this plan, the only spilling should happen in the sort for the final
>> aggregation.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> dev/batch/index.html#semantic-annotations
>>
>>
>>
>>
>> 2017-09-05 22:21 GMT+02:00 Newport, Billy :
>>
>>> We have the same issue. We are finding that we cannot express the data
>>> flow in a natural way because of unnecessary spilling. Instead, we're
>>> making our own operators which combine multiple steps together and
>>> essentially hide it from flink OR sometimes we even have to read an input
>>> dataset once per flow to avoid spilling. The performance improvements are
>>> dramatic but it's kind of reducing  fli

Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Aljoscha Krettek
Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer which 
also has discovery of new partitions. Starting from 1.4-SNAPSHOT we store state 
in a union state, i.e. all sources get all partitions on restore and if they 
didn't get any they know that they are new. There is no specific logic for 
detecting this situation, it's just that the partition discoverer will be 
seeded with this information and it will know if it discovers a new partition 
whether it can take ownership of that partition.

I'm sure Gordon (cc'ed) could explain it better than I did.
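
As a rough illustration of that union-state pattern (a simplified sketch, not the actual FlinkKafkaConsumerBase code; the String "partition" elements and the hash-based assignment are made up for the example):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.ArrayList;
import java.util.List;

public class UnionStateSourceSketch extends RichParallelSourceFunction<String>
        implements CheckpointedFunction {

    private transient ListState<String> unionPartitionState;
    private final List<String> assignedPartitions = new ArrayList<>();
    private volatile boolean running = true;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Union list state: on restore every parallel instance sees the union of all
        // elements snapshotted by all instances, so even a brand new subtask can tell
        // that the job as a whole had state.
        unionPartitionState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("partitions", String.class));

        if (context.isRestored()) {
            int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
            int myIndex = getRuntimeContext().getIndexOfThisSubtask();
            for (String partition : unionPartitionState.get()) {
                // decide (here: by hashing) whether this subtask takes ownership
                if (Math.floorMod(partition.hashCode(), numSubtasks) == myIndex) {
                    assignedPartitions.add(partition);
                }
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        unionPartitionState.clear();
        for (String partition : assignedPartitions) {
            unionPartitionState.add(partition);
        }
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // ... read from the assigned partitions and emit records ...
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}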

> On 6. Sep 2017, at 14:36, Gyula Fóra  wrote:
> 
> Wouldnt it be enough that Kafka sources store some empty container for there 
> state if it is empty, compared to null when it should be bootstrapped again?
> 
> Gyula
> 
> Aljoscha Krettek <aljos...@apache.org> wrote (on 6 Sep 2017, Wed, 14:31):
> The problem here is that context.isRestored() is a global flag and not local 
> to each operator. It says "yes this job was restored" but the source would 
> need to know that it is actually brand new and never had any state. This is 
> quite tricky to do, since there is currently no way (if I'm correct) to 
> differentiate between "I got empty state but others maybe got state" and 
> "this source never had state and neither had other parallel instances".
> 
> Best,
> Aljoscha
> 
>> On 6. Sep 2017, at 13:56, Stefan Richter wrote:
>> 
>> Thanks for the report, I will take a look.
>> 
>>> On 06.09.2017 at 11:48, Gyula Fóra wrote:
>>> 
>>> Hi all,
>>> 
>>> We are running into some problems with the kafka source after changing the 
>>> uid and restoring from the savepoint.
>>> What we are expecting is to clear the partition state, and set it up all 
>>> over again, but what seems to happen is that the consumer thinks that it 
>>> doesnt have any partitions assigned.
>>> 
>>> This was supposed to be fixed in 
>>> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
>>>  
>>> 
>>> but appears to be reworked/reverted in the latest release : 
>>> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>>>  
>>> 
>>> 
>>> What is the expected behaviour here?
>>> 
>>> Thanks!
>>> Gyula
>> 
> 



Re: DataSet: partitionByHash without materializing/spilling the entire partition?

2017-09-06 Thread Fabian Hueske
btw. not sure if you know that you can visualize the JSON plan returned by
ExecutionEnvironment.getExecutionPlan() on the website [1].

Best, Fabian

[1] http://flink.apache.org/visualizer/
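
Obtaining that JSON plan is a one-liner at the end of the program, e.g. (a minimal sketch; the sample pipeline is arbitrary):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class PrintPlanSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> data = env.generateSequence(1, 100);
        data.filter(x -> x % 2 == 0)
            .output(new DiscardingOutputFormat<Long>());

        // Prints the JSON plan; paste it into the plan visualizer linked above.
        System.out.println(env.getExecutionPlan());
    }
}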


2017-09-06 14:39 GMT+02:00 Fabian Hueske :

> Hi Urs,
>
> a hash-partition operator should not spill. In general, DataSet plans aim
> to be as much pipelined as possible.
> There are a few cases when spilling happens:
>
> - full sort with not sufficient memory
> - hash-tables that need to spill (only in join operators)
> - range partitioning to compute a histogram of the partitioning keys.
> - temp nodes to avoid deadlocks. These can occur in plans that branch and
> join later like the following:
>
>   /--- Map ---\
> Input --<   JOIN --- Output
>   \--- Map ---/
>
>
> A simple plan without branching with as the one you posted
>readCsvFile -> partitionBy(0) -> groupBy(0) ->  sortGroup(0) ->
> first(n)
> has no reason to spill except for the full sort that is required for the
> final aggregation.
>
> Can you share the execution plan that you get of the plan
> (ExecutionEnvironment.getExecutionPlan())?
>
> Btw, the sortGroup(0) call is superfluous because it would sort a group
> where all 0-fields are the same on the 0-field.
> I believe Flink's optimizer automatically removes that so it does not
> impact the performance.
> Sorting on another field would indeed make sense, because this would
> determine order within a group and hence the records which are forwarded by
> First(n).
>
> In order to force a combiner on a partitioned data set, you can do the
> following:
>
> 
>
> public static void main(String[] args) throws Exception {
>
>    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>    DataSet<Tuple2<Integer, Integer>> data = randData(env);
>
>    DataSet<Tuple2<Integer, Integer>> result = data.partitionByHash(0)
>       .groupBy(0).combineGroup(new First3())
>       .withForwardedFields("f0")
>       .groupBy(0).reduceGroup(new First3());
>
>    result.print();
> }
>
> public static class First3 implements
>    GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
>    GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
>
>    @Override
>    public void combine(Iterable<Tuple2<Integer, Integer>> values,
>          Collector<Tuple2<Integer, Integer>> out) throws Exception {
>       reduce(values, out);
>    }
>
>    @Override
>    public void reduce(Iterable<Tuple2<Integer, Integer>> values,
>          Collector<Tuple2<Integer, Integer>> out) throws Exception {
>       int i = 0;
>       for (Tuple2<Integer, Integer> v : values) {
>          out.collect(v);
>          i++;
>          if (i == 3) {
>             break;
>          }
>       }
>    }
> }
>
> 
>
> The generated plan will
> - hash partition the input data
> - partially sort the data in memory on the first field (not going to disk)
> - invoke the combiner for each in-memory sorted group
> - locally forward the data (because of the forwarded field information [1])
> - fully sort the data
> - invoke group reducer for each group
>
> In this plan, the only spilling should happen in the sort for the final
> aggregation.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/batch/index.html#semantic-annotations
>
>
>
>
> 2017-09-05 22:21 GMT+02:00 Newport, Billy :
>
>> We have the same issue. We are finding that we cannot express the data
>> flow in a natural way because of unnecessary spilling. Instead, we're
>> making our own operators which combine multiple steps together and
>> essentially hide it from flink OR sometimes we even have to read an input
>> dataset once per flow to avoid spilling. The performance improvements are
>> dramatic but it's kind of reducing  flink to a thread scheduler rather than
>> a data flow engine because we basically cannot express the flow to flink.
>> This worries us because if we let others write flink code using our infra,
>> we'll be spending all our time collapsing their flows into much simpler but
>> less intuititve flows to prevent flink from spilling.
>>
>> This also means higher level APIs such as the table API or Beam are off
>> the table because they prevent us optimizing in this manner.
>>
>> We already have prior implementations of the logic we are implementing in
>> flink and as a result, we know it's much less efficient than the prior
>> implementations which is giving us pause for rolling it out more broadly,
>> we're afraid of the flink tax in effect from a performance point of view as
>> well as from a usability point of view given naïve flows are not performant
>> without significant collapsing.
>>
>> For example, we see spilling here:
>>
>> Dataset -> Map -> Filter -> Map -> Output
>>
>> We're trying to combine the Map ->Output into the filter operation now to
>> write the records which are not passed through to an output file during the
>> Filter.
>>
>>
>> Or in this case
>>
>> Dataset -> Map -> [FilterT -> CoGroup > ;FilterF] > Map -> Output
>>
>> Rewriting as
>>
>> Dataset -> Map -> FilterT -> CoGroup -> Map -> Output
>> Dataset -> Map -> FilterF -> Map -> Output
>>
>> 

Re: DataSet: partitionByHash without materializing/spilling the entire partition?

2017-09-06 Thread Fabian Hueske
Hi Urs,

a hash-partition operator should not spill. In general, DataSet plans aim
to be as much pipelined as possible.
There are a few cases when spilling happens:

- full sort with not sufficient memory
- hash-tables that need to spill (only in join operators)
- range partitioning to compute a histogram of the partitioning keys.
- temp nodes to avoid deadlocks. These can occur in plans that branch and
join later like the following:

  /--- Map ---\
Input --<   JOIN --- Output
  \--- Map ---/


A simple plan without branching such as the one you posted
   readCsvFile -> partitionBy(0) -> groupBy(0) ->  sortGroup(0) -> first(n)
has no reason to spill except for the full sort that is required for the
final aggregation.

Can you share the execution plan that you get of the plan
(ExecutionEnvironment.getExecutionPlan())?

Btw, the sortGroup(0) call is superfluous because it would sort a group
where all 0-fields are the same on the 0-field.
I believe Flink's optimizer automatically removes that so it does not
impact the performance.
Sorting on another field would indeed make sense, because this would
determine order within a group and hence the records which are forwarded by
First(n).
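
For illustration, ordering on a secondary field before First(n) would look roughly like this (a sketch; field 1, the Integer element type and the descending order are assumptions, not part of the original job):

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class FirstNSketch {
    public static DataSet<Tuple2<Integer, Integer>> top3PerKey(DataSet<Tuple2<Integer, Integer>> data) {
        return data
                .groupBy(0)                          // group by the key field
                .sortGroup(1, Order.DESCENDING)      // order records within each group
                .first(3);                           // emit the first 3 records per group
    }
}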

In order to force a combiner on a partitioned data set, you can do the
following:



public static void main(String[] args) throws Exception {

   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

   DataSet<Tuple2<Integer, Integer>> data = randData(env);

   DataSet<Tuple2<Integer, Integer>> result = data.partitionByHash(0)
      .groupBy(0).combineGroup(new First3())
      .withForwardedFields("f0")
      .groupBy(0).reduceGroup(new First3());

   result.print();
}

public static class First3 implements
   GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
   GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

   @Override
   public void combine(Iterable<Tuple2<Integer, Integer>> values,
         Collector<Tuple2<Integer, Integer>> out) throws Exception {
      reduce(values, out);
   }

   @Override
   public void reduce(Iterable<Tuple2<Integer, Integer>> values,
         Collector<Tuple2<Integer, Integer>> out) throws Exception {
      int i = 0;
      for (Tuple2<Integer, Integer> v : values) {
         out.collect(v);
         i++;
         if (i == 3) {
            break;
         }
      }
   }
}



The generated plan will
- hash partition the input data
- partially sort the data in memory on the first field (not going to disk)
- invoke the combiner for each in-memory sorted group
- locally forward the data (because of the forwarded field information [1])
- fully sort the data
- invoke group reducer for each group

In this plan, the only spilling should happen in the sort for the final
aggregation.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#semantic-annotations



2017-09-05 22:21 GMT+02:00 Newport, Billy :

> We have the same issue. We are finding that we cannot express the data
> flow in a natural way because of unnecessary spilling. Instead, we're
> making our own operators which combine multiple steps together and
> essentially hide it from flink OR sometimes we even have to read an input
> dataset once per flow to avoid spilling. The performance improvements are
> dramatic but it's kind of reducing  flink to a thread scheduler rather than
> a data flow engine because we basically cannot express the flow to flink.
> This worries us because if we let others write flink code using our infra,
> we'll be spending all our time collapsing their flows into much simpler but
> less intuititve flows to prevent flink from spilling.
>
> This also means higher level APIs such as the table API or Beam are off
> the table because they prevent us optimizing in this manner.
>
> We already have prior implementations of the logic we are implementing in
> flink and as a result, we know it's much less efficient than the prior
> implementations which is giving us pause for rolling it out more broadly,
> we're afraid of the flink tax in effect from a performance point of view as
> well as from a usability point of view given naïve flows are not performant
> without significant collapsing.
>
> For example, we see spilling here:
>
> Dataset -> Map -> Filter -> Map -> Output
>
> We're trying to combine the Map ->Output into the filter operation now to
> write the records which are not passed through to an output file during the
> Filter.
>
>
> Or in this case
>
> Dataset -> Map -> [FilterT -> CoGroup > ;FilterF] > Map -> Output
>
> Rewriting as
>
> Dataset -> Map -> FilterT -> CoGroup -> Map -> Output
> Dataset -> Map -> FilterF -> Map -> Output
>
> That is two separate flows is multiples faster. That is, reading the file
> twice rather than once.
>
> This is all pretty unintuitive and makes using Flink pretty difficult for
> us never mind our users. Writing the flink dataflows in a naïve way is fast
> but getting it to run with acceptable efficiency results in obscure
> workarounds and collapsing and takes the bulk of the time for us which is a
> shame and the main reason, we don't want to push

Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Gyula Fóra
Wouldn't it be enough that Kafka sources store some empty container for
their state if it is empty, compared to null when it should be bootstrapped
again?

Gyula

Aljoscha Krettek wrote (on 6 Sep 2017, Wed, 14:31):

> The problem here is that context.isRestored() is a global flag and not
> local to each operator. It says "yes this job was restored" but the source
> would need to know that it is actually brand new and never had any state.
> This is quite tricky to do, since there is currently no way (if I'm
> correct) to differentiate between "I got empty state but others maybe got
> state" and "this source never had state and neither had other parallel
> instances".
>
> Best,
> Aljoscha
>
> On 6. Sep 2017, at 13:56, Stefan Richter 
> wrote:
>
> Thanks for the report, I will take a look.
>
> On 06.09.2017 at 11:48, Gyula Fóra wrote:
>
> Hi all,
>
> We are running into some problems with the kafka source after changing the
> uid and restoring from the savepoint.
> What we are expecting is to clear the partition state, and set it up all
> over again, but what seems to happen is that the consumer thinks that it
> doesnt have any partitions assigned.
>
> This was supposed to be fixed in
> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
> but appears to be reworked/reverted in the latest release :
> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>
> What is the expected behaviour here?
>
> Thanks!
> Gyula
>
>
>
>


Re: Flink on AWS EMR Protobuf

2017-09-06 Thread Aljoscha Krettek
Hi,

Could you please give a bit more context around that exception? Maybe a log or 
a full stack trace.

Best,
Aljoscha

> On 5. Sep 2017, at 23:52, ant burton  wrote:
> 
> Hello,
> 
> Has anybody experienced the following error on AWS EMR 5.8.0 with Flink 1.3.1
> 
> java.lang.ClassCastException: 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetFileInfoRequestProto
>  cannot be cast to com.google.protobuf.Message
> 
> Thanks,
> 
> 



Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Aljoscha Krettek
The problem here is that context.isRestored() is a global flag and not local to 
each operator. It says "yes this job was restored" but the source would need to 
know that it is actually brand new and never had any state. This is quite 
tricky to do, since there is currently no way (if I'm correct) to differentiate 
between "I got empty state but others maybe got state" and "this source never 
had state and neither had other parallel instances".

Best,
Aljoscha

> On 6. Sep 2017, at 13:56, Stefan Richter  wrote:
> 
> Thanks for the report, I will take a look.
> 
>> On 06.09.2017 at 11:48, Gyula Fóra wrote:
>> 
>> Hi all,
>> 
>> We are running into some problems with the kafka source after changing the 
>> uid and restoring from the savepoint.
>> What we are expecting is to clear the partition state, and set it up all 
>> over again, but what seems to happen is that the consumer thinks that it 
>> doesnt have any partitions assigned.
>> 
>> This was supposed to be fixed in 
>> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
>>  
>> 
>> but appears to be reworked/reverted in the latest release : 
>> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>>  
>> 
>> 
>> What is the expected behaviour here?
>> 
>> Thanks!
>> Gyula
> 



Re: Securing Flink Monitoring REST API

2017-09-06 Thread avivros
Does jobmanager.web.ssl.enabled support client SSL authentication?
 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Stefan Richter
Thanks for the report, I will take a look.

> On 06.09.2017 at 11:48, Gyula Fóra wrote:
> 
> Hi all,
> 
> We are running into some problems with the kafka source after changing the 
> uid and restoring from the savepoint.
> What we are expecting is to clear the partition state, and set it up all over 
> again, but what seems to happen is that the consumer thinks that it doesnt 
> have any partitions assigned.
> 
> This was supposed to be fixed in 
> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
>  
> 
> but appears to be reworked/reverted in the latest release : 
> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>  
> 
> 
> What is the expected behaviour here?
> 
> Thanks!
> Gyula



Re: Process Function

2017-09-06 Thread Timo Walther

Hi Johannes,

you can find the implementation for the state clean up here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala

and a example usage here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
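
The idea behind that cleanup state can be sketched roughly as follows (a simplified illustration, not the actual Flink implementation; minRetention/maxRetention stand for the configured idle-state retention bounds):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.streaming.api.TimerService;

public class CleanupTimerSketch {

    public static void maybeRegisterCleanupTimer(TimerService timerService,
                                                 ValueState<Long> cleanupTimeState,
                                                 long minRetention,
                                                 long maxRetention) throws Exception {
        long now = timerService.currentProcessingTime();
        Long registered = cleanupTimeState.value();
        // Only register a new timer when none is pending, or when the pending one would
        // fire before the minimum retention time has passed. This bounds the number of
        // timers per key to roughly one per (maxRetention - minRetention) interval.
        if (registered == null || registered < now + minRetention) {
            long cleanupTime = now + maxRetention;
            timerService.registerProcessingTimeTimer(cleanupTime);
            cleanupTimeState.update(cleanupTime);
        }
    }
}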

Regards,
Timo


On 06.09.17 at 10:50, Aljoscha Krettek wrote:

Hi,

I'm actually not very familiar with the current Table API 
implementations but Fabian or Timo (cc'ed) should know more. I suspect 
very much that this is implemented like this, yes.


Best,
Aljoscha

On 5. Sep 2017, at 21:14, Johannes Schulte <johannes.schu...@gmail.com> wrote:


Hi,

one short question I had that fits here. When using higher level 
streaming we can set min and max retention time [1] which is probably 
used to reduce the number of timers registered under the hood. How is 
this implemented, by registering a "clamped" timer?


Thanks,

Johannes

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#idle-state-retention-time


On Tue, Sep 5, 2017 at 5:17 PM, Aljoscha Krettek wrote:


Hi,

This is mostly correct, but you cannot register a timer in open()
because we don't have an active key there. Only in process() and
onTimer() can you register a timer.

In your case, I would suggest to somehow clamp the timestamp to
the nearest 2 minute (or whatever) interval or to keep an extra
ValueState that tells you whether you already registered a timer.

Best,
Aljoscha


On 5. Sep 2017, at 16:55, Kien Truong <duckientru...@gmail.com> wrote:

Hi,

You can register a processing time timer inside the onTimer and
the open function to have a timer that run periodically.

Pseudo-code example:

ValueState<Long> lastRuntime;

void open() {
  ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
}

void onTimer() {
  // Run the periodic task
  if (lastRuntime.get() + 60000 == timeStamp) {
    periodicTask();
  }
  // Re-register the processing time timer
  lastRuntime.setValue(timeStamp);
  ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
}

void periodicTask()

For the second question, timers are already scoped by key, so you
can keep a lastModified variable as a ValueState,
then compare it to the timestamp provided by the timer to see if
the current key should be evicted.
Check out the example on the ProcessFunction page.


https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html



Best regards,
Kien

On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:

Hi All,

I have a streaming pipeline which is keyed by userid and then
to a flatmap function. I need to clear the state after sometime
and I was looking at process function for it.

Inside the process element function if I register a timer
wouldn't it create a timer for each incoming message?
// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
How can I get something like a clean up task that runs every 2
mins and evicts all stale data? Also is there a way to get the
key inside onTimer function so that I know which key has to be
evicted?

Thanks,
Navneeth









Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Gyula Fóra
Hi all,

We are running into some problems with the kafka source after changing the
uid and restoring from the savepoint.
What we are expecting is to clear the partition state, and set it up all
over again, but what seems to happen is that the consumer thinks that it
doesnt have any partitions assigned.

This was supposed to be fixed in
https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
but appears to be reworked/reverted in the latest release :
https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547

What is the expected behaviour here?

Thanks!
Gyula


Apache Phenix integration

2017-09-06 Thread Flavio Pompermaier
Hi to all,
I'm writing a job that uses Apache Phoenix.

At first I used the PhoenixOutputFormat as (hadoop) OutputFormat but it's
not well suited to work with Table API because it cannot handle generic
objects like Rows (it needs a DBWritable object that should already be
present at compile time). So I've looked into the code of the
PhoenixOutputFormat and it's nothing else than a JDBCOutputFormat
(basically).

However, to make it work I had to slightly modify the Flink
JDBCOutputformat, adding a dbConn.commit() after the executeBatch() on the
PreparedStatement. E.g:

upload.executeBatch();
dbConn.commit();

For the moment I've just created a dedicated PhoenixJdbcOutpuFormat where
I've added these 2 lines of code starting from the code of the
JDBCOutputformat (it couldn't be extended in this case because all fields
are private).

What do you think about this? Should I open a ticket to add a connection
commit after executeBatch (in order to be compatible with Phoenix) or
something else (e.g. create a Phoenix connector that basically extend
JDBCOutputConnector and override 2 methods, also changing the visibility of
its fields to protected)?

Best,
Flavio


Re: Shuffling between map and keyBy operator

2017-09-06 Thread Kurt Young
Hi Marchant,

I'm afraid that the serde cost still exists even if both operators run in
the same TaskManager.

Best,
Kurt

On Tue, Sep 5, 2017 at 9:26 PM, Marchant, Hayden 
wrote:

> I have a streaming application that has a keyBy operator followed by an
> operator working on the keyed values (a custom sum operator). If the map
> operator and aggregate operator are running on same Task Manager , will
> Flink always serialize and deserialize the tuples, or is there an
> optimization in this case due to 'locality'?
>
> (I was planning on deploying my Flink Streaming application to a single
> 'big' node in the hope that I can reduce latency by saving on both network
> and serde.)
>
>
> Thanks,
> Hayden Marchant
>
>
>


Re: Process Function

2017-09-06 Thread Aljoscha Krettek
Hi,

I'm actually not very familiar with the current Table API implementations but 
Fabian or Timo (cc'ed) should know more. I suspect very much that this is 
implemented like this, yes.

Best,
Aljoscha

> On 5. Sep 2017, at 21:14, Johannes Schulte  wrote:
> 
> Hi,
> 
> one short question I had that fits here. When using higher level streaming we 
> can set min and max retention time [1] which is probably used to reduce the 
> number of timers registered under the hood. How is this implemented, by 
> registering a "clamped" timer?
> 
> Thanks,
> 
> Johannes
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#idle-state-retention-time
>  
> 
> 
> On Tue, Sep 5, 2017 at 5:17 PM, Aljoscha Krettek wrote:
> Hi,
> 
> This is mostly correct, but you cannot register a timer in open() because we 
> don't have an active key there. Only in process() and onTimer() can you 
> register a timer.
> 
> In your case, I would suggest to somehow clamp the timestamp to the nearest 2 
> minute (or whatever) interval or to keep an extra ValueState that tells you 
> whether you already registered a timer.
> 
> Best,
> Aljoscha
> 
>> On 5. Sep 2017, at 16:55, Kien Truong wrote:
>> 
>> Hi,
>> 
>> You can register a processing time timer inside the onTimer and the open 
>> function to have a timer that run periodically.
>> Pseudo-code example:
>> 
>> ValueState<Long> lastRuntime;
>> 
>> void open() {
>>   ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
>> }
>> 
>> void onTimer() {
>>   // Run the periodic task
>>   if (lastRuntime.get() + 60000 == timeStamp) {
>>     periodicTask();
>>   }
>>   // Re-register the processing time timer
>>   lastRuntime.setValue(timeStamp);
>>   ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
>> }
>> 
>> void periodicTask()
>> 
>> For the second question, timers are already scoped by key, so you can keep a 
>> lastModified variable as a ValueState, 
>> then compare it to the timestamp provided by the timer to see if the current 
>> key should be evicted. 
>> Check out the example on the ProcessFunction page. 
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>>  
>> 
>> 
>> Best regards,
>> Kien
>> 
>> On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
>>> Hi All,
>>> 
>>> I have a streaming pipeline which is keyed by userid and then to a flatmap 
>>> function. I need to clear the state after sometime and I was looking at 
>>> process function for it.
>>> 
>>> Inside the process element function if I register a timer wouldn't it 
>>> create a timer for each incoming message?
>>> // schedule the next timer 60 seconds from the current event time
>>> ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
>>> How can I get something like a clean up task that runs every 2 mins and 
>>> evicts all stale data? Also is there a way to get the key inside onTimer 
>>> function so that I know which key has to be evicted?
>>> 
>>> Thanks,
>>> Navneeth
> 
> 



Re: Union limit

2017-09-06 Thread Fabian Hueske
Hi,

the following code should do what you want.
I included an implementation of an IdMapper.
At the end, I print the execution plan which is generated after the
optimization (so the pipeline is working until then).

Best, Fabian

val data: Seq[Seq[Int]] = (1 until 315).map(i => Seq(1, 2, 3))

val dataSets: Seq[DataSet[Int]] = data.map(env.fromCollection(_))

dataSets.sliding(60, 60)
  .map(dsg => dsg.reduce( (ds1: DataSet[Int], ds2: DataSet[Int]) =>
ds1.union(ds2)).map(new IdMapper[Int]()))
  .reduce( (dsg1: DataSet[Int], dsg2: DataSet[Int]) => dsg1.union(dsg2))
  .map(x => x * 2) // do something with the union result
  .output(new DiscardingOutputFormat[Int])

println(env.getExecutionPlan())

class IdMapper[T] extends MapFunction[T, T] {
  override def map(value: T): T = value
}

2017-08-31 12:30 GMT+02:00 boci :

> Dear Fabian,
>
> Thanks to your answer (I think you said same in StackOverflow) but as you
> see in my code your solution does not work anymore:
>
> Here is the code, it's split the datasets to list (each list contains
> maximum 60 datasets)
> After that, I  reduce the dataset using union and map with an IdMapper and
> return the id mapped data set.
> But when the next reduce (where I want to merge the id mapped stream) the
> flink said I reached the limit.
>
> Maybe my IdMapper is wrong... Can you show a correct working IdMapper?
>
> b0c1
>
> ps:
> Here is the code segment:
> listOfDataSet
> .sliding(60,60)
> .map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)).map(new IdMapper()))
> //There is an iterator of DataSet
> .reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
> .map(finalDataSet => ... some transformation ...)
> .count()
>
>
>
> On Wed, 30 Aug 2017 at 15:44 Fabian Hueske  wrote:
>
>> Hi b0c1,
>>
>> This is an limitation in Flink's optimizer.
>> Internally, all binary unions are merged into a single n-ary union. The
>> optimizer restricts the number of inputs for an operator to 64.
>>
>> You can work around this limitation with an identity mapper which
>> prevents the union operators from merging:
>>
>> in1\
>> in2-- Id-Map--- NextOp
>> ...   / / /
>> in14--/ / /
>>   / /
>> in15/ /
>> ...   /
>> in74/
>>
>> This is not a super nice solution, but the only way that comes to my mind.
>>
>> Cheers, Fabian
>>
>> 2017-08-28 23:29 GMT+02:00 boci :
>>
>>> Hi guys!
>>>
>>> I have one input (from mongo) and I split the incoming data to multiple
>>> datasets (each created dynamically from configuration) and before I write
>>> back the result I want to merge it to one dataset (there is some common
>>> transformation).
>>> so the flow:
>>>
>>> DataSet from Mongod =>
>>> Create Mappers dynamically (currently 74) so I have 74 DataSet =>
>>> Custom filter and mapping on each dataset =>
>>> Union dynamically to one (every mapper result is same type) =>
>>> Some another common transformation =>
>>> Count the result
>>>
>>> but when I want to union more than 64 dataset I got these exception:
>>>
>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>>> Cannot currently handle nodes with more than 64 outputs.
>>> at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(
>>> OptimizerNode.java:348)
>>> at org.apache.flink.optimizer.dag.SingleInputNode.setInput(
>>> SingleInputNode.java:202)
>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(
>>> GraphCreatingVisitor.java:268)
>>> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(
>>> GraphCreatingVisitor.java:82)
>>>
>>> I try to split the incoming (74) list of dataset to split to 60 + 14
>>>  dataset and create an id mapper and union the result datasets but no
>>> success:
>>>
>>> val listOfDataSet: List[DataSet[...]] = 
>>>
>>> listOfDataSet
>>> .sliding(60,60)
>>> .map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)),map(new IdMapper()))
>>> //There is an iterator of DataSet
>>> .reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
>>> .map(finalDataSet => ... some transformation ...)
>>> .count()
>>>
>>> There is any solution to solve this?
>>>
>>> Thanks
>>> b0c1
>>>
>>
>>