[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16857 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user vitillo commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101803033

--- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin ---
@@ -0,0 +1 @@
+2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
--- End diff --

Done
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101764130

--- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin ---
@@ -0,0 +1 @@
+2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
--- End diff --

Maybe this file should end with a newline?
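A note on the fixture above: the leading `2` appears to be the length byte that the Spark 2.1.0 serializer writes before the JSON, not part of the offsets themselves. The JSON payload is 50 bytes long, and byte value 50 happens to be the ASCII character `2`. The sketch below decodes the fixture contents the way the 2.1.0 code quoted in this thread does; `FixtureDemo` and its members are hypothetical names used only for illustration.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets.UTF_8

object FixtureDemo {
  // Contents of the fixture as shown in the diff (hypothetical in-memory copy).
  val fixture: String = "2{\"kafka-initial-offset-2-1-0\":{\"2\":0,\"1\":0,\"0\":0}}"

  // Mirrors the Spark 2.1.0 read path quoted in this thread:
  // one length byte, followed by that many bytes of JSON.
  def decodeLegacy(bytes: Array[Byte]): String = {
    val in = new ByteArrayInputStream(bytes)
    val length = in.read()              // the first byte is the payload length
    val buf = new Array[Byte](length)
    in.read(buf)
    new String(buf, UTF_8)
  }
}
```

Decoding the fixture bytes with `decodeLegacy` should return the JSON without the leading `2`, which is why the file can be read back as a valid 2.1.0 offset.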
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101594339

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -141,6 +143,104 @@ class KafkaSourceSuite extends KafkaSourceTest {

   private val topicId = new AtomicInteger(0)

+  testWithUninterruptibleThread(
+    "deserialization of initial offset with Spark 2.1.0") {
+    withTempDir { metadataPath =>
+      val topic = newTopic
+      testUtils.createTopic(topic, partitions = 3)
+
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+      source.getOffset.get // Write initial offset
+
+      intercept[java.lang.IllegalArgumentException] {
+        val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
+        val length = in.read()
+        val bytes = new Array[Byte](length)
+        in.read(bytes)
+        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+      }
+    }
+  }
+
+  testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
+    withTempDir { metadataPath =>
+      val topic = "kafka-initial-offset-2-1-0"
+      testUtils.createTopic(topic, partitions = 3)
+
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+
+      val from = Paths.get(
+        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").getPath)
+      val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
+      Files.copy(from, to)
+
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+      val deserializedOffset = source.getOffset.get
+      val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
+      assert(referenceOffset == deserializedOffset)
+    }
+  }
+
+  testWithUninterruptibleThread("deserialization of initial offset written by future version") {
+    withTempDir { metadataPath =>
+      val futureMetadataLog =
+        new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession,
+          metadataPath.getAbsolutePath) {
+          override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+            out.write(0)
+            val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
+            writer.write(s"v0\n${metadata.json}")
+            writer.flush
+          }
+        }
+
+      val topic = newTopic
+      testUtils.createTopic(topic, partitions = 3)
+      val offset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
+      futureMetadataLog.add(0, offset)
+
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+
+      intercept[java.lang.IllegalArgumentException] {
--- End diff --

nit: please also check the error message to make sure it's the expected exception, such as
```
val e = intercept[java.lang.IllegalArgumentException] {
  source.getOffset.get // Read initial offset
}
assert(e.getMessage.contains("Please upgrade your Spark"))
```
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101594563

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -97,16 +100,29 @@ private[kafka010] class KafkaSource(
     val metadataLog =
       new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write(s"$VERSION\n${metadata.json}")
--- End diff --

nit: I prefer to use multiple `write`s to avoid creating the temp string.
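To illustrate the suggestion above, here is a minimal sketch of the same output produced with separate `write` calls instead of one interpolated string. It is self-contained and writes a plain JSON string rather than a `KafkaSourceOffset`; `OffsetWriter`, `Version`, and `writeOffset` are hypothetical names, not identifiers from the patch.

```scala
import java.io.{BufferedWriter, OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets.UTF_8

object OffsetWriter {
  // Assumed version tag, matching the "v1" used in the diff.
  val Version: String = "v1"

  def writeOffset(json: String, out: OutputStream): Unit = {
    out.write(0) // zero byte kept for Spark 2.1.0 compatibility, as in the diff
    val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
    // Three writes go straight into the writer's buffer, so no
    // intermediate "version + newline + json" string is allocated.
    writer.write(Version)
    writer.write("\n")
    writer.write(json)
    writer.flush()
  }
}
```

The bytes written are identical to the interpolated form; the only difference is that the concatenated temporary string is never built.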
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101593021

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -88,6 +89,8 @@ private[kafka010] class KafkaSource(
   private val maxOffsetsPerTrigger =
     sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)

+  private val VERSION = "v1"
--- End diff --

nit: please move this to `object KafkaSource`. It should be `v1\n` if you are using it in `startsWith`. Otherwise, you cannot distinguish `v1` and `v11`.
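The prefix ambiguity described above can be shown in a few lines. This is only an illustrative sketch; `VersionPrefix` and its members are hypothetical names.

```scala
object VersionPrefix {
  val BareVersion: String = "v1"         // version tag without a terminator
  val TerminatedVersion: String = "v1\n" // version tag including the newline

  // A bare prefix check also accepts "v11", "v12", and so on.
  def matchesBare(content: String): Boolean =
    content.startsWith(BareVersion)

  // Including the newline makes the check match exactly one version.
  def matchesTerminated(content: String): Boolean =
    content.startsWith(TerminatedVersion)
}
```

For content beginning with `v11\n`, `matchesBare` returns true while `matchesTerminated` returns false, so only the terminated constant lets a reader reject a file written by a later format version.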
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101594004

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -97,16 +100,29 @@ private[kafka010] class KafkaSource(
     val metadataLog =
       new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write(s"$VERSION\n${metadata.json}")
+          writer.flush
         }

         override def deserialize(in: InputStream): KafkaSourceOffset = {
-          val length = in.read()
-          val bytes = new Array[Byte](length)
-          in.read(bytes)
-          KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+          in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+          // HDFSMetadataLog guarantees that it never creates a partial file.
+          assert(content.length != 0)
+          if (content(0) == 'v') {
+            if (content.startsWith(VERSION)) {
+              KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length + 1)))
+            } else {
+              val versionInFile = content.substring(0, content.indexOf("\n"))
+              throw new IllegalArgumentException(
--- End diff --

nit: use `IllegalStateException` to make it consistent with `CompactibleFileStreamLog`.
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101353274

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -97,16 +97,27 @@ private[kafka010] class KafkaSource(
     val metadataLog =
       new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v1\n")
+          writer.write(metadata.json)
+          writer.flush
         }

         override def deserialize(in: InputStream): KafkaSourceOffset = {
-          val length = in.read()
-          val bytes = new Array[Byte](length)
-          in.read(bytes)
-          KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+          in.read()
--- End diff --

nit: please also document this.
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101360629

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -141,6 +142,118 @@ class KafkaSourceSuite extends KafkaSourceTest {

   private val topicId = new AtomicInteger(0)

+  private def createSpark210MetadataLog(metadataPath: String): HDFSMetadataLog[KafkaSourceOffset] =
+  {
+    new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
+      override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+        val bytes = metadata.json.getBytes(UTF_8)
+        out.write(bytes.length)
+        out.write(bytes)
+      }
+
+      override def deserialize(in: InputStream): KafkaSourceOffset = {
+        val length = in.read()
+        val bytes = new Array[Byte](length)
+        in.read(bytes)
+        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+      }
+    }
+  }
+
+  testWithUninterruptibleThread(
+    "deserialization of initial offsets compliant to new spec with Spark 2.1.0") {
+    withTempDir { metadataPath =>
+      val topic = newTopic
+      testUtils.createTopic(topic, partitions = 3)
+
+      // Write metadata using new spec
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+      source.getOffset.get // Initialize partition offsets
+      val spark210MetadataLog = createSpark210MetadataLog(metadataPath.getAbsolutePath)
+
+      intercept[java.lang.IllegalArgumentException] {
+        spark210MetadataLog.get(0)
+      }
+    }
+  }
+
+  testWithUninterruptibleThread("deserialization of initial offsets written by Spark 2.1.0") {
--- End diff --

Could you use Spark 2.1.0 to generate a real file and put it in the `external/kafka-0-10-sql/src/test/resources/` folder (there is already a file called kafka-source-offset-version-2.1.0.txt in it)? Writing the test against a real file means that even if we change the APIs of HDFSMetadataLog and KafkaSourceOffset in the future, the test still runs against the correct 2.1.0 file format.
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101359070

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -97,16 +97,27 @@ private[kafka010] class KafkaSource(
     val metadataLog =
       new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v1\n")
--- End diff --

nit: define a constant `VERSION = "v1\n"` in `object KafkaSource`
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101362919

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -141,6 +142,118 @@ class KafkaSourceSuite extends KafkaSourceTest {

   private val topicId = new AtomicInteger(0)

+  private def createSpark210MetadataLog(metadataPath: String): HDFSMetadataLog[KafkaSourceOffset] =
+  {
+    new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
+      override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+        val bytes = metadata.json.getBytes(UTF_8)
+        out.write(bytes.length)
+        out.write(bytes)
+      }
+
+      override def deserialize(in: InputStream): KafkaSourceOffset = {
+        val length = in.read()
+        val bytes = new Array[Byte](length)
+        in.read(bytes)
+        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+      }
+    }
+  }
+
+  testWithUninterruptibleThread(
+    "deserialization of initial offsets compliant to new spec with Spark 2.1.0") {
+    withTempDir { metadataPath =>
+      val topic = newTopic
+      testUtils.createTopic(topic, partitions = 3)
+
+      // Write metadata using new spec
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+      source.getOffset.get // Initialize partition offsets
+      val spark210MetadataLog = createSpark210MetadataLog(metadataPath.getAbsolutePath)
+
+      intercept[java.lang.IllegalArgumentException] {
+        spark210MetadataLog.get(0)
--- End diff --

How about simulating the behavior of Spark 2.1.0 instead of using HDFSMetadataLog?
```Scala
intercept[java.lang.IllegalArgumentException] {
  val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
  val length = in.read()
  val bytes = new Array[Byte](length)
  in.read(bytes)
  KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
}
```
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101359300

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -97,16 +97,27 @@ private[kafka010] class KafkaSource(
     val metadataLog =
       new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v1\n")
+          writer.write(metadata.json)
+          writer.flush
         }

         override def deserialize(in: InputStream): KafkaSourceOffset = {
-          val length = in.read()
-          val bytes = new Array[Byte](length)
-          in.read(bytes)
-          KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+          in.read()
+          val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
+          val version = reader.readLine()
--- End diff --

I prefer to not assume the json is only one line. How about:
```Scala
override def deserialize(in: InputStream): KafkaSourceOffset = {
  in.read()
  val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
  // HDFSMetadataLog guarantees that it never creates a partial file.
  assert(content.length != 0)
  if (content(0) == 'v') {
    if (content.startsWith(VERSION)) {
      KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length)))
    } else {
      val versionInFile = content.substring(0, content.indexOf("\n"))
      throw new IllegalArgumentException(
        s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " +
        s"but was $versionInFile. Please upgrade your Spark.")
    }
  } else {
    KafkaSourceOffset(SerializedOffset(content))
  }
}
```
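For readers who want to try the proposed read path outside Spark, the following is a rough standalone sketch of the same branching. It returns the raw JSON string instead of a `KafkaSourceOffset`, reads the stream with `scala.io.Source` rather than `IOUtils`, and throws `IllegalStateException` as suggested elsewhere in this thread. `OffsetReader`, `Version`, and `readJson` are hypothetical names.

```scala
import java.io.InputStream

object OffsetReader {
  // Assumed constant; includes the newline so "v1" and "v11" stay distinct.
  val Version: String = "v1\n"

  def readJson(in: InputStream): String = {
    in.read() // skip the first byte: 0 in the new format, a length in 2.1.0 files
    val content = scala.io.Source.fromInputStream(in, "UTF-8").mkString
    require(content.nonEmpty, "empty offset file")
    if (content(0) == 'v') {
      if (content.startsWith(Version)) {
        // New format: everything after the version line is JSON,
        // which may itself span several lines.
        content.substring(Version.length)
      } else {
        // takeWhile avoids an index error when no newline is present.
        val versionInFile = content.takeWhile(_ != '\n')
        throw new IllegalStateException(
          s"Unsupported format. Expected version is ${Version.stripLineEnd} " +
            s"but was $versionInFile. Please upgrade your Spark.")
      }
    } else {
      // Legacy 2.1.0 file: the length byte was consumed above, the rest is JSON.
      content
    }
  }
}
```

The legacy branch relies on JSON offsets starting with `{` rather than `v`, which is what makes the two formats distinguishable after the first byte is dropped.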
[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...
GitHub user vitillo opened a pull request:

https://github.com/apache/spark/pull/16857

[SPARK-19517][SS] KafkaSource fails to initialize partition offsets

## What changes were proposed in this pull request?

This patch fixes a bug in `KafkaSource` with the (de)serialization of the length of the JSON string that contains the initial partition offsets.

## How was this patch tested?

I ran the test suite for spark-sql-kafka-0-10.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vitillo/spark kafka_source_fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16857.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16857

commit b2523b920de2329878a37f7efc1e9dda5d969b79
Author: Roberto Agostino Vitillo
Date: 2017-02-08T15:07:40Z

    Fix (de)serialization of initial partition offsets.
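As background for the bug described above, the sketch below reproduces the old write and read logic quoted in the review diffs on in-memory streams. `OutputStream.write(int)` writes only the low-order 8 bits of its argument, so a JSON string longer than 255 bytes gets a truncated length prefix and is cut short on read. `LengthByteDemo` and `legacyRoundTrip` are hypothetical names, and this is an illustration of the failure mode rather than the exact Spark code path.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

object LengthByteDemo {
  // Serializes and deserializes a string the way the pre-fix code did.
  def legacyRoundTrip(json: String): String = {
    val bytes = json.getBytes(UTF_8)
    val out = new ByteArrayOutputStream()
    out.write(bytes.length) // only the low 8 bits of the length are written
    out.write(bytes)

    val in = new ByteArrayInputStream(out.toByteArray)
    val length = in.read()  // a value in 0..255, whatever the real length was
    val buf = new Array[Byte](length)
    in.read(buf)
    new String(buf, UTF_8)
  }
}
```

A short payload survives the round trip, but a 300-byte payload comes back as only its first 44 bytes (300 mod 256), which would no longer parse as valid JSON. A topic with many partitions can plausibly push the offsets JSON past 255 bytes.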