Hi, I'm currently creating RDDs using a pattern like the following:

    val rdd: RDD[String] = session.sparkContext.parallelize(longkeys).flatMap( key => {
      logInfo(s"job at key: ${key}")
      // Fetch the whole S3 object as bytes and split it into lines.
      Source.fromBytes(S3Util.getBytes(S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")), bucket, key))
        .getLines()
    } )

We've been using this pattern, or something similar, to work around issues with S3 and our Hadoop version. However, the same pattern could be applied to other types of data sources that may not have a connector.

This method has been working fairly well, but I'd like more control over how the data is partitioned from the start. I tried to partition the data manually without a partitioner, but got JVM exceptions about my arrays being too large for the JVM:

    val keyList = groupedKeys.keys.toList
    val rdd: RDD[String] = session.sparkContext.parallelize(keyList, keyList.length).flatMap { key =>
      logInfo(s"job at day: ${key}")
      // All objects for one day are concatenated into a single byte array.
      val byteArrayBuffer = new ArrayBuffer[Byte]()
      val objectKeyList: List[String] = groupedKeys(key)
      objectKeyList.foreach( objectKey => {
        logInfo(s"working on object: ${objectKey}")
        byteArrayBuffer.appendAll(S3Util.getBytes(S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")), bucket, objectKey))
      } )
      Source.fromBytes(byteArrayBuffer.toArray[Byte]).getLines()
    }

I then defined a custom partitioner based on my source data:

    class dayPartitioner(keys: List[String]) extends Partitioner with Logger {

      // Group object keys by their first 10 characters (the day prefix).
      val keyMap: Map[String, List[String]] = keys.groupBy(_.substring(0, 10))

      val partitions = keyMap.keySet.size

      // One partition index per day prefix.
      val partitionMap: Map[String, Int] = keyMap.keys.zipWithIndex.toMap

      override def getPartition(key: Any): Int = {
        val keyString = key.asInstanceOf[String]
        val partitionKey = keyString.substring(0, 10)
        partitionMap(partitionKey)
      }

      override def numPartitions: Int = partitions
    }

I'd like to know: do I have to create a custom RDD class to specify my RDD and use it as in the pattern above?
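
One alternative that might avoid a custom RDD is to pair each object key with a dummy value, call partitionBy with a day-based partitioner, and only then fetch the objects inside mapPartitions. The following is only a minimal sketch under assumptions: DayPrefixPartitioner, PartitionedFetch, linesByDay and the fetch parameter are hypothetical names (fetch stands in for the S3Util.getBytes call above), and keys is assumed to be non-empty.

```scala
import org.apache.spark.{Partitioner, SparkContext}
import org.apache.spark.rdd.RDD
import scala.io.Source

// Hypothetical partitioner: one partition per 10-character day prefix.
// take(10) is used instead of substring(0, 10) so short keys do not throw.
class DayPrefixPartitioner(keys: Seq[String]) extends Partitioner {
  private val index: Map[String, Int] =
    keys.map(_.take(10)).distinct.zipWithIndex.toMap

  override def numPartitions: Int = index.size

  override def getPartition(key: Any): Int =
    index(key.asInstanceOf[String].take(10))
}

object PartitionedFetch {
  // `fetch` is an assumed stand-in for the S3 download; it must be serializable
  // because it runs on the executors.
  def linesByDay(
      sc: SparkContext,
      keys: Seq[String],
      fetch: String => Array[Byte]): RDD[String] =
    sc.parallelize(keys)
      .map(k => (k, ()))                          // pair RDD so partitionBy is available
      .partitionBy(new DayPrefixPartitioner(keys)) // shuffles only the keys, not the data
      .mapPartitions { iter =>
        // Objects are read one at a time, so no per-day byte array is built.
        iter.flatMap { case (k, _) => Source.fromBytes(fetch(k)).getLines() }
      }
}
```

Because each object is read on its own, this should avoid concatenating a whole day's bytes into a single array, at the cost of a shuffle of the object keys. If that is not sufficient, a custom RDD class may still be required.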
If so, I'd also like a reference on doing this, to hopefully save me some headaches and gotchas from a naive approach. I've found one such example, https://stackoverflow.com/a/25204589, but it's for an older version of Spark. I'm hoping there is something more recent and more in-depth. I don't mind references to books or otherwise.

Best,
Colin Williams