Yes, you can transform the broadcast set when it is accessed with
RuntimeContext.getBroadcastVariableWithInitializer() and a
BroadcastVariableInitializer.
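
As a rough sketch of what such an initializer could look like (not a
definitive implementation): the Metadata class, its fields, and the
MetadataBySourceId name below are assumptions made for illustration. The
interface is declared locally with the same shape as Flink's
BroadcastVariableInitializer only so that the snippet compiles without Flink
on the classpath; in a real job you would import the Flink interface instead.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in mirroring the shape of Flink's
// org.apache.flink.api.common.functions.BroadcastVariableInitializer,
// declared here only so the sketch compiles on its own.
interface BroadcastVariableInitializer<T, O> {
    O initializeBroadcastVariable(Iterable<T> data);
}

// Hypothetical metadata record; the field names are assumptions.
final class Metadata {
    final String sourceId;
    final String description;

    Metadata(String sourceId, String description) {
        this.sourceId = sourceId;
        this.description = description;
    }
}

// Converts the broadcast elements into a lookup table keyed by sourceId.
final class MetadataBySourceId
        implements BroadcastVariableInitializer<Metadata, Map<String, Metadata>> {

    @Override
    public Map<String, Metadata> initializeBroadcastVariable(Iterable<Metadata> data) {
        Map<String, Metadata> bySourceId = new HashMap<>();
        for (Metadata m : data) {
            // If two entries share a sourceId, the later one wins.
            bySourceId.put(m.sourceId, m);
        }
        return bySourceId;
    }
}
```

Inside the open() method of a rich function, a call along the lines of
getRuntimeContext().getBroadcastVariableWithInitializer("meta", new
MetadataBySourceId()) would then hand back the Map instead of a List.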

2016-05-06 14:07 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> That was more or less what I was thinking. The only thing I'm not sure about
> is the usage of the broadcast dataset, since I'd need to access the
> MetaData dataset by sourceId (so I'd need a Map<String, Metadata>).
> Probably I'd do:
>
> Map<String, Metadata> meta = ...;//preparing metadata lookUp table
> ...
> ds.map(new MetaMapFunctionWrapper(new MetaMapFunction(meta)));
>
> What do you think? Is it possible to open a broadcast DataSet as a Map
> instead of a List?
>
> Best,
> Flavio
>
>
> On Fri, May 6, 2016 at 12:06 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> I'll open a JIRA for de/serializing TableSource to textual JSON.
>>
>> Would something like this work for you?
>>
>> main() {
>>   ExecutionEnvironment env = ...
>>   TableEnvironment tEnv = ...
>>
>>   // accessing an external catalog
>>   YourTableSource ts = Catalog.getTableSource("someIdentifier");
>>   tEnv.registerTableSource("someId", ts);
>>
>>   // preparing meta data
>>   MetaData meta = ts.getMetaData();
>>   DataSet<MetaData> metaDS = env.fromElements(meta);
>>
>>   // read data, table transformations + conversion to DataSet
>>   Table t = tEnv.scan("someId");
>>   // apply some Table transformations if necessary
>>   DataSet<TupleX<...>> ds = tEnv.toDataSet(t, TupleX);
>>
>>   // apply custom functions on data set
>>   ds.map(new MetaMapFunctionWrapper(new MetaMapFunction()))
>>     .withBroadcastSet(metaDS, "meta");
>>
>>   // continue program
>>
>> }
>>
>> The MetaMapFunctionWrapper could be a RichMapFunction that accesses the
>> meta data from the broadcast set and provides it to a wrapped
>> MetaMapFunction (an extended MapFunction with a custom interface for meta
>> data).
>>
>> Depending on what kind of interface you plan to offer, you can hide most
>> of the complexity, e.g., users would only have to implement a
>> MetaMapFunction and would not have to deal with the broadcasting and
>> accessing of meta data (this would be done by your wrapper).
>>
>> Fabian
>>
>>
>>
>> 2016-05-05 10:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Hi Fabian,
>>> thanks for your detailed answer, as usual ;)
>>>
>>> I think that an external service is OK; actually, I wasn't aware of the
>>> TableSource interface.
>>> As you said, a utility to serialize and deserialize them would be very
>>> helpful and would make this easier.
>>> However, registering metadata for a table is a very common task.
>>> Wouldn't it be useful for other Flink-related projects (I was thinking of
>>> Nifi, for example) to define a common minimal set of (optional) metadata to
>>> display in a UI for a TableSource (like name, description, creationDate,
>>> creator, field aliases)?
>>>
>>> About point 2, I think that dataset broadcasting or closure variables
>>> are useful when you write a program, not if you try to "compose" it using
>>> reusable UDFs (using a script like in Pig).
>>> Of course, the worst-case scenario for us (e.g. right now) is to connect
>>> to our repository within rich operators, but I thought that it could be
>>> easy to define a link from operators to TableEnvironment and then to
>>> TableSource (using the lineage tag/source-id you mentioned) and, finally,
>>> to its metadata. I don't know whether this is specific only to us; I just
>>> wanted to share our needs and see if the Table API development could
>>> benefit from them.
>>>
>>> Best,
>>> Flavio
>>>
>>> On Wed, May 4, 2016 at 10:35 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> I thought a bit about your proposal. I am not sure if it is actually
>>>> necessary to integrate a central source repository into Flink. It should be
>>>> possible to offer this as an external service which is based on the
>>>> recently added TableSource interface. TableSources could be extended to be
>>>> able to serialize and deserialize their configuration to/from JSON. When
>>>> the external repository service starts, it can read the JSON fields and
>>>> instantiate and register TableSource objects. The repository could also
>>>> hold metadata about the sources and serve a (web) UI to list available
>>>> sources. When a Flink program wants to access a data source which is
>>>> registered in the repository, it could look up the respective TableSource
>>>> object from the repository.
>>>>
>>>> Given that an integration of metadata with Flink user functions (point
>>>> 2. in your proposal) is a very special requirement, I am not sure how much
>>>> "native" support should be added to Flink. Would it be possible to add a
>>>> lineage tag to each record and ship the metadata of all sources as a
>>>> broadcast set to each operator? Then user functions could look up the
>>>> metadata from the broadcast set.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-04-29 12:49 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> Hi to all,
>>>>>
>>>>> as discussed briefly with Fabian, for our products at Okkam we need a
>>>>> central repository of DataSources processed by Flink.
>>>>> With respect to existing external catalogs, such as Hive or Confluent's
>>>>> SchemaRegistry, whose objective is to provide necessary metadata to
>>>>> read/write the registered tables, we would also need a way to access
>>>>> other general metadata (e.g. name, description, creator, creation date,
>>>>> lastUpdate date, processedRecords, certificationLevel of provided data,
>>>>> provenance, language, etc).
>>>>>
>>>>> This integration has 2 main goals:
>>>>>
>>>>>    1. In a UI: to enable the user to choose (or even create) a
>>>>>    datasource to process with some task (e.g. quality assessment) and
>>>>>    then see its metadata (name, description, creator user, etc)
>>>>>    2. During a Flink job: when 2 datasources get joined and we have
>>>>>    multiple values for an attribute (e.g. name or lastname) we can access
>>>>>    the datasource metadata to decide which value to retain (e.g. the one
>>>>>    coming from the most authoritative/certified source for that attribute)
>>>>>
>>>>> We also think that this could be of interest for projects like Apache
>>>>> Zeppelin or Nifi, enabling them to suggest to the user the sources to
>>>>> start from.
>>>>>
>>>>> Do you think it makes sense to think about designing such a module for
>>>>> Flink?
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
>>>>
>>>
>>
>
