I tried reading the data into a List or List[Map] to hold T2, but I think a
plain List or List[Map] gives no parallelism, so I want to use coGroup.
On the other hand, coGroup joins T1 and T2 and requires a window and a
trigger: the window slices T1 and T2, and the trigger fires the apply
function once its threshold is reached.

In apply() I use my InnerJoinFunction and print T1 and T2. The result shows
that each time an input arrives, the trigger fires apply(), InnerJoinFunction
runs, and T1 and T2 are printed.
T1 keeps growing while T2 does not change, so the window does not slice T1 and
T2 the way I need, which is why I want to write my own "GlobalWindows.create()".

As for Flink's operator state, I have no idea how to use it. Can you give me
an example?
----- Original Message -----
From: Timo Walther <twal...@apache.org>
To: user@flink.apache.org
Subject: Re: modify coGroup GlobalWindows GlobalWindow
Date: 2016-09-06 17:52


  
  
    Hi,

      

      will words2 always remain constant? If yes, you don't have to
      create a stream out of it and coGroup it, but you could simply
      pass the collection to Map/FlatMap function and do the joining
      there without the need of a window. Btw. you know that non-keyed
      global windows do not scale? 
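
The map-side join suggested above can be sketched in plain Scala, without any Flink dependency, so the logic can be checked on its own. This is only a sketch under stated assumptions: the object name `StaticJoinSketch` and the helper `joinOne` are hypothetical, and the static data is the `words2` collection from the original post.

```scala
object StaticJoinSketch {
  // Static side of the join (words2 in the original post).
  val words2: Seq[(String, String)] =
    Seq(("a", "w1"), ("a", "w2"), ("c", "w3"), ("d", "w4"))

  // Index the static side by key once, so each lookup is a map access.
  val byKey: Map[String, Seq[String]] =
    words2.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }

  // Join a single incoming record against the static side.
  // No window is involved, so no history of earlier records is kept.
  // (joinOne is a hypothetical helper, not part of Flink.)
  def joinOne(record: (String, String)): Seq[(String, String)] =
    byKey.getOrElse(record._1, Seq.empty).map(w => (record._1, record._2 + w))
}
```

Inside a job this function would presumably be called from a flatMap on words1, for example `words1.flatMap(r => StaticJoinSketch.joinOne(r))`. Each parallel instance would then hold its own copy of the small static map.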

      If I understand your code correctly, you just want to get a stream
      with the last T2, right? I don't think you have to implement your
      own "GlobalWindow" for that. Have you tried to use Flink's
      operator state for that? So that if the state is growing it can be
      written to disk.
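
The idea of keeping only the most recent T1 element per key as state, instead of buffering every element in a window, can be illustrated with a small Flink-free simulation. It is a sketch, not Flink's state API: `LatestOnlySketch`, `step`, and `run` are hypothetical names, and the `Map` inside `run` only stands in for keyed state. The `(input, Option[state]) => (outputs, Option[state])` shape of `step` is chosen because it resembles what a stateful flatMap on a keyed stream works with; that correspondence is an assumption to check against the Flink version in use.

```scala
object LatestOnlySketch {
  type Rec = (String, String)

  // Static T2 side, indexed by key (same data as words2 in the original post).
  val byKey: Map[String, Seq[String]] =
    Map("a" -> Seq("w1", "w2"), "c" -> Seq("w3"), "d" -> Seq("w4"))

  // State transition for one key: the new state is just the incoming record,
  // so the previous T1 element is overwritten rather than accumulated.
  def step(in: Rec, state: Option[Rec]): (Seq[Rec], Option[Rec]) = {
    val out = byKey.getOrElse(in._1, Seq.empty).map(w => (in._1, in._2 + w))
    (out, Some(in))
  }

  // Local stand-in for keyed state: one stored record per key at most.
  def run(inputs: Seq[Rec]): (Seq[Rec], Map[String, Rec]) =
    inputs.foldLeft((Seq.empty[Rec], Map.empty[String, Rec])) {
      case ((emitted, states), in) =>
        val (out, next) = step(in, states.get(in._1))
        (emitted ++ out, next.fold(states - in._1)(s => states.updated(in._1, s)))
    }
}
```

After feeding ("a","1") and then ("a","2"), the stored state for key "a" is only ("a","2"), which matches the behaviour asked for in the original post. With a constant T2 the stored record is not needed to produce the output at all; it is kept here only to show that state size stays at one element per key.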

      

      Hope that helps.

      

      Timo

      

      On 06/09/16 at 10:05, rimin...@sina.cn wrote:

    
    
      
        Hi,
              here is the code:
        

          
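            import org.apache.flink.streaming.api.scala._
            import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
            import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

            val text = env.socketTextStream(hostName, port)
            // Parse "key,value" lines into (key, value) pairs.
            val words1 = text.map { x =>
              val res = x.split(",")
              (res(0), res(1))
            }

            val words2 = env.fromElements(("a","w1"),("a","w2"),("c","w3"),("d","w4"))
            // Co-group both streams by key in one global window, firing on every element.
            val joinedStream = words1
              .coGroup(words2)
              .where(_._1)
              .equalTo(_._1)
              .window(GlobalWindows.create())
              .trigger(CountTrigger.of(1))

            val res = joinedStream.apply(new InnerJoinFunction).print()

            env.execute()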

        
        

        
        
          import org.apache.flink.api.common.functions.CoGroupFunction
          import org.apache.flink.util.Collector
          import scala.collection.JavaConverters._

          class InnerJoinFunction extends
              CoGroupFunction[(String,String),(String,String),(String,String)] {

            override def coGroup(T1: java.lang.Iterable[(String,String)],
                T2: java.lang.Iterable[(String,String)],
                out: Collector[(String, String)]): Unit = {
              println("****************************")
              println("T1="+T1+"T2="+T2)
              val scalaT1 = T1.asScala.toList
              val scalaT2 = T2.asScala.toList
              if (scalaT1.nonEmpty && scalaT2.nonEmpty) {
                // Newest element of T1 (the label below says "T2 last").
                val transaction = scalaT1.last
                println("T2 last="+transaction)
                // Emit one joined record per T2 element.
                for (snapshot <- scalaT2) {
                  out.collect((transaction._1, transaction._2 + snapshot._2))
                }
              }
            }
          }
        
        --------------------------------

        
        
          The code has no problems and runs. Here is the
            result (input "a,1", then input "a,2"):
          

          
          
            ****************************
            T1=[(a,1)]T2=[(a,w2), (a,w1)]
            T2 last=(a,1)
            2> (a,1w2)
            2> (a,1w1)
            ****************************
            T1=[(a,1), (a,2)]T2=[(a,w2), (a,w1)]
            T2 last=(a,2)
            2> (a,2w2)
            2> (a,2w1)
          
          --------------------------------------------------
          T1 keeps growing while T2 does not change. I worry
            that with a lot of input T1 will exhaust storage.
          So I want to write my own
            "GlobalWindows.create()" so that T1 keeps only the latest
            element from the input (or read from Kafka) and the history of
            T1 is cleared (input a,1 gives T1 = [(a,1)]; then input a,2 gives
            T1 = [(a,2)], not T1 = [(a,1), (a,2)]), while T2 stays unchanged.
          

          
          I rewrote "GlobalWindows", but it does
            not work. Reading the code, I found that I must also rewrite
            "GlobalWindow" and modify "the class Serializer extends
            TypeSerializer<MyGlobalWindow>", but when I run it, execution
            never reaches that code. Why? Can someone tell me?
        
      
    
    

    

    
    -- 
Freundliche Grüße / Kind Regards

Timo Walther 

Follow me: @twalthr
https://www.linkedin.com/in/twalthr
  

