# System Design - Streaming Data Processing

• Introduction
• Common Design Patterns
• Optimization in Acknowledgment Collection
• S4: Distributed Stream Computing Platform
• Hash-based Routing
• MillWheel: Fault-Tolerant Stream Processing at Internet Scale
• Low Watermark
• Techniques to Support Exactly-Once Delivery
• Spark Streaming
• System architecture
• Two Main Concepts
• 1. Resilient Distributed Datasets (RDDs)
• 2. Discretized streams (D-Streams)
• Deterministic Computation

## Introduction

Streaming data processing is the core of any event-driven architecture. It's different from batch processing in many ways:

• Batch processing can be thought as offline computation while streaming data processing is online/real-time computation.
• Data is batch processing is more static while it's more active in streaming data processing.
• The computation topology is more active in streaming data processing as well.
• The computation in batch processing is often scheduled works while the computation in streaming data processing is a reaction to some real-time events.

In this post, we will present four important streaming data processing frameworks.

## Common Design Patterns

For streaming data processing, we usually need to have the following components

• Cluster of physical machines.
• Computation topology. A computation topology is a user-defined computation workflow and message routing rules. In most of the frameworks, users can specify the topology in a configuration file and then submit it to the framework. T
• Coordination system. This is usually implemented using systems such zookeeper and having a special coordination node in the cluster.
• Computation Units. Most of the time, this means a thread that executes a task defined in the computation topology.
• Virtual Nodes. Conceptually, this is a container of computation units. It's usually a running process on a physical machine.
• Communication Layer. This is part of the coordination system.

A commonly used data model in streaming data processing is to model the data as a stream of tuples. A tuple consists of a key and a body. The key is used to identify tasks and routing of the data.

As machine failures are common in any large cluster, the framework needs to be resilient. A common technique called checkpoint is to save the system state to persistent records at important points in the computation process.

Storm is a real-time fault-tolerant and distributed stream data processing system used at Twitter. It consists of the following components:

• Coordinator called Nimbus
• Worker Node
• Supervision layer in the worker node
• Worker process
• Executor

In Storm, worker processes are the virtual nodes, and executors in the worker processes are the computing units.

### Optimization in Acknowledgment Collection

Storm provides at-least-once semantics, therefore, it needs to implement some sort of acknowledgment system. The idea is very simple. Suppose we have a task that emits three output data streams. The receivers of these data stream can acknowledge the receipt of the data by reporting to a acknowledge collection node. Once the acknowledgment collection node receives the acknowledgments for all three data streams, it sends a message to the sender so that the sender knows the data is successfully delivered.

In a naive implementation, we could have a map in the acknowledgment collection node. For example,

1
2
3
4
5
6
Map<SenderId, DeliveryTracker> tracking;

class DeliveryTracker {
Set<OutputStreamId> acknowledgedOuptutStreamId;
Set<OutputStreamId> expectedOutputStreamId;
}


The downside of this approach is that it can consume a lot of memory. Storm uses XOR operator to track the acknowledgment instead. Once the tracking variable becomes zero, we know all data is delivered.

1
2
3
4
5
6
Map<SenderId, BitArray> tracking;

def handleAck(senderId, outputStreamId):
currentState = tracking.get(senderId)
tracking.get(senderId) = XOR(currentState, outputStreamId)



In this way, we don't need to save all the output stream ID in the acknowledgment collection node. Note that the XOR operation is not idempotent itself, however, Strom nodes communicate over TCP/IP which has reliable message delivery so no data is delivered more than once.

Here are two diagrams of the component details:

## S4: Distributed Stream Computing Platform

The architecture of S4 is similar to Storm. It consists of

• Processing Nodes (PNs) are the logical hosts to PEs. They are responsible for listening to events, executing operations on the incoming events, dispatching events with the assistance of the communication layer, and emitting output events.
• Processing Elements (PEs): This is similar to the executors in Storm.
• Note that a PE is instantiated for each value of the key attribute.
• When a new word is seen in an event, S4 creates a new instance of the PE corresponding to that word.

