Re: SQL Engine Type inference when extending AsyncTableFunction class twice.

2022-09-16 Thread Jonathan Weaver
I think I've narrowed it down to this function in ExtractionUtils

public static Optional> extractSimpleGeneric(
Class baseClass, Class clazz, int pos) {
try {
if (clazz.getSuperclass() != baseClass) {
return Optional.empty();
}
final Type t =
((ParameterizedType) clazz.getGenericSuperclass())
.getActualTypeArguments()[pos];
return Optional.ofNullable(toClass(t));
} catch (Exception unused) {
return Optional.empty();
}
}

clazz.superClasss() == "BaseClass" in my example and baseClass in the
function is expecting AsyncTableFunction .. because that doesn't
compare it returns an empty result, even though it's correctly getting the
type inference elsewise.

Is there a way we could allow multiple extends in the future, instead of
just allowing a direct single subclass?



On Thu, Sep 15, 2022 at 4:42 PM Jonathan Weaver 
wrote:

> I am having an issue with the automatic type inference with SQL engine in
> an AsyncTableFunction class.
>
> I am extending AsyncTableFunction in a BaseClass (common code).
>
> Then extending again for some specific implementations.
>
> FinalClass extends BaseClass
>
> If I use BaseClass it correctly infers the output of the RowData from the
> catalog.
> If I use FinalClass it errors with
>
> Cannot extract a data type from an internal
> 'org.apache.flink.table.data.RowData' class without further information.
> Please use annotations to define the full logical type.
>
> So something with the typeInference is not looking at the right class in
> the hierarchy.
>
> I have tried overriding typeInformation at various points but it doesn't
> seem to help.
>
> Does anyone have an idea of how to have a common base class that gets
> extended with correct automatic typeinference?
>
> I can provide more details if needed.
>


SQL Engine Type inference when extending AsyncTableFunction class twice.

2022-09-15 Thread Jonathan Weaver
I am having an issue with the automatic type inference with SQL engine in
an AsyncTableFunction class.

I am extending AsyncTableFunction in a BaseClass (common code).

Then extending again for some specific implementations.

FinalClass extends BaseClass

If I use BaseClass it correctly infers the output of the RowData from the
catalog.
If I use FinalClass it errors with

Cannot extract a data type from an internal
'org.apache.flink.table.data.RowData' class without further information.
Please use annotations to define the full logical type.

So something with the typeInference is not looking at the right class in
the hierarchy.

I have tried overriding typeInformation at various points but it doesn't
seem to help.

Does anyone have an idea of how to have a common base class that gets
extended with correct automatic typeinference?

I can provide more details if needed.


Re: Flink task lifecycle listener/hook/SPI

2022-08-04 Thread Jonathan Weaver
I think the piece you are missing is you cannot guarantee where the
function will run in general. It may get sent to several different task
executors, and each executor may not be on the same machine or JVM so the
code has to init once distributed at least once.

You have to think that every function/task you write may be instantiated
many times in many different places and each of them has to manage its own
resources separately. That is especially relevant when you are doing things
like database connections or HTTP calls and so on.

On Thu, Aug 4, 2022 at 10:09 AM Allen Zoo  wrote:

