[ANNOUNCE] Flink Forward San Francisco 10-11 Apr 2017 community discount codes

2017-03-24 Thread Robert Metzger
Dear Flink community,

I would like to bring Flink Forward San Francisco to your attention. After
hosting Flink Forward for two years in Berlin, Germany, we decided to bring
it to the US west coast as well.

Check out this very nice video summary from last year's conference in
Berlin: https://www.youtube.com/watch?v=RYoFwQFvC_g

Given the amazing program we have for San Francisco in April, I'm sure
it'll be a similar success: http://sf.flink-forward.org/kb_day/day2/ (with
keynotes from Netflix and Uber and talks from many others)


Since the Flink community is an important part of the success of Flink,
data Artisans decided to offer all community members a 25% discount code:
"Mailing_FFSF17_25".
Feel free to pass this code on to colleagues who are not subscribed
to the mailing list.


We are also looking for volunteers to help as stage managers, at the
registration desk, and with other logistics.
Volunteers will get a free ticket! Please get in touch via:
http://sf.flink-forward.org/be-part-of-flink-forward-san-francisco/

There will also be a colocated Flink ML Hackathon organized by Parallel
Machines: https://www.meetup.com/Parallel-Machines-Meetup/events/238390498/


I'm really looking forward to seeing you all in San Francisco in April!
Almost all committers employed at data Artisans will come to San Francisco
to hang out, have beers and chat about what's going on in the community
these days.



Regards,
Robert


Re: Support/connector for Neo4j?

2017-03-24 Thread Martin Junghanns
Please let me know if you need help with the connector or if you want
to extend it.


Cheers,

Martin


On 24.03.2017 16:07, alex.decastro wrote:

Thanks Tim! I missed that one on Jira. :-)







Re: Windows emit results at the end of the stream

2017-03-24 Thread Sonex
Hello Till,

Yes, the elements do have an associated timestamp, which is parsed in the
first map function.

Yes, indeed, if all timestamps lie within one hour, triggering will only
happen after the complete file has been read. I had the wrong window size and
sliding step for one dataset I was testing (I tested it on different datasets).

Now that I have solved the non-triggering from the beginning, one problem
remains in all my tests. Assume I have a dataset covering 50 hours of
events/elements. Triggering happens as expected, but when it reaches the 6th
hour it stops and does not continue (all parallel operators remain idle).

Thanx,
Sonex





Re: deploying flink cluster in AWS - Containerized

2017-03-24 Thread Till Rohrmann
A small addition on component discovery: Flink works such that the
TaskManagers register with the JobManager. This means that the TaskManagers
have to somehow retrieve the JobManager's address. Either you do it as
Philippe described or you use the HA mode with ZooKeeper. In the latter
case, the leading JobManager will write its address to ZooKeeper, from where
the TaskManagers can retrieve it.
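For reference, a minimal flink-conf.yaml sketch of the ZooKeeper route (the
quorum addresses and storage directory are placeholders; key names as in the
Flink 1.2 HA documentation):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.storageDir: hdfs:///flink/ha/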

Cheers,
Till

On Fri, Mar 24, 2017 at 11:34 AM, Philippe Caparroy <
philippe.capar...@orange.fr> wrote:

> Weave allows encryption of the VPN, and your Flink containers can be
> secured using Kerberos:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html#kerberos-based-security
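A hypothetical flink-conf.yaml fragment along the lines of the linked 1.2
security docs — the keytab path and principal are placeholders:

security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
security.kerberos.login.contexts: Client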
>
> On 24 Mar 2017 at 11:16, Chakravarthy varaga wrote:
>
> Hi,
>
>  Thanks for your inputs. It kind of makes sense to use a container
> orchestrator to handle the networking under the hood.
>  How do you tackle security?
>
>  I don't see a way to authorize users for job management. I understand a
> few orchestrators provide namespace isolation and security policies on
> these. How does this work if the Flink cluster is standalone on AWS?
>
>
> Best Regards
> CVP
>
> On Fri, Mar 24, 2017 at 8:49 AM, Philippe Caparroy <
> philippe.capar...@orange.fr> wrote:
>
>> Hi,
>>
>> If I can give my 2 cents.
>>
>> One simple solution to your problem is using Weave
>> (https://www.weave.works/), a Docker network plugin.
>>
>> We’ve been working for more than a year with a dockerized
>> (Flink + ZooKeeper + YARN + Spark + Kafka + Hadoop + Elasticsearch) cluster
>> using Weave.
>>
>> Design your Docker container so that you can set the cluster size on
>> startup (the number of task managers and job managers should be a Docker arg).
>>
>> Weave will act as a switch with an embedded DNS server. Your containers will
>> only have to be configured with hostnames such as:
>> flink.taskmanager-1.weave.local, flink.taskmanager-2.weave.local,
>> flink.jobmanager-1.weave.local, and so on …
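For illustration, a hypothetical flink-conf.yaml fragment for the TaskManager
containers in such a setup, pointing them at the JobManager's Weave DNS name
(both keys are standard Flink settings; the hostname is the example above):

jobmanager.rpc.address: flink.jobmanager-1.weave.local
jobmanager.rpc.port: 6123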
>>
>> With Flink on YARN it’s even simpler, but you have to dockerize a YARN
>> cluster.
>>
>> It works perfectly on bare metal machines and in the cloud
>> (digital-ocean, aws,…).
>>
>>
>>
>> On 24 Mar 2017 at 08:50, Chakravarthy varaga wrote:
>>
>> Hi,
>>
>> I request someone to help here.
>>
>> Best Regards
>> CVP
>>
>> On Thu, Mar 23, 2017 at 10:13 PM, Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>>> I'm looking forward to hearing some updates on this...
>>>
>>> Any help here is highly appreciated !!
>>>
>>> On Thu, Mar 23, 2017 at 4:20 PM, Chakravarthy varaga <
>>> chakravarth...@gmail.com> wrote:
>>>
 Hi Team,

  We are doing a PoC to deploy Flink cluster on AWS. All runtime
 components will be dockerized.

   I have few questions in relation to discover & security:

   1. How does Job Manager discover task managers? Do they talk to
 over TCP ?

   2. If the runtime components TM, JM are containerized how are the
 IPs resolved dynamically? Basically do I have to configure the JM with the
 hostnames of the TMs. If so, if the TMs are on ephemeral IPs and on restart
 of TM how does the job manager know the TM's (IP/Host). Before I go into
 DNS and subnets, I'd like to understand how they disvoer & talk to each
 other !

3. I went through some Flink materials on the web on security
 precisely on kerebros. However how do I ensure that user level
 authentication is applied on job management. For ex., only certain users
 are allowed to start/stop jobs ? This question is in relation to if flink
 is deployed as standalone-cluster

 Thanks & Regards
 CVP

>>>
>>>
>>
>>
>
>


Re: [E] Re: unable to add more servers in zookeeper quorum peers in flink 1.2

2017-03-24 Thread Robert Metzger
You are right and I'm sorry. I've opened a PR:
https://github.com/apache/flink/pull/3605
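For reference, a minimal Java sketch (a demonstration of the regex behavior
only, not the shell script itself) of why the original grouping failed: with
([0-9])+ the one-digit group is repeated, so only the last digit is captured —
which is how server.10 ended up with myid 0 in the report below.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexGroupingDemo {
    public static void main(String[] args) {
        String line = "server.10=host11:2888:3888";
        // Broken: the '+' repeats the one-digit group, so group(1)
        // only keeps the digit of the last repetition.
        Matcher broken = Pattern.compile("^server\\.([0-9])+=([^: #]+)").matcher(line);
        if (broken.find()) {
            System.out.println("broken id: " + broken.group(1)); // prints 0
        }
        // Fixed: the '+' is inside the group, so all digits are captured.
        Matcher fixed = Pattern.compile("^server\\.([0-9]+)=([^: #]+)").matcher(line);
        if (fixed.find()) {
            System.out.println("fixed id:  " + fixed.group(1)); // prints 10
        }
    }
}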

On Thu, Mar 23, 2017 at 3:47 PM, Greg Hogan  wrote:

> A PR and review may have noted that the same regex is in
> stop-zookeeper-quorum.sh and recommended ignoring whitespace both before
> and after the equals sign.
>
>
> On Mar 23, 2017, at 10:12 AM, Robert Metzger  wrote:
>
> Thank you for verifying. Fixed in master:
> http://git-wip-us.apache.org/repos/asf/flink/commit/3e860b40
>
> On Wed, Mar 22, 2017 at 9:25 PM, 
> wrote:
>
>> That worked.. Thanks Chesnay,
>>
>>
>>
>>
>>  
>>
>> Kanagaraj Vengidasamy
>> RTCI
>>
>> 7701 E Telecom PKWY
>> Temple Terrace, FL 33637
>>
>> O 813.978.4372 | M 813.455.9757
>>
>>
>>
>>
>> *From:* Chesnay Schepler [mailto:ches...@apache.org]
>> *Sent:* Wednesday, March 22, 2017 11:23 AM
>> *To:* user@flink.apache.org
>> *Subject:* Re: [E] Re: unable to add more servers in zookeeper quorum
>> peers in flink 1.2
>>
>>
>>
>> I guess that's because the grouping is wrong.

>> ^server\.([0-9])+[[:space:]]*\=([^: \#]+)

>> should probably be

>> ^server\.([0-9]+)[[:space:]]*\=([^: \#]+)

>> since with ([0-9])+ the one-digit group is repeated and only the last
>> digit is captured.

>> Could you modify the .sh script as such and try again?
>>
>> Regards,
>> Chesnay
>>
>> On 22.03.2017 16:10, kanagaraj.vengidas...@verizon.com wrote:
>>
>> Greg,
>>
>>
>>
>> Sorry about that.. when I copied the config I replaced the real server
>> names. I don't have spaces in my configuration. The issue is: when I have
>> server.10= and server.11= and try to start ZooKeeper, the myid gets set to
>> 0 and 1 for these servers, which conflicts with server.0 and server.1.
>>
>>
>>
>>
>>
>> Thanks
>>
>>
>>  
>>
>> Kanagaraj Vengidasamy
>> RTCI
>>
>> 7701 E Telecom PKWY
>> Temple Terrace, FL 33637
>>
>> O 813.978.4372 | M 813.455.9757
>>
>>
>>
>>
>> *From:* Greg Hogan [mailto:c...@greghogan.com ]
>> *Sent:* Wednesday, March 22, 2017 10:08 AM
>> *To:* user@flink.apache.org
>> *Subject:* [E] Re: unable to add more servers in zookeeper quorum peers
>> in flink 1.2
>>
>>
>>
>> Kanagaraj,
>>
>>
>>
>> None of the server lines are matching since the regex in
>> start-zookeeper-quorum.sh does not allow for spaces after the equals sign.
>>
>>   ^server\.([0-9])+[[:space:]]*\=([^: \#]+)
>>
>>
>>
>> Greg
>>
>>
>>
>> On Mar 22, 2017, at 8:49 AM, kanagaraj.vengidas...@verizon.com wrote:
>>
>>
>>
>> Hi All,
>>
>>
>>
>> We are using Flink 1.2. It is not possible to use more than one digit in
>> server.x: when an id has more than one digit, the ZooKeeper quorum does not
>> start. What do I need to do if I want to keep more servers?
>>
>>
>>
>> # ZooKeeper quorum peers
>>
>> server.0=server1:2888:3888
>>
>> server.1=server2:2888:3888
>>
>> server.2=server3:2888:3888
>>
>> server.3=server4:2888:3888
>>
>> server.4=server5:2888:3888
>>
>> server.5=server6:2888:3888
>>
>> server.6=server7:2888:3888
>>
>> server.7=server8:2888:3888
>>
>> server.8=server9:2888:3888
>>
>> server.9=server10:2888:3888
>>
>> #server.10=server11:2888:3888
>>
>> #server.11=server12:2888:3888
>>
>>
>>
>> Thanks
>>
>> Kanagaraj
>>
>>
>>
>>
>>
>
>
>


Re: deploying flink cluster in AWS - Containerized

2017-03-24 Thread Philippe Caparroy
Weave allows encryption of the VPN, and your Flink containers can be secured
using Kerberos:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html#kerberos-based-security




Re: flink Broadcast

2017-03-24 Thread rimin515
Yes, it is a YARN single job, using the command:

flink-1.1.1/bin/flink run -m yarn-cluster \
  -yn 2 \
  -ys 2 \
  -yjm 2048 \
  -ytm 2048 \
  --class statics.ComputeDocSim \
  --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar \
  --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar \
  --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar \
  --classpath file:///opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar \
  --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar \
  text-assembly-0.1.0.jar hdfs:///user/hadoop/tf-idf-ex hdfs:///user/hadoop/tf-idf-ex-sims
and the code is:

val from = // DataSet[(String, Vector)]
val to = from.collect()
val cosDistince = CosineDistanceMetric.apply()
val res = from.map { x =>
  val fromId = x._1
  val docSims = to.filter(_._1 != fromId).map { y =>
    val toId = y._1
    val score = 1 - cosDistince.distance(x._2, y._2)
    (toId, score)
  }.toList.sortWith((x, y) => x._2 > y._2).take(20)
  (fromId, docSims)
}
res.writeAsText(..)
- Original message -
From: Stephan Ewen
To: user@flink.apache.org
Cc: 亘谷
Subject: Re: flink Broadcast
Date: 2017-03-24 17:40

The program consists of two executions - one that only collects() back to the 
client, one that executes the map function.
Are you running this as a "YARN single job" execution? In that case, there may
be an issue where this incorrectly tries to submit to a stopping YARN cluster.


On Fri, Mar 24, 2017 at 10:32 AM, Robert Metzger  wrote:
Hi,
Can you provide more logs to help us understand what's going on?
One note regarding your application: you are calling .collect() and then
sending the collection back to the cluster with the map() call. This is pretty
inefficient and can potentially break your application (in particular the RPC
system of Flink).
I would recommend using broadcast variables to send the dataset to the map
operator: 
https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
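For illustration, a minimal, self-contained sketch of that approach
(hypothetical data and names; Java DataSet API of the Flink 1.1/1.2 era): the
dataset is shipped to each task via withBroadcastSet() instead of being
collect()-ed to the client.

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class BroadcastSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Double>> docs = env.fromElements(
                Tuple2.of("doc-1", 0.3), Tuple2.of("doc-2", 0.7), Tuple2.of("doc-3", 0.5));

        docs.map(new RichMapFunction<Tuple2<String, Double>, String>() {
            private List<Tuple2<String, Double>> allDocs;

            @Override
            public void open(Configuration parameters) {
                // materialized once per task from the broadcast set,
                // instead of collect()-ing everything to the client
                allDocs = getRuntimeContext().getBroadcastVariable("docs");
            }

            @Override
            public String map(Tuple2<String, Double> doc) {
                // compare doc against all other documents here
                return doc.f0 + " compared against " + (allDocs.size() - 1) + " docs";
            }
        }).withBroadcastSet(docs, "docs")
          .print();
    }
}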

On Thu, Mar 23, 2017 at 3:11 PM,   wrote:
Hi all,

I have 36000 documents, and each document has been transformed into a vector;
one doc is one vector, all with the same dimension, so I have a DataSet:



val data: DataSet[(String, SparseVector)] = // 36000 records

val toData = data.collect()

val docSims = data.map { x =>
  val fromId = x._1
  val docsims = toData.filter { y => y._1 != fromId }.map { y =>
    val score = 1 - cosDisticnce(x._2, y._2)
    (y._1, score)
  }.toList.sortWith { (a, b) => a._2 > b._2 }.take(20)
  (fromId, docsims)
}

docSims.writeAsText(file)

.

When I run the job on YARN, it fails with an error; the message is the following:

java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)





Can someone tell me what is wrong? Thank you.






Re: RocksDB segfaults

2017-03-24 Thread Florian König
Hi,

@Robert: I have uploaded all the log files that I could get my hands on to 
https://www.dropbox.com/sh/l35q6979hy7mue7/AAAe1gABW59eQt6jGxA3pAYaa?dl=0. I 
tried to remove all unrelated messages logged by the job itself. In 
flink-root-jobmanager-0-micardo-dev.log I kept the Flink startup messages and 
the last half hour before the segfault.

@Stefan: Your theory could be the key. In the stack trace I see a call to 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge 
that results in a call to 
com.micardo.backend.tail.TransitionProcessor$2.getValue()Ljava/lang/Long later. 
Here’s a slimmed-down snippet of the relevant code:

class TransitionProcessor extends RichFlatMapFunction {

    transient ValueState<IdSet> headSeenState;

    public void open(final Configuration parameters) throws Exception {
        headSeenState = FlinkUtil.getStateHandle(this, "head-seen",
                IdSet.class);

        getRuntimeContext()
            .getMetricGroup()
            .gauge("head-seen", new Gauge<Long>() {
                public Long getValue() {
                    try {
                        return headSeenState.value().count();
                    } catch (IOException e) {
                        e.printStackTrace();
                        return 0L;
                    }
                }
            });
    }
    …
}

FlinkUtil.getStateHandle instantiates a ValueStateDescriptor and acquires a
reference to that state via the RuntimeContext of the RichFunction passed as
'this' in the above code.

Further along in the stack trace I see that headSeenState.value() results in a 
call to org.apache.flink.contrib.streaming.state.RocksDBValueState.value() and 
then to org.rocksdb.RocksDB.get(J[BIJ).

It looks like part of the metrics system asynchronously reads the value of the 
gauge and needs RocksDB for that. Is it possible that this thread does not hold 
the checkpointing lock you were talking about?
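If so, a possible workaround — a sketch only, reusing FlinkUtil, IdSet and the
elided type parameters from the snippet above, and assuming the metric may lag
behind by one processed element — would be to cache the count in a volatile
field refreshed by the task thread, so the gauge reads only the cache:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

class TransitionProcessor extends RichFlatMapFunction {

    transient ValueState<IdSet> headSeenState;

    // written by the task thread (which holds the checkpointing lock),
    // read lock-free by the metrics thread
    private transient volatile long headSeenCount;

    public void open(final Configuration parameters) throws Exception {
        headSeenState = FlinkUtil.getStateHandle(this, "head-seen", IdSet.class);

        getRuntimeContext()
            .getMetricGroup()
            .gauge("head-seen", new Gauge<Long>() {
                public Long getValue() {
                    return headSeenCount; // no RocksDB access on this thread
                }
            });
    }

    public void flatMap(/* element, collector — as in the original */) throws Exception {
        // ... update headSeenState as before, then refresh the cached count
        // while still on the task thread:
        headSeenCount = headSeenState.value().count();
    }
}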

Best regards
Florian


> Am 22.03.2017 um 18:19 schrieb Stefan Richter :
> 
> Hi,
> 
> for the first checkpoint, from the stacktrace I assume that the backend is 
> not accessed as part of processing an element, but by another thread. Is that 
> correct? RocksDB requires accessing threads to hold the task’s checkpointing 
> lock, otherwise they might call methods on an instance that is already 
> disposed. However, this should only happen when the task was already about to 
> shutdown anyways. Is that a plausible explanation for your observed 
> behaviour? I can also not rule out that segfaults can happen inside RocksDB 
> or due to the JNI bridge.
> 
> Best,
> Stefan
> 
>> Am 22.03.2017 um 16:53 schrieb Florian König :
>> 
>> Hi Stephen,
>> 
>> you are right, the second stack trace is indeed from a run of Flink 1.1.4. 
>> Sorry, my bad.
>> 
>> That leaves us with the first trace of a segfault for which I can guarantee 
>> that it brought down a 1.2.0 instance. Unfortunately I cannot reproduce the 
>> problem. It has happened twice so far, but I can’t see any pattern. Is there 
>> anything in the stack trace that could point us to a probable cause?
>> 
>> Florian
>> 
>>> Am 22.03.2017 um 16:00 schrieb Stephan Ewen :
>>> 
>>> Hi!
>>> 
>>> It looks like you are running the RocksDB state backend from 1.1 (is there
>>> still an old version packaged into your JAR file?)
>>> 
>>> This line indicates that: 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot
>>>  (the method does not exist in 1.2 any more)
>>> 
>>> Can you try and run 1.2 and see if that still occurs? In general, I cannot
>>> vouch 100% for RocksDB's JNI API, but in our 1.2 tests so far it was stable.
>>> 
>>> Stephan
>>> 
>>> 
>>> 
>>> On Wed, Mar 22, 2017 at 3:13 PM, Florian König  
>>> wrote:
>>> Hi,
>>> 
>>> I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by something 
>>> in RocksDB. What is the preferred way to report them? All I got at the 
>>> moment are two hs_err_pid12345.log files. They are over 4000 lines long 
>>> each. Is there anything significant that I should extract to help you guys 
>>> and/or put into a JIRA ticket?
>>> 
>>> The first thing that came to my mind was the stack traces (see below). 
>>> Anything else?
>>> 
>>> Thanks
>>> Florian
>>> 
>>> 
>>> 
>>> Stack: [0x7fec04341000,0x7fec04442000],  sp=0x7fec0443ff48,  
>>> free space=1019k
>>> Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
>>> J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @ 0x7fec925887cc 
>>> [0x7fec92588780+0x4c]
>>> J 27241 C2 
>>>