Time Series Analysis in Spark SQL

articles: 

"""
Time Series Analysis in Spark SQL
Written by JP Vijaykumar
Date Mar 8 2021

This script is provided for educational purposes only.
Please modify the script as required to suit your environment.

Earlier, I presented a script that processes data using the Time Series Analysis algorithm in SQL and PL/SQL.
In this article, I reuse about 90% of the code from that previous article and add about 10% PySpark code.

If you know sql, coding in pyspark is not that difficult.

I like Spark SQL for the following reasons:
01) It is open source.
02) It combines the rich functionality of Python and SQL.
03) It has data mining libraries.
04) It can be installed on my desktop, so I can experiment with it.

Besides, I love scripting and complex algorithms.

I used the following URLs to install and set up Spark on my desktop.
https://www.youtube.com/watch?v=IQfG0faDrzE
https://www.youtube.com/watch?v=WQErwxRTiW0
http://media.sundog-soft.com/spark-python-install.pdf

Time Series Analysis is performed in slightly different ways from presentation to presentation.
For both the earlier SQL and PL/SQL code and the Spark SQL code presented in this article,
I mostly followed the video presentation on Time Series Analysis linked below.
https://www.youtube.com/watch?v=HIWXdHlDSFs --TIME SERIES ANALYSIS

Questions to be answered:
01) Using the ratio-to-moving-average method, calculate seasonally adjusted indices for each quarter.
02) Obtain a regression trend line representing the above data.
03) Obtain a seasonally adjusted trend estimate for the 4th quarter of 2011.

I created the "e:/data/TimeSeries.csv" file with the following data:
year ,q1 ,q2 ,q3 ,q4
2008,20,30,39,60
2009,40,51,62,81
2010,50,64,74,95

Please modify the code to use the location of the CSV file on your machine.
"""

#spark-submit.cmd python/pysparkTimeSeriesAnalysis.py
from pyspark.sql import SparkSession
from pyspark import SparkContext,SparkConf
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Create (or reuse) a Spark session named "TimeSeriesAnalysis" that runs
# locally with the "local[*]" master.
spark = (
    SparkSession.builder
    .appName("TimeSeriesAnalysis")
    .master("local[*]")
    .getOrCreate()
)
# Session-level SQL settings, applied one after the other.
for conf_key, conf_value in (
    ("spark.sql.debug.maxToStringFields", 100),
    ("spark.sql.crossJoin.enabled", "true"),  # To enable cartesian product in sql
):
    spark.conf.set(conf_key, conf_value)

# Load the quarterly figures; the header row supplies the column names and
# inferSchema lets Spark choose the column types.
df = spark.read.csv("e:/data/TimeSeries.csv", inferSchema=True, header=True)
# printSchema() prints the schema itself and returns None, so it must not be
# wrapped in print() (that would emit a stray "None" line).
df.printSchema()
print(df.columns)
df.show()
# The CSV header shown in the article has trailing spaces in the column names
# ("q1 ", "q2 ", ...), so columns are picked by position through df.columns[]
# instead of by a typed name; this avoids having to trim the names.
df.select(df.columns[3]).show()
# Unpivot: pack the four quarter columns (positions 1-4) of each input row into
# an array, then explode the array so that every quarter becomes its own row in
# a single "val" column.
quarter_cols = [col(name) for name in df.columns[1:5]]
df = df.select(array(*quarter_cols).alias("val"))
df = df.withColumn("val", explode(col("val")))
df.show()
# Demo only: row_number() over a window ordered by "val" numbers the rows from
# 1 in ascending value order, which is not necessarily the time order.
w = Window.orderBy("val")
df.select("*", row_number().over(w).alias("id")).show()

# Demo only: row_number() ordered by monotonically_increasing_id() also starts
# at 1, but follows the current row order of the DataFrame.
df2 = df.withColumn("rowid", row_number().over(Window.orderBy(monotonically_increasing_id())))
df2.show()
# Add the period number "rnum" that the SQL report uses as its X value.
# monotonically_increasing_id() starts at 0 within a single partition, hence
# the "+ 1" to start at 1. coalesce(1) merges everything into one partition
# without a shuffle, so the existing row (time) order is kept; repartition(1)
# would shuffle the rows first and gives no ordering guarantee.
df = df.coalesce(1).withColumn("rnum", monotonically_increasing_id() + 1)
df.select("*").show()  # rnum starts at 1
# Expose the DataFrame to SQL as "DF" (createOrReplaceTempView replaces the
# deprecated registerTempTable) and cache it, because the report statement
# reads the view many times.
df.createOrReplaceTempView("DF")
spark.catalog.cacheTable("DF")

