Thursday, December 24, 2015

Java - Coding - Java 8 new features - Aggregate Operation


Concept:
  1. Stream - A stream is a sequence of elements. Unlike a collection, it is not a data structure that stores elements. Instead, a stream carries values from a source through a pipeline.
  2. Intermediate operation - produces a new stream.
  3. Terminal operation - produces a non-stream result, which can be a collection, a primitive value, or no value at all.
  4. Pipeline - a sequence of aggregate operations.
    1. Components in a pipeline
      1. A source, such as a collection or an array
      2. Zero or more intermediate operations
      3. A terminal operation
  5. Reduction function - a terminal operation that combines the contents of a stream into a single value.
    1. The JDK also provides general-purpose reduction methods (reduce and collect) for custom implementations. Example-04 and Example-05 elaborate on that.
  6. Parallelism in streams - a stream can be executed in parallel. The Java runtime partitions the stream into multiple substreams; aggregate operations iterate over and process these substreams in parallel and then combine the results. To get a parallel stream from a collection, call '.parallelStream()'.
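
The concepts above can be sketched as one small pipeline. This is only an illustration: the class name PipelineSketch and its two methods are made up for this post and are not part of the full source code further down.

```java
import java.util.Arrays;
import java.util.List;

public class PipelineSketch {

    // Sum of the squares of the even numbers, computed sequentially.
    static int sumOfEvenSquares(List<Integer> numbers) {
        return numbers.stream()              // source
                .filter(n -> n % 2 == 0)     // intermediate operation: keeps even values
                .mapToInt(n -> n * n)        // intermediate operation: Stream<Integer> -> IntStream
                .sum();                      // terminal operation: produces an int
    }

    // Same pipeline, but the runtime may split the work across threads.
    static int sumOfEvenSquaresParallel(List<Integer> numbers) {
        return numbers.parallelStream()
                .filter(n -> n % 2 == 0)
                .mapToInt(n -> n * n)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(sumOfEvenSquares(numbers));         // 4 + 16 + 36 = 56
        System.out.println(sumOfEvenSquaresParallel(numbers)); // same result
    }
}
```

For a list this small the parallel version is unlikely to be faster; it is shown only to make the difference between stream() and parallelStream() visible.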


Example-1: basic operations on a stream.  

int[] values = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20};
System.out.println("Any match of element value 3. "+ (Arrays.stream(values).anyMatch(each -> each==3) ? "FOUND": "NOT FOUND"));
System.out.println("All element between 1 and 15. "+ Arrays.stream(values).allMatch(each -> each > 0 && each <15));
//Show how many elements, average and sum of elements
System.out.print(Arrays.stream(values).count()+" elements, where SUM= "+Arrays.stream(values).sum()+" AVERAGE="+Arrays.stream(values).average().getAsDouble());
Full source code below.

Explanation - Arrays.stream(...) converts an array to a stream, after which any pipeline operation can be used. The anyMatch and allMatch methods take a lambda expression (a predicate). The count, sum, average, max and min methods take no parameters and are terminal operations. On an IntStream, count returns a long and sum returns an int directly, while max and min return an OptionalInt and average returns an OptionalDouble, because an empty stream has no such value. getAsInt fetches the value from an OptionalInt and getAsDouble fetches it from an OptionalDouble.
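
A minimal sketch of how these return types behave on an empty array. The class TerminalResultsSketch and its helper methods are hypothetical names used only here. It uses orElse instead of getAsInt/getAsDouble, since the getAs... methods throw NoSuchElementException when the Optional is empty.

```java
import java.util.Arrays;
import java.util.OptionalDouble;
import java.util.OptionalInt;

public class TerminalResultsSketch {

    // Returns the largest value, or the given fallback when the array is empty.
    static int maxOrDefault(int[] values, int fallback) {
        OptionalInt max = Arrays.stream(values).max();
        return max.orElse(fallback);
    }

    // Returns the average, or 0.0 when the array is empty.
    static double averageOrZero(int[] values) {
        OptionalDouble avg = Arrays.stream(values).average();
        return avg.orElse(0.0);
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 4};
        int[] empty = {};
        long count = Arrays.stream(values).count(); // count() returns a plain long
        int sum = Arrays.stream(values).sum();      // sum() returns a plain int
        System.out.println(count + " elements, SUM= " + sum);
        System.out.println(maxOrDefault(values, -1)); // 4
        System.out.println(maxOrDefault(empty, -1));  // -1, the fallback
        System.out.println(averageOrZero(values));    // 2.5
        System.out.println(averageOrZero(empty));     // 0.0
    }
}
```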

Example-2: filter out odd values and show the even values.

//Filter out odd value elements and print the even values.
System.out.print("Even value elements are= ");
Arrays.stream(values).filter(each -> each %2 == 0).forEach(each -> System.out.print(each+", "));
Full source code below

Explanation - The first operation in the pipeline is filter, which takes a lambda expression (a predicate). The predicate returns true when the element is an even number, and only those elements are passed on to the next operation, forEach. forEach takes another lambda expression that executes a regular statement for each element; in this case it prints the value to the console. filter returns an IntStream, which serves as the input for forEach.
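
The forEach version prints a trailing ", " after the last value. One possible variation, sketched here under the assumption that a single String is wanted, is to convert each int to a String and join them with Collectors.joining. The class and method names are made up for this illustration.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class EvenValuesSketch {

    // Builds "2, 4, 6" style output without a trailing separator.
    static String evenValues(int[] values) {
        return Arrays.stream(values)
                .filter(each -> each % 2 == 0)       // keep even values (IntStream)
                .mapToObj(Integer::toString)         // IntStream -> Stream<String>
                .collect(Collectors.joining(", "));  // terminal operation: one String
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 4, 5, 6};
        System.out.println("Even value elements are= " + evenValues(values)); // 2, 4, 6
    }
}
```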

Example-3: use aggregate operation on collection and map to a value.

Source code

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.DoubleConsumer;

public class TestAggregateOperation {

 private static List<Course> courses = new ArrayList<Course>();
 static{
  courses.add(new Course("Basic math", 89, Course.Type.Math));
  courses.add(new Course("English", 75, Course.Type.Literature));
  courses.add(new Course("Geomatry I", 82, Course.Type.Geomatry));
  courses.add(new Course("Advance math", 92, Course.Type.Math));
  courses.add(new Course("Basic science", 89, Course.Type.Science)); 
  courses.add(new Course("Physics", 78, Course.Type.Science));
  courses.add(new Course("Chemistry", 77, Course.Type.Science));
 }
 public static void main(String[] args) {
  new TestAggregateOperation().example1_2();
  new TestAggregateOperation().example3();
  new TestAggregateOperation().example4();
  new TestAggregateOperation().example5();
 }

 private void example1_2() {
  int[] values = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20};
  
  System.out.println("Any match of element value 3. "+ (Arrays.stream(values).anyMatch(each -> each==3) ? "FOUND": "NOT FOUND"));
  System.out.println("All element between 1 and 15. "+ Arrays.stream(values).allMatch(each -> each > 0 && each <15));
  //Show how many elements, average and sum of elements
  System.out.print(Arrays.stream(values).count()+" elements, where SUM= "+Arrays.stream(values).sum()+" AVERAGE="+Arrays.stream(values).average().getAsDouble());
  //Show min and max value
  System.out.println(" MAX= "+Arrays.stream(values).max().getAsInt()+" MIN= "+Arrays.stream(values).min().getAsInt());
  //Filter out odd value elements and print the even values.
  System.out.print("Even value elements are= ");
  Arrays.stream(values).filter(each -> each %2 == 0).forEach(each -> System.out.print(each+", "));
  System.out.println("");
 }
 
 public void example3(){
  //Find Math courses and get total and average
  double total = courses
   .stream()
   .filter(each -> each.getCourseType()== Course.Type.Math)
   .mapToDouble(each -> each.getMark())
   .sum();
  double average = courses
   .stream()
   .filter(each -> each.getCourseType()== Course.Type.Math)
   .mapToDouble(each -> each.getMark())
   .average().getAsDouble();
  System.out.println("Total mark ("+total+") and average ("+average+") on math courses");
  
  //Print all courses mark along with name and type
  System.out.println("Courses");
  courses.stream().forEach(each -> System.out.println("Course ("+each.getName()+"), Type ("+each.getCourseType()+"), Mark ("+each.getMark()+")"));
 }
 
 public void example4(){
  System.out.println("Total= "+courses.stream().mapToDouble(each -> each.getMark()).sum());
  System.out.println("Total using reduce= "+courses.stream().mapToDouble(each -> each.getMark()).reduce(0.0, (a,b)-> a + b));
 }
 
 public void example5(){
  CalculateAverage avg = courses.stream().mapToDouble(each -> each.getMark())
    .collect(CalculateAverage::new, CalculateAverage::accept, CalculateAverage::combine);
  System.out.println("Average using collect= "+ avg.average()); 
 }
}

class Course{
 enum Type{Science, Math, Geomatry, Literature};
 private String name;
 private double mark;
 private Type courseType;
 
 public Course(String name, double mark, Type courseType){
  this.name = name;
  this.mark = mark;
  this.courseType = courseType;
 }
 public String getName() {
  return name;
 }
 public void setName(String name) {
  this.name = name;
 }
 public double getMark() {
  return mark;
 }
 public void setMark(double mark) {
  this.mark = mark;
 }
 public Type getCourseType() {
  return courseType;
 }
 public void setCourseType(Type courseType) {
  this.courseType = courseType;
 }
}

class CalculateAverage implements DoubleConsumer{
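 // Keep the running total as a double so that fractional marks and the
 // final division are not truncated by integer arithmetic.
 double total = 0;
 int count = 0;
 public double average(){
  return count > 0 ? total/count : 0.0;
 }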
 @Override
 public void accept(double value){
  total += value;
  count++;  
 }
 public void combine(CalculateAverage other){
  total += other.total;
  count += other.count;
  
 }
}

Explanation - The Course class represents a course with name, mark and course type attributes. Course type is an enum. Seven Course instances are created in the static initializer of TestAggregateOperation. To find the total and average of the Math courses, the example3 method uses filter with a lambda expression to select the Math courses and passes them to mapToDouble as a stream of Course elements. mapToDouble maps each Course element to a double value, the mark of that Course instance, and passes the result on as another stream (DoubleStream). sum and average are the terminal operations: sum returns a double directly, while average returns an OptionalDouble whose value is fetched with getAsDouble.
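
example3 runs the same filter/mapToDouble pipeline twice, once for the total and once for the average. As a possible alternative, DoubleStream.summaryStatistics() gathers count, sum, average, min and max in a single pass. The sketch below is self-contained, so it uses a cut-down stand-in for the Course class; MathStatsSketch, sampleCourses and mathStats are hypothetical names, not part of the listing above.

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;

public class MathStatsSketch {

    enum Type { Science, Math, Literature }

    // Minimal stand-in for the Course class from the full listing.
    static class Course {
        private final double mark;
        private final Type courseType;

        Course(double mark, Type courseType) {
            this.mark = mark;
            this.courseType = courseType;
        }
        double getMark() { return mark; }
        Type getCourseType() { return courseType; }
    }

    // A few sample courses, using marks taken from the listing above.
    static List<Course> sampleCourses() {
        return Arrays.asList(
                new Course(89, Type.Math),
                new Course(75, Type.Literature),
                new Course(92, Type.Math));
    }

    // One pass over the Math courses yields sum, average, count, min and max.
    static DoubleSummaryStatistics mathStats(List<Course> courses) {
        return courses.stream()
                .filter(each -> each.getCourseType() == Type.Math)
                .mapToDouble(Course::getMark)
                .summaryStatistics();
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics stats = mathStats(sampleCourses());
        // 89 + 92 = 181.0, and 181.0 / 2 = 90.5
        System.out.println("Total mark (" + stats.getSum() + ") and average (" + stats.getAverage() + ") on math courses");
    }
}
```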

Usage of Stream.reduce function
This method can be used to implement custom reduction functions that are not provided in the API, for example the product of all elements.

Arguments of Stream.reduce - 
  • identity - both the initial value of the reduction and the default result if the stream has no elements
  • accumulator - a function that takes two parameters, the partial result of the reduction and the next element of the stream, and returns a new partial result
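
To make the two arguments concrete, here is a small sketch on a DoubleStream. ReduceSketch, sum and product are names invented for this illustration. Note how the identity differs per operation: 0.0 for addition, 1.0 for multiplication.

```java
import java.util.stream.DoubleStream;

public class ReduceSketch {

    // identity 0.0: adding it to any partial result changes nothing.
    static double sum(double... values) {
        return DoubleStream.of(values)
                .reduce(0.0, (partial, next) -> partial + next);
    }

    // identity 1.0: multiplying any partial result by it changes nothing.
    static double product(double... values) {
        return DoubleStream.of(values)
                .reduce(1.0, (partial, next) -> partial * next);
    }

    public static void main(String[] args) {
        System.out.println(sum(2, 3, 4));     // 9.0
        System.out.println(product(2, 3, 4)); // 24.0
        System.out.println(product());        // 1.0, the identity, because the stream is empty
    }
}
```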

Example-04: Stream.reduce to find total marks obtained.


System.out.println("Total= "+courses.stream().mapToDouble(each -> each.getMark()).sum());
System.out.println("Total using reduce= "+courses.stream().mapToDouble(each -> each.getMark()).reduce(0.0, (a,b)-> a + b));
Full source code above.

Usage of Stream.collect function
Stream.reduce is not a good choice when the result is a complex object or a collection, because reduce always produces a new value: every time the accumulator function processes an element, it has to create a new collection that includes that element. In such situations it is better to use Stream.collect, which updates an existing mutable container instead.
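
The difference can be sketched by building a list of names both ways. This is an illustration only; ReduceVsCollectSketch and its methods are hypothetical. The reduce version copies the partial list for every element, while the collect version adds to one mutable ArrayList.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceVsCollectSketch {

    // reduce: each step returns a new list, so every element triggers a copy.
    static List<String> withReduce(List<String> names) {
        List<String> empty = new ArrayList<>();
        return names.stream().reduce(
                empty,
                (partial, next) -> {
                    List<String> copy = new ArrayList<>(partial); // new list per element
                    copy.add(next);
                    return copy;
                },
                (left, right) -> {
                    List<String> merged = new ArrayList<>(left);  // only used for parallel streams
                    merged.addAll(right);
                    return merged;
                });
    }

    // collect: elements are added to an existing mutable container.
    static List<String> withCollect(List<String> names) {
        return names.stream()
                .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Physics", "Chemistry", "English");
        System.out.println(withReduce(names));  // [Physics, Chemistry, English]
        System.out.println(withCollect(names)); // [Physics, Chemistry, English]
    }
}
```

Both methods return the same list; the point is the amount of copying done along the way.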

Example-05 - average using new data type.

CalculateAverage avg = courses.stream().mapToDouble(each -> each.getMark())
  .collect(CalculateAverage::new, CalculateAverage::accept, CalculateAverage::combine);
System.out.println("Average using collect= "+ avg.average());
Full source code above.

Explanation - 
  • A new data type, CalculateAverage, is created to store the total and the count of the elements
  • The collect method of the stream is used (here DoubleStream.collect, because mapToDouble returns a DoubleStream)
  • Arguments of collect -
    • supplier - a factory function that creates a new instance of the result container
    • accumulator - incorporates a stream element into the container (CalculateAverage)
    • combiner - merges the contents of two result containers, which is needed when the stream runs in parallel
  • The accumulator and combiner don't return any value; they modify the container in place
  • Method references, a new feature of JDK 8, are passed as the supplier, accumulator and combiner. Equivalent lambda expressions could be used instead
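
To show the two styles side by side, the sketch below calls collect once with method references and once with the equivalent lambda expressions. Averager is a hypothetical container written for this illustration, similar in spirit to CalculateAverage but with a double total; the other names are made up as well.

```java
import java.util.stream.DoubleStream;

public class CollectWithLambdasSketch {

    // Hypothetical result container, similar in spirit to CalculateAverage.
    static class Averager {
        double total = 0;
        int count = 0;

        void accept(double value) { total += value; count++; }
        void combine(Averager other) { total += other.total; count += other.count; }
        double average() { return count > 0 ? total / count : 0.0; }
    }

    // supplier, accumulator and combiner passed as method references.
    static double averageWithMethodRefs(double... marks) {
        return DoubleStream.of(marks)
                .collect(Averager::new, Averager::accept, Averager::combine)
                .average();
    }

    // The same three arguments written as lambda expressions.
    static double averageWithLambdas(double... marks) {
        return DoubleStream.of(marks)
                .collect(() -> new Averager(),                        // supplier
                         (container, mark) -> container.accept(mark), // accumulator
                         (left, right) -> left.combine(right))        // combiner
                .average();
    }

    public static void main(String[] args) {
        System.out.println(averageWithMethodRefs(89, 92)); // 90.5
        System.out.println(averageWithLambdas(89, 92));    // 90.5
    }
}
```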