Traceback (most recent call last):
  File "/Users/amir/PycharmProjects/VariMat/org/varimat/model/test/sample2.py", line 59, in <module>
    log_processing()
  File "/Users/amir/PycharmProjects/VariMat/org/varimat/model/test/sample2.py", line 34, in log_processing
    table_result = t_env.execute_sql("select raw_id, raw_data from raw_table")
  File "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/pyflink/table/table_environment.py", line 836, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Users/amir/.local/share/virtualenvs/VariMat-dmrWdVEG/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 158, in deco
    raise java_exception
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
    at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
    ... 13 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: raw_table[1]
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    ... 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: raw_table[1]
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 3 more
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: raw_table[1]
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    ... 4 more
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2460)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
    at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:488)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
    ... 20 more


Process finished with exit code 1
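
One thing I notice in the trace: the last "Caused by" is the actual root
cause, a ClassCastException while the Kafka source coordinator is being
deserialized, where org.apache.kafka.clients.consumer.OffsetResetStrategy
seems to be loaded by two different classloaders. As far as I understand,
this can happen when the Kafka classes end up on the classpath more than
once; in my script kafka-clients-3.3.2.jar and
flink-connector-kafka-1.16.1.jar are added alongside
flink-sql-connector-kafka-1.16.1.jar, which already bundles both. A possible
fix (untested sketch, assuming the duplicate jars really are the cause) would
be to register only the self-contained SQL connector jar:

    # keep only the fat SQL connector jar; drop the separate kafka-clients and
    # flink-connector-kafka jars so the Kafka classes are loaded exactly once
    env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")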

On Mon, Feb 6, 2023 at 11:05 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> Hi, could you please share the root cause with us?
> It seems the error message you posted didn't contain the root cause. Maybe
> you can post the full error message.
>
> Best regards,
> Yuxia
>
> ------------------------------
> *From: *"Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
> *To: *"yuxia" <luoyu...@alumni.sjtu.edu.cn>
> *Cc: *"dev" <dev@flink.apache.org>
> *Sent: *Tuesday, February 7, 2023, 10:39:25 AM
> *Subject: *Re: Need help how to use Table API to join two Kafka streams
>
> Thank you for your reply. I tried it with a sample stream, but it did not
> work. I am trying to get the results from my producer here with a very
> simple query, and I want to see the results in the console/output.
> This is my code:
>
> // Docker: docker-compose.yml
>
> version: '2'
> services:
>   zookeeper:
>     image: confluentinc/cp-zookeeper:6.1.1
>     hostname: zookeeper
>     container_name: zookeeper
>     ports:
>       - "2181:2181"
>     environment:
>       ZOOKEEPER_CLIENT_PORT: 2181
>       ZOOKEEPER_TICK_TIME: 2000
>
>   broker:
>     image: confluentinc/cp-kafka:6.1.1
>     hostname: broker
>     container_name: broker
>     depends_on:
>       - zookeeper
>     ports:
>       - "29092:29092"
>       - "9092:9092"
>       - "9101:9101"
>     environment:
>       KAFKA_BROKER_ID: 1
>       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
>       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>
>
>
> // Producer
>
> import json
> import sys
>
> from kafka import KafkaProducer
>
> KAFKA_SERVER = "127.0.0.1:9092"
>
> def serializer(dictionary):
>     try:
>         message = json.dumps(dictionary)
>     except Exception as e:
>         sys.stderr.write(str(e) + '\n')
>         message = str(dictionary)
>     return message.encode('utf8')
>
> def create_sample_empad_json(raw_id):
>     return {'raw_id': int(raw_id), 'raw_data': str(int(raw_id) + 7)}
>
> def do_produce():
>     producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
>     for raw_id in range(1, 10):
>         empad_json = create_sample_empad_json(raw_id)
>         producer.send('EMPAD', empad_json)
>         producer.flush()
>
> if __name__ == '__main__':
>     do_produce()
>
>
> // Flink
>
> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
> from pyflink.table import EnvironmentSettings
> from pyflink.table.table_environment import StreamTableEnvironment
>
> def data_processing():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
>     env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
>     env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")
>
>     settings = EnvironmentSettings.new_instance() \
>         .in_streaming_mode() \
>         .build()
>
>     t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
>
>     t1 = f"""
>             CREATE TEMPORARY TABLE raw_table(
>                 raw_id INT,
>                 raw_data STRING
>             ) WITH (
>               'connector' = 'kafka',
>               'topic' = 'EMPAD',
>               'properties.bootstrap.servers' = 'localhost:9092',
>               'properties.group.id' = 'MY_GRP',
>               'scan.startup.mode' = 'latest-offset',
>               'format' = 'json'
>             )
>             """
>
>     t_env.execute_sql(t1)
>
>     table_result = t_env.execute_sql("select raw_id, raw_data from raw_table")
>
>     with table_result.collect() as results:
>         for result in results:
>             print(result)
>
> if __name__ == '__main__':
>     data_processing()
>
>
>
> and I am getting this error message:
>
> pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Failed to execute sql
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>       at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>       at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>       at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>       at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
>       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
>       at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
>       at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
>       ... 13 more
>
>
> Best,
>
> Amir
>
>
> On Sun, Feb 5, 2023 at 8:20 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>
>> Hi, thanks for reaching out to me.
>> To answer your question: you don't need to consume the data in your
>> consumer class separately and then insert it into those tables. The data
>> will be consumed through what we implemented here.
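>>
>> As a minimal illustration (just a sketch, using the t_env and the
>> temporary tables defined in your script below), a query against a table
>> backed by the Kafka connector already reads from the topic, so no separate
>> KafkaConsumer is needed:
>>
>> # this launches a streaming job that consumes 'topic1' directly
>> with t_env.execute_sql("SELECT row_id, row_data FROM table1").collect() as results:
>>     for row in results:
>>         print(row)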
>>
>> Best regards,
>> Yuxia
>>
>> ------------------------------
>> *From: *"Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
>> *To: *luoyu...@alumni.sjtu.edu.cn
>> *Sent: *Sunday, February 5, 2023, 6:07:02 AM
>> *Subject: *Re: Need help how to use Table API to join two Kafka streams
>>
>> Dear Yuxia, dev@flink.apache.org
>> Thank you again for your help. I am implementing the code in Python, but I
>> still have some confusion about my application.
>> As I mentioned before, I am sending two simple messages (JSON) on two
>> different topics:
>> This is my Kafka producer class:
>>
>> import json
>> import sys
>>
>> from kafka import KafkaProducer
>>
>> KAFKA_SERVER = "127.0.0.1:9092"
>>
>> def serializer(dictionary):
>>     try:
>>         message = json.dumps(dictionary)
>>     except Exception as e:
>>         sys.stderr.write(str(e) + '\n')
>>         message = str(dictionary)
>>     return message.encode('utf8')
>>
>> def create_sample_json(row_id):
>>     return {'row_id':int(row_id), 'my_data': str(int(row_id) + 7)}
>>
>> def do_produce(topic_name):
>>     producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER, value_serializer=serializer)
>>     for row_id in range(1, 10):
>>         my_data = create_sample_json(row_id)
>>         producer.send(topic_name, my_data)
>>         producer.flush()
>>
>> if __name__ == '__main__':
>>     do_produce('topic1')
>>     do_produce('topic2')
>>
>> ==================================================================================
>>
>> As you suggested, this is my Flink consumer, which I want to use to consume data from the 
>> producer and run queries on it:
>>
>> from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
>> from pyflink.table import EnvironmentSettings
>> from pyflink.table.expressions import col
>> from pyflink.table.table_environment import StreamTableEnvironment
>>
>> from org.varimat.model.com.varimat_constants import EMPAD_TOPIC
>>
>> KAFKA_SERVERS = 'localhost:9092'
>>
>> def log_processing():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
>>     env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
>>     env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")
>>
>>     settings = EnvironmentSettings.new_instance() \
>>         .in_streaming_mode() \
>>         .build()
>>
>>     t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
>>
>>     t1 = f"""
>>             CREATE TEMPORARY TABLE table1(
>>                 row_id INT,
>>                 row_data STRING
>>             ) WITH (
>>               'connector' = 'kafka',
>>               'topic' = 'topic1',
>>               'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>>               'properties.group.id' = 'MY_GRP',
>>               'scan.startup.mode' = 'latest-offset',
>>               'format' = 'json'
>>             )
>>             """
>>
>>     t2 = f"""
>>             CREATE TEMPORARY TABLE table2(
>>                 row_id INT,
>>                 row_data STRING
>>             ) WITH (
>>               'connector' = 'kafka',
>>               'topic' = 'topic2',
>>               'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
>>               'properties.group.id' = 'MY_GRP',
>>               'scan.startup.mode' = 'latest-offset',
>>               'format' = 'json'
>>             )
>>             """
>>
>>     t_env.execute_sql(t1)
>>     t_env.execute_sql(t2)
>>
>>     t3 = t_env.sql_query("SELECT row_id, row_data as my_raw_data FROM table2")
>>
>> // Please tell me what I should do next.
>>
>> // Questions:
>>
>> // 1) Do I need to consume data in my consumer class separately and then
>> insert it into those tables, or will the data be consumed through what we
>> implemented here (since I passed the connector name, topic,
>> bootstrap.servers, etc.)?
>>
>> // 2) If so:
>>
>>        2.1) How can I join those streams in Python?
>>
>>        2.2) How can I avoid reprocessing previous data, given that my
>> producer will send thousands of messages on each topic? I want to make
>> sure I don't run duplicate queries.
>>
>> // 3) If not, what should I do?
>>
>> Thank you very much.
>>
>> Amir
>>
>>
>>
>>
>>
>>
>>
>> On Fri, Feb 3, 2023 at 5:45 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>
>>> Hi, Amir.
>>> In Scala code it may look like this:
>>>
>>> tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
>>> tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
>>>
>>> // You will need to rename the fields of one side before joining; otherwise it
>>> // fails with "org.apache.flink.table.api.ValidationException: Ambiguous
>>> // column name: ssn".
>>> val t1 = tableEnv.from("s1")
>>> val t3 = tableEnv.sqlQuery("SELECT id AS id2, ssn AS ssn1 FROM s2")
>>> val result = t1.join(t3).where($"ssn" === $"ssn1")
>>>
>>> Also, you can refer to [1] for more detail.
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins
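>>>
>>> If you are writing it with the Python Table API, a rough equivalent
>>> (untested sketch, reusing the s1/s2 tables from the example above and a
>>> StreamTableEnvironment named t_env) might look like:
>>>
>>> from pyflink.table.expressions import col
>>>
>>> # rename both fields of s2 so the joined schema has no duplicate names
>>> t1 = t_env.from_path("s1")
>>> t3 = t_env.sql_query("SELECT id AS id2, ssn AS ssn1 FROM s2")
>>> result = t1.join(t3).where(col("ssn") == col("ssn1"))
>>> result.execute().print()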
>>>
>>> Best regards,
>>> Yuxia
>>>
>>> ----- Original Message -----
>>> From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
>>> To: "dev" <dev@flink.apache.org>
>>> Sent: Friday, February 3, 2023, 4:45:08 AM
>>> Subject: Need help how to use Table API to join two Kafka streams
>>>
>>> Hello,
>>>
>>> I have a Kafka producer and a Kafka consumer that produce and consume
>>> multiple records, respectively. You can think of two data sets here. Both
>>> data sets have a similar structure but carry different data.
>>>
>>> I want to use the Table API to join the two Kafka streams while I
>>> consume them, for example on data1.ssn == data2.ssn.
>>>
>>> Constraints:
>>> I don't want to change my producer or use FlinkKafkaProducer.
>>>
>>> Thank you very much.
>>>
>>> Best,
>>> Amir
>>>
>>
>
