Fetching TaskManager log failed
I am running a Flink job; after it has run for about an hour, this error appears:

  ERROR org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler - Fetching TaskManager log failed.
  java.util.NoSuchElementException: None.get
flink yarn-cluster run job --files
Hi all, in Spark the submitted job can take --files, described as a "comma-separated list of files to be placed in the working directory of each executor." Is there an equivalent in Flink? I tried --classpath file:///, but the job fails because the file is not there.
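In case it helps, Flink's distributed cache plays a role similar to Spark's --files: files registered on the environment are copied to each TaskManager and can be read inside operators. A minimal batch sketch; the HDFS path and the name "lookupFile" are placeholders, not from the original mail:

  import org.apache.flink.api.common.functions.RichMapFunction
  import org.apache.flink.api.scala._
  import org.apache.flink.configuration.Configuration

  object DistributedCacheExample {
    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment

      // Ship the file to every TaskManager under a symbolic name.
      env.registerCachedFile("hdfs:///user/hadoop/some-lookup.txt", "lookupFile")

      env.fromElements("a", "b", "c")
        .map(new RichMapFunction[String, String] {
          override def map(value: String): String = {
            // Each operator instance gets a local copy of the cached file.
            val file = getRuntimeContext.getDistributedCache.getFile("lookupFile")
            value + " -> " + file.getAbsolutePath
          }
        })
        .print()
    }
  }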
flink-1.2.0 java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/filter/Filter
Today I used flink-1.2.0 to run a job on YARN. The command is:

  flink-1.2.0/bin/flink run \
    -m yarn-cluster \
    -yn 2 \
    -ys 4 \
    -yjm 3072 \
    -ytm 2048 \
    --class statics.ComputevecSim \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-server.jar \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar \
    --classpath file:///opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar \
    /home/.../text-assembly-0.1.0.jar hdfs:///user/hadoop/wenhao/xj/wenda_search_aggregate_page.txt

It fails with:

  Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.filter.Filter

With flink-1.1.1 the same job runs successfully. Can someone tell me how to solve this problem?
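One possible workaround, an assumption rather than anything confirmed in this thread, is to bundle the HBase client classes into the fat jar instead of relying on --classpath, so they are on the user-code classpath regardless of how the cluster classloader resolves them. A build.sbt sketch with illustrative version numbers (assumes the sbt-assembly plugin):

  // build.sbt -- hedged sketch; versions are placeholders, adjust to your CDH setup
  libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.2.0" % "provided",
    "org.apache.hbase" % "hbase-client" % "1.2.0",
    "org.apache.hbase" % "hbase-common" % "1.2.0"
  )

  // Keep Flink itself out of the fat jar; ship HBase with the job.
  assemblyMergeStrategy in assembly := {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case _                             => MergeStrategy.first
  }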
flink: one transformation ends, the next transformation starts
Hi all, I run a job like this:

  val data = env.readTextFile("hdfs:///")       // DataSet[(String, Array[String])]
  val dataVec = computeDataVect(data)           // DataSet[(String, Int, Array[(Int, Double)])]
  val rescomm = computeCosSims(dataVec)         // DataSet[(String, Array[(String, Double)])]

When run on the YARN cluster the job succeeds but the result is wrong; when run locally in Eclipse on my computer, the result is correct.

So I ran it in two steps. First:

  val data = env.readTextFile("hdfs:///")       // DataSet[(String, Array[String])]
  val dataVec = computeDataVect(data)           // DataSet[(String, Int, Array[(Int, Double)])]
  dataVec.writeAsText("hdfs:///vec")            // the vectors written here are correct

Second:

  val readVec = env.readTextFile("hdfs:///vec").map(...)  // DataSet[(String, Int, Array[(Int, Double)])]
  val rescomm = computeCosSims(readVec)         // DataSet[(String, Array[(String, Double)])]

and the result is correct, the same as locally in Eclipse. Can someone explain the problem?
Re: Re: flink Broadcast
Yes, it is a YARN single job, using this command:

  flink-1.1.1/bin/flink run -m yarn-cluster \
    -yn 2 \
    -ys 2 \
    -yjm 2048 \
    -ytm 2048 \
    --class statics.ComputeDocSim \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar \
    --classpath file:///opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar \
    text-assembly-0.1.0.jar hdfs:///user/hadoop/tf-idf-ex hdfs:///user/hadoop/tf-idf-ex-sims

and the code is:

  val from = ... // DataSet[(String, Vector)]
  val to = from.collect()
  val cosDistince = CosineDistanceMetric.apply()
  val res = from.map { x =>
    val fromId = x._1
    val docSims = to.filter(_._1 != fromId).map { y =>
      val toId = y._1
      val score = 1 - cosDistince.distance(x._2, y._2)
      (toId, score)
    }.toList.sortWith((x, y) => x._2 > y._2).take(20)
    (fromId, docSims)
  }
  res.writeAsText(..)

----- Original Message -----
From: Stephan Ewen
To: user@flink.apache.org
Cc: 亘谷
Subject: Re: flink Broadcast
Date: 2017-03-24 17:40

The program consists of two executions - one that only collect()s back to the client, one that executes the map function. Are you running this as a "YARN single job" execution? In that case, there may be an issue that this incorrectly tries to submit to a stopping YARN cluster.

On Fri, Mar 24, 2017 at 10:32 AM, Robert Metzger wrote:

Hi,

Can you provide more logs to help us understand what's going on?

One note regarding your application: you are calling .collect() and sending the collection to the cluster again with the map() call. This is pretty inefficient and can potentially break your application (in particular the RPC system of Flink). I would recommend using broadcast variables to send the dataset to the map operator:
https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables

On Thu, Mar 23, 2017 at 3:11 PM, wrote:

Hi all, I have 36000 documents, and each document is transformed into a vector; one document is one vector and the dimension is the same for all, so I have a DataSet:

  val data: DataSet[(String, SparseVector)] = ... // 36000 records
  val toData = data.collect()
  val docSims = data.map { x =>
    val fromId = x._1
    val docsims = toData.filter { y => y._1 != fromId }.map { y =>
      val score = 1 - cosDisticnce(x._2, y._2)
      (y._1, score)
    }.toList.sortWith { (a, b) => a._2 > b._2 }.take(20)
    (fromId, docsims)
  }
  docSims.writeAsText(file)

When I run the job on YARN, it fails with the following message:

  java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)

Can someone tell me? Thank you.
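Following Robert's pointer, a minimal sketch of the broadcast-variable approach. Assumptions not taken from the thread: a dense Vector[Double] stands in for the SparseVector, and cosineSimilarity is a placeholder for 1 - CosineDistanceMetric.distance:

  import org.apache.flink.api.common.functions.RichMapFunction
  import org.apache.flink.api.scala._
  import org.apache.flink.configuration.Configuration
  import scala.collection.JavaConverters._

  // Scores every document against the broadcast set instead of a collect()ed list.
  class SimilarityMapper
      extends RichMapFunction[(String, Vector[Double]), (String, List[(String, Double)])] {

    private var toData: Seq[(String, Vector[Double])] = _

    override def open(parameters: Configuration): Unit = {
      // The broadcast set is shipped once per TaskManager, not through the client.
      toData = getRuntimeContext
        .getBroadcastVariable[(String, Vector[Double])]("toData")
        .asScala
    }

    override def map(x: (String, Vector[Double])): (String, List[(String, Double)]) = {
      val docSims = toData
        .filter(_._1 != x._1)
        .map(y => (y._1, cosineSimilarity(x._2, y._2)))
        .sortWith(_._2 > _._2)
        .take(20)
        .toList
      (x._1, docSims)
    }

    // Placeholder for 1 - CosineDistanceMetric.distance(a, b).
    private def cosineSimilarity(a: Vector[Double], b: Vector[Double]): Double = {
      val dot = a.zip(b).map { case (p, q) => p * q }.sum
      dot / (math.sqrt(a.map(v => v * v).sum) * math.sqrt(b.map(v => v * v).sum))
    }
  }

It would be wired up like this (outputPath is a placeholder):

  val result = data                      // data: DataSet[(String, Vector[Double])]
    .map(new SimilarityMapper)
    .withBroadcastSet(data, "toData")
  result.writeAsText(outputPath)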
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
Hi, I read a file from HDFS, but there is an error when running the job on the YARN cluster:

  val dataSeg = env.readTextFile("hdfs:///user/hadoop/text")
    .filter(!_.startsWith("#"))
    .map { x =>
      val values = x.split("\t")
      (values.apply(0), values.apply(1).split(" "))
    }
  logger.info("dataSeg=" + dataSeg.count())

The error is the following:

  2017-03-24 11:32:15,012 INFO org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Interrupted while waiting for queue
  java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)

Hadoop is 2.6; Flink is 1.1.0-hadoop2.6-scala-2.11 (the class org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl is in flink-shaded-hadoop2-1.1.0).
flink Broadcast
Hi all, I have 36000 documents, and each document is transformed into a vector; one document is one vector and the dimension is the same for all, so I have a DataSet:

  val data: DataSet[(String, SparseVector)] = ... // 36000 records
  val toData = data.collect()
  val docSims = data.map { x =>
    val fromId = x._1
    val docsims = toData.filter { y => y._1 != fromId }.map { y =>
      val score = 1 - cosDisticnce(x._2, y._2)
      (y._1, score)
    }.toList.sortWith { (a, b) => a._2 > b._2 }.take(20)
    (fromId, docsims)
  }
  docSims.writeAsText(file)

When I run the job on YARN, it fails with the following message:

  java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)

Can someone tell me? Thank you.
Re: JVM Non Heap Memory
I have the same problem, but I submit the Flink job to YARN. I submit the job from computer 22; the job runs successfully, the JobManager is on computer 79 and the TaskManager on computer 69, so these are three different computers. However, on computer 22 the process with pid 3463, which is the client that submitted the job to YARN, uses 2.3 GB of memory, 15% of the total. The command is:

  ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024 -ytm 1024

Why does it occupy so much memory on computer 22? The job is running on computers 79 and 69. What would be the possible causes of such behavior?

Best Regards,

----- Original Message -----
From: Daniel Santos
To: user@flink.apache.org
Subject: JVM Non Heap Memory
Date: 2016-11-29 22:26

Hello,

Is it common to have high usage of non-heap memory in the JVM? I am running Flink in a stand-alone cluster in Docker, with each container capped at 6 GB of memory. I have been struggling to keep memory usage in check. The non-heap usage increases without end: it starts at just 100 MB and after a day reaches 1.3 GB, then eventually 2 GB, and then the container is killed because it has reached the memory limit. My configuration for each Flink TaskManager is the following:

  --- flink-conf.yaml ---
  taskmanager.heap.mb: 3072
  taskmanager.numberOfTaskSlots: 8
  taskmanager.memory.preallocate: false
  taskmanager.network.numberOfBuffers: 12500
  taskmanager.memory.off-heap: false

What would be the possible causes of such behavior?

Best Regards,
Daniel Santos
flink job in yarn has max memory
Hi, I have a Flink job, built with sbt assembly into a jar file, which I submit to YARN with the following command:

  /home/www/flink-1.1.1/bin/flink run \
    -m yarn-cluster \
    -yn 1 \
    -ys 2 \
    -yjm 4096 \
    -ytm 4096 \
    --class skRecomm.SkProRecommFlink \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar \
    --classpath file:///opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar \
    --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar \
    /home/www/flink-mining/deploy/zx_article-7cffb87.jar

The command runs under supervisor on computer (*.*.*.22), and in flink/conf/flink-conf.yaml I set these parameters:

  fs.hdfs.hadoopconf: /etc/hadoop/conf/
  jobmanager.web.port: 8081
  parallelism.default: 1
  taskmanager.memory.preallocate: false
  taskmanager.numberOfTaskSlots: 1
  taskmanager.heap.mb: 512
  jobmanager.heap.mb: 256
  jobmanager.rpc.port: 6123
  jobmanager.rpc.address: localhost

The job succeeds, and the YARN monitor shows the following:

  flink.base.dir.path: /data1/yarn/nm/usercache/work/appcache/application_1472623395420_36719/container_e03_1472623395420_36719_01_01
  fs.hdfs.hadoopconf: /etc/hadoop/conf/
  jobmanager.heap.mb: 256
  jobmanager.rpc.address: *.*.*.79 (not *.*.*.22; the TaskManager is *.*.*.69)
  jobmanager.rpc.port: 32987
  jobmanager.web.port: 0
  parallelism.default: 1
  recovery.zookeeper.path.namespace: application_1472623395420_36719
  taskmanager.heap.mb: 512
  taskmanager.memory.preallocate: false
  taskmanager.numberOfTaskSlots: 1

  Overview:
    Data Port: 30471
    All Slots: 2
    Free Slots: 0
    CPU Cores: 32
    Physical Memory: 189 GB
    Free Memory: 2.88 GB
    Flink Managed Memory: 1.96 GB

  JVM Memory (Heap/Non-Heap):
    Type      Committed  Initial  Maximum
    Heap      2.92 GB    3.00 GB  2.92 GB
    Non-Heap  53.4 MB    23.4 MB  130 MB
    Total     2.97 GB    3.02 GB  3.04 GB

  Outside JVM:
    Type    Count  Used    Capacity
    Direct  510    860 KB  860 KB
    Mapped  0      0 B     0 B

On computer (*.*.*.22) I find that the process with pid 345, which is the job submitted from supervisor, uses 2.36 GB of memory. I really do not know why: the job runs in YARN, so why does it occupy so much memory on computer (*.*.*.22)? I only submitted the job from computer (*.*.*.22). Thank you for answering my question.
Re: How to Broadcast a very large model object (used in iterative scoring in recommendation system) in Flink
Your message is very short, so I cannot tell much from it; the following is my guess. In Flink, the DataStream is not meant for iterative computation; the DataSet fits that better, and Flink suggests broadcasting small data, not large. You can load your model data (from a file or a table) before the main function and assign it to a variable, say yourModel. Then, for the DataStream (a stream of unscored records, like DataStream[String] or DataStream[YourClass]):

  dataStream.map { x =>
    val score = computeScore(x, yourModel)
  }

For example:

  object YourObject {
    // load your model before main, e.g. from a file or a table
    val yourModel = ...

    def main(args: Array[String]): Unit = {
      ...
      // read unscored records from a socket or Kafka, then:
      dataStream.map { x =>
        val score = computeScore(x, yourModel)
      }
      ...
    }
  }

----- Original Message -----
From: Anchit Jatana
To: user@flink.apache.org
Subject: How to Broadcast a very large model object (used in iterative scoring in recommendation system) in Flink
Date: 2016-09-30 14:15

Hi All,

I'm building a recommendation system streaming application for which I need to broadcast a very large model object (used in iterative scoring) among all the task managers performing the operation in parallel. I am doing this operation in map1 of a CoMapFunction. Please suggest some way to achieve broadcasting of the large model variable (something similar to what Spark has with broadcast variables).

Thank you

Regards,
Anchit
Re: Re: Re: Re: modify coGroup GlobalWindows GlobalWindow
Thanks for your answer, but I cannot follow your idea: "If all elements of 'words2' have been processed, the right side of your coGroup will always be empty no matter what is incoming in your socketTextStream" - that is the part I do not understand.

Here is my idea (it may be wrong): the coGroup creates a new data stream from T1 and T2, and it must use GlobalWindows to store all elements of T2; with timeWindow or others, not all of T2's elements would be kept.

  -- T1, T2 --                 the apply function runs and produces a result.
  When the first element arrives, T1 grows by one element:
  -- T1(+first), T2 --         the apply function runs and produces a result.
  When the second element arrives, T1 grows by one element:
  -- T1(+first+second), T2 --  the apply function runs and produces a result.

But what I actually want is:

  -- T1, T2 --
  when the first element arrives:  T1(+first), T2
  when the second element arrives: T1(+second), T2

So the first element must be fired and then dropped; that is my intention. I tried to cut the socket input stream with countWindow or timeWindow, but it does not work: with coGroup, T1 and T2 form a whole, so I think the window must be applied to the coGroup itself.

----- Original Message -----
From: Timo Walther
To: user@flink.apache.org
Subject: Re: Re: Re: modify coGroup GlobalWindows GlobalWindow
Date: 2016-09-06 20:52

I think you have to rethink your approach. In your example "words2" is a stream but only with a fixed set of elements. If all elements of "words2" have been processed, the right side of your coGroup will always be empty no matter what is incoming in your socketTextStream. It is not read in over and over again. Is that your intention?

On 06/09/16 at 13:15, rimin...@sina.cn wrote:

I tried reading the data into a list or List[Map] to store T2, but with a plain list there is no parallelization, so I want to use coGroup. On the other hand, the coGroup function joins T1 and T2 and must have a window and a trigger: the window cuts T1 and T2, and the trigger fires the apply function when the input reaches the trigger threshold. From the result in apply() (I use my InnerJoinFunction and print T1 and T2), we can see that whenever input arrives and the apply is triggered, T1 keeps growing while T2 does not change. So the way the window cuts T1 and T2 does not achieve my goal, which is why I want to write my own "GlobalWindows.create()". As for Flink's operator state, I have no idea about it and really do not know how to use it. Can you give me an example?

----- Original Message -----
From: Timo Walther
To: user@flink.apache.org
Subject: Re: modify coGroup GlobalWindows GlobalWindow
Date: 2016-09-06 17:52

Hi,

will words2 always remain constant? If yes, you don't have to create a stream out of it and coGroup it, but you could simply pass the collection to a Map/FlatMap function and do the joining there without the need of a window. Btw. you know that non-keyed global windows do not scale? If I understand your code correctly, you just want to get a stream with the last T2, right? I don't think you have to implement your own "GlobalWindow" for that. Have you tried to use Flink's operator state for that? So that if the state is growing it can be written to disk.

Hope that helps.

Timo

On 06/09/16 at 10:05, rimin...@sina.cn wrote:

Hi, the following code:

  val text = env.socketTextStream(hostName, port)
  val words1 = text.map { x =>
    val res = x.split(",")
    (res.apply(0) -> res.apply(1))
  }
  val words2 =
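As a sketch of Timo's operator-state suggestion (an assumption, not code from this thread): connect the two streams, buffer only the right side in keyed state, and join each left element immediately, so the left history never accumulates. LatestLeftJoin is a made-up name:

  import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.util.Collector

  // Joins each arriving left element against the buffered right side for the
  // same key; left elements are emitted and forgotten, so "T1" never grows.
  class LatestLeftJoin
      extends RichCoFlatMapFunction[(String, String), (String, String), (String, String)] {

    private var rightState: ValueState[List[(String, String)]] = _

    override def open(parameters: Configuration): Unit = {
      rightState = getRuntimeContext.getState(
        new ValueStateDescriptor("right", classOf[List[(String, String)]]))
    }

    override def flatMap1(left: (String, String), out: Collector[(String, String)]): Unit = {
      // Join against the stored right side; the left element is not retained.
      val right = Option(rightState.value()).getOrElse(Nil)
      right.foreach(r => out.collect((left._1, left._2 + r._2)))
    }

    override def flatMap2(right: (String, String), out: Collector[(String, String)]): Unit = {
      // Buffer the (small, static) right side in keyed state.
      rightState.update(right :: Option(rightState.value()).getOrElse(Nil))
    }
  }

It could be wired up as:

  val joined = words1.connect(words2)
    .keyBy(_._1, _._1)
    .flatMap(new LatestLeftJoin)
  joined.print()

One caveat: left elements that arrive before the right side has been buffered produce no output; a production version would have to buffer or replay them.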
Re: Re: Re: Re: fromParallelCollection
My data comes from an HBase table; it is like a List[(rowkey, Map[String, String])].

  class MySplittableIterator extends SplittableIterator[String] {

    // Members declared in java.util.Iterator
    def hasNext(): Boolean = {
    }
    def next(): String = {
    }

    // Members declared in org.apache.flink.util.SplittableIterator
    def getMaximumNumberOfSplits(): Int = {
    }
    def split(num: Int): Array[Iterator[String]] = {
    }
  }

I do not know how to write these methods; can you give me an example?

----- Original Message -----
From: Timo Walther
To: user@flink.apache.org
Subject: Re: Re: Re: fromParallelCollection
Date: 2016-09-06 17:03

Hi,

you have to implement a class that extends "org.apache.flink.util.SplittableIterator". The runtime will ask this class for multiple "java.util.Iterator"s over your split data. How you split your data and how an iterator looks like depends on your data and implementation. If you need more help, you should show us some examples of your data.

Timo

On 06/09/16 at 09:46, rimin...@sina.cn wrote:

fromCollection is not parallelized, and the data is huge, so I want to use env.fromParallelCollection(data), but I do not know how to initialize the data.

----- Original Message -----
From: Maximilian Michels
To: "user@flink.apache.org", rimin...@sina.cn
Subject: Re: fromParallelCollection
Date: 2016-09-05 16:58

Please give us a bit more insight on what you're trying to do.

On Sat, Sep 3, 2016 at 5:01 AM, wrote:
> Hi,
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tr = env.fromParallelCollection(data)
>
> I do not know how to initialize the data; can someone tell me?

--
Freundliche Grüße / Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr
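For illustration, a minimal SplittableIterator over data already fetched into memory. The round-robin slicing and the constructor argument are assumptions; a very large HBase table would be better served by an InputFormat source:

  import org.apache.flink.util.SplittableIterator
  import scala.collection.JavaConverters._

  // A SplittableIterator backed by an in-memory sequence. Each split simply
  // iterates over one round-robin slice of the data.
  class MySplittableIterator(data: Seq[String]) extends SplittableIterator[String] {

    // Not serialized with the object; recreated lazily on first use.
    @transient private lazy val inner: Iterator[String] = data.iterator

    // java.util.Iterator methods: delegate to the single full iterator.
    override def hasNext(): Boolean = inner.hasNext
    override def next(): String = inner.next()

    // At most one split per element.
    override def getMaximumNumberOfSplits(): Int = data.size

    // Partition the data into `num` slices, one java.util.Iterator each.
    override def split(num: Int): Array[java.util.Iterator[String]] = {
      (0 until num).map { i =>
        data.zipWithIndex
          .collect { case (v, idx) if idx % num == i => v }
          .iterator.asJava
      }.toArray
    }
  }

Hypothetical usage, with rows standing in for the pre-fetched HBase values:

  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val rows: Seq[String] = Seq("row1", "row2", "row3")
  env.fromParallelCollection(new MySplittableIterator(rows)).print()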
Re: Re: modify coGroup GlobalWindows GlobalWindow
I tried reading the data into a list or List[Map] to store T2, but with a plain list there is no parallelization, so I want to use coGroup. On the other hand, the coGroup function joins T1 and T2 and must have a window and a trigger: the window cuts T1 and T2, and the trigger fires the apply function when the input reaches the trigger threshold. From the result in apply() (I use my InnerJoinFunction and print T1 and T2), we can see that whenever input arrives and the apply is triggered, T1 keeps growing while T2 does not change. So the way the window cuts T1 and T2 does not achieve my goal, which is why I want to write my own "GlobalWindows.create()". As for Flink's operator state, I have no idea about it and really do not know how to use it. Can you give me an example?

----- Original Message -----
From: Timo Walther
To: user@flink.apache.org
Subject: Re: modify coGroup GlobalWindows GlobalWindow
Date: 2016-09-06 17:52

Hi,

will words2 always remain constant? If yes, you don't have to create a stream out of it and coGroup it, but you could simply pass the collection to a Map/FlatMap function and do the joining there without the need of a window. Btw. you know that non-keyed global windows do not scale? If I understand your code correctly, you just want to get a stream with the last T2, right? I don't think you have to implement your own "GlobalWindow" for that. Have you tried to use Flink's operator state for that? So that if the state is growing it can be written to disk.

Hope that helps.

Timo

On 06/09/16 at 10:05, rimin...@sina.cn wrote:

Hi, the following code:

  val text = env.socketTextStream(hostName, port)
  val words1 = text.map { x =>
    val res = x.split(",")
    (res.apply(0) -> res.apply(1))
  }
  val words2 = env.fromElements(("a","w1"),("a","w2"),("c","w3"),("d","w4"))

  val joinedStream = words1
    .coGroup(words2)
    .where(_._1)
    .equalTo(_._1)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(1))
  val res = joinedStream.apply(new InnerJoinFunction).print()
  env.execute()

  class InnerJoinFunction extends CoGroupFunction[(String,String),(String,String),(String,String)] {
    override def coGroup(T1: java.lang.Iterable[(String,String)],
                         T2: java.lang.Iterable[(String,String)],
                         out: Collector[(String, String)]): Unit = {
      println("")
      println("T1=" + T1 + " T2=" + T2)
      import scala.collection.JavaConverters._
      val scalaT2 = T2.asScala.toList
      if (!T1.asScala.isEmpty && scalaT2.nonEmpty) {
        val transaction = T1.asScala.last
        println("T2 last=" + transaction)
        for (snapshot <- scalaT2) {
          out.collect(transaction._1, transaction._2 + snapshot._2)
        }
      }
    }
  }

The code has no problem and can run; the following is the result (input "a,1", then input "a,2"):

  T1=[(a,1)] T2=[(a,w2), (a,w1)]
  T2 last=(a,1)
  2> (a,1w2)
  2> (a,1w1)
  T1=[(a,1), (a,2)] T2=[(a,w2), (a,w1)]
  T2 last=(a,2)
  2> (a,2w2)
  2> (a,2w1)

T1 keeps growing while T2 does not change. I worry that with enough input T1 will run out of storage, so I want to write my own "GlobalWindows.create()" so that T1 stores only the latest element from the input (or from Kafka) and its history is cleared (input a,1 gives T1 = [(a,1)]; then input a,2 gives T1 = [(a,2)], not T1 = [(a,1), (a,2)]), while T2 does not change. I rewrote "GlobalWindows", but it does not work. Reading the code, I found I must also rewrite "GlobalWindow" and modify the "class Serializer extends TypeSerializer", but when I run it, execution never reaches there. Why? Can someone tell me?

--
Freundliche Grüße / Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr
modify coGroup GlobalWindows GlobalWindow
Hi, the following code:

  val text = env.socketTextStream(hostName, port)
  val words1 = text.map { x =>
    val res = x.split(",")
    (res.apply(0) -> res.apply(1))
  }
  val words2 = env.fromElements(("a","w1"),("a","w2"),("c","w3"),("d","w4"))

  val joinedStream = words1
    .coGroup(words2)
    .where(_._1)
    .equalTo(_._1)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(1))
  val res = joinedStream.apply(new InnerJoinFunction).print()
  env.execute()

  class InnerJoinFunction extends CoGroupFunction[(String,String),(String,String),(String,String)] {
    override def coGroup(T1: java.lang.Iterable[(String,String)],
                         T2: java.lang.Iterable[(String,String)],
                         out: Collector[(String, String)]): Unit = {
      println("")
      println("T1=" + T1 + " T2=" + T2)
      import scala.collection.JavaConverters._
      val scalaT2 = T2.asScala.toList
      if (!T1.asScala.isEmpty && scalaT2.nonEmpty) {
        val transaction = T1.asScala.last
        println("T2 last=" + transaction)
        for (snapshot <- scalaT2) {
          out.collect(transaction._1, transaction._2 + snapshot._2)
        }
      }
    }
  }

The code has no problem and can run; the following is the result (input "a,1", then input "a,2"):

  T1=[(a,1)] T2=[(a,w2), (a,w1)]
  T2 last=(a,1)
  2> (a,1w2)
  2> (a,1w1)
  T1=[(a,1), (a,2)] T2=[(a,w2), (a,w1)]
  T2 last=(a,2)
  2> (a,2w2)
  2> (a,2w1)

T1 keeps growing while T2 does not change. I worry that with enough input T1 will run out of storage, so I want to write my own "GlobalWindows.create()" so that T1 stores only the latest element from the input (or from Kafka) and its history is cleared (input a,1 gives T1 = [(a,1)]; then input a,2 gives T1 = [(a,2)], not T1 = [(a,1), (a,2)]), while T2 does not change. I rewrote "GlobalWindows", but it does not work. Reading the code, I found I must also rewrite "GlobalWindow" and modify the "class Serializer extends TypeSerializer", but when I run it, execution never reaches there. Why? Can someone tell me?
Re: Re: fromParallelCollection
fromCollection is not parallelized, and the data is huge, so I want to use env.fromParallelCollection(data), but I do not know how to initialize the data.

----- Original Message -----
From: Maximilian Michels
To: "user@flink.apache.org", rimin...@sina.cn
Subject: Re: fromParallelCollection
Date: 2016-09-05 16:58

Please give us a bit more insight on what you're trying to do.

On Sat, Sep 3, 2016 at 5:01 AM, wrote:
> Hi,
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tr = env.fromParallelCollection(data)
>
> I do not know how to initialize the data; can someone tell me?
fromParallelCollection
Hi,

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tr = env.fromParallelCollection(data)

I do not know how to initialize the data; can someone tell me?
flink dataStream operate dataSet
Hi, I have a problem: a DataStream is read from RabbitMQ, and other data comes from an HBase table as a DataSet. The two sources look like this:

  val words = connectHelper.readFromRabbitMq(...)   // words is DataStream[String]
  val dataSet = HBaseWrite.fullScan()               // dataSet is DataSet[(Int, String)]

  words.map { word =>
    val res = dataSet.map { y =>
      val score = computerScore(word, y)
      (word, score)
    }
    HBaseWrite.writeToTable(res, ...)
  }

The error is "task not serializable". What is the solution? Inside a DataStream, how can one operate on a DataSet?
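DataSet and DataStream programs cannot be mixed in a single job, and capturing the DataSet inside the streaming map closure is what makes the task non-serializable. A hedged sketch of one common alternative (an assumption, not from this thread): load the HBase rows once per parallel instance in open() of a RichMapFunction and score each streamed word against the in-memory copy. fetchHBaseRows and computerScore are placeholder helpers:

  import org.apache.flink.api.common.functions.RichMapFunction
  import org.apache.flink.configuration.Configuration

  // Scores each streamed word against a reference table loaded once per task.
  class ScoreAgainstHBase extends RichMapFunction[String, (String, Double)] {

    // Loaded in open(), so nothing unserializable is captured in the closure.
    @transient private var reference: Seq[(Int, String)] = _

    override def open(parameters: Configuration): Unit = {
      reference = fetchHBaseRows()
    }

    override def map(word: String): (String, Double) = {
      // Keep the best score over all reference rows for this word.
      val best =
        if (reference.isEmpty) 0.0
        else reference.map(y => computerScore(word, y)).max
      (word, best)
    }

    // Assumed helper: a full scan via the HBase client API would go here.
    private def fetchHBaseRows(): Seq[(Int, String)] = Seq.empty

    // Assumed helper: the scoring function from the original mail.
    private def computerScore(word: String, y: (Int, String)): Double = 0.0
  }

It would be applied as words.map(new ScoreAgainstHBase), keeping the whole pipeline inside the DataStream API.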
flink-shaded-hadoop
Hi everyone, when I use Scala version 2.10 and set up the sbt project (adding flink-core, flink-scala, flink-streaming-scala, flink-kafka, flink-streaming-connectors), the build downloads flink-shaded-hadoop1_2.10.jar; but with Scala version 2.11 I get both flink-shaded-hadoop1_2.10.jar and flink-shaded-hadoop2_2.11.jar. Why? Can someone tell me?
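Mixed _2.10 and _2.11 suffixes usually mean at least one dependency was declared with a hard-coded Scala suffix (or with %, which ignores scalaVersion) instead of %%. A build.sbt sketch with illustrative version numbers:

  // build.sbt -- hedged sketch; adjust versions to your setup
  scalaVersion := "2.11.8"

  val flinkVersion = "1.1.0"

  libraryDependencies ++= Seq(
    // %% appends the right Scala suffix (_2.11 here) automatically;
    // a hard-coded "flink-scala_2.10" would drag in 2.10 artifacts.
    "org.apache.flink" %% "flink-scala" % flinkVersion,
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
    "org.apache.flink" %% "flink-connector-kafka-0.8" % flinkVersion
  )

With every Flink module resolved through %%, only one Scala suffix, and hence one flink-shaded-hadoop variant, should appear in the dependency report.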
flink 1.0 DataStream groupBy
Hi, today I used Flink to rewrite my Spark project. In Spark the data is an RDD, and it has many transformations and actions, but in Flink the DataStream has neither groupBy nor foreach. For example:

  val env = StreamExecutionEnvironment.createLocalEnvironment()
  val data = List(("1" -> "a"), ("2" -> "b"), ("1" -> "c"), ("2" -> "f"))
  val ds = env.fromCollection(data)
  val dskeyby = ds.groupBy(0)
  ds.print()
  env.execute()

The line "val dskeyby = ds.groupBy(0)" fails with: "value groupBy is not a member of org.apache.flink.streaming.api.scala.DataStream". So, what is the solution?
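For the record, the streaming counterpart of groupBy is keyBy. A minimal sketch; the reduce step is an illustrative aggregation, not from the original mail:

  import org.apache.flink.streaming.api.scala._

  object KeyByExample {
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.createLocalEnvironment()
      val data = List(("1", "a"), ("2", "b"), ("1", "c"), ("2", "f"))
      val ds = env.fromCollection(data)

      // DataStream has keyBy instead of groupBy; it partitions the stream by key.
      val keyed = ds.keyBy(_._1)

      // Illustrative keyed aggregation: concatenate the values per key.
      keyed
        .reduce((a, b) => (a._1, a._2 + b._2))
        .print()

      env.execute("keyBy example")
    }
  }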