答复: data loss using IgniteDataStreamer API

2018-08-16 Thread Huang Meilong
Thank you, I found that autoflush is what I need.


发件人: Ilya Kasnacheev 
发送时间: 2018年8月16日 21:22:10
收件人: user@ignite.apache.org
主题: Re: data loss using IgniteDataStreamer API

Hello!

For starters, I don't see that you do stmr.close() anywhere.

Data Streamer is only guaranteed to write all data to cache after it is closed 
properly.

Regards,

--
Ilya Kasnacheev

2018-08-16 16:19 GMT+03:00 Huang Meilong <ims...@outlook.com>:

Hi all,


I'm using the IgniteDataStreamer API to ingest 1,000,000 records into a SQL table cache, 
but the table ends up with only 996,626 records:


0: jdbc:ignite:thin://127.0.0.1/> select count(*) from APMMETRIC;
+--------+
|COUNT(*)|
+--------+
| 996626 |
+--------+
1 row selected (0.057 seconds)
0: jdbc:ignite:thin://127.0.0.1/>



Does the Ignite data streamer lose data? The code snippet is below:


// Adds 1000 x 1000 (key, metric) pairs to the "APM_METRIC_CACHE" data streamer and prints the
// elapsed wall-clock time for each inner batch and for the whole run.
// NOTE(review): the visible snippet never calls stmr.close(); the reply in this thread points
// out that the streamer is only guaranteed to have written all data once it is closed properly.
// NOTE(review): the streamer is declared without type arguments here; the mail archive may have
// stripped them — confirm against the original message.
IgniteDataStreamer stmr = 
ignite.dataStreamer("APM_METRIC_CACHE");

long start = System.currentTimeMillis();
// Outer loop: l is passed as the timeStamp argument of every ApmMetricKey built below.
for (Long l = 0L; l < 1000L; l++) {
long start1 = System.currentTimeMillis();
// Inner loop: j produces 1000 distinct metric names ("metric_0" .. "metric_999") per value of l.
for (Integer j = 0; j < 1000; j++) {
ApmMetric metric = new ApmMetric(l, "metric_" + j,"CLUSTER-XXX", "host-1", 80.0 
+ (j.doubleValue() / 100.0));
ApmMetricKey metricKey = new ApmMetricKey(l, "metric_" + j,"CLUSTER-XXX", 
"host-1");
// Hands the entry to the streamer; nothing in this snippet flushes it afterwards.
stmr.addData(metricKey, metric);
}
long end1 = System.currentTimeMillis();
System.out.println("stream 1000 records cost " + (end1 - start1) + " ms.");
}
long end = System.currentTimeMillis();
// NOTE(review): the message says "100 records", but the loops above call addData 1000 x 1000 times.
System.out.println("stream 100 records cost " + (end - start) + " ms.");


public class ApmMetricKey implements Serializable {
private Long timeStamp;
private String metricName;
private String clusterId;
private String hostName;

public Long getTimeStamp() {
return timeStamp;
}

public void setTimeStamp(Long timeStamp) {
this.timeStamp = timeStamp;
}

public String getMetricName() {
return metricName;
}

public void setMetricName(String metricName) {
this.metricName = metricName;
}

public String getClusterId() {
return clusterId;
}

public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}

public String getHostName() {
return hostName;
}

public void setHostName(String hostName) {
this.hostName = hostName;
}

public ApmMetricKey() {}

public ApmMetricKey(Long timeStamp, String metricName, String clusterId, 
String hostName) {
this.timeStamp = timeStamp;
this.metricName = metricName;
this.clusterId = clusterId;
this.hostName = hostName;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ApmMetricKey that = (ApmMetricKey) o;
return Objects.equals(timeStamp, that.timeStamp) &&
Objects.equals(metricName, that.metricName) &&
Objects.equals(clusterId, that.clusterId) &&
Objects.equals(hostName, that.hostName);
}

@Override
public int hashCode() {
return Objects.hash(timeStamp, metricName, clusterId, hostName);
}

