Unsubscribe

2022-01-14 Thread rimin515
Unsubscribe





Unsubscribe

2021-12-07 Thread rimin515
Unsubscribe




Unsubscribe

2021-11-23 Thread rimin515
Unsubscribe




Fetching TaskManager log failed

2017-12-25 Thread rimin515
I run a Flink job, and after it has been running for about one hour the following error appears:

    ERROR org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler  - Fetching TaskManager log failed.
    java.util.NoSuchElementException: None.get


flink yarn-cluster run job --files

2017-12-25 Thread rimin515
Hi all,
In Spark, a job submission can take --files, which means "Comma-separated list of files to be placed in the working directory of each executor."
Is there an equivalent in Flink? I tried --classpath file:///, but the job fails because the file is not there.
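
A possible counterpart in Flink's batch API is the distributed cache, which ships a registered file to every TaskManager; a minimal sketch (the paths and the enrichment logic are illustrative, not from the thread):

    import java.io.File
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    val env = ExecutionEnvironment.getExecutionEnvironment

    // register the file once; Flink copies it to every TaskManager
    env.registerCachedFile("hdfs:///user/hadoop/lookup.txt", "lookup")   // illustrative path

    val input: DataSet[String] = env.readTextFile("hdfs:///user/hadoop/input.txt")   // illustrative

    val enriched = input.map(new RichMapFunction[String, String] {
      @transient private var localCopy: File = _

      override def open(parameters: Configuration): Unit = {
        // obtain the local copy Flink placed next to the task,
        // similar to Spark's --files working-directory behaviour
        localCopy = getRuntimeContext.getDistributedCache.getFile("lookup")
      }

      override def map(value: String): String = {
        // ... read localCopy and use it to enrich `value` ...
        value
      }
    })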


flink-1.2.0 java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/filter/Filter

2017-04-01 Thread rimin515
Today I used flink-1.2.0 to run a job on YARN. The command is:

    flink-1.2.0/bin/flink run -m yarn-cluster \
      -yn 2 -ys 4 -yjm 3072 -ytm 2048 \
      --class statics.ComputevecSim \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-server.jar \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar \
      --classpath file:///opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar \
      /home/.../text-assembly-0.1.0.jar \
      hdfs:///user/hadoop/wenhao/xj/wenda_search_aggregate_page.txt
and it fails with:

    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.filter.Filter

With flink-1.1.1 the same job runs successfully. Can someone tell me how to solve this problem?
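
The thread does not say what changed between 1.1.1 and 1.2.0, but one workaround worth trying (an assumption, not a confirmed fix) is to bundle the HBase classes into the assembly jar so the job no longer depends on --classpath at all; a hypothetical sbt-assembly fragment:

    // hypothetical build.sbt fragment; versions must match the cluster's HBase
    libraryDependencies ++= Seq(
      "org.apache.hbase" % "hbase-client" % "1.2.0",
      "org.apache.hbase" % "hbase-common" % "1.2.0"
    )

    // sbt-assembly: resolve duplicate files when building the fat jar
    assemblyMergeStrategy in assembly := {
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
      case _                             => MergeStrategy.first
    }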


flink one transformation end,the next transformation start

2017-03-30 Thread rimin515
Hi all, I run a job; it is:

    val data = env.readTextFile("hdfs:///")       // DataSet[(String, Array[String])]
    val dataVec = computeDataVect(data)           // DataSet[(String, Int, Array[(Int, Double)])]
    val rescomm = computeCosSims(dataVec)         // DataSet[(String, Array[(String, Double)])]

When it runs on the YARN cluster the result is wrong, although the job finishes successfully; run locally, in Eclipse on my computer, the result is correct.

So I run it in two steps. First:

    val data = env.readTextFile("hdfs:///")       // DataSet[(String, Array[String])]
    val dataVec = computeDataVect(data)           // DataSet[(String, Int, Array[(Int, Double)])]
    dataVec.writeAsText("hdfs:///vec")            // the vectors are correct

Second:

    val readVec = env.readTextFile("hdfs:///vec").map(...)   // DataSet[(String, Int, Array[(Int, Double)])]
    val rescomm = computeCosSims(readVec)                     // DataSet[(String, Array[(String, Double)])]

and the result is correct, the same as locally in Eclipse. Can someone solve this problem?


Re: Re: flink Broadcast

2017-03-24 Thread rimin515
Yes, it is a YARN single-job execution. The command is:

    flink-1.1.1/bin/flink run -m yarn-cluster \
      -yn 2 -ys 2 -yjm 2048 -ytm 2048 \
      --class statics.ComputeDocSim \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar \
      --classpath file:///opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar \
      text-assembly-0.1.0.jar hdfs:///user/hadoop/tf-idf-ex hdfs:///user/hadoop/tf-idf-ex-sims

and the code is:

    val from = ... // DataSet[(String, Vector)]
    val to = from.collect()
    val cosDistince = CosineDistanceMetric.apply()
    val res = from.map { x =>
      val fromId = x._1
      val docSims = to.filter(_._1 != fromId).map { y =>
        val toId = y._1
        val score = 1 - cosDistince.distance(x._2, y._2)
        (toId, score)
      }.toList.sortWith((x, y) => x._2 > y._2).take(20)
      (fromId, docSims)
    }
    res.writeAsText(..)
----- Original Message -----
From: Stephan Ewen
To: user@flink.apache.org
Cc: 亘谷
Subject: Re: flink Broadcast
Date: 2017-03-24 17:40

The program consists of two executions - one that only collects() back to the 
client, one that executes the map function.
Are you running this as a "YARN single job" execution? In that case, there may
be an issue that this incorrectly tries to submit to a stopping YARN cluster.


On Fri, Mar 24, 2017 at 10:32 AM, Robert Metzger  wrote:
Hi,
Can you provide more logs to help us understand what's going on?
One note regarding your application: you are calling .collect() and then sending the
collection back to the cluster with the map() call. This is pretty inefficient
and can potentially break your application (in particular the RPC system of Flink).
I would recommend using broadcast variables to send the dataset to the map
operator:
https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
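
A minimal sketch of the broadcast-variable approach the link describes, adapted to the job above (an illustration added here, not code from the thread; it assumes `from: DataSet[(String, Array[Double])]` already exists, and cosineSimilarity is a simplified stand-in for CosineDistanceMetric):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration
    import scala.collection.JavaConverters._

    def cosineSimilarity(a: Array[Double], b: Array[Double]): Double = {
      val dot = a.zip(b).map { case (x, y) => x * y }.sum
      dot / (math.sqrt(a.map(v => v * v).sum) * math.sqrt(b.map(v => v * v).sum))
    }

    val docSims = from
      .map(new RichMapFunction[(String, Array[Double]), (String, List[(String, Double)])] {
        private var to: Seq[(String, Array[Double])] = _

        override def open(parameters: Configuration): Unit = {
          // the broadcast set is shipped by Flink instead of being collect()-ed
          // on the client and re-serialized into the closure
          to = getRuntimeContext
            .getBroadcastVariable[(String, Array[Double])]("toDocs").asScala
        }

        override def map(x: (String, Array[Double])): (String, List[(String, Double)]) = {
          val (fromId, vec) = x
          val sims = to.filter(_._1 != fromId)
            .map { case (toId, v) => (toId, cosineSimilarity(vec, v)) }
            .sortWith(_._2 > _._2)
            .take(20)
            .toList
          (fromId, sims)
        }
      })
      .withBroadcastSet(from, "toDocs")

    // docSims.writeAsText(outputPath)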

On Thu, Mar 23, 2017 at 3:11 PM,   wrote:
Hi all,

I have 36000 documents, and each document is transformed into a vector; one document is one
vector and all vectors have the same dimension, so I have a DataSet.



    val data: DataSet[(String, SparseVector)] = ... // 36000 records
    val toData = data.collect()
    val docSims = data.map { x =>
      val fromId = x._1
      val docsims = toData.filter { y => y._1 != fromId }.map { y =>
        val score = 1 - cosDisticnce(x._2, y._2)
        (y._1, score)
      }.toList.sortWith { (a, b) => a._2 > b._2 }.take(20)
      (fromId, docsims)
    }
    docSims.writeAsText(file)

When I run the job on YARN, I get the following error:

    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)

Can someone tell me? Thank you.






org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl

2017-03-23 Thread rimin515
Hi, I read a file from HDFS, but there is an error when the job runs on the YARN cluster:

    val dataSeg = env.readTextFile("hdfs:///user/hadoop/text")
      .filter(!_.startsWith("#"))
      .map { x =>
        val values = x.split("\t")
        (values.apply(0), values.apply(1).split(" "))
      }

    logger.info("dataSeg=" + dataSeg.count())
The error is the following:

    2017-03-24 11:32:15,012 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Interrupted while waiting for queue
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)

Hadoop is 2.6 and Flink is 1.1.0-hadoop2.6-scala-2.11 (the class
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl is in
flink-shaded-hadoop2-1.1.0).


flink Broadcast

2017-03-23 Thread rimin515
Hi all,
I have 36000 documents, and each document is transformed into a vector; one document is one
vector and all vectors have the same dimension, so I have a DataSet.

    val data: DataSet[(String, SparseVector)] = ... // 36000 records
    val toData = data.collect()
    val docSims = data.map { x =>
      val fromId = x._1
      val docsims = toData.filter { y => y._1 != fromId }.map { y =>
        val score = 1 - cosDisticnce(x._2, y._2)
        (y._1, score)
      }.toList.sortWith { (a, b) => a._2 > b._2 }.take(20)
      (fromId, docsims)
    }
    docSims.writeAsText(file)
When I run the job on YARN, I get the following error:

    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)

Can someone tell me? Thank you.

Re: JVM Non Heap Memory

2016-11-29 Thread rimin515
I have the same problem, but I submit the Flink job to YARN.
I submit the job to YARN from computer 22 and the job runs successfully; the JobManager is on
computer 79 and the TaskManager on computer 69, so three different computers are involved.
However, on computer 22 the process with pid=3463, which is the submission process for the
YARN job, holds 2.3 GB of memory, 15% of the total.
The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024 -ytm 1024

Why does it occupy so much memory on computer 22? The job is running on computers 79 and 69.
What would be the possible causes of such behavior?
Best Regards,
----- Original Message -----
From: Daniel Santos
To: user@flink.apache.org
Subject: JVM Non Heap Memory
Date: 2016-11-29 22:26


Hello,
Is it common to have high usage of non-heap memory in the JVM?
I am running Flink in a standalone cluster and in Docker, with each
container capped at 6 GB of memory.
I have been struggling to keep memory usage in check.
The non-heap usage increases without end. It starts at just 100 MB and
after a day it reaches 1.3 GB.
Then it eventually reaches 2 GB, and the container is killed
because it has hit the memory limit.
My configuration for each flink task manager is the following :
--- flink-conf.yaml --
taskmanager.heap.mb: 3072
taskmanager.numberOfTaskSlots: 8
taskmanager.memory.preallocate: false
taskmanager.network.numberOfBuffers: 12500
taskmanager.memory.off-heap: false
-
What would be the possible causes of such behavior ?
Best Regards,
Daniel Santos


flink-job-in-yarn,has max memory

2016-11-29 Thread rimin515
Hi, I have a Flink job and use sbt assembly to build a jar file, then submit it to
YARN and run it with the following command:

    /home/www/flink-1.1.1/bin/flink run -m yarn-cluster \
      -yn 1 -ys 2 -yjm 4096 -ytm 4096 \
      --class skRecomm.SkProRecommFlink \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar \
      --classpath file:///opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar \
      --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar \
      /home/www/flink-mining/deploy/zx_article-7cffb87.jar
The command runs under supervisor on a computer (*.*.*.22), and in
flink/conf/flink-conf.yaml I set these parameters:

    fs.hdfs.hadoopconf: /etc/hadoop/conf/
    jobmanager.web.port: 8081
    parallelism.default: 1
    taskmanager.memory.preallocate: false
    taskmanager.numberOfTaskSlots: 1
    taskmanager.heap.mb: 512
    jobmanager.heap.mb: 256
    parallelism.default: 1
    jobmanager.rpc.port: 6123
    jobmanager.rpc.address: localhost

The job succeeds, and the YARN monitor shows the following configuration:

    flink.base.dir.path                 /data1/yarn/nm/usercache/work/appcache/application_1472623395420_36719/container_e03_1472623395420_36719_01_01
    fs.hdfs.hadoopconf                  /etc/hadoop/conf/
    jobmanager.heap.mb                  256
    jobmanager.rpc.address              *.*.*.79   (not *.*.*.22; the taskmanager is *.*.*.69)
    jobmanager.rpc.port                 32987
    jobmanager.web.port                 0
    parallelism.default                 1
    recovery.zookeeper.path.namespace   application_1472623395420_36719
    taskmanager.heap.mb                 512
    taskmanager.memory.preallocate      false
    taskmanager.numberOfTaskSlots       1
The TaskManager overview and memory pages show:

    Overview
    Data Port   All Slots   Free Slots   CPU Cores   Physical Memory   Free Memory   Flink Managed Memory
    30471       2           0            32          189 GB            2.88 GB       1.96 GB

    Memory: JVM (Heap/Non-Heap)
    Type       Committed   Initial   Maximum
    Heap       2.92 GB     3.00 GB   2.92 GB
    Non-Heap   53.4 MB     23.4 MB   130 MB
    Total      2.97 GB     3.02 GB   3.04 GB

    Outside JVM
    Type     Count   Used     Capacity
    Direct   510     860 KB   860 KB
    Mapped   0       0 B      0 B
I find that on computer (*.*.*.22) the process with pid=345 holds 2.36 GB of memory, and
pid=345 is the job launched from supervisor.
I really do not know why: the job runs in YARN, so why does it occupy so much memory on
computer (*.*.*.22)? I only submit the job from computer (*.*.*.22).
Thank you for answering my question.


Re: How to Broadcast a very large model object (used in iterative scoring in recommendation system) in Flink

2016-09-30 Thread rimin515
Your message is very short, so I cannot read much from it. The following is my guess:
in Flink, the DataStream is not meant for iterative computation; the DataSet would be a
better fit. Also, Flink suggests broadcasting small data, not large data.

You can load your model data (it can come from a file or a table) before the main
function and assign it to a variable, e.g. yourModel. The DataStream is a stream of
unscored records, e.g. DataStream[String] or DataStream[yourClass], and then:

    dataStream.map { x =>
      val score = computeScore(x, yourModel)
      ...
    }

For example:

    object YourObject {

      // load your model
      val yourModel = ...

      def main(args: Array[String]): Unit = {
        ...
        // read unscored records, from a socket or Kafka, or ...
        dataStream.map { x =>
          val score = computeScore(x, yourModel)
          ...
        }
        ...
      }
    }
----- Original Message -----
From: Anchit Jatana
To: user@flink.apache.org
Subject: How to Broadcast a very large model object (used in iterative scoring in recommendation system) in Flink
Date: 2016-09-30 14:15

Hi All,
I'm building a recommendation system streaming application for which I need
to broadcast a very large model object (used in iterative scoring) among all
the task managers performing the operation in parallel for the operator.
I'm doing this operation in map1 of a CoMapFunction. Please suggest
some way to achieve the broadcasting of the large model variable (something
similar to what Spark has with broadcast variables).
Thank you
Regards, Anchit



Re: Re: Re: Re: modify coGroup GlobalWindows_GlobalWindow

2016-09-06 Thread rimin515
Thanks for your answer.
But I cannot follow this point: "If all elements of "words2" have been processed, the right
side of your coGroup will always be empty no matter what is incoming in your
socketTextStream." I do not understand what it means.

The following is my idea (it may be wrong):
the coGroup creates a new stream over T1 and T2, and it must use GlobalWindows to store all
elements of T2; with timeWindow or another window not all of T2's elements would be kept.

    ------------------------
     T1,              T2
    ------------------------

and the apply function runs and produces a result.
When the first element arrives, T1 gains one element,

    ------------------------
     T1(+first),      T2
    ------------------------

and the apply function runs and produces a result.
When the second element arrives, T1 gains another element,

    ------------------------
     T1(+first+second),  T2
    ------------------------

and the apply function runs and produces a result.

But in fact I want the stream to look like this:

    ------------------------
     T1,              T2
    ------------------------

when the first element arrives:

    ------------------------
     T1(+first),      T2
    ------------------------

when the second element arrives:

    ------------------------
     T1(+second),     T2
    ------------------------

so the first element must be fired and then dropped; that is my intention.

I also tried to cut the socket input stream with countWindow or timeWindow, but it does not
work: with coGroup the stream covers T1 and T2 as a whole, so I think the window must be
applied to the coGroup itself.
----- Original Message -----
From: Timo Walther
To: user@flink.apache.org
Subject: Re: Re: Re: modify coGroup GlobalWindows_GlobalWindow
Date: 2016-09-06 20:52


  
  
I think you have to rethink your approach. In your example "words2" is a stream but only
with a fixed set of elements. If all elements of "words2" have been processed, the right
side of your coGroup will always be empty no matter what is incoming in your
socketTextStream.

It is not read in over and over again. Is that your intention?

On 06/09/16 at 13:15, rimin...@sina.cn wrote:


I tried reading the data into a list or List[Map] to store T2, but with a list or List[Map]
there is no parallelization, so I want to use coGroup.
On the other hand, the coGroup function joins T1 and T2 and must have a window and a
trigger; the window cuts T1 and T2, and the trigger fires the apply function when the input
reaches the trigger threshold.
From the output in apply(), where my InnerJoinFunction prints T1 and T2, we can see that
whenever input arrives and the apply is triggered, T1 keeps growing while T2 does not
change. So the way the window cuts T1 and T2 does not achieve my goal, which is why I want
to write my own "GlobalWindows.create()".
And I have no idea about Flink's operator state and really do not know how to use it. Can
you give me an example?

  
----- Original Message -----
From: Timo Walther
To: user@flink.apache.org
Subject: Re: modify coGroup GlobalWindows GlobalWindow
Date: 2016-09-06 17:52




Hi,

will words2 always remain constant? If yes, you don't have to create a stream out of it and
coGroup it, but you could simply pass the collection to a Map/FlatMap function and do the
joining there without the need of a window. Btw. you know that non-keyed global windows do
not scale?

If I understand your code correctly, you just want to get a stream with the last T2, right?
I don't think you have to implement your own "GlobalWindow" for that. Have you tried to use
Flink's operator state for that? So that if the state is growing it can be written to disk.

Hope that helps.

Timo

On 06/09/16 at 10:05, rimin...@sina.cn wrote:



  
Hi,
the following code:

    val text = env.socketTextStream(hostName, port)
    val words1 = text.map { x =>
      val res = x.split(",")
      (res.apply(0) -> res.apply(1))
    }

    val words2 =

Re: Re: Re: Re: fromParallelCollection

2016-09-06 Thread rimin515
My data come from an HBase table; they look like a List[(rowkey, Map[String, String])].

    class MySplittableIterator extends SplittableIterator[String] {

      // Members declared in java.util.Iterator
      def hasNext(): Boolean = {
      }
      def next(): Nothing = {
      }

      // Members declared in org.apache.flink.util.SplittableIterator
      def getMaximumNumberOfSplits(): Int = {
      }
      def split(num: Int): Array[Iterator[String]] = {
      }
    }

I do not know how to implement these methods; can you give me an example?
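
A minimal sketch of one possible implementation (an illustration, not code from the thread; it assumes the HBase rows have already been rendered into an in-memory List[String], and the usage line below is hypothetical):

    import java.util.{Iterator => JIterator}
    import org.apache.flink.util.SplittableIterator
    import scala.collection.JavaConverters._

    class MySplittableIterator(data: List[String]) extends SplittableIterator[String] {

      // used when the iterator itself is consumed (e.g. for a single split)
      @transient private lazy val underlying = data.iterator

      // Members declared in java.util.Iterator
      override def hasNext(): Boolean = underlying.hasNext
      override def next(): String = underlying.next()

      // Members declared in org.apache.flink.util.SplittableIterator
      override def getMaximumNumberOfSplits(): Int = data.size

      override def split(numPartitions: Int): Array[JIterator[String]] = {
        // distribute the elements round-robin so that exactly numPartitions iterators come back
        val buckets = Array.fill(numPartitions)(List.newBuilder[String])
        data.zipWithIndex.foreach { case (s, i) => buckets(i % numPartitions) += s }
        buckets.map(_.result().iterator.asJava)
      }
    }

    // usage (hypothetical): rows is the HBase data flattened to strings
    // val stream = env.fromParallelCollection(new MySplittableIterator(rows))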
----- Original Message -----
From: Timo Walther
To: user@flink.apache.org
Subject: Re: Re: Re: fromParallelCollection
Date: 2016-09-06 17:03


  
  
Hi,

you have to implement a class that extends "org.apache.flink.util.SplittableIterator". The
runtime will ask this class for multiple "java.util.Iterator"s over your split data. How you
split your data and how an iterator looks like depends on your data and implementation.

If you need more help, you should show us some examples of your data.

Timo

On 06/09/16 at 09:46, rimin...@sina.cn wrote:


fromCollection is not parallelized, and the data is huge, so I want to use
env.fromParallelCollection(data), but I do not know how to initialize the data.

  
----- Original Message -----
From: Maximilian Michels
To: "user@flink.apache.org", rimin...@sina.cn
Subject: Re: fromParallelCollection
Date: 2016-09-05 16:58






Please give us a bit more insight on what you're trying to do.

On Sat, Sep 3, 2016 at 5:01 AM,  wrote:
> Hi,
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tr = env.fromParallelCollection(data)
>
> the data i do not know initialize,some one can tell me..
>

  






-- 
Freundliche Grüße / Kind Regards

Timo Walther 

Follow me: @twalthr
https://www.linkedin.com/in/twalthr
  




Re: Re: modify coGroup GlobalWindows GlobalWindow

2016-09-06 Thread rimin515
I tried reading the data into a list or List[Map] to store T2, but with a list or List[Map]
there is no parallelization, so I want to use coGroup.
On the other hand, the coGroup function joins T1 and T2 and must have a window and a
trigger; the window cuts T1 and T2, and the trigger fires the apply function when the input
reaches the trigger threshold.

From the output in apply(), where my InnerJoinFunction prints T1 and T2, we can see that
whenever input arrives and the apply is triggered, T1 keeps growing while T2 does not
change. So the way the window cuts T1 and T2 does not achieve my goal, which is why I want
to write my own "GlobalWindows.create()".

And I have no idea about Flink's operator state and really do not know how to use it. Can
you give me an example?
----- Original Message -----
From: Timo Walther
To: user@flink.apache.org
Subject: Re: modify coGroup GlobalWindows GlobalWindow
Date: 2016-09-06 17:52


  
  
Hi,

will words2 always remain constant? If yes, you don't have to create a stream out of it and
coGroup it, but you could simply pass the collection to a Map/FlatMap function and do the
joining there without the need of a window. Btw. you know that non-keyed global windows do
not scale?

If I understand your code correctly, you just want to get a stream with the last T2, right?
I don't think you have to implement your own "GlobalWindow" for that. Have you tried to use
Flink's operator state for that? So that if the state is growing it can be written to disk.

Hope that helps.

Timo

On 06/09/16 at 10:05, rimin...@sina.cn wrote:



  
Hi,
the following code:

    val text = env.socketTextStream(hostName, port)
    val words1 = text.map { x =>
      val res = x.split(",")
      (res.apply(0) -> res.apply(1))
    }

    val words2 = env.fromElements(("a","w1"),("a","w2"),("c","w3"),("d","w4"))
    val joinedStream = words1
      .coGroup(words2)
      .where(_._1)
      .equalTo(_._1)
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of(1))

    val res = joinedStream.apply(new InnerJoinFunction).print()

    env.execute()

    class InnerJoinFunction extends CoGroupFunction[(String,String),(String,String),(String,String)] {

      override def coGroup(T1: java.lang.Iterable[(String,String)],
                           T2: java.lang.Iterable[(String,String)],
                           out: Collector[(String, String)]): Unit = {
        println("")
        println("T1=" + T1 + "T2=" + T2)
        import scala.collection.JavaConverters._
        val scalaT2 = T2.asScala.toList
        if (!T1.asScala.isEmpty && scalaT2.nonEmpty) {
          val transaction = T1.asScala.last
          println("T2 last=" + transaction)
          for (snapshot <- scalaT2) {
            out.collect(transaction._1, transaction._2 + snapshot._2)
          }
        }
      }
    }

The code has no problem and runs; the following is the result (input "a,1" then input "a,2"):

    T1=[(a,1)]T2=[(a,w2), (a,w1)]
    T2 last=(a,1)
    2> (a,1w2)
    2> (a,1w1)

    T1=[(a,1), (a,2)]T2=[(a,w2), (a,w1)]
    T2 last=(a,2)
    2> (a,2w2)
    2> (a,2w1)

T1 keeps growing while T2 does not change. I worry that with a lot of input T1 will run out
of storage. So I want to write my own "GlobalWindows.create()" so that T1 keeps only the
latest element from the input (or from Kafka) and the history of T1 is cleared (after input
a,1, T1 is [(a,1)]; after input a,2, T1 is [(a,2)], not [(a,1), (a,2)]), while T2 stays
unchanged.

I rewrote "GlobalWindows", but it does not work. Reading the code, I find I must also
rewrite "GlobalWindow" and modify "the class Serializer extends TypeSerializer", but when I
run it, execution never reaches that code. Why? Can someone tell me?

  






-- 
Freundliche Grüße / Kind Regards

Timo Walther 

Follow me: @twalthr
https://www.linkedin.com/in/twalthr
  




modify coGroup GlobalWindows GlobalWindow

2016-09-06 Thread rimin515
Hi, the following code:

    val text = env.socketTextStream(hostName, port)
    val words1 = text.map { x =>
      val res = x.split(",")
      (res.apply(0) -> res.apply(1))
    }

    val words2 = env.fromElements(("a","w1"),("a","w2"),("c","w3"),("d","w4"))
    val joinedStream = words1
      .coGroup(words2)
      .where(_._1)
      .equalTo(_._1)
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of(1))

    val res = joinedStream.apply(new InnerJoinFunction).print()

    env.execute()

    class InnerJoinFunction extends CoGroupFunction[(String,String),(String,String),(String,String)] {

      override def coGroup(T1: java.lang.Iterable[(String,String)],
                           T2: java.lang.Iterable[(String,String)],
                           out: Collector[(String, String)]): Unit = {
        println("")
        println("T1=" + T1 + "T2=" + T2)
        import scala.collection.JavaConverters._
        val scalaT2 = T2.asScala.toList
        if (!T1.asScala.isEmpty && scalaT2.nonEmpty) {
          val transaction = T1.asScala.last
          println("T2 last=" + transaction)
          for (snapshot <- scalaT2) {
            out.collect(transaction._1, transaction._2 + snapshot._2)
          }
        }
      }
    }

The code has no problem and runs; the following is the result (input "a,1" then input "a,2"):

    T1=[(a,1)]T2=[(a,w2), (a,w1)]
    T2 last=(a,1)
    2> (a,1w2)
    2> (a,1w1)

    T1=[(a,1), (a,2)]T2=[(a,w2), (a,w1)]
    T2 last=(a,2)
    2> (a,2w2)
    2> (a,2w1)

T1 keeps growing while T2 does not change. I worry that with a lot of input T1 will run out
of storage. So I want to write my own "GlobalWindows.create()" so that T1 keeps only the
latest element from the input (or from Kafka) and the history of T1 is cleared (after input
a,1, T1 is [(a,1)]; after input a,2, T1 is [(a,2)], not [(a,1), (a,2)]), while T2 stays
unchanged.

I rewrote "GlobalWindows", but it does not work. Reading the code, I find I must also
rewrite "GlobalWindow" and modify "the class Serializer extends TypeSerializer", but when I
run it, execution never reaches that code. Why? Can someone tell me?
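
A minimal sketch of the alternative Timo suggests in the replies above: since words2 is a fixed set of elements, it can be kept as a plain Scala collection and joined inside a flatMap, so no window or trigger is needed and nothing accumulates (words1 refers to the stream built from the socket in the code above):

    import org.apache.flink.streaming.api.scala._

    // words2 as a constant collection instead of a second stream
    val words2 = List(("a", "w1"), ("a", "w2"), ("c", "w3"), ("d", "w4"))

    val joined = words1.flatMap { x: (String, String) =>
      // only the current element of words1 is touched, so no history builds up
      words2.filter(_._1 == x._1).map(y => (x._1, x._2 + y._2))
    }

    joined.print()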

Re: Re: fromParallelCollection

2016-09-06 Thread rimin515
fromCollection is not parallelized, and the data is huge, so I want to use
env.fromParallelCollection(data), but I do not know how to initialize the data.
----- Original Message -----
From: Maximilian Michels
To: "user@flink.apache.org", rimin...@sina.cn
Subject: Re: fromParallelCollection
Date: 2016-09-05 16:58


Please give us a bit more insight on what you're trying to do.
On Sat, Sep 3, 2016 at 5:01 AM,   wrote:
> Hi,
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val tr = env.fromParallelCollection(data)
>
> the data i do not know initialize,some one can tell me..
> 
>
>
>


fromParallelCollection

2016-09-02 Thread rimin515
Hi,

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tr = env.fromParallelCollection(data)

I do not know how to initialize the data; can someone tell me?








flink dataStream operate dataSet

2016-08-30 Thread rimin515
Hi, I have a problem: a DataStream is read from RabbitMQ, and other data come from an HBase
table as a DataSet. The two look as follows:

    val words = connectHelper.readFromRabbitMq(...)   // words is DataStream[String]
    val dataSet = HBaseWrite.fullScan()               // dataSet is DataSet[(Int, String)]

    words.map { word =>
      val res = dataSet.map { y =>
        val score = computerScore(word, y)
        (word, score)
      }
      HBaseWrite.writeToTable(res, ..., )
    }

The error is "task not serializable". What is the solution? How can a DataSet be operated on
inside a DataStream?
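
One way around the serialization problem, sketched under the assumption that the HBase table is small enough to be collected into memory (connectHelper, HBaseWrite and computerScore are the names from the question; computerScore is assumed to return Double):

    import org.apache.flink.streaming.api.scala._

    // collect() runs the batch job and pulls the table to the client; the resulting
    // Scala collection is serializable and can be captured by the streaming map function
    val lookup: Seq[(Int, String)] = HBaseWrite.fullScan().collect()

    val scored: DataStream[Seq[(String, Double)]] = words.map { word =>
      lookup.map { y =>
        val score = computerScore(word, y)
        (word, score)
      }
    }
    // the results would then be written through a streaming sink,
    // not via HBaseWrite.writeToTable on a DataSet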







flink-shaded-hadoop

2016-08-21 Thread rimin515
Hi everyone, when I use Scala version 2.10 and set up the sbt project (adding flink-core,
flink-scala, flink-streaming-scala, flink-kafka, flink-streaming-connectors, ...), the build
downloads flink-shaded-hadoop1_2.10.jar; but with Scala version 2.11 I get both
flink-shaded-hadoop1_2.10.jar and flink-shaded-hadoop2_2.11.jar. Why? Can someone tell me?
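
A hypothetical build.sbt fragment (not from the thread): keeping every Flink artifact on the %% operator lets sbt append the Scala binary suffix that matches scalaVersion, so the transitive flink-shaded-hadoop jars come in for a single Scala version only; mixing %% with hard-coded _2.10 artifact names is one common way to end up with both suffixes.

    scalaVersion := "2.11.7"          // illustrative

    val flinkVersion = "1.1.1"        // illustrative

    libraryDependencies ++= Seq(
      // %% appends _2.10 or _2.11 automatically, matching scalaVersion above
      "org.apache.flink" %% "flink-scala"           % flinkVersion,
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
    )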










flink1.0 DataStream groupby

2016-07-21 Thread rimin515
Hi, today I used Flink to rewrite my Spark project. In Spark the data is an RDD with many
transformations and actions, but in Flink the DataStream does not have groupBy and foreach.
For example:

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val data = List(("1" -> "a"), ("2" -> "b"), ("1" -> "c"), ("2" -> "f"))
    val ds = env.fromCollection(data)
    val dskeyby = ds.groupBy(0)
    ds.print()
    env.execute()

The line "val dskeyby = ds.groupBy(0)" is an error: "value groupBy is not a member of
org.apache.flink.streaming.api.scala.DataStream". So what is the solution?
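
A sketch of the streaming counterpart (assuming the 1.x Scala streaming API, where keyBy plays the role of groupBy and print/addSink takes the place of foreach):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val data = List(("1", "a"), ("2", "b"), ("1", "c"), ("2", "f"))
    val ds = env.fromCollection(data)

    // keyBy partitions the stream by the first tuple field, similar to groupBy on a DataSet
    val keyed = ds.keyBy(0)

    keyed.print()
    env.execute()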