
Sherlock Malware Detection

SherLock, Malware Detection in Androids without Root Privileges


We present a classification model trained to detect whether a malicious application is actively running on a user’s Android smartphone, based on highly granular, low-privilege hardware performance counters (HPCs) and packet exchange information collected over the course of 3 months across 46 devices in 2016. People use cell phones for an increasing number of tasks, including banking, e-mail, third-party app sign-on authentication, and smart device management. Many of these tasks require the user to store personal data on their phone in the form of passwords, contact information, photos, and more. Malicious actors seek to steal this valuable information on Android devices through hidden apps and Trojans found on the Google Play store and shared through SMS messaging. Our model accepts statistics on five seconds of CPU usage and internet traffic packet exchange as input and detects whether malware is actively running on the device. Because this data can be collected without root privileges, the application can support user security across different types of devices.

To get started:

First, download t4_mor_join.ipynb. The rest of this walkthrough assumes you are running it inside a PySpark Docker image; see the Preliminary Step below. If you need help, check out this link

Preliminary Step:

You should be in a PySpark docker image to run this notebook!
If you’re not in PySpark, please run the following (presuming you have docker installed)

!docker run -it --rm -p 8888:8888 jupyter/pyspark-notebook
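If you also want this repository’s files visible inside the container, the working directory can be mounted as well. This is an optional variant; the target path is an assumption chosen to match the /home/jovyan/SherLock paths used later in the notebook:

# Optional: mount the current directory into the container's home folder
!docker run -it --rm -p 8888:8888 -v "$PWD":/home/jovyan/SherLock jupyter/pyspark-notebook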

Sherlock: Classifying Malicious Cell Phone Sessions

This subset of the well-known SherLock dataset contains data extracted from 37 users’ cell phones across 3 months at the beginning of 2016. There are two tables involved in this analysis:

T4.tsv: ~26 GB of data on battery level, memory usage, packet inflows and outflows, and the like. Each row represents a scan, and scans are conducted every 5 seconds.

Moriartyprobe.tsv: ~65 MB of data from an app called Moriarty, which starts “sessions”: a variety of realistic attacks on the user’s cellphone that stop and start intermittently. The sessions are either benign or malicious.

Our model treats these five-second chunks of time as independent observations, enabling a binary classification model that could be deployed to track T4 cell phone usage stats in real time, identify when an attack is occurring, and signal the user.

In order to use this dataset in a Spark ML pipeline, it must be imported, transformed, cleaned, and subsetted, and the two tables must be combined. The code below uses awk, PySpark, and the Spark SQL API to do all of that.

First: Grab the T4 data from my personal dropbox. The links below will automatically download

WARNING: T4 is very large (25.66 GB, to be exact). Please check your disk space before downloading the files.

T4.tsv (26 GB)

Second: Grab the Moriarty Probe data from my personal dropbox.

Moriartyprobe.tsv (65.2 MB)

Third: Drag and drop both T4.tsv (or T4randomsample.tsv) and Moriartyprobe.tsv into the folder labeled “tsvs”

This step is very important; otherwise the filepaths below will not work (and you will feel like pulling your hair out trying to fix them). Or you’ll be really upset with me. Neither is good.

# Verify that your working directory is "SherLock"
!pwd
# I use an awk script to do the transformations. It can be viewed at /Sherlock/import.awk
!cat import.awk
# change permissions to make file executable
!chmod +x import.awk
#!/usr/bin/awk -f
BEGIN { FS="\t"; OFS="," } { 
  if (FILENAME ~ /T4/){ 
    rebuilt=0 
    for(i=1; i<=NF; ++i) { 
      if ($i ~ /Hz/){ 
        $i = ($i + 0) 
        if ($i > 100) {$i*=.001} 
      } 
      if ($i ~ /,/ && $i !~ /^".*"$/) { 
        gsub("\"", "\"\"", $i) 
        $i = "\"" $i "\"" 
        rebuilt=1 
      } 
    } 
    if (!rebuilt) { $1=$1 } 
    print $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26 
  } 
  else if (FILENAME ~ /Mor/) { 
    rebuilt=0 
    for(i=1; i<=NF; ++i) 
      if ($i ~ /,/ && $i !~ /^".*"$/) { 
        gsub("\"", "\"\"", $i) 
        $i = "\"" $i "\"" 
        rebuilt=1 
      } 
    if (!rebuilt) { $1=$1 } 
    print $1,$2,$3,$4,$5,$6,$7,$8 
  } 
}

Fourth: Transform the datasets using the awk stream-processing language

Fun fact: this language was developed in 1977 at AT&T Bell Labs! It’s still surprisingly useful today for data extraction and reporting.
I’m using a script called import.awk to make all of the transformations.
The next code cell calls that script and writes its results to a new file in the correct part of the filesystem.
The changes made are as follows:

  1. Change the delimiter from tabs (\t) to commas (,)

  2. If the filename contains T4, strip the Hz label from the CPU frequency column and coerce it to a number; if the value is larger than 100, it was recorded in MHz and is scaled down by a factor of 1,000 to express it in GHz. Values without an Hz label are left as NULL.

  3. Else if the filename contains Mor, don’t do the GHz transformation.

  4. If either file incidentally has any misplaced delimiter characters (such as stray commas, quotation marks, or backslashes), quote and escape them so the CSV stays valid!

  5. Print out a subset of columns (1–26) for T4.tsv, and a subset of columns (1–8) for Moriartyprobe.tsv.

  6. There should now be two transformed, usable CSV files in the correct folders

# Command-line script using awk to remove null columns and transform both T4 and Moriartyprobe to CSV format. 
# Also strips the Hz labels from the Hz column and makes its units consistent in T4.
!./import.awk \
tsvs/T4randomsample.tsv \
    > csvs/T4subset.csv 
!./import.awk \
tsvs/Moriartyprobe.tsv \
    > csvs/Moriartyprobe.csv
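After the script finishes, an optional sanity check (assuming the files landed in csvs/) is to peek at the first rows of each output file:

# Optional sanity check: preview the transformed CSV files
!head -n 2 csvs/T4subset.csv
!head -n 2 csvs/Moriartyprobe.csv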

Fifth: Load PySpark for further transformations

Use !pip install to install anything you are missing

# Import pyspark
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.sql import SparkSession
from functools import reduce
import pandas as pd
import numpy as np
import sklearn
# Start a session 
spark = SparkSession.builder.master('local[2]').config("spark.executor.memory", "2g").config("spark.driver.memory", "2g").appName('spark_sh_data').getOrCreate()
# Import data: t4 and Moriarty
t4 = spark.read.options(header=True, nullValue='NULL', inferSchema=True).csv('csvs/T4subset.csv')
mor = spark.read.options(header=True, nullValue='NULL', inferSchema=True).csv('csvs/Moriartyprobe.csv')

Sixth: Add column names to the DataFrames

# create colnames T4
t4_colnames = ['userid', 'uuid', 'Version', 'CpuGHz', 'CPU_0', 'CPU_1', 'CPU_2', 'CPU_3', 'Total_CPU', 'TotalMemory_freeSize', 'TotalMemory_max_size',
'TotalMemory_total_size', 'TotalMemory_used_size', 'Traffic_MobileRxBytes', 'Traffic_MobileRxPackets', 'Traffic_MobileTxBytes',
'Traffic_MobileTxPackets','Traffic_TotalRxBytes', 'Traffic_TotalRxPackets', 'Traffic_TotalTxBytes', 'Traffic_TotalTxPackets',
'Traffic_TotalWifiRxBytes', 'Traffic_TotalWifiRxPackets', 'Traffic_TotalWifiTxBytes', 'Traffic_TotalWifiTxPackets',
'Traffic_timestamp', 'Battery_charge_type', 'Battery_current_avg']

# create colnames Moriarty 
mor_colnames = ['userid', 'uuid', 'actionType', 'action', 'behavior', 'sessionType', 'sessionID', 'version']
# Add column names
t4_oldColumns = t4.schema.names
t4_newColumns = t4_colnames

mor_oldColumns = mor.schema.names
mor_newColumns = mor_colnames


t4 = reduce(lambda t4, idx: t4.withColumnRenamed(t4_oldColumns[idx], t4_newColumns[idx]), range(len(t4_oldColumns)), t4)
mor = reduce(lambda mor, idx: mor.withColumnRenamed(mor_oldColumns[idx], mor_newColumns[idx]), range(len(mor_oldColumns)), mor)
t4.printSchema()
mor.printSchema()
# verify successful import
import pandas as pd
pd.set_option('display.max_columns', None)
mor.show(5)

Seventh: Join the t4 and Moriarty datasets. They do not share a common key.

This is a critical move, and a challenging one. I need to join the tables on the uuid values, joining rows only when they fall within the same time range. uuid is measured in milliseconds, so I distribute the malicious or benign labels across the time ranges. Using a CTE, a window function, and a subquery, I successfully joined the two tables. Steps taken in the SQL join:

  1. Create a subquery table called “sub”, which contains the uuid, sessionType, and a new index column named m_grp created with count(m.sessionType) OVER (ORDER BY uuid), all selected from a full outer join of t4 and Moriarty.

  2. From “sub”, select uuid and min(sessionType) over each m_grp partition (which repeats the same value across every row of the group) as sessionType. ← This is the main step.

  3. Select everything from this table, then join back the original t4 on the uuids, which excludes any duplicates from the table.

If you want a simple, foobar-like Stack Overflow explanation of the general logic behind what I did, check out this SO answer. I built on that answer by using a CTE so that I could join the two tables only on uuids found in the original t4 dataset; I didn’t want duplicate values for any T4 data.

# create temp table 
t4.createOrReplaceTempView('t4')
mor.createOrReplaceTempView('mor')
t4_mor = spark.sql("""
with CTE as (SELECT uuid
    , min(sessionType) OVER (PARTITION BY m_grp) as sessionType
FROM (
    SELECT uuid, m.sessionType 
        , count(m.sessionType) OVER (ORDER BY uuid) as m_grp
    FROM mor m
    FULL OUTER JOIN t4 t using(uuid)
    ) sub)
SELECT * 
FROM CTE
JOIN t4 using(uuid);
""")

Eighth: Verify that the dataframe has both a sessionType column populated with values and device data such as CPU and memory stats

t4_mor.show(5)

# Looks like there's null values. Those are values outside of the time range of the samples. Let's drop 'em, to ensure a full dataset. 
t4_mor = t4_mor[t4_mor['sessionType'] != ""]
# Check how many of each sessionType we have
t4_mor.groupBy('sessionType').count().show()

Machine Learning Steps in PySpark

from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator


from pyspark.sql import SparkSession
from pyspark.sql.functions import translate
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer

from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
import pandas as pd
import numpy as np
from pyspark.ml.linalg import Vectors
import pyspark.ml.evaluation as ev

from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LogisticRegression

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# With the data loaded, translate the literal 'NULL' strings to ''. Note that translate() works character by character (it strips the letters N, U, and L), which is safe here because these columns are otherwise numeric.
newDf = t4_mor.withColumn('CPU_1', translate('CPU_1', 'NULL', '')).withColumn('CPU_2', translate('CPU_2', 'NULL', '')).withColumn('CPU_3', translate('CPU_3', 'NULL', '')).withColumn('Total_CPU', translate('Total_CPU', 'NULL', ''))\
    .withColumn('TotalMemory_freeSize', translate('TotalMemory_freeSize', 'NULL', ''))
# With the NULL values now stored as '', we can cast the column types to Integer to allow for analysis 
df = newDf.withColumn('CpuGHz', newDf["CpuGHz"].cast(IntegerType())).withColumn('CPU_0', newDf["CPU_0"].cast(IntegerType())).withColumn('CPU_1', newDf["CPU_1"].cast(IntegerType())).withColumn('CPU_2', newDf["CPU_2"].cast(IntegerType())).withColumn('CPU_3', newDf["CPU_3"].cast(IntegerType()))\
    .withColumn('Traffic_MobileRxBytes', newDf["Traffic_MobileRxBytes"].cast(IntegerType()))\
    .withColumn('Traffic_MobileRxPackets', newDf["Traffic_MobileRxPackets"].cast(IntegerType()))\
    .withColumn('Traffic_MobileTxBytes', newDf["Traffic_MobileTxBytes"].cast(IntegerType()))\
    .withColumn('Traffic_MobileTxPackets', newDf["Traffic_MobileTxPackets"].cast(IntegerType()))\
    .withColumn('Traffic_TotalWifiRxBytes', newDf["Traffic_TotalWifiRxBytes"].cast(IntegerType()))\
    .withColumn('Traffic_TotalWifiRxPackets', newDf["Traffic_TotalWifiRxPackets"].cast(IntegerType()))\
    .withColumn('Traffic_TotalWifiTxBytes', newDf["Traffic_TotalWifiTxBytes"].cast(IntegerType()))\
    .withColumn('Traffic_TotalWifiTxPackets', newDf["Traffic_TotalWifiTxPackets"].cast(IntegerType()))
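Since these casts all follow one pattern, the same step can also be written as a loop. This is an equivalent sketch (same columns, same IntegerType casts), not a change in behavior:

# Equivalent sketch: cast the listed columns to IntegerType in a loop
int_cols = ['CpuGHz', 'CPU_0', 'CPU_1', 'CPU_2', 'CPU_3',
            'Traffic_MobileRxBytes', 'Traffic_MobileRxPackets',
            'Traffic_MobileTxBytes', 'Traffic_MobileTxPackets',
            'Traffic_TotalWifiRxBytes', 'Traffic_TotalWifiRxPackets',
            'Traffic_TotalWifiTxBytes', 'Traffic_TotalWifiTxPackets']
df = newDf
for c in int_cols:
    df = df.withColumn(c, df[c].cast(IntegerType()))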

Define a function which returns the shape of a pySpark dataframe

import pyspark 
def spark_shape(self):
    return (self.count(), len(self.columns))
pyspark.sql.dataframe.DataFrame.shape = spark_shape

df = df.dropDuplicates()
spark_shape(df)

Remove all rows where any of the received or transferred Bytes/Packets columns equal 0

df = df.filter((df.Traffic_MobileRxBytes != 0)&(df.Traffic_MobileRxPackets != 0)&(df.Traffic_MobileTxBytes != 0)&(df.Traffic_MobileTxPackets != 0)&(df.Traffic_TotalRxBytes != 0)
                   &(df.Traffic_TotalRxPackets != 0)&(df.Traffic_TotalTxBytes != 0)&(df.Traffic_TotalTxPackets != 0)&(df.Traffic_TotalWifiRxBytes != 0)&(df.Traffic_TotalWifiRxPackets != 0)
                   &(df.Traffic_TotalWifiTxBytes != 0)&(df.Traffic_TotalWifiTxPackets != 0))

The Pipeline

Step One: Build the Assembler using a VectorAssembler

A VectorAssembler combines multiple columns into a single vector column: each row of the newly generated column is a vector formed by concatenating that row’s values from the specified input features.

assembler = VectorAssembler().setInputCols(['CpuGHz', 'CPU_0', 'CPU_1','CPU_2', 'CPU_3', 
                        #'Total_CPU', #'TotalMemory_freeSize',
                        #'TotalMemory_max_size', 'TotalMemory_total_size',
                        #'TotalMemory_used_size', 
                        'Traffic_MobileRxBytes', 'Traffic_MobileRxPackets', 'Traffic_MobileTxBytes', 'Traffic_MobileTxPackets', 
                        #'Traffic_TotalRxBytes', 'Traffic_TotalRxPackets', 'Traffic_TotalTxBytes', 'Traffic_TotalTxPackets', 
                        'Traffic_TotalWifiRxBytes', 'Traffic_TotalWifiRxPackets', 'Traffic_TotalWifiTxBytes', 'Traffic_TotalWifiTxPackets'])\
                        .setOutputCol("vectorized_features").setHandleInvalid("skip")
assembler_df = assembler.transform(df)

Step Two: Build the Label Indexer using the StringIndexer function

The StringIndexer function is a label indexer that maps a string column of labels to an ML column of label indices. Here we use the sessionType variable as the input column and set the output column to label. After the filtering in Step Five, the label column is a dichotomous variable: 1 for malicious sessions and 0 for benign sessions.

label_indexer = StringIndexer().setInputCol("sessionType").setOutputCol("label")

Step Three: Scale the vectorized_features using the StandardScaler function

The StandardScaler standardizes features so that each has a standard deviation of 1 (and, optionally, zero mean). For SherLock, this puts the input features listed above, which range from GHz values to byte and packet counts, on a comparable scale in the resulting features column.

scaler = StandardScaler().setInputCol("vectorized_features").setOutputCol("features")
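If centering were also desired, the scaler’s flags can be set explicitly. This is a variant sketch, not the configuration used above:

# Variant sketch: withMean=True centers each feature to zero mean before scaling.
# Note that centering produces dense output, so use it with care on sparse vectors.
centered_scaler = StandardScaler(inputCol="vectorized_features", outputCol="features",
                                 withStd=True, withMean=True)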

Step Four: Set the Stages for the Pipeline then Fit and Transform the data into the Pipeline

pipeline_stages=Pipeline().setStages([assembler,label_indexer,scaler])
pipeline_model=pipeline_stages.fit(df)
pipeline_df=pipeline_model.transform(df)

Step Five: Drop the rows whose label corresponds to a third sessionType value (label 2.0), keeping only benign and malicious sessions, then check the shape of the Dataframe

pipeline_df = pipeline_df.filter(pipeline_df.label != 2.0)
spark_shape(pipeline_df)

Machine Learning Models & Evaluation

Logistic Regression

Logistic regression predicts an outcome from input features such as packet transfer levels and hardware performance counters. The process is very similar to multiple linear regression, except that the response variable is dichotomous (binary). Logistic regression is a relatively fast supervised classification model compared to the other methods in our analysis, but what it gains in speed it gives up, to some degree, in accuracy. Speed mattered here because the joined Moriarty data frame consisted of roughly 37 million rows and 30 columns, which would make a slower model take a very long time to run.

# We decided to use an 80/20 split to maximize the amount of data that was input into the training set 
#    while capturing the variance of the dataset.
train, test = pipeline_df.randomSplit([0.8,0.2], seed=56)
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=5)
lrModel = lr.fit(train)
LRpredictions = lrModel.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol='label')
lr_paramGrid = (ParamGridBuilder()
            .addGrid(lr.regParam, [0.01, 0.5, 2.0])
            .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
            .addGrid(lr.maxIter, [1, 5, 10])
            .build())
lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=lr_paramGrid,
                   evaluator=evaluator, numFolds=5)
lr_cvModel = lr_cv.fit(train)
lr_predictions = lr_cvModel.transform(test)

Show the Actual Values

LRpredictions.groupBy("label").count().show()

Show the Predicted Values

LRpredictions.groupBy("prediction").count().show()

Random Forest

Random Forest derives its name from the “forest” it builds: an ensemble of decision trees trained with a “bagging” method whose outputs are combined to increase overall accuracy. A Random Forest classifier was a natural fit for our analysis because it reduces the overfitting that individual decision trees are prone to. Since it builds many trees and combines their outputs, it also requires a large amount of computational power.

rf = RandomForestClassifier(maxDepth = 10, minInstancesPerNode = 4)
rfModel = rf.fit(train)
RFpredictions = rfModel.transform(test)
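For parity with the logistic regression tuning above, the same CrossValidator pattern could also be applied to the Random Forest. The grid values below are illustrative assumptions, not the settings behind the results we report:

# Hypothetical tuning sketch, reusing the BinaryClassificationEvaluator defined earlier
rf_paramGrid = (ParamGridBuilder()
                .addGrid(rf.maxDepth, [5, 10, 15])
                .addGrid(rf.numTrees, [20, 50])
                .build())
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=rf_paramGrid,
                       evaluator=evaluator, numFolds=5)
rf_cvModel = rf_cv.fit(train)
rf_cv_predictions = rf_cvModel.transform(test)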

Show the Actual Values

RFpredictions.groupBy("label").count().show()

Show the Predicted Values

RFpredictions.groupBy("prediction").count().show()

Gradient Boosting Tree

Gradient Boosted Trees involve three elements: a loss function to be optimized, a weak learner to make predictions, and an additive model that adds weak learners to minimize the loss function. The name gradient boosting arises because target outcomes for each case are set based on the gradient of the error with respect to the prediction; each new model takes a step in the direction that minimizes prediction error, in the space of possible predictions for each training case. The challenge presented by the SherLock dataset, even in its reduced form, was the large number of samples with low feature values that limited the predictive power of our logistic regression model. With a tree, these samples could be partitioned into a weak learner; with an ensemble of trees, those weak learners could be combined into something stronger. The natural direction, then, was a boosting model, and at the expense of interpretability, our team landed on Gradient Boosted Trees as our final model.

train, test = pipeline_df.randomSplit([0.8,0.2], seed=56)

gbt = GBTClassifier(labelCol='label')

gbtModel = gbt.fit(train)
GBTpredictions = gbtModel.transform(test)

Show the Actual Values

GBTpredictions.groupBy("label").count().show()

Show the Predicted Values

GBTpredictions.groupBy("prediction").count().show()

Evaluation

# Define the evaluator once; the cells below apply it to each model's predictions
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label')

Evaluation of logistic regression model

import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label')

print(evaluator.evaluate(LRpredictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(LRpredictions, {evaluator.metricName: 'areaUnderPR'}))

Evaluation of Random Forest

import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label')

print(evaluator.evaluate(RFpredictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(RFpredictions, {evaluator.metricName: 'areaUnderPR'}))

Evaluation of Gradient Boost

import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label')

print(evaluator.evaluate(GBTpredictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(GBTpredictions, {evaluator.metricName: 'areaUnderPR'}))
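One caveat on these numbers: BinaryClassificationEvaluator’s default rawPredictionCol is 'rawPrediction', and pointing it at the hard 0/1 prediction column effectively collapses the ROC to a single threshold. A sketch of the score-based alternative, shown here for the GBT predictions as a point of comparison rather than as the figures we report:

# Sketch: evaluate on the continuous rawPrediction scores instead of the 0/1 predictions
score_evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                                   labelCol='label')
print(score_evaluator.evaluate(GBTpredictions, {score_evaluator.metricName: 'areaUnderROC'}))
print(score_evaluator.evaluate(GBTpredictions, {score_evaluator.metricName: 'areaUnderPR'}))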

Storing Models and Predictions

Overwriting saves for models

rfModel.write().overwrite().save("/home/jovyan/SherLock/models/rf_model")

gbtModel.write().overwrite().save("/home/jovyan/SherLock/models/gbt_model2")

Saving predictions to CSV

# Save the GBT and Random Forest test-set predictions (from the model cells above) to CSV
GBTpredictions.toPandas().to_csv("/home/jovyan/SherLock/csvs/preds2.csv")

RFpredictions.toPandas().to_csv("/home/jovyan/SherLock/csvs/rfPreds.csv")

# Read them back as pandas DataFrames for the homemade ROC curves below
rfPreds = pd.read_csv("/home/jovyan/SherLock/csvs/rfPreds.csv")

preds = pd.read_csv("/home/jovyan/SherLock/csvs/preds2.csv")

from sklearn.metrics import confusion_matrix

Extract malicious probabilities from the output dataframe

def mal_probs_col(df):
    # The CSV round trip stored the 'probability' vector as a string like "[0.93,0.07]";
    # parse it and keep the second entry: the probability of the malicious class.
    df['malicious_probs'] = df.apply(lambda x: x['probability'].replace('[', '').replace(']','').split(','),axis=1).map(lambda x: float(x[1]))

Homemade ROC

def roc_curve(df): 
    # Sweep classification thresholds and record the true/false positive rates at each one
    thresholds = np.arange(0, 1, 0.008)
    rows = []
    for thresh in thresholds:
        df['thresh_preds'] = np.where(df['malicious_probs'] >= thresh, 1, 0)
        cm = confusion_matrix(df.label, df.thresh_preds)
        TN, FP, FN, TP = cm[0][0], cm[0][1], cm[1][0], cm[1][1]
        rows.append({'TPR': TP / (TP + FN), 'FPR': FP / (FP + TN)})
    return pd.DataFrame(rows)

# Extract the malicious-class probabilities, then compute both ROC curves
mal_probs_col(preds)
mal_probs_col(rfPreds)
gbtROC = roc_curve(preds)
rfROC = roc_curve(rfPreds)

import matplotlib.pyplot as plt
plt.plot(rfROC['FPR'], rfROC['TPR'])
plt.plot(gbtROC['FPR'], gbtROC['TPR'])

Load and get best features

from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.classification import RandomForestClassificationModel

persistedModel = RandomForestClassificationModel.load("/home/jovyan/SherLock/models/rf_model")


preds = persistedModel.transform(test)

from pyspark.ml.classification import RandomForestClassifier
featImps = persistedModel.featureImportances

featureNames = ['CpuGHz', 'CPU_0', 'CPU_1','CPU_2', 'CPU_3', 'Traffic_MobileRxBytes', 'Traffic_MobileRxPackets', 'Traffic_MobileTxBytes', 'Traffic_MobileTxPackets', 'Traffic_TotalWifiRxBytes', 'Traffic_TotalWifiRxPackets', 'Traffic_TotalWifiTxBytes', 'Traffic_TotalWifiTxPackets']

print(len(featureNames))
print(len(featImps))

#13
#17

featsDF = pd.DataFrame(columns = ['feature', 'importance'])
for feat in featImps:
    print(feat)

A future step would be to map these importance values back to their feature names. The mismatch above (13 names vs. 17 importances) suggests the persisted model was trained with a different feature set than the assembler defined in this notebook. PySpark does record the assembled feature order in the metadata of the vector column, so the mapping should be recoverable once the right model and assembler are paired.
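One way to attempt that mapping is sketched below. It assumes the assembled vector column ('vectorized_features') in the transformed test set still carries the ml_attr metadata that VectorAssembler attaches; since the scaler preserves feature order, those names line up with the scaled 'features' vector the model was trained on:

# Sketch: recover assembled feature names from the vector column's ml_attr metadata
# and pair them with the persisted model's importances. If the lengths still disagree
# (13 vs. 17 above), the saved model was trained with a different assembler than the
# one defined in this notebook.
attrs_meta = preds.schema['vectorized_features'].metadata.get('ml_attr', {}).get('attrs', {})
attrs = sorted((a for group in attrs_meta.values() for a in group), key=lambda a: a['idx'])
names = [a['name'] for a in attrs]
importances = list(persistedModel.featureImportances.toArray())
if len(names) == len(importances):
    featsDF = pd.DataFrame({'feature': names, 'importance': importances})
    print(featsDF.sort_values('importance', ascending=False))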

Link to GitHub repository: https://github.com/spfa8301/SherLock-1