@Override
public String toString() {
return "ApmMetricKey{" +
"timeStamp=" + timeStamp +
", metricName='" + metricName + '\'' +
", clusterId='" + clusterId + '\'' +
", hostName='" + hostName + '\'' +
'}';
}
}




答复: data loss using IgniteDataStreamer API

2018-08-16 Thread Huang Meilong
Thank you, I found that autoflush is what I need.


发件人: Ilya Kasnacheev 
发送时间: 2018年8月16日 21:22:10
收件人: user@ignite.apache.org
主题: Re: data loss using IgniteDataStreamer API

Hello!

For starters, I don't see that you do stmr.close() anywhere.

Data Streamer is only guaranteed to write all data to cache after it is closed 
properly.

Regards,

--
Ilya Kasnacheev

2018-08-16 16:19 GMT+03:00 Huang Meilong <ims...@outlook.com>:

Hi all,


I'm using the IgniteDataStreamer API to ingest 1,000,000 records into a SQL table cache, 
but the table ends up with only 996,626 records:


0: jdbc:ignite:thin://127.0.0.1/> select count(*) from APMMETRIC;
+--------+
|COUNT(*)|
+--------+
| 996626 |
+--------+
1 row selected (0.057 seconds)
0: jdbc:ignite:thin://127.0.0.1/>



Does the Ignite data streamer lose data? The code snippet is below:


// Adds 1000 x 1000 (key, metric) pairs to the "APM_METRIC_CACHE" data streamer and prints the
// elapsed wall-clock time for each inner batch and for the whole run.
// NOTE(review): the visible snippet never calls stmr.close(); the reply in this thread points
// out that the streamer is only guaranteed to have written all data once it is closed properly.
// NOTE(review): the streamer is declared without type arguments here; the mail archive may have
// stripped them — confirm against the original message.
IgniteDataStreamer stmr = 
ignite.dataStreamer("APM_METRIC_CACHE");

long start = System.currentTimeMillis();
// Outer loop: l is passed as the timeStamp argument of every ApmMetricKey built below.
for (Long l = 0L; l < 1000L; l++) {
long start1 = System.currentTimeMillis();
// Inner loop: j produces 1000 distinct metric names ("metric_0" .. "metric_999") per value of l.
for (Integer j = 0; j < 1000; j++) {
ApmMetric metric = new ApmMetric(l, "metric_" + j,"CLUSTER-XXX", "host-1", 80.0 
+ (j.doubleValue() / 100.0));
ApmMetricKey metricKey = new ApmMetricKey(l, "metric_" + j,"CLUSTER-XXX", 
"host-1");
// Hands the entry to the streamer; nothing in this snippet flushes it afterwards.
stmr.addData(metricKey, metric);
}
long end1 = System.currentTimeMillis();
System.out.println("stream 1000 records cost " + (end1 - start1) + " ms.");
}
long end = System.currentTimeMillis();
// NOTE(review): the message says "100 records", but the loops above call addData 1000 x 1000 times.
System.out.println("stream 100 records cost " + (end - start) + " ms.");


public class ApmMetricKey implements Serializable {
private Long timeStamp;
private String metricName;
private String clusterId;
private String hostName;

public Long getTimeStamp() {
return timeStamp;
}

public void setTimeStamp(Long timeStamp) {
this.timeStamp = timeStamp;
}

public String getMetricName() {
return metricName;
}

public void setMetricName(String metricName) {
this.metricName = metricName;
}

public String getClusterId() {
return clusterId;
}

public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}

public String getHostName() {
return hostName;
}

public void setHostName(String hostName) {
this.hostName = hostName;
}

public ApmMetricKey() {}

public ApmMetricKey(Long timeStamp, String metricName, String clusterId, 
String hostName) {
this.timeStamp = timeStamp;
this.metricName = metricName;
this.clusterId = clusterId;
this.hostName = hostName;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ApmMetricKey that = (ApmMetricKey) o;
return Objects.equals(timeStamp, that.timeStamp) &&
Objects.equals(metricName, that.metricName) &&
Objects.equals(clusterId, that.clusterId) &&
Objects.equals(hostName, that.hostName);
}

