答复: data loss using IgniteDataStreamer API
Thank you, i found that autoflus is what i need. 发件人: Ilya Kasnacheev 发送时间: 2018年8月16日 21:22:10 收件人: user@ignite.apache.org 主题: Re: data loss using IgniteDataStreamer API Hello! For starters, I don't see that you do stmr.close() anywhere. Data Streamer is only guaranteed to write all data to cache after it is closed properly. Regards, -- Ilya Kasnacheev 2018-08-16 16:19 GMT+03:00 Huang Meilong mailto:ims...@outlook.com>>: Hi all, I'm use IgniteDataStreamer API to ingest 1,000,000 record to a sql table cache, but only 996,626 records, 0: jdbc:ignite:thin://127.0.0.1/<http://127.0.0.1/>> select count(*) from APMMETRIC; ++ |COUNT(*)| ++ | 996626 | ++ 1 row selected (0.057 seconds) 0: jdbc:ignite:thin://127.0.0.1/<http://127.0.0.1/>> does ignite data streamer lose data? code snippet as below IgniteDataStreamer stmr = ignite.dataStreamer("APM_METRIC_CACHE"); long start = System.currentTimeMillis(); for (Long l = 0L; l < 1000L; l++) { long start1 = System.currentTimeMillis(); for (Integer j = 0; j < 1000; j++) { ApmMetric metric = new ApmMetric(l, "metric_" + j,"CLUSTER-XXX", "host-1", 80.0 + (j.doubleValue() / 100.0)); ApmMetricKey metricKey = new ApmMetricKey(l, "metric_" + j,"CLUSTER-XXX", "host-1"); stmr.addData(metricKey, metric); } long end1 = System.currentTimeMillis(); System.out.println("stream 1000 records cost " + (end1 - start1) + " ms."); } long end = System.currentTimeMillis(); System.out.println("stream 100 records cost " + (end - start) + " ms."); public class ApmMetricKey implements Serializable { private Long timeStamp; private String metricName; private String clusterId; private String hostName; public Long getTimeStamp() { return timeStamp; } public void setTimeStamp(Long timeStamp) { this.timeStamp = timeStamp; } public String getMetricName() { return metricName; } public void setMetricName(String metricName) { this.metricName = metricName; } public String getClusterId() { return clusterId; } public void setClusterId(String clusterId) { this.clusterId = clusterId; } public String getHostName() { return hostName; } public void setHostName(String hostName) { this.hostName = hostName; } public ApmMetricKey() {} public ApmMetricKey(Long timeStamp, String metricName, String clusterId, String hostName) { this.timeStamp = timeStamp; this.metricName = metricName; this.clusterId = clusterId; this.hostName = hostName; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ApmMetricKey that = (ApmMetricKey) o; return Objects.equals(timeStamp, that.timeStamp) && Objects.equals(metricName, that.metricName) && Objects.equals(clusterId, that.clusterId) && Objects.equals(hostName, that.hostName); } @Override public int hashCode() { return Objects.hash(timeStamp, metricName, clusterId, hostName); } @Override public String toString() { return "ApmMetricKey{" + "timeStamp=" + timeStamp + ", metricName='" + metricName + '\'' + ", clusterId='" + clusterId + '\'' + ", hostName='" + hostName + '\'' + '}'; } }
答复: data loss using IgniteDataStreamer API
Thank you, i found that autoflus is what i need. 发件人: Ilya Kasnacheev 发送时间: 2018年8月16日 21:22:10 收件人: user@ignite.apache.org 主题: Re: data loss using IgniteDataStreamer API Hello! For starters, I don't see that you do stmr.close() anywhere. Data Streamer is only guaranteed to write all data to cache after it is closed properly. Regards, -- Ilya Kasnacheev 2018-08-16 16:19 GMT+03:00 Huang Meilong mailto:ims...@outlook.com>>: Hi all, I'm use IgniteDataStreamer API to ingest 1,000,000 record to a sql table cache, but only 996,626 records, 0: jdbc:ignite:thin://127.0.0.1/<http://127.0.0.1/>> select count(*) from APMMETRIC; ++ |COUNT(*)| ++ | 996626 | ++ 1 row selected (0.057 seconds) 0: jdbc:ignite:thin://127.0.0.1/<http://127.0.0.1/>> does ignite data streamer lose data? code snippet as below IgniteDataStreamer stmr = ignite.dataStreamer("APM_METRIC_CACHE"); long start = System.currentTimeMillis(); for (Long l = 0L; l < 1000L; l++) { long start1 = System.currentTimeMillis(); for (Integer j = 0; j < 1000; j++) { ApmMetric metric = new ApmMetric(l, "metric_" + j,"CLUSTER-XXX", "host-1", 80.0 + (j.doubleValue() / 100.0)); ApmMetricKey metricKey = new ApmMetricKey(l, "metric_" + j,"CLUSTER-XXX", "host-1"); stmr.addData(metricKey, metric); } long end1 = System.currentTimeMillis(); System.out.println("stream 1000 records cost " + (end1 - start1) + " ms."); } long end = System.currentTimeMillis(); System.out.println("stream 100 records cost " + (end - start) + " ms."); public class ApmMetricKey implements Serializable { private Long timeStamp; private String metricName; private String clusterId; private String hostName; public Long getTimeStamp() { return timeStamp; } public void setTimeStamp(Long timeStamp) { this.timeStamp = timeStamp; } public String getMetricName() { return metricName; } public void setMetricName(String metricName) { this.metricName = metricName; } public String getClusterId() { return clusterId; } public void setClusterId(String clusterId) { this.clusterId = clusterId; } public String getHostName() { return hostName; } public void setHostName(String hostName) { this.hostName = hostName; } public ApmMetricKey() {} public ApmMetricKey(Long timeStamp, String metricName, String clusterId, String hostName) { this.timeStamp = timeStamp; this.metricName = metricName; this.clusterId = clusterId; this.hostName = hostName; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ApmMetricKey that = (ApmMetricKey) o; return Objects.equals(timeStamp, that.timeStamp) && Objects.equals(metricName, that.metricName) && Objects.equals(clusterId, that.clusterId) && Objects.equals(hostName, that.hostName); } @Override public int hashCode() { return Objects.hash(timeStamp, metricName, clusterId, hostName); } @Override public String toString() { return "ApmMetricKey{" + "timeStamp=" + timeStamp + ", metricName='" + metricName + '\'' + ", clusterId='" + clusterId + '\'' + ", hostName='" + hostName + '\'' + '}'; } }
Re: data loss using IgniteDataStreamer API
Hello! For starters, I don't see that you do stmr.close() anywhere. Data Streamer is only guaranteed to write all data to cache after it is closed properly. Regards, -- Ilya Kasnacheev 2018-08-16 16:19 GMT+03:00 Huang Meilong : > Hi all, > > > I'm use IgniteDataStreamer API to ingest 1,000,000 record to a sql table > cache, but only 996,626 records, > > > 0: jdbc:ignite:thin://127.0.0.1/> select count(*) from APMMETRIC; > ++ > |COUNT(*)| > ++ > | 996626 | > ++ > 1 row selected (0.057 seconds) > 0: jdbc:ignite:thin://127.0.0.1/> > > > does ignite data streamer lose data? code snippet as below > > > IgniteDataStreamer stmr = > ignite.dataStreamer("APM_METRIC_CACHE"); > > long start = System.currentTimeMillis(); > for (Long l = 0L; l < 1000L; l++) { > long start1 = System.currentTimeMillis(); > for (Integer j = 0; j < 1000; j++) { > ApmMetric metric = new ApmMetric(l, "metric_" + j,"CLUSTER-XXX", "host-1", > 80.0 + (j.doubleValue() / 100.0)); > ApmMetricKey metricKey = new ApmMetricKey(l, "metric_" + j,"CLUSTER-XXX", > "host-1"); > stmr.addData(metricKey, metric); > } > long end1 = System.currentTimeMillis(); > System.out.println("stream 1000 records cost " + (end1 - start1) + " ms.") > ; > } > long end = System.currentTimeMillis(); > System.out.println("stream 100 records cost " + (end - start) + " ms." > ); > > > public class ApmMetricKey implements Serializable { > private Long timeStamp; > private String metricName; > private String clusterId; > private String hostName; > > public Long getTimeStamp() { > return timeStamp; > } > > public void setTimeStamp(Long timeStamp) { > this.timeStamp = timeStamp; > } > > public String getMetricName() { > return metricName; > } > > public void setMetricName(String metricName) { > this.metricName = metricName; > } > > public String getClusterId() { > return clusterId; > } > > public void setClusterId(String clusterId) { > this.clusterId = clusterId; > } > > public String getHostName() { > return hostName; > } > > public void setHostName(String hostName) { > this.hostName = hostName; > } > > public ApmMetricKey() {} > > public ApmMetricKey(Long timeStamp, String metricName, String clusterId, > String hostName) { > this.timeStamp = timeStamp; > this.metricName = metricName; > this.clusterId = clusterId; > this.hostName = hostName; > } > > @Override > public boolean equals(Object o) { > if (this == o) return true; > if (o == null || getClass() != o.getClass()) return false; > ApmMetricKey that = (ApmMetricKey) o; > return Objects.equals(timeStamp, that.timeStamp) && > Objects.equals(metricName, that.metricName) && > Objects.equals(clusterId, that.clusterId) && > Objects.equals(hostName, that.hostName); > } > > @Override > public int hashCode() { > return Objects.hash(timeStamp, metricName, clusterId, hostName); > } > > @Override > public String toString() { > return "ApmMetricKey{" + > "timeStamp=" + timeStamp + > ", metricName='" + metricName + '\'' + > ", clusterId='" + clusterId + '\'' + > ", hostName='" + hostName + '\'' + > '}'; > } > } > > >
data loss using IgniteDataStreamer API
Hi all, I'm use IgniteDataStreamer API to ingest 1,000,000 record to a sql table cache, but only 996,626 records, 0: jdbc:ignite:thin://127.0.0.1/> select count(*) from APMMETRIC; ++ |COUNT(*)| ++ | 996626 | ++ 1 row selected (0.057 seconds) 0: jdbc:ignite:thin://127.0.0.1/> does ignite data streamer lose data? code snippet as below IgniteDataStreamer stmr = ignite.dataStreamer("APM_METRIC_CACHE"); long start = System.currentTimeMillis(); for (Long l = 0L; l < 1000L; l++) { long start1 = System.currentTimeMillis(); for (Integer j = 0; j < 1000; j++) { ApmMetric metric = new ApmMetric(l, "metric_" + j,"CLUSTER-XXX", "host-1", 80.0 + (j.doubleValue() / 100.0)); ApmMetricKey metricKey = new ApmMetricKey(l, "metric_" + j,"CLUSTER-XXX", "host-1"); stmr.addData(metricKey, metric); } long end1 = System.currentTimeMillis(); System.out.println("stream 1000 records cost " + (end1 - start1) + " ms."); } long end = System.currentTimeMillis(); System.out.println("stream 100 records cost " + (end - start) + " ms."); public class ApmMetricKey implements Serializable { private Long timeStamp; private String metricName; private String clusterId; private String hostName; public Long getTimeStamp() { return timeStamp; } public void setTimeStamp(Long timeStamp) { this.timeStamp = timeStamp; } public String getMetricName() { return metricName; } public void setMetricName(String metricName) { this.metricName = metricName; } public String getClusterId() { return clusterId; } public void setClusterId(String clusterId) { this.clusterId = clusterId; } public String getHostName() { return hostName; } public void setHostName(String hostName) { this.hostName = hostName; } public ApmMetricKey() {} public ApmMetricKey(Long timeStamp, String metricName, String clusterId, String hostName) { this.timeStamp = timeStamp; this.metricName = metricName; this.clusterId = clusterId; this.hostName = hostName; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ApmMetricKey that = (ApmMetricKey) o; return Objects.equals(timeStamp, that.timeStamp) && Objects.equals(metricName, that.metricName) && Objects.equals(clusterId, that.clusterId) && Objects.equals(hostName, that.hostName); } @Override public int hashCode() { return Objects.hash(timeStamp, metricName, clusterId, hostName); } @Override public String toString() { return "ApmMetricKey{" + "timeStamp=" + timeStamp + ", metricName='" + metricName + '\'' + ", clusterId='" + clusterId + '\'' + ", hostName='" + hostName + '\'' + '}'; } }