Can you share the plan for the program? Are you sure that more than 1 split is generated by the JdbcInputFormat?
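Whether more than one split exists matters because the number of source tasks is independent of the number of splits. As described further down the thread, a JDBCInputFormat without query parameters yields a single split. With a ParameterValuesProvider it yields one split per parameter array. The plain-Scala model below is not Flink code. It sketches why Parallelism=24 with two splits can still leave most source tasks idle. It rests on a simplifying assumption: splits wait in one shared queue and are handed out one at a time, on request, to whichever task asks next. Under that assumption, a task that finishes its first split and asks again before the others can end up with both.

```scala
// Simplified model, not Flink code: how input splits might reach source tasks.
// Assumption: splits wait in one shared queue and are handed out one at a time,
// on request, to whichever task asks next.
object SplitAssignmentModel {

  /** One split per parameter array; no parameters means a single split. */
  def numSplits(parameters: Seq[Seq[String]]): Int =
    if (parameters.isEmpty) 1 else parameters.size

  /** Replays split requests (task indices, in the order they arrive) against a
    * queue of `splits` splits and returns how many splits each of the
    * `parallelism` tasks processed. Requests after the queue is empty get nothing.
    */
  def assign(splits: Int, parallelism: Int, requestOrder: Seq[Int]): Map[Int, Int] = {
    require(requestOrder.forall(t => t >= 0 && t < parallelism), "unknown task index")
    val noWork = (0 until parallelism).map(task => task -> 0).toMap
    val (counts, _) = requestOrder.foldLeft((noWork, splits)) {
      case ((acc, remaining), task) if remaining > 0 =>
        (acc.updated(task, acc(task) + 1), remaining - 1)
      case (state, _) =>
        state // queue exhausted: this request gets no split
    }
    counts
  }

  /** Number of tasks that processed at least one split. */
  def busyTasks(counts: Map[Int, Int]): Int = counts.values.count(_ > 0)
}
```

Replaying the 24 requests in task order leaves 22 tasks without a split. Letting task 0 ask twice before anyone else gives it both splits.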
2018-08-10 12:04 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
> It seems I may have spoken too soon. After executing the job with more data, I can see the following things in the Flink dashboard:
>
> - The first subtask is a chained DataSource -> GroupCombine. Even with parallelism set to 24 and a ParameterValuesProvider returning Array(Array("first"), Array("second")), only 1 thread processed all records.
> - The second subtask is a Sorted Group Reduce, and I see two weird things:
>   + The first subtask sent 5,923,802 records, yet the second subtask only received 5,575,154 records?
>   + Again, everything was done in a single thread, even though a groupBy was used.
> - The third and final subtask is a sink that saves back to the database.
>
> Does anyone know why parallelism is not being used?
>
> Regards,
> Alexis.
>
> On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda <alexis.sa...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Thanks a lot for the help. The Scala DataSet, at least in version 1.5.0, declares javaSet as private[flink], so I cannot access it directly. Nevertheless, I managed to get around it by using the Java environment:
>>
>>     import org.apache.flink.api.common.operators.Order
>>     import org.apache.flink.api.scala._
>>     import org.apache.flink.types.Row
>>
>>     val env = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>
>>     val inputFormat = getInputFormat(query, dbUrl, properties)
>>     val outputFormat = getOutputFormat(dbUrl, properties)
>>
>>     val source = env.createInput(inputFormat)
>>     val sdp = source.getSplitDataProperties
>>     sdp.splitsPartitionedBy(0)
>>     sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>>
>>     // transform java DataSet to scala DataSet...
>>     new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>>       .groupBy(0, 1)
>>       .combineGroup(groupCombiner)
>>       .withForwardedFields("f0->_1")
>>       .groupBy(0, 1)
>>       .reduceGroup(groupReducer)
>>       .withForwardedFields("_1")
>>       .output(outputFormat)
>>
>> It seems to work well, and the semantic annotation does remove a hash partition from the execution plan.
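The two `sdp` calls in the snippet above are promises about the data, and the optimizer takes them on trust. `splitsPartitionedBy(0)` claims that all records with the same host are read from the same split. `splitsOrderedBy(Array(1), Array(Order.ASCENDING))` claims that, within each split, records arrive in ascending timestamp order. The hypothetical plain-Scala helper below can be run on sample query output to sanity-check those promises before declaring them. This follows the same idea as the advice in the thread to compare results with and without SDPs. Modelling each split as a sequence of `(host, timestamp, id)` tuples, with the timestamp as a `Long`, is an assumption.

```scala
// Hypothetical sanity check, not part of Flink: tests sample query output against
// the promises made by splitsPartitionedBy(0) and
// splitsOrderedBy(Array(1), Array(Order.ASCENDING)).
// Assumption: each split is modelled as a Seq of (host, timestamp, id) tuples.
object SplitPropertiesCheck {
  type Rec = (String, Long, Int)

  /** Partitioned by host: no host value appears in more than one split. */
  def partitionedByHost(splits: Seq[Seq[Rec]]): Boolean = {
    val hostsPerSplit = splits.map(_.map(_._1).toSet) // distinct hosts per split
    val all = hostsPerSplit.flatten
    all.size == all.distinct.size // a repeat means a host spans several splits
  }

  /** Ordered by timestamp: within each split, timestamps never decrease. */
  def orderedByTimestamp(splits: Seq[Seq[Rec]]): Boolean =
    splits.forall { split =>
      split.map(_._2).sliding(2).forall {
        case Seq(a, b) => a <= b
        case _         => true // 0 or 1 records are trivially ordered
      }
    }
}
```

A split holding records for one host in timestamp order passes both checks. A host spread over two splits fails the partitioning check, even if each split is sorted.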
>>
>> Regards,
>> Alexis.
>>
>> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Alexis,
>>>
>>> The Scala API does not expose a DataSource object but only a Scala DataSet which wraps the Java object.
>>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>>
>>>     val dbData: DataSet[...] = ???
>>>     val sdp = dbData.javaSet.asInstanceOf[DataSource[_]].getSplitDataProperties
>>>
>>> So you first have to get the wrapped Java DataSet, cast it to DataSource and then get the properties.
>>> It's not very nice, but should work.
>>>
>>> In order to use SDPs, you should be a bit familiar with how physical data properties are propagated and discarded in the optimizer.
>>> For example, applying a simple MapFunction removes all properties because the function might have changed the fields on which a DataSet is partitioned or sorted.
>>> You can expose the behavior of a function to the optimizer by using Semantic Annotations [1].
>>>
>>> Some comments on the code and plan you shared:
>>> - You might want to add hostname to ORDER BY to have the output grouped by (ts, hostname).
>>> - Check the Global and Local data properties in the plan to validate that the SDPs were correctly interpreted.
>>> - If the data is already correctly partitioned and sorted, you might not need the Combiners. In either case, you probably want to annotate them with Forwarded Fields annotations.
>>>
>>> The number of source tasks is unrelated to the number of splits. If you have more tasks than splits, some tasks won't process any data.
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>>>
>>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks for the clarification. I have a few remarks, but let me provide more concrete information.
>>>> You can find the query I'm using, the JDBCInputFormat creation, and the execution plan in this GitHub gist:
>>>>
>>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>>
>>>> I cannot call getSplitDataProperties because env.createInput(inputFormat) returns a DataSet, not a DataSource. In the code, I do this instead:
>>>>
>>>>     val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>>>     val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo, "example")
>>>>
>>>> which feels wrong (the constructor doesn't accept a Scala environment). Is there a better alternative?
>>>>
>>>> I see absolutely no difference in the execution plan whether I use SDP or not, so the results are indeed the same. Is this expected?
>>>>
>>>> My ParameterValuesProvider specifies 2 splits, yet the execution plan shows Parallelism=24. Even the source code is a bit ambiguous, considering that the constructor for GenericInputSplit takes two parameters: partitionNumber and totalNumberOfPartitions. Should I assume that there are 2 splits divided into 24 partitions?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi Alexis,
>>>>>
>>>>> First of all, I think you can leverage the partitioning and sorting properties of the data returned by the database using SplitDataProperties.
>>>>> However, please be aware that SplitDataProperties are a rather experimental feature.
>>>>>
>>>>> If used without query parameters, the JDBCInputFormat generates a single split and queries the database just once. If you want to leverage parallelism, you have to specify a query with parameters in the WHERE clause to read different parts of the table.
>>>>> Note that, depending on the configuration of the database, multiple queries can result in multiple full scans.
>>>>> Hence, it might make sense to have an index on the partitioning columns.
>>>>>
>>>>> If properly configured, the JDBCInputFormat generates multiple splits which are partitioned. Since the partitioning is encoded in the query, it is opaque to Flink and must be explicitly declared.
>>>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells Flink that all records with the same value in the partitioning field are read from the same split, i.e., the full data is partitioned on the attribute across splits.
>>>>> The same can be done for ordering if the queries of the JDBCInputFormat are specified with an ORDER BY clause.
>>>>> Partitioning and grouping are two different things. You can define a query that partitions on hostname and orders by hostname and timestamp and declare these properties in the SDP.
>>>>>
>>>>> You can get an SDP object by calling DataSource.getSplitDataProperties(). In your example this would be source.getSplitDataProperties().
>>>>>
>>>>> Whatever you do, you should carefully check the execution plan (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and validate that the results are identical whether you use SDP or not.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> [1] https://flink.apache.org/visualizer/
>>>>>
>>>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> I have the following scenario: I have a database table with 3 columns: a host (string), a timestamp, and an integer ID. Conceptually,
Conceptually, >>>>>> what I'd like to do is: >>>>>> >>>>>> group by host and timestamp -> based on all the IDs in each group, >>>>>> create a mapping to n new tuples -> for each unique tuple, count how many >>>>>> times it appeared across the resulting data >>>>>> >>>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1 >>>>>> >>>>>> What I'm currently doing is roughly: >>>>>> >>>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish() >>>>>> val source = environment.createInput(inut) >>>>>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0, >>>>>> 1).aggregate(SUM, 2) >>>>>> >>>>>> The query given to JDBCInputFormat provides results ordered by host >>>>>> and timestamp, and I was wondering if performance can be improved by >>>>>> specifying this in the code. I've looked at http://apache-flink-user- >>>>>> mailing-list-archive.2336050.n4.nabble.com/Terminology- >>>>>> Split-Group-and-Partition-td11030.html and http://apache-flink-user- >>>>>> mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing- >>>>>> Sorted-Input-Datasets-td20038.html, but I still have some questions: >>>>>> >>>>>> - If a split is a subset of a partition, what is the meaning of >>>>>> SplitDataProperties#splitsPartitionedBy? The wording makes me thing >>>>>> that a split is divided into partitions, meaning that a partition would >>>>>> be >>>>>> a subset of a split. >>>>>> - At which point can I retrieve and adjust a SplitDataProperties >>>>>> instance, if possible at all? >>>>>> - If I wanted a coarser parallelization where each slot gets all the >>>>>> data for the same host, would I have to manually create the sub-groups >>>>>> based on timestamp? >>>>>> >>>>>> Regards, >>>>>> Alexis. >>>>>> >>>>>> >>>>> >>>