Hi Fabian,

Thanks for the clarification. I have a few remarks, but first let me provide
some more concrete information. You can find the query I'm using, the
JDBCInputFormat creation, and the execution plan in this GitHub gist:

https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d

I cannot call getSplitDataProperties because env.createInput(inputFormat)
returns a DataSet, not a DataSource. In the code, I do this instead:

val javaEnv =
  org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
val dataSource =
  new DataSource(javaEnv, inputFormat, rowTypeInfo, "example")

which feels wrong (the constructor doesn't accept a Scala environment). Is
there a better alternative?
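
In case it clarifies what I'm after, here is roughly what I would like to
write instead. This is only a sketch on my end: it assumes the Scala
ExecutionEnvironment exposes its wrapped Java environment via getJavaEnv, and
that the Scala DataSet constructor accepts the resulting Java DataSource
(inputFormat and rowTypeInfo are the ones from the gist):

import org.apache.flink.api.java.operators.DataSource
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

val env = ExecutionEnvironment.getExecutionEnvironment
// Build the source on the Java environment wrapped by the Scala one, so that
// getSplitDataProperties() is reachable and everything stays in a single plan.
val dataSource = new DataSource(env.getJavaEnv, inputFormat, rowTypeInfo, "example")
val sdp = dataSource.getSplitDataProperties
// ... declare partitioning/ordering on sdp (see the next snippet) ...
// Wrap the source back into the Scala API (assuming the constructor is accessible):
val source: DataSet[Row] = new DataSet(dataSource)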

I see absolutely no difference in the execution plan whether I use the SDP or
not, so the results are indeed identical. Is this expected?
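
For reference, this is roughly how I declare the properties and obtain the
plan, continuing from the snippet above. It's a simplified sketch: I'm
assuming the index-based splitsPartitionedBy/splitsOrderedBy overloads are
the right ones for a Row source, and that the fields come out in the order
host (0), timestamp (1), id (2):

import org.apache.flink.api.common.operators.Order

val sdp = dataSource.getSplitDataProperties
// Each (host, timestamp) combination is read from exactly one split:
sdp.splitsPartitionedBy(0, 1)
// The query ends with ORDER BY host, timestamp:
sdp.splitsOrderedBy(Array(0, 1), Array(Order.ASCENDING, Order.ASCENDING))
// ... rest of the job as in the gist ...
println(env.getExecutionPlan()) // JSON for the plan visualizer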

My ParameterValuesProvider specifies 2 splits, yet the execution plan shows
Parallelism=24. Even the source code is a bit ambiguous, considering that
the constructor for GenericInputSplit takes two parameters: partitionNumber
and totalNumberOfPartitions. Should I assume that there are 2 splits
divided into 24 partitions?
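
To be concrete about what I mean by 2 splits, the relevant setup looks more
or less like this (simplified; the driver, URL, query, and parameter values
are placeholders, and rowTypeInfo is the one from the gist):

import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider

// Two parameter sets -> two input splits, each bound into the WHERE clause.
val parameters = Array(
  Array[java.io.Serializable]("part-0"),
  Array[java.io.Serializable]("part-1")
)

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("org.postgresql.Driver")
  .setDBUrl("jdbc:postgresql://localhost/mydb")
  .setQuery("SELECT host, timestamp, id FROM my_table WHERE part = ? ORDER BY host, timestamp")
  .setParametersProvider(new GenericParameterValuesProvider(parameters))
  .setRowTypeInfo(rowTypeInfo)
  .finish()

// Is something like this needed so that the source's parallelism matches the
// number of splits instead of the default 24?
dataSource.setParallelism(2)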

Regards,
Alexis.



On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Alexis,
>
> First of all, I think you can leverage the partitioning and sorting properties
> of the data returned by the database using SplitDataProperties.
> However, please be aware that SplitDataProperties are a rather
> experimental feature.
>
> If used without query parameters, the JDBCInputFormat generates a single
> split and queries the database just once. If you want to leverage
> parallelism, you have to specify a query with parameters in the WHERE
> clause to read different parts of the table.
> Note, depending on the configuration of the database, multiple queries
> result in multiple full scans. Hence, it might make sense to have an index
> on the partitioning columns.
>
> If properly configured, the JDBCInputFormat generates multiple splits
> which are partitioned. Since the partitioning is encoded in the query, it
> is opaque to Flink and must be explicitly declared.
> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
> Flink that all records with the same value in the partitioning field are
> read from the same split, i.e., the full data is partitioned on the
> attribute across splits.
> The same can be done for ordering if the query of the JDBCInputFormat is
> specified with an ORDER BY clause.
> Partitioning and grouping are two different things. You can define a query
> that partitions on hostname and orders by hostname and timestamp and
> declare these properties in the SDP.
>
> You can get a SDP object by calling DataSource.getSplitDataProperties().
> In your example this would be source.getSplitDataProperties().
>
> Whatever you do, you should carefully check the execution plan
> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and
> validate that the results are identical whether you use SDP or not.
>
> Best, Fabian
>
> [1] https://flink.apache.org/visualizer/
>
> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>
>> Hi everyone,
>>
>> I have the following scenario: I have a database table with 3 columns: a
>> host (string), a timestamp, and an integer ID. Conceptually, what I'd like
>> to do is:
>>
>> group by host and timestamp -> based on all the IDs in each group, create
>> a mapping to n new tuples -> for each unique tuple, count how many times it
>> appeared across the resulting data
>>
>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>
>> What I'm currently doing is roughly:
>>
>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>> val source = environment.createInput(input)
>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
>> 1).aggregate(SUM, 2)
>>
>> The query given to JDBCInputFormat provides results ordered by host and
>> timestamp, and I was wondering if performance can be improved by specifying
>> this in the code. I've looked at
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>> and
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>> but I still have some questions:
>>
>> - If a split is a subset of a partition, what is the meaning of
>> SplitDataProperties#splitsPartitionedBy? The wording makes me think that a
>> split is divided into partitions, meaning that a partition would be a
>> subset of a split.
>> - At which point can I retrieve and adjust a SplitDataProperties
>> instance, if possible at all?
>> - If I wanted a coarser parallelization where each slot gets all the data
>> for the same host, would I have to manually create the sub-groups based on
>> timestamp?
>>
>> Regards,
>> Alexis.
>>
>>
>
