Hi Jie, I am curious what library do you use to get the ClickHouseTableBuilder
On Wed, Mar 24, 2021 at 8:41 PM jie mei <meijie.w...@gmail.com> wrote: > Hi, Community > > I run a jmh benchmark task get blew error, which use flink sql consuming > data from data-gen connector(10_000_000) and write data to clickhouse. blew > is partly log and you can see completable log by attached file > > *My jmh benchmark code as blew:* > > @Benchmark > @Threads(1) > @Fork(1) > public void sinkBenchmark() throws IOException { > > StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment > .getExecutionEnvironment(); > streamEnv.enableCheckpointing(60000); > > EnvironmentSettings settings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode().build(); > TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, > settings); > > // create clickhouse table > new ClickHouseTableBuilder(tableEnv, > parseSchema("clickhouse_sink_table.sql")) > .database("benchmark") > .table("bilophus_sink_benchmark") > .address("jdbc:clickhouse://localhost:8123") > .build(); > > // create mock data table > tableEnv.executeSql( > parseSchema("clickhouse_source_table.sql") + > "WITH (" + > "'connector' = 'datagen'," + > "'number-of-rows' = '10000000')"); > > tableEnv.executeSql( > "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM > CLICKHOUSE_SOURCE_BENCHMARK"); > > } > > *running command:* > > mvn clean package -DskipTests > > <plugin> > <groupId>org.codehaus.mojo</groupId> > <artifactId>exec-maven-plugin</artifactId> > <version>1.6.0</version> > <executions> > <execution> > <id>test-benchmarks</id> > <phase>test</phase> > <goals> > <goal>exec</goal> > </goals> > </execution> > </executions> > <configuration> > <skip>false</skip> > <classpathScope>test</classpathScope> > <executable>java</executable> > <arguments> > <argument>-Xmx6g</argument> > <argument>-classpath</argument> > <classpath/> > <argument>org.openjdk.jmh.Main</argument> > <!--shouldFailOnError--> > <argument>-foe</argument> > <argument>true</argument> > <!--speed up tests--> > <argument>-f</argument> > <argument>1</argument> > <argument>-i</argument> > <argument>1</argument> > <argument>-wi</argument> > <argument>0</argument> > <argument>-rf</argument> > <argument>csv</argument> > <argument>.*</argument> > </arguments> > </configuration> > </plugin> > > > Non-finished threads: > > Thread[Source: TableSourceScan(table=[[default_catalog, default_database, > CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, > first_int, second_int, first_float, second_float, first_double, > second_double, first_string, s > econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, > first_bigint, second_bigint, first_int, second_int, first_float, > second_float, first_double, second_double, first_string, second_string]) -> > Sink: Sink(table=[default_catal > og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, > second_bigint, first_int, second_int, first_float, second_float, > first_double, second_double, first_string, second_string]) (1/6),5,Flink > Task Threads] > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > > Thread[flink-akka.actor.default-dispatcher-8,5,main] > at sun.misc.Unsafe.park(Native Method) > at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > Thread[flink-akka.actor.default-dispatcher-2,5,main] > at sun.misc.Unsafe.park(Native Method) > at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > Thread[Source: TableSourceScan(table=[[default_catalog, default_database, > CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, > first_int, second_int, first_float, second_float, first_double, > second_double, first_string, s > econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, > first_bigint, second_bigint, first_int, second_int, first_float, > second_float, first_double, second_double, first_string, second_string]) -> > Sink: Sink(table=[default_catal > og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, > second_bigint, first_int, second_int, first_float, second_float, > first_double, second_double, first_string, second_string]) (4/6),5,Flink > Task Threads] > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > > > -- > > *Best Regards* > *Jeremy Mei* >