Yes, you can transform the broadcast set when it is accessed, using RuntimeContext.getBroadcastVariableWithInitializer() and a BroadcastVariableInitializer. The initializer receives the broadcast elements as an Iterable and can turn them into any data structure, e.g. a Map; the transformed result is cached and shared by all tasks on the same TaskManager.
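For concreteness, here is a minimal sketch of such an initializer (the Metadata type and its getSourceId() accessor are assumptions taken from the discussion below):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.BroadcastVariableInitializer;

// Builds a Map keyed by source id from the broadcast Metadata elements.
// The transformation runs only once; all tasks on the same TaskManager
// share the resulting Map.
public class MetadataMapInitializer
        implements BroadcastVariableInitializer<Metadata, Map<String, Metadata>> {

    @Override
    public Map<String, Metadata> initializeBroadcastVariable(Iterable<Metadata> data) {
        Map<String, Metadata> lookup = new HashMap<>();
        for (Metadata m : data) {
            lookup.put(m.getSourceId(), m); // getSourceId() is an assumed accessor
        }
        return lookup;
    }
}

Inside a rich function you would then access the broadcast set as a Map:

Map<String, Metadata> meta = getRuntimeContext()
        .getBroadcastVariableWithInitializer("meta", new MetadataMapInitializer());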
2016-05-06 14:07 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> That was more or less what I was thinking. The only thing I'm not sure
> about is the usage of the broadcasted dataset, since I'd need to access
> the MetaData dataset by sourceId (so I'd need a Map<String, Metadata>).
> Probably I'd do:
>
> Map<String, Metadata> meta = ...; // preparing metadata lookup table
> ...
> ds.map(new MetaMapFunctionWrapper(new MetaMapFunction(meta)))
>
> What do you think? Is there the possibility to open a broadcasted DataSet
> as a Map instead of a List?
>
> Best,
> Flavio
>
>
> On Fri, May 6, 2016 at 12:06 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> I'll open a JIRA for de/serializing a TableSource to textual JSON.
>>
>> Would something like this work for you?
>>
>> main() {
>>   ExecutionEnvironment env = ...
>>   TableEnvironment tEnv = ...
>>
>>   // access an external catalog
>>   YourTableSource ts = Catalog.getTableSource("someIdentifier");
>>   tEnv.registerTableSource("someId", ts);
>>
>>   // prepare meta data
>>   MetaData meta = ts.getMetaData();
>>   DataSet<MetaData> metaDS = env.fromElements(meta);
>>
>>   // read data, apply Table transformations, convert to DataSet
>>   Table t = tEnv.scan("someId"); // apply some Table transformations if necessary
>>   DataSet<TupleX<...>> ds = tEnv.toDataSet(t, TupleX);
>>
>>   // apply custom functions on the data set
>>   ds.map(new MetaMapFunctionWrapper(new MetaMapFunction()))
>>     .withBroadcastSet(metaDS, "meta");
>>
>>   // continue program
>> }
>>
>> The MetaMapFunctionWrapper could be a RichMapFunction that accesses the
>> meta data from the broadcast set and provides it to a wrapped
>> MetaMapFunction (an extended MapFunction with a custom interface for meta
>> data).
>>
>> Depending on what kind of interface you plan to offer, you can hide most
>> of the complexity, e.g., users would only have to implement a
>> MetaMapFunction and would not have to deal with the broadcasting and
>> accessing of meta data (this would be done by your wrapper).
>>
>> Fabian
>>
>>
>> 2016-05-05 10:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Hi Fabian,
>>> thanks for your detailed answer, as usual ;)
>>>
>>> I think that an external service is ok; actually, I wasn't aware of the
>>> TableSource interface.
>>> As you said, a utility to serialize and deserialize them would be very
>>> helpful and would ease this thing.
>>> However, registering metadata for a table is a very common task.
>>> Wouldn't it be useful for other Flink-related projects (I was thinking
>>> of NiFi, for example) to define a common minimal set of (optional)
>>> metadata to display in a UI for a TableSource (like name, description,
>>> creationDate, creator, field aliases)?
>>>
>>> About point 2, I think that dataset broadcasting or closure variables
>>> are useful when you write a program, not when you try to "compose" it
>>> from reusable UDFs (using a script, like in Pig).
>>> Of course, the worst-case scenario for us (e.g. right now) is to connect
>>> to our repository within rich operators, but I thought that it could be
>>> easy to define a link from operators to the TableEnvironment and then to
>>> the TableSource (using the lineage tag/source-id you mentioned) and,
>>> finally, to its metadata. I don't know whether this is specific only to
>>> us; I just wanted to share our needs and see if the Table API development
>>> could benefit from them.
>>>
>>> Best,
>>> Flavio
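To make the wrapper pattern from the quoted mail concrete, here is a minimal sketch; MetaMapFunction is a hypothetical user-facing interface, and MetadataMapInitializer is the initializer sketched at the top of this thread:

import java.io.Serializable;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical user-facing interface (separate file): implementors receive
// the metadata lookup table and never deal with broadcast variables.
public interface MetaMapFunction<IN, OUT> extends Serializable {
    OUT map(IN value, Map<String, Metadata> meta) throws Exception;
}

// The wrapper hides the broadcasting: it resolves the broadcast set as a
// Map once in open() and hands it to the wrapped function on every call.
public class MetaMapFunctionWrapper<IN, OUT> extends RichMapFunction<IN, OUT> {

    private final MetaMapFunction<IN, OUT> wrapped;
    private transient Map<String, Metadata> meta;

    public MetaMapFunctionWrapper(MetaMapFunction<IN, OUT> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void open(Configuration parameters) {
        meta = getRuntimeContext()
                .getBroadcastVariableWithInitializer("meta", new MetadataMapInitializer());
    }

    @Override
    public OUT map(IN value) throws Exception {
        return wrapped.map(value, meta);
    }
}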
>>>
>>> On Wed, May 4, 2016 at 10:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> I thought a bit about your proposal. I am not sure if it is actually
>>>> necessary to integrate a central source repository into Flink. It should
>>>> be possible to offer this as an external service based on the recently
>>>> added TableSource interface. TableSources could be extended to be able
>>>> to serialize and deserialize their configuration to/from JSON. When the
>>>> external repository service starts, it can read the JSON files and
>>>> instantiate and register TableSource objects. The repository could also
>>>> hold metadata about the sources and serve a (web) UI to list the
>>>> available sources. When a Flink program wants to access a data source
>>>> that is registered in the repository, it could look up the respective
>>>> TableSource object from the repository.
>>>>
>>>> Given that an integration of metadata with Flink user functions (point
>>>> 2 in your proposal) is a very special requirement, I am not sure how
>>>> much "native" support should be added to Flink. Would it be possible to
>>>> add a lineage tag to each record and ship the metadata of all sources as
>>>> a broadcast set to each operator? Then user functions could look up the
>>>> metadata from the broadcast set.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-04-29 12:49 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> Hi to all,
>>>>>
>>>>> as discussed briefly with Fabian, for our products at Okkam we need a
>>>>> central repository of DataSources processed by Flink.
>>>>> With respect to existing external catalogs, such as Hive or Confluent's
>>>>> SchemaRegistry, whose objective is to provide the metadata necessary to
>>>>> read/write the registered tables, we would also need a way to access
>>>>> other general metadata (e.g. name, description, creator, creation date,
>>>>> lastUpdate date, processedRecords, certificationLevel of provided data,
>>>>> provenance, language, etc.).
>>>>>
>>>>> This integration has 2 main goals:
>>>>>
>>>>>    1. In a UI: to enable the user to choose (or even create) a
>>>>>    datasource to process with some task (e.g. quality assessment) and
>>>>>    then see its metadata (name, description, creator user, etc.)
>>>>>    2. During a Flink job: when 2 datasources get joined and we have
>>>>>    multiple values for an attribute (e.g. name or lastname), we can
>>>>>    access the datasource metadata to decide which value to retain
>>>>>    (e.g. the one coming from the most authoritative/certified source
>>>>>    for that attribute)
>>>>>
>>>>> We also think that this could be of interest for projects like Apache
>>>>> Zeppelin or NiFi, enabling them to suggest to the user the sources to
>>>>> start from.
>>>>>
>>>>> Do you think it makes sense to think about designing such a module for
>>>>> Flink?
>>>>>
>>>>> Best,
>>>>> Flavio
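Finally, a hedged sketch of the lineage-tag idea from the quoted mails: each record carries the id of the source it came from, and a rich function uses the broadcast metadata to resolve conflicting attribute values. The Tuple4 layout and Metadata.getCertificationLevel() are assumptions for this example, echoing the certificationLevel metadata listed in the proposal:

import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;

// Input: (sourceIdA, nameA, sourceIdB, nameB) for an entity matched across
// two sources; output: (winningSourceId, name). The record layout and the
// getCertificationLevel() accessor are assumptions for this illustration.
public class ResolveBySourceAuthority
        extends RichMapFunction<Tuple4<String, String, String, String>, Tuple2<String, String>> {

    private transient Map<String, Metadata> meta;

    @Override
    public void open(Configuration parameters) {
        meta = getRuntimeContext()
                .getBroadcastVariableWithInitializer("meta", new MetadataMapInitializer());
    }

    @Override
    public Tuple2<String, String> map(Tuple4<String, String, String, String> v) {
        // Keep the value from the source with the higher certification level.
        int a = meta.get(v.f0).getCertificationLevel();
        int b = meta.get(v.f2).getCertificationLevel();
        return a >= b ? new Tuple2<>(v.f0, v.f1) : new Tuple2<>(v.f2, v.f3);
    }
}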