@Override
public int hashCode() {
return Objects.hash(timeStamp, metricName, clusterId, hostName);
}

@Override
public String toString() {
return "ApmMetricKey{" +
"timeStamp=" + timeStamp +
", metricName='" + metricName + '\'' +
", clusterId='" + clusterId + '\'' +
", hostName='" + hostName + '\'' +
'}';
}
}




Re: data loss using IgniteDataStreamer API

2018-08-16 Thread Ilya Kasnacheev
Hello!

For starters, I don't see that you do stmr.close() anywhere.

Data Streamer is only guaranteed to write all data to cache after it is
closed properly.

Regards,

-- 
Ilya Kasnacheev

2018-08-16 16:19 GMT+03:00 Huang Meilong :

> Hi all,
>
>
> I'm using the IgniteDataStreamer API to ingest 1,000,000 records into a SQL
> table cache, but the table ends up with only 996,626 records:
>
>
> 0: jdbc:ignite:thin://127.0.0.1/> select count(*) from APMMETRIC;
> +--------+
> |COUNT(*)|
> +--------+
> | 996626 |
> +--------+
> 1 row selected (0.057 seconds)
> 0: jdbc:ignite:thin://127.0.0.1/>
>
>
> Does the Ignite data streamer lose data? The code snippet is below:
>
>
> IgniteDataStreamer stmr = 
> ignite.dataStreamer("APM_METRIC_CACHE");
>
> long start = System.currentTimeMillis();
> for (Long l = 0L; l < 1000L; l++) {
> long start1 = System.currentTimeMillis();
> for (Integer j = 0; j < 1000; j++) {
> ApmMetric metric = new ApmMetric(l, "metric_" + j,"CLUSTER-XXX", "host-1",
> 80.0 + (j.doubleValue() / 100.0));
> ApmMetricKey metricKey = new ApmMetricKey(l, "metric_" + j,"CLUSTER-XXX",
> "host-1");
> stmr.addData(metricKey, metric);
> }
> long end1 = System.currentTimeMillis();
> System.out.println("stream 1000 records cost " + (end1 - start1) + " ms.")
> ;
> }
> long end = System.currentTimeMillis();
> System.out.println("stream 100 records cost " + (end - start) + " ms."
> );
>
>
> public class ApmMetricKey implements Serializable {
> private Long timeStamp;
> private String metricName;
> private String clusterId;
> private String hostName;
>
> public Long getTimeStamp() {
> return timeStamp;
> }
>
> public void setTimeStamp(Long timeStamp) {
> this.timeStamp = timeStamp;
> }
>
> public String getMetricName() {
> return metricName;
> }
>
> public void setMetricName(String metricName) {
> this.metricName = metricName;
> }
>
> public String getClusterId() {
> return clusterId;
> }
>
> public void setClusterId(String clusterId) {
> this.clusterId = clusterId;
> }
>
> public String getHostName() {
> return hostName;
> }
>
> public void setHostName(String hostName) {
> this.hostName = hostName;
> }
>
> public ApmMetricKey() {}
>
> public ApmMetricKey(Long timeStamp, String metricName, String clusterId, 
> String hostName) {
> this.timeStamp = timeStamp;
> this.metricName = metricName;
> this.clusterId = clusterId;
> this.hostName = hostName;
> }
>
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> ApmMetricKey that = (ApmMetricKey) o;
> return Objects.equals(timeStamp, that.timeStamp) &&
> Objects.equals(metricName, that.metricName) &&
> Objects.equals(clusterId, that.clusterId) &&
> Objects.equals(hostName, that.hostName);
> }
>
> @Override
> public int hashCode() {
> return Objects.hash(timeStamp, metricName, clusterId, hostName);
> }
>
> @Override
> public String toString() {
> return "ApmMetricKey{" +
> "timeStamp=" + timeStamp +
> ", metricName='" + metricName + '\'' +
> ", clusterId='" + clusterId + '\'' +
> ", hostName='" + hostName + '\'' +
> '}';
> }
> }
>
>
>


