This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new e7c1700  [SPARK-35602][SS] Update state schema to be able to accept long length JSON
e7c1700 is described below

commit e7c1700368b2cba4df50d35144a4f13879956ca8
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Wed Jun 9 10:09:57 2021 +0900

    [SPARK-35602][SS] Update state schema to be able to accept long length JSON

    ### What changes were proposed in this pull request?

    This PR fixes an issue where neither the key nor the value of the state schema can accept long (>65535 bytes) JSON. To solve the problem explained below, the JSON-represented schema is divided into chunks of at most 65535 bytes, and each chunk is written by `DataOutputStream.writeUTF`. As the solution changes the format of the schema, the version is also bumped from `1` to `2`, but the old version schema is still accepted to ensure backward compatibility.

    ### Why are the changes needed?

    In the current implementation, writing the state schema fails with `UTFDataFormatException` if the length of the schema exceeds 65535 bytes. This is due to a limitation of `DataOutputStream.writeUTF`: `writeUTF` first writes a 2-byte length field, so the maximum content length is limited to `2^16-1` = `65535` bytes.
    https://docs.oracle.com/javase/8/docs/api/java/io/DataOutputStream.html#writeUTF-java.lang.String-

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    New tests.

    Closes #32788 from sarutak/fix-UTFDataFormatException.

    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 93a9dc479c098ef0989d64f38c2157f20ec4f32d)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../execution/streaming/state/SchemaHelper.scala   | 147 +++++++++++++++++++++
 .../state/StateSchemaCompatibilityChecker.scala    |  34 ++---
 .../StateSchemaCompatibilityCheckerSuite.scala     |  35 +++++
 3 files changed, 200 insertions(+), 16 deletions(-)
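For context, a minimal standalone sketch (not part of this commit; the object name `WriteUtfLimitRepro` is illustrative) reproducing the `writeUTF` limitation described above:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object WriteUtfLimitRepro {
  def main(args: Array[String]): Unit = {
    val out = new DataOutputStream(new ByteArrayOutputStream())
    // 65535 bytes of ASCII fit the 2-byte length field exactly, so this succeeds.
    out.writeUTF("x" * 65535)
    // One byte more overflows it: java.io.UTFDataFormatException is thrown.
    out.writeUTF("x" * 65536)
  }
}
```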
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala
new file mode 100644
index 0000000..2eef3d9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.StringReader
+
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream}
+
+import org.apache.spark.sql.execution.streaming.MetadataVersionUtil
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+/**
+ * Helper classes for reading/writing state schema.
+ */
+object SchemaHelper {
+
+  sealed trait SchemaReader {
+    def read(inputStream: FSDataInputStream): (StructType, StructType)
+  }
+
+  object SchemaReader {
+    def createSchemaReader(versionStr: String): SchemaReader = {
+      val version = MetadataVersionUtil.validateVersion(versionStr,
+        StateSchemaCompatibilityChecker.VERSION)
+      version match {
+        case 1 => new SchemaV1Reader
+        case 2 => new SchemaV2Reader
+      }
+    }
+  }
+
+  class SchemaV1Reader extends SchemaReader {
+    def read(inputStream: FSDataInputStream): (StructType, StructType) = {
+      val keySchemaStr = inputStream.readUTF()
+      val valueSchemaStr = inputStream.readUTF()
+      (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+    }
+  }
+
+  class SchemaV2Reader extends SchemaReader {
+    def read(inputStream: FSDataInputStream): (StructType, StructType) = {
+      val buf = new StringBuilder
+      val numKeyChunks = inputStream.readInt()
+      (0 until numKeyChunks).foreach(_ => buf.append(inputStream.readUTF()))
+      val keySchemaStr = buf.toString()
+
+      buf.clear()
+      val numValueChunks = inputStream.readInt()
+      (0 until numValueChunks).foreach(_ => buf.append(inputStream.readUTF()))
+      val valueSchemaStr = buf.toString()
+      (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+    }
+  }
+
+  trait SchemaWriter {
+    val version: Int
+
+    final def write(
+        keySchema: StructType,
+        valueSchema: StructType,
+        outputStream: FSDataOutputStream): Unit = {
+      writeVersion(outputStream)
+      writeSchema(keySchema, valueSchema, outputStream)
+    }
+
+    private def writeVersion(outputStream: FSDataOutputStream): Unit = {
+      outputStream.writeUTF(s"v${version}")
+    }
+
+    protected def writeSchema(
+        keySchema: StructType,
+        valueSchema: StructType,
+        outputStream: FSDataOutputStream): Unit
+  }
+
+  object SchemaWriter {
+    def createSchemaWriter(version: Int): SchemaWriter = {
+      version match {
+        case 1 if Utils.isTesting => new SchemaV1Writer
+        case 2 => new SchemaV2Writer
+      }
+    }
+  }
+
+  class SchemaV1Writer extends SchemaWriter {
+    val version: Int = 1
+
+    def writeSchema(
+        keySchema: StructType,
+        valueSchema: StructType,
+        outputStream: FSDataOutputStream): Unit = {
+      outputStream.writeUTF(keySchema.json)
+      outputStream.writeUTF(valueSchema.json)
+    }
+  }
+
+  class SchemaV2Writer extends SchemaWriter {
+    val version: Int = 2
+
+    // 2^16 - 1 bytes
+    final val MAX_UTF_CHUNK_SIZE = 65535
+
+    def writeSchema(
+        keySchema: StructType,
+        valueSchema: StructType,
+        outputStream: FSDataOutputStream): Unit = {
+      val buf = new Array[Char](MAX_UTF_CHUNK_SIZE)
+
+      // DataOutputStream.writeUTF can't write a string at once
+      // if the size exceeds 65535 (2^16 - 1) bytes.
+      // So a key as well as a value consist of multiple chunks in schema version 2.
+      val keySchemaJson = keySchema.json
+      val numKeyChunks = (keySchemaJson.length - 1) / MAX_UTF_CHUNK_SIZE + 1
+      val keyStringReader = new StringReader(keySchemaJson)
+      outputStream.writeInt(numKeyChunks)
+      (0 until numKeyChunks).foreach { _ =>
+        val numRead = keyStringReader.read(buf, 0, MAX_UTF_CHUNK_SIZE)
+        outputStream.writeUTF(new String(buf, 0, numRead))
+      }
+
+      val valueSchemaJson = valueSchema.json
+      val numValueChunks = (valueSchemaJson.length - 1) / MAX_UTF_CHUNK_SIZE + 1
+      val valueStringReader = new StringReader(valueSchemaJson)
+      outputStream.writeInt(numValueChunks)
+      (0 until numValueChunks).foreach { _ =>
+        val numRead = valueStringReader.read(buf, 0, MAX_UTF_CHUNK_SIZE)
+        outputStream.writeUTF(new String(buf, 0, numRead))
+      }
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 4ac12c0..20625e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -21,7 +21,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil}
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, SchemaWriter}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -34,6 +35,8 @@ class StateSchemaCompatibilityChecker(
   private val storeCpLocation = providerId.storeId.storeCheckpointLocation()
   private val fm = CheckpointFileManager.create(storeCpLocation, hadoopConf)
   private val schemaFileLocation = schemaFile(storeCpLocation)
+  private val schemaWriter =
+    SchemaWriter.createSchemaWriter(StateSchemaCompatibilityChecker.VERSION)
 
   fm.mkdirs(schemaFileLocation.getParent)
 
@@ -71,20 +74,13 @@ class StateSchemaCompatibilityChecker(
   private def schemasCompatible(storedSchema: StructType, schema: StructType): Boolean =
     DataType.equalsIgnoreNameAndCompatibleNullability(storedSchema, schema)
 
-  private def readSchemaFile(): (StructType, StructType) = {
+  // Visible for testing
+  private[sql] def readSchemaFile(): (StructType, StructType) = {
     val inStream = fm.open(schemaFileLocation)
     try {
       val versionStr = inStream.readUTF()
-      // Currently we only support version 1, which we can simplify the version validation and
-      // the parse logic.
-      val version = MetadataVersionUtil.validateVersion(versionStr,
-        StateSchemaCompatibilityChecker.VERSION)
-      require(version == 1)
-
-      val keySchemaStr = inStream.readUTF()
-      val valueSchemaStr = inStream.readUTF()
-
-      (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+      val schemaReader = SchemaReader.createSchemaReader(versionStr)
+      schemaReader.read(inStream)
     } catch {
       case e: Throwable =>
         logError(s"Fail to read schema file from $schemaFileLocation", e)
@@ -95,11 +91,17 @@ class StateSchemaCompatibilityChecker(
   }
 
   private def createSchemaFile(keySchema: StructType, valueSchema: StructType): Unit = {
+    createSchemaFile(keySchema, valueSchema, schemaWriter)
+  }
+
+  // Visible for testing
+  private[sql] def createSchemaFile(
+      keySchema: StructType,
+      valueSchema: StructType,
+      schemaWriter: SchemaWriter): Unit = {
     val outStream = fm.createAtomic(schemaFileLocation, overwriteIfPossible = false)
     try {
-      outStream.writeUTF(s"v${StateSchemaCompatibilityChecker.VERSION}")
-      outStream.writeUTF(keySchema.json)
-      outStream.writeUTF(valueSchema.json)
+      schemaWriter.write(keySchema, valueSchema, outStream)
       outStream.close()
     } catch {
       case e: Throwable =>
@@ -114,5 +116,5 @@ class StateSchemaCompatibilityChecker(
 }
 
 object StateSchemaCompatibilityChecker {
-  val VERSION = 1
+  val VERSION = 2
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
index 4eb7603b..a9cc90c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
@@ -47,6 +47,22 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
     .add(StructField("value2", StringType, nullable = true))
     .add(StructField("value3", structSchema, nullable = true))
 
+  private val longKeySchema = new StructType()
+    .add(StructField("key" + "1" * 64 * 1024, IntegerType, nullable = true))
+    .add(StructField("key" + "2" * 64 * 1024, StringType, nullable = true))
+    .add(StructField("key" + "3" * 64 * 1024, structSchema, nullable = true))
+
+  private val longValueSchema = new StructType()
+    .add(StructField("value" + "1" * 64 * 1024, IntegerType, nullable = true))
+    .add(StructField("value" + "2" * 64 * 1024, StringType, nullable = true))
+    .add(StructField("value" + "3" * 64 * 1024, structSchema, nullable = true))
+
+  private val keySchema65535Bytes = new StructType()
+    .add(StructField("k" * (65535 - 87), IntegerType, nullable = true))
+
+  private val valueSchema65535Bytes = new StructType()
+    .add(StructField("v" * (65535 - 87), IntegerType, nullable = true))
+
   test("adding field to key should fail") {
     val fieldAddedKeySchema = keySchema.add(StructField("newKey", IntegerType))
     verifyException(keySchema, valueSchema, fieldAddedKeySchema, valueSchema)
@@ -161,6 +177,25 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
     verifySuccess(keySchema, valueSchema, keySchema, fieldNameChangedValueSchema)
   }
 
+  test("SPARK-35602: checking for long length schema") {
+    verifySuccess(longKeySchema, longValueSchema, longKeySchema, longValueSchema)
+    verifySuccess(
+      keySchema65535Bytes, valueSchema65535Bytes, keySchema65535Bytes, valueSchema65535Bytes)
+  }
+
+  test("SPARK-35602: checking for compatibility with schema version 1") {
+    val dir = newDir()
+    val queryId = UUID.randomUUID()
+    val providerId = StateStoreProviderId(
+      StateStoreId(dir, opId, partitionId), queryId)
+    val checker = new StateSchemaCompatibilityChecker(providerId, hadoopConf)
+    checker.createSchemaFile(keySchema, valueSchema,
+      SchemaHelper.SchemaWriter.createSchemaWriter(1))
+    val (resultKeySchema, resultValueSchema) = checker.readSchemaFile()
+
+    assert((resultKeySchema, resultValueSchema) === (keySchema, valueSchema))
+  }
+
   private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = {
     applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3")
   }
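For readers who want the chunked format in isolation: a minimal sketch of the version-2 scheme above, assuming plain `java.io` streams in place of Hadoop's `FSDataOutputStream`/`FSDataInputStream` (`ChunkedUtfSketch` and its members are illustrative names, not part of the commit):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, StringReader}

object ChunkedUtfSketch {
  // 2^16 - 1 bytes: the writeUTF payload limit.
  final val MAX_UTF_CHUNK_SIZE = 65535

  def writeChunked(out: DataOutputStream, s: String): Unit = {
    val buf = new Array[Char](MAX_UTF_CHUNK_SIZE)
    // Ceiling division: e.g. a 65536-char string needs 65535/65535 + 1 = 2 chunks.
    // As in the patch, chunking is by chars; the (essentially ASCII) schema JSON
    // keeps each chunk within the byte limit.
    val numChunks = (s.length - 1) / MAX_UTF_CHUNK_SIZE + 1
    val reader = new StringReader(s)
    out.writeInt(numChunks)  // chunk-count prefix, read back by the reader side
    (0 until numChunks).foreach { _ =>
      val numRead = reader.read(buf, 0, MAX_UTF_CHUNK_SIZE)
      out.writeUTF(new String(buf, 0, numRead))
    }
  }

  def readChunked(in: DataInputStream): String = {
    val buf = new StringBuilder
    val numChunks = in.readInt()
    (0 until numChunks).foreach(_ => buf.append(in.readUTF()))
    buf.toString
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for a schema JSON longer than 65535 bytes.
    val longJson = "x" * 200000
    val bos = new ByteArrayOutputStream()
    writeChunked(new DataOutputStream(bos), longJson)
    val in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
    assert(readChunked(in) == longJson)  // round-trips without UTFDataFormatException
  }
}
```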