Hi Alexis,

Yes, the job cannot be executed until the required number of processing slots becomes available. IIRC, there is a timeout, and a job gets canceled once the waiting time exceeds the threshold.
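For reference, I believe the waiting time is controlled by the slot.request.timeout setting (in milliseconds) in flink-conf.yaml; the snippet below is only a sketch based on the 1.5 docs, so double-check the key and default for your version:

    # flink-conf.yaml
    # Maximum time a job waits for its requested slots before it is
    # canceled (default: 300000 ms, i.e., 5 minutes).
    slot.request.timeout: 600000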
Best, Fabian

2018-08-10 15:35 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:

It ended up being a wrong configuration of the cluster; there was only 1 task manager with 1 slot.

If I submit a job with "flink run -p 24 ...", will the job hang until at least 24 slots are available?

Regards,
Alexis.

On Fri, 10 Aug 2018, 14:01 Fabian Hueske <fhue...@gmail.com> wrote:

Can you share the plan for the program?

Are you sure that more than 1 split is generated by the JdbcInputFormat?

2018-08-10 12:04 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:

It seems I may have spoken too soon. After executing the job with more data, I can see the following things in the Flink dashboard:

- The first subtask is a chained DataSource -> GroupCombine. Even with parallelism set to 24 and a ParameterValuesProvider returning Array(Array("first"), Array("second")), only 1 thread processed all records.
- The second subtask is a Sorted Group Reduce, and I see two weird things:
  + The first subtask sent 5,923,802 records, yet the second subtask only received 5,575,154 records?
  + Again, everything was done in a single thread, even though a groupBy was used.
- The third and final subtask is a sink that saves back to the database.

Does anyone know why parallelism is not being used?

Regards,
Alexis.

On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda <alexis.sa...@gmail.com> wrote:

Hi Fabian,

Thanks a lot for the help. The Scala DataSet, at least in version 1.5.0, declares javaSet as private[flink], so I cannot access it directly. Nevertheless, I managed to get around it by using the Java environment:

val env = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment

val inputFormat = getInputFormat(query, dbUrl, properties)
val outputFormat = getOutputFormat(dbUrl, properties)

val source = env.createInput(inputFormat)
val sdp = source.getSplitDataProperties
sdp.splitsPartitionedBy(0)
sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))

// transform the Java DataSet into a Scala DataSet...
new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
  .groupBy(0, 1)
  .combineGroup(groupCombiner)
  .withForwardedFields("f0->_1")
  .groupBy(0, 1)
  .reduceGroup(groupReducer)
  .withForwardedFields("_1")
  .output(outputFormat)

It seems to work well, and the semantic annotation does remove a hash partition from the execution plan.

Regards,
Alexis.

On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fhue...@gmail.com> wrote:

Hi Alexis,

The Scala API does not expose a DataSource object but only a Scala DataSet, which wraps the Java object. You can get the SplitDataProperties from the Scala DataSet as follows:

val dbData: DataSet[...] = ???
val sdp = dbData.javaSet.asInstanceOf[DataSource[_]].getSplitDataProperties

So you first have to get the wrapped Java DataSet, cast it to DataSource, and then get the properties. It's not very nice, but it should work.

In order to use SDPs, you should be a bit familiar with how physical data properties are propagated and discarded in the optimizer. For example, applying a simple MapFunction removes all properties, because the function might have changed the fields on which a DataSet is partitioned or sorted. You can expose the behavior of a function to the optimizer by using Semantic Annotations [1].
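For example, a forwarded-fields annotation on a MapFunction could look like the following sketch (untested; the function name and tuple types are invented for illustration):

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields

// Input field _1 is emitted unmodified as output field _1, so the
// optimizer may preserve partitioning/ordering on that field instead
// of discarding all physical properties at this map.
@ForwardedFields(Array("_1"))
class AttachCount extends MapFunction[(String, Long), (String, Long, Int)] {
  override def map(v: (String, Long)): (String, Long, Int) = (v._1, v._2, 1)
}

The same information can also be attached at the call site with the operator's withForwardedFields(...) method.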
Some comments on the code and plan you shared:

- You might want to add hostname to the ORDER BY clause to have the output grouped by (ts, hostname).
- Check the Global and Local data properties in the plan to validate that the SDPs were correctly interpreted.
- If the data is already correctly partitioned and sorted, you might not need the combiners. In either case, you probably want to annotate them with forwarded-field annotations.

The number of source tasks is unrelated to the number of splits. If you have more tasks than splits, some tasks won't process any data.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations

2018-08-08 14:10 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:

Hi Fabian,

Thanks for the clarification. I have a few remarks, but let me provide more concrete information. You can find the query I'm using, the JDBCInputFormat creation, and the execution plan in this GitHub gist:

https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d

I cannot call getSplitDataProperties because env.createInput(inputFormat) returns a DataSet, not a DataSource. In the code, I do this instead:

val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo, "example")

which feels wrong (the constructor doesn't accept a Scala environment). Is there a better alternative?

I see absolutely no difference in the execution plan whether I use SDPs or not, so the results are indeed the same. Is this expected?

My ParameterValuesProvider specifies 2 splits, yet the execution plan shows Parallelism=24. Even the source code is a bit ambiguous, considering that the constructor for GenericInputSplit takes two parameters: partitionNumber and totalNumberOfPartitions. Should I assume that there are 2 splits divided into 24 partitions?

Regards,
Alexis.

On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fhue...@gmail.com> wrote:

Hi Alexis,

First of all, I think you can leverage the partitioning and sorting properties of the data returned by the database using SplitDataProperties. However, please be aware that SplitDataProperties are a rather experimental feature.

If used without query parameters, the JDBCInputFormat generates a single split and queries the database just once. If you want to leverage parallelism, you have to specify a query with parameters in the WHERE clause to read different parts of the table. Note that, depending on the configuration of the database, multiple queries can result in multiple full scans. Hence, it might make sense to have an index on the partitioning columns.
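As a rough sketch (untested; the driver, table, and column names are invented, and dbUrl and rowTypeInfo are assumed to be defined as in a real job), a parameterized query with one parameter row per split could look like this:

import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider

// Each parameter row is bound to the ? placeholder, so the format
// generates one split (and runs one query) per row: two splits here.
val params = Array(
  Array[java.io.Serializable]("host-1"),
  Array[java.io.Serializable]("host-2"))

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("org.postgresql.Driver")
  .setDBUrl(dbUrl)
  .setQuery("SELECT host, ts, id FROM events WHERE host = ? ORDER BY ts")
  .setRowTypeInfo(rowTypeInfo)
  .setParametersProvider(new GenericParameterValuesProvider(params))
  .finish()

Since each split then contains a single host and is sorted on ts, the data is partitioned by host across splits and ordered within them.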
If properly configured, the JDBCInputFormat generates multiple splits which are partitioned. Since the partitioning is encoded in the query, it is opaque to Flink and must be explicitly declared. This can be done with SDPs. The SDP.splitsPartitionedBy() method tells Flink that all records with the same value in the partitioning field are read from the same split, i.e., the full data is partitioned on the attribute across splits. The same can be done for ordering if the query of the JDBCInputFormat is specified with an ORDER BY clause.

Partitioning and grouping are two different things. You can define a query that partitions on hostname and orders by hostname and timestamp, and declare these properties in the SDP.

You can get an SDP object by calling DataSource.getSplitDataProperties(). In your example this would be source.getSplitDataProperties().

Whatever you do, you should carefully check the execution plan (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and validate that the results are identical whether you use SDPs or not.

Best, Fabian

[1] https://flink.apache.org/visualizer/

2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:

Hi everyone,

I have the following scenario: I have a database table with 3 columns: a host (string), a timestamp, and an integer ID. Conceptually, what I'd like to do is:

group by host and timestamp -> based on all the IDs in each group, create a mapping to n new tuples -> for each unique tuple, count how many times it appeared across the resulting data

Each new tuple has 3 fields: the host, a new ID, and an Integer=1.

What I'm currently doing is roughly:

val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
val source = environment.createInput(input)
source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2)

The query given to JDBCInputFormat provides results ordered by host and timestamp, and I was wondering if performance can be improved by specifying this in the code. I've looked at
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html and
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
but I still have some questions:

- If a split is a subset of a partition, what is the meaning of SplitDataProperties#splitsPartitionedBy? The wording makes me think that a split is divided into partitions, meaning that a partition would be a subset of a split.
- At which point can I retrieve and adjust a SplitDataProperties instance, if possible at all?
- If I wanted a coarser parallelization where each slot gets all the data for the same host, would I have to manually create the sub-groups based on timestamp?

Regards,
Alexis.