Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20096#discussion_r160559942
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala ---
    @@ -0,0 +1,348 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.util.Locale
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import org.apache.kafka.clients.producer.ProducerConfig
    +import org.apache.kafka.common.serialization.ByteArraySerializer
    +import org.scalatest.time.SpanSugar._
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
    +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.streaming._
    +import org.apache.spark.sql.types.{BinaryType, DataType}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * This is a temporary port of KafkaSinkSuite, since we do not yet have a V2 memory stream.
    + * Once we have one, this will be changed to a specialization of KafkaSinkSuite and we won't have
    + * to duplicate all the code.
    + */
    +class KafkaContinuousSinkSuite extends KafkaContinuousTest {
    +  import testImplicits._
    +
    +  override val streamingTimeout = 30.seconds
    +
    +  override def beforeAll(): Unit = {
    +    super.beforeAll()
    +    testUtils = new KafkaTestUtils(
    +      withBrokerProps = Map("auto.create.topics.enable" -> "false"))
    +    testUtils.setup()
    +  }
    +
    +  override def afterAll(): Unit = {
    +    if (testUtils != null) {
    +      testUtils.teardown()
    +      testUtils = null
    +    }
    +    super.afterAll()
    +  }
    +
    +  test("streaming - write to kafka with topic field") {
    +    val inputTopic = newTopic()
    +    testUtils.createTopic(inputTopic, partitions = 1)
    +
    +    val input = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("subscribe", inputTopic)
    +      .option("startingOffsets", "earliest")
    +      .load()
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic)
    +
    +    val writer = createKafkaWriter(
    +      input.toDF(),
    +      withTopic = None,
    +      withOutputMode = Some(OutputMode.Append))(
    +      withSelectExpr = s"'$topic' as topic", "value")
    +
    +    val reader = createKafkaReader(topic)
    +      .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
    +      .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
    +      .as[(Int, Int)]
    +      .map(_._2)
    +
    +    try {
    +      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
    +      failAfter(streamingTimeout) {
    +        writer.processAllAvailable()
    +      }
    +      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
    +      testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
    +      failAfter(streamingTimeout) {
    +        writer.processAllAvailable()
    +      }
    +      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    +    } finally {
    +      writer.stop()
    +    }
    +  }
    +
    +  test("streaming - write data with bad schema") {
    --- End diff --
    
    missing tests for "w/o topic field, with topic option" and "topic field and topic option",
    and also a test for the case when the topic field is null.
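    
    For illustration, a rough sketch of the "w/o topic field, with topic option" case could look like the one below. This is only a sketch, not part of this PR: it reuses the helpers visible in the diff above (createKafkaWriter, createKafkaReader, checkDatasetUnorderly, newTopic) and assumes createKafkaWriter forwards withTopic as the Kafka "topic" option, as in the original KafkaSinkSuite.
    
      test("streaming - write w/o topic field, with topic option") {
        val inputTopic = newTopic()
        testUtils.createTopic(inputTopic, partitions = 1)
    
        val input = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
          .option("subscribe", inputTopic)
          .option("startingOffsets", "earliest")
          .load()
    
        // The target topic is supplied via the topic option rather than a topic column.
        val topic = newTopic()
        testUtils.createTopic(topic)
    
        val writer = createKafkaWriter(
          input.toDF(),
          withTopic = Some(topic),
          withOutputMode = Some(OutputMode.Append))(
          withSelectExpr = "value")
    
        val reader = createKafkaReader(topic)
          .selectExpr("CAST(value as STRING) value")
          .selectExpr("CAST(value as INT) value")
          .as[Int]
    
        try {
          testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
          failAfter(streamingTimeout) {
            writer.processAllAvailable()
          }
          checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
        } finally {
          writer.stop()
        }
      }
    
    The "topic field and topic option" and null-topic-field cases would follow the same structure, asserting whatever behavior the sink is expected to have in each case.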


---
