The problem is solved.
The actual schema of the Kafka source differs from the documentation.

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

The documentation says the "timestamp" column is of type long, but its actual type is timestamp, so CAST(timestamp AS LONG) turns it into whole seconds since the epoch and drops the milliseconds.


Here is the code I used to check the schema, and its output.

-code
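// bootstrapServers, subscribeType (e.g. "subscribe") and topics are defined elsewhere
val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
// printSchema() returns Unit, so call it on df instead of assigning its result
df.printSchema()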

-result
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

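A Spark timestamp cast to LONG becomes whole seconds since the epoch, while the underlying value still carries milliseconds. The following Spark-free sketch illustrates the difference on a java.sql.Timestamp, the JVM type Spark uses for timestamp columns in typed Datasets. castToLongSeconds is a hypothetical helper that assumes the cast floors to seconds (as Spark 2.x does for positive timestamps); epochMillis simply calls Timestamp.getTime, which keeps millisecond precision. The timestamp value is made up for illustration.

```scala
import java.sql.Timestamp

object KafkaTimestampMillis {
  // Hypothetical helper, assumed to mirror Spark SQL's CAST(timestamp AS LONG):
  // whole seconds since the epoch, so the millisecond part is lost.
  def castToLongSeconds(ts: Timestamp): Long =
    Math.floorDiv(ts.getTime, 1000L)

  // Timestamp.getTime returns milliseconds since the epoch,
  // so the sub-second part survives.
  def epochMillis(ts: Timestamp): Long = ts.getTime

  def main(args: Array[String]): Unit = {
    // Made-up record timestamp with a non-zero millisecond part
    val ts = new Timestamp(1525850040123L)
    println(s"CAST AS LONG -> ${castToLongSeconds(ts)}") // 1525850040
    println(s"epoch millis -> ${epochMillis(ts)}")       // 1525850040123
  }
}
```

In the streaming query, one option (assuming import spark.implicits._) is to select timestamp without the cast, read the rows as .as[(java.sql.Timestamp, String)], and map each one to (ts.getTime, value). On Spark 3.1 or later, the built-in SQL function unix_millis(timestamp) returns the epoch milliseconds directly.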

Regards,
Yuta

On 2018/05/09 16:14, Yuta Morisawa wrote:
Hi All

I'm trying to extract the Kafka timestamp from Kafka topics.

The timestamp I get does not contain millisecond information,
but it should, because the ConsumerRecord class in Kafka 0.10 supports millisecond timestamps.

How can I get a millisecond timestamp from Kafka topics?


These are the pages I referred to.

https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html

https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html


And this is my code.
----
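import spark.implicits._ // provides the encoder for .as[(Long, String)]

val df = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1,topic2")
   .load()
   // CAST(timestamp AS LONG) yields whole seconds since the epoch, dropping milliseconds
   .selectExpr("CAST(timestamp AS LONG)", "CAST(value AS STRING)")
   .as[(Long, String)]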
----

Regards,
Yuta


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org