GitHub user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16857#discussion_r101594339
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -141,6 +143,104 @@ class KafkaSourceSuite extends KafkaSourceTest {
     
       private val topicId = new AtomicInteger(0)
     
    +  testWithUninterruptibleThread(
    +    "deserialization of initial offset with Spark 2.1.0") {
    +    withTempDir { metadataPath =>
    +      val topic = newTopic
    +      testUtils.createTopic(topic, partitions = 3)
    +
    +      val provider = new KafkaSourceProvider
    +      val parameters = Map(
    +        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
    +        "subscribe" -> topic
    +      )
    +      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
    +        "", parameters)
    +      source.getOffset.get // Write initial offset
    +
    +      intercept[java.lang.IllegalArgumentException] {
    +        val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
    +        val length = in.read()
    +        val bytes = new Array[Byte](length)
    +        in.read(bytes)
    +        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
    +      }
    +    }
    +  }
    +
    +  testWithUninterruptibleThread("deserialization of initial offset written 
by Spark 2.1.0") {
    +    withTempDir { metadataPath =>
    +      val topic = "kafka-initial-offset-2-1-0"
    +      testUtils.createTopic(topic, partitions = 3)
    +
    +      val provider = new KafkaSourceProvider
    +      val parameters = Map(
    +        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
    +        "subscribe" -> topic
    +      )
    +
    +      val from = Paths.get(
    +        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").getPath)
    +      val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
    +      Files.copy(from, to)
    +
    +      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
    +        "", parameters)
    +      val deserializedOffset = source.getOffset.get
    +      val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
    +      assert(referenceOffset == deserializedOffset)
    +    }
    +  }
    +
    +  testWithUninterruptibleThread("deserialization of initial offset written 
by future version") {
    +    withTempDir { metadataPath =>
    +      val futureMetadataLog =
    +        new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession,
    +          metadataPath.getAbsolutePath) {
    +          override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
    +            out.write(0)
    +            val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
    +            writer.write(s"v0\n${metadata.json}")
    +            writer.flush
    +          }
    +        }
    +
    +      val topic = newTopic
    +      testUtils.createTopic(topic, partitions = 3)
    +      val offset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
    +      futureMetadataLog.add(0, offset)
    +
    +      val provider = new KafkaSourceProvider
    +      val parameters = Map(
    +        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
    +        "subscribe" -> topic
    +      )
    +      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
    +        "", parameters)
    +
    +      intercept[java.lang.IllegalArgumentException] {
    --- End diff ---
    
    nit: please also check the error message to make sure it's the expected exception, such as
    ```
    val e = intercept[java.lang.IllegalArgumentException] {
      source.getOffset.get // Read initial offset
    }
    assert(e.getMessage.contains("Please upgrade your Spark"))
    ```
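
    For reference, ScalaTest's `intercept` returns the caught exception, so its message can be asserted on directly. A minimal, self-contained sketch of the pattern (the suite name and the thrown message here are illustrative, not from this PR):
    ```
    import org.scalatest.FunSuite

    class InterceptMessageExample extends FunSuite {
      test("intercept returns the thrown exception so its message can be checked") {
        val e = intercept[IllegalArgumentException] {
          // Stand-in for source.getOffset.get; throws the kind of error the test expects.
          throw new IllegalArgumentException("Please upgrade your Spark to read this offset log")
        }
        // Checking the message guards against passing on an unrelated IllegalArgumentException.
        assert(e.getMessage.contains("Please upgrade your Spark"))
      }
    }
    ```
    The same message check could also be applied to the other `intercept` blocks in this diff.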

