unsubscribe error

2016-06-18 Thread Marco Platania
Dear admin,
I've tried to unsubscribe from this mailing list twice, but I'm still receiving 
emails. Can you please fix this?
Thanks,
Marco

unsubscribe

2016-06-16 Thread Marco Platania
unsubscribe

Neither previous window has value for key, nor new values found

2016-06-10 Thread Marco Platania
Hi all, 

I'm running a Spark Streaming application that uses reduceByKeyAndWindow(). The 
window interval is 2 hours, while the slide interval is 1 hour. I have a 
JavaPairRDD in which both keys and values are strings. Each time the 
reduceByKeyAndWindow() function is called, it uses the appendString() and 
removeString() functions below to incrementally build a windowed stream of 
data: 

    Function2<String, String, String> appendString = new Function2<String, String, String>() { 
      @Override 
      public String call(String s1, String s2) { 
        return s1 + s2; 
      } 
    }; 

    Function2<String, String, String> removeString = new Function2<String, String, String>() { 
      @Override 
      public String call(String s1, String s2) { 
        return s1.replace(s2, ""); 
      } 
    }; 
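For context, the incremental form of reduceByKeyAndWindow() assumes that the 
inverse function exactly undoes the reduce function for every value that leaves 
the window. A small standalone check of that round trip, using the same 
concatenate/replace logic as above (class name and sample values are made up 
for illustration): 

    // Standalone check (names invented) of the inverse contract that
    // reduceByKeyAndWindow(reduceFunc, invReduceFunc, ...) relies on:
    // invReduceFunc must undo reduceFunc, i.e. remove(append(acc, v), v) == acc.
    public class InverseReduceCheck {
      static String append(String s1, String s2) { return s1 + s2; }
      static String remove(String s1, String s2) { return s1.replace(s2, ""); }

      public static void main(String[] args) {
        String acc = "a|b|";
        String v = "c|";
        String roundTrip = remove(append(acc, v), v);
        // Caveat: String.replace() removes every occurrence of v, so the round
        // trip only holds when a value never repeats inside the accumulator.
        System.out.println(roundTrip.equals(acc)); // expected: true
      }
    }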

filterEmptyRecords() removes keys whose value eventually becomes empty: 

    Function<scala.Tuple2<String, String>, Boolean> filterEmptyRecords = 
        new Function<scala.Tuple2<String, String>, Boolean>() { 
      @Override 
      public Boolean call(scala.Tuple2<String, String> t) { 
        return (!t._2().isEmpty()); 
      } 
    }; 

The windowed operation is then: 

    JavaPairDStream<String, String> cdr_kv = 
        cdr_filtered.reduceByKeyAndWindow(appendString, removeString, 
            Durations.seconds(WINDOW_DURATION), Durations.seconds(SLIDE_DURATION), 
            PARTITIONS, filterEmptyRecords); 
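
For completeness, the driver setup around this call is roughly the following 
(app name, batch interval, and checkpoint path are placeholders rather than my 
real values); note that the inverse-reduce form of reduceByKeyAndWindow() 
requires checkpointing to be enabled: 

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // Rough sketch of the surrounding driver setup (placeholder values only).
    public class WindowingDriverSketch {
      public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("cdr-windowing");
        JavaStreamingContext jssc =
            new JavaStreamingContext(conf, Durations.seconds(60)); // placeholder batch interval
        jssc.checkpoint("hdfs:///tmp/cdr-checkpoints");            // placeholder path

        // ... build cdr_filtered here and apply reduceByKeyAndWindow() as above;
        // an output operation (e.g. foreachRDD / saveAsTextFile) is still needed
        // to trigger execution of the windowed stream.

        jssc.start();
        jssc.awaitTermination();
      }
    }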

After a few hours of operation, this function raises the following exception: 
"Neither previous window has value for key, nor new values found. Are you sure 
your key class hashes consistently?" 

I've found this post from 2013: 
https://groups.google.com/forum/#!msg/spark-users/9OM1YvWzwgE/PhFgdSTP2OQJ
but it doesn't solve my problem. I'm using Strings as keys, and I'm pretty sure 
they hash consistently. 

Any clue as to why this happens, or any suggestions on how to fix it? 

Thanks!


How to carry data streams over multiple batch intervals in Spark Streaming

2016-05-21 Thread Marco Platania
Hi experts,

I'm using Apache Spark Streaming 1.6.1 to write a Java application that joins 
two key/value data streams and writes the output to HDFS. The two data streams 
contain K/V strings and are periodically ingested into Spark from HDFS by using 
textFileStream().

The two data streams aren't synchronized, which means that some keys that are 
in stream1 at time t0 may appear in stream2 at time t1, or vice versa. Hence, 
my goal is to join the two streams and compute "leftover" keys, which should be 
considered for the join operation in the next batch intervals. To clarify, 
consider the following algorithm:

variables:
stream1 = <k, v> input stream at time t1
stream2 = <k, v> input stream at time t1
left_keys_s1 = <k, v> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <k, v> records of stream2 that didn't appear in the join at time t0

operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (to be used 
at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (to be used 
at time t2)
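
As a batch-level illustration only (not my actual streaming code), one round of 
this algorithm on plain JavaPairRDDs could be sketched as follows; all names are 
made up and each RDD stands for the contents of one batch interval: 

    import org.apache.spark.api.java.JavaPairRDD;

    // Batch-level sketch of one round of the algorithm (all names illustrative).
    public class LeftoverJoinSketch {
      static void joinOneBatch(
          JavaPairRDD<String, String> rdd1,   // current batch of stream1
          JavaPairRDD<String, String> rdd2,   // current batch of stream2
          JavaPairRDD<String, String> left1,  // leftovers of stream1 from time t0
          JavaPairRDD<String, String> left2,  // leftovers of stream2 from time t0
          String outputPath) {
        JavaPairRDD<String, String> candidates1 = rdd1.union(left1);
        JavaPairRDD<String, String> candidates2 = rdd2.union(left2);

        // out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
        JavaPairRDD<String, scala.Tuple2<String, String>> outStream =
            candidates1.join(candidates2);
        outStream.saveAsTextFile(outputPath); // write out_stream to HDFS

        // Keys that found no match become the leftovers for time t2.
        JavaPairRDD<String, String> newLeft1 = candidates1.subtractByKey(candidates2);
        JavaPairRDD<String, String> newLeft2 = candidates2.subtractByKey(candidates1);
        // newLeft1 / newLeft2 would be carried into the next batch interval.
      }
    }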
I've tried to implement this algorithm with Spark Streaming, without success. 
Initially, I create two empty streams for the leftover keys as follows (this is 
only one stream; the code to generate the second stream is similar):

    JavaRDD<String> empty_rdd = sc.emptyRDD(); // sc = Java Spark Context
    Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
    q.add(empty_rdd);
    JavaDStream<String> empty_dstream = jssc.queueStream(q);
    JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(
        new PairFunction<String, String, String>() {
          @Override
          public scala.Tuple2<String, String> call(String s) {
            return new scala.Tuple2<String, String>(s, s);
          }
        });
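
In code, the union and join steps described in the next paragraph look roughly 
like this sketch (stream1 and stream2 are the pair streams obtained from 
textFileStream(); k1 and k2 are the leftover streams created as above; all 
names are illustrative): 

    // Rough sketch of the union/join step described below (names illustrative).
    static JavaPairDStream<String, scala.Tuple2<String, String>> joinWithLeftovers(
        JavaPairDStream<String, String> stream1,  // pairs from textFileStream()
        JavaPairDStream<String, String> stream2,
        JavaPairDStream<String, String> k1,       // leftover stream for stream1
        JavaPairDStream<String, String> k2) {     // leftover stream for stream2
      JavaPairDStream<String, String> s1_plus_left = stream1.union(k1);
      JavaPairDStream<String, String> s2_plus_left = stream2.union(k2);
      // The leftover computation and the window() call would follow the join.
      return s1_plus_left.join(s2_plus_left);
    }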
Later on, this empty stream is unified (i.e., union()) with stream1, and finally, 
after the join, I add the leftover keys from stream1 and call window(). The same 
happens with stream2.

The problem is that the operations that generate left_keys_s1 and left_keys_s2 
are transformations without actions, which means that Spark doesn't create any 
RDD flow graph and, hence, they are never executed. What I get right now is a 
join that outputs only the records whose keys are in stream1 and stream2 in the 
same time interval.

Do you have any suggestions on how to implement this correctly with Spark?

Thanks, Marco