data loss using IgniteDataStreamer API

2018-08-16 Thread Huang Meilong
Hi all,


I'm using the IgniteDataStreamer API to ingest 1,000,000 records into a SQL table cache, 
but the table ends up with only 996,626 records:


0: jdbc:ignite:thin://127.0.0.1/> select count(*) from APMMETRIC;
+--------+
|COUNT(*)|
+--------+
| 996626 |
+--------+
1 row selected (0.057 seconds)
0: jdbc:ignite:thin://127.0.0.1/>



Does the Ignite data streamer lose data? The code snippet is below:


// Adds 1000 x 1000 (key, metric) pairs to the "APM_METRIC_CACHE" data streamer and prints the
// elapsed wall-clock time for each inner batch and for the whole run.
// NOTE(review): the visible snippet never calls stmr.close(); the reply in this thread points
// out that the streamer is only guaranteed to have written all data once it is closed properly.
// NOTE(review): the streamer is declared without type arguments here; the mail archive may have
// stripped them — confirm against the original message.
IgniteDataStreamer stmr = 
ignite.dataStreamer("APM_METRIC_CACHE");

long start = System.currentTimeMillis();
// Outer loop: l is passed as the timeStamp argument of every ApmMetricKey built below.
for (Long l = 0L; l < 1000L; l++) {
long start1 = System.currentTimeMillis();
// Inner loop: j produces 1000 distinct metric names ("metric_0" .. "metric_999") per value of l.
for (Integer j = 0; j < 1000; j++) {
ApmMetric metric = new ApmMetric(l, "metric_" + j,"CLUSTER-XXX", "host-1", 80.0 
+ (j.doubleValue() / 100.0));
ApmMetricKey metricKey = new ApmMetricKey(l, "metric_" + j,"CLUSTER-XXX", 
"host-1");
// Hands the entry to the streamer; nothing in this snippet flushes it afterwards.
stmr.addData(metricKey, metric);
}
long end1 = System.currentTimeMillis();
System.out.println("stream 1000 records cost " + (end1 - start1) + " ms.");
}
long end = System.currentTimeMillis();
// NOTE(review): the message says "100 records", but the loops above call addData 1000 x 1000 times.
System.out.println("stream 100 records cost " + (end - start) + " ms.");


public class ApmMetricKey implements Serializable {
private Long timeStamp;
private String metricName;
private String clusterId;
private String hostName;

public Long getTimeStamp() {
return timeStamp;
}

public void setTimeStamp(Long timeStamp) {
this.timeStamp = timeStamp;
}

public String getMetricName() {
return metricName;
}

public void setMetricName(String metricName) {
this.metricName = metricName;
}

public String getClusterId() {
return clusterId;
}

public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}

public String getHostName() {
return hostName;
}

public void setHostName(String hostName) {
this.hostName = hostName;
}

public ApmMetricKey() {}

public ApmMetricKey(Long timeStamp, String metricName, String clusterId, 
String hostName) {
this.timeStamp = timeStamp;
this.metricName = metricName;
this.clusterId = clusterId;
this.hostName = hostName;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ApmMetricKey that = (ApmMetricKey) o;
return Objects.equals(timeStamp, that.timeStamp) &&
Objects.equals(metricName, that.metricName) &&
Objects.equals(clusterId, that.clusterId) &&
Objects.equals(hostName, that.hostName);
}

@Override
public int hashCode() {
return Objects.hash(timeStamp, metricName, clusterId, hostName);
}

@Override
public String toString() {
return "ApmMetricKey{" +
"timeStamp=" + timeStamp +
", metricName='" + metricName + '\'' +
", clusterId='" + clusterId + '\'' +
", hostName='" + hostName + '\'' +
'}';
}
}