We are using Spark Structured Streaming to join two data streams. The data is
collected through Kafka and read with startingOffsets set to "earliest" (the
sender publishes cyclically, one message at a time).

The following are our Kafka configuration parameters:
<code>
def setKafkaConsumerParam(topic: String,
                          servers: String = Config.data.parseString(Constants.KAFKA_BROKER_LIST)): Map[String, String] = {
  Map[String, String](
    "kafka.bootstrap.servers" -> servers,
    "startingOffsets" -> "earliest",
    // "failOnDataLoss" -> "false",
    "subscribe" -> topic
  )
}
</code>
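As a side note, here is a standalone sketch of what this helper produces (the broker address is assumed, since Config/Constants are not shown). One thing worth remembering about these options: for a streaming query, the Kafka source only consults startingOffsets when the query first starts; on restart it resumes from the checkpointed offsets.

```scala
// Standalone sketch of the helper above; "localhost:9092" is a stand-in
// for Config.data.parseString(Constants.KAFKA_BROKER_LIST).
def setKafkaConsumerParam(topic: String,
                          servers: String = "localhost:9092"): Map[String, String] =
  Map(
    "kafka.bootstrap.servers" -> servers,
    // Only applies when the query first starts; a restarted query
    // resumes from its checkpointed offsets instead.
    "startingOffsets" -> "earliest",
    // "failOnDataLoss" -> "false",  // would tolerate deleted/expired offsets
    "subscribe" -> topic
  )

println(setKafkaConsumerParam("test")("subscribe"))  // test
```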

The following is how we read the Kafka messages:
<code>
def onMessage(serverName: String, traceId: String, tableStruct: TableStruct): TableDataFrame = {
  LOG.info("onMessage,DIB")
  val topic = "test"
  val message = spark.readStream
    .format(Constants.KAFKA)
    .options(ParamUtil.setKafkaConsumerParam(topic))
    .load()

  // value looks like: {"id": "1", "name": "DJ"}
  val dataFrame = message
    .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS LONG)")
    .as[(String, Long)]
    .select(
      from_json($"value", SparkUtil.createSchema(tableStruct.structModels: _*)).as(tableStruct.tableName),
      $"timestamp".as(tableStruct.tableName + ":timestamp"))

  TableDataFrame(tableStruct.tableName, dataFrame)
}
</code>

Join process:
<code>
joinDataFrame = joinDataFrame1.join(joinDataFrame2,
  expr("customer.c_custkey") === expr("orders.o_orderkey"))
</code>

However, the result is wrong. Joining 1,500 customer rows with 15,000 order
rows should produce 15,000 rows, but we only get 375 rows.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to