[jira] [Updated] (HUDI-611) Add Impala Guide to Doc

2020-02-21 Thread Yanjia Gary Li (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanjia Gary Li updated HUDI-611:

Priority: Minor  (was: Major)

> Add Impala Guide to Doc
> ---
>
> Key: HUDI-611
> URL: https://issues.apache.org/jira/browse/HUDI-611
> Project: Apache Hudi (incubating)
>  Issue Type: New Feature
>Reporter: Yanjia Gary Li
>Assignee: Yanjia Gary Li
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Similar to the Hive sync tool, we need a tool to sync Hudi tables with Impala.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] garyli1019 opened a new pull request #1349: HUDI-611 Add Impala Guide to Doc

2020-02-21 Thread GitBox
garyli1019 opened a new pull request #1349: HUDI-611 Add Impala Guide to Doc
URL: https://github.com/apache/incubator-hudi/pull/1349
 
 
   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a 
pull request.*
   
   ## What is the purpose of the pull request
   
   *Add impala guide to doc*
   
   ## Brief change log
   
 - *Add the Impala section to querying_data, in both the English and Chinese versions*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test 
coverage.

- [ ] Commit message is descriptive of the change

- [ ] CI is green
   
- [ ] Necessary doc changes done or have another open PR
  
- [ ] For large changes, please consider breaking it into sub-tasks under 
an umbrella JIRA.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (HUDI-611) Add Impala Guide to Doc

2020-02-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-611:

Labels: pull-request-available  (was: )

> Add Impala Guide to Doc
> ---
>
> Key: HUDI-611
> URL: https://issues.apache.org/jira/browse/HUDI-611
> Project: Apache Hudi (incubating)
>  Issue Type: New Feature
>Reporter: Yanjia Gary Li
>Assignee: Yanjia Gary Li
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the Hive sync tool, we need a tool to sync Hudi tables with Impala.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042356#comment-17042356
 ] 

lamber-ken edited comment on HUDI-625 at 2/22/20 4:49 AM:
--

Hi, [~vinoth] I cached the Class info, which made it about 100x faster than before. I found that when 
deserializing each entry, kryo always calls the "KryoBase#newInstantiator" method.


Sorry, I checked the deserialized result and it's wrong (x), so I'll keep going!
||times||1||2||10||
|before|48859ms|107353ms|too long|
|now|494ms|590ms|2067ms|
{code:java}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.SerializationUtils;
import org.objenesis.instantiator.ObjectInstantiator;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class KryoTest3 {

public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

public static GenericRecord generateGenericRecord() {
Random RAND = new Random(46474747);

GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
rec.put("_row_key", "rowKey");
rec.put("timestamp", "timestamp");
rec.put("rider", "riderName");
rec.put("driver", "driverName");
rec.put("begin_lat", RAND.nextDouble());
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
rec.put("_hoodie_is_deleted", false);
return rec;
}

public static void main(String[] args) throws Exception {

GenericRecord genericRecord = generateGenericRecord();

Kryo kryo = new KryoInstantiator().newKryo();

// store data bytes
List<byte[]> objectDatas = new LinkedList<>();
for (int i = 0; i < 1; i++) {
Output output = new Output(1, 4024);
kryo.writeClassAndObject(output, genericRecord);
output.close();
objectDatas.add(SerializationUtils.serialize(genericRecord));
}

long t1 = System.currentTimeMillis();

System.out.println("starting deserialize");

// deserialize
List<Object> datas = new LinkedList<>();
for (byte[] data : objectDatas) {
datas.add(kryo.readClassAndObject(new Input(data)));
}

long t2 = System.currentTimeMillis();

System.err.println("dese times: " + datas.size());
System.err.println("dese cost: " + (t2 - t1) + "ms");

}

private static class KryoInstantiator implements Serializable {

public Kryo newKryo() {

Kryo kryo = new KryoBase();
// ensure that kryo doesn't fail if classes are not registered with kryo.
kryo.setRegistrationRequired(false);
// This would be used for object initialization if nothing else works out.
kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
return kryo;
}

private static class KryoBase extends Kryo {
@Override
protected Serializer newDefaultSerializer(Class type) {
final Serializer serializer = super.newDefaultSerializer(type);
if (serializer instanceof FieldSerializer) {
final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
 

[jira] [Updated] (HUDI-611) Add Impala Guide to Doc

2020-02-21 Thread Yanjia Gary Li (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanjia Gary Li updated HUDI-611:

Summary: Add Impala Guide to Doc  (was: Impala sync tool)

> Add Impala Guide to Doc
> ---
>
> Key: HUDI-611
> URL: https://issues.apache.org/jira/browse/HUDI-611
> Project: Apache Hudi (incubating)
>  Issue Type: New Feature
>Reporter: Yanjia Gary Li
>Assignee: Yanjia Gary Li
>Priority: Major
>
> Similar to the Hive sync tool, we need a tool to sync Hudi tables with Impala.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042356#comment-17042356
 ] 

lamber-ken edited comment on HUDI-625 at 2/22/20 4:30 AM:
--

Hi, [~vinoth] I cached the Class info, which made it about 100x faster than before.

I'm still verifying whether this approach is correct or not.

 
||times||1||2||10||
|before|48859ms|107353ms|too long|
|now|494ms|590ms|2067ms|

 
{code:java}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.SerializationUtils;
import org.objenesis.instantiator.ObjectInstantiator;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class KryoTest3 {

public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

public static GenericRecord generateGenericRecord() {
Random RAND = new Random(46474747);

GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
rec.put("_row_key", "rowKey");
rec.put("timestamp", "timestamp");
rec.put("rider", "riderName");
rec.put("driver", "driverName");
rec.put("begin_lat", RAND.nextDouble());
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
rec.put("_hoodie_is_deleted", false);
return rec;
}

public static void main(String[] args) throws Exception {

GenericRecord genericRecord = generateGenericRecord();

Kryo kryo = new KryoInstantiator().newKryo();

// store data bytes
List<byte[]> objectDatas = new LinkedList<>();
for (int i = 0; i < 1; i++) {
Output output = new Output(1, 4024);
kryo.writeClassAndObject(output, genericRecord);
output.close();
objectDatas.add(SerializationUtils.serialize(genericRecord));
}

long t1 = System.currentTimeMillis();

System.out.println("starting deserialize");

// deserialize
List<Object> datas = new LinkedList<>();
for (byte[] data : objectDatas) {
datas.add(kryo.readClassAndObject(new Input(data)));
}

long t2 = System.currentTimeMillis();

System.err.println("dese times: " + datas.size());
System.err.println("dese cost: " + (t2 - t1) + "ms");

}

private static class KryoInstantiator implements Serializable {

public Kryo newKryo() {

Kryo kryo = new KryoBase();
// ensure that kryo doesn't fail if classes are not registered with kryo.
kryo.setRegistrationRequired(false);
// This would be used for object initialization if nothing else works out.
kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
return kryo;
}

private static class KryoBase extends Kryo {
@Override
protected Serializer newDefaultSerializer(Class type) {
final Serializer serializer = super.newDefaultSerializer(type);
if (serializer instanceof FieldSerializer) {
final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
fieldSerializer.setIgnoreSyntheticFields(true);
}
return serializer;

[jira] [Comment Edited] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042356#comment-17042356
 ] 

lamber-ken edited comment on HUDI-625 at 2/22/20 4:13 AM:
--

Hi, [~vinoth] I cached the Class info, which made it about 100x faster than before.

I'm still verifying whether this approach is correct or not.

 
||times||1||2||10||
|before|48859ms|107353ms|too long|
|now|494ms|590ms|2067ms|
 
{code:java}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.SerializationUtils;
import org.objenesis.instantiator.ObjectInstantiator;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class KryoTest3 {

public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

public static GenericRecord generateGenericRecord() {
Random RAND = new Random(46474747);

GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
rec.put("_row_key", "rowKey");
rec.put("timestamp", "timestamp");
rec.put("rider", "riderName");
rec.put("driver", "driverName");
rec.put("begin_lat", RAND.nextDouble());
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
rec.put("_hoodie_is_deleted", false);
return rec;
}

public static void main(String[] args) throws Exception {

GenericRecord genericRecord = generateGenericRecord();

Kryo kryo = new KryoInstantiator().newKryo();

// store data bytes
List<byte[]> objectDatas = new LinkedList<>();
for (int i = 0; i < 1; i++) {
Output output = new Output(1, 4024);
kryo.writeClassAndObject(output, genericRecord);
output.close();
objectDatas.add(SerializationUtils.serialize(genericRecord));
}

long t1 = System.currentTimeMillis();

System.out.println("starting deserialize");

// deserialize
List<Object> datas = new LinkedList<>();
for (byte[] data : objectDatas) {
datas.add(kryo.readClassAndObject(new Input(data)));
}

long t2 = System.currentTimeMillis();

System.err.println("dese times: " + datas.size());
System.err.println("dese cost: " + (t2 - t1) + "ms");

}

private static class KryoInstantiator implements Serializable {

public Kryo newKryo() {

Kryo kryo = new KryoBase();
// ensure that kryo doesn't fail if classes are not registered with kryo.
kryo.setRegistrationRequired(false);
// This would be used for object initialization if nothing else works out.
kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
return kryo;
}

private static class KryoBase extends Kryo {
@Override
protected Serializer newDefaultSerializer(Class type) {
final Serializer serializer = super.newDefaultSerializer(type);
if (serializer instanceof FieldSerializer) {
final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
fieldSerializer.setIgnoreSyntheticFields(true);
}
return serializer;

[jira] [Comment Edited] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042356#comment-17042356
 ] 

lamber-ken edited comment on HUDI-625 at 2/22/20 4:05 AM:
--

Hi, [~vinoth] I cached the Class info, which made it about 100x faster than before.

 
||times||1||2||10||
|before|48859ms|107353ms|too long|
|now|494ms|590ms|2067ms|

 
{code:java}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.SerializationUtils;
import org.objenesis.instantiator.ObjectInstantiator;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class KryoTest3 {

public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

public static GenericRecord generateGenericRecord() {
Random RAND = new Random(46474747);

GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
rec.put("_row_key", "rowKey");
rec.put("timestamp", "timestamp");
rec.put("rider", "riderName");
rec.put("driver", "driverName");
rec.put("begin_lat", RAND.nextDouble());
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
rec.put("_hoodie_is_deleted", false);
return rec;
}

public static void main(String[] args) throws Exception {

GenericRecord genericRecord = generateGenericRecord();

Kryo kryo = new KryoInstantiator().newKryo();

// store data bytes
List<byte[]> objectDatas = new LinkedList<>();
for (int i = 0; i < 1; i++) {
Output output = new Output(1, 4024);
kryo.writeClassAndObject(output, genericRecord);
output.close();
objectDatas.add(SerializationUtils.serialize(genericRecord));
}

long t1 = System.currentTimeMillis();

System.out.println("starting deserialize");

// deserialize
List<Object> datas = new LinkedList<>();
for (byte[] data : objectDatas) {
datas.add(kryo.readClassAndObject(new Input(data)));
}

long t2 = System.currentTimeMillis();

System.err.println("dese times: " + datas.size());
System.err.println("dese cost: " + (t2 - t1) + "ms");

}

private static class KryoInstantiator implements Serializable {

public Kryo newKryo() {

Kryo kryo = new KryoBase();
// ensure that kryo doesn't fail if classes are not registered with kryo.
kryo.setRegistrationRequired(false);
// This would be used for object initialization if nothing else works out.
kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
return kryo;
}

private static class KryoBase extends Kryo {
@Override
protected Serializer newDefaultSerializer(Class type) {
final Serializer serializer = super.newDefaultSerializer(type);
if (serializer instanceof FieldSerializer) {
final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
fieldSerializer.setIgnoreSyntheticFields(true);
}
return serializer;
}

Map cache = new HashMap<>();

[jira] [Comment Edited] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042356#comment-17042356
 ] 

lamber-ken edited comment on HUDI-625 at 2/22/20 3:59 AM:
--

Hi, [~vinoth] I cached the Class info, which made it about 100x faster than before.

 

Before 

dese times: 1
 dese cost: 48859ms

 

Now
 dese times: 1
 dese cost: 494ms
{code:java}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.SerializationUtils;
import org.objenesis.instantiator.ObjectInstantiator;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class KryoTest3 {

public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

public static GenericRecord generateGenericRecord() {
Random RAND = new Random(46474747);

GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
rec.put("_row_key", "rowKey");
rec.put("timestamp", "timestamp");
rec.put("rider", "riderName");
rec.put("driver", "driverName");
rec.put("begin_lat", RAND.nextDouble());
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
rec.put("_hoodie_is_deleted", false);
return rec;
}

public static void main(String[] args) throws Exception {

GenericRecord genericRecord = generateGenericRecord();

Kryo kryo = new KryoInstantiator().newKryo();

// store data bytes
List<byte[]> objectDatas = new LinkedList<>();
for (int i = 0; i < 1; i++) {
Output output = new Output(1, 4024);
kryo.writeClassAndObject(output, genericRecord);
output.close();
objectDatas.add(SerializationUtils.serialize(genericRecord));
}

long t1 = System.currentTimeMillis();

System.out.println("starting deserialize");

// deserialize
List<Object> datas = new LinkedList<>();
for (byte[] data : objectDatas) {
datas.add(kryo.readClassAndObject(new Input(data)));
}

long t2 = System.currentTimeMillis();

System.err.println("dese times: " + datas.size());
System.err.println("dese cost: " + (t2 - t1) + "ms");

}

private static class KryoInstantiator implements Serializable {

public Kryo newKryo() {

Kryo kryo = new KryoBase();
// ensure that kryo doesn't fail if classes are not registered with kryo.
kryo.setRegistrationRequired(false);
// This would be used for object initialization if nothing else works out.
kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
return kryo;
}

private static class KryoBase extends Kryo {
@Override
protected Serializer newDefaultSerializer(Class type) {
final Serializer serializer = super.newDefaultSerializer(type);
if (serializer instanceof FieldSerializer) {
final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
fieldSerializer.setIgnoreSyntheticFields(true);
}
return serializer;
}

Map cache = new HashMap<>();

[jira] [Updated] (HUDI-561) hudi partition path config

2020-02-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-561:

Labels: pull-request-available  (was: )

> hudi partition path config
> --
>
> Key: HUDI-561
> URL: https://issues.apache.org/jira/browse/HUDI-561
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: DeltaStreamer
>Reporter: liujinhui
>Assignee: liujinhui
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.6.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The current hudi partitioning is driven by
> hoodie.datasource.write.partitionpath.field = keyname
> example:
> keyname 2019/12/20
> Usually the time format may be yyyy-MM-dd HH:mm:ss or something else, and a
> yyyy-MM-dd HH:mm:ss value cannot be partitioned correctly.
> So I want to add configuration:
> hoodie.datasource.write.partitionpath.source.format = yyyy-MM-dd HH:mm:ss
> hoodie.datasource.write.partitionpath.target.format = yyyy/MM/dd
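
A minimal sketch of the conversion these two proposed configs imply (the helper class below is hypothetical and only illustrates parsing the field value with the source format and rendering the partition path with the target format):

{code:java}
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical illustration of the proposed source/target partition path formats.
public class PartitionPathFormatSketch {

    public static String toPartitionPath(String fieldValue, String sourceFormat, String targetFormat) throws Exception {
        // parse the raw field value (e.g. "2019-12-20 11:30:00") with the source format
        Date parsed = new SimpleDateFormat(sourceFormat).parse(fieldValue);
        // render the partition path (e.g. "2019/12/20") with the target format
        return new SimpleDateFormat(targetFormat).format(parsed);
    }

    public static void main(String[] args) throws Exception {
        // prints 2019/12/20
        System.out.println(toPartitionPath("2019-12-20 11:30:00", "yyyy-MM-dd HH:mm:ss", "yyyy/MM/dd"));
    }
}
{code}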



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] vinothchandar commented on issue #1308: [WIP] [HUDI-561] partition path config

2020-02-21 Thread GitBox
vinothchandar commented on issue #1308: [WIP] [HUDI-561] partition path config
URL: https://github.com/apache/incubator-hudi/pull/1308#issuecomment-589914890
 
 
   Closing based on previous discussion. Please feel free to reopen if you 
think I am missing something


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] vinothchandar commented on issue #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer

2020-02-21 Thread GitBox
vinothchandar commented on issue #1165: [HUDI-76] Add CSV Source support for 
Hudi Delta Streamer
URL: https://github.com/apache/incubator-hudi/pull/1165#issuecomment-589914835
 
 
   @nsivabalan Please shepherd this across the finish line :) 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] vinothchandar closed pull request #1308: [WIP] [HUDI-561] partition path config

2020-02-21 Thread GitBox
vinothchandar closed pull request #1308: [WIP] [HUDI-561] partition path config
URL: https://github.com/apache/incubator-hudi/pull/1308
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042356#comment-17042356
 ] 

lamber-ken commented on HUDI-625:
-

Hi, [~vinoth] I cached the Class info, which made it about 100x faster.

 

Before 

dese times: 1
dese cost: 48859ms

 

Now
dese times: 1
dese cost: 494ms
{code:java}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.SerializationUtils;
import org.objenesis.instantiator.ObjectInstantiator;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class KryoTest3 {

public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

public static GenericRecord generateGenericRecord() {
Random RAND = new Random(46474747);

GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
rec.put("_row_key", "rowKey");
rec.put("timestamp", "timestamp");
rec.put("rider", "riderName");
rec.put("driver", "driverName");
rec.put("begin_lat", RAND.nextDouble());
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
rec.put("_hoodie_is_deleted", false);
return rec;
}

public static void main(String[] args) throws Exception {

GenericRecord genericRecord = generateGenericRecord();

Kryo kryo = new KryoInstantiator().newKryo();

// store data bytes
List<byte[]> objectDatas = new LinkedList<>();
for (int i = 0; i < 1; i++) {
Output output = new Output(1, 4024);
kryo.writeClassAndObject(output, genericRecord);
output.close();
objectDatas.add(SerializationUtils.serialize(genericRecord));
}

long t1 = System.currentTimeMillis();

System.out.println("starting deserialize");

// deserialize
List<Object> datas = new LinkedList<>();
for (byte[] data : objectDatas) {
datas.add(kryo.readClassAndObject(new Input(data)));
}

long t2 = System.currentTimeMillis();

System.err.println("dese times: " + datas.size());
System.err.println("dese cost: " + (t2 - t1) + "ms");

}

private static class KryoInstantiator implements Serializable {

public Kryo newKryo() {

Kryo kryo = new KryoBase();
// ensure that kryo doesn't fail if classes are not registered with kryo.
kryo.setRegistrationRequired(false);
// This would be used for object initialization if nothing else works out.
kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
return kryo;
}

private static class KryoBase extends Kryo {
@Override
protected Serializer newDefaultSerializer(Class type) {
final Serializer serializer = super.newDefaultSerializer(type);
if (serializer instanceof FieldSerializer) {
final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
fieldSerializer.setIgnoreSyntheticFields(true);
}
return serializer;
}

Map cache = new HashMap<>();

@Override
protected Obj
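
// (The message is truncated above. A minimal editor's sketch, hypothetical, of the
// instantiator cache being described: the "cache" map above would hold one
// ObjectInstantiator per class, and Kryo's protected newInstantiator(Class) hook
// would consult it so the instantiator is not rebuilt for every record.)
@Override
protected ObjectInstantiator newInstantiator(final Class type) {
    ObjectInstantiator instantiator = (ObjectInstantiator) cache.get(type);
    if (instantiator == null) {
        // fall back to Kryo's default strategy only once per class, then reuse it
        instantiator = super.newInstantiator(type);
        cache.put(type, instantiator);
    }
    return instantiator;
}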

Build failed in Jenkins: hudi-snapshot-deployment-0.5 #196

2020-02-21 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 2.29 KB...]
/home/jenkins/tools/maven/apache-maven-3.5.4/boot:
plexus-classworlds-2.5.2.jar

/home/jenkins/tools/maven/apache-maven-3.5.4/conf:
logging
settings.xml
toolchains.xml

/home/jenkins/tools/maven/apache-maven-3.5.4/conf/logging:
simplelogger.properties

/home/jenkins/tools/maven/apache-maven-3.5.4/lib:
aopalliance-1.0.jar
cdi-api-1.0.jar
cdi-api.license
commons-cli-1.4.jar
commons-cli.license
commons-io-2.5.jar
commons-io.license
commons-lang3-3.5.jar
commons-lang3.license
ext
guava-20.0.jar
guice-4.2.0-no_aop.jar
jansi-1.17.1.jar
jansi-native
javax.inject-1.jar
jcl-over-slf4j-1.7.25.jar
jcl-over-slf4j.license
jsr250-api-1.0.jar
jsr250-api.license
maven-artifact-3.5.4.jar
maven-artifact.license
maven-builder-support-3.5.4.jar
maven-builder-support.license
maven-compat-3.5.4.jar
maven-compat.license
maven-core-3.5.4.jar
maven-core.license
maven-embedder-3.5.4.jar
maven-embedder.license
maven-model-3.5.4.jar
maven-model-builder-3.5.4.jar
maven-model-builder.license
maven-model.license
maven-plugin-api-3.5.4.jar
maven-plugin-api.license
maven-repository-metadata-3.5.4.jar
maven-repository-metadata.license
maven-resolver-api-1.1.1.jar
maven-resolver-api.license
maven-resolver-connector-basic-1.1.1.jar
maven-resolver-connector-basic.license
maven-resolver-impl-1.1.1.jar
maven-resolver-impl.license
maven-resolver-provider-3.5.4.jar
maven-resolver-provider.license
maven-resolver-spi-1.1.1.jar
maven-resolver-spi.license
maven-resolver-transport-wagon-1.1.1.jar
maven-resolver-transport-wagon.license
maven-resolver-util-1.1.1.jar
maven-resolver-util.license
maven-settings-3.5.4.jar
maven-settings-builder-3.5.4.jar
maven-settings-builder.license
maven-settings.license
maven-shared-utils-3.2.1.jar
maven-shared-utils.license
maven-slf4j-provider-3.5.4.jar
maven-slf4j-provider.license
org.eclipse.sisu.inject-0.3.3.jar
org.eclipse.sisu.inject.license
org.eclipse.sisu.plexus-0.3.3.jar
org.eclipse.sisu.plexus.license
plexus-cipher-1.7.jar
plexus-cipher.license
plexus-component-annotations-1.7.1.jar
plexus-component-annotations.license
plexus-interpolation-1.24.jar
plexus-interpolation.license
plexus-sec-dispatcher-1.4.jar
plexus-sec-dispatcher.license
plexus-utils-3.1.0.jar
plexus-utils.license
slf4j-api-1.7.25.jar
slf4j-api.license
wagon-file-3.1.0.jar
wagon-file.license
wagon-http-3.1.0-shaded.jar
wagon-http.license
wagon-provider-api-3.1.0.jar
wagon-provider-api.license

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/ext:
README.txt

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native:
freebsd32
freebsd64
linux32
linux64
osx
README.txt
windows32
windows64

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/freebsd32:
libjansi.so

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/freebsd64:
libjansi.so

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/linux32:
libjansi.so

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/linux64:
libjansi.so

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/osx:
libjansi.jnilib

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/windows32:
jansi.dll

/home/jenkins/tools/maven/apache-maven-3.5.4/lib/jansi-native/windows64:
jansi.dll
Finished /home/jenkins/tools/maven/apache-maven-3.5.4 Directory Listing :
Detected current version as: 
'HUDI_home=
0.5.2-SNAPSHOT'
[INFO] Scanning for projects...
[WARNING] 
[WARNING] Some problems were encountered while building the effective model for 
org.apache.hudi:hudi-spark_2.11:jar:0.5.2-SNAPSHOT
[WARNING] 'artifactId' contains an expression but should be a constant. @ 
org.apache.hudi:hudi-spark_${scala.binary.version}:[unknown-version], 

 line 26, column 15
[WARNING] 
[WARNING] Some problems were encountered while building the effective model for 
org.apache.hudi:hudi-utilities_2.11:jar:0.5.2-SNAPSHOT
[WARNING] 'artifactId' contains an expression but should be a constant. @ 
org.apache.hudi:hudi-utilities_${scala.binary.version}:[unknown-version], 

 line 26, column 15
[WARNING] 
[WARNING] Some problems were encountered while building the effective model for 
org.apache.hudi:hudi-spark-bundle_2.11:jar:0.5.2-SNAPSHOT
[WARNING] 'artifactId' contains an expression but should be a constant. @ 
org.apache.hudi:hudi-spark-bundle_${scala.binary.version}:[unknown-version], 

 line 26, column 15
[WARNING] 
[WARNING] Some problems were encountered while building the effective model for 
org.apache.hudi:hudi-utilities-bundle_2.11:jar:0.5.2-SNAPSHOT
[WARNING] 'artifactId' contains an e

[GitHub] [incubator-hudi] garyli1019 opened a new pull request #1348: HUDI-597 Enable incremental pulling from defined partitions

2020-02-21 Thread GitBox
garyli1019 opened a new pull request #1348: HUDI-597 Enable incremental pulling 
from defined partitions
URL: https://github.com/apache/incubator-hudi/pull/1348
 
 
   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a 
pull request.*
   
   ## What is the purpose of the pull request
   
   *Allow the user to do incremental pulling from certain partitions*
   
   ## Brief change log
   
 - *Added a new option in DataSourceReadOptions*
 - *Applied a glob pattern filter if the option is defined by user*
 - *Added a unit test for this feature*
   
   ## Verify this pull request
   
   This change added tests and can be verified as follows:
   
 - *Added a test in testCopyOnWriteStorage*
   
   ## Committer checklist
   
- [ ] Has a corresponding JIRA in PR title & commit

- [ ] Commit message is descriptive of the change

- [ ] CI is green
   
- [ ] Necessary doc changes done or have another open PR
  
- [ ] For large changes, please consider breaking it into sub-tasks under 
an umbrella JIRA.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (HUDI-597) Enable incremental pulling from defined partitions

2020-02-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-597:

Labels: pull-request-available  (was: )

> Enable incremental pulling from defined partitions
> --
>
> Key: HUDI-597
> URL: https://issues.apache.org/jira/browse/HUDI-597
> Project: Apache Hudi (incubating)
>  Issue Type: New Feature
>Reporter: Yanjia Gary Li
>Assignee: Yanjia Gary Li
>Priority: Minor
>  Labels: pull-request-available
>
> For the use case that I only need to pull the incremental part of certain 
> partitions, I need to do the incremental pulling from the entire dataset 
> first then filtering in Spark.
> If we can use the folder partitions directly as part of the input path, it 
> could run faster by only load relevant parquet files.
> Example:
>  
> {code:java}
> spark.read.format("org.apache.hudi")
> .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY,DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
> .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
> .load(path, "year=2020/*/*/*")
>  
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042346#comment-17042346
 ] 

Vinoth Chandar commented on HUDI-625:
-

Okay, I was able to pack 10x more entries into memory with my changes, but we 
still need to figure out this kryo issue, so keep going.

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
> Total size in bytes of MemoryBasedMap => 83886580
> Number of entries in DiskBasedMap => 2849125
> Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"
> val json_data = (1 to 400).map(i => "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132
> {code}
> h3. Hang stackstrace (DiskBasedMap#get)
>  
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
> at java.util.zip.ZipFile.getEntry(Native Method)
>  

[jira] [Comment Edited] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042346#comment-17042346
 ] 

Vinoth Chandar edited comment on HUDI-625 at 2/22/20 2:49 AM:
--

Okay, I was able to pack 10x more entries into memory with my changes, but we 
still need to figure out this kryo issue, so keep going ;)


was (Author: vc):
Okay I was able pack 10x more entries into memory from my changes.. but we 
still need to figure out this kryo issue.. so keep going

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
> Total size in bytes of MemoryBasedMap => 83886580
> Number of entries in DiskBasedMap => 2849125
> Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"
> val json_data = (1 to 400).map(i => "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file sp

[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042337#comment-17042337
 ] 

lamber-ken commented on HUDI-625:
-

But when I test the following code snippet, which initializes Kryo the same way hudi does, it is slow.

So I think we should focus on the kryo instance (i)

 

starting deserialize
dese times: 1000
dese cost: 6019ms

 
{code:java}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.reflectasm.ConstructorAccess;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.SerializationUtils;
import org.objenesis.instantiator.ObjectInstantiator;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

public class KryoTest {

public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

public static GenericRecord generateGenericRecord() {
Random RAND = new Random(46474747);

GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
rec.put("_row_key", "rowKey");
rec.put("timestamp", "timestamp");
rec.put("rider", "riderName");
rec.put("driver", "driverName");
rec.put("begin_lat", RAND.nextDouble());
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
rec.put("_hoodie_is_deleted", false);
return rec;
}

public static void main(String[] args) throws Exception {

GenericRecord genericRecord = generateGenericRecord();

Kryo kryo = new KryoInstantiator().newKryo();


// store data bytes
List<byte[]> objectDatas = new LinkedList<>();
for (int i = 0; i < 1000; i++) {
Output output = new Output(1, 4024);
kryo.writeClassAndObject(output, genericRecord);
output.close();
objectDatas.add(SerializationUtils.serialize(genericRecord));
}

long t1 = System.currentTimeMillis();

System.out.println("starting deserialize");

// deserialize
List<Object> datas = new LinkedList<>();
for (byte[] data : objectDatas) {
datas.add(kryo.readClassAndObject(new Input(data)));
}

long t2 = System.currentTimeMillis();

System.err.println("dese times: " + datas.size());
System.err.println("dese cost: " + (t2 - t1) + "ms");

}

private static class KryoInstantiator implements Serializable {

public Kryo newKryo() {

Kryo kryo = new KryoBase();
// ensure that kryo doesn't fail if classes are not registered with kryo.
kryo.setRegistrationRequired(false);
// This would be used for object initialization if nothing else works out.
kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
return kryo;
}

private static class KryoBase extends Kryo {
@Override
protected Serializer newDefaultSerializer(Class type) {
final Serializer serializer = super.newDefaultSerializer(type);
if (serializer instanceof FieldSerializer) {
final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
fieldSerializer.setIgnoreSyntheticFields(true);
}
return serializer;
}

@Override
p

[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042324#comment-17042324
 ] 

lamber-ken commented on HUDI-625:
-

I tested the kryo API directly and it seems blazing fast.

 

dese times: 10
dese cost: 167ms
{code:java}


<dependency>
  <groupId>com.esotericsoftware</groupId>
  <artifactId>kryo</artifactId>
  <version>4.0.2</version>
</dependency>



import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.BeanSerializer;

import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;

public class KryoTest {

public static void main(String[] args) throws Exception {

Kryo kryo = new Kryo();
kryo.register(TestBean.class, new BeanSerializer(kryo, TestBean.class));

TestBean testBean = new TestBean();

// store data bytes
List<byte[]> objectDatas = new LinkedList<>();
for (int i = 0; i < 10; i++) {
Output output = new Output(1, 1024);
kryo.writeClassAndObject(output, testBean);
output.close();
objectDatas.add(output.toBytes());
}

long t1 = System.currentTimeMillis();

// deserialize
List<Object> datas = new LinkedList<>();
for (byte[] data : objectDatas) {
datas.add(kryo.readClassAndObject(new Input(data)));
}

long t2 = System.currentTimeMillis();

System.err.println("dese times: " + datas.size());
System.err.println("dese cost: " + (t2 - t1) + "ms");

}

static class TestBean implements Serializable {

String _row_key;
String timestamp;
String rider;
String driver;
String begin_lat;

public TestBean() {
this._row_key = "_row_key" + System.currentTimeMillis();
this.timestamp = "timestamp" + System.currentTimeMillis();
this.rider = "rider" + System.currentTimeMillis();
this.driver = "driver" + System.currentTimeMillis();
this.begin_lat = "begin_lat" + System.currentTimeMillis();
}

@Override
public String toString() {
return "TestBean{" +
"_row_key='" + _row_key + '\'' +
", timestamp='" + timestamp + '\'' +
", rider='" + rider + '\'' +
", driver='" + driver + '\'' +
", begin_lat='" + begin_lat + '\'' +
'}';
}
}

}

{code}

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
> Total size in bytes of MemoryBasedMap => 83886580
> Number of entries in DiskBasedMap => 2849125
> Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upse

[jira] [Commented] (HUDI-628) MultiPartKeysValueExtractor does not work with run_sync_tool.sh

2020-02-21 Thread Balaji Varadarajan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042289#comment-17042289
 ] 

Balaji Varadarajan commented on HUDI-628:
-

@Andrew Wong, this is expected if you use MultiPartKeysValueExtractor, as it 
splits by "/". You might want to give 3 fields as partition fields (continent, 
country, city) for "americas/brazil/sao_paulo". If you want to treat them as 
one field, you can simply add a new implementation of PartitionValueExtractor 
and plug it in; a minimal sketch follows.
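
For illustration only, such an extractor could look roughly like this (a sketch; the 
class name is hypothetical, and it assumes the PartitionValueExtractor interface 
exposes the extractPartitionValuesInPath(String) method referenced in this issue):
{code:java}
package org.apache.hudi.hive;

import java.util.Collections;
import java.util.List;

/**
 * Treats the whole partition path (e.g. "americas/brazil/sao_paulo")
 * as a single partition value instead of splitting it on "/".
 */
public class SinglePartPathValueExtractor implements PartitionValueExtractor {

  @Override
  public List<String> extractPartitionValuesInPath(String partitionPath) {
    // return the path unchanged so it matches a single partition field
    return Collections.singletonList(partitionPath);
  }
}
{code}
It would then be passed via --partition-value-extractor in place of MultiPartKeysValueExtractor.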

> MultiPartKeysValueExtractor does not work with run_sync_tool.sh
> ---
>
> Key: HUDI-628
> URL: https://issues.apache.org/jira/browse/HUDI-628
> Project: Apache Hudi (incubating)
>  Issue Type: Bug
>Reporter: Andrew Wong
>Assignee: Balaji Varadarajan
>Priority: Major
> Attachments: stack_trace.txt
>
>
> The [https://hudi.apache.org/docs/quick-start-guide.html] example data has a 
> column `partitionpath` which holds values like `americas/brazil/sao_paulo`. 
> Using the docker environment's spark-shell, you can change the basePath from 
> the quickstart to save to hdfs://user/hive/warehouse/hudi_trips_cow and write 
> the table. Then you can see the folder in the HDFS browser, similar to the 
> stock_ticks_cow folder created in the docker demo.
> However, if you try to use run_sync_tool.sh to sync the table to Hive, you 
> get the error: "java.lang.IllegalArgumentException: Partition key parts 
> [partitionpath] does not match with partition values [americas, brazil, 
> sao_paulo]. Check partition strategy. "
> {quote}{{/var/hoodie/ws/hudi-hive/run_sync_tool.sh --jdbc-url 
> jdbc:hive2://hiveserver:1 --user hive --pass hive --partitioned-by 
> partitionpath --partition-value-extractor 
> org.apache.hudi.hive.MultiPartKeysValueExtractor -MultiPartKeysValueExtractor 
> -base-path /user/hive/warehouse/hudi_trips_cow --database default --table 
> hudi_trips_cow}}
> {quote}
> This error is thrown in `HoodieHiveClient.getPartitionClause`, which uses 
> `extractPartitionValuesInPath` to get a list of partitionValues. The problem 
> is that it compares the length of the partitionValues to the length of the 
> partitionField. In this example, there is only 1 partitionField, 
> "partitionpath," which is split into 3 partitionValues. Thus the check fails 
> and throws the exception. 
> See 
> [https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java#L182]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (HUDI-628) MultiPartKeysValueExtractor does not work with run_sync_tool.sh

2020-02-21 Thread Balaji Varadarajan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Balaji Varadarajan reassigned HUDI-628:
---

Assignee: Balaji Varadarajan

> MultiPartKeysValueExtractor does not work with run_sync_tool.sh
> ---
>
> Key: HUDI-628
> URL: https://issues.apache.org/jira/browse/HUDI-628
> Project: Apache Hudi (incubating)
>  Issue Type: Bug
>Reporter: Andrew Wong
>Assignee: Balaji Varadarajan
>Priority: Major
> Attachments: stack_trace.txt
>
>
> The [https://hudi.apache.org/docs/quick-start-guide.html] example data has a 
> column `partitionpath` which holds values like `americas/brazil/sao_paulo`. 
> Using the docker environment's spark-shell, you can change the basePath from 
> the quickstart to save to hdfs://user/hive/warehouse/hudi_trips_cow and write 
> the table. Then you can see the folder in the HDFS browser, similar to the 
> stock_ticks_cow folder created in the docker demo.
> However, if you try to use run_sync_tool.sh to sync the table to Hive, you 
> get the error: "java.lang.IllegalArgumentException: Partition key parts 
> [partitionpath] does not match with partition values [americas, brazil, 
> sao_paulo]. Check partition strategy. "
> {quote}{{/var/hoodie/ws/hudi-hive/run_sync_tool.sh --jdbc-url 
> jdbc:hive2://hiveserver:1 --user hive --pass hive --partitioned-by 
> partitionpath --partition-value-extractor 
> org.apache.hudi.hive.MultiPartKeysValueExtractor -MultiPartKeysValueExtractor 
> -base-path /user/hive/warehouse/hudi_trips_cow --database default --table 
> hudi_trips_cow}}
> {quote}
> This error is thrown in `HoodieHiveClient.getPartitionClause`, which uses 
> `extractPartitionValuesInPath` to get a list of partitionValues. The problem 
> is that it compares the length of the partitionValues to the length of the 
> partitionField. In this example, there is only 1 partitionField, 
> "partitionpath," which is split into 3 partitionValues. Thus the check fails 
> and throws the exception. 
> See 
> [https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java#L182]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (HUDI-409) Replace Log Magic header with a secure hash to avoid clashes with data

2020-02-21 Thread Ramachandran M S (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ramachandran M S reassigned HUDI-409:
-

Assignee: Ramachandran M S  (was: Nishith Agarwal)

> Replace Log Magic header with a secure hash to avoid clashes with data
> --
>
> Key: HUDI-409
> URL: https://issues.apache.org/jira/browse/HUDI-409
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Common Core
>Reporter: Nishith Agarwal
>Assignee: Ramachandran M S
>Priority: Major
> Fix For: 0.5.2
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] ramachandranms commented on issue #1347: [HUDI-627] Aggregate code coverage and publish to codecov.io during CI

2020-02-21 Thread GitBox
ramachandranms commented on issue #1347: [HUDI-627] Aggregate code coverage and 
publish to codecov.io during CI
URL: https://github.com/apache/incubator-hudi/pull/1347#issuecomment-589883370
 
 
   we have a code coverage report already in 
[codecov.io](https://codecov.io/gh/apache/incubator-hudi) through this PR


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (HUDI-628) MultiPartKeysValueExtractor does not work with run_sync_tool.sh

2020-02-21 Thread Andrew Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Wong updated HUDI-628:
-
Description: 
The [https://hudi.apache.org/docs/quick-start-guide.html] example data has a 
column `partitionpath` which holds values like `americas/brazil/sao_paulo`. 
Using the docker environment's spark-shell, you can change the basePath from 
the quickstart to save to hdfs://user/hive/warehouse/hudi_trips_cow and write 
the table. Then you can see the folder in the HDFS browser, similar to the 
stock_ticks_cow folder created in the docker demo.

However, if you try to use run_sync_tool.sh to sync the table to Hive, you get 
the error: "java.lang.IllegalArgumentException: Partition key parts 
[partitionpath] does not match with partition values [americas, brazil, 
sao_paulo]. Check partition strategy. "
{quote}{{/var/hoodie/ws/hudi-hive/run_sync_tool.sh --jdbc-url 
jdbc:hive2://hiveserver:1 --user hive --pass hive --partitioned-by 
partitionpath --partition-value-extractor 
org.apache.hudi.hive.MultiPartKeysValueExtractor -MultiPartKeysValueExtractor 
-base-path /user/hive/warehouse/hudi_trips_cow --database default --table 
hudi_trips_cow}}
{quote}
This error is thrown in `HoodieHiveClient.getPartitionClause`, which uses 
`extractPartitionValuesInPath` to get a list of partitionValues. The problem is 
that it compares the length of the partitionValues to the length of the 
partitionField. In this example, there is only 1 partitionField, 
"partitionpath," which is split into 3 partitionValues. Thus the check fails 
and throws the exception. 

See 
[https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java#L182]

 

  was:
The [https://hudi.apache.org/docs/quick-start-guide.html] example data has a 
column `partitionpath` which holds values like `americas/brazil/sao_paulo`. 
Using the docker environment's spark-shell, you can change the basePath from 
the quickstart to save to hdfs://user/hive/warehouse/hudi_trips_cow. Then you 
can see the folder in the HDFS browser, similar to the stock_ticks_cow folder 
created in the docker demo.

However, if you try to use run_sync_tool.sh to sync the table to Hive, you get 
the error: "java.lang.IllegalArgumentException: Partition key parts 
[partitionpath] does not match with partition values [americas, brazil, 
sao_paulo]. Check partition strategy. "
{quote}{{/var/hoodie/ws/hudi-hive/run_sync_tool.sh --jdbc-url 
jdbc:hive2://hiveserver:1 --user hive --pass hive --partitioned-by 
partitionpath --partition-value-extractor 
org.apache.hudi.hive.MultiPartKeysValueExtractor -MultiPartKeysValueExtractor 
-base-path /user/hive/warehouse/hudi_trips_cow --database default --table 
hudi_trips_cow}}
{quote}
This error is thrown in `HoodieHiveClient.getPartitionClause`, which uses 
`extractPartitionValuesInPath` to get a list of partitionValues. The problem is 
that it compares the length of the partitionValues to the length of the 
partitionField. In this example, there is only 1 partitionField, 
"partitionpath," which is split into 3 partitionValues. Thus the check fails 
and throws the exception. 

See 
[https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java#L182]

 


> MultiPartKeysValueExtractor does not work with run_sync_tool.sh
> ---
>
> Key: HUDI-628
> URL: https://issues.apache.org/jira/browse/HUDI-628
> Project: Apache Hudi (incubating)
>  Issue Type: Bug
>Reporter: Andrew Wong
>Priority: Major
> Attachments: stack_trace.txt
>
>
> The [https://hudi.apache.org/docs/quick-start-guide.html] example data has a 
> column `partitionpath` which holds values like `americas/brazil/sao_paulo`. 
> Using the docker environment's spark-shell, you can change the basePath from 
> the quickstart to save to hdfs://user/hive/warehouse/hudi_trips_cow and write 
> the table. Then you can see the folder in the HDFS browser, similar to the 
> stock_ticks_cow folder created in the docker demo.
> However, if you try to use run_sync_tool.sh to sync the table to Hive, you 
> get the error: "java.lang.IllegalArgumentException: Partition key parts 
> [partitionpath] does not match with partition values [americas, brazil, 
> sao_paulo]. Check partition strategy. "
> {quote}{{/var/hoodie/ws/hudi-hive/run_sync_tool.sh --jdbc-url 
> jdbc:hive2://hiveserver:1 --user hive --pass hive --partitioned-by 
> partitionpath --partition-value-extractor 
> org.apache.hudi.hive.MultiPartKeysValueExtractor -MultiPartKeysValueExtractor 
> -base-path /user/hive/warehouse/hudi_trips_cow --database default --table 
> hudi_trips_cow}}
> {quote}
> This error is thrown in `HoodieHiveClient.getPartitionClause`, which uses 
> `extractPartitionValuesIn

[jira] [Updated] (HUDI-628) MultiPartKeysValueExtractor does not work with run_sync_tool.sh

2020-02-21 Thread Andrew Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Wong updated HUDI-628:
-
Description: 
The [https://hudi.apache.org/docs/quick-start-guide.html] example data has a 
column `partitionpath` which holds values like `americas/brazil/sao_paulo`. 
Using the docker environment's spark-shell, you can change the basePath from 
the quickstart to save to hdfs://user/hive/warehouse/hudi_trips_cow. Then you 
can see the folder in the HDFS browser, similar to the stock_ticks_cow folder 
created in the docker demo.

However, if you try to use run_sync_tool.sh to sync the table to Hive, you get 
the error: "java.lang.IllegalArgumentException: Partition key parts 
[partitionpath] does not match with partition values [americas, brazil, 
sao_paulo]. Check partition strategy. "
{quote}{{/var/hoodie/ws/hudi-hive/run_sync_tool.sh --jdbc-url 
jdbc:hive2://hiveserver:1 --user hive --pass hive --partitioned-by 
partitionpath --partition-value-extractor 
org.apache.hudi.hive.MultiPartKeysValueExtractor -MultiPartKeysValueExtractor 
-base-path /user/hive/warehouse/hudi_trips_cow --database default --table 
hudi_trips_cow}}
{quote}
This error is thrown in `HoodieHiveClient.getPartitionClause`, which uses 
`extractPartitionValuesInPath` to get a list of partitionValues. The problem is 
that it compares the length of the partitionValues to the length of the 
partitionField. In this example, there is only 1 partitionField, 
"partitionpath," which is split into 3 partitionValues. Thus the check fails 
and throws the exception. 

See 
[https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java#L182]

 

  was:
The [https://hudi.apache.org/docs/quick-start-guide.html] example data has a 
column `partitionpath` which holds values like `americas/brazil/sao_paulo`. 
Using the docker environment, you can change the basePath from the quickstart 
to save to hdfs://user/hive/warehouse/hudi_trips_cow. Then you can see the 
folder in the HDFS browser, similar to the stock_ticks_cow folder created in 
the docker demo.

However, if you try to use run_sync_tool.sh to sync the table to Hive, you get 
the error: "java.lang.IllegalArgumentException: Partition key parts 
[partitionpath] does not match with partition values [americas, brazil, 
sao_paulo]. Check partition strategy. "
{quote}{{/var/hoodie/ws/hudi-hive/run_sync_tool.sh --jdbc-url 
jdbc:hive2://hiveserver:1 --user hive --pass hive --partitioned-by 
partitionpath --partition-value-extractor 
org.apache.hudi.hive.MultiPartKeysValueExtractor -MultiPartKeysValueExtractor 
-base-path /user/hive/warehouse/hudi_trips_cow --database default --table 
hudi_trips_cow}}
{quote}
This error is thrown in `HoodieHiveClient.getPartitionClause`, which uses 
`extractPartitionValuesInPath` to get a list of partitionValues. The problem is 
that it compares the length of the partitionValues to the length of the 
partitionField. In this example, there is only 1 partitionField, 
"partitionpath," which is split into 3 partitionValues. Thus the check fails 
and throws the exception. 

See 
[https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java#L182]

 


> MultiPartKeysValueExtractor does not work with run_sync_tool.sh
> ---
>
> Key: HUDI-628
> URL: https://issues.apache.org/jira/browse/HUDI-628
> Project: Apache Hudi (incubating)
>  Issue Type: Bug
>Reporter: Andrew Wong
>Priority: Major
> Attachments: stack_trace.txt
>
>
> The [https://hudi.apache.org/docs/quick-start-guide.html] example data has a 
> column `partitionpath` which holds values like `americas/brazil/sao_paulo`. 
> Using the docker environment's spark-shell, you can change the basePath from 
> the quickstart to save to hdfs://user/hive/warehouse/hudi_trips_cow. Then you 
> can see the folder in the HDFS browser, similar to the stock_ticks_cow folder 
> created in the docker demo.
> However, if you try to use run_sync_tool.sh to sync the table to Hive, you 
> get the error: "java.lang.IllegalArgumentException: Partition key parts 
> [partitionpath] does not match with partition values [americas, brazil, 
> sao_paulo]. Check partition strategy. "
> {quote}{{/var/hoodie/ws/hudi-hive/run_sync_tool.sh --jdbc-url 
> jdbc:hive2://hiveserver:1 --user hive --pass hive --partitioned-by 
> partitionpath --partition-value-extractor 
> org.apache.hudi.hive.MultiPartKeysValueExtractor -MultiPartKeysValueExtractor 
> -base-path /user/hive/warehouse/hudi_trips_cow --database default --table 
> hudi_trips_cow}}
> {quote}
> This error is thrown in `HoodieHiveClient.getPartitionClause`, which uses 
> `extractPartitionValuesInPath` to get a list of partitionValues. The problem 
>

[jira] [Updated] (HUDI-628) MultiPartKeysValueExtractor does not work with run_sync_tool.sh

2020-02-21 Thread Andrew Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Wong updated HUDI-628:
-
Summary: MultiPartKeysValueExtractor does not work with run_sync_tool.sh  
(was: MultiPartKeysValueExtractor does not work with 
HoodieHiveClient.getPartitionClause)

> MultiPartKeysValueExtractor does not work with run_sync_tool.sh
> ---
>
> Key: HUDI-628
> URL: https://issues.apache.org/jira/browse/HUDI-628
> Project: Apache Hudi (incubating)
>  Issue Type: Bug
>Reporter: Andrew Wong
>Priority: Major
> Attachments: stack_trace.txt
>
>
> The [https://hudi.apache.org/docs/quick-start-guide.html] example data has a 
> column `partitionpath` which holds values like `americas/brazil/sao_paulo`. 
> Using the docker environment, you can change the basePath from the quickstart 
> to save to hdfs://user/hive/warehouse/hudi_trips_cow. Then you can see the 
> folder in the HDFS browser, similar to the stock_ticks_cow folder created in 
> the docker demo.
> However, if you try to use run_sync_tool.sh to sync the table to Hive, you 
> get the error: "java.lang.IllegalArgumentException: Partition key parts 
> [partitionpath] does not match with partition values [americas, brazil, 
> sao_paulo]. Check partition strategy. "
> {quote}{{/var/hoodie/ws/hudi-hive/run_sync_tool.sh --jdbc-url 
> jdbc:hive2://hiveserver:1 --user hive --pass hive --partitioned-by 
> partitionpath --partition-value-extractor 
> org.apache.hudi.hive.MultiPartKeysValueExtractor -MultiPartKeysValueExtractor 
> -base-path /user/hive/warehouse/hudi_trips_cow --database default --table 
> hudi_trips_cow}}
> {quote}
> This error is thrown in `HoodieHiveClient.getPartitionClause`, which uses 
> `extractPartitionValuesInPath` to get a list of partitionValues. The problem 
> is that it compares the length of the partitionValues to the length of the 
> partitionField. In this example, there is only 1 partitionField, 
> "partitionpath," which is split into 3 partitionValues. Thus the check fails 
> and throws the exception. 
> See 
> [https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java#L182]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (HUDI-628) MultiPartKeysValueExtractor does not work with HoodieHiveClient.getPartitionClause

2020-02-21 Thread Andrew Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Wong updated HUDI-628:
-
Attachment: stack_trace.txt

> MultiPartKeysValueExtractor does not work with 
> HoodieHiveClient.getPartitionClause
> --
>
> Key: HUDI-628
> URL: https://issues.apache.org/jira/browse/HUDI-628
> Project: Apache Hudi (incubating)
>  Issue Type: Bug
>Reporter: Andrew Wong
>Priority: Major
> Attachments: stack_trace.txt
>
>
> The [https://hudi.apache.org/docs/quick-start-guide.html] example data has a 
> column `partitionpath` which holds values like `americas/brazil/sao_paulo`. 
> Using the docker environment, you can change the basePath from the quickstart 
> to save to hdfs://user/hive/warehouse/hudi_trips_cow. Then you can see the 
> folder in the HDFS browser, similar to the stock_ticks_cow folder created in 
> the docker demo.
> However, if you try to use run_sync_tool.sh to sync the table to Hive, you 
> get the error: "java.lang.IllegalArgumentException: Partition key parts 
> [partitionpath] does not match with partition values [americas, brazil, 
> sao_paulo]. Check partition strategy. "
> {quote}{{/var/hoodie/ws/hudi-hive/run_sync_tool.sh --jdbc-url 
> jdbc:hive2://hiveserver:1 --user hive --pass hive --partitioned-by 
> partitionpath --partition-value-extractor 
> org.apache.hudi.hive.MultiPartKeysValueExtractor -MultiPartKeysValueExtractor 
> -base-path /user/hive/warehouse/hudi_trips_cow --database default --table 
> hudi_trips_cow}}
> {quote}
> This error is thrown in `HoodieHiveClient.getPartitionClause`, which uses 
> `extractPartitionValuesInPath` to get a list of partitionValues. The problem 
> is that it compares the length of the partitionValues to the length of the 
> partitionField. In this example, there is only 1 partitionField, 
> "partitionpath," which is split into 3 partitionValues. Thus the check fails 
> and throws the exception. 
> See 
> [https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java#L182]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (HUDI-628) MultiPartKeysValueExtractor does not work with HoodieHiveClient.getPartitionClause

2020-02-21 Thread Andrew Wong (Jira)
Andrew Wong created HUDI-628:


 Summary: MultiPartKeysValueExtractor does not work with 
HoodieHiveClient.getPartitionClause
 Key: HUDI-628
 URL: https://issues.apache.org/jira/browse/HUDI-628
 Project: Apache Hudi (incubating)
  Issue Type: Bug
Reporter: Andrew Wong


The [https://hudi.apache.org/docs/quick-start-guide.html] example data has a 
column `partitionpath` which holds values like `americas/brazil/sao_paulo`. 
Using the docker environment, you can change the basePath from the quickstart 
to save to hdfs://user/hive/warehouse/hudi_trips_cow. Then you can see the 
folder in the HDFS browser, similar to the stock_ticks_cow folder created in 
the docker demo.

However, if you try to use run_sync_tool.sh to sync the table to Hive, you get 
the error: "java.lang.IllegalArgumentException: Partition key parts 
[partitionpath] does not match with partition values [americas, brazil, 
sao_paulo]. Check partition strategy. "
{quote}{{/var/hoodie/ws/hudi-hive/run_sync_tool.sh --jdbc-url 
jdbc:hive2://hiveserver:1 --user hive --pass hive --partitioned-by 
partitionpath --partition-value-extractor 
org.apache.hudi.hive.MultiPartKeysValueExtractor -MultiPartKeysValueExtractor 
-base-path /user/hive/warehouse/hudi_trips_cow --database default --table 
hudi_trips_cow}}
{quote}
This error is thrown in `HoodieHiveClient.getPartitionClause`, which uses 
`extractPartitionValuesInPath` to get a list of partitionValues. The problem is 
that it compares the length of the partitionValues to the length of the 
partitionField. In this example, there is only 1 partitionField, 
"partitionpath," which is split into 3 partitionValues. Thus the check fails 
and throws the exception. 

See 
[https://github.com/apache/incubator-hudi/blob/master/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java#L182]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (HUDI-479) Eliminate use of guava if possible

2020-02-21 Thread Suneel Marthi (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suneel Marthi updated HUDI-479:
---
Fix Version/s: (was: 0.6.0)
   0.5.2

> Eliminate use of guava if possible
> --
>
> Key: HUDI-479
> URL: https://issues.apache.org/jira/browse/HUDI-479
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Code Cleanup
>Reporter: Vinoth Chandar
>Assignee: Suneel Marthi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.5.2
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (HUDI-627) Publish coverage to codecov.io

2020-02-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-627:

Labels: pull-request-available  (was: )

> Publish coverage to codecov.io
> --
>
> Key: HUDI-627
> URL: https://issues.apache.org/jira/browse/HUDI-627
> Project: Apache Hudi (incubating)
>  Issue Type: Sub-task
>Reporter: Ramachandran M S
>Assignee: Ramachandran M S
>Priority: Major
>  Labels: pull-request-available
>
> * Publish the coverage to codecov.io on every build
>  * Fix code coverage to pickup cross module testing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [incubator-hudi] ramachandranms opened a new pull request #1347: [HUDI-627] Aggregate code coverage and publish to codecov.io during CI

2020-02-21 Thread GitBox
ramachandranms opened a new pull request #1347: [HUDI-627] Aggregate code 
coverage and publish to codecov.io during CI
URL: https://github.com/apache/incubator-hudi/pull/1347
 
 
   ## What is the purpose of the pull request
   
   - This PR aggregates code coverage across modules to get a holistic coverage 
number (since we have tests that get run across modules, e.g. hudi-client 
tests classes in hudi-common)
   - This also publishes the report to codecov.io (e.g. [report from the 
fork](https://codecov.io/gh/ramachandranms/incubator-hudi))
   - The above step automatically adds a coverage report to every PR (e.g. [PR from 
the fork](https://github.com/ramachandranms/incubator-hudi/pull/2))
   
   ## Brief change log
   
   - Modified pom files to include coverage in the missing modules (hudi-hadoop-mr, 
hudi-spark & hudi-timeline-service)
   - Added a wrapper module for aggregation (due to limitations in jacoco). 
This can be removed in the future once [this 
PR](https://github.com/jacoco/jacoco/pull/1007) gets merged into a release.
   
   ## Verify this pull request
   
   - Ran travis in private fork and verified the published report & PR.
   
   ## Committer checklist
   
- [ ] Has a corresponding JIRA in PR title & commit

- [ ] Commit message is descriptive of the change

- [ ] CI is green
   
- [ ] Necessary doc changes done or have another open PR
  
- [ ] For large changes, please consider breaking it into sub-tasks under 
an umbrella JIRA.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1150: [HUDI-288]: Add support for ingesting multiple kafka streams in a single DeltaStreamer deployment

2020-02-21 Thread GitBox
pratyakshsharma commented on a change in pull request #1150: [HUDI-288]: Add 
support for ingesting multiple kafka streams in a single DeltaStreamer 
deployment
URL: https://github.com/apache/incubator-hudi/pull/1150#discussion_r382758933
 
 

 ##
 File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/TableConfig.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/*
+Represents object with all the topic level overrides for multi table delta 
streamer execution
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
 
 Review comment:
   @bvaradar since we are going ahead with DFSPropertiesConfiguration, I am 
going to remove the TableConfig class completely. 
   
   Regarding the comment about having the same source, yes, I meant the same 
source type. But since the source class gets passed as a command line parameter 
(--source-class), it is better to keep this restriction, because we cannot 
override a command line parameter in the source <-> sink config key-value pairs. 
If you really want the flexibility of overriding the source type for every pair 
as well, I will have to define one new property in DataSourceWriteOptions and use 
it to configure the source class; a rough sketch of that idea follows.
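
   For illustration only, a sketch of that idea (the property key and class below are 
hypothetical, not actual Hudi config options):
```java
import java.util.Properties;

/**
 * Sketch: let each source <-> sink pair override the source class via a
 * per-table property, falling back to the global --source-class value.
 * The property key below is hypothetical.
 */
public class SourceClassResolver {

  private static final String SOURCE_CLASS_PROP = "hoodie.deltastreamer.source.class";

  public static String resolveSourceClass(Properties tableProps, String cmdLineSourceClass) {
    // prefer the per-table override if present, otherwise use --source-class
    return tableProps.getProperty(SOURCE_CLASS_PROP, cmdLineSourceClass);
  }

  public static void main(String[] args) {
    Properties tableProps = new Properties();
    tableProps.setProperty(SOURCE_CLASS_PROP, "org.apache.hudi.utilities.sources.JsonKafkaSource");
    // prints the per-table override; without it, the command line value would be used
    System.out.println(resolveSourceClass(tableProps, "org.apache.hudi.utilities.sources.JsonDFSSource"));
  }
}
```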


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1150: [HUDI-288]: Add support for ingesting multiple kafka streams in a single DeltaStreamer deployment

2020-02-21 Thread GitBox
pratyakshsharma commented on a change in pull request #1150: [HUDI-288]: Add 
support for ingesting multiple kafka streams in a single DeltaStreamer 
deployment
URL: https://github.com/apache/incubator-hudi/pull/1150#discussion_r382758933
 
 

 ##
 File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/TableConfig.java
 ##
 @@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/*
+Represents object with all the topic level overrides for multi table delta 
streamer execution
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
 
 Review comment:
   @bvaradar yes, since we are going ahead with DFSPropertiesConfiguration, I am 
going to remove the TableConfig class completely. 
   
   Regarding the comment about having the same source, yes, I meant the same 
source type. But since the source class gets passed as a command line parameter 
(--source-class), it is better to keep this restriction, because we cannot 
override a command line parameter in the source <-> sink config key-value pairs. 
If you really want the flexibility of overriding the source type for every pair 
as well, I will have to define one new property in DataSourceWriteOptions and use 
it to configure the source class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042044#comment-17042044
 ] 

Vinoth Chandar edited comment on HUDI-625 at 2/21/20 5:46 PM:
--

let's do at least a million records for these kinds of things... :) yeah, I 
intentionally tested a Java native class like String to weed out DiskBasedMap 
itself as the culprit.. 

I am testing a PoC to make the map entries much smaller... it will help.. But 
we need to understand the deserialization problem.. 5ms/deserialization. We 
need to understand if that's expected. 


was (Author: vc):
let's do atleast a million records for these, kind of things... :) yeah I 
intentionally tested a java native class like string to weed our DiskBasedMap 
itself as the culprit.. 

 

I am testing a PoC to make the map entries much smaller... it will help..  But 
we need to understand the deserialization problem

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
> Total size in bytes of MemoryBasedMap => 83886580
> Number of entries in DiskBasedMap => 2849125
> Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
> "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>

[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar updated HUDI-625:

Description: 
[https://github.com/apache/incubator-hudi/issues/1328]

 

 So what's going on here is that each entry (single data field) is estimated to 
be around 500-750 bytes in memory and things spill a lot... 
{code:java}
20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 
3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, 
currentLocation='HoodieRecordLocation {instantTime=20200220225748, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
newLocation='HoodieRecordLocation {instantTime=20200220225921, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
 
{code:java}
INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
Total size in bytes of MemoryBasedMap => 83886580
Number of entries in DiskBasedMap => 2849125
Size of file spilled to disk => 1067101739 {code}
h2. Reproduce steps

 
{code:java}
export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
--executor-memory 6G \
--packages 
org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
{code}
 
{code:java}
val HUDI_FORMAT = "org.apache.hudi"
val TABLE_NAME = "hoodie.table.name"
val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
val config = Map(
"table_name" -> "example_table",
"target" -> "file:///tmp/example_table/",
"primary_key" ->  "id",
"sort_key" -> "id"
)
val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
"{\"id\":" + i + "}")
val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
val df1 = spark.read.json(jsonRDD)

println(s"${df1.count()} records in source 1")

df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
  option(BULK_INSERT_PARALLELISM, 1).
  mode("Overwrite").
  
save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
 records in Hudi table")

// Runs very slow
df1.limit(300).write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  save(config("target"))

// Runs very slow
df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  
save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
 records in Hudi table")
{code}
 

 

 
h2. *Analysis*
h3. *Upsert (400 entries)*
{code:java}
WARN HoodieMergeHandle: 
Number of entries in MemoryBasedMap => 150875 
Total size in bytes of MemoryBasedMap => 83886580 
Number of entries in DiskBasedMap => 3849125 
Size of file spilled to disk => 1443046132
{code}
h3. Hang stackstrace (DiskBasedMap#get)

 
{code:java}
"pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
at java.util.zip.ZipFile.getEntry(Native Method)
at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
-  locked java.util.jar.JarFile@1fc27ed4
at java.util.jar.JarFile.getEntry(JarFile.java:240)
at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
-  locked java.lang.Object@28f65251
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked 
scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked com.esotericsoftware.reflectasm.Acces

[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042044#comment-17042044
 ] 

Vinoth Chandar commented on HUDI-625:
-

let's do at least a million records for these kinds of things... :) yeah, I 
intentionally tested a Java native class like String to weed out DiskBasedMap 
itself as the culprit.. 

 

I am testing a PoC to make the map entries much smaller... it will help.. But 
we need to understand the deserialization problem

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875Total 
> size in bytes of MemoryBasedMap => 83886580Number of entries in DiskBasedMap 
> => 2849125Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
> "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132

[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread sivabalan narayanan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042035#comment-17042035
 ] 

sivabalan narayanan commented on HUDI-625:
--

oh, I see you tested (string, string) and the data size is probably large. 

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875Total 
> size in bytes of MemoryBasedMap => 83886580Number of entries in DiskBasedMap 
> => 2849125Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
> "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132
> {code}
> h3. Hang stackstrace (DiskBasedMap#get)
>  
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
> at java.util.zip.ZipFile.getEntry(Native Method)
> at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
>

[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread sivabalan narayanan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042030#comment-17042030
 ] 

sivabalan narayanan commented on HUDI-625:
--

I am getting something similar to lamber-ken's numbers (on my mac):
DiskBasedMap
>>> test 1000 entries
>>> write took : 227
>>> read took : 2116
DiskBasedMap
>>> test 1000 entries

>>> write took : 163
>>> read took : 25

[~vinoth]: I don't get what your numbers are. Why is write taking longer? 
lamber-ken and I are seeing that read takes longer and write is faster. 

 

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875Total 
> size in bytes of MemoryBasedMap => 83886580Number of entries in DiskBasedMap 
> => 2849125Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
> "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries

[jira] [Comment Edited] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041809#comment-17041809
 ] 

lamber-ken edited comment on HUDI-625 at 2/21/20 12:56 PM:
---

Test write HoodieRecord value into diskmap

>>> test 1000 entries 
 >>> write took : 106 ms
 >>> read took : 1492 ms
{code:java}
public void test02() throws IOException, URISyntaxException {
  List<HoodieRecord> hoodieRecords =
      SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);

  DiskBasedMap<String, HoodieRecord> stringDiskMap = new DiskBasedMap<>("/tmp/diskmap");

  long writeStartMs = System.currentTimeMillis();
  for (HoodieRecord record : hoodieRecords) {
    stringDiskMap.put(record.getRecordKey(), record);
  }

  System.err.println(">>> test 1000 entries");
  System.err.println(">>> write took : " + (System.currentTimeMillis() - writeStartMs));

  long readStartMs = System.currentTimeMillis();

  List<HoodieRecord> datas = new LinkedList<>();
  for (HoodieRecord record : hoodieRecords) {
    datas.add(stringDiskMap.get(record.getRecordKey()));
  }
  System.err.println(">>> read took : " + (System.currentTimeMillis() - readStartMs));
}
{code}


was (Author: lamber-ken):
Test write HoodieRecord value into diskmap, slow

>>> test 1000 entries 
>>> write took : 106 ms
>>> read took : 1492 ms
{code:java}
public void test02() throws IOException, URISyntaxException {
  List<HoodieRecord> hoodieRecords =
      SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);

  DiskBasedMap<String, HoodieRecord> stringDiskMap = new DiskBasedMap<>("/tmp/diskmap");

  long writeStartMs = System.currentTimeMillis();
  for (HoodieRecord record : hoodieRecords) {
    stringDiskMap.put(record.getRecordKey(), record);
  }

  System.err.println(">>> test 1000 entries");
  System.err.println(">>> write took : " + (System.currentTimeMillis() - writeStartMs));

  long readStartMs = System.currentTimeMillis();

  List<HoodieRecord> datas = new LinkedList<>();
  for (HoodieRecord record : hoodieRecords) {
    datas.add(stringDiskMap.get(record.getRecordKey()));
  }
  System.err.println(">>> read took : " + (System.currentTimeMillis() - readStartMs));
}
{code}

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875Total 
> size in bytes of MemoryBasedMap => 83886580Number of entries in DiskBasedMap 
> => 2849125Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
> "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.

[jira] [Comment Edited] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041809#comment-17041809
 ] 

lamber-ken edited comment on HUDI-625 at 2/21/20 12:54 PM:
---

Test write HoodieRecord value into diskmap, slow

>>> test 1000 entries 
>>> write took : 106 ms
>>> read took : 1492 ms
{code:java}
public void test02() throws IOException, URISyntaxException {
  List<HoodieRecord> hoodieRecords =
      SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);

  DiskBasedMap<String, HoodieRecord> stringDiskMap = new DiskBasedMap<>("/tmp/diskmap");

  long writeStartMs = System.currentTimeMillis();
  for (HoodieRecord record : hoodieRecords) {
    stringDiskMap.put(record.getRecordKey(), record);
  }

  System.err.println(">>> test 1000 entries");
  System.err.println(">>> write took : " + (System.currentTimeMillis() - writeStartMs));

  long readStartMs = System.currentTimeMillis();

  List<HoodieRecord> datas = new LinkedList<>();
  for (HoodieRecord record : hoodieRecords) {
    datas.add(stringDiskMap.get(record.getRecordKey()));
  }
  System.err.println(">>> read took : " + (System.currentTimeMillis() - readStartMs));
}
{code}


was (Author: lamber-ken):
Test write HoodieRecord value into diskmap, slow
{code:java}
public void test02() throws IOException, URISyntaxException {
  List<HoodieRecord> hoodieRecords =
      SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);

  DiskBasedMap<String, HoodieRecord> stringDiskMap = new DiskBasedMap<>("/tmp/diskmap");

  long writeStartMs = System.currentTimeMillis();
  for (HoodieRecord record : hoodieRecords) {
    stringDiskMap.put(record.getRecordKey(), record);
  }

  System.err.println(">>> test 1000 entries");
  System.err.println(">>> write took : " + (System.currentTimeMillis() - writeStartMs));

  long readStartMs = System.currentTimeMillis();

  List<HoodieRecord> datas = new LinkedList<>();
  for (HoodieRecord record : hoodieRecords) {
    datas.add(stringDiskMap.get(record.getRecordKey()));
  }
  System.err.println(">>> read took : " + (System.currentTimeMillis() - readStartMs));
}


>>> test 1000 entries
>>> write took : 106
>>> read took : 1492
{code}

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875Total 
> size in bytes of MemoryBasedMap => 83886580Number of entries in DiskBasedMap 
> => 2849125Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
> "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.js

[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041809#comment-17041809
 ] 

lamber-ken commented on HUDI-625:
-

Test write HoodieRecord value into diskmap, slow
{code:java}
public void test02() throws IOException, URISyntaxException {
  List<HoodieRecord> hoodieRecords =
      SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);

  DiskBasedMap<String, HoodieRecord> stringDiskMap = new DiskBasedMap<>("/tmp/diskmap");

  long writeStartMs = System.currentTimeMillis();
  for (HoodieRecord record : hoodieRecords) {
    stringDiskMap.put(record.getRecordKey(), record);
  }

  System.err.println(">>> test 1000 entries");
  System.err.println(">>> write took : " + (System.currentTimeMillis() - writeStartMs));

  long readStartMs = System.currentTimeMillis();

  List<HoodieRecord> datas = new LinkedList<>();
  for (HoodieRecord record : hoodieRecords) {
    datas.add(stringDiskMap.get(record.getRecordKey()));
  }
  System.err.println(">>> read took : " + (System.currentTimeMillis() - readStartMs));
}


>>> test 1000 entries
>>> write took : 106
>>> read took : 1492
{code}

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875Total 
> size in bytes of MemoryBasedMap => 83886580Number of entries in DiskBasedMap 
> => 2849125Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
> "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KE

[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar updated HUDI-625:

Description: 
[https://github.com/apache/incubator-hudi/issues/1328]

 

 So what's going on here is that each entry (single data field) is estimated to 
be around 500-750 bytes in memory and things spill a lot... 
{code:java}
20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 
3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, 
currentLocation='HoodieRecordLocation {instantTime=20200220225748, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
newLocation='HoodieRecordLocation {instantTime=20200220225921, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
 
{code:java}
INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
Total size in bytes of MemoryBasedMap => 83886580
Number of entries in DiskBasedMap => 2849125
Size of file spilled to disk => 1067101739
{code}
h2. Reproduce steps

 
{code:java}
export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
--executor-memory 6G \
--packages 
org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
{code}
 
{code:java}
val HUDI_FORMAT = "org.apache.hudi"
val TABLE_NAME = "hoodie.table.name"
val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
val config = Map(
"table_name" -> "example_table",
"target" -> "file:///tmp/example_table/",
"primary_key" ->  "id",
"sort_key" -> "id"
)
val readPath = config("target") + "/*"
val json_data = (1 to 400).map(i => "{\"id\":" + i + "}")
val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
val df1 = spark.read.json(jsonRDD)

println(s"${df1.count()} records in source 1")

df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
  option(BULK_INSERT_PARALLELISM, 1).
  mode("Overwrite").
  
save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
 records in Hudi table")

// Runs very slow
df1.limit(300).write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  save(config("target"))

// Runs very slow
df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  
save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
 records in Hudi table")
{code}
 

 

 
h2. *Analysis*
h3. *Upsert (400 entries)*
{code:java}
WARN HoodieMergeHandle: 
Number of entries in MemoryBasedMap => 150875 
Total size in bytes of MemoryBasedMap => 83886580 
Number of entries in DiskBasedMap => 3849125 
Size of file spilled to disk => 1443046132
{code}
h3. Hang stack trace (DiskBasedMap#get)

 
{code:java}
"pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
at java.util.zip.ZipFile.getEntry(Native Method)
at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
-  locked java.util.jar.JarFile@1fc27ed4
at java.util.jar.JarFile.getEntry(JarFile.java:240)
at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
-  locked java.lang.Object@28f65251
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked 
scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked com.esotericsoftware.reflectasm.Access

[GitHub] [incubator-hudi] vinothchandar commented on issue #1346: [WIP] [HUDI-554] Cleanup package structure in hudi-client

2020-02-21 Thread GitBox
vinothchandar commented on issue #1346: [WIP] [HUDI-554] Cleanup package 
structure in hudi-client
URL: https://github.com/apache/incubator-hudi/pull/1346#issuecomment-589581238
 
 
   cc @yanghua 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-hudi] vinothchandar opened a new pull request #1346: [HUDI-554] Cleanup package structure in hudi-client

2020-02-21 Thread GitBox
vinothchandar opened a new pull request #1346: [HUDI-554] Cleanup package 
structure in hudi-client
URL: https://github.com/apache/incubator-hudi/pull/1346
 
 
   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a 
pull request.*
   
   ## What is the purpose of the pull request
   
   Before we embark on moving Spark-specific code to `hudi-spark` and making a `hudi-writer-common`, let's fix some current inconsistencies in the class organization.
   
   ## Brief change log
   
- Just package, class moves and renames with the following intent
- `client` now has all the various client classes, that do the transaction 
management
- `func` renamed to `execution` and some helpers moved to `client/utils`
- All compaction code under `io` now under `table/compact`
- Rollback code under `table/rollback` and in general all code for 
individual operations under `table`
- `exception` `config`, `metrics` left untouched
- Moved the tests also accordingly
   
   ## Verify this pull request
   
   
   This pull request is a trivial rework / code cleanup without any test 
coverage.
   
   This pull request is already covered by existing tests, such as *(please 
describe tests)*.
   
   ## Committer checklist
   
- [ ] Has a corresponding JIRA in PR title & commit

- [ ] Commit message is descriptive of the change

- [ ] CI is green
   
- [ ] Necessary doc changes done or have another open PR
  
- [ ] For large changes, please consider breaking it into sub-tasks under 
an umbrella JIRA.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (HUDI-554) Restructure code/packages to move more code back into hudi-writer-common

2020-02-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-554:

Labels: pull-request-available  (was: )

> Restructure code/packages  to move more code back into hudi-writer-common
> -
>
> Key: HUDI-554
> URL: https://issues.apache.org/jira/browse/HUDI-554
> Project: Apache Hudi (incubating)
>  Issue Type: Sub-task
>  Components: Code Cleanup
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041691#comment-17041691
 ] 

Vinoth Chandar commented on HUDI-625:
-

Wondering if it's sufficient to just keep the payload/data part of the incoming HoodieRecord in the ExternalSpillableMap... the other fields like partition path, record location, etc. are the same for all records coming in, so we could just add them lazily on demand. This would cut down the size by a lot and make deserialization simpler.
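
A minimal sketch of that idea, purely as an assumption of how it could look (the LazyPayloadMap name, its constructor, and the rehydrate hook are hypothetical, not existing Hudi APIs; the backing map could be an ExternalSpillableMap keyed by record key):

{code:java}
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch: spill only the small per-record payload and rebuild the
// full record lazily on get(), since partition path / record location are the
// same for every record a single merge handle processes.
public class LazyPayloadMap<P, R> {

  private final Map<String, P> payloadByKey;         // e.g. an ExternalSpillableMap
  private final BiFunction<String, P, R> rehydrate;  // (recordKey, payload) -> full record

  public LazyPayloadMap(Map<String, P> payloadByKey, BiFunction<String, P, R> rehydrate) {
    this.payloadByKey = payloadByKey;
    this.rehydrate = rehydrate;
  }

  public void put(String recordKey, P payload) {
    // only the payload gets serialized/spilled, not the key/partition/location fields
    payloadByKey.put(recordKey, payload);
  }

  public R get(String recordKey) {
    P payload = payloadByKey.get(recordKey);
    // constant per-handle fields (partition path, file id, instant time) are
    // re-attached here instead of being stored with every entry
    return payload == null ? null : rehydrate.apply(recordKey, payload);
  }
}
{code}

With HoodieRecordPayload as P and HoodieRecord as R, the rehydrate function would wrap the payload back into a HoodieRecord using the partition path and record location the merge handle already knows, so only the payload bytes are serialized per entry.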

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
> "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132
> {code}
> h3. Hang stackstrace (DiskBasedMap#get)
>  
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
> at java.util.zip.ZipFile.getEntry(Native Method)
> at java.util.zip.ZipFile.

[GitHub] [incubator-hudi] vinothchandar commented on issue #1278: [HUDI-573] Refactoring getter to avoid double extrametadata in json representation of HoodieCommitMetadata

2020-02-21 Thread GitBox
vinothchandar commented on issue #1278: [HUDI-573] Refactoring getter to avoid 
double extrametadata in json representation of HoodieCommitMetadata
URL: https://github.com/apache/incubator-hudi/pull/1278#issuecomment-589546842
 
 
   @bvaradar please check prior to merging if the commit message will retain 
the JIRA number :) 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041656#comment-17041656
 ] 

Vinoth Chandar commented on HUDI-625:
-

[~lamber-ken] maybe we are missing registering these classes with Kryo when serializing?
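
For reference, a rough sketch of what registering these classes up front could look like, assuming the spill file is written and read with a reusable Kryo instance (the helper class, the ThreadLocal wrapper, and the exact class list are illustrative assumptions, not Hudi's actual serialization utility):

{code:java}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;

import java.io.ByteArrayOutputStream;

public class KryoRegistrationSketch {

  // Kryo instances are not thread-safe, so keep one per thread and reuse it;
  // class registration then happens once instead of on every put()/get().
  private static final ThreadLocal<Kryo> KRYO = ThreadLocal.withInitial(() -> {
    Kryo kryo = new Kryo();
    kryo.setRegistrationRequired(false); // nested payload classes may not be registered
    kryo.register(HoodieRecord.class);
    kryo.register(HoodieKey.class);
    kryo.register(HoodieRecordLocation.class);
    return kryo;
  });

  public static byte[] serialize(Object obj) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (Output output = new Output(baos)) {
      KRYO.get().writeClassAndObject(output, obj);
    }
    return baos.toByteArray();
  }

  public static Object deserialize(byte[] bytes) {
    try (Input input = new Input(bytes)) {
      return KRYO.get().readClassAndObject(input);
    }
  }
}
{code}

Reusing one registered Kryo instance per thread should avoid the repeated class resolution and reflectasm instantiator creation that show up in the hang stack trace above.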

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
> "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132
> {code}
> h3. Hang stackstrace (DiskBasedMap#get)
>  
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
> at java.util.zip.ZipFile.getEntry(Native Method)
> at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
> -  locked java.util.jar.JarFile@1fc27ed4
> at java.util.jar.JarFile.getEntry(JarFile.java:240)
> at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
> at sun.misc.URLClassPath$JarLoader.getResource(URL

[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041654#comment-17041654
 ] 

Vinoth Chandar commented on HUDI-625:
-

If I write a simple string key and value into the disk map, it's blazing fast (as expected):

 
{code:java}
DiskBasedMap<String, String> stringDiskMap = new DiskBasedMap<>("/tmp/diskmap");

long writeStartMs = System.currentTimeMillis();
for (HoodieRecord record : records) {
  stringDiskMap.put(record.getRecordKey(), record.getPartitionPath());
}
System.err.println(">>> write took : " + (System.currentTimeMillis() - writeStartMs));

long readStartMs = System.currentTimeMillis();
for (HoodieRecord record : records) {
  stringDiskMap.get(record.getRecordKey());
}
System.err.println(">>> read took : " + (System.currentTimeMillis() - readStartMs));


gives : 
>>> write took : 7560
>>> read took : 427


{code}

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
> "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 

[jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041649#comment-17041649
 ] 

Vinoth Chandar commented on HUDI-625:
-

cc [~nishith29] [~vbalaji] this seems very unexpected for the deser.. 

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"val json_data = (1 to 400).map(i => 
> "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   
> save(config("target"))println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()}
>  records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132
> {code}
> h3. Hang stackstrace (DiskBasedMap#get)
>  
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
> at java.util.zip.ZipFile.getEntry(Native Method)
> at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
> -  locked java.util.jar.JarFile@1fc27ed4
> at java.util.jar.JarFile.getEntry(JarFile.java:240)
> at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
> at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
>