The problem is solved.
The actual schema of the Kafka source differs from the documentation:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
The documentation says the "timestamp" column is of Long type,
but it is actually of timestamp type.
The following are the code and the result I used to check the schema.
-code
// printSchema() returns Unit, so build the DataFrame first
// and print its schema separately
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()

df.printSchema()
-result
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
Regards,
Yuta
On 2018/05/09 16:14, Yuta Morisawa wrote:
Hi All
I'm trying to extract the Kafka timestamp from Kafka topics.
The timestamp does not contain millisecond information,
but it should, because the ConsumerRecord class of Kafka 0.10
supports millisecond timestamps.
How can I get a millisecond timestamp from Kafka topics?
These are the websites I referred to.
https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html
And this is my code.
----
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
.selectExpr("CAST(timestamp AS LONG)", "CAST(value AS STRING)")
.as[(Long, String)]
----
Regards,
Yuta
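For reference, the `CAST(timestamp AS LONG)` in the question truncates to whole seconds; one way to keep the sub-second part is to cast the timestamp column to DOUBLE (which yields fractional seconds) and scale to milliseconds afterwards. A small sketch of that arithmetic in plain Scala (the epoch value is made up, and the Spark cast itself is not executed here):

```scala
// Assumed value: what a cast of the timestamp column to DOUBLE
// would return (epoch seconds with the fractional part preserved)
val tsSeconds: Double = 1525849200.123

val asLong   = tsSeconds.toLong             // cast to LONG: truncates, millis lost
val asMillis = math.round(tsSeconds * 1000) // scale and round to recover milliseconds

println(asLong)   // 1525849200
println(asMillis) // 1525849200123
```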
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org