# Demo only: an offset can also be applied while selecting. rnum already
# starts at 1, so the "id" shown here starts at 2.
spark.sql("select rnum + 1 as id,val from DF").show()
#######################################
# Time series report, computed in one Spark SQL statement over the "DF" view
# (rnum = period number, val = observed value). The CTEs build on each other:
#   frqma01..frqma09 : average of val over the rnum windows 1-4, 2-5, ... 9-12
#   ctdma1..ctdma8   : average of each pair of adjacent frqma values
#   q3_1..q2_3       : val*100/ctdma_val for rnum 3..10 (percent of average)
#   m1..m4           : mean of the two percent values combined per quarter
#   adj_factor       : 400 divided by the sum of the four means
#   s1..s4           : adj_factor * mean (seasonal index)
#   tc, b, a         : sums, slope and intercept of the line y = a + b*x, x = rnum
#   *_rpt, sum_ssidx : the same values prefixed with a text label for display
# The final select unions the labelled rows and appends the trend estimates
# (a_val + b_val*x) * seasonal_idx / 100 for x = 13..16.
# Every intermediate value is rounded before it is reused by the next step.
# NOTE(review): the window bounds (rnum 1..12) and x = 13..16 are hard-coded,
# so the statement assumes DF holds exactly 12 rows - confirm before reusing
# it with other data.
# NOTE(review): the labels are cast to char(60); verify whether Spark actually
# pads them to 60 characters or treats the cast as a plain string cast.
#######################################
spark.sql("""
with
frqma01 as (select round(avg(val),2) frqma_val from DF where rnum>=1 and rnum<=4 ),
frqma02 as (select round(avg(val),2) frqma_val from DF where rnum>=2 and rnum<=5 ),
frqma03 as (select round(avg(val),2) frqma_val from DF where rnum>=3 and rnum<=6 ),
frqma04 as (select round(avg(val),2) frqma_val from DF where rnum>=4 and rnum<=7 ),
frqma05 as (select round(avg(val),2) frqma_val from DF where rnum>=5 and rnum<=8 ),
frqma06 as (select round(avg(val),2) frqma_val from DF where rnum>=6 and rnum<=9 ),
frqma07 as (select round(avg(val),2) frqma_val from DF where rnum>=7 and rnum<=10 ),
frqma08 as (select round(avg(val),2) frqma_val from DF where rnum>=8 and rnum<=11 ),
frqma09 as (select round(avg(val),2) frqma_val from DF where rnum>=9 and rnum<=12 ),
frqma_rpt as (select cast('Four Quarter Moving Average: ' as char(60))||frqma_val description from(
select frqma_val from frqma01
union all
select frqma_val from frqma02
union all
select frqma_val from frqma03
union all
select frqma_val from frqma04
union all
select frqma_val from frqma05
union all
select frqma_val from frqma06
union all
select frqma_val from frqma07
union all
select frqma_val from frqma08
union all
select frqma_val from frqma09
)),
ctdma1 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma01 union all select frqma_val from frqma02)),
ctdma2 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma02 union all select frqma_val from frqma03)),
ctdma3 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma03 union all select frqma_val from frqma04)),
ctdma4 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma04 union all select frqma_val from frqma05)),
ctdma5 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma05 union all select frqma_val from frqma06)),
ctdma6 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma06 union all select frqma_val from frqma07)),
ctdma7 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma07 union all select frqma_val from frqma08)),
ctdma8 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma08 union all select frqma_val from frqma09)),
ctdma_rpt as (select cast('Centered Average: ' as char(60))||ctdma_val from (
select ctdma_val from ctdma1
union all
select ctdma_val from ctdma2
union all
select ctdma_val from ctdma3
union all
select ctdma_val from ctdma4
union all
select ctdma_val from ctdma5
union all
select ctdma_val from ctdma6
union all
select ctdma_val from ctdma7
union all
select ctdma_val from ctdma8
)),
pctavg_rpt as (select cast('PCT of Average: ' as char(60))||pct_avg from (
select round(val*100/ctdma_val,2) pct_avg from DF,ctdma1 where rnum=3
union all
select round(val*100/ctdma_val,2) pct_avg from DF,ctdma2 where rnum=4
union all
select round(val*100/ctdma_val,2) pct_avg from DF,ctdma3 where rnum=5
union all
select round(val*100/ctdma_val,2) pct_avg from DF,ctdma4 where rnum=6
union all
select round(val*100/ctdma_val,2) pct_avg from DF,ctdma5 where rnum=7
union all
select round(val*100/ctdma_val,2) pct_avg from DF,ctdma6 where rnum=8
union all
select round(val*100/ctdma_val,2) pct_avg from DF,ctdma7 where rnum=9
union all
select round(val*100/ctdma_val,2) pct_avg from DF,ctdma8 where rnum=10
)),
q3_1 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma1 where rnum=3 ),
q4_1 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma2 where rnum=4 ),
q1_2 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma3 where rnum=5 ),
q2_2 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma4 where rnum=6 ),
q3_2 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma5 where rnum=7 ),
q4_2 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma6 where rnum=8 ),
q1_3 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma7 where rnum=9 ),
q2_3 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma8 where rnum=10),
mean_rpt as (select cast('Mean: ' as char(60))||mean from (
select round(avg(pct_avg),2) mean from (select * from q1_2 union all select * from q1_3)
union all
select round(avg(pct_avg),2) mean from (select * from q2_2 union all select * from q2_3)
union all
select round(avg(pct_avg),2) mean from (select * from q3_1 union all select * from q3_2)
union all
select round(avg(pct_avg),2) mean from (select * from q4_1 union all select * from q4_2)
)),
m1 as (select round(avg(pct_avg),2) mean from (select * from q1_2 union all select * from q1_3)),
m2 as (select round(avg(pct_avg),2) mean from (select * from q2_2 union all select * from q2_3)),
m3 as (select round(avg(pct_avg),2) mean from (select * from q3_1 union all select * from q3_2)),
m4 as (select round(avg(pct_avg),2) mean from (select * from q4_1 union all select * from q4_2)),
adj_factor as (select round(400/sum(mean),4) adj_factor from (select mean from m1 union all select mean from m2 union all select mean from m3 union all select mean from m4 )),
s1 as (select round(adj_factor*mean,2) seasonal_idx from adj_factor,m1),
s2 as (select round(adj_factor*mean,2) seasonal_idx from adj_factor,m2),
s3 as (select round(adj_factor*mean,2) seasonal_idx from adj_factor,m3),
s4 as (select round(adj_factor*mean,2) seasonal_idx from adj_factor,m4),
sum_ssidx as (select cast('Sum Seasonal Index: ' as char(60))||sum(seasonal_idx) from (select seasonal_idx from s1 union all select seasonal_idx from s2 union all select seasonal_idx from s3 union all select seasonal_idx from s4)),
tc as (select count(*) num_recs,sum(rnum) sum_x,sum(rnum)/count(*) mean_x,sum(val) sum_y,sum(val)/count(*) mean_y,sum(rnum*val) sum_xy,sum(power(rnum,2)) sum_x_sqr from DF),
b as (select round((num_recs*sum_xy - sum_x*sum_y)/(num_recs*sum_x_sqr - power(sum_x,2)),2) b_val from tc),
a as (select round( mean_y - b_val*mean_x,2) a_val from tc,b)
select cast('X code and Y code values: ' as char(60))||rnum||' '||val description from DF
union all
select * from frqma_rpt
union all
select * from ctdma_rpt
union all
select * from pctavg_rpt
union all
select * from mean_rpt
union all
select cast('Seasonal Index: ' as char(60))||seasonal_idx from s1
union all
select cast('Seasonal Index: ' as char(60))||seasonal_idx from s2
union all
select cast('Seasonal Index: ' as char(60))||seasonal_idx from s3
union all
select cast('Seasonal Index: ' as char(60))||seasonal_idx from s4
union all
select * from sum_ssidx
union all
select cast('X Adjustment Factor: ' as char(60))||adj_factor from adj_factor
union all
select cast('b value : ' as char(60))||b_val from b
union all
select cast('a value : ' as char(60))||a_val from a
union all
select cast('Seasonally Adjusted Trend Estimate for 1st Quarter of 2011: ' as char(60))||round((a_val + b_val*13)*seasonal_idx/100,2) seasonal_index from a,b,s1
union all
select cast('Seasonally Adjusted Trend Estimate for 2nd Quarter of 2011: ' as char(60))||round((a_val + b_val*14)*seasonal_idx/100,2)seasonal_index from a,b,s2
union all
select cast('Seasonally Adjusted Trend Estimate for 3rd Quarter of 2011: ' as char(60))||round((a_val + b_val*15)*seasonal_idx/100,2)seasonal_index from a,b,s3
union all
select cast('Seasonally Adjusted Trend Estimate for 4th Quarter of 2011: ' as char(60))||round((a_val + b_val*16)*seasonal_idx/100,2) seasonal_index from a,b,s4
"""
).show(60,False) # show up to 60 rows; False disables truncation so long report lines are displayed in full

# Shut down the Spark session now that the report has been printed.
spark.stop()
#References:
#http://www.orafaq.com/node/3187 "TIME SERIES ANALYSIS IN SQL AND PL/SQL"

#http://www.orafaq.com/node/3204
#https://stackoverflow.com/questions/33742895/how-to-show-full-column-content-in-a-spark-dataframe