(driver only, spark session, no executors, pyspark)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
brew install java
brew install scala
brew install apache-spark
sudo ln -sfn /usr/local/opt/openjdk@11/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk-11.jdk
mkdir ~/dev/spark
cd ~/dev/spark
python -m venv .env
source .env/bin/activate
pip install pyspark
pip install findspark
pip install jupyter
Note that the first Spark application started (e.g. the pyspark shell) binds
its UI to port 4040. If I later create a Spark context in a Jupyter notebook,
the new job UI will bind to 4041 (i.e. jobs kicked off from the notebook will
not appear in the UI on 4040).
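To check which port a given session's UI actually landed on, something like the sketch below should work. It assumes the session exposes `sparkContext.uiWebUrl` (a PySpark property that is None when the UI is disabled); the helper name `spark_ui_port` is my own and not part of Spark.

```python
from urllib.parse import urlparse


def spark_ui_port(spark):
    """Return the port of this session's Spark UI, or None if the UI is off.

    `spark` is expected to be a SparkSession; only
    `spark.sparkContext.uiWebUrl` is read. The helper name is hypothetical.
    """
    url = spark.sparkContext.uiWebUrl  # e.g. "http://hostname:4041"
    if url is None:
        return None
    return urlparse(url).port
```

In a notebook, `spark_ui_port(spark)` returning 4041 suggests another application already holds 4040.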
(spark, start scala shell)
spark-shell
(spark, start python shell)
pyspark
(spark, paste into spark-shell, scala only)
:paste
(docs, apache spark)
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.RandomForestClassifier
Spark is good for batch and streaming use cases, but not for real time (sub
200ms inference is not going to happen). Can use MLeap and MLflow to export a
Spark model. There is no distribution during inference if you do this. MLeap
does not support RFormula, so build the model without it if you need realtime
inference.
If Google lands me on the Scala docs, swap org.apache.spark in the path for pyspark.
Use the Spark dataframe API, which is built on top of the RDD API. Only drop to RDD when necessary.
Operations on a Spark dataframe are all performed on workers, not the driver.
MLeap will do realtime inference an order of magnitude faster than Spark.
(get number of partitions in spark dataframe)
df.rdd.getNumPartitions()
(length of dataframe)
This does not work: len(df)
Use: df.count()
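A small sketch that pairs the two calls above, in case I want both numbers at once. The helper name `dataframe_shape` is made up for illustration; it only assumes `df` is a Spark dataframe.

```python
def dataframe_shape(df):
    """Return (row_count, partition_count) for a Spark dataframe.

    `dataframe_shape` is a hypothetical helper, not a Spark API.
    """
    # count() is an action: it runs a Spark job over the whole dataframe.
    rows = df.count()
    # Partition count of the RDD that backs the dataframe.
    partitions = df.rdd.getNumPartitions()
    return rows, partitions
```

Usage would be `rows, parts = dataframe_shape(df)`.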
(pandas, koalas, spark)
For working with small datasets, could use koalas so that the driver handles
small computations without delegating work out to the workers. When the
dataset reaches a certain length, koalas will switch from using pandas to
using Spark.
(spark databricks)
To run examples that were part of Databricks' Spark training:
- Create New Cluster
- Runtime: 5.5 LTS ML (Scala 2.11, Spark 2.4.3)
Gotchas
-> gzip files are not splittable. If you get a gzip file, it will be read into a single partition, and everything downstream runs on that one partition. Do this work once, then write it back out as parquet.
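A rough sketch of the "read once, write back as parquet" step. It assumes the gzip file is a CSV with a header row and that overwriting the destination is acceptable; the function name, paths, and `num_partitions` are placeholders of mine.

```python
def gzip_csv_to_parquet(spark, src_path, dst_path, num_partitions):
    """Read a gzipped CSV once, spread it over partitions, save as Parquet.

    Assumptions: the source is CSV with a header row, and overwriting
    `dst_path` is fine. All names here are illustrative.
    """
    # A .gz file cannot be split, so this read yields a single partition.
    df = spark.read.csv(src_path, header=True)
    # Redistribute the rows so the write (and later work) runs in parallel.
    df = df.repartition(num_partitions)
    # Parquet is splittable, so later reads are parallel from the start.
    df.write.mode("overwrite").parquet(dst_path)
    return df
```

After this, later jobs can read `dst_path` with `spark.read.parquet(...)` instead of touching the gzip file again.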
(manually change number of executors)
from time import sleep

def setup_executors(spark: 'SparkSession', count: int, wait_for_all=True):
    # Ask the cluster manager for `count` executors in total.
    spark._jsparkSession.sparkContext().requestTotalExecutors(
        count,
        0,
        spark.sparkContext._jvm.PythonUtils.toScalaMap({})
    )
    if wait_for_all:
        # Poll every 10 seconds until the requested number has registered.
        while True:
            num_executors = spark._jsparkSession.sparkContext().getExecutorIds().length()
            print(f'{num_executors} of {count} executors are available.')
            if num_executors == count:
                break
            print('Waiting 10 seconds...')
            sleep(10)

def teardown_executors(spark: 'SparkSession'):
    # Drop the requested total to zero, then kill whatever is still running.
    spark._jsparkSession.sparkContext().requestTotalExecutors(
        0,
        0,
        spark.sparkContext._jvm.PythonUtils.toScalaMap({})
    )
    javaExecutorIds = spark._jsparkSession.sparkContext().getExecutorIds()
    # Copy the Scala Seq into a Python list so it prints readably.
    executorIds = [javaExecutorIds.apply(i) for i in range(javaExecutorIds.length())]
    print(f'Killing executors {executorIds}')
    spark._jsparkSession.sparkContext().killExecutors(javaExecutorIds)
(evenly partition files)
def evenly_partition(spark, files, number_of_partitions):
    """
    Returns an rdd that uniformly partitions `files` across `number_of_partitions`
    """
    return (
        spark
        .sparkContext
        # Key each file by its index so the partitioner can spread them round-robin.
        .parallelize(zip(range(len(files)), files))
        .partitionBy(number_of_partitions, lambda idx: idx % number_of_partitions)
        # Drop the index key, keeping only the file path.
        .map(lambda x: x[1])
    )
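To see why the modulo partitioner gives an even spread, here is a plain-Python model of the same assignment (no Spark involved). `simulate_partitioning` and the file names are made up for illustration.

```python
def simulate_partitioning(files, number_of_partitions):
    """Plain-Python model of the idx % number_of_partitions assignment."""
    partitions = [[] for _ in range(number_of_partitions)]
    for idx, name in enumerate(files):
        # Same rule as the partitionBy lambda: index modulo partition count.
        partitions[idx % number_of_partitions].append(name)
    return partitions


files = [f"part-{i}.csv" for i in range(7)]  # hypothetical file names
sizes = [len(p) for p in simulate_partitioning(files, 3)]
print(sizes)  # [3, 2, 2]
```

Partition sizes never differ by more than one, which is the point of keying by index instead of hashing the file name.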
(get spark context)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("my-app").getOrCreate()
sc = spark.sparkContext