I forgot to mention that I was importing Guava this way:
import com.google.common.hash.{HashFunction, Hashing}
including it in Maven.
I also tried the copy shaded into Flink:
import org.apache.flink.shaded.com.google.common.hash.{HashFunction, Hashing}
Neither of them works properly.
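
A quick check that might narrow it down (a sketch I can run in both setups, not part of my job): print which jar the HashFunction class is actually loaded from at runtime, to see whether my Maven Guava or the copy bundled with Flink wins on the classpath.

import com.google.common.hash.HashFunction

// Prints the jar that provided the Guava class at runtime; if it points
// at a flink jar instead of my guava-*.jar, the bundled copy won.
val source = classOf[HashFunction].getProtectionDomain.getCodeSource
println(if (source == null) "unknown (bootstrap class loader)" else source.getLocation.toString)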
On 16 May 2015, at 12:20, Michele Bertoni <[email protected]> wrote:
The first time I hash my data is in the reading phase: each line gets one extra field, the hash of its file name. I do this with a custom reader that extends DelimitedInputFormat and overrides the open, nextRecord and readRecord methods:
/* … */

private var id: Long = 0L

override def open(split: FileInputSplit) = {
  super.open(split)
  //TODO hasher problem: guava fails, java hashcode works
  //val hf: HashFunction = Hashing.sha256()
  //id = hf.newHasher.putString(split.getPath.getName.toString, Charsets.UTF_8).hash.asLong
  id = split.getPath.getName.toString.hashCode.toLong
}

override def readRecord(reusable: FlinkRegionType, bytes: Array[Byte], offset: Int, numBytes: Int): FlinkRegionType = {
  parser(id, new String(bytes.slice(offset, offset + numBytes), Charset.forName(charsetName)))
}

override def nextRecord(record: FlinkRegionType): FlinkRegionType = {
  try {
    super.nextRecord(record)
  } catch {
    case e: ParsingException =>
      // log the malformed line and skip to the next record
      logger.info("Region Data format error in the tuple: " + e.getMessage)
      nextRecord(record)
  }
}

/* … */
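
Since the suspicion is on the Guava classes, one workaround I could try (a sketch, not what the job does today; the helper name is mine) is computing the same kind of 64-bit id with only JDK classes, which takes the whole shading question out of the picture:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Deterministic 64-bit id from a string, using only JDK classes:
// SHA-256 the UTF-8 bytes and take the first 8 bytes as a Long
// (the same idea as Guava's hash().asLong()).
def sha256AsLong(s: String): Long = {
  val digest = MessageDigest.getInstance("SHA-256").digest(s.getBytes(StandardCharsets.UTF_8))
  ByteBuffer.wrap(digest, 0, 8).getLong
}

// e.g. inside open(): id = sha256AsLong(split.getPath.getName)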
Then, every time I join two datasets or aggregate (groupBy) on several different fields of the tuple, I create a new hash of the concatenation of the respective ids:
val joinResult: DataSet[(Long, String, Long, Long, Char, Array[GValue], List[Array[GValue]], Int, Long)] =
  ref
    .joinWithHuge(exp).where(0, 2).equalTo(0, 2) {
      (r: (String, Int, Int, Long, String, Long, Long, Char, Array[GValue]),
       x: (String, Int, Int, Long, String, Long, Long, Char, Array[GValue]),
       out: Collector[(Long, String, Long, Long, Char, Array[GValue], List[Array[GValue]], Int, Long)]) => {
        if (/* regions cross */) {
          //TODO hasher problem: guava fails, java hashcode works
          //val hashId = hf.newHasher().putString(r._4.toString + x._4.toString, Charsets.UTF_8).hash.asLong
          val hashId = (r._4.toString + x._4.toString).hashCode.toLong
          //TODO hasher problem: guava fails, java hashcode works
          //val aggregationId = hf.newHasher().putString(hashId.toString + r._5.toString + r._6.toString + r._7.toString + r._8.toString + r._9.map((g) => g.toString).sorted.reduce(_ + _).toString, Charsets.UTF_8).hash.asLong
          val aggregationId = (hashId.toString + r._5.toString + r._6.toString + r._7.toString + r._8.toString + r._9.map((g) => g.toString).sorted.reduce(_ + _).toString).hashCode.toLong
          out.collect((hashId, r._5, r._6, r._7, r._8, r._9, List(x._9), 1, aggregationId))
        }
      }
    }
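
One side note on this scheme, independent of which hash function is used (an illustration I am adding here, not code from the job): concatenating the two ids without a separator lets different pairs produce the same string, and therefore the same hash.

// "1" + "23" and "12" + "3" both concatenate to "123",
// so two different (r._4, x._4) pairs get the same hashId.
val a = ("1" + "23").hashCode
val b = ("12" + "3").hashCode
assert(a == b)

// A separator that cannot occur inside a Long's toString avoids this:
def combinedId(left: Long, right: Long): Long =
  (left.toString + "|" + right.toString).hashCode.toLong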
This is just an example. I have two kinds of data: what I showed is the core data; then I have the metadata, associated with the core data via the same hash of the original file name.
On the metadata I have similar join, group, and re-hash functionality.
Again, with the Java hashCode (see above) everything seems to work.
On 16 May 2015, at 12:00, Fabian Hueske <[email protected]> wrote:
Invalid hash values can certainly cause non-deterministic results.
Can you provide a code snippet that shows how and where you used the Guava
Hasher?
2015-05-16 11:52 GMT+02:00 Michele Bertoni <[email protected]>:
Could it be due to the hasher?
Inside my code I was using the Google Guava hasher (SHA-256 taken as a Long hash). Sometimes I got errors from it (ArrayIndexOutOfBoundsException), and sometimes I just got different hashes for the same id, especially when running in a non-local execution environment.
I removed it everywhere and started using the Java hashCode; now it seems to work.
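
For what it's worth, the Guava calls in isolation are deterministic (SHA-256 is a pure function of its input), so a local check like the following always prints true; the differing hashes only showed up on the cluster. A minimal sketch using the same calls as above (the file name is made up):

import com.google.common.base.Charsets
import com.google.common.hash.Hashing

// The same string must always map to the same Long,
// in any JVM running the same Guava version.
val hf = Hashing.sha256()
def idOf(s: String): Long =
  hf.newHasher().putString(s, Charsets.UTF_8).hash().asLong()

println(idOf("somefile.bed") == idOf("somefile.bed"))  // expected: true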
> On 16 May 2015, at 09:15, Michele Bertoni <[email protected]> wrote:
>
> Hi,
> for two days I have been going mad over a problem: every time I run the code (on the same dataset) I get a different result.
>
> While debugging I found this.
>
> I have this code:
>
> val aggregationResult = // something that creates the dataset and uses join, group, reduce and map
> logger.error("res count " + aggregationResult.count)
> aggregationResult.print
>
>
>
> The logger prints a dataset size of 7;
> the output is made of 6 elements.
>
> This happens randomly: sometimes the result is larger than the count, and sometimes they are both correct at 10.
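>
> (One thing I should probably rule out: if count() and print() each trigger
> their own execution of the job, any upstream non-determinism lets the two
> runs disagree. A single-execution variant I could try, assuming collect()
> is available in this version:)
>
> // materialize once, then derive both the count and the output from it
> val materialized = aggregationResult.collect()
> logger.error("res count " + materialized.size)
> materialized.foreach(println)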
>
>
>
> Flink version: 0.9-milestone-1
>
> Any idea what can make it "not deterministic"?
> Thanks for the help