[GitHub] flink pull request #4422: [FLINK-7299][AVRO] Write GenericRecord using AvroO...

2017-09-01 Thread soniclavier
Github user soniclavier closed the pull request at:

https://github.com/apache/flink/pull/4422


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4422: [FLINK-7299][AVRO] Write GenericRecord using AvroOutputFo...

2017-08-31 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/4422
  
@aljoscha Thanks for taking a look. I have modified the test.




[GitHub] flink issue #4422: [FLINK-7299][AVRO] Write GenericRecord using AvroOutputFo...

2017-07-31 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/4422
  
Link to passing Travis build: https://travis-ci.org/soniclavier/flink/builds/259243850




[GitHub] flink pull request #4422: [FLINK-7299][AVRO] Write GenericRecord using AvroO...

2017-07-30 Thread soniclavier
GitHub user soniclavier opened a pull request:

https://github.com/apache/flink/pull/4422

[FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat

## What is the purpose of the change

Allow writing Avro GenericRecord using AvroOutputFormat. 

## Brief change log

- Added a condition in AvroOutputFormat that checks whether avroValue is an instance of GenericRecord and, if so, creates a GenericDatumWriter.
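
The dispatch described above can be sketched as follows. The nested types here are hypothetical stand-ins so the snippet runs without an Avro dependency; the real classes are `org.apache.avro.generic.GenericRecord`, `GenericDatumWriter`, and the reflect-based writer AvroOutputFormat otherwise uses:

```java
public class AvroWriterDispatch {
    // Hypothetical stand-ins for the Avro types involved (simplified).
    interface GenericRecord {}
    static class GenericDatumWriter {
        @Override public String toString() { return "GenericDatumWriter"; }
    }
    static class ReflectDatumWriter {
        @Override public String toString() { return "ReflectDatumWriter"; }
    }

    // The gist of the change: pick a GenericDatumWriter when the value type
    // is a GenericRecord, otherwise keep the existing reflect-based writer.
    static Object createDatumWriter(Class<?> avroValueType) {
        if (GenericRecord.class.isAssignableFrom(avroValueType)) {
            return new GenericDatumWriter();
        }
        return new ReflectDatumWriter();
    }

    static class MyRecord implements GenericRecord {}

    public static void main(String[] args) {
        System.out.println(createDatumWriter(MyRecord.class));  // GenericDatumWriter
        System.out.println(createDatumWriter(String.class));    // ReflectDatumWriter
    }
}
```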

## Verifying this change

This change added tests and can be verified as follows:

 - Added a unit test, testGenericRecord(), in AvroOutputFormatTest that writes GenericRecords.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
  - The serializers: yes
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature?: no (not a major 
feature)
  - If yes, how is the feature documented? not applicable 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/soniclavier/flink 
FLINK-7299-GenericRecord-in-AvroOutputFormat

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4422


commit 1c71ca43bcd5d4733e581f80637b531ba447e9dc
Author: Vishnu Viswanath <vishnu.viswanat...@gmail.com>
Date:   2017-07-31T04:58:28Z

[FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat






[GitHub] flink pull request #3763: [FLINK-6372][scripts] Fix change scala version of ...

2017-05-03 Thread soniclavier
Github user soniclavier closed the pull request at:

https://github.com/apache/flink/pull/3763




[GitHub] flink pull request #3763: [FLINK-6372][scripts] Fix change scala version of ...

2017-04-24 Thread soniclavier
GitHub user soniclavier opened a pull request:

https://github.com/apache/flink/pull/3763

[FLINK-6372][scripts] Fix change scala version of flink-gelly-examples

- changed change-scala-version.sh

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/soniclavier/flink FLINK-6372

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3763.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3763


commit eb8c1e96f0cb64e387947cd194309b9763bed1b0
Author: Vishnu Viswanath <vishnu.viswanat...@gmail.com>
Date:   2017-04-24T16:47:14Z

[FLINK-6372][scripts] Fix change scala version of flink-gelly-examples






[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality

2016-11-15 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2736
  
I don't think I can edit a closed issue; could you please make the edit?




[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality

2016-11-15 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2736
  
Thank you for your guidance @aljoscha 😄. Could you please tell me which Fix Version I should set for these JIRAs?




[GitHub] flink pull request #2736: [FLINK-4174] Enhance evictor functionality

2016-11-15 Thread soniclavier
Github user soniclavier closed the pull request at:

https://github.com/apache/flink/pull/2736




[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality

2016-11-14 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2736
  
@aljoscha I have made the changes per your comments; could you please review?




[GitHub] flink pull request #2355: [FLINK-4282]Add Offset Parameter to WindowAssigner...

2016-11-04 Thread soniclavier
Github user soniclavier commented on a diff in the pull request:

https://github.com/apache/flink/pull/2355#discussion_r86656836
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java ---
@@ -236,6 +236,6 @@ public int compare(TimeWindow o1, TimeWindow o2) {
	 * @return window start
	 */
	public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
--- End diff --

Is windowSize here actually the slide size? I see that this method is called with the slide instead of the size: `TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);`
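
For context, the method computes the last window start at or before the timestamp, with window starts aligned to the offset; for sliding windows it is indeed invoked once per pane with the slide passed as windowSize, which is why the parameter name looks off in the review. A runnable sketch of the arithmetic (the body matches the commonly cited implementation of this method, but treat the exact expression as illustrative):

```java
public class WindowStartExample {
    // Last window start <= timestamp, where valid starts are offset + k * windowSize.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        // Window size 5, no offset: timestamp 17 falls in [15, 20).
        System.out.println(getWindowStartWithOffset(17, 0, 5)); // 15
        // With offset 2, boundaries shift to ..., 12, 17, 22, ...
        System.out.println(getWindowStartWithOffset(17, 2, 5)); // 17
    }
}
```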




[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality

2016-11-04 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2736
  
@aljoscha I have made the changes; could you please review?




[GitHub] flink pull request #2736: [FLINK-4174] Enhance evictor functionality

2016-11-03 Thread soniclavier
Github user soniclavier commented on a diff in the pull request:

https://github.com/apache/flink/pull/2736#discussion_r86462637
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---
@@ -273,32 +284,93 @@ public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
			return;
		}

-		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
-		if (triggerResult.isFire()) {
-			fire(context.window, contents);
-		}
+		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+		if (triggerResult.isFire()) {
+			fire(context.window, contents, windowState);
+		}

		if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
			cleanup(context.window, windowState, mergingWindows);
		}
	}

-	private void fire(W window, Iterable<StreamRecord> contents) throws Exception {
+	private void fire(W window, Iterable<StreamRecord> contents, ListState<StreamRecord> windowState) throws Exception {
		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

		// Work around type system restrictions...
-		int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
-
-		FluentIterable projectedContents = FluentIterable
+		FluentIterable<TimestampedValue> recordsWithTimestamp = FluentIterable
			.from(contents)
-			.skip(toEvict)
-			.transform(new Function<StreamRecord, IN>() {
+			.transform(new Function<StreamRecord, TimestampedValue>() {
+				@Override
+				public TimestampedValue apply(StreamRecord input) {
+					return new TimestampedValue<>(input.getValue(), input.getTimestamp());
+				}
+			});
+		evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
+
+		FluentIterable projectedContents = recordsWithTimestamp
+			.transform(new Function<TimestampedValue, IN>() {
				@Override
-				public IN apply(StreamRecord input) {
+				public IN apply(TimestampedValue input) {
					return input.getValue();
				}
			});
+
		userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
+		evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
+
+
+		// work around to fix FLINK-4369: remove the evicted elements from the windowState.
+		// this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
+		windowState.clear();
+		for (TimestampedValue record : recordsWithTimestamp) {
+			if (record.getTimestamp() < 0) {
--- End diff --

Regarding the copy method: are you asking me to add a copy method to TimestampedValue that returns the corresponding StreamRecord, something like this:

```java
/**
 * Creates a {@link StreamRecord} from this TimestampedValue.
 */
public StreamRecord getStreamRecord() {
StreamRecord streamRecord = new StreamRecord<>(value);
if (hasTimestamp) {
streamRecord.setTimestamp(timestamp);
}
return streamRecord;
}
```




[GitHub] flink pull request #2736: [FLINK-4174] Enhance evictor functionality

2016-10-31 Thread soniclavier
GitHub user soniclavier opened a pull request:

https://github.com/apache/flink/pull/2736

[FLINK-4174] Enhance evictor functionality

The PR implements 
[FLINK-4174](https://issues.apache.org/jira/browse/FLINK-4174) Enhance window 
evictor as proposed in 
[FLIP-4](https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor)

Changes made:

- Modified the Evictor API: added two new methods, evictBefore and evictAfter, and removed the existing evict method.
- Modified the corresponding implementations of CountEvictor, DeltaEvictor and TimeEvictor.
- Created EvictorContext in the class EvictingWindowOperator, which is passed to the evictor methods.
- Created the TimestampedValue class, which holds a value together with its timestamp. This class is exposed to the user via the evictBefore and evictAfter methods.
- Modified the EvictingWindowOperator class:
    - to call evictBefore before and evictAfter after invoking the window function.
    - to create a FluentIterable<TimestampedValue> from the Iterable<StreamRecord> contents, which is passed to the evictor methods.
    - to clear the window state and add the remaining elements (after eviction) back to the state. (This fixes [FLINK-4369](https://issues.apache.org/jira/browse/FLINK-4369).)
- Added test cases in EvictingWindowOperatorTest.
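
The reshaped API can be sketched in miniature as below. The types are simplified local stand-ins, not the real Flink interfaces (the actual Evictor in flink-streaming-java also receives the window and an EvictorContext), so the sketch runs without a Flink dependency:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EvictorSketch {
    // Minimal stand-in for Flink's TimestampedValue: a value plus its timestamp.
    static final class TimestampedValue<T> {
        final T value;
        final long timestamp;
        TimestampedValue(T value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    // The PR splits the old evict() into evictBefore/evictAfter, both seeing
    // TimestampedValue so evictors can inspect element timestamps.
    interface Evictor<T> {
        void evictBefore(List<TimestampedValue<T>> elements, int size);
        void evictAfter(List<TimestampedValue<T>> elements, int size);
    }

    // Count-based evictor in the spirit of CountEvictor: keep only the last
    // maxCount elements before the window function runs; do nothing after.
    static <T> Evictor<T> countEvictor(int maxCount) {
        return new Evictor<T>() {
            public void evictBefore(List<TimestampedValue<T>> elements, int size) {
                while (elements.size() > maxCount) elements.remove(0);
            }
            public void evictAfter(List<TimestampedValue<T>> elements, int size) {}
        };
    }

    static String demo() {
        List<TimestampedValue<String>> window = new ArrayList<>(Arrays.asList(
            new TimestampedValue<>("a", 1),
            new TimestampedValue<>("b", 2),
            new TimestampedValue<>("c", 3)));
        countEvictor(2).evictBefore(window, window.size());
        StringBuilder sb = new StringBuilder();
        for (TimestampedValue<String> v : window) sb.append(v.value);
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // bc
    }
}
```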

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/soniclavier/flink FLINK-4174

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2736.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2736


commit bfd4fb509463dea0dc86e702c3aad0b0b9e70ff2
Author: Vishnu Viswanath <vishnu.viswanat...@gmail.com>
Date:   2016-10-31T23:21:04Z

[FLINK-4174] Enhance evictor functionality






[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
One more question: is it possible to configure the JobManager actor path that the client connects to? It looks like it defaults to `akka://flink/user/jobmanager`. That way I could write a much more generic client.

Note: I know this is an initial version, just curious whether this is already implemented. :)






[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Sorry, the compilation error was because Tuple2 was scala.Tuple2, not Flink's Tuple2. Changing to `org.apache.flink.api.java.tuple.Tuple2` fixed the issue.




[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Thanks Ufuk & Stephan for the reply,

I tried the serializers you suggested:
```
val typeHint = new TypeHint[Tuple2[Long,String]](){}
val serializer = TypeInformation.of(typeHint).createSerializer(null)

//also tried this
val fieldSerializers = Array[TypeSerializer[_]](StringSerializer.INSTANCE, LongSerializer.INSTANCE)
val serializer2 = new TupleSerializer(classOf[Tuple2[Long,String]].asInstanceOf[Class[_]].asInstanceOf[Class[Tuple2[String, Long]]], fieldSerializers)
```

But both give me a compilation error at
```
val serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
  key,
  serializer2,
  VoidNamespace.INSTANCE,
  VoidNamespaceSerializer.INSTANCE)
```
the compilation error is:
```
Error:(43, 7) type mismatch;
 found   : org.apache.flink.api.common.typeutils.TypeSerializer[org.apache.flink.api.java.tuple.Tuple2[Long,String]]
 required: org.apache.flink.api.common.typeutils.TypeSerializer[java.io.Serializable]
Note: org.apache.flink.api.java.tuple.Tuple2[Long,String] <: java.io.Serializable, but Java-defined class TypeSerializer is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: java.io.Serializable`. (SLS 3.2.10)
  serializer,
  ^
```

I had seen this before, when I tried to set the serializer from `queryableState.getKeySerializer`.

Note: it works fine when I use the longer version of the serializer that I created.
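
The invariance the scalac note points at is easy to reproduce without Flink: generics on Java-defined classes are invariant in T, so a TypeSerializer[Tuple2[...]] is not a TypeSerializer[Serializable] even though Tuple2 is Serializable, and the fix the message hints at is a bounded wildcard on the consuming side. A minimal stand-alone illustration, where Box is a hypothetical stand-in for TypeSerializer:

```java
import java.io.Serializable;

public class InvarianceDemo {
    // Hypothetical stand-in for a generic, invariant class like TypeSerializer<T>.
    static class Box<T> {}

    // Declaring the parameter as Box<Serializable> would reject Box<String>;
    // the bounded wildcard (Scala: `_ <: java.io.Serializable`) accepts it.
    static String accept(Box<? extends Serializable> box) {
        return "ok";
    }

    public static void main(String[] args) {
        Box<String> stringBox = new Box<>(); // String is Serializable
        System.out.println(accept(stringBox)); // ok
    }
}
```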




[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Never mind, I was querying with the wrong key; it works now! Cheers.




[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-07-21 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2051
  
Hi,

Continuing the discussion from the mailing list: I was able to get past the NettyConfig problem once I ran Flink in cluster mode (I would still like to know if there is a way to run in local mode, so that I can avoid running SBT assembly every time).

But now I am stuck at the error message "KvState does not hold any state for key/namespace.", which I believe is because of my KeySerializer. Since I am running the query client as a separate application, I don't have access to my queryableState to call `queryableState.getKeySerializer`.

My key is a tuple of (Long, String) and this is the naive serializer that I wrote (which is probably wrong; I have never written a serializer before):

```
class KeySerializer extends TypeSerializerSingleton[(Long,String)] {

  private val EMPTY: (Long,String) = (0, "")

  override def createInstance(): (Long, String) = EMPTY

  override def getLength: Int = 2

  override def canEqual(o: scala.Any): Boolean = o.isInstanceOf[(Long,String)]

  override def copy(t: (Long, String)): (Long, String) = t

  override def copy(t: (Long, String), t1: (Long, String)): (Long, String) = t

  override def copy(dataInputView: DataInputView, dataOutputView: DataOutputView): Unit = {
    dataOutputView.writeLong(dataInputView.readLong())
    StringValue.copyString(dataInputView, dataOutputView)
  }

  override def serialize(t: (Long, String), dataOutputView: DataOutputView): Unit = {
    dataOutputView.writeLong(t._1)
    StringValue.writeString(t._2, dataOutputView)
  }

  override def isImmutableType: Boolean = true

  override def deserialize(dataInputView: DataInputView): (Long, String) = {
    val l = dataInputView.readLong()
    val s = StringValue.readString(dataInputView)
    (l, s)
  }

  override def deserialize(t: (Long, String), dataInputView: DataInputView): (Long, String) = deserialize(dataInputView)
}
```

Can you tell me what I am doing wrong here? Thanks!
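
The round-trip logic of such a serializer can be exercised outside Flink with plain java.io streams; the sketch below mimics the long-then-string layout (writeUTF is used as a stand-in for StringValue.writeString, which uses a different variable-length encoding). One detail worth noting: since the string part is variable-length, a Flink TypeSerializer for this type would normally report getLength() as -1 (variable length) rather than a fixed value like 2:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class KeyRoundTrip {
    // Serialize a (Long, String) key: a fixed 8-byte long followed by a
    // variable-length string, mirroring the quoted serializer's layout.
    static byte[] serialize(long l, String s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(l);
            out.writeUTF(s);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserialize in the same order the fields were written.
    static Object[] deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return new Object[] { in.readLong(), in.readUTF() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Object[] back = deserialize(serialize(42L, "flink"));
        System.out.println(back[0] + "," + back[1]); // 42,flink
    }
}
```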

