[GitHub] spark pull request #20689: [SPARK-23533][SS] Add support for changing Contin...
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20689

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20689#discussion_r174667490

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -164,7 +164,15 @@ case class KafkaContinuousDataReaderFactory(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {
+
+  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
+    val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
+    assert(kafkaOffset.topicPartition == topicPartition)
--- End diff --

Got it.
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20689#discussion_r174585423

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -164,7 +164,15 @@ case class KafkaContinuousDataReaderFactory(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {
+
+  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
+    val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
+    assert(kafkaOffset.topicPartition == topicPartition)
--- End diff --

This may happen. I prefer to use `require` like this:
```
require(kafkaOffset.topicPartition == topicPartition,
  s"expected: $topicPartition actual: ${kafkaOffset.topicPartition}")
```
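The suggestion above swaps `assert` for `require` because a mismatched offset is a caller error that can plausibly occur, so it deserves a descriptive failure. In Scala, `require` throws `IllegalArgumentException` (blaming the caller), while `assert` throws `AssertionError` and, being `@elidable`, can be compiled out. Java has no built-in `require`, so the minimal sketch below writes one by hand. `TopicPartitionId`, `KafkaPartitionOffset` and `OffsetChecks` are hypothetical stand-ins for illustration, not the Kafka or Spark classes.

```java
import java.util.Objects;

// Hypothetical stand-in for Kafka's TopicPartition: a topic name plus a partition number.
final class TopicPartitionId {
  final String topic;
  final int partition;

  TopicPartitionId(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TopicPartitionId)) {
      return false;
    }
    TopicPartitionId other = (TopicPartitionId) o;
    return partition == other.partition && topic.equals(other.topic);
  }

  @Override
  public int hashCode() {
    return Objects.hash(topic, partition);
  }

  @Override
  public String toString() {
    return topic + "-" + partition;
  }
}

// Hypothetical stand-in for KafkaSourcePartitionOffset: a partition and an offset within it.
final class KafkaPartitionOffset {
  final TopicPartitionId topicPartition;
  final long partitionOffset;

  KafkaPartitionOffset(TopicPartitionId topicPartition, long partitionOffset) {
    this.topicPartition = topicPartition;
    this.partitionOffset = partitionOffset;
  }
}

final class OffsetChecks {
  private OffsetChecks() {}

  // Mirrors Scala's require(cond, msg): always evaluated, throws IllegalArgumentException.
  static void require(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException("requirement failed: " + message);
    }
  }

  // Rejects an offset that belongs to a different partition than the one this factory reads.
  static long startOffsetFor(TopicPartitionId expected, KafkaPartitionOffset offset) {
    require(expected.equals(offset.topicPartition),
        "expected: " + expected + " actual: " + offset.topicPartition);
    return offset.partitionOffset;
  }
}
```

Unlike Java's `assert` statement, which is skipped unless the JVM runs with `-ea`, this check always executes.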
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20689#discussion_r174585821

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---
@@ -106,7 +106,19 @@ case class RateStreamContinuousDataReaderFactory(
     partitionIndex: Int,
     increment: Long,
     rowsPerSecond: Double)
-  extends DataReaderFactory[Row] {
+  extends ContinuousDataReaderFactory[Row] {
+
+  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[Row] = {
+    val rateStreamOffset = offset.asInstanceOf[RateStreamPartitionOffset]
+    assert(rateStreamOffset.partition == partitionIndex)
--- End diff --

ditto
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20689#discussion_r174585843

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+
+/**
+ * A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
+ * implement this interface to provide creating {@link DataReader} with particular offset.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
+  /**
+   * Create a DataReader with particular offset as its startOffset.
+   *
+   * @param offset offset want to set as the DataReader's startOffset.
+   */
+  default DataReader<T> createDataReaderWithOffset(PartitionOffset offset) {
+    throw new IllegalStateException(
--- End diff --

+1 to make this one just abstract.
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20689#discussion_r171332700

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+
+/**
+ * A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
+ * implement this interface to provide creating {@link DataReader} with particular offset.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
+  /**
+   * Create a DataReader with particular offset as its startOffset.
+   *
+   * @param offset offset want to set as the DataReader's startOffset.
+   */
+  default DataReader<T> createDataReaderWithOffset(PartitionOffset offset) {
+    throw new IllegalStateException(
--- End diff --

I don't know if we want a default here - it seems like subclasses should always be able to provide an implementation, and thus that we should always require them to.
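Both reviewers argue for an abstract method: every continuous factory can resume from an offset, so the compiler should force each implementation to say how. A minimal Java sketch of that shape follows. `DataReader`, `DataReaderFactory` and `PartitionOffset` here are trimmed-down, hypothetical stand-ins for the Spark interfaces, and `CounterReaderFactory` is an invented rate-style source whose offset is simply the next value to emit.

```java
import java.util.ArrayList;
import java.util.List;

// Trimmed-down, hypothetical stand-ins for the Spark reader interfaces.
interface DataReader<T> {
  boolean next(); // advance to the next record; false when there are no more
  T get();        // the current record
}

interface DataReaderFactory<T> {
  DataReader<T> createDataReader();
}

interface PartitionOffset {}

// The mix-in with no default body: every implementation must define how to start at an offset.
interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
  DataReader<T> createDataReaderWithOffset(PartitionOffset offset);
}

// Hypothetical offset for a counter-style partition: the next value it should emit.
final class CounterPartitionOffset implements PartitionOffset {
  final int partition;
  final long nextValue;

  CounterPartitionOffset(int partition, long nextValue) {
    this.partition = partition;
    this.nextValue = nextValue;
  }
}

// Hypothetical factory emitting first, first + increment, ...; "limit" bounds each reader
// so that the sketch terminates.
final class CounterReaderFactory implements ContinuousDataReaderFactory<Long> {
  private final int partitionIndex;
  private final long startValue;
  private final long increment;
  private final int limit;

  CounterReaderFactory(int partitionIndex, long startValue, long increment, int limit) {
    this.partitionIndex = partitionIndex;
    this.startValue = startValue;
    this.increment = increment;
    this.limit = limit;
  }

  @Override
  public DataReader<Long> createDataReader() {
    return readerFrom(startValue);
  }

  @Override
  public DataReader<Long> createDataReaderWithOffset(PartitionOffset offset) {
    CounterPartitionOffset counterOffset = (CounterPartitionOffset) offset;
    if (counterOffset.partition != partitionIndex) {
      throw new IllegalArgumentException(
          "expected: " + partitionIndex + " actual: " + counterOffset.partition);
    }
    return readerFrom(counterOffset.nextValue);
  }

  private DataReader<Long> readerFrom(long first) {
    return new DataReader<Long>() {
      private long current = first - increment;
      private int emitted = 0;

      @Override
      public boolean next() {
        if (emitted >= limit) {
          return false;
        }
        current += increment;
        emitted++;
        return true;
      }

      @Override
      public Long get() {
        return current;
      }
    };
  }
}

final class Readers {
  private Readers() {}

  // Collects every record a reader produces.
  static <T> List<T> drain(DataReader<T> reader) {
    List<T> out = new ArrayList<>();
    while (reader.next()) {
      out.add(reader.get());
    }
    return out;
  }
}
```

Had `createDataReaderWithOffset` kept a throwing default body, deleting the override from `CounterReaderFactory` would still compile, and the omission would only surface as an `IllegalStateException` at runtime.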
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/20689

[SPARK-23533][SS] Add support for changing ContinuousDataReader's startOffset

## What changes were proposed in this pull request?

As discussed in #20675, we need to add a new interface, `ContinuousDataReaderFactory`, to support setting the start offset in Continuous Processing.

## How was this patch tested?

Existing unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-23533

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20689.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #20689

commit 59cef98868586a4f039b04e74c32c94eaff965c0
Author: Yuanjian Li
Date: 2018-02-28T07:29:57Z

[SPARK-23533][SS] Add support for changing ContinousDataReader's startOffset
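The new interface's Javadoc describes it as a mix-in: a factory opts in by implementing it, and whatever code starts a continuous reader can check for it. The Java sketch below shows that dispatch under explicit assumptions: all types are simplified, hypothetical stand-ins (generics dropped, and a reader only reports its starting position), and `ReaderLauncher.open` is an invented helper, not Spark's actual task-launch code.

```java
import java.util.Optional;

// Simplified, hypothetical stand-ins; a reader here only reports where it starts.
interface PartitionOffset {
  long value();
}

interface DataReader {
  long startOffset();
}

interface DataReaderFactory {
  DataReader createDataReader();
}

// The mix-in: a factory opts in to offset-aware creation by also implementing this.
interface ContinuousDataReaderFactory extends DataReaderFactory {
  DataReader createDataReaderWithOffset(PartitionOffset offset);
}

// Hypothetical factory that can only start from its configured position.
final class FixedStartFactory implements DataReaderFactory {
  private final long start;

  FixedStartFactory(long start) {
    this.start = start;
  }

  @Override
  public DataReader createDataReader() {
    return () -> start;
  }
}

// Hypothetical factory that also implements the mix-in.
final class SeekableFactory implements ContinuousDataReaderFactory {
  private final long start;

  SeekableFactory(long start) {
    this.start = start;
  }

  @Override
  public DataReader createDataReader() {
    return () -> start;
  }

  @Override
  public DataReader createDataReaderWithOffset(PartitionOffset offset) {
    long resumeAt = offset.value();
    return () -> resumeAt;
  }
}

final class ReaderLauncher {
  private ReaderLauncher() {}

  // Hypothetical helper: resume from a recorded offset only when the factory supports it.
  static DataReader open(DataReaderFactory factory, Optional<PartitionOffset> resumeFrom) {
    if (!resumeFrom.isPresent()) {
      return factory.createDataReader();
    }
    if (factory instanceof ContinuousDataReaderFactory) {
      return ((ContinuousDataReaderFactory) factory).createDataReaderWithOffset(resumeFrom.get());
    }
    // Refuse rather than silently starting from the wrong position.
    throw new IllegalStateException("factory cannot start from a given offset");
  }
}
```

Throwing when an offset is requested from a factory that lacks the mix-in keeps a restarted task from quietly reading from the wrong position.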