Hello!

For starters, I don't see that you do stmr.close() anywhere.

Data Streamer is only guaranteed to write all data to cache after it is
closed properly.

Regards,

-- 
Ilya Kasnacheev

2018-08-16 16:19 GMT+03:00 Huang Meilong <[email protected]>:

> Hi all,
>
>
> I'm use IgniteDataStreamer API to ingest 1,000,000 record to a sql table
> cache, but only 996,626 records,
>
>
> 0: jdbc:ignite:thin://127.0.0.1/> select count(*) from APMMETRIC;
> +--------------------------------+
> |            COUNT(*)            |
> +--------------------------------+
> | 996626                         |
> +--------------------------------+
> 1 row selected (0.057 seconds)
> 0: jdbc:ignite:thin://127.0.0.1/>
>
>
> does ignite data streamer lose data? code snippet as below
>
>
> IgniteDataStreamer<ApmMetricKey, ApmMetric> stmr = 
> ignite.dataStreamer("APM_METRIC_CACHE");
>
> long start = System.currentTimeMillis();
> for (Long l = 0L; l < 1000L; l++) {
> long start1 = System.currentTimeMillis();
> for (Integer j = 0; j < 1000; j++) {
> ApmMetric metric = new ApmMetric(l, "metric_" + j,"CLUSTER-XXX", "host-1",
> 80.0 + (j.doubleValue() / 100.0));
> ApmMetricKey metricKey = new ApmMetricKey(l, "metric_" + j,"CLUSTER-XXX",
> "host-1");
> stmr.addData(metricKey, metric);
> }
> long end1 = System.currentTimeMillis();
> System.out.println("stream 1000 records cost " + (end1 - start1) + " ms.")
> ;
> }
> long end = System.currentTimeMillis();
> System.out.println("stream 1000000 records cost " + (end - start) + " ms."
> );
>
>
> public class ApmMetricKey implements Serializable {
>     private Long timeStamp;
>     private String metricName;
>     private String clusterId;
>     private String hostName;
>
>     public Long getTimeStamp() {
>         return timeStamp;
>     }
>
>     public void setTimeStamp(Long timeStamp) {
>         this.timeStamp = timeStamp;
>     }
>
>     public String getMetricName() {
>         return metricName;
>     }
>
>     public void setMetricName(String metricName) {
>         this.metricName = metricName;
>     }
>
>     public String getClusterId() {
>         return clusterId;
>     }
>
>     public void setClusterId(String clusterId) {
>         this.clusterId = clusterId;
>     }
>
>     public String getHostName() {
>         return hostName;
>     }
>
>     public void setHostName(String hostName) {
>         this.hostName = hostName;
>     }
>
>     public ApmMetricKey() {}
>
>     public ApmMetricKey(Long timeStamp, String metricName, String clusterId, 
> String hostName) {
>         this.timeStamp = timeStamp;
>         this.metricName = metricName;
>         this.clusterId = clusterId;
>         this.hostName = hostName;
>     }
>
>     @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>         ApmMetricKey that = (ApmMetricKey) o;
>         return Objects.equals(timeStamp, that.timeStamp) &&
>                 Objects.equals(metricName, that.metricName) &&
>                 Objects.equals(clusterId, that.clusterId) &&
>                 Objects.equals(hostName, that.hostName);
>     }
>
>     @Override
>     public int hashCode() {
>         return Objects.hash(timeStamp, metricName, clusterId, hostName);
>     }
>
>     @Override
>     public String toString() {
>         return "ApmMetricKey{" +
>                 "timeStamp=" + timeStamp +
>                 ", metricName='" + metricName + '\'' +
>                 ", clusterId='" + clusterId + '\'' +
>                 ", hostName='" + hostName + '\'' +
>                 '}';
>     }
> }
>
>
>

Reply via email to