Hi Jie,

I am curious what library do you use to get the ClickHouseTableBuilder

On Wed, Mar 24, 2021 at 8:41 PM jie mei <meijie.w...@gmail.com> wrote:

> Hi, Community
>
> I run a jmh benchmark task get blew error, which use flink sql consuming
> data from data-gen connector(10_000_000) and write data to clickhouse. blew
> is partly log and you can see completable log by attached file
>
> *My jmh benchmark code as blew:*
>
> @Benchmark
> @Threads(1)
> @Fork(1)
> public void sinkBenchmark() throws IOException {
>
>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>       .getExecutionEnvironment();
>   streamEnv.enableCheckpointing(60000);
>
>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>       .useBlinkPlanner()
>       .inStreamingMode().build();
>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, 
> settings);
>
>   // create clickhouse table
>   new ClickHouseTableBuilder(tableEnv,
>       parseSchema("clickhouse_sink_table.sql"))
>       .database("benchmark")
>       .table("bilophus_sink_benchmark")
>       .address("jdbc:clickhouse://localhost:8123")
>       .build();
>
>   // create mock data table
>   tableEnv.executeSql(
>       parseSchema("clickhouse_source_table.sql") +
>           "WITH (" +
>           "'connector' = 'datagen'," +
>           "'number-of-rows' = '10000000')");
>
>   tableEnv.executeSql(
>       "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM 
> CLICKHOUSE_SOURCE_BENCHMARK");
>
> }
>
> *running command:*
>
> mvn clean package -DskipTests
>
> <plugin>
>   <groupId>org.codehaus.mojo</groupId>
>   <artifactId>exec-maven-plugin</artifactId>
>   <version>1.6.0</version>
>   <executions>
>     <execution>
>       <id>test-benchmarks</id>
>       <phase>test</phase>
>       <goals>
>         <goal>exec</goal>
>       </goals>
>     </execution>
>   </executions>
>   <configuration>
>     <skip>false</skip>
>     <classpathScope>test</classpathScope>
>     <executable>java</executable>
>     <arguments>
>       <argument>-Xmx6g</argument>
>       <argument>-classpath</argument>
>       <classpath/>
>       <argument>org.openjdk.jmh.Main</argument>
>       <!--shouldFailOnError-->
>       <argument>-foe</argument>
>       <argument>true</argument>
>       <!--speed up tests-->
>       <argument>-f</argument>
>       <argument>1</argument>
>       <argument>-i</argument>
>       <argument>1</argument>
>       <argument>-wi</argument>
>       <argument>0</argument>
>       <argument>-rf</argument>
>       <argument>csv</argument>
>       <argument>.*</argument>
>     </arguments>
>   </configuration>
> </plugin>
>
>
> Non-finished threads:
>
> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, s
> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
> first_bigint, second_bigint, first_int, second_int, first_float,
> second_float, first_double, second_double, first_string, second_string]) ->
> Sink: Sink(table=[default_catal
> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
> second_bigint, first_int, second_int, first_float, second_float,
> first_double, second_double, first_string, second_string]) (1/6),5,Flink
> Task Threads]
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>  at java.lang.Thread.run(Thread.java:748)
>
> Thread[flink-akka.actor.default-dispatcher-8,5,main]
>  at sun.misc.Unsafe.park(Native Method)
>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Thread[flink-akka.actor.default-dispatcher-2,5,main]
>  at sun.misc.Unsafe.park(Native Method)
>  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Thread[Source: TableSourceScan(table=[[default_catalog, default_database,
> CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
> first_int, second_int, first_float, second_float, first_double,
> second_double, first_string, s
> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
> first_bigint, second_bigint, first_int, second_int, first_float,
> second_float, first_double, second_double, first_string, second_string]) ->
> Sink: Sink(table=[default_catal
> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint,
> second_bigint, first_int, second_int, first_float, second_float,
> first_double, second_double, first_string, second_string]) (4/6),5,Flink
> Task Threads]
>  at sun.misc.Unsafe.park(Native Method)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
>
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>  at java.lang.Thread.run(Thread.java:748)
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>

Reply via email to