
Learn How to Read and Write CSV Files with Apache Spark.

By: Adam Richardson
In this post, we will cover reading and writing CSV files with Apache Spark (PySpark).

Read CSV files with PySpark

Let’s learn all about reading CSV files with PySpark.

Previous Post

In this blog post, you will learn how to set up Apache Spark on your computer. This means you can learn Apache Spark with a local install at no cost.

Data to use

We’ve created 3 tables that you should download, and put into the location of your Jupyter notebook, before you get started. Just click the links below to download.

Managers

Managers Data Download

Establishments

Establishments Data Download

Sales

Sales Data Download

Reading into a Spark Dataframe

We’re going to read the sales CSV file into a Spark DataFrame.

Setup the Spark Session

If you’re following from the previous post, then you will have already done this.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Reading data from a csv file is extremely easy with PySpark.

Read / Write data

Read data from a CSV file

We have the method spark.read.csv() provided by PySpark to read CSV files. Here is how to use it.

raw = spark.read.csv("./sales.csv", header=True)

raw.show()

raw = spark.read.csv("./sales.csv", header=True): This reads a CSV file called “sales.csv” and stores the data in a DataFrame. The header option specifies that the first row of the CSV file contains the column names, so these will be used to name the columns in the DataFrame.

raw.show(): This displays the first 20 rows of the DataFrame. The show() method is used to print the contents of a DataFrame to the console.

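It can also be written in one line by chaining the calls. Don't assign the result to a variable in this form, because show() returns None rather than a DataFrame.

spark.read.csv("./sales.csv", header=True).show()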

You should get the following output

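| date | est_ref | capacity | occupancy | rooms_sold | avg_rate_paid | sales_value |
| --- | --- | --- | --- | --- | --- | --- |
| 2022-12-27 | 0 | 289 | 0.75 | 217 | 35.97 | 7805.49 |
| 2022-12-27 | 1 | 203 | 0.35 | 71 | 82.31 | 5844.01 |
| 2022-12-27 | 2 | 207 | 0.51 | 106 | 227.83 | 24149.98 |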

Arguments for spark.read.csv()

Here are the arguments you may need to use when reading a csv file with PySpark.

| Argument | Description |
| --- | --- |
| path | The path to the CSV file, or a list of paths. This can be a local path or a Hadoop-supported URI. |
| header | A boolean indicating whether the CSV file has a header row (default is False). |
| schema | A StructType object (or a DDL string) defining the schema of the data (default is None, which means every column is read as a string unless inferSchema is enabled). |
| sep | The character used to separate fields in the CSV file (default is ','). |
| quote | The character used to quote fields in the CSV file (default is '"'). |
| escape | The character used to escape quotes inside quoted fields (default is '\'). |
| mode | The mode for dealing with malformed records (default is 'PERMISSIVE', which keeps the row and sets fields it cannot parse to null). The other modes are 'DROPMALFORMED' and 'FAILFAST'. |
| encoding | The character encoding to use when reading the CSV file (default is UTF-8). |
| inferSchema | A boolean indicating whether to infer the column types from the data (default is False). Inference requires an extra pass over the data. |
| columnNameOfCorruptRecord | The name of the column in which to place malformed records (default is "_corrupt_record"). |

Reading a CSV file from Github with PySpark

I’ve uploaded the files to GitHub, so you can read them directly from there with the following code and follow along with the course.

Github Repository

from pyspark import SparkFiles

url = 'https://raw.githubusercontent.com/adamrichardson14/PySpark-Tutorial/main/sales.csv'

# Download the file so Spark can access it, then read the local copy
spark.sparkContext.addFile(url)
raw = spark.read.csv(SparkFiles.get("sales.csv"), header=True)
raw.show()

Types when reading from a CSV file

By default, every column is read as a string when reading a CSV file. Spark does not try to infer the schema unless you ask it to, which is a sensible default because inference requires an extra pass over the data and can guess types wrongly. You can check this for yourself with the following code.

raw.printSchema()

root
 |-- date: string (nullable = true)
 |-- est_ref: string (nullable = true)
 |-- capacity: string (nullable = true)
 |-- occupancy: string (nullable = true)
 |-- rooms_sold: string (nullable = true)
 |-- avg_rate_paid: string (nullable = true)
 |-- sales_value: string (nullable = true)

This is telling us that all of the columns are of type string.

As a best practice you should always declare the Data Types for your data.

Data Types

For more information about types with Apache Spark, read all about them in the following post.

Reading CSV files with the correct types

Let’s amend our code to ensure that we’re reading the CSV files and setting the correct data types

from pyspark.sql.types import StructType, StructField, DateType, IntegerType, DoubleType

schema = StructType([
    StructField("date", DateType(), True),
    StructField("est_ref", IntegerType(), True),
    StructField("capacity", IntegerType(), True),
    StructField("occupancy", DoubleType(), True),
    StructField("rooms_sold", IntegerType(), True),
    StructField("avg_rate_paid", DoubleType(), True),
    StructField("sales_value", DoubleType(), True)
])

raw = spark.read.csv('sales.csv', header=True, schema=schema)

raw.printSchema()

Our output will now look like this

root
 |-- date: date (nullable = true)
 |-- est_ref: integer (nullable = true)
 |-- capacity: integer (nullable = true)
 |-- occupancy: double (nullable = true)
 |-- rooms_sold: integer (nullable = true)
 |-- avg_rate_paid: double (nullable = true)
 |-- sales_value: double (nullable = true)

Reading multiple CSV files

It’s quite common to need to read multiple CSV files into a dataframe. This is also really easy to achieve, simply by passing a list of paths. Here is an example of how we might do that.

raw = spark.read.csv(["sales.csv", "sales1.csv", "sales2.csv"], header=True)
raw.show()

Writing a Spark Dataframe to a CSV file

The first thing to understand when writing files with PySpark is the different modes available.

PySpark write modes

| Write Mode | Description |
| --- | --- |
| overwrite | Replaces whatever already exists at the output path. |
| append | Adds the new data alongside any data that already exists at the output path. If nothing exists there, the output is created and the data is written to it. |
| errorIfExists | Throws an error if the output path already exists. This is the default behavior if no write mode is specified. |
| ignore | If the output path already exists, do nothing. If it does not exist, create it and write the data. This mode can be useful if you want to avoid overwriting data. |

To specify the write mode when writing a CSV file with PySpark, you can use the mode argument in the write.csv method.

raw.write.csv('write/sales.csv', mode='overwrite')

This will write the data from the DataFrame to write/sales.csv, replacing anything that already exists there. Note that Spark creates a directory with that name containing one or more part files, rather than a single CSV file.

PySpark write arguments

| Argument | Description |
| --- | --- |
| path | The path to write the data to. |
| mode | The write mode to use when writing the data. This can be one of the following: overwrite, append, errorIfExists, or ignore. See the previous table for a description of each mode. |
| header | A boolean indicating whether to write the column names as the first row of the CSV file (default is False). |
| sep | The character used to separate fields in the CSV file (default is ','). |
| quote | The character used to quote fields in the CSV file (default is '"'). |
| escape | The character used to escape quotes inside quoted fields (default is '\'). |
| quoteAll | A boolean indicating whether to quote all fields in the CSV file, regardless of whether they contain special characters (default is False). |
| escapeQuotes | A boolean indicating whether values containing quotes should always be enclosed in quotes (default is True). |
| nullValue | The string representation of a null value (default is an empty string). |
| compression | The compression codec to use when writing the data. This can be one of the following: none, bzip2, gzip, lz4, snappy, or deflate. |

Quote and Escape Characters

The quote character is used to enclose fields in a CSV file that contain characters that would otherwise cause the file to be parsed incorrectly. For example, if a field contains a , character, it will be interpreted as a field separator unless it is quoted. Similarly, if a field contains a " character, it will be interpreted as the end of the field unless it is escaped.

The escape character is used to escape quotes inside quoted fields. For example, if a field is quoted with " characters and the field value contains a " character, it can be escaped with the escape character so that it is not interpreted as the end of the field.

Multiple line CSV files

Multiple lines mean that one record can span multiple lines.

As a best practice, do not generate CSV files with multiline records from your data source. Consider creating additional columns or structuring the data inside the column as JSON or key-value pairs.

A field can span multiple lines if the field value is quoted and the line breaks are included within the quotes.

Reading Multiline CSV files

To read a CSV file with multiline fields using PySpark, set the multiLine argument of the spark.read.csv method to True. Without it, Spark treats every line break as the end of a record. You can also use the quote and escape arguments to specify the characters used to quote and escape fields in the file.

For example, suppose a file uses " as the quote character and \ as the escape character, and looks like this:

"col 1","col 2","col 3"
"value 1","value 2","value
3"
"value 4","value 5","value
6"
"value 7","value 8","value
9"

It can be read with the following code:

df = spark.read.csv('/path/to/file.csv', header=True, multiLine=True, quote='"', escape='\\')

This will read in the CSV file and correctly parse the multiline fields.

It is important to note that multiLine must be enabled, and the quote and escape characters must match the file, for multiline fields to be parsed correctly. If they don't, records may be split or merged incorrectly and you may encounter errors.

Conclusion

Hope you are fully equipped now to read all types of CSV files using PySpark.
