Todd Lipcon has posted comments on this change.

Change subject: Spark connectors for Kudu
......................................................................

Patch Set 1:

(27 comments)

took a quick pass. did you verify that the spark tests are now running as part of a normal mvn test run? or do we need to change some scripts too?

http://gerrit.cloudera.org:8080/#/c/1788/1/java/kudu-spark/pom.xml
File java/kudu-spark/pom.xml:

Line 108: <groupId>org.jruby</groupId>
I find it surprising you have to have these exclusions, because AFAIK hadoop-client doesn't have any of these things as dependencies. Maybe these are holdovers from some copy-pasted code from spark-on-hbase?

Line 214: -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
curious where this stuff got cargo-culted from :)

http://gerrit.cloudera.org:8080/#/c/1788/1/java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala:

Line 40: * Is given input from SparkSQL to construct a BaseRelation.
style: not really proper javadoc

Line 50: new Throwable("Invalid value for " + TABLE_KEY +" '" + tableName + "'")
hrm... I don't know Scala really, but this seems to be a no-op: you construct a Throwable and then don't throw it? Throwable seems like an odd choice, too.

Line 66: val kuduMaster: String) (
inconsistent style: spaces or not after the ':'?

Line 70: // Create or get latest KuduContext.
what's "latest"?

Line 82: kuduSchemaColumnMap = buildKuduSchemaColumnMap(kuduSchema)
not really following this block: isn't this duplicated by the various class members above?

Line 103: * @return schema generated from the SCHEMA_COLUMNS_MAPPING_KEY value
this is HBase cruft

Line 111: val columnIt = kuduSchema.getColumns.iterator()
why explicitly use an iterator instead of a more normal foreach loop? or maybe something Scala-like, like:

  for ((col, colIdx) <- List(kuduSchema.getColumns).zipWithIndex) {
  }

Line 112: var indexCounter = 0
'indexCounter' is unnecessarily verbose.
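a compiling version of that zipWithIndex loop might look roughly like the following sketch (assumes `kuduSchema` is the `org.kududb.Schema` from the patch; the `asScala` conversion is needed because `getColumns` returns a Java list):

```scala
import scala.collection.JavaConverters._

// Walk the columns with their positional index instead of a manual counter.
for ((col, colIdx) <- kuduSchema.getColumns.asScala.zipWithIndex) {
  // build the StructField for `col` here, using `colIdx` where an index is needed
}
```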
'colIdx' perhaps.

Line 116: val columnSparkSqlType = if (c.getType.equals(Type.BOOL)) BooleanType
seems like we should have a static final hashmap which maps from kudu types to spark types, rather than this uglier sequence of ifs

Line 125: else throw new Throwable("Unsupported column type :" + c.getType)
nit: should be "Unsupported column type: " (misplaced space and colon)

Line 129: new StructField(c.getName, columnSparkSqlType, nullable = true, metadata)
shouldn't we pass down nullability from the kudu column schema?

Line 136: result
is this common scala style, vs just ending with 'new StructType(structFieldArray)'?

Line 141: * Here is where we will do the following:
weird style. "here is where" isn't normal javadoc

Line 148: * @return RDD will all the results from HBase needed for SparkSQL to
Kudu, not HBase

Line 155: if (resultRDD == null) {
what's the point of this if? you just set it to null

Line 163: })
I think the above 5 lines could be something like:

  val colsStr = requiredColumns.mkString(",")

Line 168: Row.fromSeq(requiredColumns.map(c => getKuduValue(c, rowResults)))
perf-wise, it would be better to do a mapping above from column names to column indexes, so you could use the indexes here. Worth a TODO at least.

Line 179: val columnType = columnSchema.getType
this is fishy because above you use getOrElse(, null) and here you assume it's not null. better to check and throw some exception for a missing column

Line 190: else if (columnType == Type.STRING) row.getString(columnName)
worth an else that throws an error. also, again, a static final mapping would be cleaner than the sequence of ifs

http://gerrit.cloudera.org:8080/#/c/1788/1/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala:

Line 129: def kuduRDD(tableName: String, columnProjection: String = null):
is this a public API? needs some doc.
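for reference, the static-mapping idea from the Line 116 and Line 190 comments could be sketched roughly like this (hypothetical helper names; assumes the `org.kududb.Type` enum and Spark SQL's `DataType` hierarchy, covering only the types the patch already handles):

```scala
import org.kududb.Type
import org.apache.spark.sql.types._

// One immutable map replaces the chain of if/else branches; an unknown
// type surfaces as an IllegalArgumentException instead of a bare Throwable.
private val kuduToSparkType: Map[Type, DataType] = Map(
  Type.BOOL   -> BooleanType,
  Type.INT8   -> ByteType,
  Type.INT16  -> ShortType,
  Type.INT32  -> IntegerType,
  Type.INT64  -> LongType,
  Type.FLOAT  -> FloatType,
  Type.DOUBLE -> DoubleType,
  Type.STRING -> StringType)

def sparkTypeFor(t: Type): DataType =
  kuduToSparkType.getOrElse(t,
    throw new IllegalArgumentException("Unsupported column type: " + t))
```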
taking the string version of columnProjection doesn't seem very clean either, vs an array of column names.

Line 133: conf.set("kudu.mapreduce.master.address", kuduMaster)
would be good to use public APIs here for setters instead of hard-coding the strings (I think we have some)

Line 142: rdd
seems like you might want the kuduRDD to not be key/value pairs, but rather get rid of the silly NullWritable before returning to the user (e.g. by returning rdd.values, which I think exists)

Line 147: * Underlining wrapper all foreach functions in KuduContext.
underlying?

http://gerrit.cloudera.org:8080/#/c/1788/1/java/kudu-spark/src/main/scala/org/kududb/spark/KuduDStreamFunctions.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/KuduDStreamFunctions.scala:

Line 28: object KuduDStreamFunctions {
maybe we should delay the Streaming stuff to a second commit?

http://gerrit.cloudera.org:8080/#/c/1788/1/java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextSuite.scala
File java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextSuite.scala:

Line 80: val scanList = scanRdd.map(r => r._2.getInt(0)).collect()
yea, here is where it becomes obvious that it's goofy that we return (null, row) pairs instead of just rows

-- 
To view, visit http://gerrit.cloudera.org:8080/1788
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic187513ef9724d50024f7401d7ecd19d53554245
Gerrit-PatchSet: 1
Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-Owner: Jean-Daniel Cryans
Gerrit-Reviewer: Dan Burkert <[email protected]>
Gerrit-Reviewer: Internal Jenkins
Gerrit-Reviewer: Mike Percy <[email protected]>
Gerrit-Reviewer: Todd Lipcon <[email protected]>
Gerrit-HasComments: Yes
