This is a follow-on from a Stack Overflow question a couple of days ago, but some progress has been made.
I'm now using the approach in the EndToEndEnumerableExample but still getting issues. I'm able to execute a query against a streaming data source (Apache Pulsar, based my adapter on the Kafka adapter). When I use a query with no where criteria then the returned iterator gets the results, but as soon as I use a where criteria it fails to work. The call to iterator() never returns and the debugging shows results getting placed in sent to the Interpreter's DuplicatingSink. Isn't this the behaviour of non-streaming queries? Is there something I'm missing that will give me same behaviour as when there is no where criteria? https://github.com/zabetak/calcite/blob/demo-january-2021/core/src/test/java /org/apache/calcite/examples/foodmart/java/EndToEndExampleEnumerable.java My code: RelOptPlanner planner = cluster.getPlanner(); RelOptUtil.registerDefaultRules(planner, true, true); logPlan = planner.changeTraits(logPlan, cluster.traitSet().replace(EnumerableConvention.INSTANCE)); planner.setRoot(logPlan); // Start the optimization process to obtain the most efficient physical plan based on the // provided rule set. RelNode phyPlan = planner.findBestExp(); // Display the physical plan System.out.println( RelOptUtil.dumpPlan("[Physical plan]", phyPlan, SqlExplainFormat.TEXT, SqlExplainLevel.NON_COST_ATTRIBUTES)); final AtomicBoolean cancelFlag = new AtomicBoolean(false); // Run the executable plan using a context simply providing access to the schema Map<String, Object> params = new HashMap<>(); params.put("v1stashed", phyPlan); Iterator<Object[]> resultsIterator = EnumerableInterpretable.toBindable(params, null, (EnumerableRel) phyPlan, EnumerableRel.Prefer.ARRAY) .bind(new CancelableSchemaOnlyDataContext(schema, cancelFlag, params)).iterator(); while (resultsIterator.hasNext()) { Object[] row = resultsIterator.next(); logger.info("{} {} {}", row[0], row[1], row[2]); } Logical and physical plan [Parsed query] SELECT STREAM * FROM `PULSARTRADES` WHERE `ENTITYTYPE` = 'Trade' [Logical plan] LogicalDelta, id = 6 LogicalProject(ROWTIME=[$0], MESSAGEID=[$1], ENTITYTYPE=[$2], JSONPAYLOAD=[$3]), id = 5 LogicalFilter(condition=[=($2, 'Trade')]), id = 2 LogicalTableScan(table=[[PULSARTRADES]]), id = 1 [Physical plan] EnumerableInterpreter, id = 70 BindableFilter(condition=[=($2, 'Trade')]), id = 69 BindableTableScan(table=[[PULSARTRADES, (STREAM)]]), id = 58