Thank you, i found that autoflus is what i need. ________________________________ 发件人: Ilya Kasnacheev <ilya.kasnach...@gmail.com> 发送时间: 2018年8月16日 21:22:10 收件人: user@ignite.apache.org 主题: Re: data loss using IgniteDataStreamer API
Hello! For starters, I don't see that you do stmr.close() anywhere. Data Streamer is only guaranteed to write all data to cache after it is closed properly. Regards, -- Ilya Kasnacheev 2018-08-16 16:19 GMT+03:00 Huang Meilong <ims...@outlook.com<mailto:ims...@outlook.com>>: Hi all, I'm use IgniteDataStreamer API to ingest 1,000,000 record to a sql table cache, but only 996,626 records, 0: jdbc:ignite:thin://127.0.0.1/<http://127.0.0.1/>> select count(*) from APMMETRIC; +--------------------------------+ | COUNT(*) | +--------------------------------+ | 996626 | +--------------------------------+ 1 row selected (0.057 seconds) 0: jdbc:ignite:thin://127.0.0.1/<http://127.0.0.1/>> does ignite data streamer lose data? code snippet as below IgniteDataStreamer<ApmMetricKey, ApmMetric> stmr = ignite.dataStreamer("APM_METRIC_CACHE"); long start = System.currentTimeMillis(); for (Long l = 0L; l < 1000L; l++) { long start1 = System.currentTimeMillis(); for (Integer j = 0; j < 1000; j++) { ApmMetric metric = new ApmMetric(l, "metric_" + j,"CLUSTER-XXX", "host-1", 80.0 + (j.doubleValue() / 100.0)); ApmMetricKey metricKey = new ApmMetricKey(l, "metric_" + j,"CLUSTER-XXX", "host-1"); stmr.addData(metricKey, metric); } long end1 = System.currentTimeMillis(); System.out.println("stream 1000 records cost " + (end1 - start1) + " ms."); } long end = System.currentTimeMillis(); System.out.println("stream 1000000 records cost " + (end - start) + " ms."); public class ApmMetricKey implements Serializable { private Long timeStamp; private String metricName; private String clusterId; private String hostName; public Long getTimeStamp() { return timeStamp; } public void setTimeStamp(Long timeStamp) { this.timeStamp = timeStamp; } public String getMetricName() { return metricName; } public void setMetricName(String metricName) { this.metricName = metricName; } public String getClusterId() { return clusterId; } public void setClusterId(String clusterId) { this.clusterId = clusterId; } public String getHostName() { return hostName; } public void setHostName(String hostName) { this.hostName = hostName; } public ApmMetricKey() {} public ApmMetricKey(Long timeStamp, String metricName, String clusterId, String hostName) { this.timeStamp = timeStamp; this.metricName = metricName; this.clusterId = clusterId; this.hostName = hostName; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ApmMetricKey that = (ApmMetricKey) o; return Objects.equals(timeStamp, that.timeStamp) && Objects.equals(metricName, that.metricName) && Objects.equals(clusterId, that.clusterId) && Objects.equals(hostName, that.hostName); } @Override public int hashCode() { return Objects.hash(timeStamp, metricName, clusterId, hostName); } @Override public String toString() { return "ApmMetricKey{" + "timeStamp=" + timeStamp + ", metricName='" + metricName + '\'' + ", clusterId='" + clusterId + '\'' + ", hostName='" + hostName + '\'' + '}'; } }