Hi All,
I wanted to give a quick update on Apex-Calcite integration work.
Currently I'm able to run a SQL statement as a DAG against registered table
abstractions of data endpoints and message types.
Here is the SQL support that is currently implemented:
1. Data Endpoint (Source/Destination):
- File
- Kafka
2. Message types from the data endpoint (source/destination):
- CSV
3. SQL Functionality Support:
- SELECT (Projection) - select from a source
- INSERT - insert into a destination
- WHERE (Filter)
- Scalar functions provided in Calcite core
- Custom scalar functions, which can be defined in Java and registered for
use in SQL
4. A table can be defined as an abstraction of a data endpoint
(source/destination) and a message type
Currently the Calcite integration with Apex is exposed as a small piece of
boilerplate code in populateDAG, as follows:
SQLExecEnvironment.getEnvironment(dag)
    .registerTable("ORDERS",
        new KafkaEndpoint(broker, sourceTopic, new CSVMessageFormat(schemaIn)))
    .registerTable("SALES",
        new KafkaEndpoint(broker, destTopic, new CSVMessageFormat(schemaOut)))
    .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
    .executeSQL("INSERT INTO SALES " +
        "SELECT STREAM ROWTIME, " +
        "FLOOR(ROWTIME TO DAY), " +
        "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " +
        "FROM ORDERS WHERE ID > 3 " +
        "AND PRODUCT LIKE 'paint%'");
Following is a video recording of a demo of the Apex-Calcite integration:
https://drive.google.com/open?id=0B_Tb-ZDtsUHeUVM5NWRYSFg0Z3c
Currently I'm working on adding inner join functionality.
Once inner join is implemented, I think the code will be in good shape for
a review-only PR for the first cut of the Calcite integration.
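To give an idea of the target, once inner join lands I would expect a
statement along the following lines to work through the same executeSQL
API (the CUSTOMERS table and the column names here are hypothetical):

    .executeSQL("SELECT STREAM o.ROWTIME, o.PRODUCT, c.NAME " +
        "FROM ORDERS AS o " +
        "JOIN CUSTOMERS AS c ON o.ID = c.ID");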
Please share your opinions on the above.
Thanks,
Chinmay.
On Fri, Aug 12, 2016 at 9:55 PM, Chinmay Kolhatkar <[email protected]>
wrote:
> Hi All,
>
> I wanted to give an update on the Apex-Calcite integration work being
> done, for visibility and feedback from the community.
>
> In the first phase, the target is to use the Calcite core library for SQL
> parsing and for transforming the relational algebra into Apex-specific
> components (operators).
> Once this is achieved, one will be able to define inputs and outputs using
> a Calcite model file and define the processing from input to output using
> a SQL statement.
>
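> To make the model file idea concrete: a Calcite model file is plain JSON
> of roughly the shape below. The factory class and operand keys shown here
> are hypothetical placeholders, not a final API:
>
>     {
>       "version": "1.0",
>       "defaultSchema": "APEX",
>       "schemas": [ {
>         "name": "APEX",
>         "tables": [ {
>           "name": "ORDERS",
>           "type": "custom",
>           "factory": "org.apache.apex.malhar.sql.SomeTableFactory",
>           "operand": { "endpoint": "file", "messageFormat": "csv" }
>         } ]
>       } ]
>     }
>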
> The status of the above work as of now is as follows:
> 1. I'm able to traverse the relational algebra for a simple SELECT
> statement.
> 2. A DAG is generated for the simple statement SELECT STREAM * FROM
> TABLE.
> 3. The generated DAG passes validation.
> 4. Operators are being configured with properties and streams; the schema
> is set using the TUPLE_CLASS attribute. The schema class is generated on
> the fly and put on the classpath using the LIBRARY_JARS attribute (see
> the sketch after this list).
> 5. I'm able to run the generated DAG in local mode.
> 6. The code is currently being developed (WIP) at the link below.
> For ease of development, and because the code is fairly large, I've added
> a new module malhar-sql to malhar in my fork. But I'm open to other
> suggestions here.
> https://github.com/chinmaykolhatkar/apex-malhar/tree/calcite/sql
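>
> For anyone not familiar with these attributes, the wiring is roughly as
> follows (a hand-written sketch; the operator port, the POJO class and the
> jar path are hypothetical, and in the integration the class and jar are
> generated at runtime rather than written by hand):
>
>     // Tell the port which POJO class flows on the stream.
>     dag.setOutputPortAttribute(parser.out, Context.PortContext.TUPLE_CLASS,
>         GeneratedPojo.class);
>     // Ship the jar containing the generated class with the application.
>     dag.setAttribute(Context.DAGContext.LIBRARY_JARS,
>         "/path/to/generated-schema.jar");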
>
> Next steps:
> 1. Run the generated DAG in distributed mode.
> 2. Expand the source and destination definitions (Calcite model file) to
> include Kafka as a source and destination.
> 3. Expand the scope to include a filter operator (the WHERE clause, and
> HAVING too if possible) and inner join when it gets merged.
> 4. Write extensive unit tests for the above.
>
> I'll send an update on this thread at every logical milestone.
>
> I request the community to provide feedback on the above approach/targets
> and, if possible, to take a look at the code at the above link.
>
> Thanks,
> Chinmay.
>
>