[jira] [Created] (FLINK-20768) Support routing field for Elasticsearch connector

2020-12-25 Thread wangsan (Jira)
wangsan created FLINK-20768:
---

 Summary: Support routing field for Elasticsearch connector
 Key: FLINK-20768
 URL: https://issues.apache.org/jira/browse/FLINK-20768
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Reporter: wangsan


Routing in Elasticsearch can improve search efficiency for large-scale 
datasets. We should support this feature as an optional configuration in the 
Elasticsearch connector.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20624) Refactor StreamExecJoinRule、StreamExecIntervalJoinRule and StreamExecTemporalJoinRule

2020-12-16 Thread wangsan (Jira)
wangsan created FLINK-20624:
---

 Summary: Refactor StreamExecJoinRule、StreamExecIntervalJoinRule 
and StreamExecTemporalJoinRule
 Key: FLINK-20624
 URL: https://issues.apache.org/jira/browse/FLINK-20624
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.13.0
Reporter: wangsan
 Fix For: 1.13.0


Currently, some code is duplicated across 
`StreamExecJoinRule`, `StreamExecIntervalJoinRule`, and 
`StreamExecTemporalJoinRule`. This JIRA tries to eliminate the code 
duplication so that the code is easier to maintain.

`StreamExecJoinRule`, `StreamExecIntervalJoinRule`, and 
`StreamExecTemporalJoinRule` 
are rules for stream-stream joins that differ only in their matching conditions, so we can add an 
abstract class `StreamExecJoinRuleBase` as a base implementation for them.
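
A self-contained Java sketch of the intended shape of `StreamExecJoinRuleBase` (the real rules are Calcite planner rules written in Scala; the `JoinCandidate` type and the method names other than the rule names above are placeholders for illustration):

```java
// Template-method sketch: shared matching logic lives in the base class, and each
// concrete rule only contributes the check that actually differs between the three joins.
abstract class StreamExecJoinRuleBase {

    /** Shared logic that is currently duplicated across the three rules. */
    final boolean matches(JoinCandidate join) {
        return join.leftIsStream && join.rightIsStream && matchesJoinKind(join);
    }

    /** The only part that differs between regular, interval, and temporal joins. */
    protected abstract boolean matchesJoinKind(JoinCandidate join);

    /** Placeholder input type used only to keep this sketch self-contained. */
    static final class JoinCandidate {
        final boolean leftIsStream;
        final boolean rightIsStream;
        final boolean hasTimeBounds;
        final boolean isTemporal;

        JoinCandidate(boolean leftIsStream, boolean rightIsStream,
                      boolean hasTimeBounds, boolean isTemporal) {
            this.leftIsStream = leftIsStream;
            this.rightIsStream = rightIsStream;
            this.hasTimeBounds = hasTimeBounds;
            this.isTemporal = isTemporal;
        }
    }
}

/** Example subclass: the interval-join rule adds only the time-bound check. */
class StreamExecIntervalJoinRuleSketch extends StreamExecJoinRuleBase {
    @Override
    protected boolean matchesJoinKind(JoinCandidate join) {
        return join.hasTimeBounds && !join.isTemporal;
    }
}
```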



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18596) Derive format schema from table schema may get error result

2020-07-14 Thread wangsan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17157334#comment-17157334
 ] 

wangsan commented on FLINK-18596:
-

I would like to fix this issue

> Derive format schema from table schema may get error result
> ---
>
> Key: FLINK-18596
> URL: https://issues.apache.org/jira/browse/FLINK-18596
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: wangsan
>Priority: Major
>
> If the rowtime attribute references a regular field, deriving the format schema from 
> the table schema may produce an incorrect result:
> {code}
> Schema schema = new Schema()
>   .field("f1", DataTypes.STRING())
>   .field("f2", DataTypes.BIGINT()).from("t")
>   .field("r", DataTypes.TIMESTAMP(3))
>   .rowtime(new Rowtime().timestampsFromField("t").watermarksPeriodicBounded(3));
> final Map<String, String> properties = schema.toProperties();
> final TableSchema actualSchema = TableFormatFactoryBase.deriveSchema(properties);
> {code}
> This code snippet results in `ValidationException("Field names must be 
> unique. Duplicate field: '" + fullFieldName + "'")`, but the expected result 
> should be:
> {code}
> final TableSchema expectedSchema = TableSchema.builder()
>   .field("f1", Types.STRING)
>   .field("t", Types.LONG)
>   .build();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18596) Derive format schema from table schema may get error result

2020-07-14 Thread wangsan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangsan updated FLINK-18596:

Description: 
If the rowtime attribute references a regular field, deriving the format schema from 
the table schema may produce an incorrect result:

{code}
Schema schema = new Schema()
    .field("f1", DataTypes.STRING())
    .field("f2", DataTypes.BIGINT()).from("t")
    .field("r", DataTypes.TIMESTAMP(3))
    .rowtime(new Rowtime().timestampsFromField("t").watermarksPeriodicBounded(3));

final Map<String, String> properties = schema.toProperties();
final TableSchema actualSchema = TableFormatFactoryBase.deriveSchema(properties);
{code}

This code snippet results in `ValidationException("Field names must be 
unique. Duplicate field: '" + fullFieldName + "'")`, but the expected result 
should be:

{code}
final TableSchema expectedSchema = TableSchema.builder()
.field("f1", Types.STRING)
.field("t", Types.LONG)
.build();
{code}

  was:
If the rowtime attribute references a regular field, deriving the format schema from 
the table schema may produce an incorrect result:

```java
Schema schema = new Schema()
    .field("f1", DataTypes.STRING())
    .field("f2", DataTypes.BIGINT()).from("t")
    .field("r", DataTypes.TIMESTAMP(3))
    .rowtime(new Rowtime().timestampsFromField("t").watermarksPeriodicBounded(3));

final Map<String, String> properties = schema.toProperties();
final TableSchema actualSchema = TableFormatFactoryBase.deriveSchema(properties);
``` 

This code snippet results in `ValidationException("Field names must be 
unique. Duplicate field: '" + fullFieldName + "'")`, but the expected result 
should be:

```java
TableSchema expectedSchema = TableSchema.builder()
.field("f1", Types.STRING)
.field("t", Types.LONG)
.build();
```


> Derive format schema from table schema may get error result
> ---
>
> Key: FLINK-18596
> URL: https://issues.apache.org/jira/browse/FLINK-18596
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: wangsan
>Priority: Major
>
> If the rowtime attribute references a regular field, deriving the format schema from 
> the table schema may produce an incorrect result:
> {code}
> Schema schema = new Schema()
>   .field("f1", DataTypes.STRING())
>   .field("f2", DataTypes.BIGINT()).from("t")
>   .field("r", DataTypes.TIMESTAMP(3))
>   .rowtime(new Rowtime().timestampsFromField("t").watermarksPeriodicBounded(3));
> final Map<String, String> properties = schema.toProperties();
> final TableSchema actualSchema = TableFormatFactoryBase.deriveSchema(properties);
> {code}
> This code snippet results in `ValidationException("Field names must be 
> unique. Duplicate field: '" + fullFieldName + "'")`, but the expected result 
> should be:
> {code}
> final TableSchema expectedSchema = TableSchema.builder()
>   .field("f1", Types.STRING)
>   .field("t", Types.LONG)
>   .build();
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18596) Derive format schema from table schema may get error result

2020-07-14 Thread wangsan (Jira)
wangsan created FLINK-18596:
---

 Summary: Derive format schema from table schema may get error 
result
 Key: FLINK-18596
 URL: https://issues.apache.org/jira/browse/FLINK-18596
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.11.0
Reporter: wangsan


If the rowtime attribute references a regular field, deriving the format schema from 
the table schema may produce an incorrect result:

```java
Schema schema = new Schema()
    .field("f1", DataTypes.STRING())
    .field("f2", DataTypes.BIGINT()).from("t")
    .field("r", DataTypes.TIMESTAMP(3))
    .rowtime(new Rowtime().timestampsFromField("t").watermarksPeriodicBounded(3));

final Map<String, String> properties = schema.toProperties();
final TableSchema actualSchema = TableFormatFactoryBase.deriveSchema(properties);
``` 

This code snippet results in `ValidationException("Field names must be 
unique. Duplicate field: '" + fullFieldName + "'")`, but the expected result 
should be:

```java
TableSchema expectedSchema = TableSchema.builder()
.field("f1", Types.STRING)
.field("t", Types.LONG)
.build();
```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16710) Log Upload blocks Main Thread in TaskExecutor

2020-03-29 Thread wangsan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17070661#comment-17070661
 ] 

wangsan commented on FLINK-16710:
-

Hi [~gjy], I can help with this issue if no one is working on this.

> Log Upload blocks Main Thread in TaskExecutor
> -
>
> Key: FLINK-16710
> URL: https://issues.apache.org/jira/browse/FLINK-16710
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Priority: Critical
> Fix For: 1.11.0
>
>
> Uploading logs to the BlobServer blocks the TaskExecutor's main thread. We 
> should introduce an IO thread pool that carries out file system accesses.
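
For illustration, a minimal sketch of the general pattern (not the actual Flink change): the blocking file read and upload run on a dedicated IO executor, and the main thread only obtains a future. The class name, pool size, and the omitted BlobServer call are assumptions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LogUploadSketch {

    // Dedicated pool for blocking file-system and network access, kept off the main thread.
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    /** Returns immediately; the blocking read (and the upload) runs on the IO pool. */
    public CompletableFuture<byte[]> requestLogUpload(String logFile) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                byte[] contents = Files.readAllBytes(Paths.get(logFile)); // blocking disk access
                // ... hand `contents` to the BlobServer client here (omitted) ...
                return contents;
            } catch (IOException e) {
                throw new RuntimeException("Failed to read " + logFile, e);
            }
        }, ioExecutor);
    }
}
```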



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15548) Make KeyedCoProcessOperatorWithWatermarkDelay extends KeyedCoProcessOperator instead of LegacyKeyedCoProcessOperator

2020-01-10 Thread wangsan (Jira)
wangsan created FLINK-15548:
---

 Summary: Make KeyedCoProcessOperatorWithWatermarkDelay extends 
KeyedCoProcessOperator instead of LegacyKeyedCoProcessOperator
 Key: FLINK-15548
 URL: https://issues.apache.org/jira/browse/FLINK-15548
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.10.0
Reporter: wangsan
 Fix For: 1.10.0


`LegacyKeyedCoProcessOperator` is marked as deprecated, so we should make 
`KeyedCoProcessOperatorWithWatermarkDelay` extend `KeyedCoProcessOperator` 
instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-13823) Incorrect debug log in CompileUtils

2019-08-23 Thread wangsan (Jira)
wangsan created FLINK-13823:
---

 Summary: Incorrect debug log in CompileUtils
 Key: FLINK-13823
 URL: https://issues.apache.org/jira/browse/FLINK-13823
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.0
Reporter: wangsan
 Fix For: 1.9.1


There is a typo in `CompileUtils`:

```java
CODE_LOG.debug("Compiling: %s \n\n Code:\n%s", name, code);
```

The placeholders should be `{}` instead of `%s`.
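
The corrected call, using SLF4J-style placeholders as described above:

```java
CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code);
```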



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


zk connect loss

2019-05-23 Thread wangsan
13:10:06.119 [main] ERROR   - Failed to resolve default logging config file:
config/java.util.logging.properties
13:10:37.097 [main] ERROR o.a.i.s.d.z.internal.ZookeeperClient  - Operation
failed with unexpected error, connection lost:
org.apache.zookeeper.KeeperException$ConnectionLossException:
KeeperErrorCode = ConnectionLoss for /search
org.apache.zookeeper.KeeperException$ConnectionLossException:
KeeperErrorCode = ConnectionLoss for /search
at
org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
at
org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1102)
at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1130)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperClient.exists(ZookeeperClient.java:280)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.initZkNodes(ZookeeperDiscoveryImpl.java:792)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.startJoin(ZookeeperDiscoveryImpl.java:960)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.joinTopology(ZookeeperDiscoveryImpl.java:778)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.startJoinAndWait(ZookeeperDiscoveryImpl.java:696)
at
org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi.spiStart(ZookeeperDiscoverySpi.java:474)
at
org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:297)
at
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:915)
at
org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1720)
at
org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1033)
at
org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2014)
at
org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1723)
at
org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1151)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:649)
at org.apache.ignite.IgniteSpring.start(IgniteSpring.java:66)



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


[jira] [Closed] (FLINK-11687) Remove useless code in StreamingJobGraphGenerator

2019-02-20 Thread wangsan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangsan closed FLINK-11687.
---
Resolution: Fixed

> Remove useless code in StreamingJobGraphGenerator
> -
>
> Key: FLINK-11687
> URL: https://issues.apache.org/jira/browse/FLINK-11687
> Project: Flink
>  Issue Type: Task
>  Components: DataStream API
>Affects Versions: 1.6.3, 1.7.2
>Reporter: wangsan
>Assignee: wangsan
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> `allOutputs` in `StreamingJobGraphGenerator#setVertexConfig` is useless; we 
> should remove it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11687) Remove useless code in StreamingJobGraphGenerator

2019-02-20 Thread wangsan (JIRA)
wangsan created FLINK-11687:
---

 Summary: Remove useless code in StreamingJobGraphGenerator
 Key: FLINK-11687
 URL: https://issues.apache.org/jira/browse/FLINK-11687
 Project: Flink
  Issue Type: Task
  Components: DataStream API
Affects Versions: 1.7.2, 1.6.3
Reporter: wangsan
Assignee: wangsan


`allOutputs` in `StreamingJobGraphGenerator#setVertexConfig` is useless; we 
should remove it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: ZookeeperDiscovery block when communication error

2019-01-29 Thread wangsan
Thank you!
When I use zk discovery, I find many nodes under the zookeeper path /jd/.
My understanding is that when a new node joins, a new /jd/ child node is
created, and once the node has joined the cluster successfully, that /jd/ entry should be
removed. But in my cluster there are many leftover /jd/ nodes.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Why GridDiscoveryManager onSegmentation use StopNodeFailureHandler?

2019-01-29 Thread wangsan
Thanks!
The IgniteConfiguration.segmentationPolicy RESTART_JVM is a little
misleading: it exits Java with a particular exit code, but the Java application will ignore
that exit code.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


RE: Failed to read data from remote connection

2019-01-29 Thread wangsan
When connections are checked, many NIO sockets will be created (one socket per node),
so will direct memory grow with the node count?



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: NPE when start

2019-01-22 Thread wangsan
I will try to reproduce the exception.

I am worried that it will happen because of the cache remote listener rmtFilter:

@IgniteAsyncSupported
public <T extends Event> UUID remoteListen(@Nullable IgniteBiPredicate<UUID, T> locLsnr,
    @Nullable IgnitePredicate<T> rmtFilter,
    @Nullable int... types)
    throws IgniteException;

I have defined a class, public class CacheNodeFilter implements
IgnitePredicate, for rmtFilter.
All the nodes have the class in an SDK jar, and peer class loading is
false.
Only one node uses the remote listener.
When I clean all the metadata in the ignite work home and start all the
nodes (different roles, about 10 nodes: 1 server, 7 java clients, 2 cpp clients),
sometimes the exception is thrown.




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: ignite zk: Possible starvation in striped pool

2019-01-22 Thread wangsan
Thank you!
I see that this is the communication SPI, not the discovery SPI. But on other
nodes there are many zk session timeout or zk reconnect failure
messages. And the starvation message is only printed on the node that runs on the same
machine as the zk server (not a cluster, just three zk nodes on one machine).

In the log: "Completed: 17156 and blockCnt=0, waitCnt=17103". Why is
waitCnt so large? Is this a CountDownLatch?





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


ignite zk: Possible starvation in striped pool

2019-01-22 Thread wangsan
10:38:31.577 [grid-timeout-worker-#55%DAEMON-NODE-10-153-106-16-8991%] WARN 
o.a.ignite.internal.util.typedef.G  - >>> Possible starvation in striped
pool.
Thread name: sys-stripe-9-#10%DAEMON-NODE-10-153-106-16-8991%
Queue: []
Deadlock: false
Completed: 17156
Thread [name="sys-stripe-9-#10%DAEMON-NODE-10-153-106-16-8991%", id=38,
state=RUNNABLE, blockCnt=0, waitCnt=17103]  

at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:110)
at o.a.i.spi.communication.tcp.TcpCommunicationSpi.createTcpClient(TcpCommunicationSpi.java:3262)
at o.a.i.spi.communication.tcp.TcpCommunicationSpi.createNioClient(TcpCommunicationSpi.java:2958)
at o.a.i.spi.communication.tcp.TcpCommunicationSpi.reserveClient(TcpCommunicationSpi.java:2841)
at o.a.i.spi.communication.tcp.TcpCommunicationSpi.sendMessage0(TcpCommunicationSpi.java:2692)
at o.a.i.spi.communication.tcp.TcpCommunicationSpi.sendMessage(TcpCommunicationSpi.java:2651)
at o.a.i.i.managers.communication.GridIoManager.send(GridIoManager.java:1643)
at o.a.i.i.managers.communication.GridIoManager.sendToGridTopic(GridIoManager.java:1715)
at o.a.i.i.processors.cache.GridCacheIoManager.send(GridCacheIoManager.java:1160)
at o.a.i.i.processors.cache.GridCacheIoManager.send(GridCacheIoManager.java:1199)
at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture.sendDhtRequests(GridDhtAtomicAbstractUpdateFuture.java:466)
at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture.map(GridDhtAtomicAbstractUpdateFuture.java:423)
at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal0(GridDhtAtomicCache.java:1805)
at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal(GridDhtAtomicCache.java:1628)
at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.processNearAtomicUpdateRequest(GridDhtAtomicCache.java:3056)
at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.access$400(GridDhtAtomicCache.java:130)
at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$5.apply(GridDhtAtomicCache.java:266)
at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$5.apply(GridDhtAtomicCache.java:261)
at o.a.i.i.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:1054)
at o.a.i.i.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:579)
at o.a.i.i.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:378)
at o.a.i.i.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:304)
at o.a.i.i.processors.cache.GridCacheIoManager.access$100(GridCacheIoManager.java:99)
at o.a.i.i.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:293)
at

NPE when start

2019-01-11 Thread wangsan
When a client node starts with zk discovery and persistence enabled, some null
pointer exceptions are thrown (when the node starts on a new machine).

The exception trace is as follows:

12:26:03.288 [zk-172_22_29_108_SEARCH_NODE_8100-EventThread] ERROR
o.a.i.i.p.c.GridContinuousProcessor  - Failed to unmarshal continuous
routine handler, ignore routine
[routineId=31b253bb-df3a-45f2-b658-3917b82993b2,
srcNodeId=76b83b66-3858-49bd-97d8-38c49333e6f5]
org.apache.ignite.IgniteCheckedException: Failed to unmarshal object with
optimized marshaller
at
org.apache.ignite.internal.util.IgniteUtils.unmarshal(IgniteUtils.java:9968)
at
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.startDiscoveryDataRoutine(GridContinuousProcessor.java:568)
at
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.onGridDataReceived(GridContinuousProcessor.java:529)
at
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$5.onExchange(GridDiscoveryManager.java:888)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.processLocalJoin(ZookeeperDiscoveryImpl.java:2946)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.processBulkJoin(ZookeeperDiscoveryImpl.java:2772)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.processNewEvents(ZookeeperDiscoveryImpl.java:2638)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.processNewEvents(ZookeeperDiscoveryImpl.java:2610)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.access$2000(ZookeeperDiscoveryImpl.java:108)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl$ZkWatcher.processResult(ZookeeperDiscoveryImpl.java:4120)
at
org.apache.ignite.spi.discovery.zk.internal.ZookeeperClient$DataCallbackWrapper.processResult(ZookeeperClient.java:1163)
at
org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:569)
at
org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:505)
Caused by: org.apache.ignite.binary.BinaryObjectException: Failed to
unmarshal object with optimized marshaller
at
org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1780)
at
org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1962)
at
org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1714)
at
org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:310)
at
org.apache.ignite.internal.binary.BinaryMarshaller.unmarshal0(BinaryMarshaller.java:99)
at
org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:82)
at
org.apache.ignite.internal.util.IgniteUtils.unmarshal(IgniteUtils.java:9962)
... 12 common frames omitted
Caused by: org.apache.ignite.IgniteCheckedException: Failed to deserialize
object with given class loader:
[clsLdr=sun.misc.Launcher$AppClassLoader@42a57993, err=null]
at
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller.unmarshal0(OptimizedMarshaller.java:236)
at
org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:94)
at
org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1777)
... 18 common frames omitted
Caused by: java.lang.NullPointerException: null
at
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.aliveServerNodes(GridDiscoveryManager.java:1858)
at
org.apache.ignite.internal.processors.marshaller.ClientRequestFuture.(ClientRequestFuture.java:80)
at
org.apache.ignite.internal.processors.marshaller.MarshallerMappingTransport.requestMapping(MarshallerMappingTransport.java:138)
at
org.apache.ignite.internal.MarshallerContextImpl.getClassName(MarshallerContextImpl.java:375)
at
org.apache.ignite.internal.MarshallerContextImpl.getClass(MarshallerContextImpl.java:344)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java:264)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObject0(OptimizedObjectInputStream.java:341)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:198)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:416)
at
org.apache.ignite.internal.GridEventConsumeHandler.readExternal(GridEventConsumeHandler.java:457)
at
org.apache.ignite.internal.marshaller.optimized.OptimizedObjectInputStream.readExternalizable(OptimizedObjectInputStream.java:555)
at

RE: Failed to read data from remote connection

2019-01-11 Thread wangsan
Yes, I can set a larger MaxDirectMemorySize.
But I am afraid that as the node count grows larger, the direct memory usage will
grow with it.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Failed to read data from remote connection

2018-12-17 Thread wangsan
Now the cluster has 100+ nodes. When 'Start check connection process'
happens,
some nodes throw an OOM for direct buffer memory (java nio).
When connections are checked, many NIO sockets are created, and then the OOM happens?

How can I fix the OOM other than setting a larger xmx?

Thanks.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Failed to read data from remote connection

2018-12-03 Thread wangsan
Do you shut down the C++ node properly prior to killing the process?
Yes, the c++ node was killed with kill -9, not SIGHUP. That was a wrong operation,
and I will use a normal kill instead.

Does this exception impact the cluster's functionality anyhow?
I am not sure about the exceptions. My cluster crashes with an OOM
(could not create native thread), and ulimit and the config show that the max
user processes limit is very large (64k). There are about 20 nodes in ignite. I
don't know why the cluster uses so many threads. Could these exceptions
trigger too many sockets (threads)?

Thanks



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Questions about UDTF in flink SQL

2018-11-30 Thread wangsan
Hi Rong,

Yes, what Jark described is exactly what I need. Currently we have a workaround 
for this problem, using a UDF whose result type is a Map. I will take 
a look at your proposals and PR. 

Thanks for your help and suggestions.

Best,
Wangsan


> On Dec 1, 2018, at 7:30 AM, Rong Rong  wrote:
> 
> Hi Wangsan,
> 
> If your requirement is essentially what Jark describes, we already have a proposal 
> following up [FLINK-9249] in its related/parent task: [FLINK-9484]. We are 
> already implementing some of these internally and have one PR ready for 
> review for FLINK-9294.
> 
> Please kindly take a look and see if there's any additional features you 
> would like to comment and suggest.
> 
> Thanks,
> Rong
> 
> On Fri, Nov 30, 2018 at 1:54 AM Jark Wu <imj...@gmail.com> wrote:
> Hi Wangsan,
> 
> If I understand correctly, you want the return type of UDTF is determined by 
> the actual arguments, not a fixed result type. For example:
> 
> udtf("int, string, long", inputField)returns  a composite type with [f0: 
> INT, f1: VARCHAR, f2: BIGINT]
> udtf("int", inputField)returns  an atomic type with [f0: INT]
> 
> This is an interesting and useful feature IMO. But it maybe need some 
> modification for the current API of TableFunction to
> provide an additional `TypeInformation[T] getResultType(Object[] arguments, 
> Class[] argTypes)` interface. Which means need 
> more discussion in the community.
> 
> But you can create an issue if this is what you want and we can discuss how 
> to support it.
> 
> Best,
> Jark
> 
> 
> 
> On Thu, 29 Nov 2018 at 19:14, Timo Walther <twal...@apache.org> wrote:
> Hi Wangsan,
> 
> currently, UDFs have very strict result type assumptions. This is 
> necessary to determine the serializers for the cluster. There were 
> multiple requests for more flexible handling of types in UDFs.
> 
> Please have a look at:
> - [FLINK-7358] Add implicitly converts support for User-defined function
> - [FLINK-9294] [table] Improve type inference for UDFs with composite 
> parameter and/or result type
> - [FLINK-10958] [table] Add overload support for user defined function
> 
> If you think those issues do not represent what you need, you can open a 
> new issue with a little example of the feature you think is missing.
> 
> Regards,
> Timo
> 
> 
> On 28.11.18 at 09:59, wangsan wrote:
> > Hi all,
> >
> > When using user-defined table function in Flink SQL, it seems that the 
> > result type of a table function must be determinstic.
> >
> > If I want a UDTF whose result type is determined by its input parameters, 
> > what should I do?
> >
> > What I want to do is like this:
> >
> > ```
> > SELECT input, f1, f2 length FROM MyTable, LATERAL TABLE(unnest_udtf(input, 
> > v1, v2)) as T(f1, f2), LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) as 
> > T(f3, f4, f5)
> > ```
> >
> > I can surely register the same UDTF with different name and configuration, 
> > but I guess that’s not a good idea :(.
> >
> > If we can not make this in Flink SQL for now , may be we should consider 
> > this feature in future?
> >
> > Best,
> > wangsan
> 
> 



Questions about UDTF in flink SQL

2018-11-28 Thread wangsan
Hi all,

When using a user-defined table function in Flink SQL, it seems that the result 
type of a table function must be deterministic. 

If I want a UDTF whose result type is determined by its input parameters, what 
should I do?

What I want to do is like this:

```
SELECT input, f1, f2 length FROM MyTable, LATERAL TABLE(unnest_udtf(input, v1, 
v2)) as T(f1, f2), LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) as T(f3, f4, 
f5)
```

I can surely register the same UDTF with a different name and configuration, but 
I guess that's not a good idea :(. 

If we cannot make this work in Flink SQL for now, maybe we should consider this 
feature in the future?

Best,
wangsan

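For context, a minimal sketch of the limitation being described (the class and field layout are made up for the example): in the 1.6/1.7-era API a TableFunction commits to a single result type via getResultType(), so the type cannot vary with the call-site arguments the way the query above needs.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Hypothetical UDTF, shown only to illustrate the fixed-result-type constraint.
public class UnnestUdtf extends TableFunction<Row> {

    // Additional eval() overloads can be added, but they all share the single result type below.
    public void eval(String input, String v1, String v2) {
        Row row = new Row(2);
        row.setField(0, v1);
        row.setField(1, v2);
        collect(row);
    }

    @Override
    public TypeInformation<Row> getResultType() {
        // Fixed schema declared up front; it cannot depend on the actual arguments,
        // which is why the thread above discusses a getResultType(arguments, argTypes) variant.
        return Types.ROW(Types.STRING, Types.STRING);
    }
}
```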

Re: Failed to read data from remote connection

2018-11-27 Thread wangsan
As I restart the cpp client many times concurrently, maybe the zk cluster (ignite)
has some node paths that have already been closed.
From the cpp client logs,
I can see zk discovery watches 44 first, but that node has already been closed:
watchPath=f78ec20a-5458-47b2-86e9-7b7ed0ee4227:0e508bf8-521f-4898-9b83-fc216b35601c:81|44
About 30 seconds later, "Received communication error resolve request".
Then it watches another path, 42:
watchPath=5cb0efb1-0d1b-4b54-a8b7-ac3414e7735f:23fb17f7-cdbd-4cee-991a-46041bb0fa26:81|42
Then I don't understand why it logs "Start check connection process"?



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Failed to read data from remote connection

2018-11-27 Thread wangsan
I have a cluster with zookeeper discovery, e.g. a java server node s1, a java client
node jc1, and a cpp client node cpp1.
Sometimes when cpp1 restarts, s1 and jc1 throw this exception many
times:

 Failed to process selector key [ses=GridSelectorNioSessionImpl 

And cpp1 has many messages like this:
Established outgoing communication connection
[locAddr=/0:0:0:0:0:0:0:1:35153, rmtAddr=/0:0:0:0:0:0:0:1%lo:47107]
 [TcpCommunicationSpi] Closing NIO session because of unhandled
exception

The detailed log is: errorlog.zip
  











--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Why GridDiscoveryManager onSegmentation use StopNodeFailureHandler?

2018-11-27 Thread wangsan
Can I use LifecycleEventType.AFTER_NODE_STOP to stop the JVM if a segmentation event
happens?



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Why GridDiscoveryManager onSegmentation use StopNodeFailureHandler?

2018-11-27 Thread wangsan
Can I configure the FailureHandler that is used when a segmentation error happens?
Is the IgniteConfiguration FailureHandler only used when other exceptions (errors)
happen?
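
For reference, a minimal sketch of setting a failure handler on the configuration (available since Ignite 2.5). Whether segmentation events actually go through this handler is exactly the question raised above, so this is shown only to clarify what "configure the FailureHandler" refers to:

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;

public class FailureHandlerConfigExample {
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration();
        // Global failure handler that stops the node or halts the JVM on critical failures.
        cfg.setFailureHandler(new StopNodeOrHaltFailureHandler());
        Ignite ignite = Ignition.start(cfg);
    }
}
```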



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Failed to read data from remote connection

2018-11-27 Thread wangsan
When the client (c++ node) restarts multiple times,
the server and the other clients throw this exception:

 ERROR o.a.i.s.c.tcp.TcpCommunicationSpi  - Failed to read data from remote
connection (will wait for 2000ms).
org.apache.ignite.IgniteCheckedException: Failed to select events on
selector.
at
org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.bodyInternal(GridNioServer.java:2135)
at
org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.body(GridNioServer.java:1764)
at
org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException: null
at
java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:197)
at
org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.bodyInternal(GridNioServer.java:1958)
... 3 common frames omitted



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Why GridDiscoveryManager onSegmentation use StopNodeFailureHandler?

2018-11-26 Thread wangsan
Why does GridDiscoveryManager onSegmentation use StopNodeFailureHandler
and not StopNodeOrHaltFailureHandler?
In my case, when a network problem happens, I want the JVM to be closed.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


ZookeeperDiscovery block when communication error

2018-11-12 Thread wangsan
I have a server node in zone A, then I start a client from zone B. Access
between A and B is controlled by a firewall; the ACL is that B can access A, but A
cannot access B.
So when the client in zone B joins the cluster, communication fails because of the
firewall.

But when the client in zone B is closed, the cluster crashes (it hangs on new
joins, even from the same zone without the firewall). And when I restart the coordinator
server (I started two servers in zone A), the other server hangs on
communication.

It looks like the whole cluster crashes when a node join fails because of the firewall.

But when I use tcpDiscovery, I didn't see the cluster crash, just some
communication errors, and when a new node joins it still works well.

Is this a ZookeeperDiscovery bug?

The log is: zkcommuerror.log
  






--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


ZookeeperDiscovery Waiting for local join event

2018-11-08 Thread wangsan
I use ZookeeperDiscovery, but sometimes the cluster gets into a weird state: clients
cannot join the cluster, and the zookeeper node /jd has many child
nodes.
When I restart a server node, the log is:


19:14:06.675 [zk-192_0_0_1_DAEMON_NODE_8991-EventThread] INFO 
o.a.i.s.d.z.internal.ZookeeperClient  - ZooKeeper client state changed
[prevState=Disconnected, newState=Connected]
19:14:06.692 [localhost-startStop-1] INFO 
o.a.i.s.d.z.i.ZookeeperDiscoveryImpl  - Node started join
[nodeId=63554904-e24f-42ab-b41d-c0ca893f4c31,
instanceName=192_0_0_1_DAEMON_NODE_8991, zkSessionId=0x66ebf4bcc80077,
joinDataSize=17572, consistentId=192_0_0_1_DAEMON_NODE_8991, initTime=38,
nodePath=/qatest/n/72a85298-1904-49d8-bc43-457d7af67ac2:63554904-e24f-42ab-b41d-c0ca893f4c31:80|91]
19:14:06.702 [zk-192_0_0_1_DAEMON_NODE_8991-EventThread] INFO 
o.a.i.s.d.z.i.ZookeeperDiscoveryImpl  - Discovery coordinator already
exists, watch for previous server node
[locId=63554904-e24f-42ab-b41d-c0ca893f4c31,
watchPath=25506b95-8a96-482f-b492-14108e9593f7:a5aebbd2-161b-48de-8cfb-76254599b00b:80|90]
19:14:16.701 [localhost-startStop-1] WARN 
o.a.i.s.d.z.i.ZookeeperDiscoveryImpl  - Waiting for local join event
[nodeId=63554904-e24f-42ab-b41d-c0ca893f4c31,
name=192_0_0_1_DAEMON_NODE_8991]
19:14:26.002 [zk-192_0_0_1_DAEMON_NODE_8991-EventThread] INFO 
o.a.i.s.d.z.i.ZookeeperDiscoveryImpl  - Previous server node failed, check
is node new coordinator [locId=63554904-e24f-42ab-b41d-c0ca893f4c31]
19:14:26.003 [zk-192_0_0_1_DAEMON_NODE_8991-EventThread] INFO 
o.a.i.s.d.z.i.ZookeeperDiscoveryImpl  - Discovery coordinator already
exists, watch for previous server node
[locId=63554904-e24f-42ab-b41d-c0ca893f4c31,
watchPath=fad5efb9-8350-447f-8fa0-d989bab96aa0:4ae473f6-8ca2-41dd-8aa2-c19efd90a257:80|06]
19:14:26.702 [localhost-startStop-1] WARN 
o.a.i.s.d.z.i.ZookeeperDiscoveryImpl  - Waiting for local join event
[nodeId=63554904-e24f-42ab-b41d-c0ca893f4c31,
name=192_0_0_1_DAEMON_NODE_8991]
19:14:36.702 [localhost-startStop-1] WARN 
o.a.i.s.d.z.i.ZookeeperDiscoveryImpl  - Waiting for local join event
[nodeId=63554904-e24f-42ab-b41d-c0ca893f4c31,
name=192_0_0_1_DAEMON_NODE_8991]


So is this a ZookeeperDiscovery bug? 



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


persistence when disk full

2018-11-07 Thread wangsan
I have two nodes (servers with persistence enabled) on different machines:
node n1 on machine m1, node n2 on machine m2.
When the disk on m1 is full, both nodes n1 and n2 fail, not only node n1.
How can I make only node n1 fail?



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


[jira] [Assigned] (FLINK-10770) Some generated functions are not opened properly.

2018-11-03 Thread wangsan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangsan reassigned FLINK-10770:
---

Assignee: wangsan

> Some generated functions are not opened properly.
> -
>
> Key: FLINK-10770
> URL: https://issues.apache.org/jira/browse/FLINK-10770
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2, 1.7.0
>Reporter: wangsan
>Assignee: wangsan
>Priority: Major
>
> Recently I found that sometimes UDFs are not opened properly. It turns out that when 
> transforming SQL to an execution plan, some generated functions' *open* method 
> is not called, e.g. *NonWindowJoin*, *TimeBoundedStreamJoin*, 
> *FlatJoinRunner*.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10770) Some generated functions are not opened properly.

2018-11-03 Thread wangsan (JIRA)
wangsan created FLINK-10770:
---

 Summary: Some generated functions are not opened properly.
 Key: FLINK-10770
 URL: https://issues.apache.org/jira/browse/FLINK-10770
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.2, 1.7.0
Reporter: wangsan


Recently I found that sometimes UDFs are not opened properly. It turns out that when 
transforming SQL to an execution plan, some generated functions' *open* method is 
not called, e.g. *NonWindowJoin*, *TimeBoundedStreamJoin*, *FlatJoinRunner*.
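
A simplified sketch of the pattern the fix needs to follow (this is not the actual Flink runner class; names and generics are illustrative): whichever wrapper holds the generated function must forward the runtime context and explicitly call open()/close(), otherwise RichFunction#open of the wrapped function is silently skipped.

```java
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class FlatJoinRunnerSketch<IN1, IN2, OUT> extends RichFlatJoinFunction<IN1, IN2, OUT> {

    private final FlatJoinFunction<IN1, IN2, OUT> generatedFunction;

    public FlatJoinRunnerSketch(FlatJoinFunction<IN1, IN2, OUT> generatedFunction) {
        this.generatedFunction = generatedFunction;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // These two calls are what is missing when a generated function is "not opened properly".
        FunctionUtils.setFunctionRuntimeContext(generatedFunction, getRuntimeContext());
        FunctionUtils.openFunction(generatedFunction, parameters);
    }

    @Override
    public void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception {
        generatedFunction.join(first, second, out);
    }

    @Override
    public void close() throws Exception {
        FunctionUtils.closeFunction(generatedFunction);
    }
}
```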



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: npe: Failed to reinitialize local partition

2018-10-28 Thread wangsan
Hi:
I ran the test on v2.5 (with the NPE patch); the in-memory cache backups
are 0 after reconnect. When I use v2.7, it's OK!
When will v2.7 be released?





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Reverse the order of fields in Flink SQL

2018-10-25 Thread wangsan
Hi Yinhua,

This is actually a bug in Flink table; you can check this issue: 
https://issues.apache.org/jira/browse/FLINK-10290.

I opened a PR for this issue a couple of days ago, but there is still some 
problem, so it's not ready to be merged. We have used it in our internal Flink 
version, and so far it works well. Maybe you can take a look at it.

Best,
wangsan

> On Oct 24, 2018, at 9:31 AM, yinhua.dai  wrote:
> 
> Hi Timo,
> 
> I write simple testing code for the issue, please checkout
> https://gist.github.com/yinhua-dai/143304464270afd19b6a926531f9acb1
> 
> I write a custom table source which just use RowCsvInputformat to create the
> dataset, and use the provided CsvTableSink, and can reproduce the issue.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: npe: Failed to reinitialize local partition

2018-10-24 Thread wangsan
IgniteTwoRegionsRebuildIndexTest.java

The attached file is the test case.
I use getOrCreateCache with a name, but the cache template has been defined
before. When reconnecting, it looks like the cache template does not work.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: npe: Failed to reinitialize local partition

2018-10-23 Thread wangsan
Thanks!
I applied the patch from the issue, and the NPE was fixed.
But another problem happens:

As the issue says:
Steps to reproduce:
Ignite server

region1 (with persistence)
region2 (without persistence)
client

cache1a from region1 – with custom affinity
cache2a from region2 – with custom affinity  !!! set backups=2 (different from the
issue)

1. Populate data in both cache1a and cache2a.
2. Restart the ignite server. It knows about cache1a from the persistent store.
It doesn't know about cache2a.
3. Don't restart the client. Wait for the client to reconnect. Then the cache2a
backups will be 0, not 2!

It looks like the client reconnect did not carry the cache configuration (backups=2)
but just used the cacheName.





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: npe: Failed to reinitialize local partition

2018-10-19 Thread wangsan
Thanks!
The ticket description matches my scenario very closely. I will apply the patch!

I am a little confused about the persistence files.
If I have a default region without persistence, and cache2a uses the default
region, the db file (directory) won't be created.

But if I have

region1 (with persistence)
region2 (without persistence)

cache1a from region1
cache2a from region2

cache2a uses region2 without persistence. Why does cache2a still have a db
file (directory) in the file system?




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


npe: Failed to reinitialize local partition

2018-10-19 Thread wangsan
When a c++ node with a cache (defined in c++) joins the java server node (persistence
true), it throws this exception. When I clear the server persistence home and then
restart, the exception is not thrown.
Why?

o.a.i.i.p.c.d.d.p.GridDhtPartitionsExchangeFuture  - Failed to reinitialize
local partitions (preloading will be stopped): GridDhtPartitionExchangeId
[topVer=AffinityTopologyVersion [topVer=5, minorTopVer=0],
discoEvt=DiscoveryEvent [evtNode=ZookeeperClusterNode
[id=4279e27d-0091-47cf-a165-c8b769240aa8, addrs=[172.17.0.1, 10.153.106.14,
0:0:0:0:0:0:0:1%lo, 127.0.0.1], order=5, loc=false, client=true], topVer=5,
nodeId8=56841f60, msg=Node joined: ZookeeperClusterNode
[id=4279e27d-0091-47cf-a165-c8b769240aa8, addrs=[172.17.0.1, 10.153.106.14,
0:0:0:0:0:0:0:1%lo, 127.0.0.1], order=5, loc=false, client=true],
type=NODE_JOINED, tstamp=1539937359544], nodeId=4279e27d, evt=NODE_JOINED]
java.lang.NullPointerException: null
at
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager$11.apply(GridCacheDatabaseSharedManager.java:1243)
at
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager$11.apply(GridCacheDatabaseSharedManager.java:1239)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:383)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.listen(GridFutureAdapter.java:353)
at
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.rebuildIndexesIfNeeded(GridCacheDatabaseSharedManager.java:1239)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:1711)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:126)
at
org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:451)
at
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:729)
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body0(GridCachePartitionExchangeManager.java:2419)
at
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:2299)
at
org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
at java.lang.Thread.run(Thread.java:745)



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


zookeeper jd(joindata) never delete

2018-10-18 Thread wangsan
ignite version 2.5.0
discovery spi: ZookeeperDiscoverySpi

When I start a client node c, c can join the cluster,
but c just joins the cluster and never calls any cache method.
The jd path children will never be deleted:

 



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


[jira] [Updated] (FLINK-10553) Unified sink and source table name in SQL statement

2018-10-15 Thread wangsan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangsan updated FLINK-10553:

Summary: Unified sink and source table name in SQL statement  (was: Unified 
table sink and source name in SQL statement)

> Unified sink and source table name in SQL statement
> ---
>
> Key: FLINK-10553
> URL: https://issues.apache.org/jira/browse/FLINK-10553
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0, 1.7.0
>Reporter: wangsan
>Assignee: wangsan
>Priority: Major
>
> Since a sink table can now be registered using ExternalCatalog, just the same 
> as a source table, source and sink names in SQL statements should also be 
> treated equally. Currently we can only use `catalog.database.table` for a sink table 
> (enclosed in back-ticks as an identifier), which is not consistent with the source 
> table name (where the whole name is not treated as an identifier). 
> *INSERT INTO catalog.database.sinktable SELECT ... FROM 
> catalog.database.sourcetable* should be supported.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10553) Unified table sink and source name in SQL statement

2018-10-15 Thread wangsan (JIRA)
wangsan created FLINK-10553:
---

 Summary: Unified table sink and source name in SQL statement
 Key: FLINK-10553
 URL: https://issues.apache.org/jira/browse/FLINK-10553
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.0, 1.7.0
Reporter: wangsan
Assignee: wangsan


Since a sink table can now be registered using ExternalCatalog, just the same as 
a source table, source and sink names in SQL statements should also be treated 
equally. Currently we can only use `catalog.database.table` for a sink table (enclosed 
in back-ticks as an identifier), which is not consistent with the source table name 
(where the whole name is not treated as an identifier). 

*INSERT INTO catalog.database.sinktable SELECT ... FROM 
catalog.database.sourcetable* should be supported.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


ignite.events().remoteListen unmarshal0 throw class not found

2018-09-20 Thread wangsan
A server and a client; the client calls:
   line_1: CacheNodeFilter rmtFilter = new CacheNodeFilter(serviceType, uuid);
   line_2: ignite.events().remoteListen(listenerDispatcher, rmtFilter, EventType.EVT_CACHE_OBJECT_EXPIRED);

I am quite sure that the server and the client both have the class CacheNodeFilter
in the same version.
And CacheNodeFilter implements IgnitePredicate.
But on the client it throws, from this catch block in Ignite:
catch (ClassNotFoundException e) {
    throw new IgniteCheckedException("Failed to find class with given class loader for unmarshalling " +
        "(make sure same versions of all classes are available on all nodes or enable peer-class-loading) " +
        "[clsLdr=" + clsLdr + ", cls=" + e.getMessage() + "]", e);
}

It is weird that the exception is thrown on line_2 but not line_1. If line_1 runs
successfully, the class "CacheNodeFilter" must have been loaded by the classloader, so why
does line_2 still throw ClassNotFoundException?

And the classloader is the AppClassLoader.
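
One thing the quoted error message itself points at is peer class loading. A minimal sketch of enabling it on the IgniteConfiguration; whether it is appropriate here (the class is said to be on every node already) is not established by this thread, so treat it as one option to try rather than a confirmed fix:

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;

public class PeerClassLoadingExample {
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration();
        // Allows nodes to fetch classes such as remote event filters from the sending node
        // instead of requiring the class on every node's classpath.
        cfg.setPeerClassLoadingEnabled(true);
        Ignite ignite = Ignition.start(cfg);
    }
}
```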






--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: ignte cluster hang with GridCachePartitionExchangeManager

2018-09-11 Thread wangsan
Yes, it was blocked when doing cache operations in discovery event listeners
while node-left events arrived concurrently. I now do the cache operations in
another thread, and then the listener is not blocked.
The original cause may be that the discovery event processor holds the server
latch; when a cache operation is done in the listener, the server latch gets
blocked (why, I am not sure).
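
A minimal sketch of the workaround described above (the cache name, executor, and event types are assumptions for the example): the discovery listener only hands the work to a separate executor, so the discovery notification thread never performs the cache operation itself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;

public class AsyncDiscoveryListener {

    public static void register(Ignite ignite) {
        IgniteCache<String, String> cache = ignite.getOrCreateCache("nodeEvents"); // assumed cache name
        ExecutorService worker = Executors.newSingleThreadExecutor();

        ignite.events().localListen(evt -> {
            DiscoveryEvent discoEvt = (DiscoveryEvent) evt;
            String item = "event_" + discoEvt.eventNode().consistentId();
            // Offload the cache update; never block the discovery notification thread.
            worker.submit(() -> cache.put(item, item));
            return true; // keep listening for further events
        }, EventType.EVT_NODE_JOINED, EventType.EVT_NODE_LEFT, EventType.EVT_NODE_FAILED);
    }
}
```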



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


[jira] [Created] (FLINK-10290) Conversion error in StreamScan and BatchScan

2018-09-06 Thread wangsan (JIRA)
wangsan created FLINK-10290:
---

 Summary: Conversion error in StreamScan and BatchScan
 Key: FLINK-10290
 URL: https://issues.apache.org/jira/browse/FLINK-10290
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.0, 1.5.3
Reporter: wangsan


`RowTypeInfo#equals()` only compares field types; field names are not 
considered. When checking the equality of `inputType` and `internalType`, we 
should compare both field types and field names.

Behavior of this bug:

A table T with schema (a: Long, b: Long, c: Long)
SELECT b, c, a FROM T
expected: b, c, a
actual: a, b, c
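
A small illustration of the root cause (behaviour as described for the affected versions; the class below is just a throwaway demo): two RowTypeInfo instances with the same field types but different field names compare as equal, so a scan that only reorders same-typed fields is not converted.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class RowTypeInfoEqualsDemo {
    public static void main(String[] args) {
        TypeInformation<?>[] types = {Types.LONG, Types.LONG, Types.LONG};
        RowTypeInfo abc = new RowTypeInfo(types, new String[] {"a", "b", "c"});
        RowTypeInfo bca = new RowTypeInfo(types, new String[] {"b", "c", "a"});

        // Prints "true" in the affected versions: field names are ignored by equals(),
        // so StreamScan/BatchScan believe no conversion is needed and the order is lost.
        System.out.println(abc.equals(bca));
    }
}
```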





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10290) Conversion error in StreamScan and BatchScan

2018-09-06 Thread wangsan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangsan reassigned FLINK-10290:
---

Assignee: wangsan

> Conversion error in StreamScan and BatchScan
> 
>
> Key: FLINK-10290
> URL: https://issues.apache.org/jira/browse/FLINK-10290
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.3, 1.6.0
>Reporter: wangsan
>Assignee: wangsan
>Priority: Major
>
> `RowTypeInfo#equals()` only compares field types; field names are not 
> considered. When checking the equality of `inputType` and `internalType`, we 
> should compare both field types and field names.
> Behavior of this bug:
> A table T with schema (a: Long, b: Long, c: Long)
> SELECT b, c, a FROM T
> expected: b, c, a
> actual: a, b, c



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: ignite cluster management

2018-09-03 Thread wangsan


1. Note that you will have problem writing to caches when topology changes.
Why not just query ignite.cluster(), expose node traits as
node.attributes()?

>> Yes, I use attributes for node join. But I also want to know about the stopped
>> nodes (after a node has left), so that I can do some operations such as restart,
>> raise an alarm, or start it on another machine.

2. Having 100 servers should not be a problem. It is recommended to have
long-lived clients in this case, as opposed to creating and taking down them
all the time.

>> One machine will run many nodes. When that machine crashes, I hope the
>> cluster can still work well.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: ignite cluster hang with GridCachePartitionExchangeManager

2018-09-03 Thread wangsan
Thanks!

Can I do cache operations (updating a cache entry) in another thread from a
discovery event listener? And will that operation execute concurrently, or
execute before the partition map exchange?



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: ignite cluster hang with GridCachePartitionExchangeManager

2018-08-28 Thread wangsan
I can reproduce the bug. The log above is what the server (first) node prints when I
stop the other nodes.



import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.logger.slf4j.Slf4jLogger;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.junit.Test;

import com.google.common.collect.Lists;

/**
 * @author wangsan
 * @date 2018/08/23
 */
public class IgniteMultiThreadOnOffTest {
private static final String CACHE_DATA_REGION_PERSISTENCE =
"PERSISTENCE_REGION";
private static final String stringCacheOneName = "StringCacheOneName";

@Test
public void testFirstServer() throws Exception {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setClientMode(false);
cfg.setDiscoverySpi(tcpDiscoverySpi());
cfg.setConsistentId("server_test_1");
cfg.setIgniteInstanceName("server_test_1");
cfg.setGridLogger(new Slf4jLogger());
Ignite ignite = Ignition.start(cfg);

        CacheConfiguration<String, String> cacheOneConfiguration = new CacheConfiguration<>(stringCacheOneName);
        cacheOneConfiguration.setCacheMode(CacheMode.REPLICATED);
        IgniteCache<String, String> cacheOne = ignite.getOrCreateCache(cacheOneConfiguration);

        ignite.events().localListen(event -> {
            System.err.println("get event " + event.name() + " " + event);
            if (event instanceof DiscoveryEvent) {
                ClusterNode clusterNode = ((DiscoveryEvent) event).eventNode();

                String item = "event_" + clusterNode.consistentId();
                System.err.println("--  oldest node process the message : " + item);

                switch (event.type()) {
                    case EventType.EVT_NODE_JOINED:
                        cacheOne.put(item, item);
                        // System.err.println("--- do add async" + item);
                        // ForkJoinPool.commonPool().execute(() -> cacheOne.put(item, item));
                        break;
                    case EventType.EVT_NODE_FAILED:
                    case EventType.EVT_NODE_LEFT:
                        // will block
                        cacheOne.remove(item);
                        // System.err.println("--- do remove async " + item);
                        // ForkJoinPool.commonPool().execute(() -> cacheOne.remove(item));
                        break;
                    default:
                        System.err.println("ignore discovery event:" + event);
                        break;
                }
                return true;
            } else {
                return false;
            }
        }, EventType.EVTS_DISCOVERY);

        while (true) {
            TimeUnit.SECONDS.sleep(5);

            System.err.println(" " + cacheOne.size());
            System.err.println(ignite.cacheNames().stream().collect(Collectors.joining(",")));
        }
}

    @Test
    public void testMultiServer() throws IOException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        long start = System.currentTimeMillis();
        CompletableFuture<ArrayList<Ignite>> allIgnite = IntStream.range(0, 5)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> testServer(i), executorService))
                .map(f -> f.thenApply(i -> Lists.newArrayList(i)))
                .reduce((x, y) -> x.thenCombine(y, (a, b) -> {
                    a.addAll(b);
                    return a;
                })).get();

        allIgnite.thenAccept(list -> {
            System.err.println("start use time ms " + (System.currentTimeMillis() - start));
            list.forEach(n -> System.err.println("n.id " + n.cluster().localNode().consistentId()));
            try {
                TimeUnit.SECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
S

ignite multi thread stop bug

2018-08-28 Thread wangsan wang
In my test case (see the attachment) I can reproduce the bug:

   1. testStartServer
   2. testMultiClient
   3. close testMultiClient
   4. then testStartServer fails with "Failed to wait for partition map exchange"


I need to update the cache in a DiscoveryEvent listener. How can I fix this?


IgniteMultiThreadTest.java
Description: Binary data


ignite cluster management

2018-08-27 Thread wangsan
I am using Ignite to manage my cluster. When a node joins I save an item in my
NodeCache, and when a node leaves I delete the item from my NodeCache in an Ignite event
listener.
I use a custom NodeCache rather than the Ignite cluster topology because I want to
keep the node status persistent after a node leaves, for restart operations.
I have some questions:
1. How can I guarantee that a client node with the same consistent ID (or other properties)
does not join the cluster twice, the way it works for server nodes?
2. My topology will have 100+ servers and 2000+ clients. How can I keep the
cluster highly stable?

tks



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: ignite cluster hang with GridCachePartitionExchangeManager

2018-08-27 Thread wangsan wang
About question 2, with debug-level logging it looks like this:
I start node a, then nodes b, c, d, e, f from multiple threads, then close them all.
In the debug log, a server latch is created with participantsSize=5 but only
one count-down happens, so the latch hangs.

A summary of the logs:

>>> ++
> >>> Topology snapshot.
> >>> ++
> >>> Ignite instance name: test01
> >>> Number of server nodes: 5
> >>> Number of client nodes: 0
> >>> Topology version: 71
> >>> Local: 016F5C35-AC7D-4391-9142-1A7AEA1C3378, [
> 192.168.10.103/0:0:0:0:0:0:0:1, /127.0.0.1, /192.168.10.103], 21, Mac OS
> X x86_64 10.11.6, wangqingpeng, Java(TM) SE Runtime Environment
> 1.8.0_131-b11
> >>> Remote: 58973B59-DB04-4325-9966-D259A7AE3BBD, [
> 192.168.10.103/0:0:0:0:0:0:0:1, /127.0.0.1, /192.168.10.103], 66, Mac OS
> X x86_64 10.11.6, wangqingpeng, Java(TM) SE Runtime Environment
> 1.8.0_131-b11
> >>> Remote: A79A57F0-D3A4-4ECA-890B-0A043BB0D23F, [
> 192.168.10.103/0:0:0:0:0:0:0:1, /127.0.0.1, /192.168.10.103], 67, Mac OS
> X x86_64 10.11.6, wangqingpeng, Java(TM) SE Runtime Environment
> 1.8.0_131-b11
> >>> Remote: C261B07C-9495-4058-889B-BD484BE10477, [
> 192.168.10.103/0:0:0:0:0:0:0:1, /127.0.0.1, /192.168.10.103], 68, Mac OS
> X x86_64 10.11.6, wangqingpeng, Java(TM) SE Runtime Environment
> 1.8.0_131-b11
> >>> Remote: 230E516F-6C12-4391-B902-822AFC6F7BC4, [
> 192.168.10.103/0:0:0:0:0:0:0:1, /127.0.0.1, /192.168.10.103], 70, Mac OS
> X x86_64 10.11.6, wangqingpeng, Java(TM) SE Runtime Environment
> 1.8.0_131-b11
> >>> Total number of CPUs: 4
> >>> Total heap size: 3.6GB
> >>> Total offheap size: 6.4GB
> ..
> 23:48:44.629 [exchange-worker-#42%test01%] DEBUG
> o.a.i.i.p.c.d.d.p.l.ExchangeLatchManager  - Server latch is created
> [latch=IgniteBiTuple [val1=exchange, val2=AffinityTopologyVersion
> [topVer=71, minorTopVer=0]], participantsSize=5]
> ..
> 23:48:44.635 [exchange-worker-#42%test01%] DEBUG
> o.a.i.i.p.c.d.d.p.l.ExchangeLatchManager  - Count down +
> [latch=exchange-AffinityTopologyVersion [topVer=71, minorTopVer=0],
> remaining=4]
> ..
> 23:48:54.639 [exchange-worker-#42%test01%] WARN
> o.a.i.i.p.c.d.d.p.GridDhtPartitionsExchangeFuture  - Unable to await
> partitions release latch within timeout: ServerLatch [permits=4,
> pendingAcks=[230e516f-6c12-4391-b902-822afc6f7bc4,
> 58973b59-db04-4325-9966-d259a7ae3bbd, a79a57f0-d3a4-4eca-890b-0a043bb0d23f,
> c261b07c-9495-4058-889b-bd484be10477], super=CompletableLatch [id=exchange,
> topVer=AffinityTopologyVersion [topVer=71, minorTopVer=0]]]



The full log is in the attachment.

tks


question2.log
Description: Binary data


ignite cluster hang with GridCachePartitionExchangeManager

2018-08-24 Thread wangsan
My cluster topology is nodes a, b, c, d, all with persistence enabled and
peer class loading disabled. Nodes b, c, d have a different class (cache b) from a.
1. When any node crashes with an OOM (heap or stack), all nodes hang with
"Still waiting for initial partition map exchange".
2. When a starts first and b, c, d start concurrently in multiple threads, b, c, d hang
with "Still waiting for initial partition map exchange", and a hangs with
"Unable to await partitions release latch".





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Side effect of DataStreamRel#translateToPlan

2018-08-21 Thread wangsan
Hi Timo, 

I think this may affect more than the explain() method. DataStreamRel#translateToPlan 
is called whenever we need to translate a FlinkRelNode into a DataStream or DataSet, 
and it adds the desired operators to the execution environment. 
By side effect, I mean that if we call DataStreamRel#translateToPlan on the same 
RelNode several times, the same operators are added to the execution environment 
more than once, while we actually need them only once. Correct me if I 
misunderstood that.

I will open an issue later today, if this is indeed a problem.

Best,
wangsan



> On Aug 21, 2018, at 10:16 PM, Timo Walther  wrote:
> 
> Hi,
> 
> this sounds like a bug to me. Maybe the explain() method is not implemented 
> correctly. Can you open an issue for it in Jira?
> 
> Thanks,
> Timo
> 
> 
> Am 21.08.18 um 15:04 schrieb wangsan:
>> Hi all,
>> 
>> I noticed that the DataStreamRel#translateToPlan is non-idempotent, and that 
>> may cause the execution plan not as what we expected. Every time we call 
>> DataStreamRel#translateToPlan (in TableEnvirnment#explain, 
>> TableEnvirnment#writeToSink, etc), we add same operators in execution 
>> environment repeatedly.
>> 
>> Should we eliminate the side effect of DataStreamRel#translateToPlan ?
>> 
>> Best,  Wangsan
>> 
>> appendix
>> 
>> tenv.registerTableSource("test_source", sourceTable)
>> 
>> val t = tenv.sqlQuery("SELECT * from test_source")
>> println(tenv.explain(t))
>> println(tenv.explain(t))
>> 
>> implicit val typeInfo = TypeInformation.of(classOf[Row])
>> tenv.toAppendStream(t)
>> println(tenv.explain(t))
>> We call explain three times, and the Physical Execution Plan are all 
>> diffrent.
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
>> source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 2 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 3 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
>> source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 2 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 3 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 4 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 5 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 6 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
>> source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 2 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 3 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 4 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 5 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 6 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 7 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 8 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 9 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 10 : Operator
>> content : to: Row
>> ship_strategy : FORWARD
>> 
>> Stage 11 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 12 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 13 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> 



Side effect of DataStreamRel#translateToPlan

2018-08-21 Thread wangsan
Hi all,

I noticed that DataStreamRel#translateToPlan is non-idempotent, and that 
may cause the execution plan to differ from what we expect. Every time we call 
DataStreamRel#translateToPlan (in TableEnvironment#explain, 
TableEnvironment#writeToSink, etc.), the same operators are added to the execution 
environment repeatedly. 

Should we eliminate the side effect of DataStreamRel#translateToPlan? 

Best,  Wangsan

appendix

tenv.registerTableSource("test_source", sourceTable)

val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t))
println(tenv.explain(t))

implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))
We call explain three times, and the Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD

Stage 7 : Data Source
content : collect elements with CollectionInputFormat

Stage 8 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 9 : Operator
content : Map
ship_strategy : FORWARD

Stage 10 : Operator
content : to: Row
ship_strategy : FORWARD

Stage 11 : Data Source
content : collect elements with CollectionInputFormat

Stage 12 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 13 : Operator
content : Map
ship_strategy : FORWARD



how ignite c++ node set baselinetopology

2018-08-13 Thread wangsan
I have a Java node with persistence. When a C++ node joins, how can I activate the
cluster and set the baseline topology (BLT) via the API?



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: two data region with two nodes

2018-08-10 Thread wangsan
I just want module A (nodes a1, a2) to make cache cache_a persistent and
rebalanced within module A only, using the persistent region p_region.
Module B (nodes b1, b2) has cache cache_b, non-persistent and distributed within
module B only. B will access cache_a and has the same p_region configured for persistence,
even though cache_b is in the default (non-persistent) region.
a1, a2, b1, b2 are in one cluster, but A should not affect B when a1 restarts or the
whole module A restarts.
But after a restart, some weird things happen, such as the baseline topology being
smaller than it should be, or nodes hanging in the partition exchange manager with the
message: "Still waiting for initial partition map exchange".

I'm confused about Ignite's multi-role nodes in one cluster.
And our node count will be 2000+. I'm worried about the backward
compatibility and availability of the cluster.




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: two data region with two nodes

2018-08-08 Thread wangsan
Thanks,
I will try the oldest-node selection method to ensure that only one node processes
the event message.
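Roughly, the oldest-node check I have in mind looks like this (just a sketch, assuming `ignite` is the local Ignite instance):

// Only the current oldest server node acts on the discovery event;
// every other node simply ignores it.
ClusterNode oldest = ignite.cluster().forServers().forOldest().node();
if (oldest != null && oldest.isLocal()) {
    // process the join/left event here
}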

As mentioned earlier, in my project I have several modules, e.g.:
module A: daemon nodes with caches nodecache and machinecache; all of these caches are
persistent.
module B: search nodes with cache featureCache; all of its caches are persistent.
module C: processor nodes with cache hotCache, non-persistent.
module D: algo nodes, via C++, with cache algoCache, non-persistent.
more...

B, C and D's caches are private, using a cache node filter to guarantee isolation.
A's caches are public: all the modules (A, B, C, D, ...) can access nodecache and
machinecache, but a node filter is still used so that persistent data (such as WAL and
archive) lives only on module A's nodes. nodecache is just like cluster.nodes; I
just want to keep it persistent.

my topology is:
B1 B2  C1 D1
\   ||/  access nodeCache which store in node A1,A2
A1 A2













--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: two data region with two nodes

2018-08-01 Thread wangsan
Thank you for your answer.
Maybe I am using the region config the wrong way.
There are two apps (more apps with different roles) with different Ignite configs:
first app
:set the default region with persistence enabled
:set cache a, with a node filter for first-app nodes, in the default region
second app
:set the default region with persistence disabled
:just access cache a with queries

Start first-app instance 1, second-app instance 2, and first-app instance 3,
then close 1,
then restart 1, and the deadlock happens.
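In config terms, the setup above is roughly this (a sketch only; the attribute name and node filter are just examples, and the filter class must be on the classpath of all nodes):

// First app: persistent default region, cache "a" kept on first-app nodes only.
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
storageCfg.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);

IgniteConfiguration cfg = new IgniteConfiguration()
        .setDataStorageConfiguration(storageCfg)
        .setUserAttributes(Collections.singletonMap("app", "first"));

CacheConfiguration<String, String> cacheA = new CacheConfiguration<>("a");
cacheA.setNodeFilter(node -> "first".equals(node.attribute("app")));

// Second app: default region left non-persistent; it only queries cache "a".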

FYI, I use an IgniteLock in the first app when processing Ignite discovery events such
as the join event. When a new node joins, apps 1 and 3 both receive the join message in a
local event listener, but I want only one node to process the message, so I
use it like this:

if (globalLock.tryLock()) {
    LOGGER.info("--  hold global lock ");
    try {
        switch (event.type()) {
            case EventType.EVT_NODE_JOINED:
                joinListener.onMessage(clusterNode);
                break;
            case EventType.EVT_NODE_FAILED:
            case EventType.EVT_NODE_LEFT:
                leftListener.onMessage(clusterNode);
                break;
            default:
                LOGGER.info("ignore discovery event: {}", event);
                break;
        }
    } finally {
        LOGGER.debug("--  process event done ");
        // don't unlock until node left
        // globalLock.unlock();
    }
}

The node that holds the globalLock never unlocks it unless that node leaves. Is this
the right way to use the lock?




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Confusions About JDBCOutputFormat

2018-07-11 Thread wangsan
Well, I see. If the connection is established when writing data into the DB, we 
need to cache the received rows since the last write. 

IMO, maybe we do not need to open connections repeatedly or introduce 
connection pools. Testing and refreshing the connection periodically can simply solve 
this problem. I've implemented this in 
https://github.com/apache/flink/pull/6301; it would be kind of you to review 
it.
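The idea in the PR is basically a scheduled validity check, roughly like this (a sketch only, not the exact patch; the field names are illustrative):

// Periodically validate the JDBC connection and re-open it if the database
// has closed it, so that a later flush() does not fail on a dead connection.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleWithFixedDelay(() -> {
    synchronized (this) {                           // same lock that guards flush()
        try {
            if (connection == null || !connection.isValid(5)) {
                connection = DriverManager.getConnection(dbURL, username, password);
                statement = connection.prepareStatement(insertQuery);
            }
        } catch (SQLException e) {
            LOG.warn("Re-opening the JDBC connection failed", e);
        }
    }
}, 1, 1, TimeUnit.MINUTES);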

Best,
wangsan


> On Jul 11, 2018, at 2:25 PM, Hequn Cheng  wrote:
> 
> Hi wangsan,
> 
> What I mean is establishing a connection each time write data into JDBC,
> i.e.  establish a connection in flush() function. I think this will make
> sure the connection is ok. What do you think?
> 
> On Wed, Jul 11, 2018 at 12:12 AM, wangsan  <mailto:wamg...@163.com>> wrote:
> 
>> Hi Hequn,
>> 
>> Establishing a connection for each batch write may also have idle
>> connection problem, since we are not sure when the connection will be
>> closed. We call flush() method when a batch is finished or  snapshot state,
>> but what if the snapshot is not enabled and the batch size not reached
>> before the connection is closed?
>> 
>> May be we could use a Timer to test the connection periodically and keep
>> it alive. What do you think?
>> 
>> I will open a jira and try to work on that issue.
>> 
>> Best,
>> wangsan
>> 
>> 
>> 
>> On Jul 10, 2018, at 8:38 PM, Hequn Cheng  wrote:
>> 
>> Hi wangsan,
>> 
>> I agree with you. It would be kind of you to open a jira to check the
>> problem.
>> 
>> For the first problem, I think we need to establish connection each time
>> execute batch write. And, it is better to get the connection from a
>> connection pool.
>> For the second problem, to avoid multithread problem, I think we should
>> synchronized the batch object in flush() method.
>> 
>> What do you think?
>> 
>> Best, Hequn
>> 
>> 
>> 
>> On Tue, Jul 10, 2018 at 2:36 PM, wangsan > <mailto:wamg...@163.com>> wrote:
>> 
>>> Hi all,
>>> 
>>> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink
>>> application. But I am confused with the implementation of JDBCOutputFormat.
>>> 
>>> 1. The Connection was established when JDBCOutputFormat is opened, and
>>> will be used all the time. But if this connction lies idle for a long time,
>>> the database will force close the connetion, thus errors may occur.
>>> 2. The flush() method is called when batchCount exceeds the threshold,
>>> but it is also called while snapshotting state. So two threads may modify
>>> upload and batchCount, but without synchronization.
>>> 
>>> Please correct me if I am wrong.
>>> 
>>> ——
>>> wangsan



[jira] [Commented] (FLINK-9794) JDBCOutputFormat does not consider idle connection and multithreads synchronization

2018-07-10 Thread wangsan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539468#comment-16539468
 ] 

wangsan commented on FLINK-9794:


Hi [~yanghua], I've opened a pull request for this issue; it would be very kind 
of you to review the changes.

> JDBCOutputFormat does not consider idle connection and multithreads 
> synchronization
> ---
>
> Key: FLINK-9794
> URL: https://issues.apache.org/jira/browse/FLINK-9794
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0, 1.5.0
>Reporter: wangsan
>Priority: Major
>  Labels: pull-request-available
>
> Current implementation of  JDBCOutputFormat has two potential problems: 
> 1. The Connection was established when JDBCOutputFormat is opened, and will 
> be used all the time. But if this connection lies idle for a long time, the 
> database will force close the connection, thus errors may occur.
> 2. The flush() method is called when batchCount exceeds the threshold, but it 
> is also called while snapshotting state. So two threads may modify upload and 
> batchCount, but without synchronization.
> We need fix these two problems to make JDBCOutputFormat more reliable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9794) JDBCOutputFormat does not consider idle connection and multithreads synchronization

2018-07-10 Thread wangsan (JIRA)
wangsan created FLINK-9794:
--

 Summary: JDBCOutputFormat does not consider idle connection and 
multithreads synchronization
 Key: FLINK-9794
 URL: https://issues.apache.org/jira/browse/FLINK-9794
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Affects Versions: 1.5.0, 1.4.0
Reporter: wangsan


Current implementation of  JDBCOutputFormat has two potential problems: 

1. The Connection was established when JDBCOutputFormat is opened, and will be 
used all the time. But if this connection lies idle for a long time, the 
database will force close the connection, thus errors may occur.
2. The flush() method is called when batchCount exceeds the threshold, but it 
is also called while snapshotting state. So two threads may modify upload and 
batchCount, but without synchronization.

We need fix these two problems to make JDBCOutputFormat more reliable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Confusions About JDBCOutputFormat

2018-07-10 Thread wangsan
Hi Hequn,

Establishing a connection for each batch write may also have the idle-connection 
problem, since we are not sure when the connection will be closed. We call the 
flush() method when a batch is finished or when we snapshot state, but what if the 
snapshot is not enabled and the batch size is not reached before the connection is 
closed?

Maybe we could use a Timer to test the connection periodically and keep it 
alive. What do you think?

I will open a JIRA and try to work on that issue.

Best, 
wangsan



> On Jul 10, 2018, at 8:38 PM, Hequn Cheng  wrote:
> 
> Hi wangsan,
> 
> I agree with you. It would be kind of you to open a jira to check the problem.
> 
> For the first problem, I think we need to establish connection each time 
> execute batch write. And, it is better to get the connection from a 
> connection pool.
> For the second problem, to avoid multithread problem, I think we should 
> synchronized the batch object in flush() method.
> 
> What do you think?
> 
> Best, Hequn
> 
> 
> 
> On Tue, Jul 10, 2018 at 2:36 PM, wangsan  <mailto:wamg...@163.com>> wrote:
> Hi all,
> 
> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink 
> application. But I am confused with the implementation of JDBCOutputFormat.
> 
> 1. The Connection was established when JDBCOutputFormat is opened, and will 
> be used all the time. But if this connction lies idle for a long time, the 
> database will force close the connetion, thus errors may occur.
> 2. The flush() method is called when batchCount exceeds the threshold, but it 
> is also called while snapshotting state. So two threads may modify upload and 
> batchCount, but without synchronization.
> 
> Please correct me if I am wrong.
> 
> ——
> wangsan
> 



[jira] [Resolved] (SPARK-22503) Using current processing time to generate windows in streaming processing

2018-07-10 Thread wangsan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-22503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangsan resolved SPARK-22503.
-
Resolution: Not A Problem

> Using current processing time to generate windows in streaming processing
> -
>
> Key: SPARK-22503
> URL: https://issues.apache.org/jira/browse/SPARK-22503
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.3.0
>Reporter: wangsan
>Priority: Major
>
> There are cases we want to use processing time to generate windows in stream 
> processing, but `current_timestamp()` are not allowed in `window()` function 
> now. 
> When using `current_timestamp` in `window` function, the exception tells 
> "nondeterministic expressions are only allowed in Project, Filter, Aggregate 
> or Window". 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Confusions About JDBCOutputFormat

2018-07-10 Thread wangsan
Hi all,

I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink 
application. But I am confused by the implementation of JDBCOutputFormat.

1. The Connection is established when JDBCOutputFormat is opened, and will be 
used all the time. But if this connection lies idle for a long time, the 
database will force-close the connection, thus errors may occur.
2. The flush() method is called when batchCount exceeds the threshold, but it 
is also called while snapshotting state. So two threads may modify upload and 
batchCount, but without synchronization.

Please correct me if I am wrong.

——
wangsan


Re: ignite c++ support service grid?

2018-06-07 Thread wangsan
Can I use a C++ distributed closure on a Java Ignite node?



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


ignite c++ support service grid?

2018-05-27 Thread wangsan
From the docs at https://apacheignite-cpp.readme.io/docs, I did not find any service grid
description. I just want to make sure that Ignite C++ does not support the service
grid (Ignite service RPC) now, but maybe it will later.

I have some C++ nodes doing deep learning that offer visual algorithm
interfaces. Then I want my Ignite Java client to call those interfaces via the
Ignite service grid. Is that possible?

 



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


client service fail when reconnect,all server nodes stopped then restart

2018-05-15 Thread wangsan
i see some bugs such as  https://issues.apache.org/jira/browse/IGNITE-2766
   
https://github.com/apache/ignite/pull/2627
  
when i apply the patch, cache can be reused when client reconnect. but
igniteservice can not be reused.
the ex as follows:

Caused by: java.lang.IllegalStateException: class
org.apache.ignite.internal.processors.cache.CacheStoppedException: Failed to
perform cache operation (cache is stopped): ignite-sys-cache
at
org.apache.ignite.internal.processors.cache.GridCacheGateway.enter(GridCacheGateway.java:164)
at
org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy.mapKeyToNode(GridCacheAffinityProxy.java:202)
at
org.apache.ignite.internal.processors.service.GridServiceProcessor.serviceTopology(GridServiceProcessor.java:931)
at
org.apache.ignite.internal.processors.service.GridServiceProxy.randomNodeForService(GridServiceProxy.java:274)
at
org.apache.ignite.internal.processors.service.GridServiceProxy.nodeForService(GridServiceProxy.java:260)
at
org.apache.ignite.internal.processors.service.GridServiceProxy.invokeMethod(GridServiceProxy.java:170)






--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Unsubscribe

2018-02-09 Thread wangsan


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Question about Timestamp in Flink SQL

2017-11-29 Thread wangsan
Hi Timo,

What I am doing is extracting a timestamp field (which may be a string such as 
"2017-11-28 11:00:00" or a long value based on my current timezone) as the event 
time attribute. So in the timestampAndWatermarkAssigner, for the string format I 
should parse the date-time string using GMT, and for the long value I should add 
the offset, i.e. the opposite of what internalToTimestamp does. But then the processing time 
attribute cannot be kept consistent. Am I understanding that correctly?
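For the string case the assigner would do something like this (just a sketch of what I mean, not tested):

// Parse the incoming "yyyy-MM-dd HH:mm:ss" string as GMT, so the offset that
// internalToTimestamp() later subtracts is cancelled out again.
private static long toEventTime(String s) throws ParseException {
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    fmt.setTimeZone(TimeZone.getTimeZone("GMT"));
    return fmt.parse(s).getTime();
}

// For the long case, the opposite direction: add the local offset back.
long adjusted = epochMillis + TimeZone.getDefault().getOffset(epochMillis);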

Best,
wangsan



> On 29 Nov 2017, at 4:43 PM, Timo Walther <twal...@apache.org> wrote:
> 
> Hi Wangsan,
> 
> currently the timestamps in Flink SQL do not depend on a timezone. All 
> calculations happen on the UTC timestamp. This also guarantees that an input 
> with Timestamp.valueOf("XXX") remains consistent when parsing and outputing 
> it with toString().
> 
> Regards,
> Timo
> 
> 
> Am 11/29/17 um 3:43 AM schrieb wangsan:
>> Hi Xincan,
>> 
>> Thanks for your reply. 
>> 
>> The system default timezone is just as what I expected 
>> (sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=2880,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
>>  
>> I looked into the generated code, and I found the following code snippet:
>> 
>> ```
>> result$20 = 
>> org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
>> ```
>> 
>> And what `internalToTimestamp` function did is:
>> 
>> ```
>> public static Timestamp internalToTimestamp(long v) {
>> return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
>> }
>> ```
>> 
>> So, if I give it an event time with unix timestamp 0, then I got the 
>> Timestamp(-2880). I am confused why `internalToTimestamp` need to 
>> subtract the offset?
>> 
>> Best,
>> wangsan
>> 
>> 
>>> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com 
>>> <mailto:xingc...@gmail.com>> wrote:
>>> 
>>> Hi wangsan,
>>> 
>>> in Flink, the ProcessingTime is just implemented by invoking 
>>> System.currentTimeMillis() and the long value will be automatically wrapped 
>>> to a Timestamp with the following statement:
>>> 
>>> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
>>> 
>>> You can check your TimeZone.getDefault() to see if it returns the right 
>>> TimeZone. Generally, the returned value should rely on the default TimeZone 
>>> of your operating system.
>>> 
>>> Hope that helps.
>>> 
>>> Best,
>>> Xingcan
>>> 
>>> On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com 
>>> <mailto:wamg...@163.com>> wrote:
>>> Hi all,
>>> 
>>> While using Timestamp in Flint SQL, how can I set timezone info? Since my 
>>> current timezone is GMT+8, and I found the selected processing time is 
>>> always 8 hours late than current time. So as extracted event time.
>>> 
>>> Here’s my simplified code:
>>> val senv = StreamExecutionEnvironment.getExecutionEnvironment
>>> senv.setParallelism(1)
>>> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>> 
>>> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
>>> println(s"current time: ${new SimpleDateFormat(".MM.dd HH:mm:ss.SSS", 
>>> Locale.CHINA).format(new Date())}")
>>> 
>>> val stream: DataStream[(String, String, String)] = 
>>> senv.socketTextStream("localhost", ).map(line => (line, line, line))
>>> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 
>>> 't.proctime)
>>> sTableEnv.registerTable("foo", table)
>>> val result = sTableEnv.sql("select * from foo")
>>> result.printSchema()
>>> result.toAppendStream[Row].print()
>>> 
>>> senv.execute("foo")
>>> And here’s the result:
>>> 
>>> 
>>> 
>>> Best,
>>> wangsan
>>> 
>> 
> 



Re: Question about Timestamp in Flink SQL

2017-11-28 Thread wangsan
Hi Xingcan,

Thanks for your reply. 

The system default timezone is just as what I expected 
(sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=2880,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
 
I looked into the generated code, and I found the following code snippet:

```
result$20 = 
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```

And what `internalToTimestamp` function did is:

```
public static Timestamp internalToTimestamp(long v) {
return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
}
```

So, if I give it an event time with unix timestamp 0, then I get 
Timestamp(-28800000). I am confused: why does `internalToTimestamp` need to subtract 
the offset?
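For example, with the default timezone GMT+8 (a small worked sketch of what the generated code does):

long v = 0L;                                                 // Calcite-internal value: millis of the UTC wall clock
long offset = java.util.TimeZone.getDefault().getOffset(v);  // 28800000 ms for Asia/Shanghai
java.sql.Timestamp t = new java.sql.Timestamp(v - offset);
System.out.println(t.getTime());                             // -28800000
System.out.println(t);                                       // 1970-01-01 00:00:00.0 (local rendering equals the UTC wall clock of v)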

Best,
wangsan


> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com> wrote:
> 
> Hi wangsan,
> 
> in Flink, the ProcessingTime is just implemented by invoking 
> System.currentTimeMillis() and the long value will be automatically wrapped 
> to a Timestamp with the following statement:
> 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
> 
> You can check your TimeZone.getDefault() to see if it returns the right 
> TimeZone. Generally, the returned value should rely on the default TimeZone 
> of your operating system.
> 
> Hope that helps.
> 
> Best,
> Xingcan
> 
> On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com 
> <mailto:wamg...@163.com>> wrote:
> Hi all,
> 
> While using Timestamp in Flint SQL, how can I set timezone info? Since my 
> current timezone is GMT+8, and I found the selected processing time is always 
> 8 hours late than current time. So as extracted event time.
> 
> Here’s my simplified code:
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> 
> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
> println(s"current time: ${new SimpleDateFormat(".MM.dd HH:mm:ss.SSS", 
> Locale.CHINA).format(new Date())}")
> 
> val stream: DataStream[(String, String, String)] = 
> senv.socketTextStream("localhost", ).map(line => (line, line, line))
> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
> sTableEnv.registerTable("foo", table)
> val result = sTableEnv.sql("select * from foo")
> result.printSchema()
> result.toAppendStream[Row].print()
> 
> senv.execute("foo")
> And here’s the result:
> 
> 
> 
> Best,
> wangsan
> 



Re: Hive integration in table API and SQL

2017-11-20 Thread wangsan
Hi Timo,

Thanks for your reply. I do notice that the documentation says "A Table is always 
bound to a specific TableEnvironment. It is not possible to combine tables of 
different TableEnvironments in the same query, e.g., to join or union them." 
Does that mean there is no way I can perform operations, like join, on a streaming 
table and a batch table?

Best,
wangsan

> On 20 Nov 2017, at 9:15 PM, Timo Walther <twal...@apache.org> wrote:
> 
> Timo



Hive integration in table API and SQL

2017-11-20 Thread wangsan
Hi all,

I am currently learning the Table API and SQL in Flink. I noticed that Flink does 
not support Hive tables as a table source, and even a JDBC table source is not 
provided. There are cases where we do need to join a stream table with static Hive or 
other database tables to get more specific attributes, so how can I implement 
this functionality? Do I need to implement my own DataSet connectors to load 
data from external tables using JDBC and register the DataSet as a table, or 
should I provide an external catalog?

Thanks,
wangsan


[jira] [Updated] (SPARK-22503) Using current processing time to generate windows in streaming processing

2017-11-12 Thread wangsan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangsan updated SPARK-22503:

Affects Version/s: 2.2.0

> Using current processing time to generate windows in streaming processing
> -
>
> Key: SPARK-22503
> URL: https://issues.apache.org/jira/browse/SPARK-22503
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.3.0
>Reporter: wangsan
>
> There are cases we want to use processing time to generate windows in stream 
> processing, but `current_timestamp()` are not allowed in `window()` function 
> now. 
> When using `current_timestamp` in `window` function, the exception tells 
> "nondeterministic expressions are only allowed in Project, Filter, Aggregate 
> or Window". 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22503) Using current processing time to generate windows in streaming processing

2017-11-12 Thread wangsan (JIRA)
wangsan created SPARK-22503:
---

 Summary: Using current processing time to generate windows in 
streaming processing
 Key: SPARK-22503
 URL: https://issues.apache.org/jira/browse/SPARK-22503
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: wangsan


There are cases we want to use processing time to generate windows in stream 
processing, but `current_timestamp()` are not allowed in `window()` function 
now. 

When using `current_timestamp` in `window` function, the exception tells 
"nondeterministic expressions are only allowed in Project, Filter, Aggregate or 
Window". 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Generate windows on processing time in Spark Structured Streaming

2017-11-10 Thread wangsan
Hi all,


How can I use the current processing time to generate windows in stream 
processing? 
The window function's Scala doc says "For a streaming query, you may use the 
function current_timestamp to generate windows on processing time." But when 
using current_timestamp as a column in the window function, an exception occurred.


Here is my code:
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", )
  .load()

socketDF.createOrReplaceTempView("words")
val windowedCounts = spark.sql(
  """
    |SELECT value as word, current_timestamp() as time, count(1) as count FROM words
    |   GROUP BY window(time, "5 seconds"), word
  """.stripMargin)

windowedCounts
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()
And here is the exception info:
Caused by: org.apache.spark.sql.AnalysisException: nondeterministic expressions 
are only allowed in
Project, Filter, Aggregate or Window, found:







Re:Re: Exception in BucketingSink when cancelling Flink job

2017-09-28 Thread wangsan
Hi,


The 'join' method can be called with a timeout (as is done in TaskCanceler), so it 
won't block forever if the respective thread is in a deadlocked state. Maybe 
calling 'interrupt()' after 'join(timeout)' is more reasonable, although it 
still cannot make sure the operations inside the 'close()' method have finished.
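What I had in mind is roughly the opposite order (a sketch only, not the actual TaskCanceler code):

try {
    executer.join(interruptInterval);     // give close() a chance to finish first
    if (executer.isAlive()) {
        executer.interrupt();             // escalate only if the task has not terminated yet
        executer.join(interruptInterval);
    }
} catch (InterruptedException e) {
    // we can ignore this
}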


Best,
wangsan


On 2017-09-29 at 01:52, "Stephan Ewen" <step...@data-artisans.com> wrote:


Hi!


Calling 'interrupt()' makes only sense before 'join()', because 'join()' blocks 
until the respective thread is finished.
The 'interrupt()' call happens to cancel the task out of potentially blocking 
I/O or sleep/wait operations.


The problem is that HDFS does not handle interrupts correctly, it sometimes 
deadlocks in the case of interrupts on unclosed streams :-(


I think it would be important to make sure (in the Bucketing Sink) that the DFS 
streams are closed upon task cancellation.
@aljoscha - adding you to this thread, as you know most about the bucketing 
sink.


Best,
Stephan




On Wed, Sep 27, 2017 at 10:18 AM, Stefan Richter <s.rich...@data-artisans.com> 
wrote:

Hi,


I would speculate that the reason for this order is that we want to shutdown 
the tasks quickly by interrupting blocking calls in the event of failure, so 
that recover can begin as fast as possible. I am looping in Stephan who might 
give more details about this code.


Best,
Stefan 


Am 27.09.2017 um 07:33 schrieb wangsan <wamg...@163.com>:



After digging into the source code, we found that when a Flink job is canceled, a 
TaskCanceler thread is created.

The TaskCanceler thread calls cancel() on the invokable and periodically interrupts the
task thread until it has terminated.

try {
    invokable.cancel();
} catch (Throwable t) {
    logger.error("Error while canceling the task {}.", taskName, t);
}
// ...
executer.interrupt();
try {
    executer.join(interruptInterval);
} catch (InterruptedException e) {
    // we can ignore this
}
// ...

Notice that TaskCanceler first sends the interrupt signal to the task thread and only 
then calls the join method. Since the task thread is at that point trying to close the 
DFSOutputStream, which is waiting for an ack, an InterruptedException is thrown 
in the task thread.

synchronized (dataQueue) {
    while (!streamerClosed) {
        checkClosed();
        if (lastAckedSeqno >= seqno) {
            break;
        }
        try {
            dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
        } catch (InterruptedException ie) {
            throw new InterruptedIOException(
                "Interrupted while waiting for data to be acknowledged by pipeline");
        }
    }
}

I was confused why TaskCanceler calls executer.interrupt() before 
executer.join(interruptInterval). Can anyone help?










Hi,


We are currently using BucketingSink to save data into HDFS in parquet format. 
But when the flink job was cancelled, we always got Exception in 
BucketingSink's  close method. The datailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] 
[org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal 
of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be 
acknowledged by pipeline
at 
org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at 
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
...

at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)


It seems that DFSOutputStream haven't been closed before task thread is force 
terminated. We found a similar problem in 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html
 , but setting "akka.ask.timeout" to a larger value does not work for us. So 
how can we make sure the stream is safely closed when cacelling a job?


Best,
wangsan















Question about job canceling in Flink

2017-09-27 Thread wangsan
Hi all,

We are currently using BucketingSink to save data into HDFS in parquet format. 
But when the flink job was canceled, we always got Exception in BucketingSink’s 
close method. The detailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] 
[org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal 
of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be 
acknowledged by pipeline
   at 
org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
   at 
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
   at 
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
   at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
   at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
   at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
   at 
org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
   at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
   at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
   …….
   at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
   at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
   at java.lang.Thread.run(Thread.java:745)

After digging into the source code, we found that when a Flink job is canceled, a 
TaskCanceler thread is created. The TaskCanceler thread calls cancel() on the 
invokable and periodically interrupts the task thread until it has terminated.

try {
  invokable.cancel();
} catch (Throwable t) {
  logger.error("Error while canceling the task {}.", taskName, t);
}

// ...

executer.interrupt();

try {
  executer.join(interruptInterval);
} catch (InterruptedException e) {
  // we can ignore this
}

// ...

Notice that TaskCanceler first sends an interrupt signal to the task thread and only 
then calls join. Since the task thread is at that point trying to close the 
DFSOutputStream, which is waiting for an ack, an InterruptedException is thrown in 
the task thread.

synchronized (dataQueue) {
  while (!streamerClosed) {
    checkClosed();
    if (lastAckedSeqno >= seqno) {
      break;
    }
    try {
      dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
    } catch (InterruptedException ie) {
      throw new InterruptedIOException(
          "Interrupted while waiting for data to be acknowledged by pipeline");
    }
  }
}

I was so confused about why TaskCanceler calls executer.interrupt() before 
executer.join(interruptInterval). Can anyone help?

Best,

wangsan

Exception in BucketingSink when cancelling Flink job

2017-09-26 Thread wangsan
Hi,


We are currently using BucketingSink to save data into HDFS in parquet format. 
But when the Flink job was cancelled, we always got an exception in 
BucketingSink's close method. The detailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] 
[org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal 
of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be 
acknowledged by pipeline
at 
org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at 
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
...

at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)


It seems that the DFSOutputStream hasn't been closed before the task thread is 
forcibly terminated. We found a similar problem in 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html
 , but setting "akka.ask.timeout" to a larger value does not work for us. So 
how can we make sure the stream is safely closed when cancelling a job?


Best,
wangsan