Hi all,

I'm use IgniteDataStreamer API to ingest 1,000,000 record to a sql table cache, 
but only 996,626 records,


0: jdbc:ignite:thin://127.0.0.1/> select count(*) from APMMETRIC;
+--------------------------------+
|            COUNT(*)            |
+--------------------------------+
| 996626                         |
+--------------------------------+
1 row selected (0.057 seconds)
0: jdbc:ignite:thin://127.0.0.1/>



does ignite data streamer lose data? code snippet as below


IgniteDataStreamer<ApmMetricKey, ApmMetric> stmr = 
ignite.dataStreamer("APM_METRIC_CACHE");

long start = System.currentTimeMillis();
for (Long l = 0L; l < 1000L; l++) {
long start1 = System.currentTimeMillis();
for (Integer j = 0; j < 1000; j++) {
ApmMetric metric = new ApmMetric(l, "metric_" + j,"CLUSTER-XXX", "host-1", 80.0 
+ (j.doubleValue() / 100.0));
ApmMetricKey metricKey = new ApmMetricKey(l, "metric_" + j,"CLUSTER-XXX", 
"host-1");
stmr.addData(metricKey, metric);
}
long end1 = System.currentTimeMillis();
System.out.println("stream 1000 records cost " + (end1 - start1) + " ms.");
}
long end = System.currentTimeMillis();
System.out.println("stream 1000000 records cost " + (end - start) + " ms.");


public class ApmMetricKey implements Serializable {
    private Long timeStamp;
    private String metricName;
    private String clusterId;
    private String hostName;

    public Long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getMetricName() {
        return metricName;
    }

    public void setMetricName(String metricName) {
        this.metricName = metricName;
    }

    public String getClusterId() {
        return clusterId;
    }

    public void setClusterId(String clusterId) {
        this.clusterId = clusterId;
    }

    public String getHostName() {
        return hostName;
    }

    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    public ApmMetricKey() {}

    public ApmMetricKey(Long timeStamp, String metricName, String clusterId, 
String hostName) {
        this.timeStamp = timeStamp;
        this.metricName = metricName;
        this.clusterId = clusterId;
        this.hostName = hostName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ApmMetricKey that = (ApmMetricKey) o;
        return Objects.equals(timeStamp, that.timeStamp) &&
                Objects.equals(metricName, that.metricName) &&
                Objects.equals(clusterId, that.clusterId) &&
                Objects.equals(hostName, that.hostName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timeStamp, metricName, clusterId, hostName);
    }

    @Override
    public String toString() {
        return "ApmMetricKey{" +
                "timeStamp=" + timeStamp +
                ", metricName='" + metricName + '\'' +
                ", clusterId='" + clusterId + '\'' +
                ", hostName='" + hostName + '\'' +
                '}';
    }
}

Reply via email to