Processing real-time data using Spark Stateful Structured Streaming.

Bhushan Gosavi
6 min read · Aug 6, 2019

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

Stateful stream processing means that a “state” is shared between events and therefore past events can influence the way current events are processed.

In this blog, we will use file streaming as the source to analyze server logs that are in JSON format, and for practical purposes we will use the console as the sink. We will use the Scala functional programming language to solve the problem statement.

Problem Statement:

Process, in real time, the JSON user event logs that the server dumps into a folder, using Spark Structured Streaming, and maintain a state for each user that tracks whether that user has logged in or logged out. Also maintain the state of the items the user has added to the cart; even if the user has logged out, the cart state should remain the same. After 5 minutes of no activity by a user, terminate that user's session.

You can find the complete code for this problem statement here : https://github.com/BhushG/StructuredStreaming/blob/master/src/main/scala/ProcessServerLogs/SessionizeCartItems.scala

Below is the general JSON structure for some of the events generated by the user.

AddToCart Event: {"timestamp": "20:06:2019 0:46:01", "CustomerId": "1028_CId", "EventType": {"AddToCart": {"ProductId": "116_PId", "Price": 1.0}}, "Source": "www.filpkart.tablet.com"}

Whenever a user adds a product to the cart, an AddToCart log is generated for that user. Here, timestamp is the event time and CustomerId uniquely identifies the customer. The EventType structure identifies the type of event and carries event-related information, such as which product was added to the cart for an AddToCart event. Source identifies the source of the event.

Similarly, there is a ProductView event when a user just looks at a product, and a Purchase event when a user buys the products added to the cart.

ProductView Event: {"timestamp": "20:06:2019 18:58:17", "CustomerId": "1020_CId", "EventType": {"ProductView": {"ProductId": "140_PId", "Price": 956.0}}, "Source": "www.filpkart.tablet.com"}

Purchase Event: {"timestamp": "20:06:2019 21:52:4", "CustomerId": "1010_CId", "EventType": {"Purchase": [{"ProductId": "132_PId", "Price": "348$", "Quantity": 3}, {"ProductId": "136_PId", "Price": "995$", "Quantity": 6}, {"ProductId": "106_PId", "Price": "181$", "Quantity": 5}]}, "Source": "www.filpkart.tablet.com"}

Let us also assume, for simplicity, that whenever the Source of an event equals "LoggedOut", the CustomerId associated with that event has logged out.

LogOut Event: {"timestamp": "20:06:2019 0:46:15", "CustomerId": "1028_CId", "EventType": {"AddToCart": {"ProductId": "116_PId", "Price": 276.0}}, "Source": "LoggedOut"}

First, we will create a case class to monitor the state of each user. For each user, we maintain the number of events generated since the session started and whether the user is logged in or logged out. In the SessionUpdate case class, totalEvents equals the number of events generated by that user, and loggedIn is set to true if the user is logged in and false if the user has logged out.
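A minimal sketch of these case classes could look like this; the SessionInfo state class, the expired flag, and the exact field names are assumptions based on the description above, not necessarily the exact definitions in the linked code.

```scala
// Sketch: state kept per customer between triggers, and the update emitted per trigger.
// Field names (and the `expired` flag) are assumed from the description above.
case class SessionInfo(totalEvents: Int, loggedIn: Boolean)

case class SessionUpdate(
  CustomerId:  String,   // unique id of the customer
  totalEvents: Int,      // events generated by this customer since the session started
  loggedIn:    Boolean,  // true while the user is logged in, false after logout
  expired:     Boolean   // true once the session has timed out (5 minutes of inactivity)
)
```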

Similarly, we will create case classes to monitor the state of the cart for each user. In the CartUpdate case class, totalPrice equals the total price of the items added to the cart by that user, totalItems equals the number of items in the cart, and ProductList contains the details of the products in the cart.
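A sketch of these cart case classes, again with field names assumed from the description:

```scala
// Sketch of the cart-state case classes; field names are assumed from the description.
case class Product(ProductId: String, Price: Double)

case class CartUpdate(
  CustomerId:  String,         // customer owning this cart
  totalPrice:  Double,         // total price of the items in the cart
  totalItems:  Int,            // number of items in the cart
  ProductList: List[Product]   // details of the products in the cart
)
```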

To manage stateful stream processing we will use the mapGroupsWithState API. Set the Spark configuration, create a SparkSession, and let's begin.
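A minimal local SparkSession is enough for this walkthrough (the app name and local master are assumptions):

```scala
import org.apache.spark.sql.SparkSession

// Local SparkSession for running the streaming job on a single machine.
val spark = SparkSession.builder()
  .appName("SessionizeCartItems")
  .master("local[*]")
  .getOrCreate()

spark.sparkContext.setLogLevel("ERROR") // keep the console sink output readable
import spark.implicits._                // encoders needed for typed Datasets
```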

First, we will create a generic schema that covers all three types of events. The stream read with this generic schema can be used to monitor user sessions.
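One way to sketch such a generic schema is to keep only the fields common to all three event types (timestamp, CustomerId, Source), which is all we need for session tracking:

```scala
import org.apache.spark.sql.types._

// Generic schema: only the fields shared by AddToCart, ProductView and Purchase events.
val genericSchema = StructType(Seq(
  StructField("timestamp",  StringType),  // raw event time as a string
  StructField("CustomerId", StringType),  // unique customer id
  StructField("Source",     StringType)   // event source, or "LoggedOut" on logout
))
```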

Let us assume that the server dumps logs into the local folder ./ServerLogs. The resulting genericLogs is a Dataset of RawEvent, which holds the timestamp as a raw string. We convert this string timestamp into java.sql.Timestamp and get a Dataset of type Event.
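A sketch of reading the folder as a stream and converting the timestamp might look like this; the RawEvent/Event case classes and the timestamp pattern ("dd:MM:yyyy H:mm:ss", taken from the sample logs) are assumptions.

```scala
case class RawEvent(timestamp: String, CustomerId: String, Source: String)
case class Event(timestamp: java.sql.Timestamp, CustomerId: String, Source: String)

// Parse the raw string timestamp (pattern assumed from the sample logs above).
def createEvent(raw: RawEvent): Event = {
  val format = new java.text.SimpleDateFormat("dd:MM:yyyy H:mm:ss")
  Event(new java.sql.Timestamp(format.parse(raw.timestamp).getTime),
        raw.CustomerId, raw.Source)
}

// Stream the JSON files the server keeps dumping into ./ServerLogs.
val genericLogs = spark.readStream
  .schema(genericSchema)
  .json("./ServerLogs")
  .as[RawEvent]

val events = genericLogs.map(createEvent)
```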

Using the createEvent function, we create the Dataset of type Event. Now let us apply the mapGroupsWithState API on this Dataset (this API can only be used on a Dataset). First, we apply groupByKey on CustomerId so that all events for the same customer are grouped together; then we apply mapGroupsWithState on the grouped Dataset, creating a state for each group, i.e. for each CustomerId.
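Under the assumptions above, the update function and the mapGroupsWithState call could look like the following simplified sketch (not the exact code from the repository):

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Simplified sketch of the per-customer session update function.
def updateSessionState(
    customerId: String,
    batch: Iterator[Event],
    state: GroupState[SessionInfo]): SessionUpdate = {

  if (state.hasTimedOut) {                 // no activity for 5 minutes
    val last = state.get
    state.remove()                         // terminate the session
    SessionUpdate(customerId, last.totalEvents, loggedIn = false, expired = true)
  } else {
    val newEvents = batch.toSeq
    val previous  = state.getOption.getOrElse(SessionInfo(0, loggedIn = false))
    // any event whose Source is "LoggedOut" logs the user out
    val loggedIn  = !newEvents.exists(_.Source == "LoggedOut")
    val updated   = SessionInfo(previous.totalEvents + newEvents.size, loggedIn)
    state.update(updated)
    state.setTimeoutDuration("5 minutes")  // reset the inactivity timeout
    SessionUpdate(customerId, updated.totalEvents, loggedIn, expired = false)
  }
}

val sessionUpdates = events
  .groupByKey(_.CustomerId)
  .mapGroupsWithState[SessionInfo, SessionUpdate](
    GroupStateTimeout.ProcessingTimeTimeout())(updateSessionState)
```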

To define the timeout for a user, we use processing time. We could also use event time for the session timeout, but here we assume that we process the data as soon as the event is generated, so processing time is almost equal to event time.

Similarly, to maintain the state of the products added to the cart by a user, we will create a schema for the AddToCart event and then filter out only the AddToCart events.
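A sketch of the AddToCart schema and the filtered stream, assuming the field names from the sample logs (Option is used so that ProductView and Purchase records, where the nested AddToCart struct is absent, can be filtered out):

```scala
// Schema for AddToCart events; for other event types the nested AddToCart struct
// simply does not match and comes back empty.
val addToCartSchema = StructType(Seq(
  StructField("timestamp",  StringType),
  StructField("CustomerId", StringType),
  StructField("EventType", StructType(Seq(
    StructField("AddToCart", StructType(Seq(
      StructField("ProductId", StringType),
      StructField("Price",     DoubleType)
    )))
  ))),
  StructField("Source", StringType)
))

case class AddToCart(ProductId: String, Price: Double)
case class AddToCartEventType(AddToCart: Option[AddToCart])
case class AddToCartEvent(timestamp: String, CustomerId: String,
                          EventType: AddToCartEventType, Source: String)

val addToCartLogs = spark.readStream
  .schema(addToCartSchema)
  .json("./ServerLogs")
  .as[AddToCartEvent]
  .filter(_.EventType.AddToCart.isDefined) // keep only AddToCart events
```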

Thus addToCartLogs is now a Dataset of AddToCartEvent. We apply mapGroupsWithState on this Dataset for each unique CustomerId. Here we set the GroupStateTimeout to NoTimeout() because we want to maintain the cart state even if the user logs out.
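Again, this is a simplified reconstruction of the cart-update function under the assumptions above (CartInfo is an assumed name for the state type):

```scala
// State kept per customer: every product added to the cart so far.
case class CartInfo(products: List[Product])

def updateCartState(
    customerId: String,
    batch: Iterator[AddToCartEvent],
    state: GroupState[CartInfo]): CartUpdate = {

  val newProducts = batch.flatMap(_.EventType.AddToCart)
    .map(a => Product(a.ProductId, a.Price)).toList
  val cart = CartInfo(state.getOption.map(_.products).getOrElse(Nil) ++ newProducts)
  state.update(cart)  // the cart state never expires, even after logout
  CartUpdate(customerId, cart.products.map(_.Price).sum, cart.products.size, cart.products)
}

val cartUpdates = addToCartLogs
  .groupByKey(_.CustomerId)
  .mapGroupsWithState[CartInfo, CartUpdate](
    GroupStateTimeout.NoTimeout())(updateCartState)
```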

For all the queries we use the Update output mode, since mapGroupsWithState requires it. That's it. Now let's run our Spark Streaming application.
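A sketch of starting both queries on the console sink:

```scala
// Both streaming queries use the Update output mode required by mapGroupsWithState.
val sessionQuery = sessionUpdates.writeStream
  .queryName("sessionUpdates")
  .outputMode("update")
  .format("console")
  .start()

val cartQuery = cartUpdates.writeStream
  .queryName("cartUpdates")
  .outputMode("update")
  .format("console")
  .start()

spark.streams.awaitAnyTermination() // keep the application alive
```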

Running the Spark Streaming Application:

We will first add this log entry in the ./ServerLogs folder: {"timestamp": "20:06:2019 0:46:01", "CustomerId": "1028_CId", "EventType": {"AddToCart": {"ProductId": "116_PId", "Price": 1.0}}, "Source": "www.filpkart.tablet.com"}

User logged in: loggedIn set to true

Event added in AddToCartLogs:

Product added in Cart: totalItems=1

Now let's add this log entry: {"timestamp": "20:06:2019 0:46:02", "CustomerId": "1028_CId", "EventType": {"AddToCart": {"ProductId": "117_PId", "Price": 2}}, "Source": "www.filpkart.tablet.com"}

Event added in AddToCartLogs:

Product added in Cart: totalItems=2

User is still logged in.

Let's add this LogOut log entry in the ./ServerLogs folder: {"timestamp": "20:06:2019 0:50:02", "CustomerId": "1028_CId", "EventType": {"AddToCart": {"ProductId": "119_PId", "Price": 5}}, "Source": "LoggedOut"}

Here, the item will be added to the cart and the user will be logged out.

Event added in AddToCartLogs:

Product added in Cart: totalItems=3

User logged out: loggedIn=false

