jasonf20 commented on PR #9222: URL: https://github.com/apache/iceberg/pull/9222#issuecomment-1841290690
Hi @nastra I’m not very familiar with this codebase and adding tests that will fail at the right time would take me a long time. Unfortunately, I can’t do that right now. However, below is the code I used to reproduce the issue locally: ```scala import java.util import java.util.UUID import scala.collection.JavaConverters._ import org.apache.iceberg.aws.glue.GlueCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.data.GenericRecord import org.apache.iceberg.data.parquet.GenericParquetWriter import org.apache.iceberg.parquet.Parquet import org.apache.iceberg.types.Types import org.apache.iceberg.{DataFile, PartitionSpec, Schema, Table, data} object TestRewriteCommits { def main(args: Array[String]): Unit = { val catalog = new GlueCatalog() catalog.initialize("iceberg", Map.empty[String, String].asJava) val schema = new Schema( Types.NestedField.required(1, "id", Types.StringType.get()), ); val tableName = "temp4" val tableId = TableIdentifier.of("prod_iceberg", tableName) val basePath = s"s3://s3-bucket-path/ice/tables/${tableName}/" val tableProperties: util.Map[String, String] = Map( "format-version" -> "2", "commit.retry.num-retries" -> "0" //turn off retries for more control during testing process ).asJava if (!catalog.tableExists(tableId)) { catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), basePath, tableProperties) } val table = catalog.loadTable(tableId) val addedFiles = (1 to 2).map(i => { val file: DataFile = writeFile(basePath, table) val append = table.newAppend() append.appendFile(file) append.commit() file }) val transaction = table.newTransaction() val rewrite = transaction.newRewrite() addedFiles.foreach(rewrite.deleteFile) rewrite.addFile(writeFile(basePath, table)) rewrite.commit() try { // Make sure this commit fails (I failed it by breaking at glue.updateTable(updateTableRequest.build()); and changing the table from Athena).
transaction.commitTransaction() } catch { case e: Throwable => // This retry will run successfully but the result will not contain the data file added during the rewrite. transaction.commitTransaction() } } private def writeFile(basePath: String, table: Table) = { val writer = Parquet.writeData( table.io().newOutputFile(basePath + UUID.randomUUID().toString + ".parquet")) .forTable(table) .overwrite(true) .createWriterFunc(GenericParquetWriter.buildWriter) .build[data.Record]() writer.write(Iterable(GenericRecord.create(table.schema()).copy("id", "1")).asJava) writer.close() val file = writer.toDataFile file } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org