Mapping between S4 and Storm:

• processing node -> worker process
• processing element -> executor
• communication layer -> supervision

### Hash-based Routing

The routing in S4 is based on key attributes and hashing. Each processing element is associated with a specific tuple of the key attributes. A hash value is calculated based on this tuple and is used to determine the processing node that hosts the processing element. Consequently, for a given tuple of some key attributes, the tasks are assigned to only one processing node and one processing element and the tasks are always route to the same processing node and processing element.

The advantage of this setup is all the nodes in the cluster are equivalent and we don't need to have a special coordinator node. (Note that the load balancing is done by the communication layer.)

The following diagrams are copied from the paper:

## MillWheel: Fault-Tolerant Stream Processing at Internet Scale

The general idea of the architecture is similar to Storm and S4. The specialty in MillWheel is that it provides framework-level idempotent guarantee and exactly-once delivery.

### Low Watermark

Low watermark is a very interesting idea. It's a measure of the work progress. The definition of low watermark is recursive:

lowWatermark(A) = min( timestamp of the oldest pending work of A, {lowWatermakrOf(C) for C outputs to A} )


With this information, the downstream node can have an idea of the work progress in the upstream processes.

### Techniques to Support Exactly-Once Delivery

First, we need to save the system states in persistent records. Second, we need to handle machine failures. When the failed machine restarts, it replays checkpoints which may create duplicated outputs. Therefore, there needs to be a dedupe mechanism in place.

More specifically, according to the paper:

• The produced records are checkpointed before delivery in the same atomic write as state modification.
• Without a checkpoint, it would be possible for that computation to produce a window count downstream, but crash before saving its state.
• Happy-path workflow: checkpoint => production => save state => delete checkpoint.

Now if a node fails when it's in the production phase and restarts, it will start with the checkpoint phase. In this case, some of the produced records will be resent.

MillWheel also enforces single-writer semantics. This is archived by attaching a sequencer token to each write. Writes with old sequencer will be ignored.

## Spark Streaming

Spark Streaming is different from the previous three frameworks because it uses batch processing mode. The streaming computation is considered a series of deterministic batch computations on small time intervals. The main limitation of this approach is that it has a fixed minimum latency due to batching data. The advantage of apply batch processing model to streaming data is that we can treat batched data and streaming data in the same way, which means users don't need to distinguish these two data types anymore.

### System architecture

• A master that tracks the D-Stream lineage graph and schedules tasks to compute new RDD partitions.
• Worker nodes that receive data, store the partitions of input and computed RDDs, and execute tasks.
• A client library used to send data into the system.

### Two Main Concepts

#### 1. Resilient Distributed Datasets (RDDs)

Copied from tutorialpoint

Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

Formally, an RDD is a read-only, partitioned collection of records. RDDs can be created through deterministic operations on either data on stable storage or other RDDs. RDD is a fault-tolerant collection of elements that can be operated on in parallel.

All state in Spark Streaming is stored in fault-tolerant data structures (RDDs). RDD partitions can reside on any node, and can even be computed on multiple nodes because they are computed deterministically.

#### 2. Discretized streams (D-Streams)

A D-Stream is a sequence of immutable, partitioned datasets (RDDs) that can be acted on by deterministic transformations. D-Streams enable a parallel recovery mechanism that improves efficiency over traditional replication and backup schemes, and tolerates stragglers.

The idea in D-Streams is to structure a streaming computation as a series of stateless, deterministic batch computations on small time intervals.

### Deterministic Computation

Spark Streaming is able to perform parallel recovery because the data is highly partitioned. When a node fails, other nodes in the cluster can work together to recompute part of the lost RDDs.

Another special feature of Sparking Streaming is the deterministic computation. The deterministic behavior is the result of using immutable persistent partitioned data. The immutability also means different worker nodes in the system can always have a consistent view.

----- END -----

Welcome to join reddit self-learning community.

Want some fun stuff?