adevore3 commented on issue #9738:
URL: https://github.com/apache/iceberg/issues/9738#issuecomment-1954798452
Hi, I was trying to create a code sample but got distracted. I've tried to
copy all the relevant code pieces here; the one thing I couldn't include is the
protobuf file. Based on the stack trace, it seems to be failing on the
`insertIntoTable` function call. (A stripped-down sketch of just the
delete/insert sequence is at the bottom of this comment.)
```
import com.indeed.osiris.iceberg.exporter.config.OutputTableDefinition
import com.indeed.osiris.iceberg.exporter.datasources.Datasource
import com.indeed.osiris.iceberg.exporter.datasources.JobArchiveOsirisDatasource.getClass
import com.indeed.spark.hivesupport.SessionBuilder
import org.apache.hadoop.fs.Path
import org.apache.iceberg.Table
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import org.springframework.boot.builder.SpringApplicationBuilder
import org.springframework.boot.{CommandLineRunner, WebApplicationType}
import org.springframework.context.annotation.Bean

class OsirisIcebergExporter(
  spark: SparkSession,
  datasource: Datasource,
  inputCheckpointBucket: Option[String],
  inputCheckpointMillis: Option[Long],
  outputTableDefinition: OutputTableDefinition,
) extends CommandLineRunner {
  import OsirisIcebergExporter._

  private val newHighWaterMark =
    inputCheckpointMillis.getOrElse(System.currentTimeMillis())
  private val inputCheckpointPath = inputCheckpointBucket.map(b =>
    new Path(s"s3a://$b/tmp/osiris_iceberg/jatt3/$newHighWaterMark"))
  private val icebergCatalog =
    spark.sessionState.catalogManager.catalog("outbox").asInstanceOf[SparkCatalog].icebergCatalog()
  override def run(rawArgs: String*): Unit = {
    try {
      log.info(s"running OsirisIcebergExporter. new high water mark will be $newHighWaterMark")
      val df = inputDf(datasource.dataframe(spark))
      df.createOrReplaceTempView("input")
      val icebergTable = populateTable(df)
    } finally {
      // close connections
    }
  }
  def populateTable(df: DataFrame): Table = {
    // df not used in this example since we'll read from the temp view
    val t = icebergCatalog.loadTable(outputTableDefinition.icebergIdentifier)
    mergeIntoTable(spark, "input", t)
    t
  }

  def mergeIntoTable(spark: SparkSession, view: String, icebergTable: Table): Unit = {
    val currentSnapshot = icebergTable.currentSnapshot()
    log.info(s"mergeIntoTable >>> SET spark.wap.id = ${newHighWaterMark}_delete")
    spark.sql(s"SET spark.wap.id = ${newHighWaterMark}_delete")
    deleteFromTable(spark, view)
    log.info(s"mergeIntoTable >>> SET spark.wap.id = ${newHighWaterMark}_insert")
    spark.sql(s"SET spark.wap.id = ${newHighWaterMark}_insert")
    insertIntoTable(spark, view)
    log.info("mergeIntoTable >>> RESET spark.wap.id")
    spark.sql(s"RESET spark.wap.id")
    log.info("mergeIntoTable >>> icebergTable.refresh()")
    icebergTable.refresh()
  }

  def deleteFromTable(spark: SparkSession, view: String): Unit = {
    val identifierFieldTuple = "(jobId)"
    spark.sql(
      s"""
         |DELETE FROM outbox.osiris_iceberg.jatt3
         |WHERE $identifierFieldTuple in (select $identifierFieldTuple from $view)
         |""".stripMargin)
  }
  def insertIntoTable(spark: SparkSession, view: String): Unit = {
    // note: the pasted statement read "INSERT INTO (jobId)"; assuming the target
    // is the same table as in deleteFromTable
    spark.sql(
      s"""
         |INSERT INTO outbox.osiris_iceberg.jatt3
         |SELECT * FROM $view ORDER BY outbox.bucket(512, jobId), jobId
         |""".stripMargin)
  }
  def inputDf(sourceDf: DataFrame): DataFrame = {
    inputCheckpointPath
      .map(b => {
        val fileSystem = b.getFileSystem(spark.sparkContext.hadoopConfiguration)
        if (!fileSystem.exists(b)) {
          // write the output, and then read it back
          // which seems crazy, but it makes recovery faster if downstream stages fail
          // and an osiris partition needs to be reread (which is super slow)
          // don't just checkpoint, because then we'd still only have 256 input partitions
          sourceDf.write
            .option("maxRecordsPerFile", 500000) // this should probably actually be configurable; assume 1k rows, goal is 512-1gb files
            .orc(b.toString)
        }
        spark.createDataFrame(spark.read.orc(b.toString).rdd, sourceDf.schema)
      })
      .getOrElse({
        // if we didn't checkpoint in s3, we should cache
        // not just an optimization, needed for correctness
        // (in case new records come into osiris between the delete and insert)
        sourceDf.cache()
        sourceDf
      })
  }
}
object OsirisIcebergExporter {
  private val log = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    java.security.Security.setProperty("networkaddress.cache.ttl", "60")
    new SpringApplicationBuilder(classOf[OsirisIcebergExporter])
      .web(WebApplicationType.NONE)
      .run(args: _*)
      .close()
  }
}
@Bean
def spark(): SparkSession = {
  val icebergCatalogConfig = Map(
    s"spark.sql.catalog.outbox" -> "org.apache.iceberg.spark.SparkCatalog",
    s"spark.sql.catalog.outbox.io-impl" -> "org.apache.iceberg.aws.s3.S3FileIO",
    s"spark.sql.catalog.outbox.catalog-impl" -> "org.apache.iceberg.aws.glue.GlueCatalog",
    s"spark.sql.catalog.outbox.glue.id" -> "936411429724",
    //"spark.sql.catalog.iceberg.s3.staging-dir" -> "/tmp/" // should this be configurable? is it even needed?
  )
  SessionBuilder(s"OsirisIcebergExporter-jatt3")
    .withMoreProperties(icebergCatalogConfig)
    .build
}
import org.apache.log4j.Level

case class SessionBuilder(
  appName: String,
  logLevel: Level = Level.INFO,
  properties: Map[String, String] = Map.empty,
) {
  def withMoreProperties(properties: Map[String, String]): SessionBuilder =
    copy(properties = this.properties ++ properties)

  def builder: SparkSession.Builder = {
    val base = SparkSession.builder
      .appName(appName)
      .enableHiveSupport
      .config("spark.hadoop.hive.exec.dynamic.partition", "true")
      .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.hadoop.hive.exec.max.dynamic.partitions", "2048")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
      .config("spark.kryoserializer.buffer.max", "1g")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.metastorePartitionPruning", "true")
      .config("spark.sql.orc.cache.stripe.details.size", "10000")
      .config("spark.sql.orc.enabled", "true")
      .config("spark.sql.orc.enableVectorizedReader", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.orc.impl", "native")
      .config("spark.sql.orc.splits.include.file.footer", "true")
      .config("spark.sql.parquet.filterPushdown", "true")
      .config("spark.sql.parquet.mergeSchema", "false")
      .config("spark.sql.session.timeZone", "-06:00")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.ui.view.acls", "*")
    // apply the properties passed in via withMoreProperties (e.g. the Iceberg catalog config);
    // assumed: the part of SessionBuilder that does this didn't make it into the paste
    properties.foldLeft(base) { case (b, (k, v)) => b.config(k, v) }
  }

  def build: SparkSession = {
    val sparkSession = builder.getOrCreate
    sparkSession.sparkContext.setLogLevel(logLevel.toString)
    sparkSession
  }
}
@Bean
def datasource(): Datasource = {
  val startTime: Long = DateTime.now().getMillis
  val inputPartitions = 4
  val useDfc = false
  JobArchiveOsirisDatasource(startTime, Option(inputPartitions).map(_.toInt), useDfc)
}

import com.example.proto.JobArchiveEntry
import org.apache.spark.sql.types.{StructField, StructType}

trait Datasource {
  def dataframe(spark: SparkSession): DataFrame

  def setNullable(df: DataFrame, fieldName: String, nullable: Boolean): DataFrame = {
    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField(c, t, _, m) if c.equals(fieldName) => StructField(c, t, nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame(df.rdd, newSchema)
  }
}
case class OsirisRowv2(value: Array[Byte])

case class JobArchiveOsirisDatasource(minTs: Long, inputPartitions: Option[Int], useDfc: Boolean) extends Datasource {
  def dataframe(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val dfcOptions: Map[String, String] =
      if (useDfc)
        Map(
          "osiris.dfc" -> "true",
          "osiris.dfc.datadir" -> "/osiris/",
          "osiris.dfc.readahead" -> "4",
          "osiris.s3.bucket" -> "cmhprod3-cdcosiris"
        )
      else
        Map(
          "servers" -> "osirisserver:26238"
        )
    val rawDf = spark.read
      .format("osirisv2")
      .option("keys", "jobId")
      .option("keysplitter", "vlong-1000")
      .option("table", "jobarchive_new")
      .option("osiris.s3.region", "us-east-2")
      .option("osiris.request.timeout", "180000")
      .option("osiris.retry.timeout", "360000")
      .option("minTs", minTs)
      .options(dfcOptions)
      .load()
    val df = rawDf
      .as[OsirisRowv2]
      .map(r => RowTransformer.parseRow(r))
      .toDF()
    inputPartitions
      .map(ps => df.repartition(ps))
      .getOrElse(df)
  }
}

object RowTransformer extends Serializable {
  val transcoder = new ByteArrayTranscoder()

  def parseRow(row: OsirisRowv2): JobInfo = {
    val bytes = transcoder.toBytes(row.value)
    val jobArchiveEntry = JobArchiveEntry.parseFrom(bytes)
    JobInfo(
      jobArchiveEntry.getJobId,
      jobArchiveEntry.getTitleId,
    )
  }
}

case class JobInfo(
  jobId: Long,
  titleId: Int,
)

class ByteArrayTranscoder {
  def fromBytes(bytes: Array[Byte]) = bytes
  def toBytes(bytes: Array[Byte]) = bytes

  def equals(o: AnyRef): Boolean = {
    if (this eq o) return true
    o != null && (getClass eq o.getClass)
  }

  override def toString = "byte"
}
```
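
In case it helps narrow this down, here is a stripped-down sketch of just the WAP delete/insert sequence from `mergeIntoTable`, with the Osiris/protobuf input replaced by an in-memory DataFrame. The `WapRepro` object and the sample rows are made up for illustration; the catalog settings and the `outbox.osiris_iceberg.jatt3` table are the same as above, and the table is assumed to already exist:
```
import org.apache.spark.sql.SparkSession

object WapRepro {
  // stand-in for the protobuf-backed rows I couldn't include
  case class JobInfo(jobId: Long, titleId: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("wap-repro")
      .config("spark.sql.catalog.outbox", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.outbox.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
      .config("spark.sql.catalog.outbox.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
      .config("spark.sql.catalog.outbox.glue.id", "936411429724")
      .getOrCreate()
    import spark.implicits._

    // in-memory replacement for the Osiris dataframe
    Seq(JobInfo(1L, 10), JobInfo(2L, 20)).toDF().createOrReplaceTempView("input")

    val mark = System.currentTimeMillis()

    // same sequence as mergeIntoTable above: staged delete, staged insert, reset
    spark.sql(s"SET spark.wap.id = ${mark}_delete")
    spark.sql("DELETE FROM outbox.osiris_iceberg.jatt3 WHERE jobId IN (SELECT jobId FROM input)")

    spark.sql(s"SET spark.wap.id = ${mark}_insert")
    // this is the statement that shows up in the stack trace
    spark.sql("INSERT INTO outbox.osiris_iceberg.jatt3 SELECT * FROM input ORDER BY outbox.bucket(512, jobId), jobId")

    spark.sql("RESET spark.wap.id")
    spark.stop()
  }
}
```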