> Thanks a lot!
>
> In our scenario, doing init in open function or  at static block is not
> good as excepted.
> 1. it is too late, we expect the init will happen in a task init stage,
> means init it even before the open was called method.
> 2. it is not reusable or not convenient for end user, we have manny
> functions(eg. RichFunctions/SourceFunctions and so on), and we need add
> init code in every function.
>
> We want the init code should at level of Task, in this way there is no
> need to repeat the initialization of whatever functions is running in task.
> I browsed the Flink's source code of version 1.15.1, and I didn't find a
> relevant interface that meet our needs at startup
> process(TaskExecutor#submitTask,Task#doRun)。
> I think Flink has no similar interface available so far , can anyone help
> to confirm?
>
> Best,
> Allen
>
> On Thu, Aug 4, 2022, 12:15 PM Lijie Wang  wrote:
>
>> Hi Allen,
>> From my experience, you can do your init setup by the following 2 ways:
>>
>> 1. Do your init setup in RichFunction#open method, see [1] for details.
>> 2. Do your init setup in static block, it will be executed when the class
>> is loaded.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/user_defined_functions/#rich-functions
>>
>> Best,
>> Lijie
>>
>> Allen Zoo  于2022年8月2日周二 16:11写道:
>>
>>> Hi all,
>>> We went to do some init env setup before the flink task run, And we have
>>> noticed the Task Lifecycle | Apache Flink
>>> 
>>>  doc
>>> described, but we can't find  listener/hook/SPI interface do some custom
>>> init jobs before task  running. Does flink now have relevant interfaces ?
>>>
>>


Source API question around idle (expensive) SplitFetchers being shutdown.

2022-03-04 Thread Jonathan Weaver
I am working on developing a custom source with the new Source api.

What I'm noticing is that during periods of low incoming data it repeatedly
will shutdown and restart the fetchers when the split assignments are empty
and periodically added.

I get log message such as

org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher -
Finished running task FetchTask
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher -
Cleaned wakeup flag.
org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished
reading split(s) [2022-03-04T17:09:29.000Z - 2022-03-04T17:09:34.000Z]
 org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager
- Closing splitFetcher 6412 because it is idle.
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher -
Shutting down split fetcher 6412

But then right at the 5 seconds mark when a new split is available to be
assigned it will go recreate a new fetcher to read it.

The fetchers in this instance are not terribly lightweight with some spinup
time and the constant churn of creation/dropping/recreation is an
unnecessary waste of CPU time and latency.

Is there something I am missing in the new Source API to prevent this
fetcher churn?   It doesn't occur under periods of high activity (splits
constantly available to be assigned), but only when the incoming data is
just a single split here and there. (Think IOT sensor data that arrives in
chunks of records when connectivity is available.)

Any ideas?

Thanks!
Jonathan


Looking for advice on moving Datastream job to Table SQL

2022-03-01 Thread Jonathan Weaver
I'm doing a POC on moving an existing Datastream API job to use Table SQL
to make it more accessible for some of my teammates.

However I'm at a loss on how to handle watermarking in a similar way to how
it was handled in the Datastream API.

In the existing job a CDC stream is read, and 3 SQL tables are written out.

CDC > table1 \
 |---> table2   pk/watermark stream -> recovery table
 |---> table3 /


The 3 tables are all written in an AsyncFunction which writes out the
table primary keys and the CDC watermark which then gets written to a
fourth table for recovery and tracking purposes. (If the job is
stop/started/crashed we are not relying on Flink state currently but on the
recovery table to restart where processing left off).

Is there a way to do something similar in the SQL API where I can store the
LEAST watermark of all 3 table writes in a 4th table?

I'm drawing at a loss on how to do it short of writing a custom sink.
(Currently using the JDBC connector sink).

var statements = tEnv.createStatementSet();
... insert table1 select ... from cdc;
... insert table2 select ... from cdc;
... insert table3 select ... from cdc;
statements.attachAsDataStream();

Or is there a way to do something similar within the Table API?  Use a
completely different approach?

The CDC watermark after inserting into the 3 tables is what I'm after. (CDC
source is custom table source).

Any ideas?

Thanks!


Possible BUG in 1.15 SQL JSON_OBJECT()

2022-02-24 Thread Jonathan Weaver
Using the latest SNAPSHOT BUILD.

