[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16857





[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-17 Thread vitillo
Github user vitillo commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101803033
  
--- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin ---
@@ -0,0 +1 @@
+2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
--- End diff --

Done





[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-17 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101764130
  
--- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin ---
@@ -0,0 +1 @@
+2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
--- End diff --

Maybe this file should end with a newline?





[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-16 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101594339
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -141,6 +143,104 @@ class KafkaSourceSuite extends KafkaSourceTest {
 
   private val topicId = new AtomicInteger(0)
 
+  testWithUninterruptibleThread(
+    "deserialization of initial offset with Spark 2.1.0") {
+    withTempDir { metadataPath =>
+      val topic = newTopic
+      testUtils.createTopic(topic, partitions = 3)
+
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+      source.getOffset.get // Write initial offset
+
+      intercept[java.lang.IllegalArgumentException] {
+        val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
+        val length = in.read()
+        val bytes = new Array[Byte](length)
+        in.read(bytes)
+        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+      }
+    }
+  }
+
+  testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
+    withTempDir { metadataPath =>
+      val topic = "kafka-initial-offset-2-1-0"
+      testUtils.createTopic(topic, partitions = 3)
+
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+
+      val from = Paths.get(
+        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").getPath)
+      val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
+      Files.copy(from, to)
+
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+      val deserializedOffset = source.getOffset.get
+      val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
+      assert(referenceOffset == deserializedOffset)
+    }
+  }
+
+  testWithUninterruptibleThread("deserialization of initial offset written by future version") {
+    withTempDir { metadataPath =>
+      val futureMetadataLog =
+        new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession,
+          metadataPath.getAbsolutePath) {
+          override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+            out.write(0)
+            val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
+            writer.write(s"v0\n${metadata.json}")
+            writer.flush
+          }
+        }
+
+      val topic = newTopic
+      testUtils.createTopic(topic, partitions = 3)
+      val offset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
+      futureMetadataLog.add(0, offset)
+
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+
+      intercept[java.lang.IllegalArgumentException] {
--- End diff --

nit: please also check the error message to make sure it's the expected 
exception, such as
```
val e = intercept[java.lang.IllegalArgumentException] {
  source.getOffset.get // Read initial offset
}
assert(e.getMessage.contains("Please upgrade your Spark"))
```





[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-16 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101594563
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -97,16 +100,29 @@ private[kafka010] class KafkaSource(
     val metadataLog =
       new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write(s"$VERSION\n${metadata.json}")
--- End diff --

nit: I prefer to use multiple `write`s to avoid creating the temp string.
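
A sketch of the multiple-write alternative (the final wording isn't shown in this thread; `VERSION` is assumed to carry the trailing newline suggested elsewhere in the review):
```Scala
out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
writer.write(VERSION)       // e.g. "v1\n" -- no interpolated temp string
writer.write(metadata.json) // JSON payload written directly
writer.flush()
```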





[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-16 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101593021
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -88,6 +89,8 @@ private[kafka010] class KafkaSource(
   private val maxOffsetsPerTrigger =
     sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
 
+  private val VERSION = "v1"
--- End diff --

nit: please move this to `object KafkaSource`. It should be `v1\n` if you 
are using it in `startsWith`. Otherwise, you cannot distinguish `v1` and `v11`.
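
A quick illustration of the prefix ambiguity (with a hypothetical future "v11" file):
```Scala
"v11\n{...}".startsWith("v1")   // true  -- a v11 file would be accepted by mistake
"v11\n{...}".startsWith("v1\n") // false -- correctly rejected
```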





[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-16 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101594004
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -97,16 +100,29 @@ private[kafka010] class KafkaSource(
     val metadataLog =
       new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write(s"$VERSION\n${metadata.json}")
+          writer.flush
         }
 
         override def deserialize(in: InputStream): KafkaSourceOffset = {
-          val length = in.read()
-          val bytes = new Array[Byte](length)
-          in.read(bytes)
-          KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+          in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+          // HDFSMetadataLog guarantees that it never creates a partial file.
+          assert(content.length != 0)
+          if (content(0) == 'v') {
+            if (content.startsWith(VERSION)) {
+              KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length + 1)))
+            } else {
+              val versionInFile = content.substring(0, content.indexOf("\n"))
+              throw new IllegalArgumentException(
--- End diff --

nit: use `IllegalStateException` to make it consistent with 
`CompactibleFileStreamLog`.
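
That is, the same error message with the other exception type (a sketch based on the message suggested elsewhere in this review):
```Scala
throw new IllegalStateException(
  s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " +
    s"but was $versionInFile. Please upgrade your Spark.")
```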





[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101353274
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -97,16 +97,27 @@ private[kafka010] class KafkaSource(
     val metadataLog =
       new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v1\n")
+          writer.write(metadata.json)
+          writer.flush
         }
 
         override def deserialize(in: InputStream): KafkaSourceOffset = {
-          val length = in.read()
-          val bytes = new Array[Byte](length)
-          in.read(bytes)
-          KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+          in.read()
--- End diff --

nit: please also document this.
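
For example, a comment mirroring the one on the serialize side (wording illustrative):
```Scala
in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
```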





[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101360629
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -141,6 +142,118 @@ class KafkaSourceSuite extends KafkaSourceTest {
 
   private val topicId = new AtomicInteger(0)
 
+  private def createSpark210MetadataLog(metadataPath: String): HDFSMetadataLog[KafkaSourceOffset] =
+  {
+    new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
+      override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+        val bytes = metadata.json.getBytes(UTF_8)
+        out.write(bytes.length)
+        out.write(bytes)
+      }
+
+      override def deserialize(in: InputStream): KafkaSourceOffset = {
+        val length = in.read()
+        val bytes = new Array[Byte](length)
+        in.read(bytes)
+        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+      }
+    }
+  }
+
+  testWithUninterruptibleThread(
+    "deserialization of initial offsets compliant to new spec with Spark 2.1.0") {
+    withTempDir { metadataPath =>
+      val topic = newTopic
+      testUtils.createTopic(topic, partitions = 3)
+
+      // Write metadata using new spec
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+      source.getOffset.get // Initialize partition offsets
+      val spark210MetadataLog = createSpark210MetadataLog(metadataPath.getAbsolutePath)
+
+      intercept[java.lang.IllegalArgumentException] {
+        spark210MetadataLog.get(0)
+      }
+    }
+  }
+
+  testWithUninterruptibleThread("deserialization of initial offsets written by Spark 2.1.0") {
--- End diff --

Could you use Spark 2.1.0 to generate a real file and put it in the 
`external/kafka-0-10-sql/src/test/resources/` folder (there is already a file 
called kafka-source-offset-version-2.1.0.txt in it)? Writing the test against a 
real file means that even if we change the APIs of HDFSMetadataLog and 
KafkaSourceOffset in the future, the test still verifies the exact file format 
that Spark 2.1.0 generated.
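
For instance, the test could copy the checked-in file into the metadata directory before reading it back (a sketch; the resource name matches the file added in this PR):
```Scala
val from = Paths.get(
  getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").getPath)
Files.copy(from, Paths.get(s"${metadataPath.getAbsolutePath}/0"))
```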





[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101359070
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -97,16 +97,27 @@ private[kafka010] class KafkaSource(
     val metadataLog =
       new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v1\n")
--- End diff --

nit: define a constant `VERSION = "v1\n"` in `object KafkaSource`
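
A minimal sketch of that suggestion (placement and value as proposed in this review, not necessarily the merged code):
```Scala
private[kafka010] object KafkaSource {
  // Trailing '\n' keeps a startsWith check from confusing "v1" with a future "v11".
  val VERSION = "v1\n"
}
```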





[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101362919
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -141,6 +142,118 @@ class KafkaSourceSuite extends KafkaSourceTest {
 
   private val topicId = new AtomicInteger(0)
 
+  private def createSpark210MetadataLog(metadataPath: String): HDFSMetadataLog[KafkaSourceOffset] =
+  {
+    new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
+      override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+        val bytes = metadata.json.getBytes(UTF_8)
+        out.write(bytes.length)
+        out.write(bytes)
+      }
+
+      override def deserialize(in: InputStream): KafkaSourceOffset = {
+        val length = in.read()
+        val bytes = new Array[Byte](length)
+        in.read(bytes)
+        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+      }
+    }
+  }
+
+  testWithUninterruptibleThread(
+    "deserialization of initial offsets compliant to new spec with Spark 2.1.0") {
+    withTempDir { metadataPath =>
+      val topic = newTopic
+      testUtils.createTopic(topic, partitions = 3)
+
+      // Write metadata using new spec
+      val provider = new KafkaSourceProvider
+      val parameters = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> topic
+      )
+      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
+        "", parameters)
+      source.getOffset.get // Initialize partition offsets
+      val spark210MetadataLog = createSpark210MetadataLog(metadataPath.getAbsolutePath)
+
+      intercept[java.lang.IllegalArgumentException] {
+        spark210MetadataLog.get(0)
--- End diff --

How about simulating the behavior of Spark 2.1.0 instead of using 
HDFSMetadataLog?
```Scala
intercept[java.lang.IllegalArgumentException] {
  val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
  val length = in.read()
  val bytes = new Array[Byte](length)
  in.read(bytes)
  KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
}
```








[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16857#discussion_r101359300
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -97,16 +97,27 @@ private[kafka010] class KafkaSource(
     val metadataLog =
       new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
-          out.write(bytes.length)
-          out.write(bytes)
+          out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v1\n")
+          writer.write(metadata.json)
+          writer.flush
         }
 
         override def deserialize(in: InputStream): KafkaSourceOffset = {
-          val length = in.read()
-          val bytes = new Array[Byte](length)
-          in.read(bytes)
-          KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+          in.read()
+          val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
+          val version = reader.readLine()
--- End diff --

I prefer not to assume the JSON is only one line. How about:
```Scala
override def deserialize(in: InputStream): KafkaSourceOffset = {
  in.read()
  val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
  // HDFSMetadataLog guarantees that it never creates a partial file.
  assert(content.length != 0)
  if (content(0) == 'v') {
    if (content.startsWith(VERSION)) {
      KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length)))
    } else {
      val versionInFile = content.substring(0, content.indexOf("\n"))
      throw new IllegalArgumentException(
        s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " +
          s"but was $versionInFile. Please upgrade your Spark.")
    }
  } else {
    KafkaSourceOffset(SerializedOffset(content))
  }
}
```





[GitHub] spark pull request #16857: [SPARK-19517][SS] KafkaSource fails to initialize...

2017-02-08 Thread vitillo
GitHub user vitillo opened a pull request:

https://github.com/apache/spark/pull/16857

[SPARK-19517][SS] KafkaSource fails to initialize partition offsets

## What changes were proposed in this pull request?

This patch fixes a bug in `KafkaSource` with the (de)serialization of the 
length of the JSON string that contains the initial partition offsets.
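
For context, `java.io.OutputStream.write(int)` emits only the low-order 8 bits of its argument, so the single-byte length prefix written by `out.write(bytes.length)` (see the review diffs above) silently corrupts any offset JSON longer than 255 bytes. A minimal illustration of the failure mode (not the patch itself):
```Scala
import java.io.ByteArrayOutputStream

val out = new ByteArrayOutputStream()
out.write(300) // only the low byte is kept: 300 & 0xFF == 44
val storedLength = out.toByteArray()(0) & 0xFF
assert(storedLength == 44) // a reader would then allocate a 44-byte buffer for 300 bytes of JSON
```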

## How was this patch tested?

I ran the test suite for spark-sql-kafka-0-10.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vitillo/spark kafka_source_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16857.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16857


commit b2523b920de2329878a37f7efc1e9dda5d969b79
Author: Roberto Agostino Vitillo 
Date:   2017-02-08T15:07:40Z

Fix (de)serialization of initial partition offsets.



