Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory
Yea, definitely not. The only requirement is that the DataReader/WriterFactory must support at least one DataFormat.

> how are we going to express capability of the given reader of its supported format(s), or specific support for each of "real-time data in row format, and history data in columnar format"?

When DataSourceReader/Writer creates factories, the factory must contain enough information to decide the data format. Let's take ORC as an example. OrcReaderFactory knows which files to read and which columns to output. Since Spark currently only supports columnar scan for simple types, OrcReaderFactory will only output ColumnarBatch if the columns to scan are all of simple types.

On Tue, Apr 17, 2018 at 11:38 AM, Felix Cheung wrote:
> Is it required for DataReader to support all known DataFormat?
>
> Hopefully not, as assumed by the 'throw' in the interface. Then specifically, how are we going to express the capability of a given reader: its supported format(s), or specific support for each of "real-time data in row format, and history data in columnar format"?
>
> From: Wenchen Fan
> Sent: Sunday, April 15, 2018 7:45:01 PM
> To: Spark dev list
> Subject: [discuss][data source v2] remove type parameter in DataReader/WriterFactory
>
> Hi all,
>
> I'd like to propose an API change to the data source v2.
>
> One design goal of data source v2 is API type safety. The FileFormat API is a bad example: it asks the implementation to return InternalRow even when it's actually ColumnarBatch. In data source v2 we added a type parameter to DataReader/WriterFactory and DataReader/Writer, so that a data source supporting columnar scan returns ColumnarBatch at the API level.
>
> However, we met some problems when migrating the streaming and file-based data sources to data source v2.
>
> On the streaming side, we need a variant of DataReader/WriterFactory to add streaming-specific concepts like epoch id and offset. For details please see ContinuousDataReaderFactory and https://docs.google.com/document/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#
>
> But this conflicts with the special format mixin traits like SupportsScanColumnarBatch. We have to make the streaming variant of DataReader/WriterFactory extend the original DataReader/WriterFactory and do type casts at runtime, which is unnecessary and violates type safety.
>
> On the file-based data source side, we have a problem with code duplication. Let's take the ORC data source as an example. To support both unsafe row and columnar batch scan, we need something like
>
> // A lot of parameters to carry to the executor side
> class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
>   def createDataReader ...
> }
>
> class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch] {
>   def createDataReader ...
> }
>
> class OrcDataSourceReader extends DataSourceReader {
>   // logic to prepare the parameters and create factories
>   def createUnsafeRowFactories = ...
>
>   // logic to prepare the parameters and create factories
>   def createColumnarBatchFactories = ...
> }
>
> You can see that we have duplicated logic for preparing parameters and defining the factory.
>
> Here I propose to remove all the special format mixin traits and change the factory interface to
>
> public enum DataFormat {
>   ROW,
>   INTERNAL_ROW,
>   UNSAFE_ROW,
>   COLUMNAR_BATCH
> }
>
> interface DataReaderFactory {
>   DataFormat dataFormat();
>
>   default DataReader createRowDataReader() {
>     throw new IllegalStateException();
>   }
>
>   default DataReader createUnsafeRowDataReader() {
>     throw new IllegalStateException();
>   }
>
>   default DataReader createColumnarBatchDataReader() {
>     throw new IllegalStateException();
>   }
> }
>
> Spark will look at the dataFormat and decide which create-data-reader method to call.
>
> Now we don't have the problem on the streaming side, as these special format mixin traits go away. And the ORC data source can also be simplified to
>
> class OrcReaderFactory(...) extends DataReaderFactory {
>   def createUnsafeRowReader ...
>
>   def createColumnarBatchReader ...
> }
>
> class OrcDataSourceReader extends DataSourceReader {
>   // logic to prepare the parameters and create factories
>   def createReadFactories = ...
> }
>
> We also have a potential benefit of supporting hybrid storage data sources, which may keep real-time data in row format and history data in columnar format. Then they can make some DataReaderFactory instances output InternalRow and some output ColumnarBatch.
>
> Thoughts?
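[Editor's note] The format-based dispatch proposed above can be sketched outside Spark. The following is a minimal plain-Java sketch: the enum and the throwing defaults follow the proposal's shape, but the reader types (`List<Long>` for rows, `List<long[]>` for batches) are simplified placeholders, not Spark's real DataReader classes.

```java
import java.util.List;

// Plain-Java stand-ins for the proposed interfaces.
enum DataFormat { ROW, INTERNAL_ROW, UNSAFE_ROW, COLUMNAR_BATCH }

interface DataReaderFactory {
    DataFormat dataFormat();

    // Defaults throw, so a factory only overrides the methods for the
    // formats it actually supports -- the 'throw' Felix asked about.
    default List<Long> createUnsafeRowReader() {
        throw new IllegalStateException("unsafe row scan not supported");
    }

    default List<long[]> createColumnarBatchReader() {
        throw new IllegalStateException("columnar scan not supported");
    }
}

// A factory that supports only columnar scan, like the ORC example when
// all columns to scan are simple types.
class ColumnarOnlyFactory implements DataReaderFactory {
    @Override public DataFormat dataFormat() { return DataFormat.COLUMNAR_BATCH; }
    @Override public List<long[]> createColumnarBatchReader() {
        return List.of(new long[]{1, 2}, new long[]{3, 4});
    }
}

public class FormatDispatchSketch {
    // The engine inspects dataFormat() and calls the matching create method.
    static String describeScan(DataReaderFactory factory) {
        switch (factory.dataFormat()) {
            case UNSAFE_ROW:
                return "row scan: " + factory.createUnsafeRowReader().size() + " rows";
            case COLUMNAR_BATCH:
                return "columnar scan: " + factory.createColumnarBatchReader().size() + " batches";
            default:
                throw new IllegalStateException("unsupported format: " + factory.dataFormat());
        }
    }

    public static void main(String[] args) {
        System.out.println(describeScan(new ColumnarOnlyFactory()));
    }
}
```

Calling `describeScan(new ColumnarOnlyFactory())` follows the columnar path, while calling `createUnsafeRowReader()` on the same factory throws IllegalStateException, which is the per-factory capability model the proposal relies on.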
Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory
Is it required for DataReader to support all known DataFormat?

Hopefully not, as assumed by the 'throw' in the interface. Then specifically, how are we going to express the capability of a given reader: its supported format(s), or specific support for each of "real-time data in row format, and history data in columnar format"?

From: Wenchen Fan
Sent: Sunday, April 15, 2018 7:45:01 PM
To: Spark dev list
Subject: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

Hi all,

I'd like to propose an API change to the data source v2.

One design goal of data source v2 is API type safety. The FileFormat API is a bad example: it asks the implementation to return InternalRow even when it's actually ColumnarBatch. In data source v2 we added a type parameter to DataReader/WriterFactory and DataReader/Writer, so that a data source supporting columnar scan returns ColumnarBatch at the API level.

However, we met some problems when migrating the streaming and file-based data sources to data source v2.

On the streaming side, we need a variant of DataReader/WriterFactory to add streaming-specific concepts like epoch id and offset. For details please see ContinuousDataReaderFactory and https://docs.google.com/document/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#

But this conflicts with the special format mixin traits like SupportsScanColumnarBatch. We have to make the streaming variant of DataReader/WriterFactory extend the original DataReader/WriterFactory and do type casts at runtime, which is unnecessary and violates type safety.

On the file-based data source side, we have a problem with code duplication. Let's take the ORC data source as an example. To support both unsafe row and columnar batch scan, we need something like

// A lot of parameters to carry to the executor side
class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
  def createDataReader ...
}

class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch] {
  def createDataReader ...
}

class OrcDataSourceReader extends DataSourceReader {
  // logic to prepare the parameters and create factories
  def createUnsafeRowFactories = ...

  // logic to prepare the parameters and create factories
  def createColumnarBatchFactories = ...
}

You can see that we have duplicated logic for preparing parameters and defining the factory.

Here I propose to remove all the special format mixin traits and change the factory interface to

public enum DataFormat {
  ROW,
  INTERNAL_ROW,
  UNSAFE_ROW,
  COLUMNAR_BATCH
}

interface DataReaderFactory {
  DataFormat dataFormat();

  default DataReader createRowDataReader() {
    throw new IllegalStateException();
  }

  default DataReader createUnsafeRowDataReader() {
    throw new IllegalStateException();
  }

  default DataReader createColumnarBatchDataReader() {
    throw new IllegalStateException();
  }
}

Spark will look at the dataFormat and decide which create-data-reader method to call.

Now we don't have the problem on the streaming side, as these special format mixin traits go away. And the ORC data source can also be simplified to

class OrcReaderFactory(...) extends DataReaderFactory {
  def createUnsafeRowReader ...

  def createColumnarBatchReader ...
}

class OrcDataSourceReader extends DataSourceReader {
  // logic to prepare the parameters and create factories
  def createReadFactories = ...
}

We also have a potential benefit of supporting hybrid storage data sources, which may keep real-time data in row format and history data in columnar format. Then they can make some DataReaderFactory instances output InternalRow and some output ColumnarBatch.

Thoughts?
Re: Maintenance releases for SPARK-23852?
Yes, it sounds good to me. We can upgrade both Parquet (1.8.2 to 1.8.3) and ORC (1.4.1 to 1.4.3) in our upcoming Spark 2.3.1 release. Thanks for your efforts, @Henry and @Dongjoon!

Xiao

2018-04-16 14:41 GMT-07:00 Henry Robinson:
> Seems like there aren't any objections. I'll pick this thread back up when a Parquet maintenance release has happened.
>
> Henry
>
> On 11 April 2018 at 14:00, Dongjoon Hyun wrote:
>> Great.
>>
>> If we can upgrade the parquet dependency from 1.8.2 to 1.8.3 in Apache Spark 2.3.1, let's upgrade the orc dependency from 1.4.1 to 1.4.3 together.
>>
>> Currently, the patch is only merged into the master branch; 1.4.1 has the following issue:
>>
>> https://issues.apache.org/jira/browse/SPARK-23340
>>
>> Bests,
>> Dongjoon.
>>
>> On Wed, Apr 11, 2018 at 1:23 PM, Reynold Xin wrote:
>>> Seems like this would make sense... we usually make maintenance releases for bug fixes after a month anyway.
>>>
>>> On Wed, Apr 11, 2018 at 12:52 PM, Henry Robinson wrote:
>>>> On 11 April 2018 at 12:47, Ryan Blue wrote:
>>>>> I think a 1.8.3 Parquet release makes sense for the 2.3.x releases of Spark.
>>>>>
>>>>> To be clear though, this only affects Spark when reading data written by Impala, right? Or does Parquet CPP also produce data like this?
>>>>
>>>> I don't know about parquet-cpp, but yeah, the only implementation I've seen writing the half-completed stats is Impala. (As you know, that's compliant with the spec, just an unusual choice.)
>>>>
>>>>> On Wed, Apr 11, 2018 at 12:35 PM, Henry Robinson wrote:
>>>>>> Hi all -
>>>>>>
>>>>>> SPARK-23852 (where a query can silently give wrong results thanks to a predicate pushdown bug in Parquet) is a fairly bad bug. In other projects I've been involved with, we've released maintenance releases for bugs of this severity.
>>>>>>
>>>>>> Since Spark 2.4.0 is probably a while away, I wanted to see if there was any consensus over whether we should consider (at least) a 2.3.1.
>>>>>>
>>>>>> The reason this particular issue is a bit tricky is that the Parquet community haven't yet produced a maintenance release that fixes the underlying bug, but they are in the process of releasing a new minor version, 1.10, which includes a fix. Having spoken to a couple of Parquet developers, they'd be willing to consider a maintenance release, but would probably only bother if we (or another affected project) asked them to.
>>>>>>
>>>>>> My guess is that we wouldn't want to upgrade to a new minor version of Parquet for a Spark maintenance release, so asking for a Parquet maintenance release makes sense.
>>>>>>
>>>>>> What does everyone think?
>>>>>>
>>>>>> Best,
>>>>>> Henry
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
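[Editor's note] While waiting for a fixed Parquet artifact, one commonly used interim mitigation (not something proposed in this thread) is to disable Parquet predicate pushdown, so the faulty row-group statistics filtering is never applied. `spark.sql.parquet.filterPushdown` is Spark SQL's standard switch for this; the trade-off is scanning more data in exchange for correct results.

```properties
# spark-defaults.conf -- stopgap for SPARK-23852: disable Parquet filter
# pushdown so buggy row-group statistics are never used to skip data.
spark.sql.parquet.filterPushdown  false
```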
Re: Maintenance releases for SPARK-23852?
Seems like there aren't any objections. I'll pick this thread back up when a Parquet maintenance release has happened.

Henry

On 11 April 2018 at 14:00, Dongjoon Hyun wrote:
> Great.
>
> If we can upgrade the parquet dependency from 1.8.2 to 1.8.3 in Apache Spark 2.3.1, let's upgrade the orc dependency from 1.4.1 to 1.4.3 together.
>
> Currently, the patch is only merged into the master branch; 1.4.1 has the following issue:
>
> https://issues.apache.org/jira/browse/SPARK-23340
>
> Bests,
> Dongjoon.
>
> On Wed, Apr 11, 2018 at 1:23 PM, Reynold Xin wrote:
>> Seems like this would make sense... we usually make maintenance releases for bug fixes after a month anyway.
>>
>> On Wed, Apr 11, 2018 at 12:52 PM, Henry Robinson wrote:
>>> On 11 April 2018 at 12:47, Ryan Blue wrote:
>>>> I think a 1.8.3 Parquet release makes sense for the 2.3.x releases of Spark.
>>>>
>>>> To be clear though, this only affects Spark when reading data written by Impala, right? Or does Parquet CPP also produce data like this?
>>>
>>> I don't know about parquet-cpp, but yeah, the only implementation I've seen writing the half-completed stats is Impala. (As you know, that's compliant with the spec, just an unusual choice.)
>>>
>>>> On Wed, Apr 11, 2018 at 12:35 PM, Henry Robinson wrote:
>>>>> Hi all -
>>>>>
>>>>> SPARK-23852 (where a query can silently give wrong results thanks to a predicate pushdown bug in Parquet) is a fairly bad bug. In other projects I've been involved with, we've released maintenance releases for bugs of this severity.
>>>>>
>>>>> Since Spark 2.4.0 is probably a while away, I wanted to see if there was any consensus over whether we should consider (at least) a 2.3.1.
>>>>>
>>>>> The reason this particular issue is a bit tricky is that the Parquet community haven't yet produced a maintenance release that fixes the underlying bug, but they are in the process of releasing a new minor version, 1.10, which includes a fix. Having spoken to a couple of Parquet developers, they'd be willing to consider a maintenance release, but would probably only bother if we (or another affected project) asked them to.
>>>>>
>>>>> My guess is that we wouldn't want to upgrade to a new minor version of Parquet for a Spark maintenance release, so asking for a Parquet maintenance release makes sense.
>>>>>
>>>>> What does everyone think?
>>>>>
>>>>> Best,
>>>>> Henry
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
Re: Isolate 1 partition and perform computations
Hello,

Thank you very much for your response, Anastasios! Today I think I made it work by dropping partitions in runJob or submitJob (I don't remember exactly which) in DAGScheduler. If it doesn't work properly after some tests, I will follow your approach.

Thank you,
Thodoris

> On 16 Apr 2018, at 20:11, Anastasios Zouzias wrote:
>
> Hi all,
>
> I think this is doable using the mapPartitionsWithIndex method of RDD.
>
> Example:
>
> val partitionIndex = 0 // Your favorite partition index here
> val rdd = spark.sparkContext.parallelize(Array.range(0, 1000))
> // Replace the elements of partition partitionIndex with [-10, -1]
> val fixed = rdd.mapPartitionsWithIndex { case (idx, iter) =>
>   if (idx == partitionIndex) Array.range(-10, 0).toIterator else iter
> }
>
> Best regards,
> Anastasios
>
>> On Sun, Apr 15, 2018 at 12:59 AM, Thodoris Zois wrote:
>> I forgot to mention that I would like my approach to be independent of the application that the user is going to submit to Spark.
>>
>> Assume that I don't know anything about the user's application… I expected to find a simpler approach. I saw in RDD.scala that an RDD is characterized by a list of partitions. If I modify this list and keep only one partition, is it going to work?
>>
>> - Thodoris
>>
>>> On 15 Apr 2018, at 01:40, Matthias Boehm wrote:
>>>
>>> you might wanna have a look into using a PartitionPruningRDD to select a subset of partitions by ID. This approach worked very well for multi-key lookups for us [1].
>>>
>>> A major advantage compared to scan-based operations is that, if your source RDD has an existing partitioner, only relevant partitions are accessed.
>>>
>>> [1] https://github.com/apache/systemml/blob/master/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java#L603
>>>
>>> Regards,
>>> Matthias
>>>
>>> On Sat, Apr 14, 2018 at 3:12 PM, Thodoris Zois wrote:
>>>> Hello list,
>>>>
>>>> I am sorry for sending this message here, but I could not manage to get any response in "users". For specific purposes I would like to isolate 1 partition of the RDD and perform computations only on it.
>>>>
>>>> For instance, suppose that a user asks Spark to create 500 partitions for the RDD. I would like Spark to create the partitions but perform computations only in one partition of those 500, ignoring the other 499.
>>>>
>>>> At first I tried to modify the executor to run only 1 partition (task), but I didn't manage to make it work. Then I tried the DAGScheduler, but I think that I should modify the code at a higher level: let Spark do the partitioning, but at the end see only one partition and throw away all the others.
>>>>
>>>> My question is: which file should I modify in order to isolate 1 partition of the RDD? Where is the actual partitioning done?
>>>>
>>>> I hope it is clear!
>>>>
>>>> Thank you very much,
>>>> Thodoris
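[Editor's note] The PartitionPruningRDD approach Matthias mentions is exposed in Spark as `org.apache.spark.rdd.PartitionPruningRDD.create(rdd, partitionFilterFunc)`, which keeps only the partitions whose index passes the filter and never computes the rest. Since running a cluster here isn't practical, the sketch below models an "RDD" as a plain list of partitions to show the same idea; `prunePartitions` and the data layout are illustrative stand-ins, not Spark APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

public class PartitionPruningSketch {

    // Keep only the partitions whose index passes the filter, mirroring
    // PartitionPruningRDD: non-selected partitions are never touched.
    static List<List<Integer>> prunePartitions(List<List<Integer>> partitions, IntPredicate keep) {
        List<List<Integer>> kept = new ArrayList<>();
        for (int idx = 0; idx < partitions.size(); idx++) {
            if (keep.test(idx)) kept.add(partitions.get(idx));
        }
        return kept;
    }

    public static void main(String[] args) {
        // 500 partitions of 10 elements each, like the example in the thread.
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            List<Integer> part = new ArrayList<>();
            for (int j = 0; j < 10; j++) part.add(i * 10 + j);
            partitions.add(part);
        }

        // Isolate partition 0; the other 499 partitions are ignored.
        List<List<Integer>> onlyFirst = prunePartitions(partitions, idx -> idx == 0);
        int total = onlyFirst.stream().flatMap(List::stream).mapToInt(Integer::intValue).sum();
        System.out.println(onlyFirst.size() + " partition(s), sum = " + total);
    }
}
```

This keeps the pruning above the scheduler, as the thread suggests: no DAGScheduler or executor changes are needed, because the filter is applied when the (pruned) partition list is built.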
Re: Isolate 1 partition and perform computations
Hi all,

I think this is doable using the mapPartitionsWithIndex method of RDD.

Example:

val partitionIndex = 0 // Your favorite partition index here
val rdd = spark.sparkContext.parallelize(Array.range(0, 1000))
// Replace the elements of partition partitionIndex with [-10, -1]
val fixed = rdd.mapPartitionsWithIndex { case (idx, iter) =>
  if (idx == partitionIndex) Array.range(-10, 0).toIterator else iter
}

Best regards,
Anastasios

On Sun, Apr 15, 2018 at 12:59 AM, Thodoris Zois wrote:
> I forgot to mention that I would like my approach to be independent of the application that the user is going to submit to Spark.
>
> Assume that I don't know anything about the user's application… I expected to find a simpler approach. I saw in RDD.scala that an RDD is characterized by a list of partitions. If I modify this list and keep only one partition, is it going to work?
>
> - Thodoris
>
>> On 15 Apr 2018, at 01:40, Matthias Boehm wrote:
>>
>> you might wanna have a look into using a PartitionPruningRDD to select a subset of partitions by ID. This approach worked very well for multi-key lookups for us [1].
>>
>> A major advantage compared to scan-based operations is that, if your source RDD has an existing partitioner, only relevant partitions are accessed.
>>
>> [1] https://github.com/apache/systemml/blob/master/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java#L603
>>
>> Regards,
>> Matthias
>>
>> On Sat, Apr 14, 2018 at 3:12 PM, Thodoris Zois wrote:
>>> Hello list,
>>>
>>> I am sorry for sending this message here, but I could not manage to get any response in "users". For specific purposes I would like to isolate 1 partition of the RDD and perform computations only on it.
>>>
>>> For instance, suppose that a user asks Spark to create 500 partitions for the RDD. I would like Spark to create the partitions but perform computations only in one partition of those 500, ignoring the other 499.
>>>
>>> At first I tried to modify the executor to run only 1 partition (task), but I didn't manage to make it work. Then I tried the DAGScheduler, but I think that I should modify the code at a higher level: let Spark do the partitioning, but at the end see only one partition and throw away all the others.
>>>
>>> My question is: which file should I modify in order to isolate 1 partition of the RDD? Where is the actual partitioning done?
>>>
>>> I hope it is clear!
>>>
>>> Thank you very much,
>>> Thodoris

--
-- Anastasios Zouzias