This is a follow-on from a Stack Overflow question a couple of days ago, but
some progress has been made.

I'm now using the approach in the EndToEndExampleEnumerable but am still
having issues. I'm able to execute a query against a streaming data source
(Apache Pulsar; I based my adapter on the Kafka adapter). When I use a query
with no WHERE clause, the returned iterator yields the results, but as
soon as I add a WHERE clause it stops working. The call to iterator()
never returns, and debugging shows the results being sent to the
Interpreter's DuplicatingSink. Isn't this the behaviour of non-streaming
queries? Is there something I'm missing that would give me the same
behaviour as when there is no WHERE clause?
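
For illustration only, the behaviour expected from the filtered query can be sketched in plain Java. This is not Calcite code: lazyFilter, unboundedSource and firstMatches are hypothetical names, and the source is a stand-in for the Pulsar stream. The sketch shows a pull-based filter that advances an unbounded source only as far as the next matching row, so the consumer gets an iterator back immediately. If the interpreter path instead drains its input into a sink before handing out an iterator (an assumption drawn from the DuplicatingSink observation above, not confirmed here), that call could never return on an unbounded source.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;
import java.util.stream.Stream;

class LazyFilterSketch {

    /** Pull-based filter: reads from the source only until the next match is found. */
    static <T> Iterator<T> lazyFilter(Iterator<T> source, Predicate<T> predicate) {
        return new Iterator<T>() {
            private T next;
            private boolean ready;

            @Override public boolean hasNext() {
                // Skip non-matching rows; stop as soon as one row matches.
                while (!ready && source.hasNext()) {
                    T candidate = source.next();
                    if (predicate.test(candidate)) {
                        next = candidate;
                        ready = true;
                    }
                }
                return ready;
            }

            @Override public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                ready = false;
                return next;
            }
        };
    }

    /** Hypothetical unbounded source: alternates "Trade-n" and "Quote-n" forever. */
    static Iterator<String> unboundedSource() {
        return Stream.iterate(0, i -> i + 1)
            .map(i -> (i % 2 == 0 ? "Trade" : "Quote") + "-" + i)
            .iterator();
    }

    /** Takes the first n matching rows without ever exhausting the source. */
    static List<String> firstMatches(int n) {
        Iterator<String> it = lazyFilter(unboundedSource(), s -> s.startsWith("Trade"));
        List<String> out = new ArrayList<>();
        while (out.size() < n && it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(firstMatches(3));
    }
}
```

Collecting every matching row into a list before returning an iterator would loop forever on the same source, which is the symptom described above.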

https://github.com/zabetak/calcite/blob/demo-january-2021/core/src/test/java/org/apache/calcite/examples/foodmart/java/EndToEndExampleEnumerable.java

My code:
        RelOptPlanner planner = cluster.getPlanner();
        RelOptUtil.registerDefaultRules(planner, true, true);
        logPlan = planner.changeTraits(logPlan,
            cluster.traitSet().replace(EnumerableConvention.INSTANCE));
        planner.setRoot(logPlan);

        // Start the optimization process to obtain the most efficient
        // physical plan based on the provided rule set.
        RelNode phyPlan = planner.findBestExp();

        // Display the physical plan
        System.out.println(
            RelOptUtil.dumpPlan("[Physical plan]", phyPlan,
                SqlExplainFormat.TEXT,
                SqlExplainLevel.NON_COST_ATTRIBUTES));
        final AtomicBoolean cancelFlag = new AtomicBoolean(false);

        // Run the executable plan using a context simply providing access
        // to the schema
        Map<String, Object> params = new HashMap<>();
        params.put("v1stashed", phyPlan);
        Iterator<Object[]> resultsIterator =
            EnumerableInterpretable.toBindable(params, null,
                    (EnumerableRel) phyPlan, EnumerableRel.Prefer.ARRAY)
                .bind(new CancelableSchemaOnlyDataContext(schema,
                    cancelFlag, params))
                .iterator();

        while (resultsIterator.hasNext()) {
            Object[] row = resultsIterator.next();
            logger.info("{} {} {}", row[0], row[1], row[2]);
        }

Logical and physical plan

[Parsed query]
SELECT STREAM *
FROM `PULSARTRADES`
WHERE `ENTITYTYPE` = 'Trade'

[Logical plan]
LogicalDelta, id = 6
  LogicalProject(ROWTIME=[$0], MESSAGEID=[$1], ENTITYTYPE=[$2], JSONPAYLOAD=[$3]), id = 5
    LogicalFilter(condition=[=($2, 'Trade')]), id = 2
      LogicalTableScan(table=[[PULSARTRADES]]), id = 1

[Physical plan]
EnumerableInterpreter, id = 70
  BindableFilter(condition=[=($2, 'Trade')]), id = 69
    BindableTableScan(table=[[PULSARTRADES, (STREAM)]]), id = 58


