Can you share the plan for the program?

Are you sure that more than 1 split is generated by the JDBCInputFormat?
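
One way to check this, as a minimal sketch: ask the input format for its splits directly and count them. The SplitCheck object and the countSplits name are made up for illustration, and I am assuming that getInputFormat(query, dbUrl, properties) from your code returns a JDBCInputFormat from the flink-jdbc module.

```scala
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat

// Hypothetical helper, not part of Flink.
object SplitCheck {
  // Returns how many input splits the format would hand to Flink.
  // The argument to createInputSplits is only a minimum hint. As far as I
  // can tell, JDBCInputFormat creates one split per row of parameter values,
  // or a single split when no parameters provider was set.
  def countSplits(inputFormat: JDBCInputFormat): Int =
    inputFormat.createInputSplits(1).length
}
```

Calling println(SplitCheck.countSplits(inputFormat)) before env.createInput(inputFormat) should print 2 for a provider returning two parameter rows. If it prints 1, the parameters provider is not reaching the input format.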

2018-08-10 12:04 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:

> It seems I may have spoken too soon. After executing the job with more
> data, I can see the following things in the Flink dashboard:
>
> - The first subtask is a chained DataSource -> GroupCombine. Even with
> parallelism set to 24 and a ParameterValuesProvider returning
> Array(Array("first"), Array("second")), only 1 thread processed all records.
> - The second subtask is a Sorted Group Reduce, and I see two weird things:
>   + The first subtask sent 5,923,802 records, yet the second subtask only
> received 5,575,154 records?
>   + Again, everything was done in a single thread, even though a groupBy
> was used.
> - The third and final subtask is a sink that saves back to the database.
>
> Does anyone know why parallelism is not being used?
>
> Regards,
> Alexis.
>
>
> On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda <alexis.sa...@gmail.com>
> wrote:
>
>> Hi Fabian,
>>
>> Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
>> declares javaSet as private[flink], so I cannot access it directly.
>> Nevertheless, I managed to get around it by using the java environment:
>>
>> val env = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>
>> val inputFormat = getInputFormat(query, dbUrl, properties)
>> val outputFormat = getOutputFormat(dbUrl, properties)
>>
>> val source = env.createInput(inputFormat)
>> val sdp = source.getSplitDataProperties
>> sdp.splitsPartitionedBy(0)
>> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>>
>> // transform java DataSet to scala DataSet...
>> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>>   .groupBy(0, 1)
>>   .combineGroup(groupCombiner)
>>   .withForwardedFields("f0->_1")
>>   .groupBy(0, 1)
>>   .reduceGroup(groupReducer)
>>   .withForwardedFields("_1")
>>   .output(outputFormat)
>>
>> It seems to work well, and the semantic annotation does remove a hash
>> partition from the execution plan.
>>
>> Regards,
>> Alexis.
>>
>>
>> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Alexis,
>>>
>>> The Scala API does not expose a DataSource object but only a Scala
>>> DataSet which wraps the Java object.
>>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>>
>>> val dbData: DataSet[...] = ???
>>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>>
>>> So you first have to get the wrapped Java DataSet, cast it to DataSource
>>> and then get the properties.
>>> It's not very nice, but should work.
>>>
>>> In order to use SDPs, you should be a bit familiar with how physical data
>>> properties are propagated and discarded in the optimizer.
>>> For example, applying a simple MapFunction removes all properties
>>> because the function might have changed the fields on which a DataSet is
>>> partitioned or sorted.
>>> You can expose the behavior of a function to the optimizer by using
>>> Semantic Annotations [1]
>>>
>>> Some comments on the code and plan you shared:
>>> - You might want to add hostname to ORDER BY to have the output grouped
>>> by (ts, hostname).
>>> - Check the Global and Local data properties in the plan to validate
>>> that the SDP were correctly interpreted.
>>> - If the data is already correctly partitioned and sorted, you might not
>>> need the Combiners. In either case, you probably want to annotate them with
>>> Forward Field annotations.
>>>
>>> The number of source tasks is unrelated to the number of splits. If you
>>> have more tasks than splits, some tasks won't process any data.
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>>>
>>>
>>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks for the clarification. I have a few remarks, but let me provide
>>>> more concrete information. You can find the query I'm using, the
>>>> JDBCInputFormat creation, and the execution plan in this github gist:
>>>>
>>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>>
>>>> I cannot call getSplitDataProperties because
>>>> env.createInput(inputFormat) returns a DataSet, not a DataSource. In the
>>>> code, I do this instead:
>>>>
>>>> val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo, "example")
>>>>
>>>> which feels wrong (the constructor doesn't accept a Scala environment).
>>>> Is there a better alternative?
>>>>
>>>> I see absolutely no difference in the execution plan whether I use SDP
>>>> or not, so therefore the results are indeed the same. Is this expected?
>>>>
>>>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>>>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>>>> that the constructor for GenericInputSplit takes two parameters:
>>>> partitionNumber and totalNumberOfPartitions. Should I assume that there are
>>>> 2 splits divided into 24 partitions?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>
>>>>
>>>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Alexis,
>>>>>
>>>>> First of all, I think you can leverage the partitioning and sorting
>>>>> properties of the data returned by the database using SplitDataProperties.
>>>>> However, please be aware that SplitDataProperties are a rather
>>>>> experimental feature.
>>>>>
>>>>> If used without query parameters, the JDBCInputFormat generates a
>>>>> single split and queries the database just once. If you want to leverage
>>>>> parallelism, you have to specify a query with parameters in the WHERE
>>>>> clause to read different parts of the table.
>>>>> Note, depending on the configuration of the database, multiple queries
>>>>> result in multiple full scans. Hence, it might make sense to have an index
>>>>> on the partitioning columns.
>>>>>
>>>>> If properly configured, the JDBCInputFormat generates multiple splits
>>>>> which are partitioned. Since the partitioning is encoded in the query, it
>>>>> is opaque to Flink and must be explicitly declared.
>>>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
>>>>> Flink that all records with the same value in the partitioning field are
>>>>> read from the same split, i.e., the full data is partitioned on the
>>>>> attribute across splits.
>>>>> The same can be done for ordering if the queries of the
>>>>> JDBCInputFormat are specified with an ORDER BY clause.
>>>>> Partitioning and grouping are two different things. You can define a
>>>>> query that partitions on hostname and orders by hostname and timestamp and
>>>>> declare these properties in the SDP.
>>>>>
>>>>> You can get a SDP object by calling DataSource.getSplitDataProperties().
>>>>> In your example this would be source.getSplitDataProperties().
>>>>>
>>>>> Whatever you do, you should carefully check the execution plan
>>>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer
>>>>> [1] and validate that the results are identical whether you use SDP or not.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> [1] https://flink.apache.org/visualizer/
>>>>>
>>>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> I have the following scenario: I have a database table with 3
>>>>>> columns: a host (string), a timestamp, and an integer ID. Conceptually,
>>>>>> what I'd like to do is:
>>>>>>
>>>>>> group by host and timestamp -> based on all the IDs in each group,
>>>>>> create a mapping to n new tuples -> for each unique tuple, count how many
>>>>>> times it appeared across the resulting data
>>>>>>
>>>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>>>
>>>>>> What I'm currently doing is roughly:
>>>>>>
>>>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>>>> val source = environment.createInput(input)
>>>>>> source.partitionByHash("host", "timestamp").mapPartition(...)
>>>>>>   .groupBy(0, 1).aggregate(SUM, 2)
>>>>>>
>>>>>> The query given to JDBCInputFormat provides results ordered by host
>>>>>> and timestamp, and I was wondering if performance can be improved by
>>>>>> specifying this in the code. I've looked at
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>>>>>> and
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html
>>>>>> but I still have some questions:
>>>>>>
>>>>>> - If a split is a subset of a partition, what is the meaning of
>>>>>> SplitDataProperties#splitsPartitionedBy? The wording makes me think
>>>>>> that a split is divided into partitions, meaning that a partition would
>>>>>> be a subset of a split.
>>>>>> - At which point can I retrieve and adjust a SplitDataProperties
>>>>>> instance, if possible at all?
>>>>>> - If I wanted a coarser parallelization where each slot gets all the
>>>>>> data for the same host, would I have to manually create the sub-groups
>>>>>> based on timestamp?
>>>>>>
>>>>>> Regards,
>>>>>> Alexis.
>>>>>>
>>>>>>
>>>>>
>>>
