Hi Alexis,

Yes, the job cannot be executed until the required number of processing
slots becomes available.
IIRC, there is a timeout and a job gets canceled once the waiting time
exceeds the threshold.

Best, Fabian

2018-08-10 15:35 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:

> It ended up being a wrong configuration of the cluster; there was only 1
> task manager with 1 slot.
>
> If I submit a job with "flink run -p 24 ...", will the job hang until at
> least 24 slots are available?
>
> Regards,
> Alexis.
>
> On Fri, 10 Aug 2018, 14:01 Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Can you share the plan for the program?
>>
>> Are you sure that more than 1 split is generated by the JdbcInputFormat?
>>
>> 2018-08-10 12:04 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>
>>> It seems I may have spoken too soon. After executing the job with more
>>> data, I can see the following things in the Flink dashboard:
>>>
>>> - The first subtask is a chained DataSource -> GroupCombine. Even with
>>> parallelism set to 24 and a ParameterValuesProvider returning
>>> Array(Array("first"), Array("second")), only 1 thread processed all records.
>>> - The second subtask is a Sorted Group Reduce, and I see two weird
>>> things:
>>>   + The first subtask sent 5,923,802 records, yet the second subtask
>>> only received 5,575,154 records?
>>>   + Again, everything was done in a single thread, even though a groupBy
>>> was used.
>>> - The third and final subtask is a sink that saves back to the database.
>>>
>>> Does anyone know why parallelism is not being used?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>
>>> On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda <alexis.sa...@gmail.com>
>>> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks a lot for the help. The scala DataSet, at least in version
>>>> 1.5.0, declares javaSet as private[flink], so I cannot access it directly.
>>>> Nevertheless, I managed to get around it by using the java environment:
>>>>
>>>> val env = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>>>
>>>> val inputFormat = getInputFormat(query, dbUrl, properties)
>>>> val outputFormat = getOutputFormat(dbUrl, properties)
>>>>
>>>> val source = env.createInput(inputFormat)
>>>> val sdp = source.getSplitDataProperties
>>>> sdp.splitsPartitionedBy(0)
>>>> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>>>>
>>>> // transform java DataSet to scala DataSet...
>>>> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>>>>   .groupBy(0, 1)
>>>>   .combineGroup(groupCombiner)
>>>>   .withForwardedFields("f0->_1")
>>>>   .groupBy(0, 1)
>>>>   .reduceGroup(groupReducer)
>>>>   .withForwardedFields("_1")
>>>>   .output(outputFormat)
>>>>
>>>> It seems to work well, and the semantic annotation does remove a hash
>>>> partition from the execution plan.
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>
>>>> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Alexis,
>>>>>
>>>>> The Scala API does not expose a DataSource object but only a Scala
>>>>> DataSet which wraps the Java object.
>>>>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>>>>
>>>>> val dbData: DataSet[...] = ???
>>>>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>>>>
>>>>> So you first have to get the wrapped Java DataSet, cast it to
>>>>> DataSource and then get the properties.
>>>>> It's not very nice, but should work.
>>>>>
>>>>> In order to use SDPs, you should be somewhat familiar with how physical
>>>>> data properties are propagated and discarded in the optimizer.
>>>>> For example, applying a simple MapFunction removes all properties
>>>>> because the function might have changed the fields on which a DataSet is
>>>>> partitioned or sorted.
>>>>> You can expose the behavior of a function to the optimizer by using
>>>>> Semantic Annotations [1]
>>>>>
>>>>> Some comments on the code and plan you shared:
>>>>> - You might want to add hostname to ORDER BY to have the output
>>>>> grouped by (ts, hostname).
>>>>> - Check the Global and Local data properties in the plan to validate
>>>>> that the SDP were correctly interpreted.
>>>>> - If the data is already correctly partitioned and sorted, you might
>>>>> not need the Combiners. In either case, you probably want to annotate them
>>>>> with Forward Field annotations.
>>>>>
>>>>> The number of source tasks is unrelated to the number of splits. If
>>>>> you have more tasks than splits, some tasks won't process any data.
>>>>>
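
A toy model may make the point about tasks and splits concrete. This is only a sketch in plain Scala, not Flink's actual split assigner: the object name and the round-robin rule are assumptions standing in for tasks pulling splits until none are left.

```scala
object SplitAssignmentSketch {
  /** Toy model (not Flink code): hands out splits to tasks round-robin.
    * Returns, per task index, the split numbers that task processed.
    * Tasks that received no split do not appear in the result. */
  def assign(numSplits: Int, parallelism: Int): Map[Int, Seq[Int]] = {
    require(parallelism > 0, "parallelism must be positive")
    (0 until numSplits)
      .groupBy(split => split % parallelism) // stand-in for lazy pulling
      .map { case (task, splits) => task -> splits.toSeq }
  }

  def main(args: Array[String]): Unit = {
    // The numbers from this thread: 2 splits, parallelism 24.
    val assignment = assign(numSplits = 2, parallelism = 24)
    val idle = 24 - assignment.size
    println(s"tasks with data: ${assignment.size}, idle tasks: $idle")
  }
}
```

With 2 splits and a parallelism of 24, at most 2 source tasks ever see data in this model; the other 22 start and finish without reading anything.
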
>>>>> Best, Fabian
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>>>>>
>>>>>
>>>>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> Thanks for the clarification. I have a few remarks, but let me
>>>>>> provide more concrete information. You can find the query I'm using, the
>>>>>> JDBCInputFormat creation, and the execution plan in this github gist:
>>>>>>
>>>>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>>>>
>>>>>> I cannot call getSplitDataProperties because
>>>>>> env.createInput(inputFormat) returns a DataSet, not a DataSource. In the
>>>>>> code, I do this instead:
>>>>>>
>>>>>> val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>>>>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo, "example")
>>>>>>
>>>>>> which feels wrong (the constructor doesn't accept a Scala
>>>>>> environment). Is there a better alternative?
>>>>>>
>>>>>> I see absolutely no difference in the execution plan whether I use
>>>>>> SDP or not, so the results are indeed the same. Is this expected?
>>>>>>
>>>>>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>>>>>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>>>>>> that the constructor for GenericInputSplit takes two parameters:
>>>>>> partitionNumber and totalNumberOfPartitions. Should I assume that there are
>>>>>> 2 splits divided into 24 partitions?
>>>>>>
>>>>>> Regards,
>>>>>> Alexis.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Alexis,
>>>>>>>
>>>>>>> First of all, I think you can leverage the partitioning and sorting
>>>>>>> properties of the data returned by the database using SplitDataProperties.
>>>>>>> However, please be aware that SplitDataProperties are a rather
>>>>>>> experimental feature.
>>>>>>>
>>>>>>> If used without query parameters, the JDBCInputFormat generates a
>>>>>>> single split and queries the database just once. If you want to leverage
>>>>>>> parallelism, you have to specify a query with parameters in the WHERE
>>>>>>> clause to read different parts of the table.
>>>>>>> Note that, depending on the configuration of the database, multiple
>>>>>>> queries result in multiple full scans. Hence, it might make sense to have
>>>>>>> an index on the partitioning columns.
>>>>>>>
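
To illustrate reading different parts of the table with a parameterized WHERE clause, here is a minimal sketch in plain Scala that produces one (start, end) pair per query execution. It assumes a hypothetical query of the form `... WHERE id BETWEEN ? AND ?` on a numeric column; the object and function names are made up for illustration, and the pairs would still have to be handed to the input format through a ParameterValuesProvider.

```scala
object RangeParametersSketch {
  /** Splits the inclusive range [min, max] into at most `numSplits`
    * contiguous (start, end) pairs. Each pair is meant to fill the two
    * placeholders of a hypothetical "... BETWEEN ? AND ?" query. */
  def betweenParameters(min: Long, max: Long, numSplits: Int): Seq[(Long, Long)] = {
    require(min <= max, "min must not exceed max")
    require(numSplits > 0, "numSplits must be positive")
    val total = max - min + 1
    val size = (total + numSplits - 1) / numSplits // ceiling division
    (min to max by size).map(start => (start, math.min(start + size - 1, max)))
  }

  def main(args: Array[String]): Unit =
    betweenParameters(1L, 100L, 4).foreach { case (from, to) =>
      println(s"split reads ids $from to $to")
    }
}
```

A numeric range is used here only because it is easy to show. If the splits are later declared as partitioned on the host column, the WHERE parameters would need to select by host instead, so that all rows of one host come from the same split.
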
>>>>>>> If properly configured, the JDBCInputFormat generates multiple
>>>>>>> splits which are partitioned. Since the partitioning is encoded in the
>>>>>>> query, it is opaque to Flink and must be explicitly declared.
>>>>>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method
>>>>>>> tells Flink that all records with the same value in the partitioning field
>>>>>>> are read from the same split, i.e., the full data is partitioned on the
>>>>>>> attribute across splits.
>>>>>>> The same can be done for ordering if the queries of the
>>>>>>> JDBCInputFormat are specified with an ORDER BY clause.
>>>>>>> Partitioning and grouping are two different things. You can define a
>>>>>>> query that partitions on hostname and orders by hostname and timestamp and
>>>>>>> declare these properties in the SDP.
>>>>>>>
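
The property that splitsPartitionedBy() declares can be stated as a small check: no key value may show up in more than one split. The following is a sketch in plain Scala over in-memory data, not Flink code; the `Row` alias and the object name are assumptions chosen to mirror the (host, timestamp, id) table from the original question.

```scala
object SplitPartitioningSketch {
  type Row = (String, Long, Int) // (host, timestamp, id)

  /** True if no key value occurs in more than one split, i.e. the data
    * is partitioned on `key` across splits. */
  def splitsPartitionedBy[K](splits: Seq[Seq[Row]])(key: Row => K): Boolean = {
    val keysPerSplit = splits.map(_.map(key).toSet)
    val totalDistinct = keysPerSplit.flatten.toSet.size
    // The per-split key sets are disjoint exactly when their sizes add up
    // to the number of distinct keys overall.
    keysPerSplit.map(_.size).sum == totalDistinct
  }
}
```

For example, splits `Seq(Seq(("a", 1L, 1), ("a", 2L, 2)), Seq(("b", 1L, 3)))` are partitioned by host, whereas moving the second "a" row into the second split breaks the property for host but keeps it for (host, timestamp). Declaring a property that the query does not actually guarantee can silently produce wrong results, which is why comparing outputs with and without SDP is worthwhile.
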
>>>>>>> You can get a SDP object by calling DataSource.getSplitDataProperties().
>>>>>>> In your example this would be source.getSplitDataProperties().
>>>>>>>
>>>>>>> Whatever you do, you should carefully check the execution plan
>>>>>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer
>>>>>>> [1] and validate that the results are identical whether you use SDP or not.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> [1] https://flink.apache.org/visualizer/
>>>>>>>
>>>>>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> I have the following scenario: I have a database table with 3
>>>>>>>> columns: a host (string), a timestamp, and an integer ID. Conceptually,
>>>>>>>> what I'd like to do is:
>>>>>>>>
>>>>>>>> group by host and timestamp -> based on all the IDs in each group,
>>>>>>>> create a mapping to n new tuples -> for each unique tuple, count how many
>>>>>>>> times it appeared across the resulting data
>>>>>>>>
>>>>>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>>>>>
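
The computation described above can be sketched on plain Scala collections, independent of Flink. The `expand` function below is purely hypothetical, since the thread never says how the new IDs are derived from a group; the object and case class names are assumptions as well.

```scala
object GroupCountSketch {
  final case class Record(host: String, ts: Long, id: Int)

  /** Hypothetical mapping from one (host, timestamp) group to new tuples.
    * The real rule is not given in the thread; `id % 10` is a placeholder. */
  def expand(host: String, ids: Seq[Int]): Seq[(String, Int)] =
    ids.map(id => (host, id % 10))

  /** Groups by (host, ts), expands each group, then counts how often each
    * resulting (host, newId) tuple appears across all groups. */
  def countTuples(rows: Seq[Record]): Map[(String, Int), Int] =
    rows
      .groupBy(r => (r.host, r.ts))
      .toSeq
      .flatMap { case ((host, _), group) => expand(host, group.map(_.id)) }
      .groupBy(identity)
      .map { case (tuple, occurrences) => tuple -> occurrences.size }
}
```

Counting occurrences here plays the role of carrying an Integer=1 in each tuple and summing that field after the second grouping.
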
>>>>>>>> What I'm currently doing is roughly:
>>>>>>>>
>>>>>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>>>>>> val source = environment.createInput(input)
>>>>>>>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2)
>>>>>>>>
>>>>>>>> The query given to JDBCInputFormat provides results ordered by host
>>>>>>>> and timestamp, and I was wondering if performance can be improved by
>>>>>>>> specifying this in the code. I've looked at
>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>>>>>>>> and
>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>>>>>>>> but I still have some questions:
>>>>>>>>
>>>>>>>> - If a split is a subset of a partition, what is the meaning of
>>>>>>>> SplitDataProperties#splitsPartitionedBy? The wording makes me
>>>>>>>> think that a split is divided into partitions, meaning that a partition
>>>>>>>> would be a subset of a split.
>>>>>>>> - At which point can I retrieve and adjust a SplitDataProperties
>>>>>>>> instance, if possible at all?
>>>>>>>> - If I wanted a coarser parallelization where each slot gets all
>>>>>>>> the data for the same host, would I have to manually create the sub-groups
>>>>>>>> based on timestamp?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Alexis.
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>