If I have a column definition as

 .column(
"events",
DataTypes.ARRAY(
DataTypes.ROW(
DataTypes.FIELD("status",
DataTypes.STRING().notNull()),
DataTypes.FIELD("timestamp",
DataTypes.STRING().notNull()),
DataTypes.FIELD("increment_identifier",
DataTypes.STRING().nullable()

And a query as

JSON_OBJECT('events' VALUE events) event_json

Will generate JSON correctly ONLY if increment_identifier is NOT NULL but
will throw a NullPointerException on the first record that has that column
as null.

Exception is not helpful.

Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
at
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: 

Re: Exception Help

2022-02-16 Thread Jonathan Weaver
No, I'm creating a custom SQL lookup table (which uses
AsyncTableFunction) which requires the internal types.

I implement
the LookupTableSource, AsyncTableFunction, DynamicTableSourceFactory
trio as per the examples in the docs.

My construction is the equivalent of this, and it still errors with that
exception when using exactly this.

  Map foo = new HashMap();
  foo.put(
  StringData.fromString("foo"),
  new GenericArrayData(new Object[]
{StringData.fromString("bar")}));
  MapData mapColumn = new GenericMapData(foo);

  return (RowData)GenericRowData(new Object[] { mapColumn }
);




On Wed, Feb 16, 2022 at 8:02 AM Francesco Guardiani 
wrote:

> Hi,
>
> From what I understand, you're creating a scalar function taking a string
> with json and then converting it to a map using a custom function.
>
> Assuming I understood correctly, I think the problem here is that you're
> using internal data types for UDFs, which is discouraged in most of the use
> cases. Rather than using StringData, MapData, ArrayData etc you should just
> use Java's String, Map and arrays. Check out this particular paragraph of
> our docs that shows using complex types for scalar functions:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference.
> Please try to convert
> Looking only at the exception you provide here, it definitely seems like a
> wrong usage of the internal data types, like that Tuple2 inserted into a
> GenericMapData. There are no checks in GenericMapData to check that you're
> constructing it with the correct types, and since Tuple2 is not a correct
> type, the serializer just fails hard.
>
> Please correct me if I misunderstood what you're doing, and in case
> provide more info about what your goal and how you've implemented the job.
>
> FG
>
> On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver 
> wrote:
>
>> I've narrowed it down to a TableSource that is returning a MAP type as a
>> column. Only errors when the column is referenced, and not on the first
>> row, but somewhere in the stream of rows.
>>
>> On 1.15 master branch (I need the new JSON features in 1.15 for this
>> project so riding the daily snapshot during development)
>>
>> In catalog column is defined as
>> .column("vc", DataTypes.MAP(DataTypes.STRING(),
>> DataTypes.ARRAY(DataTypes.STRING(
>>
>> My TableFunction is returning the following for the column
>>
>>   return new GenericMapData(
>>   fields.toJavaMap(
>>   v ->
>>   new Tuple2(
>>   StringData.fromString(v.getKey()),
>>   new GenericArrayData(
>>   v.getValue().isArray()
>>   ? List.ofAll(() -> ((ArrayNode)
>> v.getValue()).elements())
>>   .map(vv ->
>> StringData.fromString(vv.asText()))
>>
>> .toJavaArray(StringData[]::new)
>>   :
>> List.of(StringData.fromString(v.getValue().asText()))
>>
>> .toJavaArray(StringData[]::new);
>> });
>>
>> Where it's basically looping over a jackson JsonNode parsed from a DB
>> table and returning as a MAP (the keys and values are sparse amongst
>> hundreds of possibilities). The values in the Json are either a single text
>> value, or an array of text values so I'm just turning all values into an
>> array.
>>
>> There are around ~190 key-values in the map on average.
>>
>> The SQL that references the column is just
>>
>> COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,
>>
>> So looks up a specific key and uses it if it exists, otherwise coalesces
>> to a generic string.
>>
>> And I keep getting this exception during the processing on a random row.
>>
>> Caused by: java.lang.IndexOutOfBoundsException: offset=0,
>> targetOffset=24, numBytes=8, address=16, targetAddress=16
>> at
>> org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>> at
>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>> at
>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
>> at
>> org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
>> at
>>

Re: Exception Help

2022-02-15 Thread Jonathan Weaver
I've narrowed it down to a TableSource that is returning a MAP type as a
column. Only errors when the column is referenced, and not on the first
row, but somewhere in the stream of rows.

On 1.15 master branch (I need the new JSON features in 1.15 for this
project so riding the daily snapshot during development)

In catalog column is defined as
.column("vc", DataTypes.MAP(DataTypes.STRING(),
DataTypes.ARRAY(DataTypes.STRING(

My TableFunction is returning the following for the column

  return new GenericMapData(
  fields.toJavaMap(
  v ->
  new Tuple2(
  StringData.fromString(v.getKey()),
  new GenericArrayData(
  v.getValue().isArray()
  ? List.ofAll(() -> ((ArrayNode)
v.getValue()).elements())
  .map(vv ->
StringData.fromString(vv.asText()))

.toJavaArray(StringData[]::new)
  :
List.of(StringData.fromString(v.getValue().asText()))

.toJavaArray(StringData[]::new);
});

Where it's basically looping over a jackson JsonNode parsed from a DB table
and returning as a MAP (the keys and values are sparse amongst hundreds of
possibilities). The values in the Json are either a single text value, or
an array of text values so I'm just turning all values into an array.

There are around ~190 key-values in the map on average.

The SQL that references the column is just

COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,

So looks up a specific key and uses it if it exists, otherwise coalesces to
a generic string.

And I keep getting this exception during the processing on a random row.

Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=24,
numBytes=8, address=16, targetAddress=16
at
org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
at
org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
at
org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
at
org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
at
org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
at
org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(ArrayDataSerializer.java:210)
at
org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:109)
at
org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
at
org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
at
org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
at
org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
at TableCalcMapFunction$130.flatMap_split26(Unknown Source)
at TableCalcMapFunction$130.flatMap(Unknown Source)
at
org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)

Is that enough context or is there something else I can give you all?

Thanks!




On Tue, Feb 15, 2022 at 1:24 PM Sid Kal  wrote:

> Hi Jonathan,
>
> It would be better if you describe your scenario along with the code. It
> would be easier for the community to help.
>
> On Tue, 15 Feb 2022, 23:33 Jonathan Weaver, 
> wrote:
>
>> I'm getting the following exception running locally from my IDE
>> (IntelliJ) but seems to not occur
>> when running on a cluster. I'm assuming it may be related to memory
>> settings on the runtime (machine has 64GB of ram avail) but not sure what
>> setting to try and change.
>>
>> Caused by: java.lang.IndexOutOfBoundsException: offset=0,
>> targetOffset=3568, numBytes=40, address=16, targetAddress=16
>> at
>> org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
>> at
>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
>> at
>> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:110)
>> at
>> org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
>> at
>> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
>> at
>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
>> at
>> org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
>> at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
>

Exception Help

2022-02-15 Thread Jonathan Weaver
I'm getting the following exception running locally from my IDE (IntelliJ)
but seems to not occur
when running on a cluster. I'm assuming it may be related to memory
settings on the runtime (machine has 64GB of ram avail) but not sure what
setting to try and change.

Caused by: java.lang.IndexOutOfBoundsException: offset=0,
targetOffset=3568, numBytes=40, address=16, targetAddress=16
at
org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
at
org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
at
org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:110)
at
org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
at
org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
at
org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
at
org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
at TableCalcMapFunction$148.flatMap_split18(Unknown Source)
at TableCalcMapFunction$148.flatMap(Unknown Source)
at
org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)

Was wondering if anyone had any insights or pointers on what could be
causing that?

Thanks!
Jonathan


Parallelism of Flink SQL LookupTableSource in 1.14 ..

2022-01-10 Thread Jonathan Weaver
I'm attempting to do a proof of concept conversion of a DataStream based
Flink program over to using almost entirely Table SQL.

I have a primary CDC stream (an unbounded scan table source) that does two
joins to LookupTableSource tables and then on to a sink.

In the datastream program the only way to maintain throughput and not get
backpressured on the CDC stream was to set a carefully tuned parallelism on
the lookup functions to maximize the lookup capacity in the source systems.

However in the SQL programs it appears there is no setting I can find to
set a parallelism on the LookupTableSource tables, and the planner is
setting the parallelism to 1 which is only allowing roughly 1/10 the
capacity the source system can handle and backpressuring the CDC stream.

So my question is, is there a way to have the benefits of Table SQL
interface but also allow performance tuning on LookupTableSource tables? A
max parallelism of 1 will kill the attempted conversion immediately.

I love the Catalog interface and am attempting to turn all the custom
functions and lookups into tables that other developers can just write SQL
on.. But the performance tuning is critical.

All the tables are being registered in a catalog using the
DynamicTableSource factories.

My SQL is basically of the form of
INSERT INTO sink
SELECT
   ...
FROM cdc_table cdc
JOIN lookup1 FOR SYSTEM_TIME AS OF cdc.proc_time look1 ON cdc.identifier =
look1.identifier
LEFT OUTER JOIN lookup2 FOR SYSTEM_TIME AS OF cdc.proc_time look2 ON
cdc.identifier = look2.identifier
WHERE conditions;

Any ways to force the planner to a specific parallelism?

Thanks for your time,
Jonathan