Java 8 - Streams

A Sequence of Elements

The Challenge

No solution without a problem

  • Imagine a bunch of data to process
  • Data comes from sources
  • Typically in a serialized fashion and not as an unstructured blob
  • The data "stream" is processed step by step
  • If done right, work on long "streams" can be split up and done in parallel before being joined together again
  • Imagine a list of names aka first names
  • We want to process these names in various ways
  • Clean them
  • Sort them
  • Filter bad data
  • Transform them
  • Count them
  • ...

An Example for our Challenge

Let's try something

  • Imagine a list of names aka first names
  • Filter empty names and single letters
  • Transform into correct casing
  • Sort
  • Display the first 3
public void naive01()
{
    final List<String> names = 
        Arrays.asList(
            "B", "Sara", "", "HARRY", "will", 
            "Tom", "Vic", "Alf", "MAria", null);
    
    // our intermediate result
    final List<String> result = new ArrayList<>();
    
    // process the input
    for (String name : names)
    {
        // don't do empty or single letters
        if (name != null && name.length() > 1)
        {
            // fix the casing and store result
            result.add(
                name.substring(0, 1).toUpperCase() 
                    + name.substring(1).toLowerCase());
        }
    }
    
    // sort
    Collections.sort(result);
    
    // display the first three
    for (int i = 0; i < Math.min(3, result.size()); i++)
    {
        System.out.println(result.get(i));
    }
}

Fewer lines and more clarity

The same result by using streams

public void stream01()
{
    final List<String> names = 
        Arrays.asList(
            "B", "Sara", "", "HARRY", "will", 
            "Tom", "Vic", "Alf", "MAria", null);
    
    names.stream()
        .filter(s -> s != null)
        .filter(s -> s.length() > 1)
        .map(s -> s.substring(0, 1).toUpperCase() + s.substring(1).toLowerCase())
        .sorted()
        .limit(3)
        .forEach(System.out::println);
}
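
To check that the loop and the pipeline really agree, both variants can return a list instead of printing. This is only a sketch: the class and method names (NamesDemo, fixCase, withLoop, withStream) are made up for illustration and are not part of the slides.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class NamesDemo
{
    static final List<String> NAMES = Arrays.asList(
        "B", "Sara", "", "HARRY", "will", "Tom", "Vic", "Alf", "MAria", null);

    // hypothetical helper: first letter upper case, the rest lower case
    static String fixCase(String s)
    {
        return s.substring(0, 1).toUpperCase() + s.substring(1).toLowerCase();
    }

    // classic loop version
    static List<String> withLoop(List<String> names)
    {
        final List<String> result = new ArrayList<>();
        for (String name : names)
        {
            if (name != null && name.length() > 1)
            {
                result.add(fixCase(name));
            }
        }
        Collections.sort(result);
        return new ArrayList<>(result.subList(0, Math.min(3, result.size())));
    }

    // stream version with the same steps
    static List<String> withStream(List<String> names)
    {
        return names.stream()
            .filter(s -> s != null)
            .filter(s -> s.length() > 1)
            .map(NamesDemo::fixCase)
            .sorted()
            .limit(3)
            .collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        System.out.println(withLoop(NAMES));   // [Alf, Harry, Maria]
        System.out.println(withStream(NAMES)); // [Alf, Harry, Maria]
    }
}
```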

What are Streams?

The long version

  • Streams are pipes with two ends and a functionality in between
  • Pipes can be connected when the pipe/stream is intermediate - called intermediate operation - they will form a pipeline
  • Only one pipe can and has to end the stream aka the pipeline - called terminal operation
  • Streams are resources and need to be closed but typically that is done either automatically or is a noop due to the stream source not being an IO channel
  • Streams can be serial or parallel
  • Most operations have to be:
    • non-interfering: they don't modify the source of the data
    • stateless: the result does not depend on any state that might change while the pipeline runs, so repeating the operation produces the same outcome
    • Important: Even when all functions passed to the stream are stateless, single operations such as sorted() or distinct() are stateful because they rely on information from previous elements

Explained again

Just in case version 1 was too much

  • Streams work like assembly lines
  • A stream has one source - which might be an array, a collection, a generator function, an I/O channel, etc
  • It has no, one, or many intermediate operations - which transform a stream into another stream
  • It has one and only one terminal operation - which produces a result or side-effect
  • Streams are lazy; computation on the source data is only performed when the terminal operation is initiated
  • Source elements are consumed only when needed
// print 1 to 10
IntStream.range(1, 11) // inclusive, exclusive!
    .forEach(System.out::println);

// print even numbers of 1 to 10
IntStream.range(1, 11)
    .filter(i -> i % 2 == 0)
    .forEach(System.out::println);

// return the sum of all even numbers between 1 and 100    
final int sum = IntStream.range(1, 101)
                            .filter(i -> i % 2 == 0)
                            .sum();
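
Laziness can be made visible with a small experiment. The sketch below assumes a sequential stream and uses peek() only to record which elements were pulled from the source; LazyDemo and its methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

class LazyDemo
{
    // records which elements were actually pulled from the source
    static final List<String> SEEN = new ArrayList<>();

    static Optional<String> firstLongWord()
    {
        SEEN.clear();
        return Stream.of("a", "bb", "ccc", "dddd", "eeeee")
            .peek(SEEN::add)               // side effect only to observe the pipeline
            .filter(s -> s.length() > 2)
            .findFirst();                  // the terminal operation starts the work
    }

    static int withoutTerminalOperation()
    {
        SEEN.clear();
        Stream.of("a", "bb", "ccc")
            .peek(SEEN::add)
            .filter(s -> s.length() > 2);  // no terminal operation, nothing runs
        return SEEN.size();
    }

    public static void main(String[] args)
    {
        System.out.println(withoutTerminalOperation()); // 0
        System.out.println(firstLongWord().get());      // ccc
        System.out.println(SEEN);                       // [a, bb, ccc]
    }
}
```

"dddd" and "eeeee" are never touched, because findFirst() already has its answer after "ccc".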

Get us a Stream

Where to get a stream from?

  • Collections have stream() and parallelStream()
  • Arrays.stream(Object[]) produces streams
  • Primitive streams can be created from IntStream, LongStream, DoubleStream
  • Stream.of(T... values)
  • String.chars() produces an IntStream of the char values
// just numbers
IntStream.range(1, 11)...;

// from a list    
final List<String> names = Arrays.asList("B", "Sara", "MAria");
names.stream()...;

// from an entry set of a map
final Map<String, Integer> map = new HashMap<>();
map.entrySet().stream()...;

// stream of elements
Stream.of("a1", "a2", "a3")...;

final String[] array = new String[]{"a", "b", "c"};
Arrays.stream(array)...;
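
The snippets above leave out the rest of the pipeline. A runnable sketch that finishes each source with a simple terminal operation could look like this; SourcesDemo and its method names are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class SourcesDemo
{
    static long fromRange()
    {
        return IntStream.range(1, 11).count();
    }

    static long fromList()
    {
        final List<String> names = Arrays.asList("B", "Sara", "MAria");
        return names.stream().count();
    }

    static long fromValues()
    {
        return Stream.of("a1", "a2", "a3").count();
    }

    static long fromArray()
    {
        final String[] array = new String[]{"a", "b", "c"};
        return Arrays.stream(array).count();
    }

    static String fromChars()
    {
        // chars() yields an IntStream, so each value is cast back to char
        return "abc".chars()
            .mapToObj(c -> String.valueOf((char) c))
            .collect(Collectors.joining("-"));
    }

    public static void main(String[] args)
    {
        System.out.println(fromRange()); // 10
        System.out.println(fromChars()); // a-b-c
    }
}
```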

For-Each

void forEach(Consumer<? super T> action)

  • A terminal operation
  • Remember, one terminal operation is a must!
  • Takes a consumer as function
  • Function is most likely not stateless
  • Operation is eager
@Test
public void forEach()
{
    Stream.of("d2", "a2", "b1", "b3", "c")
        .forEach(System.out::println);
   
    IntStream.range(1, 11)
        .forEach(System.out::println);  
        
    "Foobar is a nice word".chars()
        .forEach(c -> System.out.print(Character.valueOf((char) c)));      
}

Filter

Stream<T> filter(Predicate<? super T> predicate)

  • An intermediate operation
  • Takes a Predicate<T> as function
  • If function returns true, element is passed on
  • Predicate has to be stateless and non-interfering
  • Operation is lazy
@Test
public void filters()
{
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> true)
        .forEach(System.out::println);
   
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.length() > 1)
        .filter(s -> Integer.parseInt(s.substring(1)) == 1)
        .forEach(System.out::println);  
        
    "Foobar is a nice word".chars()
        .filter(c -> c > 32)
        .forEach(c -> System.out.print(Character.valueOf((char) c)));      
}

Distinct, Count

Stream<T> distinct()

  • A stateful intermediate operation
  • First found element is preserved
  • Calls T.equals() to compare elements
  • Might be expensive, because state has to be preserved
@Test
public void distinct()
{
    Stream.of("a", "b", "a", "b", "c")
        .distinct()
        .forEach(System.out::println);
}

long count()

  • A stateful terminal operation
  • Counter as state, will be returned as result
  • Counts the number of elements it sees
@Test
public void count()
{
    long result = Stream.of("a", "b", "a", "b", "c")
        .distinct()
        .count();
}

Matcher

boolean *Match(Predicate<? super T> predicate)

  • Terminal operations
  • Takes a predicate with input T
  • Might be short-circuiting, can terminate early when result is known
  • allMatch, noneMatch, anyMatch
@Test
public void matches()
{
    // we have at least one match, false when stream is empty
    boolean b1 = Stream.of("a", "b", "a", "b", "c").anyMatch(s -> s.equals("a"));
    
    // everything matches, true when stream is empty
    boolean b2 = Stream.of("a", "b", "a", "b", "c").allMatch(s -> s.equals("a"));

    // none matches, true when stream is empty
    boolean b3 = Stream.of("a", "b", "a", "b", "c").noneMatch(s -> s.equals("g"));
}
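
The comments about empty streams and the short-circuiting behaviour can be checked directly. A small sketch with made-up names (MatchDemo and its methods); the infinite source is only there to show that anyMatch stops at the first hit.

```java
import java.util.stream.Stream;

class MatchDemo
{
    static boolean anyOnEmpty()
    {
        return Stream.<String>empty().anyMatch(s -> s.equals("a"));
    }

    static boolean allOnEmpty()
    {
        return Stream.<String>empty().allMatch(s -> s.equals("a"));
    }

    static boolean noneOnEmpty()
    {
        return Stream.<String>empty().noneMatch(s -> s.equals("a"));
    }

    static boolean anyOnInfinite()
    {
        // infinite source 1, 2, 3, ...; anyMatch ends the stream at 101
        return Stream.iterate(1, i -> i + 1).anyMatch(i -> i > 100);
    }

    public static void main(String[] args)
    {
        System.out.println(anyOnEmpty());    // false
        System.out.println(allOnEmpty());    // true
        System.out.println(noneOnEmpty());   // true
        System.out.println(anyOnInfinite()); // true
    }
}
```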

Map

<R> Stream<R> map(Function<? super T,? extends R> mapper)

  • An intermediate operation
  • Takes a function with input T and return type R
  • Function has to be stateless and non-interfering
  • Map is a one-to-one transformation
  • Operation is lazy
  • Optional mapToObj, mapToInt, mapToLong, mapToDouble to switch between primitive and non-primitive types
@Test
public void map()
{
    Stream.of("d2", "a2", "b1", "b3", "c")
        .map(s -> new StringBuilder(s))
        .map(s -> s.reverse())
        .forEach(System.out::println);
   
    Stream.of("d2", "a2", "b1", "b3", "c")
        .mapToInt(s -> s.length())
        .map(i -> i + 1)
        .forEach(System.out::println);   
    
    "Foobar is a nice word".chars()
        .map(Character::toUpperCase)
        .mapToObj(c ->  {
            // cast back to char, otherwise the numeric code is printed
            String s = String.valueOf((char) c);
            return s + s;
         })
        .forEach(System.out::print);
}

FlatMap

<R> Stream<R> flatMap(Function<? super T,? extends Stream<? extends R>> mapper)

  • An intermediate operation
  • Takes a function with input T and returns a stream of type R
  • Resulting stream will be read until empty, the read content is returned to the "main stream" as elements of type R
  • flatMap is a one-to-many transformation
  • Function has to be stateless and non-interfering
  • flatMapToInt, flatMapToLong, flatMapToDouble to switch to primitive types
@Test
public void flatMap()
{
    // need all words longer than 2 characters 
    Stream.of("Java in Action", "BDD in  Action", "Maven in Action")
        .flatMap(s -> Arrays.stream(s.split("\\s")))
        .filter(s -> s.length() > 2)
        .forEach(System.out::println);
}  
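
For checking the one-to-many idea, the same pipeline can return a list, and flatMapToInt can flatten words into their characters. A sketch only; FlatMapDemo, longWords and letterCount are illustrative names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapDemo
{
    // every title becomes a stream of words, all words end up in one stream
    static List<String> longWords(String... titles)
    {
        return Stream.of(titles)
            .flatMap(s -> Arrays.stream(s.split("\\s")))
            .filter(s -> s.length() > 2)
            .collect(Collectors.toList());
    }

    // every word becomes an IntStream of its chars
    static long letterCount(String... words)
    {
        return Stream.of(words)
            .flatMapToInt(String::chars)
            .count();
    }

    public static void main(String[] args)
    {
        System.out.println(longWords("Java in Action", "BDD in  Action"));
        // [Java, Action, BDD, Action]
        System.out.println(letterCount("ab", "cde")); // 5
    }
}
```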

Peek

Stream<T> peek(Consumer<? super T> action)

  • An intermediate operation
  • Returns a stream of the elements of the stream
  • Performing the provided action for each element
  • Action should be non-interfering, don't play with the source
  • Mainly provided for debugging reasons
@Test
public void peek()
{
    Stream.of("one", "two", "three", "four")
         .filter(e -> e.length() > 3)
         .peek(e -> System.out.println("After filter: " + e))
         .map(String::toUpperCase)
         .peek(e -> System.out.println("After map: " + e))
         .forEach(s -> {});
}
After filter: three
After map: THREE
After filter: four
After map: FOUR

Limit, Skip

Stream<T> limit(long n)

  • A short-circuiting stateful intermediate operation.
  • Returns the first n elements and discards the rest.
  • Cheap sequentially, expensive in parallel when an ordered stream is used
@Test
public void limit()
{
    Stream.of("a", "b", "a", "b", "c")
        .limit(1)
        .forEach(System.out::println);
}

Stream<T> skip(long n)

  • A stateful intermediate operation
  • Returns the remaining elements after skipping the first n.
  • Cheap sequentially, expensive in parallel when an ordered stream is used
@Test
public void skip()
{
    Stream.of("a", "b", "a", "b", "c")
        .skip(2)
        .forEach(System.out::println);
}
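
skip() and limit() are often combined, for instance to cut a "page" out of a longer stream. The sketch below uses an infinite source to show that limit() is short-circuiting; PagingDemo and page are assumed names.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PagingDemo
{
    // returns the numbers of the given zero-based page
    static List<Integer> page(int pageIndex, int pageSize)
    {
        return Stream.iterate(1, i -> i + 1)     // infinite: 1, 2, 3, ...
            .skip((long) pageIndex * pageSize)   // drop the earlier pages
            .limit(pageSize)                     // stop after one page
            .collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        System.out.println(page(0, 3)); // [1, 2, 3]
        System.out.println(page(2, 3)); // [7, 8, 9]
    }
}
```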

Sorting

Stream<T> sorted()

  • Returns a sorted stream
  • Elements have to implement Comparable
  • Hence it is sorted by natural order
  • This is a stateful operation and expensive
@Test
public void sorted()
{
    Stream.of("a", "b", "a", "b", "c")
        .sorted()
        .forEach(System.out::println);
}

Stream<T>
sorted(Comparator<? super T> comparator)

  • Returns a sorted stream
  • Makes it possible to sort types that do not implement Comparable
  • This is a stateful operation and expensive
@Test
public void sorted2()
{
    Stream.of("a", "b", "a", "b", "c")
        .sorted((s1, s2) -> s2.compareTo(s1))
        .forEach(System.out::println);
}
@Test
public void sorted3()
{
    Stream.of(new Vehicle("Maybach", 20), new Vehicle("Audi", 10))
        .sorted(
               Comparator.comparing(Vehicle::getBrand)
               .thenComparing(Comparator.comparing(Vehicle::getAge)))
        .forEach(System.out::println);
}

Reduction Operations

How to get to meaningful results

The Challenge

So far, we have gotten only a few results back

  • Streams might have to produce results
  • A result of these forms is needed:
    • a single value,
    • a complex object,
    • or a kind of list of the data processed
  • Hence a result can be a reduction or a collection
  • We already returned results by applying find or match without knowing anything about reduction; each time we reduced our stream to a single result
  • Reduction via the method reduce()
  • Collection via the method collect()
  • Sadly, collect() accepts a Collector that can run reduction operations, hence the naming is not absolutely clean
  • Java 8 calls the reduction an immutable reduction
  • Java 8 calls the collection a mutable reduction
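
A minimal sketch of the difference, with made-up names (ReduceDemo, sum, longest, joined): reduce() combines immutable values into a new value, collect() fills a mutable container.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

class ReduceDemo
{
    // immutable reduction with an identity value
    static int sum(List<Integer> values)
    {
        return values.stream().reduce(0, Integer::sum);
    }

    // immutable reduction without identity, hence an Optional
    static Optional<String> longest(List<String> words)
    {
        return words.stream()
            .reduce((a, b) -> a.length() >= b.length() ? a : b);
    }

    // mutable reduction: the StringBuilder is the container
    static String joined(Stream<String> parts)
    {
        return parts
            .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
            .toString();
    }

    public static void main(String[] args)
    {
        System.out.println(sum(Arrays.asList(1, 2, 3, 4)));               // 10
        System.out.println(longest(Arrays.asList("aa", "bbbb", "c")).get()); // bbbb
        System.out.println(joined(Stream.of("a", "b", "c")));             // abc
    }
}
```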

Keep it simple

Provided reduction and collection operations

  • Stream offers some pre-made reducers:
    • max()
    • min()
    • count()
  • Primitive streams offer some more:
    • average()
    • summaryStatistics()
    • sum()
  • Despite not being named reducers or collectors, these methods reduce the stream to a single result as well:
    • boolean allMatch(Predicate)
    • boolean anyMatch(Predicate)
    • boolean noneMatch(Predicate)
    • Optional findAny()
    • Optional findFirst()

How does it work?

How collecting works aka mutable reductions

  • A mutable reduction creates a container with results/a result
  • The result and intermediate containers are created by a supplier function
  • Any addition to a container is defined by an accumulator function
  • Because we can run in parallel and have intermediate containers, we have to combine them using a combiner function
  • After collecting everything, we can apply a last transformation with a finisher
  • Imagine a simple collection of a stream result in a list that should be protected later on
  • supplier: ArrayList::new
  • accumulator: ArrayList::add
  • combiner: ArrayList::addAll
  • finisher: Collections.unmodifiableList
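
The four functions of the "protected list" example can be wired together with Collector.of. This is a sketch under the assumption that a plain ArrayList is good enough as container; ProtectedListDemo and its members are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class ProtectedListDemo
{
    static final Collector<String, List<String>, List<String>> PROTECTED_LIST =
        Collector.of(
            ArrayList::new,                  // supplier: new container
            List::add,                       // accumulator: add one element
            (left, right) -> {               // combiner: merge two containers
                left.addAll(right);
                return left;
            },
            Collections::unmodifiableList);  // finisher: protect the result

    static List<String> collectProtected(Stream<String> source)
    {
        return source.collect(PROTECTED_LIST);
    }

    public static void main(String[] args)
    {
        final List<String> list = collectProtected(Stream.of("a", "b", "c"));
        System.out.println(list); // [a, b, c]
        // list.add("d") would throw an UnsupportedOperationException
    }
}
```

Note that the combiner here is a BinaryOperator and returns the merged container, so ArrayList::addAll cannot be used directly in that position.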

Collect

Let's collect the hard way first

// java.util.stream.Stream
<R> R collect(
        Supplier<R> supplier, 
        BiConsumer<R,? super T> accumulator,
        BiConsumer<R,R> combiner)
  • This is a terminal operation
  • A mutable reduction
List<String> asList1 = Stream.of("a", "b", "a", "b", "c")
    .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
List<String> asList2 = Stream.of("a", "b", "c", "d", "e")
    .collect(
             () -> new ArrayList<String>(), 
             (result, element) -> result.add(element), 
             (result, intermediate) -> result.addAll(intermediate));
  • R: the type of the result
  • supplier: a function that creates a new result container. For parallel execution it must return a fresh value each time.
  • accumulator: an associative, non-interfering, stateless function for incorporating an additional element into a result
  • combiner: an associative, non-interfering, stateless function for combining two results into a new result, needed for parallel operations

Collectors

Make collect operations reusable

  • A Collector accumulates entries into a mutable result container, and optionally performs a final transform on the result
  • A Collector defines a mutable reduction operation
  • java.util.stream.Collector defines an interface
  • Allows to create reusable collectors
  • No need to implement a Collector yourself, use the factory method Collector.of()
  • java.util.stream.Collectors defines a huge range of ready to use collectors
  • A collector consists of:
    • creation of a new result container - supplier
    • incorporating a new data element - accumulator
    • combining two result containers - combiner
    • performing an optional final transform - finisher
    • Characteristics.CONCURRENT, Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED
  • Characteristics are a hint to the implementation of the reduction aka the stream

Apply Collectors

Basic custom collector usage

// java.util.stream.Stream
<R,A> R collect(Collector<? super T,A,R> collector)
  • This is a terminal operation
  • Requires a configured collector
// as seen before 
List<String> asList1 = Stream.of("a", "b", "c", "d", "e")
    .collect(
             () -> new ArrayList<String>(), 
             (result, element) -> result.add(element), 
             (result, intermediate) -> result.addAll(intermediate));
final Collector<String, List<String>, List<String>> listCollector = Collector.of(
            () -> new ArrayList<String>(), 
            (result, element) -> result.add(element), 
            (result, intermediate) -> { 
                result.addAll(intermediate); 
                return result; 
            });
            
final List<String> asList3 = Stream.of("a", "b", "c", "d", "e")
    .collect(listCollector);
// interface java.util.stream.Collector
static <T, R> Collector<T, R, R> of(
    Supplier<R> supplier,
    BiConsumer<R, T> accumulator,
    BinaryOperator<R> combiner,
    Collector.Characteristics... characteristics)
  • T: the type of input elements
  • R: the type of intermediate accumulation result, and final result
  • supplier: the supplier function
  • accumulator: the accumulator function
  • combiner: the combiner function
  • characteristics: the collector characteristics

Finish it

Enhance collectors with finishers

static <T, A, R> Collector<T, A, R> of(
    Supplier<A> supplier,
    BiConsumer<A, T> accumulator,
    BinaryOperator<A> combiner,
    Function<A, R> finisher,
    Collector.Characteristics... characteristics)
  • T: the type of input elements
  • A: the type of the intermediate accumulation result
  • R: the type of the final result
  • supplier: the supplier function
  • accumulator: the accumulator function
  • combiner: the combiner function
  • finisher: the finisher function
  • characteristics: the collector characteristics
final Collector<String, List<String>, List<String>> withFinisher = 
    Collector.of(
        () -> new ArrayList<String>(), 
        (result, element) -> result.add(element), 
        (result, intermediate) -> { 
            result.addAll(intermediate); return result; 
        },
        (result) -> {
            Collections.shuffle(result); 
            return result;
        });

final List<String> asList4 = Stream.of("a", "b", "c", "d", "e")
    .collect(withFinisher);

Collectors

Ready to use collectors

  • java.util.stream.Collectors defines many ready to use collectors
  • Reduces the boilerplate for the most typical stream tasks
  • Some offer redundant operations to already existing stream reductions
  • Math stuff
    • averaging
    • summing
    • counting
    • min/max
    • summary statistics
  • to collection
  • partitioning - two dimensional grouping
  • grouping - multi dimensional partitioning
  • immutable reducing

Collectors - Math

Examples of Math operations

// summing
long total = Stream.of(
    new Vehicle("Maybach", 20, 10000), new Vehicle("Audi", 10, 20000))
    .collect(Collectors.summingLong(Vehicle::getValue));
// averaging
double avgAge = Stream.of(
    new Vehicle("Maybach", 20, 10000), new Vehicle("Audi", 10, 20000))
    .collect(Collectors.averagingDouble(Vehicle::getAge));
// stats
final IntSummaryStatistics s = 
    Stream.of(
        new Vehicle("Maybach", 20, 10000), new Vehicle("Audi", 10, 20000))
        .collect(Collectors.summarizingInt(Vehicle::getAge));
        
/*
    void	combine(IntSummaryStatistics other)
    double	getAverage()
    long	getCount()
    int	    getMax()
    int	    getMin()
    long	getSum()
*/
// count
final long count = 
    Stream.of(
        new Vehicle("Maybach", 20, 10000), new Vehicle("Audi", 10, 20000))
        .collect(Collectors.counting());
// min
final Optional<Vehicle> newest = 
    Stream.of(
        new Vehicle("Maybach", 20, 10000), new Vehicle("Audi", 10, 20000))
        .collect(
            Collectors.minBy(
                Comparator.comparingInt(Vehicle::getAge)));
// max
final Optional<Vehicle> oldest = 
    Stream.of(
        new Vehicle("Maybach", 20, 10000), new Vehicle("Audi", 10, 20000))
        .collect(
            Collectors.maxBy(
                Comparator.comparingInt(Vehicle::getAge)));

Collectors - To Collections

How to simply "collect" all together

  • Just return a collection of the results
  • Type of collection object depends on the need
  • The map collector toMap has very extensive mapping options and also a concurrent version, toConcurrentMap
// collect in a list
final List<String> result = Stream.of("a", "b", "c", "b")
    .collect(Collectors.toList());
// collect with a set
final Set<String> result = Stream.of("a", "b", "c", "b")
    .collect(Collectors.toSet());
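// map to a map with mapping definition; the merge function
// resolves the duplicate key "b", which would otherwise throw
final Map<String, Integer> result = 
    Stream.of("a", "b", "c", "b")
        .collect(
            Collectors.toMap(
                String::toString, String::length, (first, second) -> first));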
// provide your own collection and have a sorted result
final Set<String> result = 
    Stream.of("a", "b", "c", "b")
        .collect(
            Collectors.toCollection(TreeSet::new));

Collectors - Partitioning

How to do two-dimensional partitions

  • Partitions according to a Predicate
  • Can only do boolean
  • Returns a map of Map<Boolean, List<T>>
  • Optionally an additional collection after partitioning is possible
// partition cars by restored state
final Map<Boolean, List<Vehicle>> result = 
    Stream.of(
        new Vehicle("Maybach", 20, 10000), new Vehicle("Audi", 10, 20000, true))
        .collect(Collectors.partitioningBy(Vehicle::isRestored));
{
    false = [Vehicle [brand=Maybach, age=20, value=10000, restored=false]], 
    true  = [Vehicle [brand=Audi, age=10, value=20000, restored=true]]
}
// partition cars by restored state first 
// and average the age per partition next
final Map<Boolean, Double> result = 
    Stream.of(
        new Vehicle("Maybach", 20, 10000), new Vehicle("Audi", 10, 20000, true))
        .collect(
            Collectors.partitioningBy(Vehicle::isRestored, 
            Collectors.averagingInt(Vehicle::getAge)));
{
    false = 20.0, 
    true  = 10.0
}
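
Partitioning also works without a domain class. The sketch below partitions plain numbers and counts each side; PartitionDemo and evenOddCount are assumed names. Both keys are present in the result even when one side stays empty.

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PartitionDemo
{
    // true = even numbers, false = odd numbers
    static Map<Boolean, Long> evenOddCount(int from, int toExclusive)
    {
        return IntStream.range(from, toExclusive)
            .boxed()
            .collect(
                Collectors.partitioningBy(i -> i % 2 == 0,
                Collectors.counting()));
    }

    public static void main(String[] args)
    {
        System.out.println(evenOddCount(1, 11)); // {false=5, true=5}
        System.out.println(evenOddCount(1, 2));  // {false=1, true=0}
    }
}
```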

Collectors - Grouping

How to do n-dimensional partitions

  • Partitions according to a classifier function
  • Any key possible
  • Returns a map of Map<K, List<T>>
  • Concurrent operations have their own method groupingByConcurrent
  • Possible to supply the map implementation, if something special is required, e.g. sorted keys
  • Additional collection after grouping possible
// group by brand
final Map<String, List<Vehicle>> result = 
    Stream.of(
        new Vehicle("Maybach", 20, 10000), 
        new Vehicle("Audi", 10, 20000, true),
        new Vehicle("Audi", 15, 232000))
        .collect(Collectors.groupingBy(Vehicle::getBrand));
{
    Maybach = [
        Vehicle [brand=Maybach, age=20, value=10000, restored=false]
    ], 
    Audi = [
        Vehicle [brand=Audi, age=10, value=20000, restored=true], 
        Vehicle [brand=Audi, age=15, value=232000, restored=false]
    ]
}
// group by brand and collect the values
final Map<String, Double> result = 
    Stream.of(
        new Vehicle("Maybach", 20, 100000), 
        new Vehicle("Audi", 10, 100000, true),
        new Vehicle("Audi", 15, 200000))
        .collect(
            Collectors.groupingBy(Vehicle::getBrand, 
            Collectors.averagingInt(Vehicle::getValue)));
{
    Maybach = 100000.0, 
    Audi    = 150000.0
}
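
Supplying the map and a downstream collector at the same time could look like the sketch below. The Vehicle class is never shown on the slides, so the nested class here is a hypothetical stand-in that only mirrors the constructor and getter names used above; GroupingDemo, sample and countByBrand are made-up names as well.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class GroupingDemo
{
    // hypothetical stand-in for the Vehicle class used on the slides
    static final class Vehicle
    {
        private final String brand;
        private final int age;
        private final int value;
        private final boolean restored;

        Vehicle(String brand, int age, int value)
        {
            this(brand, age, value, false);
        }

        Vehicle(String brand, int age, int value, boolean restored)
        {
            this.brand = brand;
            this.age = age;
            this.value = value;
            this.restored = restored;
        }

        String getBrand() { return brand; }
        int getAge() { return age; }
        int getValue() { return value; }
        boolean isRestored() { return restored; }
    }

    static Stream<Vehicle> sample()
    {
        return Stream.of(
            new Vehicle("Maybach", 20, 100000),
            new Vehicle("Audi", 10, 100000, true),
            new Vehicle("Audi", 15, 200000));
    }

    // group by brand, keep the brands sorted, count the vehicles per brand
    static Map<String, Long> countByBrand(Stream<Vehicle> vehicles)
    {
        return vehicles.collect(
            Collectors.groupingBy(Vehicle::getBrand,
            TreeMap::new,
            Collectors.counting()));
    }

    public static void main(String[] args)
    {
        System.out.println(countByBrand(sample())); // {Audi=2, Maybach=1}
    }
}
```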