Todd Lipcon has posted comments on this change.

Change subject: Spark connectors for Kudu
......................................................................


Patch Set 1:

(27 comments)

took a quick pass. did you verify that the spark tests are now running as part 
of a normal mvn test run? or do we need to change some scripts too?

http://gerrit.cloudera.org:8080/#/c/1788/1/java/kudu-spark/pom.xml
File java/kudu-spark/pom.xml:

Line 108:                     <groupId>org.jruby</groupId>
I find it surprising you have to have these exclusions, because AFAIK 
hadoop-client doesn't have any of these things as dependencies. Maybe these are 
holdovers from some copy-pasted code from spark-on-hbase?


Line 214:                                 -Xmx1536m -XX:MaxPermSize=512m 
-XX:ReservedCodeCacheSize=512m
curious where this stuff got cargo-culted from :)


http://gerrit.cloudera.org:8080/#/c/1788/1/java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala:

Line 40:    * Is given input from SparkSQL to construct a BaseRelation.
style: not really proper javadoc


Line 50:       new Throwable("Invalid value for " + TABLE_KEY +" '" + tableName 
+ "'")
hrm... I don't know Scala really, but this seems to be a no-op: you construct a 
Throwable and then don't throw it? Throwable seems like an odd choice too; 
maybe an IllegalArgumentException, actually thrown?
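For what I'd expect, something like this sketch (requireTable and the literal 
"kudu.table" key are my stand-ins, not what the patch uses):

```scala
// Sketch: fail fast on a bad table name instead of constructing an
// unthrown Throwable. IllegalArgumentException is my suggestion.
def requireTable(tableName: String): String = {
  if (tableName == null || tableName.isEmpty)
    throw new IllegalArgumentException(
      "Invalid value for kudu.table: '" + tableName + "'")
  tableName
}
```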


Line 66:                     val kuduMaster: String) (
inconsistent style: spaces or not after the ':'?


Line 70:   // Create or get latest KuduContext.
what's "latest"?


Line 82:       kuduSchemaColumnMap = buildKuduSchemaColumnMap(kuduSchema)
not really following this block - isn't this duplicated by the various class 
members above?


Line 103:    * @return schema generated from the SCHEMA_COLUMNS_MAPPING_KEY 
value
this is HBase cruft


Line 111:     val columnIt = kuduSchema.getColumns.iterator()
why explicitly use an iterator instead of a more normal foreach loop? or maybe 
something more Scala-like (after an import scala.collection.JavaConverters._):

for ((col, colIdx) <- kuduSchema.getColumns.asScala.zipWithIndex) {
}


Line 112:     var indexCounter = 0
'indexCounter' is unnecessarily verbose. 'colIdx' perhaps


Line 116:       val columnSparkSqlType = if (c.getType.equals(Type.BOOL)) 
BooleanType
seems like we should have a static final hashmap which maps from kudu types to 
spark types rather than this uglier sequence of ifs
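Something like this sketch (self-contained with stand-in string keys; the real 
code would key on the org.kududb.client.Type enum and map to the 
org.apache.spark.sql.types DataTypes):

```scala
// Sketch of the suggested static mapping from Kudu column types to
// Spark SQL types, replacing the if-chain. Stand-in string names are
// used here so the sketch runs on its own.
val kuduToSparkSqlType: Map[String, String] = Map(
  "BOOL"   -> "BooleanType",
  "INT8"   -> "ByteType",
  "INT16"  -> "ShortType",
  "INT32"  -> "IntegerType",
  "INT64"  -> "LongType",
  "FLOAT"  -> "FloatType",
  "DOUBLE" -> "DoubleType",
  "STRING" -> "StringType",
  "BINARY" -> "BinaryType"
)

// getOrElse's default is by-name, so the throw only happens on a miss.
def sparkTypeFor(kuduType: String): String =
  kuduToSparkSqlType.getOrElse(kuduType,
    throw new IllegalArgumentException("Unsupported column type: " + kuduType))
```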


Line 125:       else throw new Throwable("Unsupported column type :" + 
c.getType)
nit: should be "Unsupported column type: " (the space belongs after the colon, 
not before)


Line 129:         new StructField(c.getName, columnSparkSqlType, nullable = 
true, metadata)
shouldn't we pass down nullability from the kudu column schema?


Line 136:     result
is this common Scala style vs just ending with 'new 
StructType(structFieldArray)'?


Line 141:    * Here is where we will do the following:
weird style. "here we are" isn't normal javadoc


Line 148:    * @return                RDD will all the results from HBase 
needed for SparkSQL to
Kudu, not HBase


Line 155:     if (resultRDD == null) {
what's the point of this if? you just set it to null


Line 163:       })
I think the above 5 lines could be something like:

val colsStr = requiredColumns.mkString(",")
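i.e. (requiredColumns stands in for the String array buildScan receives; the 
names here are made up):

```scala
// Sketch: mkString collapses the manual StringBuilder loop into one call.
val requiredColumns = Array("key", "name", "value")
val colsStr = requiredColumns.mkString(",")
```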


Line 168:         Row.fromSeq(requiredColumns.map(c => getKuduValue(c, 
rowResults)))
perf-wise, it would be better to do a mapping above from column names to column 
indexes, so you could use the indexes here. Worth a TODO at least.
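Roughly this shape (column names are made up; the map is built once per scan, 
then the per-row closure uses positional lookups):

```scala
// Sketch: precompute name -> index so per-row access is by position,
// not by repeated name lookup.
val schemaColumns   = Seq("key", "name", "value")  // table schema order
val requiredColumns = Array("value", "key")        // requested projection
val nameToIdx: Map[String, Int] = schemaColumns.zipWithIndex.toMap
val projectedIdxs: Array[Int] = requiredColumns.map(nameToIdx)
// per row: rowResults.getX(projectedIdxs(i)) instead of getX(columnName)
```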


Line 179:     val columnType = columnSchema.getType
this is fishy because above you use getOrElse(, null) and here you assume it's 
not null. better to check and throw some exception for missing column


Line 190:     else if (columnType == Type.STRING) row.getString(columnName)
worht an else and throw an error. also again a static final mapping would be 
cleaner than the sequence of ifs


http://gerrit.cloudera.org:8080/#/c/1788/1/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala:

Line 129:   def kuduRDD(tableName: String, columnProjection: String = null):
is this a public API? needs some doc. taking the string version of 
columnProjection doesn't seem very clean either vs an array of column names


Line 133:     conf.set("kudu.mapreduce.master.address", kuduMaster)
would be good to use public APIs here for setters instead of hard-coding the 
strings (I think we have some)


Line 142:     rdd
seems like you might want the kuduRDD to not be key/value pairs, but rather get 
rid of the silly NullWritable before returning to the user (eg by returning 
rdd.values, which I think exists)
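Stand-in illustration (on a real Spark pair RDD this would be rdd.values, from 
PairRDDFunctions; modeled here on a plain Seq so it runs on its own):

```scala
// Sketch: drop the NullWritable key half before returning to the user.
val pairs: Seq[(Null, Int)] = Seq((null, 1), (null, 2), (null, 3))
val rowsOnly: Seq[Int] = pairs.map(_._2)  // analogous to rdd.values
```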


Line 147:    *  Underlining wrapper all foreach functions in KuduContext.
underlying?


http://gerrit.cloudera.org:8080/#/c/1788/1/java/kudu-spark/src/main/scala/org/kududb/spark/KuduDStreamFunctions.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/KuduDStreamFunctions.scala:

Line 28: object KuduDStreamFunctions {
maybe we should delay the Streaming stuff to a second commit?


http://gerrit.cloudera.org:8080/#/c/1788/1/java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextSuite.scala
File java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextSuite.scala:

Line 80:     val scanList = scanRdd.map(r => r._2.getInt(0)).collect()
yea, here is where it becomes obvious that it's goofy that we return (null, 
row) pairs instead of just rows


-- 
To view, visit http://gerrit.cloudera.org:8080/1788
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic187513ef9724d50024f7401d7ecd19d53554245
Gerrit-PatchSet: 1
Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-Owner: Jean-Daniel Cryans
Gerrit-Reviewer: Dan Burkert <[email protected]>
Gerrit-Reviewer: Internal Jenkins
Gerrit-Reviewer: Mike Percy <[email protected]>
Gerrit-Reviewer: Todd Lipcon <[email protected]>
Gerrit-HasComments: Yes
