Hi Fabian,

Thanks for the clarification. I have a few remarks, but let me provide more concrete information first. You can find the query I'm using, the JDBCInputFormat creation, and the execution plan in this GitHub gist:
https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d

I cannot call getSplitDataProperties because env.createInput(inputFormat) returns a DataSet, not a DataSource. In the code, I do this instead:

val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo, "example")

which feels wrong (the DataSource constructor doesn't accept a Scala environment). Is there a better alternative?

I see absolutely no difference in the execution plan whether I use the SDP or not, so the results are naturally identical. Is this expected?

My ParameterValuesProvider specifies 2 splits, yet the execution plan shows Parallelism=24. Even the source code is a bit ambiguous, considering that the constructor for GenericInputSplit takes two parameters: partitionNumber and totalNumberOfPartitions. Should I assume that there are 2 splits divided into 24 partitions?

Regards,
Alexis.

On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Alexis,
>
> First of all, I think you can leverage the partitioning and sorting
> properties of the data returned by the database using SplitDataProperties.
> However, please be aware that SplitDataProperties are a rather
> experimental feature.
>
> If used without query parameters, the JDBCInputFormat generates a single
> split and queries the database just once. If you want to leverage
> parallelism, you have to specify a query with parameters in the WHERE
> clause to read different parts of the table.
> Note that, depending on the configuration of the database, multiple
> queries can result in multiple full scans. Hence, it might make sense to
> have an index on the partitioning columns.
>
> If properly configured, the JDBCInputFormat generates multiple splits
> which are partitioned. Since the partitioning is encoded in the query, it
> is opaque to Flink and must be explicitly declared.
> This can be done with SDPs.
> The SDP.splitsPartitionedBy() method tells Flink that all records with
> the same value in the partitioning field are read from the same split,
> i.e., the full data set is partitioned on that attribute across splits.
> The same can be done for ordering if the query of the JDBCInputFormat is
> specified with an ORDER BY clause.
> Partitioning and grouping are two different things. You can define a
> query that partitions on hostname and orders by hostname and timestamp,
> and declare these properties in the SDP.
>
> You can get an SDP object by calling DataSource.getSplitDataProperties().
> In your example this would be source.getSplitDataProperties().
>
> Whatever you do, you should carefully check the execution plan
> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1]
> and validate that the results are identical whether you use SDPs or not.
>
> Best,
> Fabian
>
> [1] https://flink.apache.org/visualizer/
>
> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>
>> Hi everyone,
>>
>> I have the following scenario: I have a database table with 3 columns:
>> a host (string), a timestamp, and an integer ID. Conceptually, what I'd
>> like to do is:
>>
>> group by host and timestamp -> based on all the IDs in each group,
>> create a mapping to n new tuples -> for each unique tuple, count how
>> many times it appeared across the resulting data
>>
>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1.
>>
>> What I'm currently doing is roughly:
>>
>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>> val source = environment.createInput(input)
>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2)
>>
>> The query given to the JDBCInputFormat provides results ordered by host
>> and timestamp, and I was wondering if performance can be improved by
>> specifying this in the code.
>> I've looked at
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>> and
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>> but I still have some questions:
>>
>> - If a split is a subset of a partition, what is the meaning of
>> SplitDataProperties#splitsPartitionedBy? The wording makes me think that
>> a split is divided into partitions, meaning that a partition would be a
>> subset of a split.
>> - At which point can I retrieve and adjust a SplitDataProperties
>> instance, if that is possible at all?
>> - If I wanted a coarser parallelization where each slot gets all the
>> data for the same host, would I have to manually create the sub-groups
>> based on timestamp?
>>
>> Regards,
>> Alexis.
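For readers of this thread, the approach Fabian describes might look roughly like the sketch below. This is untested and assumes the Flink 1.x DataSet API with flink-jdbc on the classpath; the driver, JDBC URL, table name, and shard column are hypothetical. The key point is to use the Java ExecutionEnvironment directly, since its createInput() returns a DataSource (which exposes getSplitDataProperties), rather than the Scala environment's DataSet:

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}

// host (string), timestamp (long), id (int)
val rowTypeInfo = new RowTypeInfo(
  BasicTypeInfo.STRING_TYPE_INFO,
  BasicTypeInfo.LONG_TYPE_INFO,
  BasicTypeInfo.INT_TYPE_INFO)

// One parameter row per split: here 2 splits over a hypothetical shard column.
val params: Array[Array[java.io.Serializable]] =
  Array(Array(Int.box(0)), Array(Int.box(1)))

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("org.postgresql.Driver")      // hypothetical driver
  .setDBUrl("jdbc:postgresql://localhost/db")  // hypothetical URL
  .setQuery("SELECT host, ts, id FROM t WHERE shard = ? ORDER BY host, ts")
  .setRowTypeInfo(rowTypeInfo)
  .setParametersProvider(new GenericParameterValuesProvider(params))
  .finish()

// The Java environment's createInput() returns a DataSource, so no manual
// DataSource construction is needed.
val javaEnv = JavaEnv.getExecutionEnvironment
val source = javaEnv.createInput(inputFormat)

// Declare what the queries guarantee: partitioned by host (field 0) across
// splits, and sorted by (host, ts) within each split.
val sdp = source.getSplitDataProperties
sdp.splitsPartitionedBy(0)
sdp.splitsOrderedBy(Array(0, 1), Array(Order.ASCENDING, Order.ASCENDING))

// Inspect the plan at https://flink.apache.org/visualizer/ with and
// without the SDP declarations.
println(javaEnv.getExecutionPlan)
```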
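The transformation from the original question, pinned down on plain Scala collections just to illustrate the intended semantics (no Flink involved; deriveIds is a hypothetical stand-in for the real ID mapping):

```scala
// group by (host, timestamp) -> derive new IDs from each group's IDs ->
// count how often each distinct (host, newId) tuple appears overall.
case class Record(host: String, ts: Long, id: Int)

// Hypothetical stand-in for the real mapping from a group's IDs to n new IDs.
def deriveIds(ids: Seq[Int]): Seq[Int] = ids.map(_ * 10)

val rows = Seq(
  Record("a", 1L, 1), Record("a", 1L, 2),
  Record("a", 2L, 1), Record("b", 1L, 3))

val counts: Map[(String, Int), Int] =
  rows.groupBy(r => (r.host, r.ts))        // group by host and timestamp
    .toSeq
    .flatMap { case ((host, _), grp) =>    // emit (host, newId) -> 1 tuples
      deriveIds(grp.map(_.id)).map(newId => ((host, newId), 1))
    }
    .groupBy(_._1)                          // groupBy(0, 1)
    .map { case (k, vs) => k -> vs.map(_._2).sum } // aggregate(SUM, 2)

// ("a", 10) appears in groups (a, 1) and (a, 2), hence count 2.
println(counts)
```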