Hi,

The configuration is as follows:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--
    Alter configuration below as needed.
    -->
    <bean class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="peerClassLoadingEnabled" value="true"/>
        <!-- Enabling Apache Ignite Persistent Store. -->
        <property name="dataStorageConfiguration">
            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                <property name="defaultDataRegionConfiguration">
                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        <property name="persistenceEnabled" value="true"/>
                    </bean>
                </property>
            </bean>
        </property>
        <property name="clientMode" value="true"/>
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi">
                <property name="zkConnectionString" value="172.20.20.149:2181,172.20.20.150:2181,172.20.20.151:2181"/>
                <property name="sessionTimeout" value="30000"/>
                <property name="zkRootPath" value="/apacheIgnite"/>
                <property name="joinTimeout" value="10000"/>
            </bean>
        </property>
    </bean>
</beans>

The Kotlin code is as follows:

/**
 * Transaction benchmark entry point.
 *
 * Expects three positional arguments, in this order: `accountSize`, `orderSize`
 * and `threadSize`. The program starts an Ignite node from `zk.xml`, creates and
 * clears five replicated transactional caches, seeds accounts and orders, and then
 * re-applies every queried trade result once per combination of
 * [TransactionConcurrency] and [TransactionIsolation], printing the elapsed time
 * and throughput for each combination.
 *
 * @param args `[accountSize, orderSize, threadSize]`, each parseable as an Int.
 * @throws NumberFormatException if any of the three arguments is not an integer.
 */
fun main(args: Array<String>) {
    // args[0], args[1] and args[2] are all read below, so "not empty" is not a
    // sufficient check: one or two arguments would crash with an index error.
    if (args.size < 3) {
        println("pls set accountSize orderSize threadSize")
        return
    }
    // Parse everything up front so bad input fails before the node is started.
    val accountSize = args[0].toInt()
    val orderSize = args[1].toInt()
    val executor = Executors.newFixedThreadPool(args[2].toInt())

    try {
        // `use` closes the Ignite node when the benchmark ends, normally or not.
        Ignition.start("zk.xml").use { ignite ->
            ignite.cluster().active(true)

            // Every cache uses the same settings; only the name and value type differ.
            fun <V : Any> openCache(name: String, valueType: Class<V>) =
                ignite.getOrCreateCache(
                    CacheConfiguration<Long, V>(name).apply {
                        cacheMode = CacheMode.REPLICATED
                        atomicityMode = CacheAtomicityMode.TRANSACTIONAL
                        setIndexedTypes(Long::class.java, valueType)
                    }
                )

            val accounts = openCache(CACHE_ACCOUNT, Account::class.java)
            val orders = openCache(CACHE_ORDER, Order::class.java)
            val positions = openCache(CACHE_POSITION, Position::class.java)
            val tradeResults = openCache(CACHE_MATCH_TRADE_RESULT, MatchTradeResult::class.java)
            val cancelResults = openCache(CACHE_MATCH_CANCEL_RESULT, MatchCancelResult::class.java)

            // Start from empty caches so earlier runs do not affect the measurements.
            orders.clear()
            tradeResults.clear()
            positions.clear()
            accounts.clear()
            cancelResults.clear()

            // Seed accounts sequentially on the calling thread (no latch is needed
            // because nothing here runs asynchronously).
            repeat(accountSize) {
                val account = AccountHelper.genAccount()
                accounts.put(account.id, account)
            }

            // Seed orders and their trade results on the worker pool.
            val orderDownLatch = CountDownLatch(orderSize)
            repeat(orderSize) {
                executor.submit {
                    try {
                        val order = OrderHelper.genOrder(accountSize)
                        val result = TradeResultHelper.genResult(order)
                        orders.put(order.id, order)
                        tradeResults.put(result.id, result)
                    } catch (e: Exception) {
                        // Report instead of losing the failure inside the unobserved Future.
                        System.err.println("init order failed: ${e.message}")
                    } finally {
                        // Always count down, otherwise one failed task blocks await() forever.
                        orderDownLatch.countDown()
                    }
                }
            }
            orderDownLatch.await()

            // Load the trade results that the clearing phase will process.
            val traded = tradeResults.query(
                SqlQuery<Long, MatchTradeResult>(MatchTradeResult::class.java, " status = ?").setArgs(CLEAR_STATUS_INIT)
            ).all

            for (concurrency in TransactionConcurrency.values()) {
                for (isolation in TransactionIsolation.values()) {
                    val countDownLatch = CountDownLatch(traded.size)
                    val begin = System.currentTimeMillis()
                    for (item in traded) {
                        executor.submit {
                            try {
                                val result = item.value
                                var done = false
                                while (!done) {
                                    try {
                                        ignite.transactions().txStart(concurrency, isolation).use { tx ->
                                            result.status = CLEAR_STATUS_INIT
                                            val accountBuy = accounts.get(result.buyUserId)
                                            val accountSell = accounts.get(result.sellUserId)
                                            val positionBuy = PositionHelp.genPosition(result.buyUserId, result.contractId)
                                            val positionSell = PositionHelp.genPosition(result.sellUserId, result.contractId)
                                            if (accountBuy != null) {
                                                accountBuy.balance -= result.amount
                                                accounts.put(accountBuy.id, accountBuy)
                                            }
                                            if (accountSell != null) {
                                                accountSell.balance += result.amount
                                                accounts.put(accountSell.id, accountSell)
                                            }
                                            tradeResults.put(result.id, result)
                                            positions.put(positionBuy.id, positionBuy)
                                            positions.put(positionSell.id, positionSell)
                                            tx.commit()
                                        }
                                        done = true
                                    } catch (ignored: Exception) {
                                        // Deliberately silent: the transaction is retried until it
                                        // commits, and logging every failed attempt would distort
                                        // the timing. NOTE(review): a permanent failure makes this
                                        // loop spin forever - consider a retry limit.
                                    }
                                }
                            } finally {
                                // Release the latch even if the task dies with a non-Exception Throwable.
                                countDownLatch.countDown()
                            }
                        }
                    }
                    countDownLatch.await()
                    val batch = traded.size
                    val duration = System.currentTimeMillis() - begin
                    println("concurrency=$concurrency isolation=$isolation size=$batch used ${duration}ms  qps:${batch.toDouble() / duration.toDouble() * 1000}")
                    // Pause between combinations so the runs do not overlap.
                    TimeUnit.SECONDS.sleep(2)
                }
            }
        }
    } finally {
        // The pool uses non-daemon threads; without shutdown the JVM never exits.
        executor.shutdown()
    }
}

在 2019/1/22 下午7:17, Ilya Kasnacheev 写道:
Hello!

Do you have a reproducer project?

Regards,
--
Ilya Kasnacheev


вт, 22 янв. 2019 г. в 13:31, 李玉珏@163 <[email protected] <mailto:[email protected]>>:

    Hi,

    If so, is this a bug?


Reply via email to